In [12]:
import io
import itertools
import json
import os
import uuid
import warnings
from collections.abc import Iterable
from datetime import datetime
from io import StringIO
from types import SimpleNamespace

import boto3
import joblib
import pandas as pd
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq
from botocore.exceptions import ClientError
from IPython.display import display

from sklearn.model_selection import train_test_split, cross_validate
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.preprocessing import LabelEncoder

# suppress irrelevant pandas warning
warnings.filterwarnings('ignore', message='pandas only supports SQLAlchemy connectable')

In [13]:
class DataSource:
    """Access to the capstone HMIS PostgreSQL database (AWS RDS) through psycopg2.

    Provides paginated reads, row counts, the enrollment/exit training query and
    an ``experiments`` table logger. Read errors are printed and reported by
    returning ``None``; the transaction is rolled back so the connection stays
    usable for later statements.
    """

    def __init__(self, config):
        """Store the connection settings and connect immediately.

        Parameters:
        - config: object with ``host``, ``database``, ``user``, ``password`` and
          ``port`` attributes (e.g. a ``SimpleNamespace``).
        """
        self.config = config
        self.connect()

    def connect(self):
        """Open ``self.connection``; on failure print the error and leave it ``None``."""
        # Always define the attribute so a failed connect does not turn into an
        # AttributeError somewhere later.
        self.connection = None
        try:
            self.connection = psycopg2.connect(
                host=self.config.host,
                database=self.config.database,
                user=self.config.user,
                password=self.config.password,
                port=self.config.port
            )
            print("Connection to RDS successful")
        except Exception as e:
            print(f"Error connecting to RDS: {e}")

    def _rollback(self):
        """Best-effort rollback after a failed statement.

        psycopg2 leaves the connection inside an aborted transaction after an
        error, and every later statement fails until it is rolled back.
        """
        if self.connection is None:
            return
        try:
            self.connection.rollback()
        except Exception as e:
            print(f"Error rolling back transaction: {e}")

    def data(self, query, limit=100, offset=0, order_by=None):
        """Return one page of ``query``'s rows as a DataFrame, or ``None`` on error.

        Parameters:
        - query (str): SQL SELECT to page over; it is wrapped as a subquery.
        - limit (int): maximum number of rows to return.
        - offset (int): number of leading rows to skip.
        - order_by (str | None): optional ORDER BY expression (trusted SQL, never
          user input). Without a total ordering the database does not guarantee
          that successive LIMIT/OFFSET pages are disjoint and complete.
        """
        df = None
        try:
            # PostgreSQL (before v16) requires an alias on a subquery in FROM.
            page_query = f"SELECT * FROM ({query}) AS page"
            if order_by:
                page_query += f" ORDER BY {order_by}"
            # int() guarantees the paging values cannot inject SQL.
            page_query += f" LIMIT {int(limit)} OFFSET {int(offset)}"
            df = pd.read_sql(page_query, self.connection)
        except Exception as e:
            print(f"Error querying data: {e}")
            self._rollback()
        return df

    def count(self, query):
        """Return the number of rows ``query`` produces as an int, or ``None`` on error."""
        total_count = None
        try:
            count_query = f"SELECT COUNT(*) FROM ({query}) AS count_query"
            result = pd.read_sql(count_query, self.connection)
            total_count = int(result.iloc[0, 0])
        except Exception as e:
            print(f"Error querying count: {e}")
            self._rollback()
        return total_count

    def enrollment_query(self):
        """Return the SQL joining each enrollment to its exit.

        Adds a text label for the entry living situation, a grouping for the entry
        living situation and for the exit destination (derived from the code's
        hundreds range: 1xx homeless, 2xx institutional, 3xx temporary,
        4xx permanent, anything else 'Other'), and maps a NULL destination to 99.
        """
        # 411/421 labels follow the HUD HMIS living-situation response list
        # (411 = owned, no ongoing subsidy; 421 = owned, with ongoing subsidy).
        # Datasets built before this correction have the two labels swapped.
        query = """
        SELECT e.EnrollmentID, e.PersonalID, e.ProjectId, e.EntryDate, e.DateOfEngagement,
            CASE 
                WHEN e.LivingSituation = 116 THEN 'Place Not Meant For Habitation'
                WHEN e.LivingSituation = 101 THEN 'Emergency Shelter'
                WHEN e.LivingSituation = 118 THEN 'Safe Haven'
                WHEN e.LivingSituation = 215 THEN 'Foster Care Home'
                WHEN e.LivingSituation = 206 THEN 'Hospital/ Medical Facility'
                WHEN e.LivingSituation = 207 THEN 'Jail'
                WHEN e.LivingSituation = 225 THEN 'Long-term care facility'
                WHEN e.LivingSituation = 204 THEN 'Psychiatric hospital'
                WHEN e.LivingSituation = 205 THEN 'Substance abuse treatment facility'
                WHEN e.LivingSituation = 302 THEN 'Transitional Housing'
                WHEN e.LivingSituation = 329 THEN 'Halfway House'
                WHEN e.LivingSituation = 314 THEN 'Hotel/ Motel'
                WHEN e.LivingSituation = 332 THEN 'Host Home'
                WHEN e.LivingSituation = 312 THEN 'Staying or living with family, temporary tenure'
                WHEN e.LivingSituation = 313 THEN 'Staying or living with friends, temporary tenure'
                WHEN e.LivingSituation = 327 THEN 'HOPWA funded project TH'
                WHEN e.LivingSituation = 336 THEN 'Staying/ living in friends house'
                WHEN e.LivingSituation = 335 THEN 'Staying/ living in families house'
                WHEN e.LivingSituation = 422 THEN 'Staying or living with family, permanent tenure'
                WHEN e.LivingSituation = 423 THEN 'Staying or living with friends, permanent tenure'
                WHEN e.LivingSituation = 426 THEN 'HOPWA funded project PH'
                WHEN e.LivingSituation = 410 THEN 'Rental by client, no subsidy'
                WHEN e.LivingSituation = 435 THEN 'Rental by client, with subsidy'
                WHEN e.LivingSituation = 421 THEN 'Owned by client, with subsidy'
                WHEN e.LivingSituation = 411 THEN 'Owned by client, no subsidy'
                WHEN e.LivingSituation = 30  THEN 'No exit interview completed'
                WHEN e.LivingSituation = 17  THEN 'Other'
                WHEN e.LivingSituation = 24  THEN 'Deceased'
                WHEN e.LivingSituation = 37  THEN 'Unable to determine'
                WHEN e.LivingSituation = 8   THEN 'Client doesn''t know'
                WHEN e.LivingSituation = 9   THEN 'Client prefers not to answer'
                WHEN e.LivingSituation = 99  THEN 'Data Not Collected'
                WHEN e.LivingSituation IS NULL THEN 'Data Not Collected'
                ELSE 'Unknown'
            END AS LivingSituation,
            CASE
                WHEN e.LivingSituation >= 100 AND e.LivingSituation < 200 THEN 'Homeless Situation'
                WHEN e.LivingSituation >= 200 AND e.LivingSituation < 300 THEN 'Institutional Situation'
                WHEN e.LivingSituation >= 300 AND e.LivingSituation < 400 THEN 'Temporary Situation'
                WHEN e.LivingSituation >= 400 AND e.LivingSituation < 500 THEN 'Permanent Housing Situation'
                ELSE 'Other'
            END AS LivingSituationGrouping,
            x.ExitID, x.ExitDate,
            CASE
                WHEN x.Destination IS NULL THEN 99
                ELSE x.Destination
            END AS Destination,
            CASE
                WHEN x.Destination >= 100 AND x.Destination < 200 THEN 'Homeless Situation'
                WHEN x.Destination >= 200 AND x.Destination < 300 THEN 'Institutional Situation'
                WHEN x.Destination >= 300 AND x.Destination < 400 THEN 'Temporary Situation'
                WHEN x.Destination >= 400 AND x.Destination < 500 THEN 'Permanent Housing Situation'
                ELSE 'Other'
            END AS DestinationGrouping
        FROM Enrollment e
        INNER JOIN Exit x
            ON e.EnrollmentID = x.EnrollmentID
        """
        return query

    def log_experiment(
        self,
        model_type: str,
        hyperparameters: dict,
        training_data_version: str,
        performance_metrics: dict,
        s3_model_location: str,
        start_time: datetime,
        end_time: datetime,
        status: str,
        notes: str
    ):
        """
        Logs an experiment to the PostgreSQL database.

        Parameters:
        - model_type (str): The type of the model (e.g., 'RandomForest', 'GBM').
        - hyperparameters (dict): A dictionary of hyperparameters used for the model.
        - training_data_version (str): The version of the training data.
        - performance_metrics (dict): A dictionary of performance metrics (e.g., accuracy, F1 score).
        - s3_model_location (str): The S3 location where the model weights are stored (may be None).
        - start_time (datetime): The start time of the experiment.
        - end_time (datetime): The end time of the experiment.
        - status (str): The status of the experiment (e.g., 'Completed', 'Failed').
        - notes (str): Any additional notes for the experiment.

        Returns:
        - None: Logs the experiment in the database.

        Raises:
        - RuntimeError: if there is no database connection.
        - Any database error from the INSERT/COMMIT, after rolling the transaction back.
        """
        if self.connection is None:
            raise RuntimeError("Cannot log experiment: not connected to RDS")

        insert_query = """
            INSERT INTO experiments (
                model_type, 
                hyperparameters, 
                training_data_version, 
                performance_metrics, 
                s3_model_location, 
                start_time, 
                end_time, 
                status, 
                notes
            ) 
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        params = (
            model_type,
            # default=str: values such as numpy integer scalars are not JSON serializable.
            json.dumps(hyperparameters, default=str),
            training_data_version,
            json.dumps(performance_metrics, default=str),
            s3_model_location,
            start_time,
            end_time,
            status,
            notes
        )

        cursor = self.connection.cursor()
        try:
            cursor.execute(insert_query, params)
            self.connection.commit()
        except Exception:
            # Undo the failed insert so the connection remains usable, then surface the error.
            self.connection.rollback()
            raise
        finally:
            cursor.close()

In [14]:
class DataRetriever:
    """Fetch every row of a data source's enrollment query in fixed-size pages."""

    def __init__(self, data_source):
        """
        Parameters:
        - data_source: object providing ``enrollment_query()``, ``count(query)`` and
          ``data(query, limit=..., offset=...)`` (e.g. ``DataSource``).
        """
        self.data_source = data_source

    def records(self, batch_size=10000):
        """Return all rows of the enrollment query as a single DataFrame.

        Parameters:
        - batch_size (int): number of rows requested per page.

        Returns:
        - pandas.DataFrame with every row (empty when the query matches nothing),
          or None when the row count or any page could not be fetched. A partial
          result is never returned, so an incomplete dataset cannot be cached.
        """
        query = self.data_source.enrollment_query()
        total_count = self.data_source.count(query)

        if total_count is None:
            print("Failed to retrieve total count.")
            return None

        # Collect pages and concatenate once; concatenating inside the loop is quadratic.
        batches = []
        try:
            for offset in range(0, total_count, batch_size):
                last_idx = min(offset + batch_size, total_count)
                print(f"Fetching {offset} to {last_idx} of {total_count}")
                data = self.data_source.data(query, limit=batch_size, offset=offset)
                if data is None:
                    # data() signals failure with None; pd.concat would silently drop it.
                    print(f"Failed to fetch rows {offset} to {last_idx}.")
                    return None
                batches.append(data)
        except Exception as e:
            print(f"Error querying data: {e}")
            return None

        if not batches:
            return pd.DataFrame()
        return pd.concat(batches, ignore_index=True)

