In [1]:
import pyspark.sql.functions as f

In [2]:
df = spark.sql("select * from clickstream_de_csv")
df.show(5)
df.printSchema()

In [3]:
# extract out important features and removing unneeded columns
## 'hour' is expected to be an important feature
## 'day' is expected to be an important feature
## 'week' number may not be significant (in such a short time span) but captured anyways
## 'month' month my not be significant (in such a short time span) but captured anyways
# didn't observe any German holidays during or shortly after that can have an impact on click

df = df \
  .withColumn('day', f.dayofweek('utcdate')) \
  .withColumn('week', f.weekofyear('utcdate')) \
  .withColumn('month', f.month('utcdate')) \
  .withColumn('hour', f.hour('utcdate'))
df = df.drop(*['countrycode', 'utcdate'])

df.show(10)

df.createOrReplaceTempView('impressions')

# verifying our schema

df.printSchema()

# number of partitions

print("Number of partitions for 'df': ", df.rdd.getNumPartitions())


In [4]:
# looking at time frame discriptive stats

df.select(['hour', 'day', 'week', 'month', 'rating']).describe().show()

print("Number of distinct 'category': ", df.select('category').distinct().count())
print("Number of distinct 'merchant': ", df.select('merchant').distinct().count())
print("Number of distinct 'userid': ", df.select('userid').distinct().count())
print("Number of distinct 'offerid': ", df.select('offerid').distinct().count())

In [5]:
# Total number of clicks
df1 = df.groupby().sum('rating')
df1.show()

In [6]:
# offer id that has received the max percentage of clicks
# we are assuming that the event with the most clickthrough rate and highest ad impression is most important

spark.sql('''
select offerid, sum(rating) as clicks, count(offerid) as impressions,
    sum(rating) * 100 / count(offerid) as pct
from test_de_csv
group by offerid
order by clicks desc, impressions desc, pct desc
limit 1
''' ).show(truncate=False)


In [7]:
# user that has the max percentage of click
# we are assuming that the event with the most clickthrough rate and highest ad impression is most important

sample_df = df.sample(False, .5, )

spark.sql('''
select userid, sum(rating) as clicks, count(offerid) as impressions,
    sum(rating) * 100 / count(offerid) as pct
from test_de_csv
group by userid
order by clicks desc, impressions desc, pct desc
limit 1
''' ).show(truncate=False)

In [8]:
# Predict if the offer will be clicked or not

# 'userid', 'offerid' has too many categories
# 'month', 'week' has no predictive power since single or few categories
# unless we are able to group it make it smaller, we will drop it

df = df.drop(*['userid', 'offerid', 'month', 'week'])

# imports

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# handling categorical columns

categorical_cols = ['category', 'merchant']

indexers = [StringIndexer(inputCol=col, outputCol=col+'idx') \
            .setHandleInvalid('keep') \
            for col in categorical_cols]

indexers_pipeline = Pipeline(stages=indexers)

df_indexed = indexers_pipeline.fit(df).transform(df).drop(*categorical_cols)

# creating assembler

feature_cols = df_indexed.columns[1:]

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

df_assembled = assembler.transform(df_indexed)

df_ml = df_assembled.select(['rating', 'features'])

# Train-Test-Split

(df_train, df_test) = df_ml.randomSplit([0.6, 0.4], seed=42)

In [9]:
# random forest classifier (training set)

from pyspark.ml.classification import RandomForestClassifier as RFC

rfc = RFC(featuresCol='features', labelCol='rating', predictionCol='rating_pred', maxBins=1000, seed=42)

rfc_model = rfc.fit(df_train)

# gradient boosted tree classifier (training set)

from pyspark.ml.classification import GBTClassifier as GBTC

gbtc = GBTC(featuresCol='features', labelCol='rating', predictionCol='rating_pred', maxBins=1000, seed=42)

gbtc_model = gbtc.fit(df_train)


In [10]:
# model evaluation 

from pyspark.ml.evaluation import BinaryClassificationEvaluator as Evaluator

evaluator = Evaluator(labelCol='rating', rawPredictionCol='rating_pred')

# random forest classifier

rfc_train_pred = rfc_model.transform(df_train)
rfc_train_evaluation = evaluator.evaluate(rfc_train_pred)
print(f'Random Forest Classifier Accuracy (Train): {rfc_train_evaluation}')

rfc_test_pred = rfc_model.transform(df_test)
rfc_test_evaluation = evaluator.evaluate(rfc_test_pred)
print(f'Random Forest Classifier Accuracy (Test): {rfc_test_evaluation}\n')

# gradient boosted tree classifier 

gbtc_train_pred = gbtc_model.transform(df_train)
gbtc_train_evaluation = evaluator.evaluate(gbtc_train_pred)
print(f'Gradient Boosted Tree Classifier Accuracy (Train): {gbtc_train_evaluation}')

