<a href="https://colab.research.google.com/github/nojramu/ALS_Thesis/blob/main/ALS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Adaptive Learning System
Made by: Engr. Marjon D. Umbay  
Created for paper article in compliance of the subject of:  
* Numerical Methods and Techniques  
* Technopreneurship and Innovation

## Random Forest Regression and Classification

### Preprosessor of Data

In [1]:
import pandas as pd

def load_and_preprocess_data(csv_file_path=None, data=None):
    """
    Loads data from a CSV file or accepts a DataFrame and performs preprocessing,
    including initial cleaning.

    Args:
      csv_file_path (str, optional): The path to the CSV file containing the data.
                                     If None, 'data' must be provided.
      data (pd.DataFrame, optional): A DataFrame containing the data.
                                   If None, 'csv_file_path' must be provided.

    Returns:
      Cleaned and preprocessed pandas DataFrame, or None if an error occurs,
      required columns are missing, or both csv_file_path and data are None.
    """
    df = None
    if csv_file_path:
        # Load the data from CSV
        try:
            df = pd.read_csv(csv_file_path)
        except FileNotFoundError:
            print(f"Error: File not found at {csv_file_path}")
            return None
        except Exception as e:
            print(f"Error loading CSV file: {e}")
            return None
    elif data is not None:
        # Use the provided DataFrame
        df = data.copy() # Work on a copy to avoid modifying the original
    else:
        print("Error: Either csv_file_path or data must be provided.")
        return None


    if df is None:
        return None

    # Check if required columns exist
    required_features = ['engagement_rate', 'time_on_task_s', 'hint_ratio', 'interaction_count',
                         'task_completed', 'quiz_score', 'difficulty', 'error_rate',
                         'task_timed_out', 'time_before_hint_used']

    # If it's training data, also check for target columns
    is_training_data = 'engagement_level' in df.columns and 'cognitive_load' in df.columns
    if is_training_data:
         required_cols = required_features + ['engagement_level', 'cognitive_load']
    else:
         required_cols = required_features # Only features are required for new data


    if not all(col in df.columns for col in required_cols):
        missing_cols = [col for col in required_cols if col not in df.columns]
        print(f"Error: Missing required columns in the data: {missing_cols}")
        return None

    # Preprocess the data: Convert to numeric and fill missing values
    # Apply to columns expected to be numeric
    numeric_cols_to_fill = [
        'engagement_rate', 'time_on_task_s', 'hint_ratio', 'interaction_count',
        'quiz_score', 'difficulty', 'error_rate', 'time_before_hint_used'
    ]
    if is_training_data:
        numeric_cols_to_fill.append('cognitive_load') # Include cognitive_load for training data


    for col in numeric_cols_to_fill:
         if col in df.columns: # Check if column exists before processing
            df[col] = pd.to_numeric(df[col], errors='coerce')
            df[col] = df[col].fillna(df[col].median()) # Use median for robustness

    # Convert boolean/integer columns to integer (if they exist and are not already numeric)
    int_cols = ['task_completed', 'task_timed_out']
    if is_training_data:
        int_cols.append('engagement_level') # Include engagement_level for training data

    for col in int_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
            if col in ['task_completed', 'task_timed_out']:
                 df[col] = df[col].fillna(0).astype(int)
            elif col == 'engagement_level':
                 df[col] = df[col].fillna(df[col].median()).astype(int)

    return df

### Random Forest Training

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.metrics import mean_squared_error, accuracy_score
import numpy as np

