In [1]:
# imports 
import os
import pandas as pd
import joblib
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score,classification_report
import json
from scipy.sparse import hstack
from rich import print
# File-path configuration shared by the functions in this notebook.
# New data for retraining. A CSV file is used here; ideally this would be
# connected to a data warehouse with inflowing data.
NEW_DATA_FILE = "new_data.csv"
# Artifact paths. retrain_if_new_data() reads MODEL_FILE at module scope, so it
# must be defined here and not only as a local inside main(). The values match
# the file names main() writes and load_and_predict() reads.
MODEL_FILE = "best_model.pkl"  # Trained model file
PREPROCESSOR_FILE = "preprocessor.pkl"  # Fitted numeric/categorical preprocessor
VECTORIZER_FILE = "vectorizer.pkl"  # Fitted TF-IDF text vectorizer


# Function to load the dataset from a CSV file
def load_data(file_path):
    """Read the modelling columns of a CSV file into a DataFrame.

    Only the income column, the two programme-recommendation columns and
    their two free-text reason columns are read. Any failure (missing file,
    empty file, parse error or any other exception) is reported with a
    printed message and signalled by returning None instead of raising.

    Parameters:
    file_path (str): Path of the CSV file to read.

    Returns:
    pd.DataFrame or None: The selected columns, or None when loading failed.
    """
    wanted_columns = [
        "HH Income + Production/Day (USD)",  # numeric
        "most_recommend_rtv_program",  # categorical
        "least_recommend_rtv_program",  # categorical
        "most_recommend_rtv_program_reason",  # string
        "least_recommend_rtv_program_reason",  # string
    ]

    try:
        # The existence check stays inside the try so that an invalid path
        # value is reported like any other error.
        if os.path.exists(file_path):
            return pd.read_csv(file_path, usecols=wanted_columns)
        print(f"File {file_path} not found.")
    except pd.errors.EmptyDataError:
        print(f"File {file_path} is empty.")
    except pd.errors.ParserError:
        print(f"Error parsing file {file_path}.")
    except Exception as err:
        print(f"An error occurred while loading the data: {err}")

    # Every path that did not return a DataFrame ends here.
    return None


