In [1]:

import datetime

import pandas as pd
import os
import requests
import zipfile
from tqdm import tqdm

import mlflow
import os
import io
import pickle

from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

from prefect import flow, task

from mlflow.tracking import MlflowClient
from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository
from mlflow.entities import ViewType

from google.cloud import storage

import pyarrow as pa



import evidently

from hyperopt import STATUS_OK, Trials, fmin, hp, tpe

In [2]:
#utils imports
from utils.get_data.download_data import download_file
from utils.data_preprocessing.data_preparation import load_dataframe, remove_missing_data, create_hour_weekday, transform_df_target, extract_station_info
from utils.data_processing.feature_encode_scale import encode_categorical, scale_numerical
from utils.data_processing.splitter import split_data
from utils.save_data.save_to_gcs import save_multiple_dataframes_to_gcs_as_pkl, save_dataframe_to_gcs_as_parquet


# Download raw data, preprocess and create new dataset

In [3]:
def save_multiple_pkl_to_gcs(encoder_scaler, bucket_name, folder_path, file_name):
    with io.BytesIO() as buffer:
        pickle.dump(encoder_scaler, buffer)
        buffer.seek(0)

        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(f"{folder_path}/{file_name}.pkl")
        blob.upload_from_file(buffer)

In [4]:
#setting up mlflow experiment on prefect (https://medium.com/@rehabreda/mlflow-hyperopt-prefect-evidently-and-grafana-the-ultimate-guide-to-building-tracking-2d3591b6c1bd)
 
@task
def mlflow_environment(experiment_name):
    TRACKING_SERVER_HOST = "34.171.118.161" #external IP reserved in GCP - updated Aug 8 2024
    mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:5000") 
    mlflow.set_experiment(experiment_name)
    experiment = mlflow.get_experiment_by_name(experiment_name)
    print(f"tracking URI: '{mlflow.get_tracking_uri()}'")

    return experiment.experiment_id


In [5]:
# download data and prepare data
# @task
# def download_data(year, month):
    
#     result = download_file(year, month)
#     print(f'Downloaded file is available at {result["path"]}')

In [6]:
@task
def download_file(year, month):
    month = str(month).zfill(2)
    file= f'{year}{month}' #change to env variables year and month and path
    path= './data/raw'
    url=f'https://divvy-tripdata.s3.amazonaws.com/{file}-divvy-tripdata.zip'

    resp=requests.get(url, stream=True)
    zip_save_path = f'{path}/{file}.zip'

    os.makedirs(path, exist_ok=True)

    with open(zip_save_path,"wb") as handle:
        for data in tqdm(resp.iter_content(chunk_size=1024),
                        desc=f'{file}',
                        postfix=f"save to {zip_save_path}",
                        total=int(resp.headers["Content-Length"])):
            handle.write(data)

    with zipfile.ZipFile(zip_save_path, 'r') as zip_ref:
        zip_ref.extractall(path)
    
    os.remove(zip_save_path)

    return {
      "path": f'{path}/{year}{month}-divvy-tripdata.csv',
      "year": year,
      "month": month
    }

In [7]:
@task
def prepare_dataset(year, month):
    month = str(month).zfill(2)
    path_input ='./data/raw'   
    path_output ='./data/processed'
    filename_input = f'{path_input}/{year}{month}-divvy-tripdata.csv'
    filename_output = f'{path_output}/{year}{month}-usage.parquet'

    df = load_dataframe(filename_input) 
    df = remove_missing_data(df)
    df = create_hour_weekday(df)

    usage_df = transform_df_target(df)

    usage_df_final = extract_station_info(usage_df.copy())

    usage_df_final.to_parquet(filename_output)

    return usage_df_final, filename_output
    

# Load data, data processing and model training

In [8]:
@task
def preprocess_data(data_file, split_params):
    df = pd.read_parquet(data_file)
    ohe, encoded_df = encode_categorical(df)

    X_train, X_test, y_train, y_test = split_data(encoded_df, split_params)

    scaler, transformed_X_train, transformed_X_test, y_train, y_test = scale_numerical(X_train, X_test, y_train, y_test)

    return ohe, scaler, transformed_X_train, transformed_X_test, y_train, y_test



