In [None]:
# Benin EDA Notebook
# Objective: Profile, clean, and explore Benin's solar dataset end-to-end.

import os
import warnings
from typing import List

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats

# Suppress every Python warning so cell output stays uncluttered.
# NOTE(review): this blanket filter also hides deprecation and numerical warnings
# raised by the libraries used below — consider narrowing it once the notebook is stable.
warnings.filterwarnings("ignore")
# Plot styling: the "seaborn-v0_8" matplotlib style sheet plus seaborn's "talk" plotting context.
# NOTE(review): the "seaborn-v0_8" style name presumably requires a recent matplotlib — confirm.
plt.style.use("seaborn-v0_8")
sns.set_context("talk")

# Dataset identity and file locations (paths are relative to the current working directory).
COUNTRY = "benin"
RAW_DATA_PATH = os.path.join("..", "data", COUNTRY + ".csv")
CLEAN_DATA_PATH = os.path.join("..", "data", COUNTRY + "_clean.csv")

# Columns of interest (use what exists in the dataset)
NUMERIC_CANDIDATES: List[str] = [
    "GHI",
    "DNI",
    "DHI",
    "Tamb",
    "TModA",
    "TModB",
    "ModA",
    "ModB",
    "WS",
    "WSgust",
    "WD",
    "RH",
    "BP",
]
# Possible names for the timestamp column, in priority order (the first match is used).
TIME_COLUMN_CANDIDATES: List[str] = [
    "Timestamp",
    "timestamp",
    "time",
    "Date",
    "Datetime",
]
# Possible names for the cleaning-flag column, in priority order (the first match is used).
CLEANING_FLAG_CANDIDATES: List[str] = [
    "Cleaning",
    "cleaned",
    "is_cleaned",
]

def find_first_column(df: pd.DataFrame, candidates: List[str]) -> str | None:
    for c in candidates:
        if c in df.columns:
            return c
    return None

def existing_columns(df: pd.DataFrame, cols: List[str]) -> List[str]:
    """Return the entries of ``cols`` that are columns of ``df``, in their original order."""
    present: List[str] = []
    for name in cols:
        if name in df.columns:
            present.append(name)
    return present

# Announce where the raw CSV is expected and warn up front if nothing exists at that path.
# This only prints; the actual load (and its own missing-file handling) happens in the next cell.
print(f"Expecting raw CSV at: {RAW_DATA_PATH}")
if not os.path.exists(RAW_DATA_PATH):
    print("WARNING: Raw data file not found. Place the country's CSV at:", RAW_DATA_PATH)



In [None]:
# Load data
# Reads the raw CSV, parses the first recognised timestamp column, and orders rows by it.
# `time_col` is initialised up front so it is always defined, even when loading fails.
time_col = None
try:
    df = pd.read_csv(RAW_DATA_PATH)
except FileNotFoundError:
    # Fall back to an empty frame; every later cell checks `df.empty` before doing work.
    df = pd.DataFrame()
    print("Data not found. Proceed to place the CSV and re-run this cell.")
except pd.errors.EmptyDataError:
    # A zero-byte file has no header to parse; treat it the same as "no data loaded".
    df = pd.DataFrame()
    print("Raw CSV is empty (nothing to parse):", RAW_DATA_PATH)
else:
    print("Loaded:", df.shape)
    # Try to parse a timestamp column if present
    time_col = find_first_column(df, TIME_COLUMN_CANDIDATES)
    if time_col is not None:
        raw_times = df[time_col]
        parsed_times = pd.to_datetime(raw_times, errors="coerce")
        # errors="coerce" turns unparseable values into NaT; report how many were lost
        # (values that were already missing in the CSV are not counted).
        n_unparseable = int(parsed_times.isna().sum() - raw_times.isna().sum())
        if n_unparseable > 0:
            print(f"WARNING: {n_unparseable} value(s) in '{time_col}' could not be parsed and were set to NaT.")
        df[time_col] = parsed_times
        # A stable sort keeps the file order of rows that share a timestamp, so re-runs are reproducible.
        df = df.sort_values(by=time_col, kind="mergesort")
    else:
        print("No time-like column found in:", TIME_COLUMN_CANDIDATES)


In [None]:
# Basic profile: first/last rows, column dtypes, and overall shape
if df.empty:
    print("DataFrame is empty. Load data first.")
else:
    # Show the first rows, then the last rows, as rich tables.
    for preview in (df.head(), df.tail()):
        display(preview)
    print("\nData types:\n", df.dtypes)
    print("\nShape:", df.shape)


In [None]:
# Summary statistics & missing-value report
if not df.empty:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if num_cols:
        display(df[num_cols].describe().T)
    else:
        # describe() raises on a frame with no columns, so report instead of crashing.
        print("No numeric columns to summarise.")

    # Per-column null counts, largest first.
    na_series = df.isna().sum().sort_values(ascending=False)
    # Keep the exact percentage for thresholding; the rounded copy is for display only.
    na_pct_exact = na_series / len(df) * 100
    na_pct = na_pct_exact.round(2)
    missing_report = pd.DataFrame({"missing": na_series, "%": na_pct})
    # Filter on the raw count: a column with very few nulls rounds to 0.00% and
    # would otherwise disappear from the report.
    display(missing_report[missing_report["missing"] > 0])

    # Threshold on the unrounded value so e.g. 5.004% is still reported as >5%.
    gt5 = missing_report[na_pct_exact > 5]
    if not gt5.empty:
        print("Columns with >5% nulls:")
        display(gt5)
