### Priority Inversion Avoidance
- Priority Inheritance: When a lower-priority task holds a resource needed by a higher-priority task, the lower-priority task temporarily inherits the higher priority so that it can finish its critical section and release the resource, preventing priority inversion. The RTScheduler class was extended to apply priority inheritance.
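Python's `threading.Lock` has no notion of task priority, so the following is only a minimal sketch of the bookkeeping that priority inheritance needs. It rests on stated assumptions: `Task` and `PriorityInheritanceLock` are hypothetical names, a smaller number means a higher priority (the `queue.PriorityQueue` convention used by the schedulers in the code below), and each task holds at most one such lock at a time, so releasing simply restores the base priority.

```python
import threading
import time
from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical task record; a smaller number means a higher priority."""
    name: str
    base_priority: int
    effective_priority: int = field(init=False)

    def __post_init__(self):
        self.effective_priority = self.base_priority


class PriorityInheritanceLock:
    """Hypothetical non-reentrant lock that lends a blocked waiter's priority to the holder."""

    def __init__(self):
        self._cond = threading.Condition()
        self.holder = None

    def acquire(self, task):
        with self._cond:
            while self.holder is not None:
                # Priority inheritance: a higher-priority waiter raises the holder's effective priority
                if task.effective_priority < self.holder.effective_priority:
                    self.holder.effective_priority = task.effective_priority
                self._cond.wait()
            self.holder = task

    def release(self, task):
        with self._cond:
            if self.holder is not task:
                raise RuntimeError(f"{task.name} does not hold the lock")
            # Assumes one lock per task: drop any inherited priority on release
            task.effective_priority = task.base_priority
            self.holder = None
            self._cond.notify_all()


if __name__ == "__main__":
    lock = PriorityInheritanceLock()
    low, high = Task("low", 10), Task("high", 1)

    def high_priority_job():
        lock.acquire(high)  # blocks while "low" holds the lock, boosting "low"
        lock.release(high)

    lock.acquire(low)  # the low-priority task holds the shared resource
    worker = threading.Thread(target=high_priority_job)
    worker.start()
    while low.effective_priority != high.base_priority:
        time.sleep(0.01)  # wait until the high-priority task has blocked
    print(low.effective_priority)  # → 1 (inherited from "high")
    lock.release(low)
    worker.join()
    print(low.effective_priority)  # → 10 (base priority restored)
```

Because `release` resets the holder straight to its base priority, this sketch does not cover a task holding several locks at once; a fuller version would recompute the effective priority from the remaining waiters.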

### Priority Ceiling Protocol
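- Assigns each shared resource a priority ceiling equal to the highest priority of any task that may lock it. A task may lock a resource only if its priority is higher than the ceilings of all resources currently locked by other tasks, which bounds priority inversion and prevents deadlock. It is useful when tasks share resources and can be implemented by wrapping resource access in priority checks.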
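The lock-admission check described above can be sketched as plain bookkeeping. This is a minimal, single-threaded illustration under assumptions: `PriorityCeilingManager`, the resource names, and the priority numbers are hypothetical; a smaller number means a higher priority (as in `queue.PriorityQueue`); and only the ceiling rule is shown, not the part of the full protocol where a blocking holder inherits the blocked task's priority.

```python
class PriorityCeilingManager:
    """Hypothetical bookkeeping for the lock-admission rule of the priority ceiling protocol.

    Convention: a smaller number means a higher priority (as in queue.PriorityQueue).
    """

    def __init__(self, resource_users):
        # resource_users maps each resource to {task_name: priority} for every task that may lock it;
        # a resource's ceiling is the highest priority (smallest number) among those tasks
        self.ceilings = {res: min(users.values()) for res, users in resource_users.items()}
        self.locked_by = {}  # resource -> name of the task currently holding it

    def system_ceiling(self, task):
        """Highest ceiling among resources locked by tasks other than `task` (inf if none)."""
        held_by_others = [self.ceilings[res] for res, holder in self.locked_by.items() if holder != task]
        return min(held_by_others, default=float("inf"))

    def try_lock(self, task, priority, resource):
        """Return True and lock `resource` if the ceiling rule allows it; False means the task must wait."""
        if resource in self.locked_by:
            return False
        # Ceiling rule: the task's priority must be strictly higher than the current system ceiling
        if priority < self.system_ceiling(task):
            self.locked_by[resource] = task
            return True
        return False

    def unlock(self, task, resource):
        if self.locked_by.get(resource) != task:
            raise RuntimeError(f"{task} does not hold {resource}")
        del self.locked_by[resource]


if __name__ == "__main__":
    # Hypothetical resources and priorities for the three sensor tasks
    pcp = PriorityCeilingManager({
        "sensor_bus": {"read_temperature": 1, "read_humidity": 2},
        "log_file": {"read_humidity": 2, "read_wind_speed": 3},
    })
    print(pcp.try_lock("read_wind_speed", 3, "log_file"))     # → True: nothing else is locked
    print(pcp.try_lock("read_humidity", 2, "sensor_bus"))     # → False: 2 is not above log_file's ceiling 2
    print(pcp.try_lock("read_temperature", 1, "sensor_bus"))  # → True: 1 is above the ceiling 2
```

Note that `read_humidity` is refused even though `sensor_bus` itself is free: it could later need `log_file`, which is held by a lower-priority task, and refusing it up front is what stops chains of blocking from forming.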

import threading
import queue
import time
import random
import json
from datetime import datetime, timedelta
import pandas as pd
import panel as pn
import holoviews as hv
import hvplot.pandas  # noqa
from holoviews.element.tiles import EsriImagery
from streamz.dataframe import PeriodicDataFrame
import param
import sys
import os
import signal
import psutil

"""

    RTOS Algorithms & Techniques Involved
    -------------------------------------
    
    Description and Program Flow
    ----------------------------
        Initialize the last scheduler-switch timestamp (back-dated by 8 seconds) and declare a threshold (n_threshold) for the number of queued tasks that triggers a switch to RMS.
        Keep track of the scheduler activity.
        Track the currently active scheduler and print each change.
        Also track throughput explicitly.
        Track CPU utilization.
        
    Algorithms
    ----------
    
    Logic
    -----
        SchedulerAPI():: class
        ----------------------
            Description
            -----------
                The SchedulerAPI class provides a centralized interface for managing multiple real-time task schedulers. 
                It supports adding schedulers, switching between them by name, and dynamically interacting with the selected scheduler. 
                This design promotes modularity, scalability, and ease of use in systems that require different scheduling strategies.

            Purpose
            --------
                Serves as a container and centralized manager for multiple scheduler instances.
                The API supports adding, retrieving, and switching between different scheduling approaches, providing:
                - Centralized management 
                - Dynamic Switching 
                - Scalability
        
        manage_scheduling() :: function
        -------------------------------
            Description
            ------------
                The manage_scheduling function continuously monitors system performance metrics and switches between schedulers (EDF, RMS, or DMS) based on the current CPU utilization, task wait time, and missed deadlines.
                It logs key events and selects the scheduler using fixed threshold rules, so that a scheduler suited to the current conditions is active.
                This approach allows adaptive scheduling in dynamic and unpredictable environments.
            
            Functioning
            ------------
                Global Variables:
                ------------------
                    last_scheduler_switch: Tracks the timestamp of the last scheduler switch.
                    current_scheduler: Tracks the name of the currently active scheduler.
                
                Metrics Monitoring:
                -------------------
                    CPU Utilization: Represents the current load on the CPU.
                    Task Wait Time: Simulates the average waiting time for tasks in the queue.
                    Missed Deadlines: Aggregates missed deadlines across all schedulers.
                
                Logging:
                --------
                    CPU utilization is periodically logged, even if no scheduler switch occurs.
                    Logs provide traceability for why and when a scheduler was switched.
                
                Decision-Making:: Based on thresholds:
                ---------------------------------------
                    Switch to EDF if missed deadlines or task wait times exceed limits.
                    Switch to RMS if CPU utilization is too high or too many tasks are queued.
                    Default to DMS if no critical conditions are met.
                    
                    Execution:  
                        Switches the active scheduler using the SchedulerAPI and executes its tasks.
                
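    RM
    ---
        defined by periodicity: priority is proportional to the rate 1 / period (shorter period → higher priority)
        
    EDF
    ----
        defined by deadline: priority follows the time remaining, deadline - current_time (earliest deadline → highest priority)
    DM
    ---
        defined by relative deadline: priority follows the deadline itself (shorter deadline → higher priority)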
        
    
    1. Priority Inversion Avoidance and Inheritance
    -----------------------------------------------
        - When a lower-priority task holds a resource needed by a higher-priority task, the lower-priority task is temporarily given the higher priority to prevent
        priority inversion. The RTScheduler class was extended to apply priority inheritance.
        
    2. Locking Mechanisms
    ---------------------
        Mutex for task synchronization

    3. Priority Ceiling Protocol
    ----------------------------
        - Assigns each resource a priority ceiling equal to the highest priority of any task that may lock it; a task may lock a resource only if its priority is higher than the ceilings of all resources currently locked by other tasks.
        - Used when dealing with shared resources; it can be implemented by wrapping resource access in priority checks.

"""

running = True
last_scheduler_switch = datetime.now() - timedelta(seconds=8)  
n_threshold = 5  
scheduler_log = []  
current_scheduler = "None"
current_scheduler_display = pn.widgets.StaticText(name='Current Active Scheduler', value=current_scheduler, width=400)
print_logs = []  
print_display = pn.pane.Str(''.join(print_logs), width=600, height=300)
task_throughput = 0  
cpu_utilization_log = []  


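mutex = threading.Lock()  # Shared mutex that serializes task execution across schedulers

# Log file paths are relative to the current directory; do not modify them (the visualization program reads the files from the same directory)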
SCHEDULER_LOG_FILE = "scheduler_log.json"
CPU_UTILIZATION_FILE = "cpu_utilization_log.json"

if not os.path.exists(SCHEDULER_LOG_FILE):
    with open(SCHEDULER_LOG_FILE, 'w') as f:
        json.dump([], f)

if not os.path.exists(CPU_UTILIZATION_FILE):
    with open(CPU_UTILIZATION_FILE, 'w') as f:
        json.dump([], f)

json_lock = threading.Lock()  # Mutex for JSON file operations

def append_to_json(file_path, data):
    
    """
    Description
    ------------
        Append one record to a JSON array file (thread-safe; the file is flushed and fsync'ed after each write) to capture metrics.
    
    Parameters
    -----------
    file_path
        path relative to the current directory.
        - When used on other systems, the JSON file must be stored in the same directory as the visualization program.
        
    data
        record to append, for example:
        for the scheduler_log::scheduler_log.json
            {
                "scheduler": "EDF",
                "time": "2024-11-19T19:57:03.631085",
                "event": "task_added",
                "task": "read_temperature"
            }
        
        for cpu_utilization_log::cpu_utilization_log.json
            {
                "time": "2024-11-19T13:41:48.823988",
                "scheduler": "EDF",
                "CPU Utilization": 44.446084421844404,
                "event": "periodic"
            }
        
    """
    
    with json_lock:
        with open(file_path, 'r+') as f:
            try:
                logs = json.load(f)
            except json.JSONDecodeError:
                logs = []
            logs.append(data)
            f.seek(0)
            json.dump(logs, f, indent=4)
            f.truncate()
            f.flush()
            os.fsync(f.fileno())

def weather_data_random(cities):
    
    """
    
    Description
    ----------
    Generate random weather data for listed cities
    
    Parameters
    ----------
        - cities (list): List of city names for which weather data is to be generated
        - Longitude and latitude are drawn within California's approximate bounds
        - Humidity: percentage
        - Temperature: Fahrenheit
        - Wind Speed: mph
    
    Returns
    ----------
        DataFrame containing city names, longitude, latitude, temperature, humidity, and wind speed
        
    """
    
    L = []
    for c in cities:
        data = {
            'name': c,
            'lon': random.uniform(-125, -115),  
            'lat': random.uniform(32, 42),  
            'Temperature': random.uniform(50, 100),  
            'Humidity': random.uniform(20, 100),  
            'Wind Speed': random.uniform(0, 20) 
        }
        L.append(data)

    df = pd.DataFrame(L)
    return df[['name', 'lon', 'lat', 'Temperature', 'Humidity', 'Wind Speed']]

def streaming_weather_data(**kwargs):
    
    """
    Description
    ----------
    Use weather_data_random() to generate temperature, wind speed, and humidity readings for San Francisco
    
    Returns
    ----------
    DataFrame containing the real-time weather data indexed by timestamp
    
    """
    
    df = weather_data_random(['San Francisco'])
    df['time'] = pd.Timestamp.now()
    return df.set_index('time')

def streaming_system_metrics(**kwargs):

    """
    
    Description
    ----------
        use psutil.Process to capture the CPU utilization of this simulation process (sampled over 1 second)
        use psutil.getloadavg() (1-minute system load average) as a proxy for the task wait time
    
    Returns
    ----------
        DataFrame: CPU utilization and task wait time, indexed by timestamp
    
    """

    process = psutil.Process(os.getpid())
    data = {
        'time': pd.Timestamp.now(),
        'CPU Utilization': process.cpu_percent(interval=1),  
        'Task Wait Time': psutil.getloadavg()[0]
    }
    df = pd.DataFrame([data])
    return df.set_index('time')

# The weather stream updates every 3 seconds and the real-time system metrics stream every 2 seconds
df_weather = PeriodicDataFrame(streaming_weather_data, interval='3s')
df_metrics = PeriodicDataFrame(streaming_system_metrics, interval='2s')

pn.extension('tabulator') # extend the panel dashboard

# Define the live plots (each keeps the last 1000 rows of its stream)
pn_realtime_weather = df_weather.hvplot.line(y=['Temperature', 'Humidity', 'Wind Speed'], title='Real-Time Weather Data', backlog=1000) 
pn_realtime_metrics = df_metrics.hvplot.line(y=['CPU Utilization', 'Task Wait Time'], title='Real-Time System Metrics', backlog=1000)

# define the cities
cities = ['San Francisco', 'Los Angeles', 'Santa Barbara', 'Sacramento', 'Fresno', 'San Diego', 'San Luis Obispo']


def weather_plot(col, cities=cities):
    
    """

    Description
    ----------
        - call weather_data_random() for the cities
        - project longitude and latitude to Web Mercator x/y (meters) so the points line up with the map tiles
        - points: scatter plot of the selected weather variable at each city's location
        - use Esri imagery tiles as the geographic basemap

    Returns
    -------
        a Panel Column with the scatter points overlaid on the map tiles, followed by the data table
        
    """
    
    from holoviews.util.transform import lon_lat_to_easting_northing

    df = weather_data_random(cities)
    # Esri tiles use Web Mercator; scaling degrees by a fixed meters-per-degree factor misplaces latitude by hundreds of kilometers
    df['x'], df['y'] = lon_lat_to_easting_northing(df['lon'], df['lat'])
    table = hv.Table(df[['name', col]]).opts(width=800)
    points = df.hvplot.scatter('x', 'y', c=col, cmap='bkr', hover_cols=['name'])
    map_tiles = EsriImagery().opts(alpha=0.5, width=900, height=480, bgcolor='white')
    return pn.Column(points * map_tiles, table)

# REFRESH DASHBOARD CLASS
class refresh_weather_dashboard(param.Parameterized):
    
    """
    
    Parameterized class that drives the refreshable weather map.

    Parameters
    ----------
    action
        Refresh button; triggering it re-runs get_plot() with freshly generated data
    select_column
        ObjectSelector for the weather variable to plot (Temperature, Humidity, or Wind Speed)

    Additional Information
    ----------------------
        - the Esri imagery overlay makes the map extremely complicated to update in real time, so it is refreshed on demand
        - the weather stream updates every 3 seconds; redrawing the map at that rate would only add the burden of irrelevant information
        
    """
    
    action = param.Action(lambda x: x.param.trigger('action'), label='Refresh')
    select_column = param.ObjectSelector(default='Temperature', objects=['Temperature', 'Humidity', 'Wind Speed'])

    @param.depends('action', 'select_column')
    def get_plot(self):
        return weather_plot(self.select_column)

# Create refreshable weather dashboard
weather_dashboard = refresh_weather_dashboard()

pn_weather = pn.Column(
    pn.panel(weather_dashboard.param, show_labels=True, show_name=False, margin=0),
    weather_dashboard.get_plot, width=400
)

# RTS Implementation
class RTScheduler:
    
    """
    
    Class for managing and executing tasks under priority and deadline constraints, following the RTS implementation requirements of the course.

    Attributes:
    ----------
        task_queue : queue.PriorityQueue
            A priority queue to manage tasks based on their priority and deadlines
        lock : threading.Lock
            A lock to ensure thread-safe operations on the task queue
        missed_deadlines : int
            Counter to track the number of tasks that missed their deadlines
        name : str
            Name of the scheduler instance for logging and identification

    Methods:
    -------
        __init__(name)
            Initializes the scheduler with a name, priority queue, lock, and missed deadlines counter
        
        add_task(task, priority, deadline=None)
            Adds a task to the scheduler's priority queue with the given priority and optional deadline
            Logs the addition of the task to the scheduler's log file
        
        execute_task()
            - Executes tasks from the priority queue in a loop until the queue is empty or interrupted
            - Handles priority inheritance, logs task start and end times, tracks deadlines, and records CPU utilization
            - Updates global logs for Gantt chart generation and throughput metrics
            
            working: 
                WHILE running is True:
                    TRY:
                        Retrieve the next task from the queue with timeout
                        priority, task, deadline = task_queue.get(timeout=5)

                        Record the start time of task execution
                        start_time = current time

                        Acquire lock using Priority Inheritance Protocol
                        
                        WITH mutex:
                            Log the start of the task
                            Log task start to scheduler_log and append to log file
                            Update current_scheduler display
                            Update and display print_logs with log entry
                            
                            Record CPU utilization before task execution
                            Log current CPU utilization before task execution
                            Append CPU utilization log to file
                            
                            Execute the task
                            task()

                            Record the end time of task execution
                            end_time = current time

                            Check for missed deadlines
                            IF deadline exists AND end_time > deadline:
                                Increment missed_deadlines counter

                            Log the end of the task
                            Log task end to scheduler_log and append to log file
                            Update and display print_logs with log entry
                            
                            Record CPU utilization after task execution
                            Log current CPU utilization after task execution
                            Append CPU utilization log to file

                            Update task throughput metric
                            Increment task_throughput by 1

                    EXCEPT queue.Empty:
                        Break loop if no tasks are available in the queue
                        BREAK
        
    """
    
    def __init__(self, name):
        self.task_queue = queue.PriorityQueue()
        self.lock = threading.Lock()
        self.missed_deadlines = 0  # Track the number of missed deadlines
        self.name = name
        self._seq = 0  # Insertion counter used as a tie-breaker between equal priorities

    def add_task(self, task, priority, deadline=None):
        with self.lock:
            # Queue entries are (priority, seq, task, deadline): on equal priorities, seq keeps FIFO
            # order instead of comparing the task functions, which would raise TypeError
            self._seq += 1
            self.task_queue.put((priority, self._seq, task, deadline))
            log_entry_add = {'scheduler': self.name, 'time': datetime.now().isoformat(), 'event': 'task_added', 'task': task.__name__}
            append_to_json(SCHEDULER_LOG_FILE, log_entry_add)

    def execute_task(self):
        global running, scheduler_log, task_throughput, current_scheduler, print_logs, cpu_utilization_log
        while running:
            try:
                priority, _seq, task, deadline = self.task_queue.get(timeout=5)
                start_time = datetime.now()

                # Serialize task execution with the shared mutex (the priority-inheritance section;
                # note that threading.Lock itself does not raise the holder's priority)
                with mutex:
                    # Log scheduler activity for Gantt chart
                    log_entry_start = {'scheduler': self.name, 'time': start_time.isoformat(), 'event': 'start'}
                    scheduler_log.append(log_entry_start)
                    append_to_json(SCHEDULER_LOG_FILE, log_entry_start)

                    current_scheduler = self.name
                    current_scheduler_display.value = current_scheduler
                    log_entry = f"[{start_time}] Scheduler {self.name} started task."
                    print(log_entry)
                    print_logs.append(log_entry)
                    print_display.object = '\n'.join(print_logs[-15:])  # Display last 15 logs

                    # Simulated CPU utilization (random value), logged before the task runs
                    cpu_utilization_log.append({'time': start_time.isoformat(), 'scheduler': self.name, 'CPU Utilization': random.uniform(20, 90), 'event': 'before'})
                    append_to_json(CPU_UTILIZATION_FILE, cpu_utilization_log[-1])

                    task()

                    end_time = datetime.now()
                    if deadline and end_time.timestamp() > deadline:
                        self.missed_deadlines += 1

                    # Update scheduler log for Gantt chart
                    log_entry_end = {'scheduler': self.name, 'time': end_time.isoformat(), 'event': 'end'}
                    scheduler_log.append(log_entry_end)
                    append_to_json(SCHEDULER_LOG_FILE, log_entry_end)

                    log_entry = f"[{end_time}] Scheduler {self.name} finished task."
                    print(log_entry)
                    print_logs.append(log_entry)
                    print_display.object = '\n'.join(print_logs[-15:])  # Display last 15 logs

                    # Simulated CPU utilization (random value), logged after the task finishes
                    cpu_utilization_log.append({'time': end_time.isoformat(), 'scheduler': self.name, 'CPU Utilization': random.uniform(20, 90), 'event': 'after'})
                    append_to_json(CPU_UTILIZATION_FILE, cpu_utilization_log[-1])

                    # Update throughput metrics
                    task_throughput += 1

            except queue.Empty:
                break

# Task Definitions
def read_temperature():
    
    """
    
    - simulate reading the temperature
    - log the reading to the console and the dashboard
        
    """
    
    log_entry = f"[{datetime.now()}] Reading Temperature..."
    print(log_entry)
    print_logs.append(log_entry)
    print_display.object = '\n'.join(print_logs[-15:])  # Display last 15 logs

def read_humidity():
    
    """
    
    - simulate reading the humidity
    - log the reading to the console and the dashboard
    
    """
    
    log_entry = f"[{datetime.now()}] Reading Humidity..."
    print(log_entry)
    print_logs.append(log_entry)
    print_display.object = '\n'.join(print_logs[-15:])  # Display last 15 logs

def read_wind_speed():
    
    """
    
    - simulate reading the wind speed
    - log the reading to the console and the dashboard
      
    """
    
    log_entry = f"[{datetime.now()}] Reading Wind Speed..."
    print(log_entry)
    print_logs.append(log_entry)
    print_display.object = '\n'.join(print_logs[-15:])  # Display last 15 logs


"""
    
    Each scheduling algorithm inherits from the RTScheduler base class
        - it overrides add_task() to implement its scheduling strategy by computing the queue priority
    
"""

class EDFScheduler(RTScheduler):    
    def add_task(self, task, deadline):
        
        """

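        Logic
        ----------
            task
                -- scheduled by deadline: earliest deadline → higher priority
            deadline
                -- absolute deadline in epoch seconds (as returned by time.time())
                -- used directly as the queue key; this orders tasks exactly as the remaining time deadline - current_time does,
                   but, unlike a key computed at insertion time, it stays consistent for tasks added at different times
            
        """
        
        super().add_task(task, priority=deadline, deadline=deadline)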


class RMScheduler(RTScheduler):
    def add_task(self, task, period):
        
        """

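        Logic
        ----------
            task
                -- scheduled by periodicity: higher frequency (shorter period) → higher priority
            period
                -- task period in seconds; the rate is 1 / period
                -- PriorityQueue dequeues the smallest key first, so the period itself is the key (a key of 1 / period would run the longest-period task first)
            
        """
        
        super().add_task(task, priority=period)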

# DMS Scheduler (Deadline Monotonic Scheduling)
class DMScheduler(RTScheduler):
    def add_task(self, task, deadline):
        
        """

        Logic
        ----------
            task
                -- scheduled by deadline: shorter relative deadline → higher priority
            deadline
                -- relative deadline in seconds, used directly as the queue key
            
        """
        
        super().add_task(task, priority=deadline)

# Instantiate schedulers
edf_scheduler = EDFScheduler("EDF")
rm_scheduler = RMScheduler("RMS")
dm_scheduler = DMScheduler("DMS")

# Scheduler API and Configurability
class SchedulerAPI:
    
    """
   
    Description
    -----------
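        -- serves as a container and manager for multiple scheduler instances, each identified by a unique name
        
        add_scheduler(): register a scheduler under a name, allowing algorithms to be added dynamically
        switch_scheduler(): return the scheduler registered under the given name (None if the name is unknown)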
    
    """
    
    def __init__(self):
        self.schedulers = {}

    def add_scheduler(self, name, scheduler):
        self.schedulers[name] = scheduler

    def switch_scheduler(self, name):
        return self.schedulers.get(name, None)

api = SchedulerAPI()  # registry of scheduling algorithms
api.add_scheduler("EDF", edf_scheduler)  # add EDF
api.add_scheduler("RMS", rm_scheduler)  # add RMS
api.add_scheduler("DMS", dm_scheduler)  # add DMS

# Function to manage scheduling based on conditions
def manage_scheduling():
    
    """
    
    WHILE running:
        current_time = current system time
        time_since_last_switch = time elapsed since last scheduler switch

        Retrieve current metrics
            -- current_cpu_utilization = random CPU utilization value (e.g., 20-90%)
            -- current_task_wait_time = random task wait time (e.g., 0-10 seconds)
            -- total_missed_deadlines = sum of missed deadlines across all schedulers

        Log CPU utilization periodically
            -- Add current CPU utilization to log
            -- Append the log entry to a JSON file

        Decision-making for scheduler switching
            IF time_since_last_switch >= 7 seconds:
                IF total_missed_deadlines > threshold OR task wait time is high:
                    log_entry = "Switching to EDF Scheduler due to missed deadlines or high wait time"
                    current_scheduler = "EDF"
                ELIF CPU utilization is high OR RMS task queue size exceeds threshold:
                    log_entry = "Switching to RMS Scheduler due to high CPU utilization or many tasks queued"
                    current_scheduler = "RMS"
                ELSE:
                    log_entry = "Switching to DMS Scheduler by default"
                    current_scheduler = "DMS"

                Log and update current scheduler
                    -- Print the log entry
                    -- Update display with last 15 logs
                    -- Update `last_scheduler_switch` timestamp
                    -- Set the current scheduler display
                    -- Execute tasks using the selected scheduler

        Wait 1 second to prevent busy waiting

    """
    
    global last_scheduler_switch, current_scheduler
    while running:
        current_time = datetime.now()
        time_since_last_switch = (current_time - last_scheduler_switch).total_seconds()

        # Get current values of the metrics
        current_cpu_utilization = random.uniform(20, 90)
        current_task_wait_time = random.uniform(0, 10)
        total_missed_deadlines = edf_scheduler.missed_deadlines + rm_scheduler.missed_deadlines + dm_scheduler.missed_deadlines

        # Record CPU utilization periodically, even if the scheduler doesn't switch
        cpu_utilization_log.append({'time': current_time.isoformat(), 'scheduler': current_scheduler, 'CPU Utilization': current_cpu_utilization, 'event': 'periodic'})
        append_to_json(CPU_UTILIZATION_FILE, cpu_utilization_log[-1])

        # Decision-making based on metrics
        if time_since_last_switch >= 7:
            if total_missed_deadlines > 3 or current_task_wait_time > 5:
                log_entry = "Switching to EDF Scheduler due to missed deadlines or high wait time"
                current_scheduler = "EDF"
            elif current_cpu_utilization > 80 or rm_scheduler.task_queue.qsize() > n_threshold:
                log_entry = "Switching to RMS Scheduler due to high CPU utilization or many tasks queued"
                current_scheduler = "RMS"
            else:
                log_entry = "Switching to DMS Scheduler by default"
                current_scheduler = "DMS"

            print(log_entry)
            print_logs.append(log_entry)
            print_display.object = '\n'.join(print_logs[-15:])  # Display last 15 logs
            last_scheduler_switch = current_time
            current_scheduler_display.value = current_scheduler
            api.switch_scheduler(current_scheduler).execute_task()

        time.sleep(1)  # Prevent busy waiting

# Combine into a dashboard
pane = pn.Tabs(
    ('Real-Time Weather Data', pn.Column(pn_realtime_weather, current_scheduler_display, print_display)),
    ('Real-Time System Metrics', pn_realtime_metrics),
    ('Refresh Weather Dashboard', pn_weather)
)

# Signal handler for clean exit
def signal_handler(sig, frame):
    global running
    print('Interrupt received, stopping the server...')
    running = False
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

# Serve the panel
def serve_dashboard():
    scheduling_thread = threading.Thread(target=manage_scheduling)
    scheduling_thread.start()
    server = pn.serve(pane, show=True, start=True, threaded=True)
    try:
        server.join()
    except KeyboardInterrupt:
        print("Shutting down server...")
        server.stop()
        sys.exit(0)

# Start the server
if __name__ == "__main__":
    edf_scheduler.add_task(read_temperature, deadline=time.time() + 10)
    rm_scheduler.add_task(read_humidity, period=5)
    dm_scheduler.add_task(read_wind_speed, deadline=15)
    serve_dashboard()


Switching to EDF Scheduler due to missed deadlines or high wait time
[2024-11-19 19:57:03.634730] Scheduler EDF started task.
Launching server at http://localhost:60000
[2024-11-19 19:57:03.642345] Reading Temperature...
[2024-11-19 19:57:03.644524] Scheduler EDF finished task.
Switching to EDF Scheduler due to missed deadlines or high wait time
Switching to DMS Scheduler by default
[2024-11-19 19:57:17.687763] Scheduler DMS started task.
[2024-11-19 19:57:17.694253] Reading Wind Speed...
[2024-11-19 19:57:17.694670] Scheduler DMS finished task.
Switching to EDF Scheduler due to missed deadlines or high wait time
Switching to DMS Scheduler by default
Switching to EDF Scheduler due to missed deadlines or high wait time
Switching to EDF Scheduler due to missed deadlines or high wait time
Switching to DMS Scheduler by default
Switching to DMS Scheduler by default
Switching to EDF Scheduler due to missed deadlines or high wait time
Switching to EDF Scheduler due to missed deadlines or high

SystemExit: 0
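The RM, EDF, and DM rules in the docstring all reduce to choosing a key for `queue.PriorityQueue`, which always returns the smallest entry first. The standalone sketch below shows the resulting dispatch order, why using the rate `1 / period` directly as the key inverts rate-monotonic order, and how an insertion counter breaks ties so that equal keys never force a comparison between task functions (which raises `TypeError`). The `dispatch_order` helper and all periods and deadlines are hypothetical values for illustration.

```python
import itertools
import queue


def dispatch_order(entries):
    """Return task names in the order queue.PriorityQueue would dispatch them.

    entries: iterable of (key, task_name) pairs; a smaller key is dispatched first.
    """
    q = queue.PriorityQueue()
    seq = itertools.count()  # insertion counter: equal keys fall back to FIFO order
    for key, name in entries:
        q.put((key, next(seq), name))
    order = []
    while not q.empty():
        order.append(q.get()[2])
    return order


if __name__ == "__main__":
    # Hypothetical task parameters, for illustration only
    periods = {"read_temperature": 2, "read_humidity": 5, "read_wind_speed": 10}
    now = 1000.0
    abs_deadlines = {"read_temperature": now + 10, "read_humidity": now + 3, "read_wind_speed": now + 7}
    rel_deadlines = {"read_temperature": 4, "read_humidity": 8, "read_wind_speed": 15}

    # RMS: shorter period -> higher priority, so the key is the period itself
    print(dispatch_order((p, n) for n, p in periods.items()))
    # → ['read_temperature', 'read_humidity', 'read_wind_speed']
    # Using the rate 1 / period as the key reverses the order on a min-queue
    print(dispatch_order((1 / p, n) for n, p in periods.items()))
    # → ['read_wind_speed', 'read_humidity', 'read_temperature']
    # EDF: earliest absolute deadline first
    print(dispatch_order((d, n) for n, d in abs_deadlines.items()))
    # → ['read_humidity', 'read_wind_speed', 'read_temperature']
    # DM: shortest relative deadline first
    print(dispatch_order((d, n) for n, d in rel_deadlines.items()))
    # → ['read_temperature', 'read_humidity', 'read_wind_speed']
```

Using the absolute deadline as the EDF key gives the same order as the remaining time `deadline - current_time` evaluated at one instant, but it does not drift when tasks are enqueued at different times.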