
## Spark

In [None]:
#@title  <-- Run Me: (Hidden) Installing Spark
%%bash 

## Setup Spark on Colab
pip install -q pyspark
apt-get -qq install -y openjdk-8-jdk-headless

## Setup port-forwarding

# Download ngrok
wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# Unload ngrok
unzip -q ngrok-stable-linux-amd64.zip

replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename:  NULL
(EOF or read error, treating as "[N]one" ...)


In [None]:
#@title  <-- Run Me: (Hidden) Environment Variable Setup
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
import pyspark 
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import seaborn as sns
import matplotlib.pyplot as plt

import pandas as pd    
import numpy as np


# Set configuration scope to be local and use port 4050
config_scope = SparkConf().set("spark.ui.port", "4050")

# Create the connection to a Spark cluster
sc = pyspark.SparkContext(conf = config_scope)

# Create a session to programmatically build Spark RDD, DataFrame and DataSet
spark = SparkSession.builder.getOrCreate()

ModuleNotFoundError: ignored

## Load data

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Read in data
TRAIN_DIR = "drive/MyDrive/STAT480-Group Project/dataset/track2"

In [None]:
training = spark.read.option("header","false").option("delimiter","\t").csv(TRAIN_DIR+'/training.txt')
training = training.selectExpr('_c0 as Click', '_c1 as Impression', '_c2 as AdURL', '_c3 as AdId', '_c4 as AdvId', 
                    '_c5 as Depth', '_c6 as Pos', '_c7 as QId', '_c8 as KeyId', '_c9 as TitleId', 
                    '_c10 as DescId', '_c11 as UId')
training.show(10)

NameError: ignored

In [None]:
purchasekeywordid = spark.read.option("header","false").option("delimiter","\t").csv(TRAIN_DIR+'/purchasedkeywordid_tokensid.txt')
purchasekeywordid = purchasekeywordid.selectExpr('_c0 as KeyId', '_c1 as PurchaseKeyword')
purchasekeywordid.show(10)

In [None]:
queryid = spark.read.option("header","false").option("delimiter","\t").csv(TRAIN_DIR+'/queryid_tokensid.txt')
queryid = queryid.selectExpr('_c0 as QId', '_c1 as Query')
queryid.show(10)

In [None]:
descriptionid = spark.read.option("header","false").option("delimiter","\t").csv(TRAIN_DIR+'/descriptionid_tokensid.txt')
descriptionid = descriptionid.selectExpr('_c0 as DescId', '_c1 as Description')
descriptionid.show(10)

In [None]:
userid = spark.read.option("header","false").option("delimiter","\t").csv(TRAIN_DIR+'/userid_profile.txt')
userid = userid.selectExpr('_c0 as UId', '_c1 as Gender', '_c2 as Age')
userid.show(10)

In [None]:
titleid = spark.read.option("header","false").option("delimiter","\t").csv(TRAIN_DIR+'/titleid_tokensid.txt')
titleid = titleid.selectExpr('_c0 as TitleId', '_c1 as Titile')

In [None]:
titleid.show(10)

## Data pre-processing

### Gender and Age

In [1]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder

ModuleNotFoundError: ignored

In [None]:
# label encode
Gender_indexer = StringIndexer(inputCol='Gender', outputCol='Gender_num').fit(userid)
Age_indexer = StringIndexer(inputCol='Age', outputCol='Age_num').fit(userid)


userid = Gender_indexer.transform(userid)
userid = Age_indexer.transform(userid)

In [None]:
# one-hot encoder
Gender_onehoter = OneHotEncoder(inputCol='Gender_num', outputCol='Gender_vector')
Age_onehoter = OneHotEncoder(inputCol='Age_num', outputCol='Age_vector')


ohe1 = Gender_onehoter.fit(userid)
ohe2 = Age_onehoter.fit(userid)


userid = ohe1.transform(userid)
userid = ohe2.transform(userid)

In [None]:
userid.show(10)

### Average Click Through Rate

In [None]:
# average click through rate
temp_df1 = training.groupBy("AdvId").agg((avg("Click")/count("Click")).alias("AvgClick_Advertiser"))
temp_df1.show(10)

In [None]:
temp_df2 = training.groupBy("AdID").agg((avg("Click")/count("Click")).alias("AvgClick_Ad"))
temp_df2.show(10)

In [None]:
temp_df3 = training.groupBy("QId").agg((avg("Click")/count("Click")).alias("AvgClick_Query"))
temp_df3.show(10)

In [None]:
temp_df4 = training.groupBy("KeyId").agg((avg("Click")/count("Click")).alias("AvgClick_Key"))
temp_df4.show(10)

In [None]:
temp_df5 = training.groupBy("TitleId").agg((avg("Click")/count("Click")).alias("AvgClick_Title"))
temp_df5.show(10)

In [None]:
temp_df6 = training.groupBy("DescId").agg((avg("Click")/count("Click")).alias("AvgClick_Desciption"))
temp_df6.show(10)

### Join tables together

In [None]:
train_df = training.join(userid, on="UId", how="inner")
train_df = train_df.join(temp_df1, on="AdvId", how="inner")
train_df = train_df.join(temp_df2, on="AdId", how="inner")
train_df = train_df.join(temp_df3, on="QId", how="inner")
train_df = train_df.join(temp_df4, on="KeyId", how="inner")
train_df = train_df.join(temp_df5, on="TitleId", how="inner")
train_df = train_df.join(temp_df6, on="DescId", how="inner")
train_df.show(10)

## Logistic Regression Model

In [None]:
from pyspark.ml.feature import VectorAssembler