In [15]:
class DataInspector:
    """Show a quick, horizontally scrollable preview of a DataFrame in the notebook."""

    def display(self, data, columns, rows=5):
        """Render the first ``rows`` rows of ``columns`` and print the total row count.

        Parameters:
        - data (pandas.DataFrame): frame to preview.
        - columns (list[str]): columns to show. Names are lowercased before lookup
          to match the frame's labels (PostgreSQL returns unquoted identifiers in
          lowercase).
        - rows (int): number of leading rows to preview.
        """
        preview = data[[col.lower() for col in columns]].head(rows)
        # Styler prefixes every table_styles selector with the table's own id
        # ("#T_<uuid> table"), so a 'table' selector only matches nested tables and
        # never this one; inline attributes style the <table> element directly.
        styled_df = preview.style.set_table_attributes(
            'style="max-width: 1000px; overflow-x: scroll; display: block"'
        )

        # Module-level IPython display (this method's own name is not in scope here).
        display(styled_df)
        print(f"Total count: {len(data)}")

In [16]:
class S3:
    """Store trained models and train/val/test splits in a single S3 bucket."""

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.s3 = boto3.client('s3')

    def save_model(self, model, name):
        """Serialize ``model`` with joblib and upload it to ``models/<name>.joblib``.

        Returns:
        - str | None: the ``s3://`` URI on success, None (after printing) on failure.
        """
        model_buffer = io.BytesIO()
        joblib.dump(model, model_buffer)
        model_buffer.seek(0)
        try:
            file_key = f"models/{name}.joblib"
            self.s3.upload_fileobj(model_buffer, self.bucket_name, file_key)
            print(f"Model successfully uploaded to s3://{self.bucket_name}/{file_key}")
            return f"s3://{self.bucket_name}/{file_key}"
        except Exception as e:
            print(f"Failed to upload model to S3: {e}")
            return None

    def read_dataset(self, name):
        """Load the splits saved under ``datasets/<name>``, or None if not saved.

        ``y_test`` is the last object ``save_dataset`` writes, and it is only
        written once every earlier split uploaded, so its presence marks a
        complete dataset.
        """
        base_key = f"datasets/{name}"

        if not self.key_exists(f"{base_key}/y_test"):
            return None

        dataset = Dataset()

        dataset.X_train = self.read_dataset_df(f"{base_key}/X_train")
        dataset.X_val   = self.read_dataset_df(f"{base_key}/X_val")
        dataset.X_test  = self.read_dataset_df(f"{base_key}/X_test")

        # the label is a Series, so get the first column from the saved dataframe
        dataset.y_train = self.read_dataset_df(f"{base_key}/y_train").iloc[:, 0]
        dataset.y_val   = self.read_dataset_df(f"{base_key}/y_val").iloc[:, 0]
        dataset.y_test  = self.read_dataset_df(f"{base_key}/y_test").iloc[:, 0]

        return dataset

    def read_dataset_df(self, file_key):
        """Download the parquet object at ``file_key`` and return it as a DataFrame."""
        dataset_buffer = io.BytesIO()
        self.s3.download_fileobj(self.bucket_name, file_key, dataset_buffer)
        dataset_buffer.seek(0)
        dataset_df = pd.read_parquet(dataset_buffer, engine='pyarrow')

        return dataset_df

    def save_dataset(self, dataset, name):
        """Upload the six splits of ``dataset`` as parquet under ``datasets/<name>``.

        Uploading stops at the first failure, so the ``y_test`` marker (written
        last) never exists for a partially uploaded dataset.

        Returns:
        - str | None: the ``s3://`` prefix on success, None if any upload failed.
        """
        base_key = f"datasets/{name}"

        # Upload order matters: y_test must stay last (see read_dataset).
        parts = [
            (dataset.X_train, "X_train"),
            (dataset.X_val, "X_val"),
            (dataset.X_test, "X_test"),
            # the label is a Series, so save it as a dataframe for parquet
            (pd.DataFrame(dataset.y_train), "y_train"),
            (pd.DataFrame(dataset.y_val), "y_val"),
            (pd.DataFrame(dataset.y_test), "y_test"),
        ]
        for df, split in parts:
            if self.save_dataset_df(df, f"{base_key}/{split}") is None:
                print(f"Aborting upload of dataset {name}: {split} failed")
                return None

        return f"s3://{self.bucket_name}/{base_key}"

    def save_dataset_df(self, df, file_key):
        """Upload ``df`` as parquet (index dropped) to ``file_key``.

        Returns:
        - str | None: the ``s3://`` URI on success, None (after printing) on upload failure.
        """
        dataset_buffer = io.BytesIO()
        df.to_parquet(dataset_buffer, engine='pyarrow', index=False)

        dataset_buffer.seek(0)
        try:
            self.s3.upload_fileobj(dataset_buffer, self.bucket_name, file_key)
            print(f"Dataset successfully uploaded to s3://{self.bucket_name}/{file_key}")
            return f"s3://{self.bucket_name}/{file_key}"
        except Exception as e:
            print(f"Failed to upload dataset to S3: {e}")
            return None

    def key_exists(self, file_key):
        """Return True if ``file_key`` exists, False on a 404; re-raise any other client error."""
        try:
            self.s3.head_object(Bucket=self.bucket_name, Key=file_key)
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                return False
            else:
                raise

