<a href="https://colab.research.google.com/github/kadefue/MoEST/blob/main/MoEST_Refactored_Primary_School_Enrollment_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import os
import re
import warnings

import lightgbm as lgb
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.ensemble import (
    RandomForestRegressor,
    GradientBoostingRegressor,
    HistGradientBoostingRegressor
)
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import LabelEncoder

# Suppress every Python warning for the rest of the session. This keeps the
# training output readable, but it also hides deprecation and convergence
# warnings emitted by any library imported above.
warnings.filterwarnings('ignore')

class EnrollmentForecaster:
    """
    Object-oriented pipeline for enrollment forecasting.

    Handles data preparation, feature engineering, model selection, and
    recursive forecasting. Designed for Pre-Primary and Primary School datasets.

    Intended call order::

        prepare_data() -> engineer_features() -> train_and_select_best()
        -> forecast_recursive()

    ``prepare_data`` and ``engineer_features`` return ``self`` so they can be
    chained.
    """

    # Columns that identify one observation (after column standardization).
    _ID_COLS = ('YEAR', 'REGION', 'COUNCIL')

    # Substrings that mark a wide-format column as an enrollment value column.
    _VALUE_KEYWORDS = ('STD', 'STANDARD', 'GRADE', 'PRE', 'TOTAL', 'BOYS', 'GIRLS')

    # Sentinel written wherever a lag/cohort value is unavailable.
    _MISSING = -1

    # A single digit 1-7 that is not part of a longer number (so "2016" is ignored).
    _GRADE_DIGIT_RE = re.compile(r'(?<!\d)([1-7])(?!\d)')

    # A Roman numeral I-VII appearing as its own token or glued to a grade
    # prefix ("STDVII"). Longer numerals are listed first so "VII" is never
    # read as "V" or "VI"; the lookahead keeps words like "GIRLS" from matching.
    _GRADE_ROMAN_RE = re.compile(
        r'(?:^|[^A-Z]|STD|STANDARD|GRADE)(VII|VI|IV|V|III|II|I)(?![A-Z])'
    )
    _ROMAN_TO_INT = {'I': 1, 'II': 2, 'III': 3, 'IV': 4, 'V': 5, 'VI': 6, 'VII': 7}

    # Human-readable labels for the numeric grades in forecast output.
    _GRADE_LABELS = {
        0: 'Pre-Primary', 1: 'Standard 1', 2: 'Standard 2', 3: 'Standard 3',
        4: 'Standard 4', 5: 'Standard 5', 6: 'Standard 6', 7: 'Standard 7'
    }

    def __init__(self, enrollment_df, infrastructure_dfs=None, name="Model"):
        """
        Args:
            enrollment_df: DataFrame containing the main enrollment numbers.
            infrastructure_dfs: Dictionary of helper DataFrames (Dropouts, Classrooms, etc.).
                ``None`` entries are skipped during merging.
            name: Name of the category (e.g., 'Primary', 'Pre-Primary'). The value
                'Primary' additionally marks grade 1 as an entry grade.
        """
        self.raw_df = enrollment_df
        self.infra_dfs = infrastructure_dfs if infrastructure_dfs else {}
        self.name = name

        # Define the 5 models to compare
        self.models = {
            'XGBoost': xgb.XGBRegressor(n_estimators=300, max_depth=6, learning_rate=0.05, n_jobs=-1, random_state=42),
            'LightGBM': lgb.LGBMRegressor(n_estimators=300, num_leaves=31, learning_rate=0.05, verbose=-1, random_state=42),
            'RandomForest': RandomForestRegressor(n_estimators=200, max_depth=12, n_jobs=-1, random_state=42),
            'GradientBoosting': GradientBoostingRegressor(n_estimators=200, max_depth=5, learning_rate=0.05, random_state=42),
            'HistGradientBoosting': HistGradientBoostingRegressor(max_iter=200, max_depth=10, random_state=42)
        }

        self.best_models = []
        self.le_reg = LabelEncoder()
        self.le_cou = LabelEncoder()

        # Will be populated during processing
        self.final_df = None
        self.features = []

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @classmethod
    def _extract_grade_num(cls, label):
        """Map a grade column label to a numeric grade (0 = pre-primary/unknown).

        Arabic digits are tried first, then Roman numerals matched as whole
        tokens, so "STD VII" yields 7 and "GIRLS" yields 0 rather than both
        being captured by a bare ``'I' in label`` test.
        """
        text = str(label).upper()
        if 'PRE' in text:
            return 0
        digit = cls._GRADE_DIGIT_RE.search(text)
        if digit:
            return int(digit.group(1))
        roman = cls._GRADE_ROMAN_RE.search(text)
        if roman:
            return cls._ROMAN_TO_INT[roman.group(1)]
        return 0  # Default to 0 (Entry level) if unknown

    def _is_entry_grade(self, grade_num):
        """Return True when a grade has no feeder grade inside this dataset.

        Grade 0 is always an entry grade; grade 1 is an entry grade when the
        forecaster is named 'Primary'. The same rule is used for training
        features and for recursive forecasting.
        """
        return grade_num == 0 or (grade_num <= 1 and self.name == 'Primary')

    @classmethod
    def _lagged_growth(cls, lag_1, lag_2):
        """Growth from ``lag_2`` to ``lag_1``; the sentinel where either is unknown.

        Uses only past values, so the result can be computed at forecast time
        and does not leak the current-year target into the feature set.
        """
        known = lag_1.notna() & lag_2.notna() & (lag_1 >= 0) & (lag_2 >= 0)
        growth = (lag_1 - lag_2) / (lag_2 + 1e-5)  # epsilon guards a zero base
        return growth.where(known, float(cls._MISSING))

    def _standardize_columns(self, df):
        """Return a copy of ``df`` with upper-cased, stripped, canonical column names."""
        df = df.copy()
        df.columns = [str(c).strip().upper() for c in df.columns]

        # Map common variations to standard names
        rename_map = {
            'YEAR': 'YEAR', 'ACADEMIC YEAR': 'YEAR',
            'REGION': 'REGION', 'REGON': 'REGION',
            'COUNCIL': 'COUNCIL', 'DISTRICT': 'COUNCIL', 'LGA NAME': 'COUNCIL'
        }
        df.rename(columns=rename_map, inplace=True)
        return df

    # ------------------------------------------------------------------
    # Pipeline stages
    # ------------------------------------------------------------------
    def prepare_data(self):
        """
        Cleans, melts (if wide), and merges infrastructure data.

        Stores the long-format result in ``self.final_df`` with one row per
        YEAR / REGION / COUNCIL / source column, plus ``GRADE_LEVEL``,
        ``GRADE_NUM``, ``ENROLLMENT`` and any merged infrastructure columns.

        Returns:
            self, to allow chaining.

        Raises:
            ValueError: if an ID column is missing or no enrollment column
                can be identified.
        """
        print(f"[{self.name}] Preparing Data...")
        df = self._standardize_columns(self.raw_df)

        # 1. Identify ID columns and Value columns
        id_vars = list(self._ID_COLS)
        missing_ids = [c for c in id_vars if c not in df.columns]
        if missing_ids:
            # Every later step (merge, sort, groupby) needs all three keys.
            raise ValueError(
                f"{self.name} dataset is missing required column(s) {missing_ids}. "
                f"Columns found: {list(df.columns)}"
            )

        potential_values = [c for c in df.columns if c not in id_vars]

        # Heuristic: Value columns usually contain specific keywords
        value_vars = [c for c in potential_values if any(k in c for k in self._VALUE_KEYWORDS)]

        if not value_vars:
            # Fallback: use all remaining numeric columns
            numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
            value_vars = [c for c in numeric_cols if c not in id_vars]

        if not value_vars:
            raise ValueError(f"Could not identify enrollment columns in {self.name} dataset. Columns found: {df.columns}")

        # Melt to Long Format
        long_df = df.melt(id_vars=id_vars,
                          value_vars=value_vars,
                          var_name='GRADE_LEVEL',
                          value_name='ENROLLMENT')

        # Rows without a complete key cannot be grouped or merged reliably.
        long_df = long_df.dropna(subset=id_vars)

        # Convert 'ENROLLMENT' to numeric, handling commas and missing values
        enrollment_text = long_df['ENROLLMENT'].astype(str).str.replace(',', '', regex=False).str.strip()
        long_df['ENROLLMENT'] = pd.to_numeric(enrollment_text, errors='coerce').fillna(0)

        # 2. Clean Grade Level & Extract Numeric Grade
        long_df['GRADE_LEVEL'] = long_df['GRADE_LEVEL'].astype(str).str.upper()
        long_df['GRADE_NUM'] = long_df['GRADE_LEVEL'].apply(self._extract_grade_num)

        # 3. Merge Infrastructure (if available)
        for infra_name, infra_df in self.infra_dfs.items():
            if infra_df is None:
                continue

            infra_clean = self._standardize_columns(infra_df)

            # Simple aggregation: sum numeric columns per key. The keys
            # themselves are excluded even if they happen to be numeric.
            numeric_cols = [
                c for c in infra_clean.select_dtypes(include=np.number).columns
                if c not in id_vars
            ]

            # Check if we can merge (must have all key columns)
            if numeric_cols and all(k in infra_clean.columns for k in id_vars):
                agg_infra = infra_clean.groupby(id_vars)[numeric_cols].sum().reset_index()
                # Prefix columns with the dataset name to avoid collisions
                agg_infra.columns = id_vars + [f"{infra_name}_{c}" for c in numeric_cols]

                long_df = long_df.merge(agg_infra, on=id_vars, how='left')

        # Fill missing infrastructure/enrollment with 0 (numeric columns only,
        # so text keys are never overwritten with a number).
        numeric_out = long_df.select_dtypes(include=np.number).columns
        long_df[numeric_out] = long_df[numeric_out].fillna(0)

        # The lag features built later shift within REGION/COUNCIL/GRADE_NUM and
        # therefore assume one row per key and year; surface violations early.
        dup_count = int(long_df.duplicated(subset=id_vars + ['GRADE_NUM']).sum())
        if dup_count:
            print(f"[{self.name}] WARNING: {dup_count} rows share a YEAR/REGION/COUNCIL/GRADE_NUM key; "
                  f"lag features assume one row per key.")

        # 4. Sort for Feature Engineering
        long_df = long_df.sort_values(['REGION', 'COUNCIL', 'GRADE_NUM', 'YEAR'])
        self.final_df = long_df
        return self

    def engineer_features(self):
        """
        Creates lag, lagged-growth, cohort and encoded-ID features.

        Added columns: ``LAG_1``/``LAG_2`` (enrollment one/two rows earlier in
        the same REGION/COUNCIL/GRADE_NUM group), ``YOY_GROWTH`` (growth from
        ``LAG_2`` to ``LAG_1``), ``COHORT_LAG`` (previous grade's enrollment in
        the previous year), ``REGION_ENC`` and ``COUNCIL_ENC``. Unavailable
        values are set to -1. ``self.features`` is set to every column except
        the IDs, ``GRADE_LEVEL`` and the target.

        Returns:
            self, to allow chaining.
        """
        print(f"[{self.name}] Engineering Features...")
        df = self.final_df.copy()

        # Group by Unit of Analysis
        grouped = df.groupby(['REGION', 'COUNCIL', 'GRADE_NUM'])

        # 1. Simple Lags
        df['LAG_1'] = grouped['ENROLLMENT'].shift(1)
        df['LAG_2'] = grouped['ENROLLMENT'].shift(2)
        # Growth is derived from the lags only: using the current ENROLLMENT here
        # would let a model reconstruct the target from LAG_1 and the growth rate.
        df['YOY_GROWTH'] = self._lagged_growth(df['LAG_1'], df['LAG_2'])

        # 2. Cohort Logic (The "Flow" Feature)
        # Students in Grade N (Year T) come from Grade N-1 (Year T-1)
        lookup = dict(zip(
            zip(df['YEAR'], df['REGION'], df['COUNCIL'], df['GRADE_NUM']),
            df['ENROLLMENT']
        ))
        df['COHORT_LAG'] = [
            self._MISSING if self._is_entry_grade(grade)
            else lookup.get((year - 1, region, council, grade - 1), self._MISSING)
            for year, region, council, grade
            in zip(df['YEAR'], df['REGION'], df['COUNCIL'], df['GRADE_NUM'])
        ]

        # 3. Encodings
        df['REGION_ENC'] = self.le_reg.fit_transform(df['REGION'].astype(str))
        df['COUNCIL_ENC'] = self.le_cou.fit_transform(df['COUNCIL'].astype(str))

        # 4. Clean up: remaining gaps (first-year lags) get the sentinel
        df = df.fillna(self._MISSING)

        # Define Features for Training (exclude IDs and Target)
        exclude_cols = ['YEAR', 'REGION', 'COUNCIL', 'GRADE_LEVEL', 'ENROLLMENT']
        self.features = [c for c in df.columns if c not in exclude_cols]

        self.final_df = df
        return self

    def train_and_select_best(self):
        """
        Trains all configured models and selects the top 2 based on R2 score.

        The latest year is held out as the test set; all earlier years are
        used for training. MAE, RMSE, MAPE and "accuracy" (100 - MAPE) are
        reported for each model. A model that raises during fit/predict is
        reported and skipped.

        Returns:
            list of result dicts (keys: Name, Model, R2, MAE, RMSE, MAPE,
            Accuracy) for the best models, best first. Contains fewer than
            two entries only if fewer than two models succeeded.

        Raises:
            ValueError: if the data covers fewer than two distinct years.
            RuntimeError: if every model failed.
        """
        print(f"[{self.name}] Training 5 Models & Selecting Best 2...")

        # Determine Train/Test Split
        years = self.final_df['YEAR']
        if years.nunique() < 2:
            raise ValueError(
                f"[{self.name}] At least two distinct years are required to hold out a test year."
            )
        max_year = years.max()
        train_df = self.final_df[years < max_year]
        test_df = self.final_df[years == max_year]

        X_train = train_df[self.features]
        y_train = train_df['ENROLLMENT']
        X_test = test_df[self.features]
        y_test = test_df['ENROLLMENT']

        y_true = y_test.to_numpy()
        nonzero = y_true != 0  # MAPE is undefined where the actual value is 0

        results = []

        for name, model in self.models.items():
            try:
                model.fit(X_train, y_train)
                preds = np.asarray(model.predict(X_test))

                # Standard Metrics
                r2 = r2_score(y_test, preds)
                mae = mean_absolute_error(y_test, preds)
                rmse = np.sqrt(mean_squared_error(y_test, preds))

                # Percentage Metrics (MAPE & Accuracy)
                if nonzero.any():
                    mape = np.mean(np.abs((y_true[nonzero] - preds[nonzero]) / y_true[nonzero])) * 100
                    accuracy = 100 - mape
                else:
                    mape = np.nan
                    accuracy = np.nan

                results.append({
                    'Name': name,
                    'Model': model,
                    'R2': r2,
                    'MAE': mae,
                    'RMSE': rmse,
                    'MAPE': mape,
                    'Accuracy': accuracy
                })
                print(f"   > {name}: R2={r2:.4f}, MAE={mae:,.2f}, MAPE={mape:.2f}%, Acc={accuracy:.2f}%")
            except Exception as e:
                # Best effort: one failing library must not stop the comparison.
                print(f"   > {name} failed: {e}")

        if not results:
            raise RuntimeError(f"[{self.name}] All candidate models failed to train.")

        # Select Top 2 by R2 (a NaN score ranks last)
        sorted_models = sorted(
            results,
            key=lambda r: -np.inf if np.isnan(r['R2']) else r['R2'],
            reverse=True
        )
        self.best_models = sorted_models[:2]

        winners = " & ".join(entry['Name'] for entry in self.best_models)
        print(f"   >> WINNERS for {self.name}: {winners}")
        print("-" * 40)
        return self.best_models

    def forecast_recursive(self, start_year=2026, end_year=2030):
        """
        Generates forecasts for the specified range using the average of the
        selected best models.

        The forecast is rolled forward one year at a time from the latest year
        in ``self.final_df``; each step feeds the previous step's prediction
        back in as ``LAG_1`` / ``COHORT_LAG``. Intermediate years before
        ``start_year`` are computed (they are needed for the lags) but not
        returned. All other feature columns are carried forward unchanged
        from the latest observed year.

        Args:
            start_year: first year to include in the result.
            end_year: last year to include in the result.

        Returns:
            DataFrame with the feature columns, predicted ``ENROLLMENT``
            (clipped at 0) and a readable ``GRADE_LABEL`` column.

        Raises:
            RuntimeError: if no trained models are available.
            ValueError: if the requested range contains no year after the
                latest observed year.
        """
        print(f"[{self.name}] Generating Recursive Forecast ({start_year}-{end_year})...")

        if not self.best_models:
            raise RuntimeError(
                f"[{self.name}] No trained models available; call train_and_select_best() first."
            )

        max_year = self.final_df['YEAR'].max()
        last_year = int(max_year)
        first_kept_year = max(start_year, last_year + 1)
        if end_year < first_kept_year:
            raise ValueError(
                f"[{self.name}] Nothing to forecast: data ends in {last_year}, "
                f"requested range is {start_year}-{end_year}."
            )

        ensemble = self.best_models[:2]
        models = [entry['Model'] for entry in ensemble]
        print(f"   Ensemble members: {', '.join(entry['Name'] for entry in ensemble)}")

        # Start with the latest available data
        current_data = self.final_df[self.final_df['YEAR'] == max_year].copy()
        forecasts = []

        for year in range(last_year + 1, end_year + 1):
            next_df = current_data.copy()
            next_df['YEAR'] = year

            # 1. Update Lags (and the growth derived from them)
            next_df['LAG_2'] = current_data['LAG_1']
            next_df['LAG_1'] = current_data['ENROLLMENT']
            next_df['YOY_GROWTH'] = self._lagged_growth(next_df['LAG_1'], next_df['LAG_2'])

            # 2. Update Cohort Lag from the previous step's enrollment
            lookup = dict(zip(
                zip(current_data['REGION'], current_data['COUNCIL'], current_data['GRADE_NUM']),
                current_data['ENROLLMENT']
            ))
            next_df['COHORT_LAG'] = [
                self._MISSING if self._is_entry_grade(grade)
                else lookup.get((region, council, grade - 1), self._MISSING)
                for region, council, grade
                in zip(next_df['REGION'], next_df['COUNCIL'], next_df['GRADE_NUM'])
            ]

            # 3. Predict with every selected model, average, and clip at zero
            X_next = next_df[self.features]
            avg_pred = np.mean([model.predict(X_next) for model in models], axis=0)
            next_df['ENROLLMENT'] = np.maximum(avg_pred, 0)

            if year >= start_year:
                forecasts.append(next_df)
            current_data = next_df

        result_df = pd.concat(forecasts, ignore_index=True)

        # Map Grade Num back to Labels for Readability
        result_df['GRADE_LABEL'] = result_df['GRADE_NUM'].map(self._GRADE_LABELS)

        return result_df