# Func to  Clean Data
def preprocess_data(df, training=True, preprocessor=None, vectorizer=None):
    """
    Build the model features and the risk target from the raw dataframe.

    Numerical columns are median-imputed and standardised, categorical columns are
    imputed with the most frequent value and one-hot encoded, and the two free-text
    reason columns are joined per row and turned into TF-IDF features (at most 100 terms).

    Parameters:
    df (pd.DataFrame): The input dataframe containing the features. It is not modified.
    training (bool): True to create and fit new transformers on df; False to reuse
        already-fitted transformers (only `transform` is called).
    preprocessor (ColumnTransformer, optional): Fitted preprocessor to use when
        training is False. Ignored when training is True. When omitted in
        non-training mode it is loaded from "preprocessor.pkl".
    vectorizer (TfidfVectorizer, optional): Fitted vectorizer to use when training
        is False. Ignored when training is True. When omitted in non-training mode
        it is loaded from "vectorizer.pkl".

    Returns:
    X_final (pd.DataFrame): The preprocessed feature dataframe, indexed like df.
    y (pd.Series): The target variable (1 when daily income is below 2, else 0).
    preprocessor (ColumnTransformer): The preprocessor object for numerical and categorical data.
    vectorizer (TfidfVectorizer): The vectorizer object for text data.
    """
    income_col = 'HH Income + Production/Day (USD)'

    # Identify categorical, numerical, and text columns
    num_cols = [income_col]
    cat_cols = ['most_recommend_rtv_program', 'least_recommend_rtv_program']
    text_cols = ['most_recommend_rtv_program_reason', 'least_recommend_rtv_program_reason']

    # Define target without writing a new column into the caller's dataframe.
    # NOTE(review): the target is a threshold on the income column, and that same
    # column is also fed to the model as a numeric feature, so the label can be
    # read directly off the input. Confirm this is intended.
    y = (df[income_col] < 2).astype(int).rename('risk_target')

    # Separate features & target (drop a leftover target column if one is present)
    X = df.drop(columns=['risk_target'], errors='ignore')

    if training:
        # Pipelines for transformations: impute, then scale / encode
        num_pipeline = Pipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ])
        cat_pipeline = Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ])
        preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_cols),
            ("cat", cat_pipeline, cat_cols)
        ])
        # Text Feature Extraction
        vectorizer = TfidfVectorizer(max_features=100)
    else:
        # Inference mode must reuse the transformers fitted during training; a
        # freshly built transformer is unfitted and cannot transform.
        # NOTE(review): joblib.load unpickles the file; only load trusted artifacts.
        if preprocessor is None:
            preprocessor = joblib.load("preprocessor.pkl")
        if vectorizer is None:
            vectorizer = joblib.load("vectorizer.pkl")

    # One document per row: both reason columns joined with a space
    text_series = X[text_cols].fillna("Unknown").astype(str).apply(lambda row: " ".join(row), axis=1)

    # Fit only in training mode so that feature columns stay identical at inference
    if training:
        X_processed = preprocessor.fit_transform(X)
        text_features = vectorizer.fit_transform(text_series).toarray()
    else:
        X_processed = preprocessor.transform(X)
        text_features = vectorizer.transform(text_series).toarray()

    # Convert transformed data to DataFrames that share df's index so that the
    # features stay row-aligned with y even when df has a non-default index
    text_df = pd.DataFrame(text_features, columns=vectorizer.get_feature_names_out(), index=df.index)
    X_processed_df = pd.DataFrame(X_processed, columns=preprocessor.get_feature_names_out(), index=df.index)

    # Combine all features
    X_final = pd.concat([X_processed_df, text_df], axis=1)

    return X_final, y, preprocessor, vectorizer  # Return transformations for future use



# Func to Train Models