else:
    print("DataFrame is empty. Load data first.")


In [None]:
# Outlier detection via Z-scores and basic cleaning
if df.empty:
    print("DataFrame is empty. Load data first.")
else:
    cols_to_check = existing_columns(df, [
        "GHI", "DNI", "DHI", "ModA", "ModB", "WS", "WSgust"
    ])
    print("Columns considered for Z-score outliers:", cols_to_check)

    # One z-score column per checked variable; NaNs are left out of the mean/std.
    z_df = pd.DataFrame(index=df.index)
    for col in cols_to_check:
        z_df[col] = stats.zscore(df[col].astype(float), nan_policy='omit')

    # A row is flagged when at least one of its z-scores exceeds 3 in absolute value.
    z_flag = z_df.abs().gt(3).any(axis=1)
    print("Potential outliers (any |Z|>3):", int(z_flag.sum()))

    # Cleaned copy: only missing values in the key columns are replaced (by the column median);
    # the rows flagged above are counted but left in place.
    cleaned = df.copy()
    key_cols = existing_columns(cleaned, ["GHI", "DNI", "DHI", "ModA", "ModB", "WS", "WSgust"])
    for col in key_cols:
        column = cleaned[col]
        if not column.isna().any():
            continue
        cleaned[col] = column.fillna(column.median())

    display(cleaned[key_cols].describe().T)


In [None]:
# Export cleaned dataset
def export_clean(df_original: pd.DataFrame) -> None:
    """Median-impute selected columns of a copy of ``df_original`` and write it to ``CLEAN_DATA_PATH``.

    The input frame is not modified. Of the columns GHI, DNI, DHI, ModA, ModB, WS
    and WSgust, those present in the frame have their missing values replaced by
    the column median. Non-numeric columns are skipped (with a printed note) because
    a median cannot be computed for them.

    Args:
        df_original: Frame to clean and export. An empty frame is skipped.

    Returns:
        None. The cleaned frame is written as CSV (without the index) and a
        confirmation line is printed.
    """
    if df_original.empty:
        print("Empty DataFrame; skipping export.")
        return
    cleaned = df_original.copy()
    key_cols = existing_columns(cleaned, ["GHI", "DNI", "DHI", "ModA", "ModB", "WS", "WSgust"])
    for c in key_cols:
        if not pd.api.types.is_numeric_dtype(cleaned[c]):
            # median() raises on non-numeric data; leave such a column untouched.
            print("Skipping median imputation for non-numeric column:", c)
            continue
        cleaned[c] = cleaned[c].fillna(cleaned[c].median())

    # os.makedirs("") raises, so only create the directory when the path has one.
    out_dir = os.path.dirname(CLEAN_DATA_PATH)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    cleaned.to_csv(CLEAN_DATA_PATH, index=False)
    print("Exported cleaned CSV to:", CLEAN_DATA_PATH)

export_clean(df)


In [None]:
# Time series analysis: GHI, DNI, DHI, Tamb vs time (one stacked panel per variable)
if df.empty:
    print("DataFrame is empty. Load data first.")
else:
    tcol = find_first_column(df, TIME_COLUMN_CANDIDATES)
    if tcol is None or not pd.api.types.is_datetime64_any_dtype(df[tcol]):
        print("Time column not found or not datetime; skipping TS plots.")
    else:
        ts_cols = existing_columns(df, ["GHI", "DNI", "DHI", "Tamb"])
        if not ts_cols:
            print("No TS columns among:", ["GHI", "DNI", "DHI", "Tamb"])
        else:
            n_panels = len(ts_cols)
            fig, axes = plt.subplots(n_panels, 1, figsize=(14, 3.2*n_panels), sharex=True)
            # With a single panel, subplots returns a bare Axes; wrap it so the loop below works.
            if n_panels == 1:
                axes = [axes]
            for panel, name in zip(axes, ts_cols):
                panel.plot(df[tcol], df[name], label=name)
                panel.set_ylabel(name)
                panel.legend(loc="upper right")
            # The x axis is shared, so only the bottom panel is labelled.
            axes[-1].set_xlabel(tcol)
            plt.suptitle("Time Series: core variables")
            plt.show()


In [None]:
# Cleaning impact: average ModA & ModB by Cleaning flag
if df.empty:
    print("DataFrame is empty. Load data first.")
else:
    flag_col = find_first_column(df, CLEANING_FLAG_CANDIDATES)
    target_cols = existing_columns(df, ["ModA", "ModB"])
    if not (flag_col and target_cols):
        print("Cleaning flag or target cols not present; skipping.")
    else:
        # Mean of each target column within every distinct value of the flag column.
        grp = df.groupby(flag_col)[target_cols].mean().rename_axis(flag_col)
        display(grp)
        bar_ax = grp.plot.bar(figsize=(8,4), title="Average ModA/ModB by Cleaning flag")
        bar_ax.set_ylabel("Average")
        plt.show()