# ==========================================
# MAIN EXECUTION BLOCK WITH GOOGLE DRIVE
# ==========================================
if __name__ == "__main__":
    # Script entry point: load the CSVs from Google Drive, run the Primary and
    # Pre-Primary pipelines, and print a combined forecast preview.
    # Names are kept at module level so later notebook cells can reuse them.
    loaded_dataframes = {}
    forecast_start, forecast_end = 2026, 2030

    def _normalized(name):
        """Lower-case a dataset name and collapse runs of whitespace."""
        return " ".join(str(name).lower().split())

    def _pick_frame(frames, *candidates):
        """Return the first loaded frame whose normalized name matches a candidate, else None."""
        by_norm = {_normalized(key): frame for key, frame in frames.items()}
        for candidate in candidates:
            frame = by_norm.get(_normalized(candidate))
            if frame is not None:
                return frame
        return None

    # 1. ATTEMPT TO CONNECT TO GOOGLE DRIVE
    try:
        from google.colab import drive
        print(">> Mounting Google Drive...")
        drive.mount('/content/drive/')
        IN_COLAB = True

        base_directory = '/content/drive/MyDrive/GUIDELINES_TSC_JAN2026/Data Set/csvs/'

        if os.path.exists(base_directory):
            print(f">> Loading files from: {base_directory}")
            all_files = [f for f in os.listdir(base_directory) if f.lower().endswith('.csv')]

            # Keywords to exclude
            exclude_keywords = ['Secondary', 'Textbooks', 'Population', 'Teacher', 'COBET', 'Vocational']

            filtered_files = [f for f in all_files if not any(k.lower() in f.lower() for k in exclude_keywords)]

            for file_name in filtered_files:
                file_path = os.path.join(base_directory, file_name)
                # splitext removes only the trailing extension, never a
                # '.csv' that happens to occur inside the name.
                df_name = os.path.splitext(file_name)[0]
                try:
                    loaded_dataframes[df_name] = pd.read_csv(file_path)
                    print(f"   Loaded: {df_name}")
                except Exception as e:
                    # Best effort: one unreadable file must not stop the run.
                    print(f"   Error loading {file_name}: {e}")
        else:
            print(f"Error: Directory not found: {base_directory}")

    except ImportError:
        print(">> Not running in Google Colab. Drive mounting skipped.")
        IN_COLAB = False

    # 2. CHECK DATA AND RUN PIPELINE
    if loaded_dataframes:
        # Retrieve Primary Data
        df_prim = _pick_frame(loaded_dataframes, "Data-Primary Enrollment 2016-2025")
        df_pre = _pick_frame(loaded_dataframes, "Data-Pre-Primary Enrollment 2016-2025")

        # Retrieve Infrastructure (Shared). Each entry lists the originally
        # expected name followed by the file name actually present in the
        # Drive folder, so a renamed file is no longer silently skipped.
        infra_dict = {
            'Classrooms': _pick_frame(
                loaded_dataframes,
                "PRIMARY Pit Latrine AND CLASSROOMS 2017-2025",
                "PRIMARY Pit Latrine AND CLASSROOMS Final 2016-2025",
            ),
            'Dropout': _pick_frame(loaded_dataframes, "Dropout-Primary 2017-2024"),
            'Repeaters': _pick_frame(
                loaded_dataframes,
                "Repeaters-Primary 2017-2024",
                "Data-Primary repeaters 2017-2025",
            ),
        }
        for infra_name, infra_df in infra_dict.items():
            if infra_df is None:
                print(f"   WARNING: infrastructure dataset '{infra_name}' not found; it will be skipped.")

        all_forecasts = []

        # --- PRIMARY FORECAST ---
        if df_prim is not None:
            print("\n" + "="*50)
            print("   STARTING PRIMARY SCHOOL FORECAST   ")
            print("="*50)

            prim_forecaster = EnrollmentForecaster(df_prim, infra_dict, name="Primary")
            prim_forecaster.prepare_data()
            prim_forecaster.engineer_features()
            prim_forecaster.train_and_select_best()

            prim_forecast = prim_forecaster.forecast_recursive(forecast_start, forecast_end)
            all_forecasts.append(prim_forecast)

        # --- PRE-PRIMARY FORECAST ---
        if df_pre is not None:
            print("\n" + "="*50)
            print("   STARTING PRE-PRIMARY SCHOOL FORECAST   ")
            print("="*50)

            pre_forecaster = EnrollmentForecaster(df_pre, infra_dict, name="Pre-Primary")
            pre_forecaster.prepare_data()
            pre_forecaster.engineer_features()
            pre_forecaster.train_and_select_best()

            pre_forecast = pre_forecaster.forecast_recursive(forecast_start, forecast_end)
            all_forecasts.append(pre_forecast)

        # --- COMBINE AND FORMAT OUTPUT ---
        if all_forecasts:
            final_combined_df = pd.concat(all_forecasts, ignore_index=True)

            # Ensure sorting logic uses the numeric grade before we drop it (Pre-Primary = 0)
            final_combined_df = final_combined_df.sort_values(['YEAR', 'REGION', 'COUNCIL', 'GRADE_NUM'])

            # Select relevant columns and rename GRADE_LABEL to GRADE_LEVEL as requested
            output_cols = ['YEAR', 'REGION', 'COUNCIL', 'GRADE_LABEL', 'ENROLLMENT']
            final_output = final_combined_df[output_cols].rename(columns={'GRADE_LABEL': 'GRADE_LEVEL'})

            # Round off to integer
            final_output['ENROLLMENT'] = final_output['ENROLLMENT'].round(0).astype(int)

            print("\n" + "="*60)
            print(f"   COMBINED ENROLLMENT FORECAST ({forecast_start}-{forecast_end})   ")
            print("   Format: YEAR  REGION  COUNCIL  GRADE_LEVEL  ENROLLMENT")
            print("="*60)

            # Displaying first 50 rows as a preview to avoid console overflow
            # In a real environment, you can export this dataframe or view more
            print(final_output.head(50).to_string(index=False))

            print(f"\n[Note] Total rows generated: {len(final_output)}")

    else:
        print("WARNING: No data loaded. Please check your Drive path or ensure you are in Colab.")

>> Mounting Google Drive...
Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).
>> Loading files from: /content/drive/MyDrive/GUIDELINES_TSC_JAN2026/Data Set/csvs/
   Loaded: Data-Pre-Primary Enrollment 2016-2025
   Loaded: Data-Primary Enrollment 2016-2025
   Loaded: Dropout-Primary 2017-2024
   Loaded: Pre-primary GER NA NER 2017-2025
   Loaded: Data-Primary repeaters 2017-2025
   Loaded: Data-Primary STD VII Leavers 2017-2025
   Loaded: Primary GIR NA NIR 2017-2025
   Loaded: PRIMARY DESK 2016-2025
   Loaded: Primary-Re_entry
   Loaded: PRE-PRIMARY - DISABALITY 2024-2025
   Loaded: PRIMARY - DISABALITY 2017-2025
   Loaded: PRIMARY Pit Latrine AND CLASSROOMS  Final 2016-2025
   Loaded: LGAs Urban and Rural Status
   Loaded: Combined_Primary_ICT_Govt
   Loaded: Combined_Primary_ICT_All_G_NG
   Loaded: Combined_Primary_Electricity_All_G_NG
   Loaded: Combined_Primary_Electricity_Govt

   STARTING PRIMARY SCH