In [9]:
@task
def save_dataframes_gcs(usage_df_final, transformed_X_train, y_train, transformed_X_test, y_test, usage_filename, train_test_filename, test_filename):
    bucket_name = 'mlops-divvy-experiment-tracking'
    folder_path ='data/deployment'

    save_dataframe_to_gcs_as_parquet(usage_df_final, bucket_name, folder_path, usage_filename)

    dataframes =[transformed_X_train, y_train, transformed_X_test, y_test]

    save_multiple_dataframes_to_gcs_as_pkl(dataframes, bucket_name, folder_path, train_test_filename)

    transformed_test = pd.concat([transformed_X_test, y_test], axis =1)
    save_dataframe_to_gcs_as_parquet(transformed_test, bucket_name, folder_path, test_filename)

    print(f'transformed training and test sets saved in pkl, transformed set saved as parquet to deployment folder ')
 


In [10]:
@task
def save_encoder_scaler(ohe, scaler,file_name):
    encoder_scaler = [ohe, scaler]
    bucket_name = 'mlops-divvy-experiment-tracking'
    folder_path ='models/deployment'

    save_multiple_pkl_to_gcs(encoder_scaler, bucket_name, folder_path, file_name)

In [11]:
@task
def save_experiment_data(transformed_X_train,y_train, transformed_X_test, y_test, train_test_path, test_path):
    with open(train_test_path, 'wb') as f: 
            pickle.dump((transformed_X_train, y_train, transformed_X_test, y_test), f)
    transformed_test = pd.concat([transformed_X_test, y_test], axis =1)
    transformed_test.to_parquet(test_path) 


In [12]:
# loading daat function and task
@task
def load_prepared_data(data_file): 
    df = pd.read_parquet(data_file)
    return df

In [13]:
# encode categorical data
@task
def encode_categorical(df):
    cat_features = ['station_name', 'day_of_week']
    ohe = OneHotEncoder(handle_unknown='ignore')
    ohe_cols = pd.DataFrame(ohe.fit_transform(df[cat_features]).toarray(), columns = ohe.get_feature_names_out(cat_features))
    ohe_cols.index = df.index

    encoded_df = pd.concat([df, ohe_cols], axis=1)
    encoded_df = encoded_df.drop(cat_features, axis =1)

    return ohe, encoded_df

    

In [14]:
# split data
@task
def split_data(df, params):
    features = df.drop(['net_usage'], axis =1)
    target = df['net_usage']

    X_train, X_test, y_train, y_test = train_test_split(features, target, **params)

    return X_train, X_test, y_train, y_test
    

In [15]:
@task
def scale_numerical(X_train, X_test, y_train, y_test):
    num_features= ['hour']
    Standard_Scaler = StandardScaler()
    num_scaled_train = pd.DataFrame(Standard_Scaler.fit_transform(X_train[num_features]), columns=['hour_scaled'])
    num_scaled_test = pd.DataFrame(Standard_Scaler.transform(X_test[num_features]), columns=['hour_scaled'])

    num_scaled_train.index = X_train.index
    num_scaled_test.index = X_test.index

    scaled_X_train = pd.concat([X_train, num_scaled_train], axis=1)
    scaled_X_test = pd.concat([X_test, num_scaled_test], axis=1)

    scaled_X_train = scaled_X_train.drop(num_features, axis=1)
    scaled_X_test = scaled_X_test.drop(num_features, axis=1)

    return Standard_Scaler, scaled_X_train, scaled_X_test, y_train, y_test

    

