Import Python & PySpark libraries

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, to_date, year, sum, avg, max, desc, row_number, when, weekofyear, explode, format_string, round
from pyspark.sql.window import Window
import warnings
warnings.filterwarnings("ignore")
import statsmodels.api as sm
import statsmodels.formula.api as smf
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import functions as F
from sklearn.metrics import mean_squared_error
import catboost as cbt
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.model_selection import train_test_split

Data Preparation

In [0]:
# Load the product master and the raw transaction table.
prod = spark.table("pro_mrg")
trxn = spark.table("transaction")

# Parse the transaction date and make the numeric measures doubles.
trxn = (
    trxn
    .withColumn("trans_dt", to_date(col("trans_dt"), "M/d/yyyy"))
    .withColumn("sales_amt", col("sales_amt").cast("double"))
    .withColumn("sales_qty", col("sales_qty").cast("double"))
    .withColumn("sales_wgt", col("sales_wgt").cast("double"))
)

# Week-of-year is the time grain used by every later aggregation.
trxn = trxn.withColumn("week", weekofyear("trans_dt"))

# Keep only rows whose amount, quantity and weight are all non-negative.
non_negative = (col("sales_amt") >= 0) & (col("sales_qty") >= 0) & (col("sales_wgt") >= 0)
trxn = trxn.filter(non_negative)

# Unit price: amount per unit when no weight is recorded, otherwise amount per weight.
unit_price = (
    when(col("sales_wgt") == 0, col("sales_amt") / col("sales_qty"))
    .otherwise(col("sales_amt") / col("sales_wgt"))
)
trxn = trxn.withColumn("price", unit_price)

# Restrict to 2020 (limits cross-year effects and data volume), then attach product attributes.
trxn = trxn.filter(year(col("trans_dt")) == 2020).join(prod, on="prod_id", how="inner")

# Units sold per product and per product category.
prod_sales = trxn.groupBy("prod_id").agg(F.sum("sales_qty").alias("total_sales"))
cat_sales = trxn.groupBy("prod_category").agg(F.sum("sales_qty").alias("total_sales"))

In [0]:
# Rank products by units sold and keep the 500 best sellers.
# Grand total of units sold, used to turn running sums into cumulative shares.
total_prod_sales = prod_sales.agg(F.sum('total_sales')).first()[0]
# Cumulative share of total units when products are ordered by descending sales.
prod_sales = prod_sales.withColumn(
    "sales_percent",
    F.sum(prod_sales.total_sales).over(Window.orderBy(desc('total_sales'))) / total_prod_sales,
)
# Diagnostic used to judge how concentrated demand is (share of products that
# lie beyond the first 80% of cumulative units sold):
#   prod_sales.filter(prod_sales.sales_percent > 0.8).count() / prod_sales.count()
# Demand is concentrated in a small set of products, so modelling only the
# best sellers keeps the analysis tractable.
# Sort explicitly before limiting: limit() on an unsorted DataFrame returns an
# arbitrary 500 rows, not the 500 products with the highest sales.
prod_500 = prod_sales.orderBy(desc('total_sales')).limit(500)

In [0]:
# Weekly average price and weekly units sold for each selected product.
# Only the product ids are kept from the ranking frame before joining back
# to the transactions.
prod_500 = prod_500.select('prod_id').join(trxn, on='prod_id', how='inner')
weekly_prod = (
    prod_500
    .groupBy('prod_id', 'week')
    .agg(
        F.avg('price').alias('weekly_price'),
        F.sum('sales_qty').alias('weekly_demand'),
    )
    .toPandas()
)

Constant Elasticity Model - Price Response Function

Formula: $\log(\text{weekly demand}) \sim \log(\text{weekly price})$

In [0]:
# Total demand per product across all weeks.
prod_sales_df = weekly_prod.groupby('prod_id', as_index=False).weekly_demand.sum()

# Log-transform both sides of the price response function. Non-positive or
# missing prices/demands have no logarithm, so they are masked to NaN here and
# left out of the fits below instead of feeding inf/NaN into OLS.
weekly_prod['logP'] = np.log(weekly_prod['weekly_price'].where(weekly_prod['weekly_price'] > 0))
weekly_prod['logD'] = np.log(weekly_prod['weekly_demand'].where(weekly_prod['weekly_demand'] > 0))

# Fit logD ~ logP per product; the slope is the price elasticity and the
# F-test p-value tells whether price explains demand significantly.
# NOTE: the product-id list is deliberately NOT stored in a variable called
# `prod` -- that name holds the Spark product table that later cells join on.
MIN_WEEKS_FOR_FIT = 2  # a slope needs at least two observations
elasticity_rows = []
for pid, sample in weekly_prod.dropna(subset=['logP', 'logD']).groupby('prod_id'):
    if len(sample) < MIN_WEEKS_FOR_FIT:
        continue
    result = smf.ols('logD ~ logP', data=sample).fit()
    elasticity_rows.append({
        'prod_id': pid,
        # two-decimal rounding kept from the original analysis
        'elasticity': float('{0:.2f}'.format(result.params['logP'])),
        'p_value': float('{0:.2f}'.format(result.f_pvalue)),
    })
# Build the result once (typed columns) instead of concatenating row by row.
prod_elastisity = pd.DataFrame(elasticity_rows, columns=['prod_id', 'elasticity', 'p_value'])

In [0]:
# Total demand per product, used to rank candidates within each group.
prod_sales_df = weekly_prod.groupby('prod_id', as_index=False).weekly_demand.sum()

# EDLP candidates: the 15 highest-demand elastic products.
# Elastic = elasticity below -1 with a significant fit (p < 0.05).
# The sort must be descending: an ascending sort followed by a head slice
# would keep the products with the LOWEST demand.
elastic = (
    prod_elastisity[(prod_elastisity.elasticity < -1) & (prod_elastisity.p_value < 0.05)]
    .merge(prod_sales_df, on='prod_id')
    .sort_values('weekly_demand', ascending=False)
    .head(15)
)

# Hi-Lo candidates: the 80 highest-demand inelastic products (-1 < elasticity < 0),
# for which a small price increase costs comparatively little demand.
inelastic = (
    prod_elastisity[(prod_elastisity.elasticity > -1) & (prod_elastisity.elasticity < 0)]
    .merge(prod_sales_df, on='prod_id')
    .sort_values('weekly_demand', ascending=False)
    .head(80)
)

In [0]:
# Demand against price for the first EDLP product, points ordered by price.
k = elastic.prod_id.unique()[0]
curve = weekly_prod[weekly_prod.prod_id == k].sort_values('weekly_price')
fig, ax = plt.subplots()
ax.plot(curve.weekly_price, curve.weekly_demand)
ax.set_title('product %i price - demand' % k)
ax.set_xlabel('weekly price')
ax.set_ylabel('weekly demand')
plt.show()

Find complementary item for each product using basket analysis and calculate the weekly price of complementary item.

In [0]:
# One basket per transaction: the distinct products it contains.
basketdata = (
    trxn.select('prod_id', 'trans_id')
    .dropDuplicates(['prod_id', 'trans_id'])
    .sort('trans_id')
    .groupBy('trans_id')
    .agg(F.collect_list('prod_id'))
    .sort('trans_id')
)

# FP-Growth mines frequent itemsets and derives association rules from them.
# The items column keeps Spark's default name for the collect_list aggregate.
fpg = FPGrowth(itemsCol="collect_list(prod_id)", minSupport=0.005, minConfidence=0.005)
model = fpg.fit(basketdata)

# The selected elastic + inelastic products as a Spark frame with long ids.
prod_60 = spark.createDataFrame(
    pd.concat([elastic, inelastic], ignore_index=True)[['prod_id']]
).withColumn("prod_id", col("prod_id").cast("long"))

# Rank the rules, emit one row per antecedent item, and keep only rules whose
# antecedent item is one of the selected products.
rules = (
    model.associationRules
    .sort(desc("support"), desc("confidence"), desc("lift"))
    .select(explode('antecedent').alias('antecedent'), 'consequent', 'support', 'confidence', 'lift')
    .withColumnRenamed('antecedent', 'prod_id')
    .join(prod_60, 'prod_id', "inner")
)
rules.printSchema()

In [0]:
# Keep the single best-ranked rule per selected product.
w3 = Window.partitionBy('prod_id').orderBy(desc("support"), desc("confidence"), desc("lift"))
rules = (
    rules
    .withColumn('row_num', row_number().over(w3))
    .filter(col('row_num') <= 1)
    .drop('row_num')
    .withColumnRenamed('prod_id', 'prod_60')
    # the consequent item becomes the complementary product id
    .select('prod_60', explode('consequent').alias('prod_id'))
    .withColumn('prod_id', col('prod_id').cast('long'))
    .join(trxn.select(['prod_id', 'week', 'price']), on='prod_id', how='inner')
)

# Weekly average price of each complementary product.
complementary = (
    rules
    .groupBy(['prod_id', 'prod_60', 'week'])
    .agg(avg('price').alias('avg_comp_price'))
    .select('prod_60', 'prod_id', 'avg_comp_price', 'week')
    .toPandas()
)

# Reshape to [week, comp, prod_60]: one complementary price per selected product and week.
complementary_pivot = pd.DataFrame(columns=['week', 'comp', 'prod_60'])
for i in complementary.prod_60.unique():
    rows_for_product = complementary[complementary.prod_60 == i]
    wide = rows_for_product.pivot(index='week', columns='prod_id', values='avg_comp_price')
    # week plus the first complementary column only
    pivot = wide.reset_index().iloc[:, :2]
    pivot.columns = ['week', 'comp']
    pivot['prod_60'] = i
    complementary_pivot = pd.concat([complementary_pivot, pivot])

Find substitute item for each product in the same subcategory with same type/prod_unit_qty_count/prod_count_uom and calculate the weekly price of substitute item.

In [0]:
# Align the product table's id type with the selected-product frame.
prod = prod.withColumn('prod_id', col('prod_id').cast('long'))

# Selected products with their attributes; id, description and brand are
# renamed so they do not clash with the candidate substitute's columns.
prod_60 = (
    spark.createDataFrame(pd.concat([elastic, inelastic], ignore_index=True)[['prod_id']])
    .withColumn("prod_id", col("prod_id").cast("long"))
    .join(prod, on='prod_id')
    .withColumnRenamed('prod_id', 'prod_60')
    .withColumnRenamed('prod_desc', 'desc_60')
    .withColumnRenamed('prod_mfc_brand_cd', 'brand_60')
)

# Candidate substitutes: other products sharing type, subcategory, unit count and unit of measure.
candidate_trxn = trxn.select(
    'prod_id', 'trans_id', 'prod_desc', 'prod_type', 'prod_subcategory',
    'prod_mfc_brand_cd', 'prod_unit_qty_count', 'prod_count_uom', 'price', 'week', 'sales_qty',
)
substitute = (
    prod_60
    .join(candidate_trxn, on=['prod_type', 'prod_subcategory', 'prod_unit_qty_count', 'prod_count_uom'])
    .filter(col('prod_60') != col('prod_id'))
)

# Keep only the best-selling substitute for each selected product.
w4 = Window.partitionBy('prod_60').orderBy(desc('total_sales'))
subs_total_sale = (
    substitute
    .groupBy(['prod_60', 'prod_id'])
    .agg(F.sum('sales_qty').alias('total_sales'))
    .withColumn('row_num', row_number().over(w4))
    .filter(col('row_num') <= 1)
    .drop('row_num')
)

# Weekly average price of the retained substitute.
substitute = (
    substitute
    .join(subs_total_sale, on=['prod_60', 'prod_id'])
    .select(['prod_60', 'prod_id', 'week', 'price'])
    .groupBy(['prod_60', 'prod_id', 'week'])
    .agg(avg('price').alias('avg_subs_price'))
    .toPandas()
)

In [0]:
# Reshape to [week, sub, prod_60]: one substitute price per selected product and week.
# Frames are collected in a list and concatenated once; DataFrame.append was
# removed in pandas 2.0 and the complementary pivot already uses pd.concat.
substitute_frames = []
for i in substitute.prod_60.unique():
    pivot = (
        substitute[substitute.prod_60 == i]
        .pivot(index='week', columns='prod_id', values='avg_subs_price')
        .reset_index()
        .iloc[:, :2]  # week + the single retained substitute, as in the complementary pivot
    )
    pivot.columns = ['week', 'sub']
    pivot['prod_60'] = i
    substitute_frames.append(pivot)

if substitute_frames:
    substitute_pivot = pd.concat(substitute_frames, ignore_index=True)
else:
    # no substitutes found: keep the expected schema so later merges still work
    substitute_pivot = pd.DataFrame(columns=['week', 'sub', 'prod_60'])

Consider seasonality as weekly revenue of the same category for each product.

In [0]:
# Selected products with their category.
prod_60 = (
    spark.createDataFrame(pd.concat([elastic, inelastic], ignore_index=True)[['prod_id']])
    .withColumn("prod_id", col("prod_id").cast("long"))
    .join(prod.select('prod_id', 'prod_category'), on='prod_id')
    .withColumnRenamed('prod_id', 'prod_60')
)

# Weekly revenue of every category that contains a selected product.
# The transactions are restricted with a semi-join on the DISTINCT categories:
# joining them to prod_60 directly would repeat each transaction once per
# selected product in the category and inflate category_revenue by that count.
selected_categories = prod_60.select('prod_category').distinct()
seasonality = (
    trxn.select('prod_category', 'sales_amt', 'week')
    .join(selected_categories, on='prod_category', how='left_semi')
    .groupBy(['prod_category', 'week'])
    .agg(F.sum('sales_amt').alias('category_revenue'))
    .select('prod_category', 'week', 'category_revenue')
    # attach the category series to each selected product in that category
    .join(prod_60, on='prod_category')
    .toPandas()
)

Find the competitors of each product (same subcategory and type but a different brand) and calculate the competitors' weekly average price.

In [0]:
# Selected products with their attributes; id and brand renamed so they can be
# compared with the candidate competitor's columns.
prod_60 = (
    spark.createDataFrame(pd.concat([elastic, inelastic], ignore_index=True)[['prod_id']])
    .withColumn("prod_id", col("prod_id").cast("long"))
    .join(prod, on='prod_id')
    .withColumnRenamed('prod_id', 'prod_60')
    .withColumnRenamed('prod_mfc_brand_cd', 'brand_60')
)

# Competitors: same type and subcategory, different brand.
# Their weekly average price is computed per selected product.
competitor = (
    prod_60
    .join(
        trxn.select('prod_id', 'trans_id', 'prod_type', 'prod_subcategory',
                    'prod_mfc_brand_cd', 'price', 'week'),
        on=['prod_type', 'prod_subcategory'],
    )
    .filter(col('brand_60') != col('prod_mfc_brand_cd'))
    .groupBy(['prod_60', 'prod_subcategory', 'week'])
    .agg(avg('price').alias('competitor_weekly_price'))
    .toPandas()
)

Join product weekly price, complementary weekly price, substitute weekly price, competitor weekly price, and seasonality into one table for the regression model used for demand prediction.

In [0]:
# Every feature table is keyed by the selected product's id.
complementary_pivot = complementary_pivot.rename(columns={'prod_60': 'prod_id'})
substitute_pivot = substitute_pivot.rename(columns={'prod_60': 'prod_id'})
seasonality = seasonality.rename(columns={'prod_60': 'prod_id'})
competitor = competitor.rename(columns={'prod_60': 'prod_id'})


def _build_regression_table(selected):
    """Assemble the weekly modelling table for the products in `selected`.

    Only `prod_id` is taken from `selected`: that frame also carries a
    `weekly_demand` column (the product's total demand), which would collide
    with the weekly series in `weekly_prod` and produce `weekly_demand_x` /
    `weekly_demand_y` instead of the `weekly_demand` column later cells read.

    Substitute price, category revenue, competitor price and complementary
    price are left-joined on (prod_id, week); helper columns that are not
    model inputs are dropped.
    """
    table = selected[['prod_id']].merge(weekly_prod, on=['prod_id'], how='left')
    for feature_table in (substitute_pivot, seasonality, competitor, complementary_pivot):
        table = table.merge(feature_table, on=['prod_id', 'week'], how='left')
    return table.drop(['logP', 'logD', 'prod_category', 'prod_subcategory'], axis=1)


# EDLP (elastic products) and Hi-Lo (inelastic products) share the same schema.
EDLP_df = _build_regression_table(elastic)
HiLo_df = _build_regression_table(inelastic)

In [0]:
def _peak_weekly_demand(frame):
    """Return one row per product with its largest weekly demand as `max_demand`."""
    peaks = frame.groupby('prod_id', as_index=False)['weekly_demand'].max()
    return peaks.rename(columns={'weekly_demand': 'max_demand'})


# The largest observed weekly demand of each product is attached to every one
# of its rows; it serves as the demand ceiling of the logit target.
md_edlp = _peak_weekly_demand(EDLP_df)
EDLP_df = EDLP_df.merge(md_edlp, on='prod_id', how='left')

md_hilo = _peak_weekly_demand(HiLo_df)
HiLo_df = HiLo_df.merge(md_hilo, on='prod_id', how='left')

Demand Prediction - Logit Price Response Function (EDLP & Hi-Lo)

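Target = $\log\left(\dfrac{d_{it}}{u_i - d_{it}}\right)$, where $d_{it}$ is the weekly demand of product $i$ in week $t$ and $u_i$ is the maximum weekly demand of product $i$.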

In [0]:
# Fill in missing feature values.
# Substitute and complementary prices: use the product's own average over the
# weeks where it is known, and only fall back to the overall average when the
# product has no value at all. 'comp' must be imputed as well -- it is a model
# input and the linear model cannot be fitted on NaN.
for feature in ('sub', 'comp'):
    values = pd.to_numeric(EDLP_df[feature], errors='coerce')
    per_product_mean = values.groupby(EDLP_df['prod_id']).transform('mean')
    EDLP_df[feature] = values.fillna(per_product_mean).fillna(values.mean())
# No competitor price that week: assume competitors charge the product's own price.
EDLP_df['competitor_weekly_price'] = EDLP_df['competitor_weekly_price'].fillna(EDLP_df['weekly_price'])
EDLP_df.isna().sum()

In [0]:
# Compare a linear model and a tree model on one sample product.
# X / y are built here, the same way the optimisation functions build them,
# so the cell does not depend on variables left over from an earlier session.
feature_cols = ['P', 'sub', 'comp', 'Seasonality']
sample = EDLP_df[EDLP_df.prod_id == EDLP_df.prod_id.unique()[0]].copy()
sample['Target'] = np.log(sample.weekly_demand / (sample.max_demand - sample.weekly_demand))
sample['P'] = sample.weekly_price / sample.competitor_weekly_price
sample['Seasonality'] = (sample['category_revenue'] - sample['category_revenue'].mean()) / sample['category_revenue'].std()
# rows whose target is infinite (demand at the ceiling) or whose inputs are missing cannot be used
sample = sample.replace([np.inf, -np.inf], np.nan).dropna(subset=feature_cols + ['Target'])
X = sample[feature_cols]
y = sample['Target']

# split the data into training and testing sets (shared by both models)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# linear regression (Lasso)
lr = Lasso(alpha=0.05)
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
y_pred_t = lr.predict(X_train)
# calculate the RMSE
rmse_t = np.sqrt(mean_squared_error(y_train, y_pred_t))
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print('LR Train RMSE:{0:.3f}'.format(rmse_t))
print('LR Test RMSE:{0:.3f}'.format(rmse))

# catboost regression -- same hyper-parameters as the optimisation functions
tree = cbt.CatBoostRegressor(iterations=30, depth=5, learning_rate=0.2, loss_function="RMSE", verbose=False)
tree.fit(X_train, y_train)
y_pred = tree.predict(X_test)
y_pred_t = tree.predict(X_train)
# calculate the RMSE
rmse_t = np.sqrt(mean_squared_error(y_train, y_pred_t))
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print('Tree Train RMSE:{0:.3f}'.format(rmse_t))
print('Tree Test RMSE:{0:.3f}'.format(rmse))

EDLP - $p_{it}/q_{it}$ (the product's weekly price relative to its competitors' weekly price)

Optimize price for each product and predict demand in each store

In [0]:
def optimization(sample, j):
    """Predict demand and revenue for one product at the candidate price `j`.

    A CatBoost regressor and a linear regression are fitted on the product's
    history (features P, sub, comp, Seasonality; response Target); whichever
    has the lower test RMSE predicts the logit target at the candidate price.
    The prediction is mapped back to demand with
    max_demand / (1 + exp(-prediction)).

    Parameters
    ----------
    sample : pandas.DataFrame
        Weekly rows of a single product. Must contain `competitor_weekly_price`,
        `category_revenue`, `Target`, `P`, `sub`, `comp` and `max_demand`.
    j : float
        Candidate price applied to every week.

    Returns
    -------
    (sample, Re_EDLP)
        `sample` is a copy of the input restricted to rows with a finite
        `Target`, extended with `price_opt`, `P_opt`, `Seasonality` and
        `D_opt`. `Re_EDLP` is the module-level list of revenues, to which the
        revenue at price `j` has been appended; the caller must have created it.
    """
    # work on a copy so the caller's frame (usually a slice) is never mutated
    sample = sample.copy()
    sample['price_opt'] = j
    sample['P_opt'] = sample['price_opt'] / sample['competitor_weekly_price']
    # standardised because the features differ widely in magnitude
    sample['Seasonality'] = (sample['category_revenue'] - sample['category_revenue'].mean()) / sample['category_revenue'].std()
    # the logit target is infinite or undefined when demand sits at 0 or at the ceiling
    sample = sample[np.isfinite(sample['Target'])].copy()

    X = sample[['P', 'sub', 'comp', 'Seasonality']]
    y = sample['Target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    def _test_rmse(fitted_model):
        # np.sqrt rather than math.sqrt: `math` is not imported by this notebook.
        # Rounded through a format string because the builtin round() is
        # shadowed by pyspark.sql.functions.round at module level.
        error = np.sqrt(mean_squared_error(y_test, fitted_model.predict(X_test)))
        return float('{0:.2f}'.format(error))

    clf = cbt.CatBoostRegressor(iterations=30, depth=5, learning_rate=0.2, loss_function="RMSE", verbose=False)
    clf.fit(X_train, y_train)
    RMSE_TEST_clf = _test_rmse(clf)

    lr = LinearRegression()
    lr.fit(X_train, y_train)
    RMSE_TEST_lr = _test_rmse(lr)

    # same feature layout as the training data, with the candidate price ratio in place of P
    X_opt = sample[['P_opt', 'sub', 'comp', 'Seasonality']].rename(columns={'P_opt': 'P'})
    # choose the model with the lower test RMSE (ties go to CatBoost)
    best_model = lr if RMSE_TEST_lr < RMSE_TEST_clf else clf
    y_opt = np.ravel(best_model.predict(X_opt))

    # invert the logit transform to get predicted demand
    sample['D_opt'] = sample.max_demand / (1 + np.exp(-y_opt))
    Re_EDLP.append((sample['D_opt'] * sample['price_opt']).sum())
    return sample, Re_EDLP

In [0]:
# Every-day-low-price search: for each elastic product, scan a grid of prices
# around the observed range and keep the price with the highest predicted revenue.
edlp_columns = ['prod_id', 'original_revenue', 'optimized_revenue', 'optimized_price', 'price_change', 'revenue_change']
edlp_records = []
for i in EDLP_df.prod_id.unique():
    # revenue per candidate price; optimization() appends to this module-level list
    Re_EDLP = []
    sample = EDLP_df[EDLP_df.prod_id == i].copy()
    sample['Target'] = np.log(sample.weekly_demand / (sample.max_demand - sample.weekly_demand))
    # consider competitors: price relative to the competitors' weekly price
    sample['P'] = sample.weekly_price / sample.competitor_weekly_price
    # np.round, not round(): the builtin is shadowed by pyspark.sql.functions.round
    lower = np.round(sample.weekly_price.min(), 2) - 1
    upper = np.round(sample.weekly_price.max(), 2) + 1
    p_range = np.arange(lower, upper, 0.1)
    # a price of zero or below is not a valid candidate
    p_range = p_range[p_range > 0]
    for j in p_range:
        sample, Re_EDLP = optimization(sample, j)
    orrev = float('{0:.2f}'.format((sample['weekly_demand'] * sample['weekly_price']).sum()))
    oprev = float('{0:.2f}'.format(np.max(Re_EDLP)))
    rbest = float(p_range[np.argmax(Re_EDLP)])
    up = '{0:.2f}%'.format((oprev - orrev) / orrev * 100)
    pp = '{0:.2f}%'.format((rbest - sample.weekly_price.mean()) / sample.weekly_price.mean() * 100)
    edlp_records.append([i, orrev, oprev, rbest, pp, up])

# build the summary once, with typed columns, instead of concatenating per product
EDLP_opt = pd.DataFrame(edlp_records, columns=edlp_columns)
EDLP_opt['prod_id'] = EDLP_opt['prod_id'].astype('int64')
EDLP_opt

In [0]:
# Predicted revenue over the price grid of the last product processed above,
# with the revenue-maximising price marked.
best_position = np.argmax(Re_EDLP)
fig, ax = plt.subplots(figsize=(10, 8))
ax.plot(p_range, Re_EDLP, '-o', markevery=[best_position])
ax.set_title('product %i price - demand' % i)
ax.set_xlabel('EDLP')
ax.set_ylabel('Revenue')
plt.show()

In [0]:
# Organize the demand/revenue change of each store.
# Optimised price and revenue per product, cast to double so the arithmetic
# below does not depend on how pandas typed the columns.
edlp_prices = spark.createDataFrame(EDLP_opt).select(
    'prod_id',
    col('optimized_revenue').cast('double').alias('optimized_revenue'),
    col('optimized_price').cast('double').alias('optimized_price'),
)
# 2020 transactions of the EDLP products only
edlp_trxn = trxn.select('prod_id', 'store_id', 'sales_amt', 'sales_qty').join(edlp_prices.select('prod_id'), on='prod_id')

# Observed revenue/demand per product and store, and each store's share of the product's revenue.
prod_store_revenue = edlp_trxn.groupBy(['prod_id', 'store_id']).agg(F.sum('sales_amt').alias('revenue'), F.sum('sales_qty').alias('demand'))
prod_revenue = edlp_trxn.groupBy(['prod_id']).agg(F.sum('sales_amt').alias('total_revenue'))
prod_store_revenue = prod_store_revenue.join(prod_revenue, on='prod_id')
# The share is kept at full precision: rounding it to two decimals first would
# hand zero revenue to every store with less than 0.5% of a product's sales.
prod_store_revenue = prod_store_revenue.withColumn('store_ratio', col('revenue') / col('total_revenue'))

# One row per product and store (built from the aggregates, so transaction
# rows are not repeated in the output), mirroring the Hi-Lo breakdown.
trxn_EDLP = (
    prod_store_revenue.join(edlp_prices, on='prod_id')
    .withColumn('Opt_Price', round(col('optimized_price'), 2))
    .withColumn('Opt_Revenue', round(col('optimized_revenue') * col('store_ratio'), 2))
    .withColumn('revenue', round(col('revenue'), 2))
    .withColumn('total_revenue', round(col('total_revenue'), 2))
    .withColumn('Revenue_Change', format_string("%.2f%%", (col('Opt_Revenue') - col('revenue')) / col('revenue') * 100))
    .withColumn('Opt_demand', (col('Opt_Revenue') / col('Opt_Price')).cast("integer"))
    .withColumn('Demand_Change', format_string("%.2f%%", (col('Opt_demand') - col('demand')) / col('demand') * 100))
    .select(['prod_id', 'store_id', 'Opt_Price', 'demand', 'revenue', 'Opt_demand', 'Opt_Revenue', 'Demand_Change', 'Revenue_Change'])
    .distinct()
)
trxn_EDLP.show()

Hi-Lo - pit 

Optimize price for each product and predict demand in each store

In [0]:
# Fill in missing feature values.
# Substitute and complementary prices: use the product's own average over the
# weeks where it is known, and only fall back to the overall average when the
# product has no value at all. 'comp' must be imputed as well -- it is a model
# input and the linear model cannot be fitted on NaN.
for feature in ('sub', 'comp'):
    values = pd.to_numeric(HiLo_df[feature], errors='coerce')
    per_product_mean = values.groupby(HiLo_df['prod_id']).transform('mean')
    HiLo_df[feature] = values.fillna(per_product_mean).fillna(values.mean())
# No competitor price that week: assume competitors charge the product's own price.
HiLo_df['competitor_weekly_price'] = HiLo_df['competitor_weekly_price'].fillna(HiLo_df['weekly_price'])
HiLo_df.isna().sum()

In [0]:
# Hi-Lo: same modelling approach as EDLP, plus an elasticity check per candidate price.
def optimization_hilo(report, sample, j):
    """Evaluate one Hi-Lo candidate price for a single product.

    The candidate price is `j` times the product's mean competitor weekly
    price. A CatBoost regressor and a linear regression are fitted on the
    product's history; whichever has the lower test RMSE predicts the logit
    target at the candidate price, which is mapped back to demand with
    max_demand / (1 + exp(-prediction)). A log-log OLS on the predicted
    demand then supplies the elasticity stored alongside the revenue.

    Parameters
    ----------
    report : pandas.DataFrame
        Accumulated results with columns Revenue, elasticity, price, og_price.
    sample : pandas.DataFrame
        Weekly rows of a single product. Must contain `competitor_weekly_price`,
        `category_revenue`, `Target`, `P`, `sub`, `comp`, `max_demand` and
        `weekly_price`.
    j : float
        Multiplier applied to the mean competitor weekly price.

    Returns
    -------
    (sample, report)
        `sample` is a copy of the input restricted to rows with a finite
        `Target`, extended with `price_opt`, `Seasonality`, `P_opt` and
        `D_opt`; `report` has one more row describing this candidate.
    """
    # work on a copy so the caller's frame (usually a slice) is never mutated
    sample = sample.copy()
    sample['price_opt'] = j * sample['competitor_weekly_price'].mean()
    # standardization
    sample['Seasonality'] = (sample['category_revenue'] - sample['category_revenue'].mean()) / sample['category_revenue'].std()
    sample['P_opt'] = sample['price_opt'] / sample['competitor_weekly_price']
    # the logit target is infinite or undefined when demand sits at 0 or at the ceiling
    sample = sample[np.isfinite(sample['Target'])].copy()

    X = sample[['P', 'sub', 'comp', 'Seasonality']]
    y = sample['Target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    def _test_rmse(fitted_model):
        # np.sqrt rather than math.sqrt: `math` is not imported by this notebook.
        # Rounded through a format string because the builtin round() is
        # shadowed by pyspark.sql.functions.round at module level.
        error = np.sqrt(mean_squared_error(y_test, fitted_model.predict(X_test)))
        return float('{0:.2f}'.format(error))

    clf = cbt.CatBoostRegressor(iterations=30, depth=5, learning_rate=0.2, loss_function="RMSE", verbose=False)
    clf.fit(X_train, y_train)
    RMSE_TEST_clf = _test_rmse(clf)

    lr = LinearRegression()
    lr.fit(X_train, y_train)
    RMSE_TEST_lr = _test_rmse(lr)

    # same feature layout as the training data, with the candidate price ratio in place of P
    X_opt = sample[['P_opt', 'sub', 'comp', 'Seasonality']].rename(columns={'P_opt': 'P'})
    # choose the model with the lower test RMSE (ties go to CatBoost)
    best_model = lr if RMSE_TEST_lr < RMSE_TEST_clf else clf
    y_opt = np.ravel(best_model.predict(X_opt))
    sample['D_opt'] = sample.max_demand / (1 + np.exp(-y_opt))

    # After predicting demand, fit the constant-elasticity model to read off the
    # current elasticity (the caller keeps candidates with elasticity > -1).
    # The seasonality regressor is log(category_revenue): the standardised
    # 'Seasonality' column is non-positive for part of the rows and has no logarithm.
    # NOTE(review): price_opt is identical on every row of `sample`, so its
    # coefficient may not be separately identified from the intercept -- confirm
    # that this is the intended elasticity estimate.
    result = smf.ols('np.log(D_opt) ~ np.log(price_opt)+np.log(sub)+np.log(comp)+np.log(category_revenue)', data=sample).fit()
    # look the coefficient up by name; positional access on a labelled Series is deprecated
    slope = float('{0:.3f}'.format(result.params['np.log(price_opt)']))
    Re = float('{0:.2f}'.format((sample['D_opt'] * sample['price_opt']).sum()))
    price = float('{0:.2f}'.format(sample['price_opt'].iloc[0]))
    ogp = sample.weekly_price.mean()
    new_row = pd.DataFrame([[Re, slope, price, ogp]], columns=['Revenue', 'elasticity', 'price', 'og_price'])
    report = pd.concat([report, new_row])
    return sample, report

In [0]:
# Hi-Lo search: for each inelastic product, raise the price step by step and
# keep the candidate with the highest predicted revenue whose elasticity stays above -1.
hilo_columns = ['prod_id', 'original_revenue', 'optimized_revenue', 'optimized_price', 'price_change', 'revenue_change']
hilo_records = []
# multipliers applied to the mean competitor price (the same grid for every product)
p_range = np.arange(1.01, 1.5, 0.01)
for i in HiLo_df.prod_id.unique():
    sample = HiLo_df[HiLo_df.prod_id == i].copy()
    sample['Target'] = np.log(sample.weekly_demand / (sample.max_demand - sample.weekly_demand))
    sample['P'] = sample.weekly_price / sample.competitor_weekly_price
    # a fresh report per product; it must exist before the first optimization_hilo call
    report = pd.DataFrame(columns=['Revenue', 'elasticity', 'price', 'og_price'])
    for j in p_range:
        sample, report = optimization_hilo(report, sample, j)
    orrev = float('{0:.2f}'.format((sample['weekly_demand'] * sample['weekly_price']).sum()))
    og_price = float(report.og_price.iloc[0])
    # report rows all share index label 0, so select positionally after a reset
    admissible = report[report.elasticity.astype(float) > -1].reset_index(drop=True)
    if admissible.empty:
        # no candidate keeps elasticity above -1: leave the price unchanged
        oprev, rbest = orrev, og_price
    else:
        best = admissible.loc[admissible.Revenue.astype(float).idxmax()]
        oprev = float('{0:.2f}'.format(best.Revenue))
        rbest = float(best.price)
    pp = '{0:.2f}%'.format(((rbest - og_price) / og_price) * 100)
    up = '{0:.2f}%'.format((oprev - orrev) / orrev * 100)
    # recorded inside the loop so every product is kept, not only the last one
    hilo_records.append([i, orrev, oprev, rbest, pp, up])

# built from a list of rows so numeric columns stay numeric
# (an np.array row would turn every value into a string)
HiLo_opt = pd.DataFrame(hilo_records, columns=hilo_columns)
HiLo_opt['prod_id'] = HiLo_opt['prod_id'].astype('int64')

In [0]:
# Organize the demand/revenue change of each store.
# Optimised price and revenue per product, cast to double so the arithmetic
# below does not depend on how pandas typed the columns.
hilo_prices = spark.createDataFrame(HiLo_opt).select(
    'prod_id',
    col('optimized_revenue').cast('double').alias('optimized_revenue'),
    col('optimized_price').cast('double').alias('optimized_price'),
)
# 2020 transactions of the Hi-Lo products only
hilo_trxn = trxn.select('prod_id', 'store_id', 'sales_amt', 'sales_qty').join(hilo_prices.select('prod_id'), on='prod_id')

# Observed revenue/demand per product and store, and each store's share of the product's revenue.
prod_store_revenue_HiLo = hilo_trxn.groupBy(['prod_id', 'store_id']).agg(F.sum('sales_amt').alias('revenue'), F.sum('sales_qty').alias('demand'))
prod_revenue_HiLo = hilo_trxn.groupBy(['prod_id']).agg(F.sum('sales_amt').alias('total_revenue'))
prod_store_revenue_HiLo = prod_store_revenue_HiLo.join(prod_revenue_HiLo, on='prod_id')
# The share is kept at full precision: rounding it to two decimals first would
# hand zero revenue to every store with less than 0.5% of a product's sales.
prod_store_revenue_HiLo = prod_store_revenue_HiLo.withColumn('store_ratio', col('revenue') / col('total_revenue'))

# One row per product and store.
trxn_HiLo = (
    prod_store_revenue_HiLo.join(hilo_prices, on='prod_id')
    .withColumn('Opt_price', round(col('optimized_price'), 2))
    .withColumn('Opt_Revenue', round(col('optimized_revenue') * col('store_ratio'), 2))
    .withColumn('revenue', round(col('revenue'), 2))
    .withColumn('total_revenue', round(col('total_revenue'), 2))
    .withColumn('Revenue_Change', format_string("%.2f%%", (col('Opt_Revenue') - col('revenue')) / col('revenue') * 100))
    .withColumn('Opt_Demand', (col('Opt_Revenue') / col('Opt_price')).cast("integer"))
    .withColumn('Demand_Change', format_string("%.2f%%", (col('Opt_Demand') - col('demand')) / col('demand') * 100))
    .select(['prod_id', 'store_id', 'Opt_price', 'demand', 'revenue', 'Opt_Demand', 'Opt_Revenue', 'Demand_Change', 'Revenue_Change'])
    .distinct()
)
trxn_HiLo

Calculate the total revenue increase

In [0]:
# Revenue gained under each strategy: optimised minus original, summed over products.
edlp_gain = (EDLP_opt.optimized_revenue.astype('float') - EDLP_opt.original_revenue.astype('float')).sum()
hilo_gain = (HiLo_opt.optimized_revenue.astype('float') - HiLo_opt.original_revenue.astype('float')).sum()
total_revenue_change = edlp_gain + hilo_gain
print('We help ASCE increase revenue by',total_revenue_change,'.')