In [86]:
import sqlite3
import pandas as pd
import numpy as np
import yaml
import json

In [141]:
class DatabaseManager():
    def __init__(self, db_path='database.sqlite'):
        self.db_path = db_path
        self.dbtype = 'sqlite' 

    def start(self):
        self.conn = sqlite3.connect(self.db_path)

    def close(self):
        self.conn.close()

    def get_table_description(self, to_str=False):
        self.start()
        cur = self.conn.cursor()
        res = cur.execute('SELECT name FROM sqlite_master WHERE type="table"').fetchall()
        tables = list(map(lambda x: x[0], res))
        # select column names
        db_info = {}
        for t in tables:
            db_info[t] = []
            res = cur.execute(f'PRAGMA table_xinfo({t})').fetchall()
            for r in res:
                db_info[t].append(
                    {'column': r[1], 'type': r[2], 'pk': bool(r[5])}
                )
        cur.close()
        self.close()

        if to_str:
            s = ''
            for i, (k, v) in enumerate(db_info.items()):
                s += f'[Table: {k}] '
                for j, col in enumerate(v):
                    s += f'{col["column"]} {col["type"]}'
                    if col['pk']:
                        s += ' (pk)'
                    if j != len(v) - 1:
                        s += ', '
                if i != len(db_info) - 1:
                    s += '\n'
 
            return s
        else:
            return db_info
    
    def summarize_table(self, table_name):
        """Should be defined by the creator, e.g. number of the row, accessable dates, etc. 
        Show this to user to know what do we have in the table."""
        cur = self.conn.cursor()
        cur.close()
        return 

    def exec_query(self, query, rt_pandas=True):
        self.start()
        try:
            cur = self.conn.cursor()
            res = cur.execute(query)
            if rt_pandas:
                columns = [x[0] for x in res.description]
                df = pd.DataFrame(res.fetchall(), columns=columns)
                cur.close()
                self.close()
                return df
            else:
                cur.close()
                self.close()
                return res.fetchall()
        except Exception as e:
            self.close()
            return e
        
class KnowledgeManager():
    def __init__(self, knowledge_path='knowledges.yml'):
        self.knowledge_path = knowledge_path
        with open(self.knowledge_path, 'r') as file:
            self.knowledges = yaml.safe_load(file)

    def get_custom_knowledge(self, filter_key=None, to_str: bool=False):
        if filter_key is not None:
            ks = {k: dict(filter(lambda x: x[0] == filter_key, v.items())) for k, v in self.knowledges.items()}
        else:
            ks = self.knowledges

        if to_str:
            return json.dumps(ks)
        else:
            return ks
        
    def get_knowledge_keys(self):
        ks = list(self.knowledges.keys())
        return dict(map(lambda x: (f'[{x[0]}]', f'{x[1]}'), enumerate(ks)))
        
    def ml_process(self, process_name: str, model_name: str):
        return

In [150]:
db = DatabaseManager('../sales.db')
km = KnowledgeManager('../knowledges.yml')

'{"Sales": {"formula": "UnitPrice * (1 - DiscountApplied) * OrderQuantity"}, "Revenue": {"formula": "(UnitPrice * (1 - DiscountApplied) - UnitCost) * OrderQuantity"}, "DeliverSpeed": {"formula": "DeliveryDate - OrderDate"}, "ShippingSpeed": {"formula": "DeliveryDate - ShipDate"}}'

## 1. Extract data

Q: Predict the revenue with saleschannel.

In [256]:
sql = """
SELECT *
FROM sales_team
"""
df = db.exec_query(sql, rt_pandas=True)
df.head()

Unnamed: 0,SalesTeamID,SalesTeam,Region
0,1,Adam Hernandez,Northeast
1,2,Keith Griffin,Northeast
2,3,Jerry Green,West
3,4,Chris Armstrong,Northeast
4,5,Stephen Payne,South