In [16]:
# used the help of this post to set up hyperparameter tuning with hyperopt: https://medium.com/@rehabreda/mlflow-hyperopt-prefect-evidently-and-grafana-the-ultimate-guide-to-building-tracking-2d3591b6c1bd
@task
def train_hyperparameter_tuning(X_train, X_test, y_train, y_test, model_name):
    search_space = {
    'n_estimators': hp.choice('n_estimators', range(100, 1001, 100)),
    'max_depth': hp.choice('max_depth', range(3, 21, 3)),
    'min_samples_split': hp.choice('min_samples_split', range(2, 11)),
    'min_samples_leaf': hp.choice('min_samples_leaf', range(1, 11)),
    'bootstrap': hp.choice('bootstrap', [True, False])
}


    def objective (params):
    
        with mlflow.start_run(nested=True):
            mlflow.set_tag("model", "randomforest")
            mlflow.log_params(params)
            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_test)
            
            mse = mean_squared_error(y_test,y_pred)
            mlflow.log_metric("mse", mse)
            
            mlflow.sklearn.log_model(
                rf, artifact_path="mlruns", 
                registered_model_name = model_name
            )
            
        return {'loss': -mse, 'status': STATUS_OK } 

    
    best_result = fmin(
    fn = objective,
    space = search_space,
    algo = tpe.suggest,
    max_evals = 5,
    trials = Trials())

    return best_result
    

In [17]:
# select best model and send to production on mlflow
@task
def get_best_model(experiment_id):
    client = MlflowClient(tracking_uri="http://34.171.118.161:5000")
    run = MlflowClient().search_runs(
    experiment_ids = experiment_id,
    run_view_type = ViewType.ACTIVE_ONLY,
    order_by=["metrics.mse DESC"]
    )[0]
    run_id = run.info.run_id
    model_uri = f"runs:/{run_id}/mlruns"
    model_src = RunsArtifactRepository.get_underlying_uri(model_uri)

    filter_string = "run_id='{}'".format(run_id)
    results = client.search_model_versions(filter_string)
    model_version=results[0].version

    return model_version ,model_uri 

In [18]:
# best model to production
@task
def promote_best_model(model_version,model_name):
    new_stage = "Production"
    client = MlflowClient(tracking_uri="http://34.171.118.161:5000")
    client.transition_model_version_stage(
            name=model_name,
            version=model_version,
            stage=new_stage,
            archive_existing_versions=False
    )

# Grafana Dashboard with Docker (not set up)

In [170]:
# def container_status(container_names):
#     client = docker.from_env()

#     # Specify the prefix of the container name
    
#     running_containers=0

#     for container_name in container_names :
#         containers = client.containers.list(filters={"name": container_name})
#         if len(containers)!=0:
#             if client.api.inspect_container(containers[0].id)["State"]["Running"]:
#                 running_containers+=1
#         time.sleep(1) 
     
            

#     client.close()  
#     return running_containers

In [171]:
# @task
# def docker_up(container_names):
#     #os.environ["COMPOSE_PROJECT_NAME"] = container_name
#     running_containers=container_status(container_names)
#     if running_containers!=3 :
#        os.system("start docker-compose up --build ") 

In [172]:
# @task
# def wait_for_container(container_names):
#     client = docker.from_env()

#     # Specify the prefix of the container name

#     for container_name in container_names :
        
#         while True:
#             containers = client.containers.list(filters={"name": container_name})
#             if len(containers)==1 and client.api.inspect_container(containers[0].id)["State"]["Running"]:
#                 break
#             time.sleep(120)           
#     client.close()        

In [173]:
# @task
# def prep_db():
     
#      create_table_statement = """
#         drop table if exists predictions_metrics;
#         create table predictions_metrics(
#             timestamp timestamp,
#             prediction_drift float,
#             num_drifted_columns integer,
#             share_missing_values float
#         )
#      """
#      con = psycopg2.connect(host='localhost', user='postgres',password='example',database='postgres', port=5432)
#      con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # <-- ADD THIS LINE
#      cur = con.cursor()
#      check_database_query = "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'test')"
#      # Execute the SQL statement to check if the database exists
#      cur.execute(check_database_query)
#      # Fetch the result of the query
#      exists = cur.fetchone()[0]
#      if not exists:
#         cur.execute(sql.SQL("CREATE DATABASE  test"))
#      with psycopg2.connect(host='localhost', user='postgres',password='example', dbname='test', port=5432) as conn:
#         cur = conn.cursor()        
#         cur.execute(create_table_statement)    


In [174]:
# @task
# def prepare_reference_data(model_uri,x_train,x_test):
#     loaded_model = mlflow.pyfunc.load_model(model_uri)
#     x_train['prediction'] = loaded_model.predict(x_train)
#     engine = create_engine('postgresql+psycopg2://postgres:example@localhost:5432/test')
#     x_train.to_sql('reference', con=engine, if_exists='replace',
#           index=False)
#     x_test.to_sql('production', con=engine, if_exists='replace',
#           index=False)