gbtc_test_pred = gbtc_model.transform(df_test)
gbtc_test_evaluation = evaluator.evaluate(gbtc_test_pred)
print(f'Gradient Boosted Tree Classifier Accuracy (Test): {gbtc_test_evaluation}\n')

In [11]:
# confusion 'matrix' for  the random forest model (Train)
print('Confusion Matrix [Random Forest] (Train)')
rfc_train_pred.groupby('rating', 'rating_pred').count().show()

# confusion 'matrix' for  the random tree model (Train)
print('Confusion Matrix [Random Forest] (Test)')
rfc_test_pred.groupby('rating', 'rating_pred').count().show()

# confusion 'matrix' for  the gradient boosted tree model (Train)
print('Confusion Matrix [Gradient Boosted Tree] (Train)')
gbtc_train_pred.groupby('rating', 'rating_pred').count().show()

# confusion 'matrix' for  the gradient boosted tree model (Train)
print('Confusion Matrix [Gradient Boosted Tree] (Test)')
gbtc_test_pred.groupby('rating', 'rating_pred').count().show()

In [12]:
rfc_pred = rfc_model.transform(df_ml)
# confusion 'matrix' for  the random forest model (Full)
print('\nConfusion Matrix [Random Forest]')
rfc_pred.groupby('rating', 'rating_pred').count().show()

# accuracy for entire dataset
rfc_pred =rfc_model.transform(df_ml)
rfc_evaluation = evaluator.evaluate(rfc_pred)
print(f'Random Forest Classifier Accuracy (Full): {rfc_evaluation}\n')


gbtc_pred = gbtc_model.transform(df_ml)
# confusion 'matrix' for  the gradient boosted tree model (Full)
print('\nConfusion Matrix [Gradient Boosted Tree]')
gbtc_pred.groupby('rating', 'rating_pred').count().show()

# accuracy for entire dataset
gbtc_pred = gbtc_model.transform(df_ml)
gbtc_evaluation = evaluator.evaluate(gbtc_pred)
print(f'Gradient Boosted Tree Classifier Accuracy (Full): {gbtc_evaluation}\n')


# Business Scenario

_In a situation, where a mobile advertisement company has this data as historical data. Each impression cost the advertisement company 1 cent, and each click costs the advertisement company $1. It has been given by merchants that for each impression the ROIfor the merchant is 10 cents and for each click, the ROI for the merchant is $10._

_The advertisement company has $1000 to run the advertisement campaigns in the next 7 days. 
Based on the above historical dataset, identify the {offerid, merchantid} combination (or combinations) that the advertisement agency should run in this campaign._

In [14]:
# starting dataframe for reference

df.show(10)

In [15]:
# Goal: We are trying to stay under advertising budget while attaining max ROI over a 7-day period
#
# Which merchant/offerid has best ROI for $1000 budget?


# calculate cost and roi

cost_vs_roi = spark.sql('''
select merchant, offerid, 
    concat(cast(week as string),'-',cast(day as string)) as day, 
    sum(rating) as clicks, count(offerid) as impressions,
    (count(offerid) * .01 + sum(rating)) as cost, 
    (count(offerid) + sum(rating) * 10) as roi
from impressions
group by offerid, merchant, week, day
order by day
''' )

cost_vs_roi.show(10)

cost_vs_roi.createOrReplaceTempView('cost_vs_roi')


In [16]:
# merchant/offerid combination

group_by_mod = spark.sql('''
select merchant, offerid, day, sum(cost) as total_cost, sum(roi) as total_roi
from cost_vs_roi
group by merchant, offerid, day
order by merchant, day, total_cost desc, total_roi desc
''')

group_by_mod.show(10)

group_by_mod.createOrReplaceTempView('group_by_mod')

In [17]:
# now we want to look at the cost/roi in a 7 day window

seven_days = spark.sql('''
select merchant, offerid, day,
    sum(total_cost) over (partition by merchant, offerid 
                    order by day 
                    rows between 3 preceding and 3 following) as 7day_cost,
    sum(total_roi) over (partition by merchant, offerid
                    order by day 
                    rows between 3 preceding and 3 following) as 7day_roi
from group_by_mod
order by merchant, day
''')

seven_days.show(10)

seven_days.createOrReplaceTempView('seven_days')

In [18]:
%sql

/* find the max roi while staying under budget */

select merchant, offerid, 7day_cost, 7day_roi
from seven_days
where 7day_cost < 1000
order by 7day_roi desc
limit 1

merchant,offerid,7day_cost,7day_roi
21a509189fb0875c3732590121ff3fc86da770b0628c1812e82f023ee2e0f683,19754ec121b3a99fff3967646942de67,789.58,78058
