In [1]:
import os
import io
import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
import mlflow
import dvc.api
import warnings

from custom_data_transformers.utils import HouseColumnTransformerFunc

# Silence every warning for the rest of the session so library chatter does
# not clutter cell output.
# NOTE(review): this also hides deprecation and numerical warnings; consider
# narrowing the filter.
warnings.filterwarnings("ignore")

# Local files the fitted preprocessing pipeline and the estimator are
# serialised to before being attached to the MLflow model.
pipeline_path = "pipe.pkl"
model_path = "model.pkl"

# Coordinates of the DVC-tracked dataset.
path = "dataset/housing.csv"  # file path handed to the DVC API
repo = "../.git"              # repository handed to the DVC API
version = "v1"                # Git tag

# Ask DVC for the storage URL of that file at that revision. The URL's scheme
# later selects the loader, and the URL itself is logged as an MLflow param.
data_url = dvc.api.get_url(path=path, repo=repo, rev=version)

In [None]:
def fetch_data_from_s3(path, repo, version):
    """Load a DVC-tracked CSV file into a DataFrame.

    Args:
        path: Path of the tracked file, forwarded to ``dvc.api.read``.
        repo: Repository forwarded to ``dvc.api.read``.
        version: Revision forwarded to ``dvc.api.read`` as ``rev``.

    Returns:
        pandas.DataFrame parsed from the file's comma-separated text.
    """
    csv_text = dvc.api.read(path=path, repo=repo, rev=version)
    # The content arrives as one in-memory string, so wrap it in a file-like
    # buffer for pandas.
    return pd.read_csv(io.StringIO(csv_text), sep=',')


def fetch_data_from_fs(url):
    """Read the comma-separated file at ``url`` into a DataFrame.

    Args:
        url: Anything ``pandas.read_csv`` accepts as a source.

    Returns:
        pandas.DataFrame with the parsed contents.
    """
    frame = pd.read_csv(url, sep=',')
    return frame


def fetch_data(url):
    """Load the dataset with the loader that matches the scheme of ``url``.

    The text before the first ``:`` is treated as the storage type. When it
    is ``s3`` (case-insensitive) the data is read through DVC; anything else
    is read directly from ``url``.

    Note:
        The S3 branch does not use ``url``. It reads the module-level
        ``path``, ``repo`` and ``version`` variables, so those must be
        defined before this function is called.

    Args:
        url: Resolved location of the dataset.

    Returns:
        pandas.DataFrame with the dataset.
    """
    scheme = url.partition(":")[0]
    if scheme.upper() != "S3":
        return fetch_data_from_fs(url)
    return fetch_data_from_s3(path, repo, version)

In [4]:
# Both MLflow endpoints come from the environment; a missing variable raises
# KeyError here instead of failing later inside MLflow.
backend_uri = os.environ['MLFLOW_TRACKING_URI']
# Artifact location, used further down when the experiment is created.
artifact_uri = os.environ['MLFLOW_ARTIFACT_STORE']
# Point the MLflow client at the tracking backend.
mlflow.set_tracking_uri(backend_uri)

In [None]:
# Load the dataset; fetch_data picks the loader from the URL's scheme.
housing = fetch_data(data_url)
# Preview the first rows as the cell output.
housing.head()

### Create pipeline