In [258]:
sql = """
SELECT 
    product.ProductName,
    sales_team.Region,
    AVG(julianday(sales.DeliveryDate) - julianday(sales.OrderDate)) AS AvgDeliverySpeed, 
    AVG(julianday(sales.DeliveryDate) - julianday(sales.ShipDate)) AS AvgShippingSpeed, 
    SUM(CASE WHEN sales.SalesChannel = 'Distributor' THEN 1 ELSE 0 END) AS Distributor,
    SUM(CASE WHEN sales.SalesChannel = 'In-Store' THEN 1 ELSE 0 END) AS InStore,
    SUM(CASE WHEN sales.SalesChannel = 'Online' THEN 1 ELSE 0 END) AS Online,
    SUM(CASE WHEN sales.SalesChannel = 'Wholesale' THEN 1 ELSE 0 END) AS Retailer,
    SUM((sales.UnitPrice * (1 - sales.DiscountApplied) - sales.UnitCost) * sales.OrderQuantity) AS TotalRevenue
FROM sales
JOIN product ON sales.ProductID = product.ProductID
JOIN sales_team ON sales.SalesTeamID = sales_team.SalesTeamID
GROUP BY product.ProductName, strftime('%Y-%m', sales.OrderDate)
ORDER BY product.ProductName, sales.OrderDate;
"""
df = db.exec_query(sql, rt_pandas=True)
df.head()

Unnamed: 0,ProductName,Region,AvgDeliverySpeed,AvgShippingSpeed,Distributor,InStore,Online,Retailer,TotalRevenue
0,Accessories,West,7.0,5.0,0,0,0,1,5486.74
1,Accessories,West,18.888889,5.666667,2,5,1,1,33693.3325
2,Accessories,Midwest,21.142857,6.571429,0,4,2,1,51687.755
3,Accessories,Northeast,23.5,7.0,0,2,2,0,15865.04
4,Accessories,West,18.666667,5.0,1,5,2,1,36304.76


In [259]:
sql = """
SELECT 
    product.ProductName,
    sales_team.Region,
    AVG(julianday(sales.DeliveryDate) - julianday(sales.OrderDate)) AS AvgDeliverySpeed, 
    AVG(julianday(sales.DeliveryDate) - julianday(sales.ShipDate)) AS AvgShippingSpeed, 
    SUM(CASE WHEN sales.SalesChannel = 'Distributor' THEN 1 ELSE 0 END) AS Distributor,
    SUM(CASE WHEN sales.SalesChannel = 'In-Store' THEN 1 ELSE 0 END) AS InStore,
    SUM(CASE WHEN sales.SalesChannel = 'Online' THEN 1 ELSE 0 END) AS Online,
    SUM(CASE WHEN sales.SalesChannel = 'Wholesale' THEN 1 ELSE 0 END) AS Retailer,
    LEAD(SUM((sales.UnitPrice * (1 - sales.DiscountApplied) - sales.UnitCost) * sales.OrderQuantity)) 
        OVER (PARTITION BY product.ProductName ORDER BY sales.OrderDate) AS FutureRevenue
FROM sales
JOIN product ON sales.ProductID = product.ProductID
JOIN sales_team ON sales.SalesTeamID = sales_team.SalesTeamID
GROUP BY product.ProductName, strftime('%Y-%m', sales.OrderDate)
ORDER BY product.ProductName, sales.OrderDate;
"""
df = db.exec_query(sql, rt_pandas=True).dropna()
df.head()

Unnamed: 0,ProductName,Region,AvgDeliverySpeed,AvgShippingSpeed,Distributor,InStore,Online,Retailer,FutureRevenue
0,Accessories,West,7.0,5.0,0,0,0,1,33693.3325
1,Accessories,West,18.888889,5.666667,2,5,1,1,51687.755
2,Accessories,Midwest,21.142857,6.571429,0,4,2,1,15865.04
3,Accessories,Northeast,23.5,7.0,0,2,2,0,36304.76
4,Accessories,West,18.666667,5.0,1,5,2,1,12762.7225


## 2. model training pipeline

In [260]:
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
from sklearn.preprocessing import OneHotEncoder, StandardScaler, OrdinalEncoder
from sklearn.model_selection import StratifiedShuffleSplit

In [268]:
# 1. Split data
spliter = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
train_idx, test_idx = next(spliter.split(df, df['ProductName']))
df_train = df.iloc[train_idx]
df_test = df.iloc[test_idx]

X_train, y_train = df_train.drop('FutureRevenue', axis=1), df_train[['FutureRevenue']]
X_test, y_test = df_test.drop('FutureRevenue', axis=1), df_test[['FutureRevenue']]

# 2. Model Pipeline
linear_pipe = Pipeline(steps=[
    ('preprocessing', ColumnTransformer(
        transformers=[('cate', OneHotEncoder(categories='auto', dtype='int'), ['ProductName', 'Region'])], 
        remainder='passthrough', 
        verbose_feature_names_out=True)
    ),
    ('model', Ridge(alpha=0.01))
])

