# Upload Survival Model to MLflow

@roman_avj

7 nov 2023


In [4]:
# libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import sys
import boto3
import sqlalchemy
import mlflow
import cloudpickle


from sksurv.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import PowerTransformer
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from skopt import BayesSearchCV
from skopt.space import Real, Categorical, Integer
from sklearn.model_selection import cross_val_score
from sklearn.metrics import make_scorer
from sklearn.model_selection import StratifiedKFold

from xgbse import XGBSEStackedWeibull
from xgbse.extrapolation import extrapolate_constant_risk
import lifelines

from scipy.integrate import simpson
from scipy.optimize import brentq

import geopandas as gpd
import folium

from sksurv.metrics import (
    concordance_index_censored,
    concordance_index_ipcw,
    cumulative_dynamic_auc,
    integrated_brier_score,
)
from xgbse.metrics import (
    approx_brier_score,
    dist_calibration_score,
    concordance_index
)

# Data

## Read & Clean

In [5]:
# load the cleaned sale listings and print the frame summary
df_model = pd.read_parquet('../../data/data2analyze_clean_v2_sale.parquet')
df_model.info()

# binary flag: 1 when the listing has a positive maintenance cost, 0 otherwise
# (a missing cost compares as False and therefore maps to 0)
df_model['has_maintenance'] = df_model['cost_of_maintenance'].gt(0).astype('int64')

# cap every column whose name contains 'lag' at its own 99th percentile
vars_lag = df_model.columns[df_model.columns.str.contains('lag')]
lag_caps = df_model[vars_lag].quantile(0.99)
df_model[vars_lag] = df_model[vars_lag].clip(upper=lag_caps, axis=1)

# rows that reach the largest observed time2event
longest_time2event = df_model['time2event'].max()
df_max = df_model[df_model['time2event'] == longest_time2event]



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 47397 entries, 0 to 47396
Columns: 141 entries, id to cosine_tmonth
dtypes: datetime64[us](2), float64(122), int32(1), int64(4), object(9), string(3)
memory usage: 50.8+ MB


## Transformations

In [6]:
# feature groups fed to the model
vars_x_categorical = ['property_type']
vars_x_discrete = []
vars_x_woe = ['woe_marketplace', 'woe_seller', 'woe_id_sepomex']
vars_x_numerical = [
    'first_price',
    'diff_first_prediction',
    # 'prediction_price_per_square_meter',
    # 'surface_total',
    'page_on_marketplace',
]
vars_x_binary = []
vars_x_geographic = ['latitude', 'longitude']
vars_x_time = ['sine_tmonth', 'cosine_tmonth']

# final column order: categorical, numerical, binary, discrete, geographic, woe, time
vars_x_names = [
    *vars_x_categorical,
    *vars_x_numerical,
    *vars_x_binary,
    *vars_x_discrete,
    *vars_x_geographic,
    *vars_x_woe,
    *vars_x_time,
]

# the two counts must match; a mismatch means a feature is listed twice
print(len(vars_x_names))
print(len(set(vars_x_names)))

# structured (event flag, duration) array in the layout sksurv expects
event_time_pairs = list(zip(df_model['event'], df_model['time2event']))
data_y = np.array(
    event_time_pairs,
    dtype=[('Status', '?'), ('Survival_in_days', '<f8')]
)

# cast in three passes (category -> float64 -> int8 for binary flags), then keep model columns
vars_x_float = vars_x_numerical + vars_x_discrete + vars_x_binary + vars_x_geographic + vars_x_woe + vars_x_time
data_x = df_model.astype(dict.fromkeys(vars_x_categorical, 'category'))
data_x = data_x.astype(dict.fromkeys(vars_x_float, np.float64))
data_x = data_x.astype(dict.fromkeys(vars_x_binary, np.int8))
data_x = data_x[vars_x_names]
data_x.info()

11
11
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 47397 entries, 0 to 47396
Data columns (total 11 columns):
 #   Column                 Non-Null Count  Dtype   
---  ------                 --------------  -----   
 0   property_type          47397 non-null  category
 1   first_price            47397 non-null  float64 
 2   diff_first_prediction  47397 non-null  float64 
 3   page_on_marketplace    47397 non-null  float64 
 4   latitude               47397 non-null  float64 
 5   longitude              47397 non-null  float64 
 6   woe_marketplace        47397 non-null  float64 
 7   woe_seller             47397 non-null  float64 
 8   woe_id_sepomex         47397 non-null  float64 
 9   sine_tmonth            47397 non-null  float64 
 10  cosine_tmonth          47397 non-null  float64 
dtypes: category(1), float64(10)
memory usage: 3.7 MB


In [7]:
def boxcox(X):
    """Fit a standardizing power transform on X and apply it to X.

    Despite the function name, the transform fitted here is Yeo-Johnson
    (``PowerTransformer(method='yeo-johnson', standardize=True)``).

    Returns
    -------
    tuple
        ``(X_transformed, fitted_transformer)`` in that order.
    """
    fitted_transformer = PowerTransformer(method='yeo-johnson', standardize=True)
    fitted_transformer.fit(X)
    transformed = fitted_transformer.transform(X)
    return transformed, fitted_transformer

def scale(X):
    """Fit a StandardScaler on X and apply it to X.

    Returns
    -------
    tuple
        ``(X_scaled, fitted_scaler)`` in that order.
    """
    fitted_scaler = StandardScaler()
    fitted_scaler.fit(X)
    scaled = fitted_scaler.transform(X)
    return scaled, fitted_scaler

# one-hot encode the categorical columns; keep the resulting column order,
# because data_x_numeric is converted to a bare NumPy array at the end of this cell
data_x_numeric = OneHotEncoder().fit_transform(data_x)
colnames_x_numeric = data_x_numeric.columns

# columns that get a separate power transform per property type
# (boxcox() fits a Yeo-Johnson PowerTransformer, not a Box-Cox one)
boxcox_vars_property = [
    'first_price'
]
# columns that share one power transform across all rows
# (hard-coded; meant to be vars_x_numerical minus boxcox_vars_property)
boxcox_vars_all = ['diff_first_prediction', 'page_on_marketplace']
# row masks per property type, derived from the one-hot house indicator:
# indicator >= 1 -> house, indicator < 1 -> apartment
idx_house = (data_x_numeric['property_type=house'] >= 1)
idx_apartment = (data_x_numeric['property_type=house'] < 1)

# fit and apply the transforms; each call writes the transformed values back in place
# and keeps the fitted transformer (pt_house, pt_apartment, pt_all) for later logging
# NOTE(review): the transformers are fit on the full dataset, before the train/test split
# below, so the held-out rows influence the fitted parameters — confirm this is acceptable
data_x_numeric.loc[idx_house, boxcox_vars_property], pt_house = boxcox(data_x_numeric.loc[idx_house, boxcox_vars_property])
data_x_numeric.loc[idx_apartment, boxcox_vars_property], pt_apartment = boxcox(data_x_numeric.loc[idx_apartment, boxcox_vars_property])
data_x_numeric[boxcox_vars_all], pt_all = boxcox(data_x_numeric[boxcox_vars_all])

# rebind the name to a plain NumPy array (column labels survive only in colnames_x_numeric)
data_x_numeric = data_x_numeric.to_numpy()


In [11]:
# column names the shared transformer (pt_all) was fit on
pt_all.feature_names_in_

array(['diff_first_prediction', 'page_on_marketplace'], dtype=object)

In [7]:
# column order of the encoded feature matrix (positions match the NumPy array columns)
colnames_x_numeric

Index(['property_type=house', 'first_price', 'diff_first_prediction',
       'page_on_marketplace', 'latitude', 'longitude', 'woe_marketplace',
       'woe_seller', 'woe_id_sepomex', 'sine_tmonth', 'cosine_tmonth'],
      dtype='object')

# Fit Model

In [13]:
# hold out 5% of the rows with a fixed seed and shuffling
# NOTE(review): this held-out split is later used both as the early-stopping validation set
# and as the evaluation set for the reported metrics
X_train, X_test, y_train, y_test = train_test_split(
    data_x_numeric, data_y, test_size=0.05, random_state=42, shuffle=True
)

In [14]:
# sanity check: (rows, columns) of the train and held-out feature matrices
print(X_train.shape)
print(X_test.shape)

(45027, 11)
(2370, 11)


In [15]:
# monotone constraints: one entry per encoded feature column, 0 = unconstrained
feature_order = colnames_x_numeric.to_list()
constraint_by_position = [0] * len(feature_order)

# +1 (increasing, per XGBoost's convention) for 'first_price' and 'diff_first_prediction'
# NOTE(review): this cell used to describe the 'first_price' constraint as decreasing
# while setting +1 — confirm the intended direction
for constrained_feature in ('first_price', 'diff_first_prediction'):
    constraint_by_position[feature_order.index(constrained_feature)] = 1

# 'property_type=house' is left unconstrained
monotone_constraints = tuple(constraint_by_position)
monotone_constraints

(0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0)

In [16]:
# XGBoost parameters for the XGBSEStackedWeibull model: AFT objective with a normal
# loss distribution, DART booster, and the monotone constraints built above
xgboost_params = {
            "objective": "survival:aft",
            "eval_metric": "aft-nloglik",
            "aft_loss_distribution": "normal",
            "aft_loss_distribution_scale": 1,
            "tree_method": "hist",
            "learning_rate": 5e-2,
            "max_depth": 8,
            "booster": "dart",
            "subsample": 0.5,
            "min_child_weight": 50,
            "colsample_bynode": 0.5,
            'monotone_constraints': monotone_constraints
        }
xgbse_weibull = XGBSEStackedWeibull(xgb_params=xgboost_params)
# fit with early stopping on the held-out split; survival curves are produced on the
# integer time grid 1..170 (presumably days, given the 'Survival_in_days' field — confirm)
# NOTE(review): (X_test, y_test) drives early stopping here and is also the set scored
# for the reported metrics, so those metrics are not from fully unseen data
xgbse_weibull.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    early_stopping_rounds=10,
    verbose_eval=50,
    time_bins = range(1, 171, 1)
)

[0]	validation-aft-nloglik:13.68248


[50]	validation-aft-nloglik:3.27984
[100]	validation-aft-nloglik:3.18555
[150]	validation-aft-nloglik:3.18202
[188]	validation-aft-nloglik:3.18119


In [17]:
def get_xgbse_mean_time(df):
    """Integrate each row's survival curve over the time grid given by the column labels.

    Parameters
    ----------
    df : pd.DataFrame
        One row per observation. Column labels must be castable to int and are
        used as the time grid; values are the survival probabilities at those times.

    Returns
    -------
    np.ndarray
        1-D array with one value per row: the Simpson's-rule area under that row's
        curve between the first and the last column label.
    """
    # time grid taken from the column labels
    time_grid = df.columns.astype(int).to_numpy()
    # survival probabilities, shape (n_rows, n_times)
    surv_probas = df.values

    # integrate all rows at once along the time axis instead of looping row by row
    mean_time = np.asarray(simpson(y=surv_probas, x=time_grid, axis=1))

    return mean_time

def get_metrics(df):
    """Compute the concordance index and the RMSE for one group of predictions.

    Expects the columns 'event', 'observed_time', 'predicted_time' and 'risk_score'.
    The concordance index uses every row; the RMSE uses only rows where 'event' is True.

    Returns a Series with the entries 'rmse' and 'cindex'.
    """
    cindex = concordance_index_censored(df['event'], df['observed_time'], df['risk_score'])[0]

    # RMSE restricted to rows whose event was observed
    observed_events = df.loc[df['event']]
    squared_error = (observed_events['predicted_time'] - observed_events['observed_time']) ** 2
    rmse = np.sqrt(np.mean(squared_error))

    return pd.Series({'rmse': rmse, 'cindex': cindex})

def get_prediction_df(X, y, colnames, model):
    """Build a per-row frame with features, observed outcome and model predictions.

    Parameters
    ----------
    X : feature matrix passed to ``model.predict`` and used as the frame's data.
    y : structured array with the fields 'Survival_in_days' and 'Status'.
    colnames : column labels for X; must include 'property_type=house'.
    model : fitted model whose ``predict(X)`` output is accepted by get_xgbse_mean_time.

    Returns
    -------
    pd.DataFrame
        The feature columns (with 'property_type=house' renamed to 'property_type'
        and mapped to 'house' / 'apartment'), followed by 'observed_time', 'event',
        'predicted_time' and 'risk_score' (the negated predicted time).
    """
    # area under each predicted survival curve
    predicted_time = get_xgbse_mean_time(model.predict(X))

    features = pd.DataFrame(X, columns=colnames).rename(
        columns={'property_type=house': 'property_type'}
    )
    is_house = features['property_type'] == 1

    return features.assign(
        property_type=np.where(is_house, 'house', 'apartment'),
        observed_time=y['Survival_in_days'],
        event=y['Status'],
        predicted_time=predicted_time,
        risk_score=-predicted_time,
    )

# score the held-out split
df_pred = get_prediction_df(X=X_test, y=y_test, colnames=colnames_x_numeric, model=xgbse_weibull)

# rmse and c-index per property type (result is indexed by the property_type label)
grouped_by_type = df_pred.groupby(['property_type'])
table_metrics = grouped_by_type.apply(get_metrics)
table_metrics

Unnamed: 0_level_0,rmse,cindex
property_type,Unnamed: 1_level_1,Unnamed: 2_level_1
apartment,40.424149,0.719277
house,39.23938,0.71742


# Upload to MLflow

## Setup

In [18]:
# AWS credentials: select the named profile used for artifact storage access
os.environ["AWS_PROFILE"] = "default" # prod

# host name of the MLflow tracking server
TRACKING_SERVER_HOST = "mlflow.prod.dd360.mx" # production tracking server

# point the MLflow client at the tracking server
# NOTE(review): the URI uses the http:// scheme with port 443, which is normally the
# HTTPS port — confirm whether this should be https://
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:443")

# select the experiment that subsequent runs are recorded under
EXPERIMENT_NAME = "liquidity-sale-cdmx"
mlflow.set_experiment(EXPERIMENT_NAME)

<Experiment: artifact_location='s3://dd360-ds-artifacts/135', creation_time=1700156333492, experiment_id='135', last_update_time=1700156333492, lifecycle_stage='active', name='liquidity-sale-cdmx', tags={}>

## Start Run

In [19]:
# serialize the fitted transformers and the model to bytes with cloudpickle;
# these byte strings (not the estimator objects) are what gets logged to MLflow below
pt_all_serialized = cloudpickle.dumps(pt_all)
pt_house_serialized = cloudpickle.dumps(pt_house)
pt_apartment_serialized = cloudpickle.dumps(pt_apartment)
xgbse_weibull_serialized = cloudpickle.dumps(xgbse_weibull)

In [20]:
# record one MLflow run: tags, feature lists, serialized artifacts and per-group metrics
with mlflow.start_run() as run:
    # set tags
    mlflow.set_tag('model', 'survival')
    mlflow.set_tag('model-type', 'xgbse-stacked-weibull')
    mlflow.set_tag('model-name', 'liquidity_v1')
    mlflow.set_tag('model-version', '1.0.0')
    mlflow.set_tag('model-description', 'Modelo de supervivencia para predecir el tiempo de venta de una propiedad')

    # log the feature lists, one param per feature group
    feature_groups = {
        'variables': vars_x_names,
        'categorical_variables': vars_x_categorical,
        'discrete_variables': vars_x_discrete,
        'woe_variables': vars_x_woe,
        'numerical_variables': vars_x_numerical,
        'binary_variables': vars_x_binary,
        'geographic_variables': vars_x_geographic,
        'time_variables': vars_x_time,
    }
    for param_name, columns in feature_groups.items():
        mlflow.log_param(param_name, columns)

    # log the transformers and the model
    # NOTE(review): the objects logged here are cloudpickle byte strings, not estimators,
    # so whatever loads them must call cloudpickle.loads on the loaded value — kept as-is
    # to stay compatible with existing loaders; confirm before changing the format
    serialized_artifacts = {
        'pt_all': pt_all_serialized,
        'pt_house': pt_house_serialized,
        'pt_apartment': pt_apartment_serialized,
        'xgbse_weibull': xgbse_weibull_serialized,
    }
    for artifact_path, payload in serialized_artifacts.items():
        mlflow.sklearn.log_model(payload, artifact_path, serialization_format=mlflow.sklearn.SERIALIZATION_FORMAT_CLOUDPICKLE)

    # log every row of table_metrics as rmse_<group> / cindex_<group>.
    # table_metrics is grouped by a single key, so each row label is a plain string;
    # indexing it with [0] and [1] would pick its first two characters. Use the whole
    # label, and join the parts with '_' when the label is a tuple (multi-key groupby).
    for group_key, row in table_metrics.iterrows():
        if isinstance(group_key, tuple):
            group_label = '_'.join(str(part) for part in group_key)
        else:
            group_label = str(group_key)
        mlflow.log_metric(f"rmse_{group_label}", row['rmse'])
        mlflow.log_metric(f"cindex_{group_label}", row['cindex'])

# the `with` block ends the run on exit, so no explicit mlflow.end_run() is needed



## Try to load model

In [21]:
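# # NOTE: the artifact paths 'power_transform' and 'standard_scaler' used below are not
# # logged by the run in this notebook, which logs 'pt_all', 'pt_house', 'pt_apartment'
# # and 'xgbse_weibull'; update the paths before re-enabling this cell.
# # get log id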
# log_id = "39d3eadaedf5499d9051fdfa94bd6994"

# # load models #
# # load power transform
# power_transform_load = cloudpickle.loads(mlflow.sklearn.load_model(f"runs:/{log_id}/power_transform"))
# # load standard scaler
# standard_scaler_load = cloudpickle.loads(mlflow.sklearn.load_model(f"runs:/{log_id}/standard_scaler"))
# # # load xgbse weibull
# xgbse_weibull_load =  cloudpickle.loads(mlflow.sklearn.load_model(f"runs:/{log_id}/xgbse_weibull"))

### Check

In [None]:
# # look transformed data is the same as original
# data_aux = data_x[vars_x_discrete + vars_x_geographic].copy()
# (pd.DataFrame(standard_scaler_load.inverse_transform(data_x_numeric_aux_scale), columns=location_cols_scale) - pd.DataFrame(standard_scaler_load.inverse_transform(data_x_numeric_aux_scale), columns=location_cols_scale)).describe()

In [None]:
# xgbse_weibull_load

In [None]:
# show the row labelled 0 of the untransformed feature frame
data_x.loc[0]

In [None]:
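# NOTE: this commented-out loader reads the artifact paths 'power_transform' and
# 'standard_scaler', which the run in this notebook does not log (it logs 'pt_all',
# 'pt_house', 'pt_apartment' and 'xgbse_weibull'); update the paths before re-enabling.
# def load_models(log_id):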
#     """
#     Load models from mlflow
#     """
#     # get the model

#     # load power transform
#     power_transform_load = cloudpickle.loads(mlflow.sklearn.load_model(f"runs:/{log_id}/power_transform"))
#     # load standard scaler
#     standard_scaler_load = cloudpickle.loads(mlflow.sklearn.load_model(f"runs:/{log_id}/standard_scaler"))
#     # # load xgbse weibull
#     xgbse_weibull_load =  cloudpickle.loads(mlflow.sklearn.load_model(f"runs:/{log_id}/xgbse_weibull"))

#     # save them into a dictionary
#     models = {
#         "power_transform": power_transform_load,
#         "standard_scaler": standard_scaler_load,
#         "xgbse_weibull": xgbse_weibull_load
#     }
#     return models

# LOG_ID = "39d3eadaedf5499d9051fdfa94bd6994"
# models = load_models(LOG_ID)


# Save Model 

In [22]:
# create the local 'models' output directory (relative to the working directory);
# no error if it already exists
os.makedirs('models', exist_ok=True)

In [23]:
# write each fitted object to its own cloudpickle file under models/
local_artifacts = {
    'models/pt_all.pkl': pt_all,
    'models/pt_house.pkl': pt_house,
    'models/pt_apartment.pkl': pt_apartment,
    'models/xgbse_weibull.pkl': xgbse_weibull,
}
for pickle_path, fitted_object in local_artifacts.items():
    with open(pickle_path, 'wb') as f:
        cloudpickle.dump(fitted_object, f)