# DE pipeline

In [1]:
# Create table in DB to store the Data. This process will be done by the DE team.

import sqlalchemy
from loguru import logger
import pandas as pd
from sklearn import datasets
import numpy as np
import json

disk_engine = sqlalchemy.create_engine('sqlite:///data_db.db', echo=False)

try:
    with disk_engine.connect() as con:
        con.execute("SELECT 1")
    logger.info('Engine is valid')
except Exception as e:
    logger.info(f'Engine invalid: {str(e)}')

2022-04-12 19:59:37.047 | INFO     | __main__:<module>:15 - Engine is valid


In [2]:
iris = datasets.load_iris()
df_iris = pd.DataFrame(data = np.c_[iris['data'], iris['target']],
            columns = iris['feature_names'] + ['target'])
df_iris.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,0.0
1,4.9,3.0,1.4,0.2,0.0
2,4.7,3.2,1.3,0.2,0.0
3,4.6,3.1,1.5,0.2,0.0
4,5.0,3.6,1.4,0.2,0.0


In [3]:
df_iris.columns = df_iris.columns.str.replace(" ","_").str.replace("_\(cm\)","",regex=True)
df_iris['target'] = df_iris['target'].astype(int)
df_iris.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,target
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


In [4]:
df_iris_dict = {"data":df_iris.to_dict("records"),"target_variable":["target"],"independent_variables":["sepal_length","sepal_width","petal_length","petal_width"]}


with open('model_train_payload.json', 'w') as fp:
    json.dump(df_iris_dict, fp)

In [5]:
df_iris.to_sql(name='iris', 
            con = disk_engine,
            if_exists='replace',
            index=False,
            dtype={'sepal_length': sqlalchemy.types.Float(precision=4, asdecimal=True),
                   'sepal_width': sqlalchemy.types.Float(precision=4, asdecimal=True),
                   'petal_length': sqlalchemy.types.Float(precision=4, asdecimal=True),
                   'petal_width': sqlalchemy.types.Float(precision=4, asdecimal=True),
                   'target': sqlalchemy.types.INTEGER()},
            chunksize=100,
            method="multi")

# ML Pipeline

In [6]:
# This is the modelling process which we will run in a pipeline whenever there is a data refresh happing or due to manual trigger.

import sqlalchemy
from loguru import logger
import pandas as pd
import json
import uuid
from sklearn.linear_model import LogisticRegression

def logistic_regression_to_json(lrmodel, file=None):
    if file is not None:
        serialize = lambda x: json.dump(x, file)
    else:
        serialize = json.dumps
    data = {}
    data['init_params'] = lrmodel.get_params()
    data['model_params'] = mp = {}
    for p in ('coef_', 'intercept_','classes_', 'n_iter_'):
        mp[p] = getattr(lrmodel, p).tolist()
    return serialize(data)


disk_engine = sqlalchemy.create_engine('sqlite:///data_db.db', echo=False)

try:
    with disk_engine.connect() as con:
        con.execute("SELECT 1")
    logger.info('engine is valid')
except Exception as e:
    logger.info(f'Engine invalid: {str(e)}')


iris_df_from_db = pd.read_sql_query('SELECT * FROM iris', disk_engine)
iris_df_from_db

2022-04-12 19:59:39.263 | INFO     | __main__:<module>:28 - engine is valid


Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,target
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,2
146,6.3,2.5,5.0,1.9,2
147,6.5,3.0,5.2,2.0,2
148,6.2,3.4,5.4,2.3,2


In [7]:
X = iris_df_from_db.drop(columns = ['target'])
y = iris_df_from_db[['target']]

In [8]:
X

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width
0,5.1,3.5,1.4,0.2
1,4.9,3.0,1.4,0.2
2,4.7,3.2,1.3,0.2
3,4.6,3.1,1.5,0.2
4,5.0,3.6,1.4,0.2
...,...,...,...,...
145,6.7,3.0,5.2,2.3
146,6.3,2.5,5.0,1.9
147,6.5,3.0,5.2,2.0
148,6.2,3.4,5.4,2.3


In [9]:
y

Unnamed: 0,target
0,0
1,0
2,0
3,0
4,0
...,...
145,2
146,2
147,2
148,2


In [14]:
lr_model = LogisticRegression()
lr_model.fit(X, y)
model_response = logistic_regression_to_json(lr_model)
model_id = str(uuid.uuid4())
logger.info(f"Storing results for : {model_id}.")
parameters_df = pd.DataFrame({'model_id':[model_id], "response":[model_response] })
parameters_df

  y = column_or_1d(y, warn=True)
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
2022-04-12 20:12:20.826 | INFO     | __main__:<module>:5 - Storing results for : 29249f8c-6492-48ec-849a-851433b7c145.


Unnamed: 0,model_id,response
0,29249f8c-6492-48ec-849a-851433b7c145,"{""init_params"": {""C"": 1.0, ""class_weight"": nul..."


In [19]:
json.loads(model_response)

{'init_params': {'C': 1.0,
  'class_weight': None,
  'dual': False,
  'fit_intercept': True,
  'intercept_scaling': 1,
  'l1_ratio': None,
  'max_iter': 100,
  'multi_class': 'auto',
  'n_jobs': None,
  'penalty': 'l2',
  'random_state': None,
  'solver': 'lbfgs',
  'tol': 0.0001,
  'verbose': 0,
  'warm_start': False},
 'model_params': {'coef_': [[-0.4179559690983556,
    0.9661824148400735,
    -2.5214315192770207,
    -1.0840052333192296],
   [0.5307588192150016,
    -0.3143802929489565,
    -0.19909553551841389,
    -0.9489695571862117],
   [-0.11280285011669776,
    -0.65180212189115,
    2.720527054795414,
    2.0329747905054365]],
  'intercept_': [9.838323162947265, 2.2149726370800296, -12.053295800027412],
  'classes_': [0, 1, 2],
  'n_iter_': [100]}}

In [None]:
parameters_df['model_id'] = parameters_df['model_id'].astype(str)
parameters_df.dtypes

In [None]:
parameters_df.to_sql(name='model_parameters',
                    con= disk_engine,
                    if_exists='replace',
                    index=False,
                    dtype={'response': sqlalchemy.types.JSON(), 
                           'model_id': sqlalchemy.types.String()},
                    chunksize=100,
                    method="multi")

# Model serving

* Real time inference (RESTful API)
* Batch process (Event driven architecture)

In [None]:
import sqlalchemy
from loguru import logger
import pandas as pd
import json
import uuid
from sklearn.linear_model import LogisticRegression

def logistic_regression_from_json(jstring):
    data = json.loads(jstring)
    model = LogisticRegression(**data['init_params'])
    for name, p in data['model_params'].items():
        setattr(model, name, np.array(p))
    return model

disk_engine = sqlalchemy.create_engine('sqlite:///data_db.db', echo=False)

try:
    with disk_engine.connect() as con:
        con.execute("SELECT 1")
    logger.info('Engine is valid.')
except Exception as e:
    logger.info(f'Engine invalid: {str(e)}')

iris_df_from_db = pd.read_sql_query('SELECT * FROM iris', disk_engine)
X = iris_df_from_db.drop(columns = ['target'])
X

In [None]:
model_parameters_from_db = pd.read_sql_query('SELECT * FROM model_parameters', disk_engine)
d = eval((model_parameters_from_db[model_parameters_from_db['model_id'] == model_id]['response'][0]))
model_parameters_json = d.replace("'", "\"")
model_parameters_json

In [None]:
model_object = logistic_regression_from_json(model_parameters_json)
model_object.predict(X)