In [17]:
class FeatureEngineer:
    """Label-encode categoricals and add derived features to the enrollment/exit frame.

    The frame passed in is modified in place and also returned by
    ``transformed_data``.
    """

    # Columns produced by DataSource.enrollment_query that get integer-encoded.
    CATEGORICAL_COLUMNS = ['livingsituation', 'livingsituationgrouping', 'destination', 'destinationgrouping']

    def __init__(self, data):
        self.data = data
        # Fitted LabelEncoder per column (filled by encode_categorical), so encoded
        # values -- including the destinationgrouping target -- can be decoded with
        # inverse_transform or applied consistently to new data.
        self.encoders = {}

    def transformed_data(self):
        """Run all transformations and return the (same, mutated) frame."""
        self.transform()
        return self.data

    def transform(self):
        """Encode categoricals, then add derived features."""
        self.encode_categorical()
        self.add_new_features()

    def encode_categorical(self):
        """Replace each categorical column with codes from its own fitted LabelEncoder."""
        for col in self.CATEGORICAL_COLUMNS:
            encoder = LabelEncoder()
            self.data[col] = encoder.fit_transform(self.data[col])
            self.encoders[col] = encoder

    def add_new_features(self):
        """Parse entry/exit dates and add ``enrollment_duration`` (days component of exit - entry)."""
        self.data['entrydate'] = pd.to_datetime(self.data['entrydate'])
        self.data['exitdate'] = pd.to_datetime(self.data['exitdate'])
        self.data['enrollment_duration'] = (self.data['exitdate'] - self.data['entrydate']).dt.days