def train_cognitive_and_engagement_models(csv_file_path, test_set_size=0.2, random_state_value=20, n_estimators_value=100):
  """
  Trains Random Forest models for predicting cognitive load and engagement level
  from a CSV file.

  Args:
    csv_file_path (str): The path to the CSV file containing the data.
    test_set_size (float, optional): The proportion of the dataset to include in the test split. Defaults to 0.2.
    random_state_value (int, optional): Controls the shuffling applied to the data before applying the split.
    n_estimators_value (int, optional): The number of trees in the forest.

  Returns:
    tuple: A tuple containing two trained models:
           - rf_cognitive_load (RandomForestRegressor): Trained model for predicting cognitive load.
           - rf_engagement_level (RandomForestClassifier): Trained model for predicting engagement level.
           Returns (None, None) if data loading or preprocessing fails.
    dict: A dictionary containing the feature names used for training.
  """

  # Load and preprocess the data using the unified function
  df_processed = load_and_preprocess_data(csv_file_path=csv_file_path)

  if df_processed is None:
      return None, None, None # Return None for models and features

  # Define features (X) and targets (y) - 'difficulty' is now a numeric feature
  target_cols = ['cognitive_load', 'engagement_level']
  # Ensure 'difficulty' is included in features if it exists after processing
  features = [col for col in df_processed.columns if col not in target_cols]

  # Ensure all feature columns exist after preprocessing
  # Note: This check is redundant if load_and_preprocess_data already checked required_features
  # but keeping for safety after feature definition.
  if not all(col in df_processed.columns for col in features):
       missing_features = [col for col in features if col not in df_processed.columns]
       print(f"Error: Defined features are missing after preprocessing: {missing_features}")
       return None, None, None


  X = df_processed[features]
  y_cognitive_load = df_processed['cognitive_load']
  y_engagement_level = df_processed['engagement_level']

  # Split data into training and testing sets (optional but good practice)
  # Keeping the original structure for now to match the existing code's apparent intention.
  X_train, X_test, y_cognitive_load_train, y_cognitive_load_test = train_test_split(
      X, y_cognitive_load, test_size=test_set_size, random_state=random_state_value)
  # Split for engagement level - ensure stratification if it's a classification problem and classes are imbalanced
  X_train, X_test, y_engagement_level_train, y_engagement_level_test = train_test_split(
      X, y_engagement_level, test_size=test_set_size, random_state=random_state_value, stratify=y_engagement_level)


  # Initialize and train the Random Forest Regressor for cognitive load
  rf_cognitive_load = RandomForestRegressor(n_estimators=n_estimators_value, random_state=random_state_value)
  rf_cognitive_load.fit(X, y_cognitive_load) # Using all data for training

  # Initialize and train the Random Forest Classifier for engagement level
  rf_engagement_level = RandomForestClassifier(n_estimators=n_estimators_value, random_state=random_state_value)
  rf_engagement_level.fit(X, y_engagement_level) # Using all data for training


  # Evaluate the models on the test set
  cognitive_load_predictions = rf_cognitive_load.predict(X_test)
  print(f"Cognitive Load MSE: {mean_squared_error(y_cognitive_load_test, cognitive_load_predictions)}")

  engagement_level_predictions = rf_engagement_level.predict(X_test)
  print(f"Engagement Level Accuracy: {accuracy_score(y_engagement_level_test, engagement_level_predictions)}")


  # Return trained models and the list of features used for training
  return rf_cognitive_load, rf_engagement_level, features

### Sample Tester

In [3]:
csv_file_path = '/content/drive/MyDrive/The Paper/Numerical/Code/training_data_v2.csv'
train_cognitive_and_engagement_models(csv_file_path, 0.2, 20, 100)

Cognitive Load MSE: 241.2075066445227
Engagement Level Accuracy: 1.0


(RandomForestRegressor(random_state=20),
 RandomForestClassifier(random_state=20),
 ['engagement_rate',
  'time_on_task_s',
  'hint_ratio',
  'interaction_count',
  'task_completed',
  'quiz_score',
  'difficulty',
  'error_rate',
  'task_timed_out',
  'time_before_hint_used'])

### Random Forest Predictor

In [4]:
import pandas as pd
import numpy as np