def train_and_select_best_model(X_train, y_train, X_test, y_test):
    """Grid-search several classifiers and return the one with the best test accuracy.

    Each candidate is tuned with 5-fold cross-validated grid search on the training
    data (scoring="accuracy"), then its best estimator is scored on the test set.
    Per-model results and a classification report are printed, and the winning
    model's name, test accuracy and parameters are written to
    'best_model_params.json'.

    Parameters:
    X_train, y_train: Training features and labels passed to GridSearchCV.fit.
    X_test, y_test: Held-out features and labels used to compare the tuned models.

    Returns:
    The refitted best estimator of the winning grid search.

    NOTE(review): the winner is chosen by test-set accuracy, so the reported test
    accuracy of the selected model is optimistically biased; consider selecting on
    the cross-validation score instead.
    """
    param_grids = {
        "Random Forest": {
            "model": RandomForestClassifier(random_state=42),
            "params": {
                "n_estimators": [50, 100, 200],
                "max_depth": [None, 10, 20],
                "min_samples_split": [2, 5],
                "min_samples_leaf": [1, 2]
            }
        },
        "Logistic Regression": {
            "model": LogisticRegression(max_iter=500),
            "params": {
                "C": [0.1, 1, 10],
                "solver": ["liblinear", "lbfgs"]
            }
        },
        "Gradient Boosting": {
            "model": GradientBoostingClassifier(random_state=42),
            "params": {
                "n_estimators": [50, 100, 200],
                "learning_rate": [0.01, 0.1, 0.2],
                "max_depth": [3, 5, 10]
            }
        },
        "Support Vector Machine": {
            "model": SVC(),
            "params": {
                "C": [0.1, 1, 10],
                "kernel": ["linear", "rbf"]
            }
        },
        "K-Nearest Neighbors": {
            "model": KNeighborsClassifier(),
            "params": {
                "n_neighbors": [3, 5, 7],
                "weights": ["uniform", "distance"]
            }
        },
        "Neural Network": {
            # Seeded like the tree ensembles so repeated runs give the same result
            "model": MLPClassifier(max_iter=500, random_state=42),
            "params": {
                "hidden_layer_sizes": [(50,), (100,), (50, 50)],
                "activation": ["relu", "tanh"],
                "alpha": [0.0001, 0.001]
            }
        }
    }

    best_model = None
    best_score = 0      # test accuracy of the current winner
    best_cv_score = 0   # cross-validation accuracy of the current winner
    best_name = ""
    best_params = {}

    for name, config in param_grids.items():
        grid_search = GridSearchCV(config["model"], config["params"], cv=5, scoring="accuracy", n_jobs=-1)
        grid_search.fit(X_train, y_train)

        model_best_params = grid_search.best_params_
        best_model_cv = grid_search.best_estimator_
        best_score_cv = grid_search.best_score_

        # Evaluate on the test set; predict once and reuse the predictions for
        # both the accuracy and the classification report
        y_pred = best_model_cv.predict(X_test)
        test_score = accuracy_score(y_test, y_pred)

        print(f"{name}: Best Accuracy (CV) = {best_score_cv:.4f} | Test Accuracy = {test_score:.4f} | Best Params = {model_best_params}")
        # testing each model with classification report
        print(f"\n📋 {name} Classification Report:\n{classification_report(y_test, y_pred)}")

        # `best_model is None` guarantees a model is returned even if every
        # candidate scores 0 on the test set
        if best_model is None or test_score > best_score:
            best_score = test_score
            best_cv_score = best_score_cv
            best_model = best_model_cv
            best_name = name
            best_params = model_best_params

    # Save best model parameters to JSON file
    with open("best_model_params.json", "w") as f:
        json.dump({"model": best_name, "accuracy": float(best_score), "params": best_params}, f, indent=4)

    # Report the winner's own scores (not those of the last model in the loop)
    print(f"✅ Best Model: {best_name} with CV Accuracy = {best_cv_score:.4f} and Test Accuracy = {best_score:.4f}")
    print("📄 Best Parameters saved in 'best_model_params.json'")

    return best_model


# func to Evaluate Models
def evaluate_model(model, X_test, y_test):
    """Print accuracy and weighted precision, recall and F1 of a model on the test set.

    Parameters:
    model: Fitted estimator exposing `predict`.
    X_test: Test features passed to `model.predict`.
    y_test: True labels the predictions are compared against.

    Returns:
    None. The metrics are only printed.
    """
    predicted = model.predict(X_test)  # Predict on the test set

    print("📊 Model Evaluation on Test Set:")

    # Precision, recall and F1 are averaged over classes weighted by support
    accuracy = accuracy_score(y_test, predicted)
    precision = precision_score(y_test, predicted, average='weighted')
    recall = recall_score(y_test, predicted, average='weighted')
    f1 = f1_score(y_test, predicted, average='weighted')

    print(f"✅ Accuracy: {accuracy:.4f} | 🎯 Precision: {precision:.4f} | 🔄 Recall: {recall:.4f} | 🏆 F1 Score: {f1:.4f}")