In [18]:
class Dataset:
    """Train/validation/test split of the engineered features (80/10/10 by default).

    ``Dataset()`` with no data creates an empty holder whose split attributes
    (``X_train``, ``X_val``, ``X_test``, ``y_train``, ``y_val``, ``y_test``) are
    assigned by the caller.
    """

    FEATURE_COLUMNS = ['livingsituation', 'livingsituationgrouping', 'enrollment_duration']
    TARGET_COLUMN = 'destinationgrouping'

    def __init__(self, data=None, random_state=413, holdout_size=0.2, test_share=0.5, stratify=False):
        """
        Parameters:
        - data (pandas.DataFrame | None): engineered frame to split; None skips splitting.
        - random_state (int): seed for both splits.
        - holdout_size (float): fraction of rows held out of training (val + test).
        - test_share (float): fraction of the held-out rows assigned to test.
        - stratify (bool): preserve target class proportions in every split.
        """
        self.random_state = random_state
        self.holdout_size = holdout_size
        self.test_share = test_share
        self.stratify = stratify
        self.data = data
        if self.data is not None:
            self.train_test_split()

    def train_test_split(self):
        """Populate the train/val/test attributes from ``self.data``."""
        X, y = self.X(), self.y()
        # The bare name below is sklearn's module-level function, not this method.
        self.X_train, X_temp, self.y_train, y_temp = train_test_split(
            X, y,
            test_size=self.holdout_size,
            random_state=self.random_state,
            stratify=y if self.stratify else None,
        )
        self.X_val, self.X_test, self.y_val, self.y_test = train_test_split(
            X_temp, y_temp,
            test_size=self.test_share,
            random_state=self.random_state,
            stratify=y_temp if self.stratify else None,
        )

    def X(self):
        """Feature columns of ``self.data``."""
        return self.data[self.FEATURE_COLUMNS]

    def y(self):
        """Target column of ``self.data``."""
        return self.data[self.TARGET_COLUMN]