train_assembler = VectorAssembler(inputCols=['Impression', 'Depth', 'Pos', 
                        'Gender_vector', 'Age_vector',
                        'AvgImp_Advertiser', 'AvgClick_Ad',
                        'AvgClick_Query', 'AvgClick_Key',
                        'AvgClick_Title', 'AvgClick_Desciption'],
                 outputCol='features')
train_df = train_assembler.transform(train_df)

In [None]:
# split the dataset
dataset = train_df.select(['features', 'Click'])
train, test = dataset.randomSplit([0.8, 0.2])

In [None]:
from pyspark.ml.classification import LogisticRegression

log_reg = LogisticRegression(labelCol = 'Click').fit(train)

train_pred = log_reg.evaluate(train).predictions

train_pred.filter(train_pred['Click'] == 1).filter(train_pred['prediction'] == 1).select(['Click', 'prediction', 'probability']).show(10, False)

### Model Evaluation

In [None]:
result_lr = log_reg.evaluate(test).predictions
result_lr.show(3)

In [None]:
tp_lr = result_lr[(result_lr.label == 1) & (result_lr.prediction == 1)].count()
tn_lr = result_lr[(result_lr.label == 0) & (result_lr.prediction == 1)].count()
fp_lr = result_lr[(result_lr.label == 0) & (result_lr.prediction == 1)].count()
fn_lr = result_lr[(result_lr.label == 1) & (result_lr.prediction == 0)].count()

print('tp is : %f'%(tp_lr))
print('tn is : %f'%(tn_lr))
print('fp is : %f'%(fp_lr))
print('fn is : %f'%(fn_lr))

In [None]:
# Accuracy
print('test accuracy is : %f'%((tp_lr+tn_lr)/(tp_lr+tn_lr+fp_lr+fn_lr)))

# Recall
print('test accuracy is : %f'%(tp_lr/(tp_lr+fn_lr)))

# Precision
print('test accuracy is : %f'%(tp_lr/(tp_lr+fp_lr)))

In [None]:
preds_lr = result_lr.select('Click','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['Click']))).collect()

from sklearn.metrics import roc_curve
y_score, y_true = zip(*preds_lr)
fpr, tpr, thresholds = roc_curve(y_true, y_score, pos_label = 1)

In [None]:
import matplotlib.pyplot as plt

plt.plot(fpr, tpr)
plt.title('ROC curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')

## Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Click')
rf_model = rf.fit(train)

In [None]:
result_rf = rf_model.transform(test).select("Click", "probability").rdd.map(lambda row: (float(row['probability'][1]), float(row['Click']))).collect()
result_rf.show(5)

In [None]:
tp_rf = result_rf[(result_rf.label == 1) & (result_rf.prediction == 1)].count()
tn_rf = result_rf[(result_rf.label == 0) & (result_rf.prediction == 1)].count()
fp_rf = result_rf[(result_rf.label == 0) & (result_rf.prediction == 1)].count()
fn_rf = result_rf[(result_rf.label == 1) & (result_rf.prediction == 0)].count()

print('tp is : %f'%(tp_rf))
print('tn is : %f'%(tn_rf))
print('fp is : %f'%(fp_rf))
print('fn is : %f'%(fn_rf))

In [None]:
# Accuracy
print('test accuracy is : %f'%((tp_rf+tn_rf)/(tp_rf+tn_rf+fp_rf+fn_rf)))

# Recall
print('test accuracy is : %f'%(tp_rf/(tp_rf+fn_rf)))

# Precision
print('test accuracy is : %f'%(tp_rf/(tp_rf+fp_rf)))

## Naive Bayes

In [None]:
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.classification import NaiveBayes 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


train_nb_assembler = VectorAssembler(inputCols=['UId',
                        'QId', 'AdId',
                        'KeyId', 'Pos'],
                 outputCol='features')
train_nb_df = train_nb_assembler.transform(training)

In [None]:
# split the dataset
dataset_nb = train_nb_df.select(['features', 'Click'])
train_nb, test_nb = dataset_nb.randomSplit([0.8, 0.2])

In [None]:
nb = NaiveBayes(modelType='multinomial')
nbmodel = nb.fit(train_nb)

### Model Evaluation

In [None]:
result_nb = nbmodel.transform(test_nb)
result_nb.show(5)

In [None]:
tp_nb = result_nb[(result_nb.label == 1) & (result_nb.prediction == 1)].count()
tn_nb = result_nb[(result_nb.label == 0) & (result_nb.prediction == 1)].count()
fp_nb = result_nb[(result_nb.label == 0) & (result_nb.prediction == 1)].count()
fn_nb = result_nb[(result_nb.label == 1) & (result_nb.prediction == 0)].count()

print('tp is : %f'%(tp_nb))
print('tn is : %f'%(tn_nb))
print('fp is : %f'%(fp_nb))
print('fn is : %f'%(fn_nb))

In [None]:
# Accuracy
print('test accuracy is : %f'%((tp_nb+tn_nb)/(tp_nb+tn_nb+fp_nb+fn_nb)))

# Recall
print('test accuracy is : %f'%(tp_nb/(tp_nb+fn_nb)))

# Precision
print('test accuracy is : %f'%(tp_nb/(tp_nb+fp_nb)))

In [None]:
preds_nb = result_nb.select('Click','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['Click']))).collect()

from sklearn.metrics import roc_curve
y_score, y_true = zip(*preds_nb)
fpr, tpr, thresholds = roc_curve(y_true, y_score, pos_label = 1)

In [None]:
import matplotlib.pyplot as plt

plt.plot(fpr, tpr)
plt.title('ROC curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')