# Func to Automate Retraining
def retrain_if_new_data(model_file="best_model.pkl",
                        preprocessor_file="preprocessor.pkl",
                        vectorizer_file="vectorizer.pkl"):
    """Refit the saved model on NEW_DATA_FILE when that file exists.

    The new rows are transformed with the preprocessor and vectorizer that were
    fitted and saved during training, so the feature columns match the ones the
    model was trained on. After a successful refit the model file is overwritten
    and the new-data file is deleted.

    Parameters:
    model_file (str): Path of the saved model to load, refit and overwrite.
    preprocessor_file (str): Path of the saved, fitted ColumnTransformer.
    vectorizer_file (str): Path of the saved, fitted TfidfVectorizer.

    Returns:
    None. Progress and outcomes are printed.

    NOTE(review): `model.fit` refits the estimator on the new rows only; it does
    not continue training from the previous data. Confirm this is intended.
    """
    if not os.path.exists(NEW_DATA_FILE):
        print("No new data found. Model remains unchanged.")
        return

    print("New data found. Retraining model...")

    new_df = load_data(NEW_DATA_FILE)
    if new_df is None:
        print("New Data file is None. Check Data File")
        return
    print("New data file loaded.")

    income_col = 'HH Income + Production/Day (USD)'
    text_cols = ['most_recommend_rtv_program_reason', 'least_recommend_rtv_program_reason']

    # Same target definition as in training: at risk when daily income is below 2
    y_new = (new_df[income_col] < 2).astype(int)
    if y_new.nunique() < 2:
        # Covers an empty file as well; refitting on a single class would replace
        # the model with one that can only predict that class (or fail outright)
        print("New data must contain both risk classes to retrain. Model remains unchanged.")
        return

    # Load saved artifacts.
    # NOTE(review): joblib.load unpickles the files; only load trusted artifacts.
    preprocessor = joblib.load(preprocessor_file)
    vectorizer = joblib.load(vectorizer_file)
    model = joblib.load(model_file)

    # Transform (never re-fit) with the training-time transformers
    X_structured = preprocessor.transform(new_df)
    text_series = new_df[text_cols].fillna("Unknown").astype(str).apply(lambda row: " ".join(row), axis=1)
    X_text = vectorizer.transform(text_series)

    # Same column layout as in training: preprocessor features, then TF-IDF terms
    feature_names = np.concatenate([preprocessor.get_feature_names_out(), vectorizer.get_feature_names_out()])
    X_new = pd.DataFrame(hstack([X_structured, X_text]).toarray(), columns=feature_names, index=new_df.index)

    # Retrain model
    model.fit(X_new, y_new)

    # Save updated model
    joblib.dump(model, model_file)
    print("Model retrained and updated.")

    # Remove new data file after training
    os.remove(NEW_DATA_FILE)


# Function to load the saved artifacts and transform new data before making predictions

def load_and_predict(X_new, text_column, model_file="best_model.pkl",
                     preprocessor_file="preprocessor.pkl",
                     vectorizer_file="vectorizer.pkl"):
    """Transform new rows with the saved transformers and predict with the saved model.

    Parameters:
    X_new (pd.DataFrame): Raw numerical and categorical columns, as expected by the
        saved preprocessor.
    text_column (pd.DataFrame or pd.Series): Raw text column(s). Multiple columns
        are joined per row with a space, as done during training.
    model_file (str): Path of the saved model.
    preprocessor_file (str): Path of the saved, fitted preprocessor.
    vectorizer_file (str): Path of the saved, fitted text vectorizer.

    Returns:
    The array returned by `model.predict` for the transformed rows.

    NOTE(review): joblib.load unpickles the files; only load trusted artifacts.
    """
    # Load the preprocessor (for numerical & categorical data)
    preprocessor = joblib.load(preprocessor_file)
    X_transformed = preprocessor.transform(X_new)

    # Load the vectorizer (for text features)
    vectorizer = joblib.load(vectorizer_file)

    # Accept a single text column passed as a Series
    if isinstance(text_column, pd.Series):
        text_column = text_column.to_frame()

    # Combine text columns as done during training
    text_series = text_column.fillna("Unknown").astype(str).apply(lambda row: " ".join(row), axis=1)
    X_text_transformed = vectorizer.transform(text_series)

    # Combine both numerical & text features
    X_final = hstack([X_transformed, X_text_transformed])

    # Get the feature names from the preprocessor and vectorizer
    pre_cols = preprocessor.get_feature_names_out()
    vec_cols = vectorizer.get_feature_names_out()
    combined_cols = np.concatenate([pre_cols, vec_cols])

    # Convert the combined sparse matrix to a dense DataFrame with proper column
    # names; toarray() yields a plain ndarray rather than an np.matrix
    X_final_df = pd.DataFrame(X_final.toarray(), columns=combined_cols)

    # Load the trained model
    model = joblib.load(model_file)

    # Make predictions using a DataFrame with valid feature names
    predictions = model.predict(X_final_df)

    return predictions