In [175]:
@task
def serve_model(model_uri):
   os.system("start killport 8000 ")
   os.system(f"start uvicorn main:app  ") 

# Running flow

## flow starting from downloading the raw data - WORKING

In [19]:
@flow
def main(experiment_name, model_name):
    experiment_id = mlflow_environment(experiment_name)
    # add data download and preparation
    year = 2023
    month = 4
    
    result = download_file(year, month)
    print(result)
    
    usage_df_final, filename_output = prepare_dataset(year, month)
    print('dataset usage to train is ready!')

    with mlflow.start_run(nested=True):

        #preprocess_data load the usage dataset
        
        split_params = {"test_size": 0.2, "random_state": 42}
        mlflow.log_param("split_params", split_params)

        ohe, scaler, transformed_X_train, transformed_X_test, y_train, y_test = preprocess_data(filename_output, split_params)
        mlflow.log_param("usage_data_file", filename_output)

        encoder_scaler_filename = f'encoder_scaler-{experiment_name}-{experiment_id}.pkl'
        save_encoder_scaler(ohe, scaler, encoder_scaler_filename)

        with open(f"models/encoder-{experiment_name}-{experiment_id}.pkl", "wb") as f:
             pickle.dump(ohe, f)
        mlflow.log_artifact(f"models/encoder-{experiment_name}-{experiment_id}.pkl")
    
        with open(f"./models/scaler-{experiment_name}-{experiment_id}.pkl", "wb") as f:
            pickle.dump(scaler, f)
        
        mlflow.log_artifact(f"./models/scaler-{experiment_name}-{experiment_id}.pkl")

        # #saving sets locally
        # with open(f'../data/test_data/202304-usage-{experiment_name}-{experiment_id}.pkl', 'wb') as f:
        #     pickle.dump((transformed_X_train, y_train, transformed_X_test, y_test), f)
        # transformed_test = pd.concat([transformed_X_test, y_test], axis =1)
        # transformed_test.to_parquet(f"../deployment/data/202304-transformed_X_test-{experiment_name}-{experiment_id}.parquet")

        #saving data to GCS
        train_test_filename= f'202304-usage-{experiment_name}-{experiment_id}.pkl'
        test_filename = f'202304-transformed_X_test-{experiment_name}-{experiment_id}.parquet'
        usage_filename =f'{year}{month}-usage.parquet'

        
        save_dataframes_gcs(usage_df_final,transformed_X_train, y_train, transformed_X_test, y_test, usage_filename, train_test_filename, test_filename)

        best_result = train_hyperparameter_tuning(transformed_X_train,transformed_X_test, y_train, y_test, model_name)
        
        print(experiment_id)
        
        model_version, model_uri = get_best_model(experiment_id)

        print(model_version)
        print(model_uri)

        #os.environ['model_uri'] = model_uri
        
        promote_best_model(model_version, model_name)

        #docker_up(container_names)
        #prep_db()
        #prepare_reference_data(model_uri, transformed_X_train, transformed_X_test)
        #serve_model(model_uri)
       






In [20]:
#testing flow from raw data
experiment_name = '08142024'
model_name = 'random-forest-regressor'
main(experiment_name, model_name)





tracking URI: 'http://34.171.118.161:5000'


202304:   0%|          | 15038/15398190 [00:01<19:38, 13056.77it/s, save to ./data/raw/202304.zip]


{'path': './data/raw/202304-divvy-tripdata.csv', 'year': 2023, 'month': '04'}


ValueError: too many values to unpack (expected 2)

## flow starting from prepared dataset - working