forest_pipe = Pipeline(steps=[
    ('preprocessing', ColumnTransformer(
        transformers=[('cate', OrdinalEncoder(), ['ProductName', 'Region'])],
        remainder='passthrough', 
        verbose_feature_names_out=True)),
    ('model', RandomForestRegressor(n_estimators=50, max_depth=2))
])

class Model:
    def __init__(self, model_name: str, pipeline: Pipeline):
        self.model_name = model_name
        self.scaler = StandardScaler()
        self.model = pipeline
        
    def fit(self, X, y):
        self.scaler.fit(y)
        y_true = self.scaler.transform(y).ravel()
        self.model.fit(X, y_true)

    def predict(self, X):
        y_pred = self.model.predict(X)
        if y_pred.ndim == 1:
            y_pred = y_pred.reshape(-1, 1)
        return self.scaler.inverse_transform(y_pred)

    def get_score(self, X, y):
        y_true = self.scaler.transform(y).ravel()
        y_pred = self.model.predict(X)
        mse = mean_squared_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        return mse, r2

# 3. Train
linear_model = Model(model_name='Linear', pipeline=linear_pipe)
linear_model.fit(X_train, y_train)

forest_model = Model(model_name='Forest', pipeline=forest_pipe)
forest_model.fit(X_train, y_train)

# 4. Evaluate
for model in [linear_model, forest_model]:
    print(model.model_name)
    mse_train, r2_train = model.get_score(X_train, y_train)
    mse_test, r2_test = model.get_score(X_test, y_test)
    print(f'Train MSE: {mse_train:.4f}, R2: {mse_train:.4f}')
    print(f'Test MSE: {mse_test:.4f}, R2: {mse_test:.4f}')

# 5. Save model
import joblib

joblib.dump(linear_model, '../models/linear.pkl')
joblib.dump(forest_model, '../models/forest.pkl')

# 6. Inference
test_pipe = joblib.load('../models/forest.pkl')

Linear
Train MSE: 0.9517, R2: 0.9517
Test MSE: 1.2385, R2: 1.2385
Forest
Train MSE: 0.9740, R2: 0.9740
Test MSE: 1.2502, R2: 1.2502


In [269]:
test_pipe = joblib.load('../models/linear.pkl')
print(test_pipe.predict(X_test)[:3])
print()
print(y_test[:3])

[[15953.80997514]
 [12348.7250761 ]
 [15562.74525817]]

     FutureRevenue
210      3360.0300
266     15750.3400
904     13811.3375