# To map the printed predictions to human readable format
def map_predictions(predictions):
    """Print the predicted class labels as human-readable risk statuses.

    Parameters:
    predictions: Iterable of class labels, each 0 or 1. Any other label raises
        KeyError.

    Returns:
    None. The list of mapped labels is only printed.
    """
    label_names = {0: "Not at risk", 1: "At risk"}

    readable = []
    for label in predictions:
        readable.append(label_names[label])

    print(readable)



# My Main Function with entire flow logic
def main(data_file=None):
    """Run the full flow: load, preprocess, split, train, evaluate, save, retrain.

    Parameters:
    data_file (str, optional): Path of the main dataset CSV. When omitted, the
        original hard-coded path is used.

    Returns:
    None. Returns early, after printing a message, when the dataset cannot be loaded.
    """
    # File paths used by this run (locals of main, not module globals).
    # NOTE(review): the default dataset path is an absolute, machine-specific
    # location; pass `data_file` to run elsewhere.
    DATA_FILE = data_file if data_file is not None else r"C:\Users\M D\Desktop\HRTV test\interview_dataset.csv"  # Main dataset
    MODEL_FILE = "best_model.pkl"  # Trained model file
    PREPROCESSOR_FILE = "preprocessor.pkl"  # Preprocessor file for data transformation
    VECTORIZER_FILE = "vectorizer.pkl"  # Text vectorizer file

    # Load data into memory
    df = load_data(DATA_FILE)
    if df is None:
        print("Data Frame is None. Check Dataframe")
        # Stop here if no data is found. A plain return is used because calling
        # exit() inside a notebook shuts down the kernel.
        return

    # Preprocess data
    X, y, preprocessor, vectorizer = preprocess_data(df)

    # Split into training and test sets (80% train, 20% test)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, shuffle=True, stratify=y
    )

    # Train and select the best model
    best_model = train_and_select_best_model(X_train, y_train, X_test, y_test)

    # Evaluate model
    evaluate_model(best_model, X_test, y_test)

    # Save best model & preprocessors
    joblib.dump(best_model, MODEL_FILE)
    joblib.dump(preprocessor, PREPROCESSOR_FILE)
    joblib.dump(vectorizer, VECTORIZER_FILE)
    print("Model training complete. Best model saved.")

    # Retrain model if new data is available
    retrain_if_new_data()

# Entry point: run the whole training flow when this code is executed directly.
if __name__ == "__main__":
    main()
    

In [2]:
# Small smoke test: run the saved model on a few made-up, unseen rows to check
# that loading, transforming and predicting work end to end.

X_new = pd.DataFrame({
    'HH Income + Production/Day (USD)': [1.5, 3.2, 0.8],  # Numerical data
    'most_recommend_rtv_program': [1, 3, 'food security'],  # Categorical data
    'least_recommend_rtv_program': [1, 99, 'active water'],  # Categorical data
    'most_recommend_rtv_program_reason': ['low', 'Interesting characters', 'Good reviews'],  # Text data
    'least_recommend_rtv_program_reason': ['Boring', 'Poor storyline', 'Too slow']  # Text data
})

# Split the frame into the structured columns and the text columns. Both slices
# come from X_new, so they have the same number of rows. Despite its name,
# X_transformed still holds raw values; load_and_predict applies the preprocessor.
# NOTE(review): the categorical columns mix ints and strings; confirm these match
# how the categories are represented in the training data.
X_transformed = X_new[['HH Income + Production/Day (USD)', 'most_recommend_rtv_program', 'least_recommend_rtv_program']]
text_column = X_new[['most_recommend_rtv_program_reason', 'least_recommend_rtv_program_reason']]

# Test the prediction function with this new data
predictions = load_and_predict(X_transformed, text_column)

# Map the predictions to human-readable format (prints the mapped labels)
map_predictions(predictions)
