***

<img src='teradata_logo.png' alt='Teradata' width='200'/>


# [Micro or Partition Modelling using STOs Demo](#title)

***


## [Table of Contents](#toc)

1. [Introduction](#Sec_1)
2. [Connection to Vantage](#Sec_2)
3. [Modelling with VAL](#Sec_3)
    1. [Model Training](#Sec_3.1.1)
    2. [Model Evaluation Report](#Sec_3.1.2)
    3. [Scoring and Evaluation](#Sec_3.1.3)
4. [Code for data upload to Vantage](#Sec_4)
        
***


## [1. Introduction](#Sec_1)


In [1]:
import getpass

from teradataml import DataFrame, copy_to_sql, create_context, remove_context
from teradatasqlalchemy.types import BIGINT, CLOB, INTEGER, VARCHAR

In [2]:
# non-TD Libraries
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import numpy as np

import json
import base64
import dill
import uuid

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  from ._gradient_boosting import predict_stages
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  from ._gradient_boosting import predict_stages


## [2. Connection to Vantage](#Sec_2)

In [3]:
# Connection settings for the AOPS Teradata Vantage instance.
host = "3.238.151.85"
username = "AOA_DEMO"  # update username as needed
# The password is prompted for at run time and must never be written into
# the notebook, not even as a comment.
password = getpass.getpass()
database_name = "AOA_DEMO"

 ··········


In [4]:
# Open the teradataml context with the credentials collected above and keep
# the returned handle in `eng`.
eng = create_context(
    host=host,
    username=username,
    password=password,
)

## 3. Training

In [8]:
# Fresh identifier stamped on every model produced by this training run.
model_version = str(uuid.uuid4())

# Hyper-parameters handed to the RandomForestRegressor.
hyperparams = dict(rand_seed=111, n_estimators=10)

# Training data description: source table, column roles and the column whose
# values split the data into partitions.
data_conf = dict(
    data_table="DEMAND_FORECAST_TRAIN_VAL",
    numeric_columns=[
        "center_id", "meal_id", "checkout_price", "base_price",
        "emailer_for_promotion", "homepage_featured", "op_area",
    ],
    target_column="num_orders",
    categorical_columns=["center_type", "category", "cuisine"],
    partition_column="center_id",
)

In [16]:
def train_partition(partition, model_version, hyperparams):
    """Train one scikit-learn pipeline on a single data partition.

    Column roles (numeric, categorical, target, partition) are read from the
    notebook-level ``data_conf`` dictionary.

    Parameters
    ----------
    partition : object exposing ``read()``
        Handle supplied by ``map_partition``; ``read()`` returns the
        partition rows as a pandas DataFrame.
    model_version : str
        Identifier copied into the output row.
    hyperparams : dict
        Must provide ``"rand_seed"`` and ``"n_estimators"``; it is also
        embedded in the partition metadata.

    Returns
    -------
    numpy.ndarray
        ``[partition_id, model_version, num_rows, partition_metadata,
        model_artefact]`` where ``partition_metadata`` is a JSON string and
        ``model_artefact`` is the base64 text of the dill-serialised pipeline.
    """
    partition = partition.read()  # read returns pandas df
    numeric_columns = data_conf["numeric_columns"]
    target_column = data_conf["target_column"]
    categorical_columns = data_conf["categorical_columns"]
    partition_column = data_conf["partition_column"]
    features = numeric_columns + categorical_columns

    # modelling pipeline: feature encoding + algorithm
    numeric_transformer = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median"))])
    # NOTE(review): newer scikit-learn releases rename `sparse` to
    # `sparse_output` - confirm against the installed version before upgrading.
    oh_encoder = OneHotEncoder(sparse=False, handle_unknown='ignore')
    regressor = RandomForestRegressor(random_state=hyperparams["rand_seed"],
                                      n_estimators=hyperparams["n_estimators"])
    preprocessor = ColumnTransformer(transformers=[
        ("num", numeric_transformer, numeric_columns),
        ("cat", oh_encoder, categorical_columns)])
    model = Pipeline([("preprocessor", preprocessor),
                      ("regressor", regressor)])

    # data loading and formatting
    # Take an explicit copy: assigning into a column selection of `partition`
    # triggers pandas' SettingWithCopyWarning.
    train_df = partition[features + [target_column]].copy()
    train_df[categorical_columns] = train_df[categorical_columns].astype("category")

    # preprocess training data and train the model
    X_train = train_df[features]
    y_train = train_df[target_column]
    model.fit(X_train, y_train)

    # Stored on the model so scoring code can select the same columns
    # (eval_partition reads `model.features`).
    model.features = features

    # Positional lookup: does not depend on an index label 0 being present.
    partition_id = partition[partition_column].iloc[0]
    # b64encode returns bytes; decode so the artefact is plain text instead of
    # relying on numpy's implicit bytes-to-str coercion below.
    artefact = base64.b64encode(dill.dumps(model)).decode("ascii")

    # record whatever partition level information you want like rows,
    # data stats, explainability, etc
    num_rows = partition.shape[0]
    partition_metadata = json.dumps({
        "num_rows": num_rows,
        "hyper_parameters": hyperparams
    })

    return np.array([partition_id, model_version, num_rows, partition_metadata, artefact])

In [17]:
print("Starting training...")

# Train on a 1000-row sample of the configured training table.
df = DataFrame.from_query(f"SELECT top 1000 * FROM {data_conf['data_table']}")

# One call to train_partition per value of the partition column; `returns`
# declares the output columns in the order train_partition emits them.
model_df = df.map_partition(
    lambda part: train_partition(part, model_version, hyperparams),
    data_partition_column=data_conf["partition_column"],
    returns={
        "partition_id": VARCHAR(),
        "model_version": VARCHAR(),
        "num_rows": BIGINT(),
        "partition_metadata": CLOB(),
        "model_artefact": CLOB(),
    },
    exec_mode="LOCAL",
)

# Append the freshly trained models to the models table.
copy_to_sql(
    df=model_df,
    table_name="aoa_sto_models_beta",
    schema_name='AOA_DEMO',
    if_exists="append",
    index=True,
    index_label='index',
)
print("Finished training")

Starting training...
Finished training


In [18]:
# Inspect the training output returned by map_partition.
model_df

Unnamed: 0,partition_id,model_version,num_rows,partition_metadata,model_artefact
0,89,6a4ade3f-1d76-4831-8fef-62d4f92b2f5d,1000,"{""num_rows"": 1000, ""hyper_parameters"": {""rand_...",gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...


In [19]:
# Read the models table back from Vantage (the table the training cell
# appends to) and display it.
output_df = DataFrame("aoa_sto_models_beta")
output_df

   partition_id                         model_version  num_rows                                                              partition_metadata                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

## 3. Evaluation

In [5]:
from sklearn import metrics
from aoa.sto.util import save_metadata, save_evaluation_metrics
# Evaluation data description. This rebinds `data_conf`: same column roles as
# for training, but pointing at the test table.
data_conf = dict(
    data_table="DEMAND_FORECAST_TEST_VAL",
    numeric_columns=[
        "center_id", "meal_id", "checkout_price", "base_price",
        "emailer_for_promotion", "homepage_featured", "op_area",
    ],
    target_column="num_orders",
    categorical_columns=["center_type", "category", "cuisine"],
    partition_column="center_id",
)

In [6]:
def eval_partition(partition):
    """Score one partition with its stored model and report regression metrics.

    The target and partition column names are read from the notebook-level
    ``data_conf`` dictionary.

    Parameters
    ----------
    partition : object exposing ``read()``
        Handle supplied by ``map_partition``; ``read()`` returns the rows as
        a pandas DataFrame. The frame must contain an ``n_row`` column and a
        ``model_artefact`` column, with the base64 dill artefact on the row
        where ``n_row == 1``.

    Returns
    -------
    numpy.ndarray
        A single row ``[[partition_id, num_rows, partition_metadata]]`` where
        ``partition_metadata`` is a JSON string holding MAE, MSE and R2.

    Raises
    ------
    ValueError
        If no row with ``n_row == 1`` is present, so no model can be loaded.
    """
    target_column = data_conf["target_column"]
    partition_column = data_conf["partition_column"]
    partition = partition.read()

    # The serialised model travels on the row flagged n_row == 1.
    artefact_rows = partition.loc[partition['n_row'] == 1, 'model_artefact']
    if artefact_rows.empty:
        raise ValueError(
            "partition has no row with n_row == 1 carrying a model_artefact")
    # SECURITY: dill.loads can execute arbitrary code while unpickling; only
    # load artefacts that come from a trusted models table.
    model = dill.loads(base64.b64decode(artefact_rows.iloc[0]))

    X_test = partition[model.features]
    y_test = partition[[target_column]]

    y_pred = model.predict(X_test)

    # Use the configured partition column, as train_partition does, instead of
    # a hard-coded `partition_ID` attribute.
    partition_id = partition[partition_column].iloc[0]
    num_rows = partition.shape[0]

    # record whatever partition level information you want like rows, data stats, metrics, explainability, etc
    partition_metadata = json.dumps({
        "num_rows": num_rows,
        "metrics": {
            "MAE": "{:.2f}".format(metrics.mean_absolute_error(y_test, y_pred)),
            "MSE": "{:.2f}".format(metrics.mean_squared_error(y_test, y_pred)),
            "R2": "{:.2f}".format(metrics.r2_score(y_test, y_pred))
        }
    })

    return np.array([[partition_id, num_rows, partition_metadata]])

In [9]:
# we join the model artefact to the 1st row of the data table so we can load it in the partition

partition_id = data_conf["partition_column"]

# Models table and model version to evaluate, kept out of the SQL text so
# they are visible and easy to change.
# NOTE(review): the training cell appends to "aoa_sto_models_beta" under a
# fresh `model_version`; change both values to evaluate that model instead.
models_table = "aoa_sto_models"
eval_model_version = "75ded33b-45a8-4398-a231-1cae281b7f1a"

# The query text carries no statement terminator and no trailing SQL comment:
# it is passed to teradataml as one query, and an interpolated placeholder in
# a comment would make the cell depend on an unrelated variable.
query = f"""
SELECT d.*, CASE WHEN n_row=1 THEN m.model_artefact ELSE null END AS model_artefact 
    FROM (SELECT x.*, ROW_NUMBER() OVER (PARTITION BY x.{partition_id} ORDER BY x.{partition_id}) AS n_row 
    FROM {data_conf["data_table"]} x) AS d
    LEFT JOIN {models_table} m
    ON d.{partition_id} = CAST (m.partition_id AS BIGINT)
    WHERE m.model_version = '{eval_model_version}'
"""
df = DataFrame(query=query)

In [None]:
# Restrict the evaluation input to the slice 0:999 of the joined data.
df = df.iloc[0:999]

# One call to eval_partition per value of the partition column; `returns`
# declares the output columns in the order eval_partition emits them.
eval_df = df.map_partition(
    eval_partition,
    data_partition_column=partition_id,
    returns={
        "partition_id": VARCHAR(),
        "num_rows": BIGINT(),
        "partition_metadata": CLOB(),
    },
    exec_mode="LOCAL",
)

# Follow-up steps, currently disabled:
#   save_metadata(eval_df)
#   save_evaluation_metrics(eval_df, ["MAE", "MSE", "R2"])

In [13]:
import numpy as np
import pandas as pd
# NOTE(review): the output saved with this cell looks like it came from the
# random pandas frame in the commented-out line below rather than from the
# evaluation query - TODO confirm which `df` is intended here.
#df = pd.DataFrame(np.random.randint(0, 9, [10, 3]))
# Take the slice 0:3 of `df` and display it.
dft = df.iloc[0:3]
dft

Unnamed: 0,0,1,2
0,5,1,4
1,5,3,2
2,8,6,4


## Sandbox

In [6]:
# Column roles taken from the currently active data_conf.
numeric_columns = data_conf["numeric_columns"]
categorical_columns = data_conf["categorical_columns"]
target_column = data_conf["target_column"]
features = numeric_columns + categorical_columns

# Distinct center_id values present in the configured table.
q = f'SELECT center_id FROM {data_conf["data_table"]} GROUP BY 1;'
df = DataFrame.from_query(q)
partition_ids = list(df.dropna().get_values().flatten())

# Fetch every row belonging to the first center_id as a teradataml DataFrame.
partition_id = partition_ids[0]
q = f'SELECT * FROM {data_conf["data_table"]} WHERE center_id={partition_id};'
partition = DataFrame.from_query(q)

In [12]:
# Materialise the sandbox partition as a pandas DataFrame; without this
# assignment `pdf` is undefined when the notebook is run from a fresh kernel.
pdf = partition.to_pandas()
pdf[features + [target_column]]

Unnamed: 0_level_0,center_id,meal_id,checkout_price,base_price,emailer_for_promotion,homepage_featured,op_area,center_type,category,cuisine,num_orders
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
348233,24,2126,448.14,533.53,0,0,3.6,TYPE_B,Pasta,Italian,215
299537,24,2322,322.10,322.10,0,0,3.6,TYPE_B,Beverages,Continental,364
90479,24,1902,445.23,447.23,0,0,3.6,TYPE_B,Biryani,Indian,69
234548,24,2577,292.03,291.03,0,0,3.6,TYPE_B,Starters,Thai,310
199066,24,2322,347.26,466.63,0,1,3.6,TYPE_B,Beverages,Continental,297
...,...,...,...,...,...,...,...,...,...,...,...
161382,24,1445,618.86,618.86,0,0,3.6,TYPE_B,Seafood,Continental,81
87400,24,1902,447.23,447.23,0,0,3.6,TYPE_B,Biryani,Indian,28
215237,24,2577,292.03,292.03,0,0,3.6,TYPE_B,Starters,Thai,243
279961,24,2126,533.53,532.53,0,0,3.6,TYPE_B,Pasta,Italian,82


In [21]:
# modelling pipeline: feature encoding + algorithm
numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median"))])
oh_encoder = OneHotEncoder(sparse=False, handle_unknown='ignore')
regressor = RandomForestRegressor(random_state=hyperparams["rand_seed"],
                                  n_estimators=hyperparams["n_estimators"])
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, numeric_columns),
    ("cat", oh_encoder, categorical_columns)])

model = Pipeline([("preprocessor", preprocessor),
                  ("regressor", regressor)])

# data loading and formatting
# read the sandbox partition from Teradata and convert it to pandas
train_df = partition.select([features + [target_column]])
train_pdf = train_df.to_pandas(all_rows=True)
# Index the pandas copy that is actually used for training. The previous
# version called set_index(..., inplace=True) on the teradataml frame
# `train_df`, which never affected `train_pdf`.
if "id" in train_pdf.columns:
    train_pdf = train_pdf.set_index("id")
train_pdf[categorical_columns] = train_pdf[categorical_columns].astype("category")

print('Loaded data ...')

# preprocess training data and train the model
X_train = train_pdf[features]
y_train = train_pdf[target_column]
model.fit(X_train, y_train)

print("Finished training")


Loaded data ...
Finished training


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  X_int = np.zeros_like(X, dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  X_mask = np.ones_like(X, dtype=np.bool)


In [16]:
#partition.loc[0, 'center_id']
#partition.select('center_id').iloc[0].get_values()[0][0]
#partition.shape[0]

AttributeError: '_SQLColumnExpression' object has no attribute 'loc'

In [None]:
# Attach the feature list to the fitted pipeline.
model.features = features

# Serialise the pipeline and look up the center_id of the sandbox partition.
artefact = base64.b64encode(dill.dumps(model))
partition_id = partition.select('center_id').iloc[0].get_values()[0][0]

# Partition-level information to keep next to the model (row count and the
# hyper-parameters used); extend with data stats, explainability, etc.
partition_metadata = json.dumps(
    {"num_rows": partition.shape[0], "hyper_parameters": hyperparams}
)

# Same fields that train_partition returns, shown as a dict for inspection.
{
    "partition_id": partition_id,
    "model_version": model_version,
    "num_rows": partition.shape[0],
    "partition_metadata": partition_metadata,
    "model_artefact": artefact,
}

In [None]:
print("Starting training...")

# Smaller 100-row sample for a quick sandbox run of train_partition.
df = DataFrame.from_query('select top 100 * from DEMAND_FORECAST_TRAIN_VAL')
model_df = df.map_partition(
    lambda part: train_partition(part, model_version, hyperparams),
    data_partition_column="center_id",
    exec_mode="LOCAL",
    returns={
        "partition_id": VARCHAR(),
        "model_version": VARCHAR(),
        "num_rows": BIGINT(),
        "partition_metadata": CLOB(),
        "model_artefact": CLOB(),
    },
)

# The result is not written to the models table in this sandbox run, and the
# connection is left open.
print("Finished training")

In [22]:
def test_partition(partition):
    partition = partition.read()
    partition_id = partition.loc[0, 'center_id']
    return np.asarray([partition_id, partition.shape[0]])

In [23]:
# Run the two-column probe over a 1000-row sample. INTEGER comes from
# teradatasqlalchemy.types and must be imported alongside the other column
# types at the top of the notebook.
df = DataFrame.from_query('select top 1000 * from DEMAND_FORECAST_TRAIN_VAL')
model_df = df.map_partition(
    test_partition,
    returns={
        "partition id": INTEGER(),
        "partition size": INTEGER(),
    },
    data_partition_column="center_id",
    exec_mode="LOCAL",
)

In [24]:
# Show the probe output (partition id and partition size).
model_df

Unnamed: 0,partition id,partition size
0,89,1000


In [15]:
def test_partition(partition):
    partition = partition.read()
    return partition


In [16]:
# Push a 1000-row sample through the identity probe to see what
# map_partition hands to the function.
df = DataFrame.from_query('select top 1000 * from DEMAND_FORECAST_TRAIN_VAL')
model_df = df.map_partition(
    test_partition,
    data_partition_column="center_id",
    exec_mode="LOCAL",
)

In [42]:
# Only the last expression of a cell is displayed: the next two lines are
# evaluated (so they still fail if `dtypes` or label 0 / 'center_id' are
# missing) but their values are discarded.
model_df.dtypes
model_df.loc[0, 'center_id']
model_df

Unnamed: 0,week,center_id,meal_id,checkout_price,base_price,emailer_for_promotion,homepage_featured,num_orders,city_code,region_code,center_type,op_area,category,cuisine,index
0,1,88,1778,171.75,171.75,0,0,121,526,34,TYPE_A,4.1,Beverages,Italian,2263
1,1,64,1109,267.75,266.75,0,0,486,553,77,TYPE_A,4.4,Rice Bowl,Indian,1325
2,1,68,1109,304.58,305.58,0,0,350,676,34,TYPE_B,4.1,Rice Bowl,Indian,2814
3,1,92,2826,281.36,281.36,0,0,122,526,34,TYPE_C,2.9,Sandwich,Italian,999
4,1,99,1727,447.23,446.23,0,0,431,596,71,TYPE_A,4.5,Rice Bowl,Indian,734
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,117,26,1558,432.68,641.23,1,0,189,515,77,TYPE_C,3.0,Pizza,Continental,363180
996,117,64,1971,322.07,321.07,0,0,121,553,77,TYPE_A,4.4,Sandwich,Italian,362854
997,117,110,2631,145.56,143.56,0,0,69,485,77,TYPE_A,3.8,Beverages,Indian,364322
998,117,99,2322,395.76,503.43,0,0,352,596,71,TYPE_A,4.5,Beverages,Continental,362181


In [18]:
def test_partition(partition, model_version, hyperparams):
#def test_partition(model_version, hyperparams):
    partition = partition.read()
    partition_id = partition.loc[0,'center_id']
    #raise Exception("my_test_exception")
    return np.array([partition_id, model_version, partition.shape[0], "{}", ""])


In [19]:
print("Starting training...")

# Exercise the training output layout with the lightweight probe instead of
# the real training function.
df = DataFrame.from_query('select top 1000 * from DEMAND_FORECAST_TRAIN_VAL')
model_df = df.map_partition(
    lambda part: test_partition(part, model_version, hyperparams),
    returns={
        "partition_id": VARCHAR(),
        "model_version": VARCHAR(),
        "num_rows": BIGINT(),
        "partition_metadata": CLOB(),
        "model_artefact": CLOB(),
    },
    data_partition_column="center_id",
    exec_mode="LOCAL",
)
print("Finished training...")

Starting training...


In [20]:
# Show the probe output in the training layout (metadata and artefact are placeholders).
model_df

Unnamed: 0,partition_id,model_version,num_rows,partition_metadata,model_artefact
0,36,50f54e85-b95d-49f0-a361-73841530da10,1000,{},


In [42]:
# Tear down the teradataml context opened earlier with create_context.
# NOTE(review): remove_context is already imported in the first cell, so this
# import is redundant there but lets the cell run on its own.
from teradataml import remove_context
remove_context()

True