In [266]:
pipeline = {'LoadData': """
SELECT 
    product.ProductName,
    sales_team.Region,
    AVG(julianday(sales.DeliveryDate) - julianday(sales.OrderDate)) AS AvgDeliverySpeed, 
    AVG(julianday(sales.DeliveryDate) - julianday(sales.ShipDate)) AS AvgShippingSpeed, 
    SUM(CASE WHEN sales.SalesChannel = 'Distributor' THEN 1 ELSE 0 END) AS Distributor,
    SUM(CASE WHEN sales.SalesChannel = 'In-Store' THEN 1 ELSE 0 END) AS InStore,
    SUM(CASE WHEN sales.SalesChannel = 'Online' THEN 1 ELSE 0 END) AS Online,
    SUM(CASE WHEN sales.SalesChannel = 'Wholesale' THEN 1 ELSE 0 END) AS Retailer,
    LEAD(SUM((sales.UnitPrice * (1 - sales.DiscountApplied) - sales.UnitCost) * sales.OrderQuantity)) 
        OVER (PARTITION BY product.ProductName ORDER BY sales.OrderDate) AS FutureRevenue
FROM sales
JOIN product ON sales.ProductID = product.ProductID
JOIN sales_team ON sales.SalesTeamID = sales_team.SalesTeamID
GROUP BY product.ProductName, strftime('%Y-%m', sales.OrderDate)
ORDER BY product.ProductName, sales.OrderDate;
""",
'ImportPackage': """
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
from sklearn.preprocessing import OneHotEncoder, StandardScaler, OrdinalEncoder
from sklearn.model_selection import StratifiedShuffleSplit
""",
'SplitData': """
spliter = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
train_idx, test_idx = next(spliter.split(df, df['ProductName']))
df_train = df.iloc[train_idx]
df_test = df.iloc[test_idx]

X_train, y_train = df_train.drop('TotalRevenue', axis=1), df_train[['TotalRevenue']]
X_test, y_test = df_test.drop('TotalRevenue', axis=1), df_test[['TotalRevenue']]
""",
'CreateModel': """
linear_pipe = Pipeline(steps=[
    ('preprocessing', ColumnTransformer(
        transformers=[('cate', OneHotEncoder(categories='auto', dtype='int'), ['ProductName', 'Region'])], 
        remainder='passthrough', 
        verbose_feature_names_out=True)
    ),
    ('model', Ridge(alpha=0.01))
])

forest_pipe = Pipeline(steps=[
    ('preprocessing', ColumnTransformer(
        transformers=[('cate', OrdinalEncoder(), ['ProductName', 'Region'])],
        remainder='passthrough', 
        verbose_feature_names_out=True)),
    ('model', RandomForestRegressor(n_estimators=300, max_depth=5))
])

class Model:
    def __init__(self, model_name: str, pipeline: Pipeline):
        self.model_name = model_name
        self.scaler = StandardScaler()
        self.model = pipeline
        
    def fit(self, X, y):
        self.scaler.fit(y)
        y_true = self.scaler.transform(y).ravel()
        self.model.fit(X, y_true)

    def predict(self, X):
        y_pred = self.model.predict(X)
        if y_pred.ndim == 1:
            y_pred = y_pred.reshape(-1, 1)
        return self.scaler.inverse_transform(y_pred)

    def get_score(self, X, y):
        y_true = self.scaler.transform(y).ravel()
        y_pred = self.model.predict(X)
        mse = mean_squared_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        return mse, r2
""",
'Train': """
linear_model = Model(model_name='Linear', pipeline=linear_pipe)
linear_model.fit(X_train, y_train)

forest_model = Model(model_name='Forest', pipeline=forest_pipe)
forest_model.fit(X_train, y_train)
""",
'Evaluate': """
for model in [linear_model, forest_model]:
    print(model.model_name)
    mse_train, r2_train = model.get_score(X_train, y_train)
    mse_test, r2_test = model.get_score(X_test, y_test)
    print(f'Train MSE: {mse_train:.4f}, R2: {mse_train:.4f}')
    print(f'Test MSE: {mse_test:.4f}, R2: {mse_test:.4f}')
""",
'SaveModel': """
import joblib

joblib.dump(linear_model, '../models/linear.pkl')
joblib.dump(forest_model, '../models/forest.pkl')
""",
'Inference': """
test_pipe = joblib.load('../models/linear.pkl')
test_pipe.predict(X_test)
"""
}

joblib.dump(pipeline, '../knowledges/revenue.pkl')

['../knowledges/revenue.pkl']

In [168]:
sql = """
SELECT 
    product.ProductName,
    AVG(julianday(sales.DeliveryDate) - julianday(sales.OrderDate)) AS AvgDeliverySpeed, 
    AVG(julianday(sales.DeliveryDate) - julianday(sales.ShipDate)) AS AvgShippingSpeed, 
    SUM(CASE WHEN sales.SalesChannel = 'Distributor' THEN 1 ELSE 0 END) AS Distributor,
    SUM(CASE WHEN sales.SalesChannel = 'In-Store' THEN 1 ELSE 0 END) AS InStore,
    SUM(CASE WHEN sales.SalesChannel = 'Online' THEN 1 ELSE 0 END) AS Online,
    SUM(CASE WHEN sales.SalesChannel = 'Wholesale' THEN 1 ELSE 0 END) AS Retailer,
    SUM((sales.UnitPrice * (1 - sales.DiscountApplied) - sales.UnitCost) * sales.OrderQuantity) AS TotalRevenue
FROM sales
JOIN product ON sales.ProductID = product.ProductID
WHERE product.ProductName = 'Baseball' AND strftime('%Y-%m', sales.OrderDate) = '2022-01'
GROUP BY product.ProductName, strftime('%Y-%m', sales.OrderDate)
ORDER BY product.ProductName, sales.OrderDate;
"""
df = db.exec_query(sql, rt_pandas=True)
df.head()

Unnamed: 0,ProductName,AvgDeliverySpeed,AvgShippingSpeed,Distributor,InStore,Online,Retailer,TotalRevenue
0,Baseball,23.25,5.75,0,2,0,2,3895.15


In [176]:
test_pipe = joblib.load('../models/forest.pkl')
test_pipe.predict(df)

array([[10290.80774279]])