In [82]:
@flow
def main(experiment_name, model_name):
    experiment_id = mlflow_environment(experiment_name)
    with mlflow.start_run(nested=True):
        data_file = "../data/processed/202304-usage.parquet" 
        df = load_prepared_data(data_file)
        mlflow.log_param("data_file", data_file)
        
        # categorical encode
        ohe, encoded_df = encode_categorical(df)
        with open(f"models/encoder-{experiment_name}-{experiment_id}.pkl", "wb") as f:
            pickle.dump(ohe, f)
        mlflow.log_artifact(f"models/encoder-{experiment_name}-{experiment_id}.pkl")

        #split sets
        split_params = {"test_size": 0.2, "random_state": 42}
        X_train, X_test, y_train, y_test = split_data(encoded_df, split_params)
        mlflow.log_param("split_params", split_params)

        #numerical scale
        scaler, transformed_X_train, transformed_X_test, y_train, y_test = scale_numerical(X_train, X_test, y_train, y_test)
        with open(f"./models/scaler-{experiment_name}-{experiment_id}.pkl", "wb") as f:
            pickle.dump(scaler, f)
        mlflow.log_artifact(f"./models/scaler-{experiment_name}-{experiment_id}.pkl")

        
        #saving sets
        with open(f'../data/test_data/202304-usage-{experiment_name}-{experiment_id}.pkl', 'wb') as f:
            pickle.dump((transformed_X_train, y_train, transformed_X_test, y_test), f)
        transformed_test = pd.concat([transformed_X_test, y_test], axis =1)
        transformed_test.to_parquet(f"../deployment/data/202304-transformed_X_test-{experiment_name}-{experiment_id}.parquet")

        #saving data to GCS
        train_test_filename= f'202304-usage-{experiment_name}-{experiment_id}.pkl'
        test_filename = f'202304-transformed_X_test-{experiment_name}-{experiment_id}.parquet'
        save_dataframes_gcs(transformed_X_train, y_train, transformed_X_test, y_test, train_test_filename, test_filename)


        best_result = train_hyperparameter_tuning(transformed_X_train,transformed_X_test, y_train, y_test, model_name)
        
        print(experiment_id)
        
        model_version, model_uri = get_best_model(experiment_id)

        print(model_version)
        print(model_uri)

        #os.environ['model_uri'] = model_uri
        
        promote_best_model(model_version, model_name)

        #docker_up(container_names)
        #prep_db()
        #prepare_reference_data(model_uri, transformed_X_train, transformed_X_test)
        #serve_model(model_uri)
       










In [83]:
experiment_name ='experiment-20240813'
model_name = "random-forest-regressor"

In [84]:
main(experiment_name, model_name)




tracking URI: 'http://34.171.118.161:5000'


  0%|          | 0/5 [00:00<?, ?trial/s, best loss=?]



Registered model 'random-forest-regressor' already exists. Creating a new version of this model...
2024/08/13 12:11:24 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: random-forest-regressor, version 2



 20%|██        | 1/5 [04:24<17:38, 264.62s/trial, best loss: -21.826912973684998]

Created version '2' of model 'random-forest-regressor'.


Registered model 'random-forest-regressor' already exists. Creating a new version of this model...
2024/08/13 12:26:53 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: random-forest-regressor, version 3



 40%|████      | 2/5 [19:53<32:45, 655.23s/trial, best loss: -21.922722686254843]

Created version '3' of model 'random-forest-regressor'.




Registered model 'random-forest-regressor' already exists. Creating a new version of this model...
2024/08/13 14:03:25 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: random-forest-regressor, version 4



 60%|██████    | 3/5 [1:56:25<1:40:02, 3001.04s/trial, best loss: -21.99691175885226]

Created version '4' of model 'random-forest-regressor'.


Registered model 'random-forest-regressor' already exists. Creating a new version of this model...
2024/08/13 14:24:29 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: random-forest-regressor, version 5



 80%|████████  | 4/5 [2:17:30<38:35, 2315.34s/trial, best loss: -22.074096583710904] 

Created version '5' of model 'random-forest-regressor'.


Registered model 'random-forest-regressor' already exists. Creating a new version of this model...
2024/08/13 14:26:53 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: random-forest-regressor, version 6

Created version '6' of model 'random-forest-regressor'.


100%|██████████| 5/5 [2:19:54<00:00, 1678.82s/trial, best loss: -22.074096583710904]


13


5
runs:/1290d94d659f4f6da4202bec8927c2be/mlruns


  client.transition_model_version_stage(


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `dict`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `tuple`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersist