<a href="https://colab.research.google.com/github/rodchimb/rodchimb/blob/main/Python_Automation_with_Schedule.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# network_monitor_scheduler.py

"""
A network monitoring script that is automated using the Python 'schedule' library.
This script will run indefinitely, checking devices at a specified interval.

Prerequisites:
- The device-polling and InfluxDB helper functions from the previous
  monitoring script (reproduced below).
- pip install schedule napalm influxdb-client
"""

from napalm import get_network_driver
import json
import datetime
import time
import schedule
import logging
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# Set up basic logging
# Root logger at INFO level; every record is prefixed with a timestamp and level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- InfluxDB Configuration (same as before) ---
# Placeholder values: replace with the real server URL, API token, organisation
# and bucket before running.
# NOTE(review): avoid committing a real token to source control.
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "your_influxdb_token"
INFLUXDB_ORG = "your_influxdb_org"
INFLUXDB_BUCKET = "network_metrics"

# --- Device and Alerting Configuration (same as before) ---
# One entry per monitored device. 'vendor' is passed to get_network_driver()
# and 'optional_args' is forwarded unchanged to the driver constructor.
# NOTE(review): 830 is the conventional NETCONF-over-SSH port; confirm it is
# what the 'ios' driver expects, since that driver presumably uses plain SSH.
DEVICES = [
    {
        'hostname': 'cisco_iosxe_device_ip',
        'vendor': 'ios',
        'username': 'your_cisco_username',
        'password': 'your_cisco_password',
        'optional_args': {
            'port': 830,
        }
    },
    {
        'hostname': 'juniper_device_ip',
        'vendor': 'junos',
        'username': 'your_juniper_username',
        'password': 'your_juniper_password',
        'optional_args': {
            'port': 830,
        }
    }
]

# NOTE(review): not referenced by any function visible in this file —
# presumably consumed by alerting code elsewhere; confirm or remove.
CPU_ALERT_THRESHOLD = 80 # in percent

# --- Functions from previous scripts ---

def _first_cpu_usage(environment):
    """Return the '%usage' value of one CPU entry, or 'N/A' if unavailable.

    Accepts either a mapping of CPU id -> stats (key 0 is preferred when
    present, otherwise the first entry is used) or a sequence of stats dicts.
    """
    cpu = environment.get('cpu') if isinstance(environment, dict) else None
    if not cpu:
        return 'N/A'

    if isinstance(cpu, dict):
        # Some drivers key CPUs by name rather than by index 0.
        first = cpu[0] if 0 in cpu else next(iter(cpu.values()), None)
    else:
        first = next(iter(cpu), None)

    if isinstance(first, dict):
        return first.get('%usage', 'N/A')
    return 'N/A'


def _memory_utilization_percent(environment):
    """Return memory utilisation as a float percentage, or 'N/A'.

    Handles a single mapping with 'available_ram' (documented by NAPALM as
    the total installed RAM) and 'used_ram', as well as a sequence of
    per-bank dicts carrying 'total_ram' / 'used_ram'.
    """
    memory = environment.get('memory') if isinstance(environment, dict) else None
    if not memory:
        return 'N/A'

    if isinstance(memory, dict):
        total = memory.get('available_ram', 0)
        used = memory.get('used_ram', 0)
    else:
        banks = [bank for bank in memory if isinstance(bank, dict)]
        total = sum(bank.get('total_ram', 0) for bank in banks)
        used = sum(bank.get('used_ram', 0) for bank in banks)

    # Zero or negative readings cannot yield a meaningful percentage.
    if total <= 0 or used < 0:
        return 'N/A'
    return (used / total) * 100


def get_device_info(device_info):
    """Connect to a single device and retrieve a snapshot of operational data.

    Args:
        device_info: Mapping with 'hostname', 'vendor' (the NAPALM driver
            name), 'username' and 'password'. 'optional_args' is optional
            and is forwarded to the driver constructor.

    Returns:
        On success, a dict with 'timestamp', 'hostname', 'vendor',
        'os_version', 'uptime_seconds', 'cpu_utilization_percent' (the raw
        '%usage' value or 'N/A') and 'memory_utilization_percent' (a string
        with two decimals, or 'N/A').
        On any failure while talking to the device, a dict with
        'timestamp', 'hostname', 'status' set to 'error' and 'error_message'.

    Raises:
        KeyError: If a required key is missing from device_info.
    """
    hostname = device_info['hostname']
    vendor = device_info['vendor']
    username = device_info['username']
    password = device_info['password']
    optional_args = device_info.get('optional_args', {})

    logging.info(f"Attempting to connect to {hostname} ({vendor})...")

    try:
        driver = get_network_driver(vendor)
        device = driver(
            hostname=hostname,
            username=username,
            password=password,
            optional_args=optional_args
        )
        device.open()
        try:
            facts = device.get_facts()
            # One call provides both the CPU and the memory sections.
            environment = device.get_environment()
        finally:
            # Always release the session, even when a getter fails.
            device.close()

        cpu_load = _first_cpu_usage(environment)
        memory_utilization = _memory_utilization_percent(environment)

        result = {
            # Timezone-aware so the time is unambiguous when stored.
            'timestamp': datetime.datetime.now(datetime.timezone.utc),
            'hostname': hostname,
            'vendor': facts.get('vendor', 'N/A'),
            'os_version': facts.get('os_version', 'N/A'),
            'uptime_seconds': facts.get('uptime', 'N/A'),
            'cpu_utilization_percent': cpu_load,
            'memory_utilization_percent': f"{memory_utilization:.2f}" if isinstance(memory_utilization, float) else memory_utilization
        }

        logging.info(f"Successfully collected data from {hostname}.")
        return result

    except Exception as e:
        # Boundary handler: one unreachable device must not stop the whole run.
        logging.error(f"Failed to connect to {hostname}: {e}")
        return {
            'timestamp': datetime.datetime.now(datetime.timezone.utc),
            'hostname': hostname,
            'status': 'error',
            'error_message': str(e)
        }

def _as_float(value):
    """Return value converted to float, or None when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        # Covers placeholders such as 'N/A' and missing (None) values.
        return None


def send_to_influxdb(data):
    """Send a list of monitoring records to InfluxDB.

    Records whose 'status' is 'error' are skipped. For every other record,
    one "device_metrics" point is written per metric
    ('cpu_utilization_percent', 'memory_utilization_percent') whose value
    can be converted to a float; numeric strings such as "42.10" are
    accepted, while placeholders such as 'N/A' are skipped.

    Args:
        data: Iterable of record dicts carrying 'hostname', 'vendor' and
            'timestamp' in addition to the metric values.

    Returns:
        None. Write failures are logged, not raised.
    """
    metric_names = ('cpu_utilization_percent', 'memory_utilization_percent')

    points = []
    for record in data:
        if record.get('status') == 'error':
            continue

        for metric in metric_names:
            value = _as_float(record.get(metric))
            if value is None:
                continue
            points.append(
                Point("device_metrics")
                .tag("hostname", record['hostname'])
                .tag("vendor", record['vendor'])
                .field(metric, value)
                .time(record['timestamp'], WritePrecision.NS)
            )

    if not points:
        logging.warning("No valid data points to write to InfluxDB.")
        return

    client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
    try:
        write_api = client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
        logging.info("Successfully wrote monitoring data to InfluxDB.")
    except Exception as e:
        logging.error(f"Failed to write to InfluxDB: {e}")
    finally:
        # Release the client on both the success and the failure path.
        client.close()

def job():
    """Run one monitoring cycle.

    Polls every entry in DEVICES via get_device_info(), then hands the
    collected results (successes and error records alike) to
    send_to_influxdb(). Progress is logged at the start and at the end.
    """
    logging.info("Starting scheduled network monitoring job...")

    # Devices are polled sequentially, in configuration order.
    collected = [get_device_info(entry) for entry in DEVICES]
    send_to_influxdb(collected)

    logging.info("Scheduled job finished.")

# --- Scheduling the Job ---
def _run_job_safely():
    """Run job() and log, rather than propagate, any exception it raises.

    The schedule library does not catch exceptions raised by jobs, so a
    single failed run would otherwise terminate the monitoring loop below.
    """
    try:
        job()
    except Exception:
        logging.exception("Scheduled monitoring job failed; it will run again at the next interval.")


# Run the job every 5 minutes
schedule.every(5).minutes.do(_run_job_safely)

logging.info("Scheduler started. Monitoring job will run every 5 minutes...")

# The main loop that keeps the script running and checks the schedule
try:
    while True:
        schedule.run_pending()
        time.sleep(1) # Wait one second before checking the schedule again
except KeyboardInterrupt:
    # Allow a clean stop (Ctrl-C / kernel interrupt) without a traceback.
    logging.info("Scheduler stopped by user.")

ModuleNotFoundError: No module named 'napalm'

In [None]:
!pip install napalm