In [6]:
# Column roles: regression target, categorical predictors, numeric predictors.
targetCol = "median_house_value"
catCols = ["ocean_proximity"]
numCols = ["housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]

# Wrap the project's feature function so it can be used as a Pipeline step.
customColTrans = FunctionTransformer(HouseColumnTransformerFunc,
                                     kw_args={"add_bedrooms_per_room": False})

# Numeric branch: fill missing values, standardise, then apply the custom step.
# NOTE(review): the custom step receives already-standardised values. If
# HouseColumnTransformerFunc derives ratio features from raw counts, it
# probably belongs before the scaler. Confirm against its implementation.
num_pipeline = Pipeline([
                    ("Imputer", SimpleImputer()),
                    ("Scaler", StandardScaler()),
                    ("Transform", customColTrans)
                ])

# Route each column group through its own preprocessing.
full_pipeline = ColumnTransformer([
                    ("Numerical_Pipeline", num_pipeline, numCols),
                    # handle_unknown="ignore": a category that was absent from
                    # the training split is encoded as all zeros instead of
                    # raising, so the fitted pipeline keeps working when it is
                    # served on new data. Known categories encode as before.
                    ("OneHot", OneHotEncoder(handle_unknown="ignore"), catCols)
                ])

### Data processing

In [7]:
# Separate the predictors from the regression target.
x = housing[numCols + catCols]
y = housing[targetCol]

# Hold out 20% of the rows; the fixed seed makes the split repeatable.
x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

# Fit the preprocessing on the training rows only, then apply the fitted
# pipeline to the held-out rows. From here on x_train and x_test hold the
# transformed matrices, not the original DataFrames.
x_train = full_pipeline.fit_transform(x_train)
x_test = full_pipeline.transform(x_test)

In [8]:
# Sanity check: (rows, columns) of the transformed held-out set.
x_test.shape

(4128, 10)

### Build model

In [9]:
# Fit an ordinary least-squares baseline on the transformed training data.
model = LinearRegression().fit(x_train, y_train)

# Score on the held-out rows; RMSE is in the same units as the target.
predictions = model.predict(x_test)
mse = mean_squared_error(y_test, predictions)
rmse = np.sqrt(mse)
print(rmse)

73202.85375260453


### Save the pipeline and the model

In [10]:
# Serialise the fitted preprocessing pipeline and the fitted estimator to the
# local paths configured at the top of the notebook. The trailing ";" hides
# the list that joblib.dump returns from the cell output.
joblib.dump(full_pipeline, pipeline_path);
joblib.dump(model, model_path);

In [12]:
# Map a unique name to each file saved above (the pipeline and the model).
# The mapping is handed to `mlflow.pyfunc.save_model` or
# `mlflow.pyfunc.log_model`, which copies the referenced files into the new
# MLflow Model's directory. The keys are the names used to look the files up
# again through `context.artifacts` when the model is loaded.
artifacts = dict(
    pipeline=pipeline_path,
    model=model_path,
)

# Define the model class
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc model that applies the saved pipeline, then the estimator."""

    # Labels assigned positionally to incoming columns before preprocessing.
    FEATURE_COLUMNS = ("housing_median_age", "total_rooms", "total_bedrooms",
                       "population", "households", "median_income", "ocean_proximity")

    def load_context(self, context):
        """Load the pipeline and the estimator from the model's artifacts.

        Args:
            context: MLflow context whose ``artifacts`` mapping resolves the
                ``"pipeline"`` and ``"model"`` names to local file paths.
        """
        self.pipeline = joblib.load(context.artifacts["pipeline"])
        self.model = joblib.load(context.artifacts["model"])

    def predict(self, context, model_input):
        """Predict from a frame whose columns are in training order.

        Args:
            context: MLflow context (unused).
            model_input: DataFrame with one column per entry of
                ``FEATURE_COLUMNS``, in that order. Existing column labels
                are ignored and replaced positionally.

        Returns:
            The estimator's predictions for the transformed rows.
        """
        # Relabel a shallow copy so the caller's DataFrame keeps its own
        # column names; the underlying data is not duplicated.
        features = model_input.copy(deep=False)
        features.columns = list(self.FEATURE_COLUMNS)
        input_matrix = self.pipeline.transform(features)
        return self.model.predict(input_matrix)

In [14]:
# Reuse the "advanced_pipeline" experiment when it already exists; otherwise
# create it with the configured artifact location and fetch the new record.
experiment = mlflow.get_experiment_by_name("advanced_pipeline")
if experiment is None:
    experiment_id = mlflow.create_experiment(
        "advanced_pipeline",
        artifact_location=artifact_uri,
    )
    experiment = mlflow.get_experiment(experiment_id)

mlflow.set_experiment(experiment.name)

# Artifact path, inside the run, under which the pyfunc model is stored.
mlflow_pyfunc_model_path = "mlflow_advanced_pipeline"

with mlflow.start_run():
    # Data provenance: where the dataset came from and which revision was used.
    mlflow.log_param('data_url', data_url)
    mlflow.log_param('data_version', version)

    # Size of the transformed training matrix.
    n_rows, n_columns = x_train.shape
    mlflow.log_param('input_rows', n_rows)
    mlflow.log_param('input_columns', n_columns)

    mlflow.log_metric("rmse", rmse)

    # Package the wrapper together with the two serialised files.
    mlflow.pyfunc.log_model(
        mlflow_pyfunc_model_path,
        python_model=ModelWrapper(),
        artifacts=artifacts,
    )

The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet



In [14]:
# Save the MLflow Model
# Disabled alternative to logging the model inside a run: it writes the
# model to a local directory instead.
# NOTE(review): `conda_env` is not defined in the cells shown here; define it
# (or drop the argument) before re-enabling this cell.
# mlflow_pyfunc_model_path = "mlflow_advanced_workflow"
# mlflow.pyfunc.save_model(
#         path=mlflow_pyfunc_model_path, python_model=ModelWrapper(), artifacts=artifacts,
#         conda_env=conda_env)