def predict_cognitive_load_and_engagement(models, feature_names, new_data_path=None, new_data_df=None, new_data_list=None):
  """
  Makes predictions for cognitive load and engagement level using trained models.

  Args:
    models (tuple): A tuple containing the trained cognitive load regressor
                    and engagement level classifier (returned by train_cognitive_and_engagement_models).
    feature_names (list): A list of the feature names the models were trained on.
                          This is returned by train_cognitive_and_engagement_models.
    new_data_path (str, optional): The path to a CSV file containing the new data for prediction.
                                   Either new_data_path, new_data_df, or new_data_list must be provided.
    new_data_df (pd.DataFrame, optional): A DataFrame containing the new data for prediction.
                                        Either new_data_path, new_data_df, or new_data_list must be provided.
    new_data_list (list, optional): A list of values representing a single data point for prediction.
                                    The order of values should match the training features.
                                    Either new_data_path, new_data_df, or new_data_list must be provided.

  Returns:
    tuple: A tuple containing:
           - predicted_cognitive_load (np.array): Predicted cognitive load values.
           - predicted_engagement_level (np.array): Predicted engagement level values.
           Returns (None, None) if models are not provided or data is invalid.
  """
  if models is None or len(models) != 2 or feature_names is None:
    print("Error: Invalid models or feature names provided.")
    return None, None

  if new_data_path is None and new_data_df is None and new_data_list is None:
      print("Error: Either new_data_path, new_data_df, or new_data_list must be provided.")
      return None, None

  rf_cognitive_load, rf_engagement_level = models
  train_features = feature_names # Use feature_names passed from training

  if new_data_list is not None:
      if len(new_data_list) != len(train_features):
          print(f"Error: The number of values in new_data_list ({len(new_data_list)}) does not match the number of training features ({len(train_features)}).")
          return None, None
      # Create a DataFrame from the list, using the training feature names as columns
      # Ensure correct data types here if possible, though load_and_preprocess_data handles conversion
      new_data_for_pred = pd.DataFrame([new_data_list], columns=train_features)

  elif new_data_path is not None:
      # Use the unified load_and_preprocess_data function
      new_data_for_pred = load_and_preprocess_data(csv_file_path=new_data_path)
      if new_data_for_pred is None:
           print(f"Error loading or preprocessing data from {new_data_path}")
           return None, None

  elif new_data_df is not None:
      new_data_for_pred = new_data_df.copy() # Work on a copy

  else:
      return None, None # Should not happen based on checks above


  # Ensure new_data_for_pred has the same columns as the training data
  # Add missing columns from training data with a value of 0
  for col in train_features:
      if col not in new_data_for_pred.columns:
          new_data_for_pred[col] = 0

  # Ensure columns are in the same order as the training data
  # This is crucial for consistent predictions
  try:
      new_data_for_pred = new_data_for_pred[train_features]
  except KeyError as e:
      print(f"Error: Feature '{e}' from training data not found in preprocessed new data.")
      # This might happen if a required original column was missing in the new_data input
      return None, None


  try:
      predicted_cognitive_load = rf_cognitive_load.predict(new_data_for_pred)
      predicted_engagement_level = rf_engagement_level.predict(new_data_for_pred)
      return predicted_cognitive_load, predicted_engagement_level
  except Exception as e:
      print(f"Error during prediction: {e}")
      return None, None

### Prediction Tester

In [5]:
# Define the path to your CSV file (Assuming you have mounted Google Drive and know the path)
# Replace with the actual path to your CSV file if it's different
csv_file_path = '/content/drive/MyDrive/The Paper/Numerical/Code/training_data_v2.csv'

# Train the models and capture the returned models and feature names
# Updated to unpack 3 values as returned by the function
trained_cognitive_model, trained_engagement_model, trained_feature_names = train_cognitive_and_engagement_models(csv_file_path)

# Combine the two trained models into a tuple to pass to the predict function
trained_models = (trained_cognitive_model, trained_engagement_model)


# Example new data for prediction as a DataFrame with original columns, including 'difficulty' as a number (0-10)
# The load_and_preprocess_data function will handle the conversion to numeric and filling missing values.
new_data_point_original_df = pd.DataFrame({
    'engagement_rate': [0.85], # 0-1
    'time_on_task_s': [501], # seconds
    'hint_ratio': [0.67], # 0-1
    'interaction_count': [14],
    'task_completed': [0],  # Can be int (0/1) or boolean (False/True)
    'quiz_score': [89.11], # 0-100
    'difficulty': [2], # Provide the difficulty as a number between 0-10
    'error_rate': [0.59], # 0-1
    'task_timed_out': [0], # Can be int (0/1) or boolean (False/True)
    'time_before_hint_used': [199] # The longer the better
})

# Make predictions using the original new data DataFrame and the captured models and feature names
if trained_models is not None and trained_feature_names is not None:
    predicted_cognitive_load, predicted_engagement_level = predict_cognitive_load_and_engagement(
        models=trained_models,
        feature_names=trained_feature_names,
        new_data_df=new_data_point_original_df # Pass the original new data DataFrame
    )

    if predicted_cognitive_load is not None and predicted_engagement_level is not None:
        print("Predicted Cognitive Load:", predicted_cognitive_load)
        print("Predicted Engagement Level:", predicted_engagement_level)
    else:
        print("Prediction failed.")
else:
    print("Models are not trained. Please check the training process.")

Cognitive Load MSE: 241.2075066445227
Engagement Level Accuracy: 1.0
Predicted Cognitive Load: [70.20109]
Predicted Engagement Level: [5]


## Kalman's Filter

### Loading Sample Predictions

In [19]:
import pandas as pd
csv_file_path2 = '/content/drive/MyDrive/The Paper/Numerical/Code/sample_predictions.csv'
try:
    df_predictions = pd.read_csv(csv_file_path2)
    display(df_predictions.head())
    df_predictions.info()
except FileNotFoundError:
    print(f"Error: File not found at {csv_file_path2}")
except Exception as e:
    print(f"Error loading CSV file: {e}")

Unnamed: 0,cognitive_load,engagement_level
0,75.869,4
1,75.2602,5
2,86.8593,5
3,80.767,5
4,80.0618,4


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 2 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   cognitive_load    100 non-null    float64
 1   engagement_level  100 non-null    int64  
dtypes: float64(1), int64(1)
memory usage: 1.7 KB


### Plot Cognitive Load by Row Number

In [20]:
import plotly.express as px

# Create a line plot of cognitive_load against the row index
fig_line_cognitive_load = px.line(df_predictions, y='cognitive_load',
                                  title='Cognitive Load by Row')
fig_line_cognitive_load.update_layout(xaxis_title='row')
fig_line_cognitive_load.show()

### Kalman Filter

In [22]:
import numpy as np

def apply_kalman_filter(measurements, initial_covariance=1000.0, process_noise=0.1, measurement_noise=1.0):
    """
    Applies a Kalman filter to a sequence of measurements with adjustable parameters.

    Args:
        measurements (np.array): A 1D numpy array of measurements.
        initial_covariance (float, optional): The initial uncertainty in the state estimate (P). Defaults to 1000.0.
        process_noise (float, optional): The covariance of the process noise (Q). Defaults to 0.1.
        measurement_noise (float, optional): The covariance of the measurement noise (R). Defaults to 1.0.


    Returns:
        np.array: A 1D numpy array of smoothed values.
    """
    n_measurements = len(measurements)
    smoothed_values = np.zeros(n_measurements)

    # Initialize state estimate and covariance
    # State [x] - the estimated true value
    # Covariance [P] - the uncertainty in the state estimate
    x_hat = 0.0  # Initial state estimate
    P = initial_covariance # Initial covariance (high uncertainty)

    # Define system parameters (simplified for a static system with noise)
    # State transition matrix [A] - assumes the state doesn't change over time
    A = 1.0
    # Control input matrix [B] - no control input
    B = 0.0
    # Measurement matrix [H] - relates the state to the measurement
    H = 1.0
    # Process noise covariance [Q] - uncertainty in the system model
    Q = process_noise # Small value assuming the true value is relatively stable
    # Measurement noise covariance [R] - uncertainty in the measurements
    R = measurement_noise # Adjust based on expected measurement noise

    for k in range(n_measurements):
        # Prediction Step
        # Predicted state estimate
        x_hat_minus = A * x_hat + B * 0 # No control input (u=0)
        # Predicted covariance
        P_minus = A * P * A + Q

        # Update Step
        # Kalman Gain
        K = P_minus * H / (H * P_minus * H + R)

        # Updated state estimate
        x_hat = x_hat_minus + K * (measurements[k] - H * x_hat_minus)

        # Updated covariance
        P = (1 - K * H) * P_minus

        # Store the smoothed value
        smoothed_values[k] = x_hat

    return smoothed_values

### Loading Measurement and Storing Smooth values

In [23]:
cognitive_load_measurements = df_predictions['cognitive_load'].values
smoothed_cognitive_load = apply_kalman_filter(cognitive_load_measurements)
df_predictions['smoothed_cognitive_load'] = smoothed_cognitive_load
display(df_predictions.head())

Unnamed: 0,cognitive_load,engagement_level,smoothed_cognitive_load
0,75.869,4,75.793214
1,75.2602,5,75.514137
2,86.8593,5,79.871568
3,80.767,5,80.163641
4,80.0618,4,80.133208


### Plot Smoothed Results

In [26]:
import plotly.graph_objects as go

# Create a Plotly figure object
fig = go.Figure()

# Add a scatter trace for the original 'cognitive_load' data
fig.add_trace(go.Scatter(y=df_predictions['cognitive_load'],
                         mode='lines',
                         name='Original Cognitive Load'))

# Add a scatter trace for the 'smoothed_cognitive_load' data
fig.add_trace(go.Scatter(y=df_predictions['smoothed_cognitive_load'],
                         mode='lines',
                         name='Smoothed Cognitive Load'))

# Update the layout with title and axis labels
fig.update_layout(title='Original vs. Smoothed Cognitive Load',
                  xaxis_title='Row Number',
                  yaxis_title='Cognitive Load',
                  legend=dict(
                      orientation="h",
                      yanchor="bottom",
                      y=-0.2,
                      xanchor="center",
                      x=0.5
                  ))

# Display the Plotly figure
fig.show()

## Simmon's Rule Integration

### Simpson's Rule Function

In [45]:
import numpy as np

def simpsons_rule(y, h):
    """
    Applies Simpson's Rule for numerical integration.

    Args:
        y (np.array): A 1D numpy array of function values (the data points).
        h (float): The step size (the distance between consecutive data points).

    Returns:
        float: The approximate value of the integral.
        None: If an error occurs (e.g., less than 3 points after dropping).
    """
    n = len(y)

    # Check if the number of data points is even and drop the first point if necessary
    if n % 2 == 0:
        print("Number of data points is even. Dropping the first data point to apply Simpson's Rule.")
        y = y[1:]
        n = len(y) # Update n after dropping the point

    if n < 3:
        print("Error: Simpson's Rule requires at least 3 points (after potential dropping).")
        return None

    integral = y[0] + y[n-1]
    for i in range(1, n - 1, 2):
        integral += 4 * y[i]
    for i in range(2, n - 2, 2):
        integral += 2 * y[i]

    integral = integral * h / 3
    return integral

### Simmon's Rule Tester

In [57]:

# Assuming the data points are equally spaced by 1 (since they are row numbers)
# If the data represents a time series with a different time step, 'h' should be that time step.
h = 3
cognitive_load_values = df_predictions['smoothed_cognitive_load'].values


# Apply Simpson's Rule
simpsons_integral = simpsons_rule(cognitive_load_values, h)

if simpsons_integral is not None:
    print(f"Approximate integral of smoothed cognitive load using Simpson's Rule: {simpsons_integral}")

Number of data points is even. Dropping the first data point to apply Simpson's Rule.
Approximate integral of smoothed cognitive load using Simpson's Rule: 24016.36171630266


### Discretize Simpson's Rule into Buckets