In [19]:
class RandomForestModel:
    """Thin wrapper around scikit-learn's RandomForestClassifier."""

    def __init__(self, config):
        """Build the classifier from a dict of RandomForestClassifier keyword arguments."""
        self.model = RandomForestClassifier(**config)

    def cross_validate(self, X_train, y_train, cv=5):
        """Cross-validate the model on the training data with ``cv`` folds.

        Returns:
        - dict with the fold-mean ``cv_accuracy`` and ``cv_f1_score`` (weighted F1).
        """
        # 'f1' is the binary scorer and fails on the multiclass destinationgrouping
        # target; 'f1_weighted' matches f1_score(average='weighted') used for
        # validation in the pipeline. The bare name below is sklearn's function.
        scores = cross_validate(
            self.model, X_train, y_train, cv=cv,
            scoring=['accuracy', 'f1_weighted'], return_train_score=False,
        )
        return {
            "cv_accuracy": scores['test_accuracy'].mean(),
            "cv_f1_score": scores['test_f1_weighted'].mean(),
        }

    def fit(self, X_train, y_train):
        """Fit the wrapped classifier."""
        self.model.fit(X_train, y_train)

    def predict(self, X_test):
        """Return class predictions from the wrapped classifier."""
        return self.model.predict(X_test)

In [20]:
class FineTuningConfigurator:
    """Expand a hyperparameter grid into every parameter combination."""

    def __init__(self, configs):
        """
        Parameters:
        - configs (dict): parameter name -> candidate values. Any non-string
          iterable (list, tuple, range, ...) lists the candidates; a scalar or a
          string is treated as one fixed value.
        """
        self.configs = self.make_configs(configs)

    def all_configs(self):
        """Return the expanded list of parameter dicts (last key varies fastest)."""
        return self.configs

    def make_configs(self, configs):
        """Return the Cartesian product of ``configs`` as a list of dicts.

        Example: ``{'a': [1, 2], 'b': 'x'}`` -> ``[{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'x'}]``.
        An empty grid yields ``[{}]``, i.e. one run with the model defaults.
        """
        keys = list(configs)
        candidates = [self._as_candidates(configs[key]) for key in keys]
        return [dict(zip(keys, combination)) for combination in itertools.product(*candidates)]

    @staticmethod
    def _as_candidates(value):
        """Wrap a scalar or string (which would otherwise iterate per character) in a list."""
        if isinstance(value, (str, bytes)) or not isinstance(value, Iterable):
            return [value]
        return value

In [21]:
class DataPipeline:
    """End-to-end experiment run.

    Loads the dataset from S3 (or builds and caches it from RDS), fits a
    RandomForest for every combination in ``config['model_params']``, logs each
    run, and saves and logs the best model by validation weighted F1.

    Config keys used: model_type, model_name, model_params, dataset_name, notes,
    and optionally bucket_name (defaults to ``DEFAULT_BUCKET``).
    """

    DEFAULT_BUCKET = 'capstone-hmis'

    def __init__(self, config):
        self.config = SimpleNamespace(**config)

    def run(self):
        """Execute the pipeline, printing progress; the RDS connection is closed at the end."""
        start_time = datetime.now()

        data_source = DataSource(self.db_config())
        try:
            s3 = S3(bucket_name=getattr(self.config, 'bucket_name', self.DEFAULT_BUCKET))

            dataset = self._load_dataset(data_source, s3)
            if dataset is None:
                print("Aborting: no dataset available")
                return

            best_experiment = self._tune(data_source, dataset)
            if best_experiment is None:
                print("Aborting: no model configuration was evaluated")
                return

            self._save_best(data_source, s3, best_experiment, start_time)
            print("Experiment Complete!")
        finally:
            # The attribute may be missing or None if connecting failed.
            connection = getattr(data_source, 'connection', None)
            if connection is not None:
                connection.close()

    def _load_dataset(self, data_source, s3):
        """Return the cached Dataset from S3, or query RDS, engineer features, split and cache it."""
        dataset = s3.read_dataset(self.config.dataset_name)
        if dataset is not None:
            print(f"Using saved Dataset {self.config.dataset_name}")
            return dataset

        print("Querying RDS for data...")
        data = DataRetriever(data_source).records()
        if data is None:
            print("Failed to retrieve data from RDS.")
            return None

        print("Data before feature engineering")
        data_inspector = DataInspector()
        data_inspector.display(data, data.columns.tolist())

        transformed_data = FeatureEngineer(data).transformed_data()
        print("Data after feature engineering")
        data_inspector.display(transformed_data, transformed_data.columns.tolist())

        dataset = Dataset(transformed_data)
        s3.save_dataset(dataset, self.config.dataset_name)
        return dataset

    def _tune(self, data_source, dataset):
        """Fit and log every grid configuration; return the best by validation F1 (None if none ran)."""
        best_experiment = None

        configurator = FineTuningConfigurator(self.config.model_params)
        for model_config in configurator.all_configs():
            # Per-configuration start, so each logged run records its own duration.
            experiment_start = datetime.now()
            print(f"\nFitting {self.config.model_type} model with config: {model_config}")
            model = RandomForestModel(model_config)
            model.fit(dataset.X_train, dataset.y_train)

            print("Evaluating model with validation set")
            y_pred = model.predict(dataset.X_val)

            accuracy = accuracy_score(dataset.y_val, y_pred)
            f1 = f1_score(dataset.y_val, y_pred, average='weighted')
            print("\n====================================================================\n")
            print(f" {self.config.model_type}({self.config.model_name}) Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
            print("\n====================================================================\n")

            experiment_uuid = str(uuid.uuid4())

            # The first configuration always seeds the best (even with F1 == 0),
            # so a model is always available to save.
            if best_experiment is None or f1 > best_experiment.f1_score:
                best_experiment = SimpleNamespace(
                    accuracy=accuracy,
                    f1_score=f1,
                    model=model.model,
                    uuid=experiment_uuid,
                    config=model_config,
                )

            print("Logging experiment in RDS experiments table")
            data_source.log_experiment(
                model_type=self.config.model_type,
                hyperparameters=model.model.get_params(),
                training_data_version=self.config.dataset_name,
                performance_metrics={"accuracy": accuracy, "f1_score": f1},
                s3_model_location=None,
                start_time=experiment_start,
                end_time=datetime.now(),
                status='Completed',
                notes=self.config.notes
            )

        return best_experiment

    def _save_best(self, data_source, s3, best_experiment, start_time):
        """Print the winning config, upload its model to S3 and log it with the S3 location."""
        print("\n--------------------------------------------------------------------------\n")
        print(f" Best Experiment Config: {best_experiment.config}")
        print(f" Accuracy: {best_experiment.accuracy:.4f}, F1: {best_experiment.f1_score:.4f}")
        print("\n--------------------------------------------------------------------------\n")

        print("Saving best model to S3...")
        s3_model_location = s3.save_model(
            model=best_experiment.model,
            name=f"{self.config.model_name}_{datetime.now():%Y-%m-%d}_{best_experiment.uuid}"
        )

        print("Logging best experiment in RDS experiments table")
        data_source.log_experiment(
            model_type=self.config.model_type,
            hyperparameters=best_experiment.model.get_params(),
            training_data_version=self.config.dataset_name,
            performance_metrics={"accuracy": best_experiment.accuracy, "f1_score": best_experiment.f1_score},
            s3_model_location=s3_model_location,
            start_time=start_time,
            end_time=datetime.now(),
            status='Completed',
            notes=self.config.notes
        )

    def db_config(self):
        """RDS connection settings taken from the standard libpq environment variables.

        PGHOST, PGDATABASE, PGUSER and PGPORT fall back to the capstone defaults.
        PGPASSWORD has no default, so no credential lives in the notebook; when it
        is unset, None is passed and libpq can use its own sources (e.g. ~/.pgpass).
        """
        return SimpleNamespace(
            host=os.environ.get("PGHOST", "capstone-database.cr62wyo4a7dt.us-east-2.rds.amazonaws.com"),
            database=os.environ.get("PGDATABASE", "capstone"),
            user=os.environ.get("PGUSER", "postgres"),
            password=os.environ.get("PGPASSWORD"),
            port=os.environ.get("PGPORT", "5432"),
        )

In [22]:
# Hyperparameter grid: FineTuningConfigurator fits every combination.
# A wider sweep to try later: n_estimators [50, 100, 200], max_depth [10, 20, None],
# min_samples_split [2, 5, 10], min_samples_leaf [1, 2, 4],
# max_features ['sqrt', 'log2', None].
param_grid = {
    'n_estimators': [50, 100],
    'max_depth': [10],
    'min_samples_split': [2],
    'min_samples_leaf': [1],
    'max_features': ['sqrt'],
}

config = dict(
    model_type="RandomForest",
    model_name="random-forest-v1",
    model_params=param_grid,
    dataset_name="v0.0.2",
    notes="First RandomForest experiment with fine-tuning",
)

pipeline = DataPipeline(config)
pipeline.run()

Connection to RDS successful
Using saved Dataset v0.0.2

Fitting RandomForest model with config: {'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 2, 'min_samples_leaf': 1, 'max_features': 'sqrt'}
Evaluating model with validation set


 RandomForest(random-forest-v1) Accuracy: 0.7153, F1: 0.6228


Logging experiment in RDS experiments table

Fitting RandomForest model with config: {'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 2, 'min_samples_leaf': 1, 'max_features': 'sqrt'}
Evaluating model with validation set


 RandomForest(random-forest-v1) Accuracy: 0.7152, F1: 0.6221


Logging experiment in RDS experiments table

--------------------------------------------------------------------------

 Best Experiment Config: {'n_estimators': 50, 'max_depth': 10, 'min_samples_split': 2, 'min_samples_leaf': 1, 'max_features': 'sqrt'}
 Accuracy: 0.7153, F1: 0.6228

--------------------------------------------------------------------------

Saving best model to S3...
Model 