In [64]:
def discretize_simpsons_result(simpsons_integral_value, num_buckets=5):
    """
    Discretizes the result of Simpson's Rule into a specified number of buckets
    and returns the bucket number as an integer.

    Args:
        simpsons_integral_value (float): The result from the simpsons_rule function.
        num_buckets (int, optional): The number of buckets to discretize into.
                                     Defaults to 5. Must be at least 2.

    Returns:
        int: The bucket number (1-based index).
             Returns None if simpsons_integral_value is None or num_buckets < 2.
    """
    if simpsons_integral_value is None or num_buckets < 2:
        print("Error: Invalid input for discretization.")
        return None

    # Determine the range of the integral values (assuming non-negative, adjust if needed)
    # This assumes the minimum possible integral is 0.
    # A more robust approach would use actual data range if available,
    # but for a single integral value, we'll use a hypothetical range or percentiles.
    # For simplicity here, we'll assume a conceptual range based on the expected nature of the integral.
    # A better approach would be to use the range of values seen in a dataset.
    # Let's use a simplistic approach based on the value itself relative to a potential max.
    # A more realistic scenario would need a defined range based on the problem context.

    # --- A more robust approach would define a range based on historical data or expected bounds ---
    # For this example, let's create a range dynamically based on the input value.
    # This is a simplification and might not be ideal for all scenarios.
    # A fixed, data-driven range is usually better.
    # If the integral is small, the range might be small. If it's large, the range is large.
    # Let's assume a plausible maximum for illustrative purposes or base it on the data used for integration.
    # Using the range of 'smoothed_cognitive_load' as a hint for the integral's scale:
    min_possible_integral = 0  # Assuming non-negative cognitive load
    # A rough estimate of max integral: max_smoothed_load * number of steps * h
    # Assuming max smoothed load is around 10 (as seen in plots) and steps are len(y), h=3
    max_possible_integral = np.max(cognitive_load_values) * len(cognitive_load_values) * h if cognitive_load_values is not None and len(cognitive_load_values) > 0 else 1000

    # If simpsons_integral_value exceeds this rough max, adjust the max
    if simpsons_integral_value > max_possible_integral:
        max_possible_integral = simpsons_integral_value * 1.1 # Add a little buffer


    # Ensure min is less than max for bucket creation
    if min_possible_integral >= max_possible_integral:
        # Fallback or handle case where data range is invalid
        print("Warning: Could not establish a valid range for discretization. Using a default range.")
        min_possible_integral = 0
        max_possible_integral = simpsons_integral_value + 1 # Use the value itself plus a small buffer


    # Define the bucket edges
    bins = np.linspace(min_possible_integral, max_possible_integral, num_buckets + 1)

    # Find which bucket the integral value falls into
    # Use pd.cut to assign the value to a bin
    # We need to put the single value into a Series or DataFrame to use pd.cut
    integral_series = pd.Series([simpsons_integral_value])
    # labels argument can be False or None to just return the bin index
    bucket_index_category = pd.cut(integral_series, bins=bins, include_lowest=True, labels=False)

    # pd.cut returns a categorical type, get the index
    # Handle the case where the value is exactly at the edge or outside the range if include_lowest is False
    # With include_lowest=True, the first bin is inclusive of the lower bound.
    # If the value is exactly the upper bound of the last bin, it's included there.
    # If somehow the value is outside the range (which shouldn't happen with dynamic max),
    # pd.cut might return NaN.
    if bucket_index_category.isnull().any():
        print(f"Warning: Simpson's integral value {simpsons_integral_value} is outside the calculated range [{min_possible_integral}, {max_possible_integral}]. Assigning to the closest bucket.")
        # Assign to the closest bucket - either the first or the last
        if simpsons_integral_value < min_possible_integral:
            bucket_index = 0
        else: # Value is greater than max_possible_integral
             bucket_index = num_buckets - 1
    else:
        bucket_index = bucket_index_category[0]


    # Return the bucket number (1-based index)
    return bucket_index + 1

### Discretization Tester

In [65]:
if simpsons_integral is not None:
    # Discretize the integral into 5 buckets
    num_buckets = 5 # Adjustable number of buckets
    bucket_number = discretize_simpsons_result(simpsons_integral, num_buckets=num_buckets)

    if bucket_number is not None:
        print(f"Simpson's Integral: {simpsons_integral}")
        print(f"Discretized Bucket Number: {bucket_number}")

    # Example with a different number of buckets
    num_buckets_alt = 7
    bucket_number_alt = discretize_simpsons_result(simpsons_integral, num_buckets=num_buckets_alt)

    if bucket_number_alt is not None:
        print(f"\nDiscretized into {num_buckets_alt} buckets:")
        print(f"Discretized Bucket Number: {bucket_number_alt}")

Simpson's Integral: 24016.36171630266
Discretized Bucket Number: 5

Discretized into 7 buckets:
Discretized Bucket Number: 6
