# w261 Final Project - Clickthrough Rate Prediction

In [18]:
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when  
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.sql.functions import lit
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import time

In [2]:
# start Spark Session
from pyspark.sql import SparkSession

# Application name handed to the session builder below.
app_name = "w261_final_rishi"
# getOrCreate() returns an existing session when one is available, otherwise builds one.
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .getOrCreate()
# SparkContext of the session; the data-loading cell uses it for sc.textFile.
sc = spark.sparkContext

In [25]:
#Load the data
# NOTE(review): the variables are named "train" but the path below points at
# test.txt -- confirm which file is intended.

# Read the raw file as an RDD of text lines.
train_tmp_rdd = sc.textFile('gs://w261hw5rishi/projdata/dac/test.txt')

# Split every line on tab characters into a list of fields.
rdd_train = train_tmp_rdd.map(lambda r : r.split('\t'))

# Build a DataFrame without an explicit schema; later code refers to the
# resulting columns by auto-generated names such as '_1'.
# NOTE(review): str.split yields strings, so presumably every column is
# string-typed here -- verify before treating any column as numeric.
df_train = rdd_train.toDF()

#df_test = spark.read.csv('data/dac/test.small.csv', header = True, inferSchema = True)

In [None]:
# Preview the first five rows in pandas, transposed so each DataFrame column is displayed as a row.
pd.DataFrame(df_train.take(5), columns=df_train.columns).transpose()

In [4]:
def fill_missing_val(t_data):
    """Return ``t_data`` with its null values replaced by 0.

    Args:
        t_data: DataFrame whose missing values should be filled.

    Returns:
        The DataFrame produced by filling nulls with the integer 0.
    """
    # DataFrame.fillna is the documented alias of DataFrame.na.fill.
    filled = t_data.fillna(0)
    return filled


def get_columns(t_data):
    """Split the column names of ``t_data`` into numeric and categorical groups.

    The grouping is purely positional: names at positions 1-13 are reported
    as numeric and names at positions 16-40 as categorical.

    NOTE(review): position 0 and positions 14-15 fall in neither slice --
    confirm that skipping 14 and 15 is intentional.

    Args:
        t_data: object exposing a ``columns`` list of column names.

    Returns:
        Tuple ``(all_names, numeric_names, categorical_names)``.
    """
    all_names = t_data.columns

    numeric_span = slice(1, 14)
    categorical_span = slice(16, 41)

    return all_names, all_names[numeric_span], all_names[categorical_span]

def match_test_cols_with_train_cols(t_data):
    """Re-label an unlabeled DataFrame so its layout matches the training data.

    Every column named ``_N`` is renamed to ``_N+1`` and a dummy label
    column ``_1`` holding the constant 0 is added.

    The dummy label is placed FIRST. ``get_columns`` picks feature columns by
    position, and ``withColumn`` would append a new ``_1`` as the last column,
    shifting every positional slice by one.

    Args:
        t_data: DataFrame whose column names all have the form ``_<int>``.

    Returns:
        DataFrame with columns ``_1`` (constant 0) followed by the renamed
        input columns in their original order.

    Raises:
        IndexError, ValueError: if a column name is not of the form ``_<int>``.
    """
    # Shift every positional name up by one to make room for the label.
    shifted_names = ['_' + str(int(name.split('_')[1]) + 1) for name in t_data.columns]
    t_data = t_data.toDF(*shifted_names)

    # If a renamed column is already called "_1", the dummy label replaces it
    # (the same outcome withColumn would give) instead of duplicating the name.
    feature_names = [name for name in shifted_names if name != "_1"]

    return t_data.select(lit(0).alias("_1"), *feature_names)

In [5]:
def create_string_indexer_vector_assm_pipeline_stages(num_cols, cat_cols):
    """Build the unfitted pipeline stages that turn raw columns into a feature vector.

    Stage order: one StringIndexer per categorical column (output named
    ``<col>Index``, invalid values kept), then a StringIndexer mapping the
    label column ``_1`` to ``output``, then a VectorAssembler that joins the
    indexed categorical columns followed by the numeric columns into
    ``features``.

    Args:
        num_cols: list of numeric column names.
        cat_cols: iterable of categorical column names.

    Returns:
        List of pipeline stages in the order described above.
    """
    # Pair each categorical column with the name of its indexed counterpart.
    name_pairs = [(raw, raw + "Index") for raw in cat_cols]

    cat_indexers = [
        StringIndexer(inputCol=raw, outputCol=indexed).setHandleInvalid("keep")
        for raw, indexed in name_pairs
    ]
    indexed_names = [indexed for _, indexed in name_pairs]

    label_indexer = StringIndexer(inputCol='_1', outputCol='output')
    feature_assembler = VectorAssembler(inputCols=indexed_names + num_cols, outputCol="features")

    return cat_indexers + [label_indexer, feature_assembler]

In [6]:
def run_transformation_pipeline_stages(t_data, cols, stages):
    """Fit ``stages`` as one Pipeline on ``t_data`` and apply it to the same data.

    Args:
        t_data: DataFrame to fit on and transform.
        cols: list of original column names to keep in the result.
        stages: list of pipeline stages to run.

    Returns:
        The transformed DataFrame restricted to ``output``, ``features`` and
        the columns listed in ``cols``.
    """
    fitted_pipeline = Pipeline(stages=stages).fit(t_data)
    transformed = fitted_pipeline.transform(t_data)

    # Keep the label and feature vector in front of the original columns.
    kept_columns = ['output', 'features'] + cols
    return transformed.select(kept_columns)

In [7]:
def run_standard_scaler(t_data):
    """Add a ``scaled_features`` column obtained by standard-scaling ``features``.

    The scaler is fit on ``t_data`` itself and then applied to it.

    Args:
        t_data: DataFrame containing a ``features`` vector column.

    Returns:
        ``t_data`` with the extra ``scaled_features`` column.
    """
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    scaler_model = scaler.fit(t_data)
    return scaler_model.transform(t_data)

In [8]:
#Train test split

def train_test_split(t_data):
    """Randomly split ``t_data`` 70/30 with a fixed seed and print both row counts.

    Args:
        t_data: DataFrame to split.

    Returns:
        Tuple ``(train, test)`` of the two resulting DataFrames.
    """
    train_part, test_part = t_data.randomSplit([0.7, 0.3], seed=2019)

    train_rows = train_part.count()
    print("Training Dataset Count: " + str(train_rows))

    test_rows = test_part.count()
    print("Test Dataset Count: " + str(test_rows))

    return train_part, test_part

In [9]:
def feature_selection(t_data):
    """Select features with a chi-squared test at a 0.05 false-positive rate.

    The selector is fit on ``t_data`` and applied to it; the chosen features
    are written to the ``Aspect`` vector column.

    Args:
        t_data: DataFrame containing ``scaled_features`` and ``output``.

    Returns:
        ``t_data`` with the extra ``Aspect`` column.
    """
    selector = ChiSqSelector(
        featuresCol='scaled_features',
        outputCol='Aspect',
        labelCol='output',
        # The fpr threshold only takes effect in "fpr" mode; without this the
        # selector stays in its default "numTopFeatures" mode and ignores it.
        selectorType='fpr',
        fpr=0.05,
    )
    return selector.fit(t_data).transform(t_data)

In [10]:
#Imbalance check - train only

def get_balance_weight_ratio_data(t_data):
    """Add a ``classWeights`` column that counteracts label imbalance.

    Rows with ``output == 1`` are weighted by the fraction of negative rows
    in ``t_data``; all other rows are weighted by one minus that fraction.

    Args:
        t_data: DataFrame containing an ``output`` label column.

    Returns:
        ``t_data`` with the extra ``classWeights`` column.

    Raises:
        ZeroDivisionError: if ``t_data`` has no rows.
    """
    dataset_size = t_data.select("output").count()
    numPositives = t_data.select("output").where('output == 1').count()

    # Share of negatives; float() guarantees a fractional result.
    BalancingRatio = float(dataset_size - numPositives) / dataset_size

    return t_data.withColumn("classWeights", when(t_data.output == 1, BalancingRatio).otherwise(1 - BalancingRatio))

In [11]:
def print_perf_summary(trainingSummary):
    """Print the area under ROC followed by the weighted training metrics.

    Args:
        trainingSummary: summary object exposing ``areaUnderROC``,
            ``accuracy``, ``weightedFalsePositiveRate``,
            ``weightedTruePositiveRate``, ``weightedPrecision``,
            ``weightedRecall`` and a ``weightedFMeasure()`` method.
    """
    summary = trainingSummary
    print('Training set areaUnderROC: ' + str(summary.areaUnderROC))

    # Collected in the order the report template below expects.
    metric_values = (
        summary.accuracy,
        summary.weightedFalsePositiveRate,
        summary.weightedTruePositiveRate,
        summary.weightedFMeasure(),
        summary.weightedPrecision,
        summary.weightedRecall,
    )
    report = "Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
    print(report % metric_values)

In [12]:
def print_perf_eval(predictions):
    """Show a sample of rows predicted as class 0 and print the test area under ROC.

    Args:
        predictions: DataFrame with ``probability``, ``output`` and
            ``prediction`` columns (plus whatever the evaluator reads by
            default).
    """
    # Ten rows predicted as 0, ordered by the probability column, descending.
    predicted_zero = predictions.filter(predictions['prediction'] == 0)
    ranked = predicted_zero.select("probability", "output", "prediction")
    ranked = ranked.orderBy("probability", ascending=False)
    ranked.show(n = 10, truncate = 30)

    evaluator = BinaryClassificationEvaluator(labelCol = 'output')
    area_under_roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    print("Test Area Under ROC: " + str(area_under_roc))

### Logistic Regression

In [13]:
def run_logistic_regression(tn_data):
    """Fit a class-weighted logistic regression and print its training metrics.

    Args:
        tn_data: training DataFrame containing the ``Aspect`` feature
            vector, the ``output`` label and the ``classWeights`` column.

    Returns:
        None. The metrics are printed by ``print_perf_summary``.
    """
    lr = LogisticRegression(maxIter=10, featuresCol="Aspect", labelCol="output",
                            weightCol="classWeights", predictionCol="prediction")
    lrModel = lr.fit(tn_data)

    # The fitted model's summary already holds the training metrics, so the
    # training data does not need to be transformed again just to report them.
    print_perf_summary(lrModel.summary)

### Random Forest

In [14]:
def run_random_forest_algorithm(tn_data, ts_data):
    """Train a 10-tree random forest on ``tn_data`` and report results on ``ts_data``.

    Args:
        tn_data: training DataFrame with ``scaled_features`` and ``output``.
        ts_data: DataFrame to score; results go through ``print_perf_eval``.
    """
    forest = RandomForestClassifier(
        numTrees=10,
        featuresCol="scaled_features",
        labelCol="output",
        predictionCol="prediction",
    )
    scored = forest.fit(tn_data).transform(ts_data)

    print_perf_eval(scored)
    

### Gradient-Boosted Trees

In [15]:
def run_gradient_boost(tn_data, ts_data):
    """Train a gradient-boosted tree classifier (10 iterations) and report on ``ts_data``.

    Args:
        tn_data: training DataFrame with ``scaled_features`` and ``output``.
        ts_data: DataFrame to score; results go through ``print_perf_eval``.
    """
    booster = GBTClassifier(
        maxIter=10,
        featuresCol="scaled_features",
        labelCol="output",
        predictionCol="prediction",
    )
    scored = booster.fit(tn_data).transform(ts_data)

    print_perf_eval(scored)

### Linear Support Vector Classifier

In [16]:
def run_lsvc(tn_data, ts_data):
    """Train a linear SVC on ``tn_data`` and print the area under ROC on ``ts_data``.

    Args:
        tn_data: training DataFrame with ``scaled_features`` and ``output``.
        ts_data: DataFrame to score and evaluate.
    """
    classifier = LinearSVC(
        maxIter=10,
        regParam=0.1,
        featuresCol="scaled_features",
        labelCol="output",
        predictionCol="prediction",
    )
    scored = classifier.fit(tn_data).transform(ts_data)

    evaluator = BinaryClassificationEvaluator(labelCol = 'output')
    area_under_roc = evaluator.evaluate(scored, {evaluator.metricName: "areaUnderROC"})
    print("Test Area Under ROC: " + str(area_under_roc))

In [26]:
#Fill missing values
# Rebinds df_train to the result of fill_missing_val (nulls filled with 0).
df_train = fill_missing_val(df_train)

In [27]:
#Get columns and stages
# cols: all column names; num_cols / cat_cols: the positional slices chosen by get_columns.
cols, num_cols, cat_cols = get_columns(df_train)
# Unfitted indexer + assembler stages for those columns.
stages = create_string_indexer_vector_assm_pipeline_stages(num_cols, cat_cols)

In [None]:
# Time the indexing/assembly pipeline; df_train is rebound to the transformed DataFrame.
t0 = time.time()

df_train = run_transformation_pipeline_stages(df_train, cols, stages)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the standard-scaling step; df_train gains the scaled_features column.
# NOTE(review): the scaler is fit on the full dataset before the train/test
# split performed in a later cell, so held-out rows contribute to the scaling
# statistics -- confirm this is intended.
t0 = time.time()

df_train = run_standard_scaler(df_train)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the chi-squared feature selection; df_train gains the Aspect column.
# NOTE(review): like the scaler, the selector is fit before the train/test
# split in a later cell -- confirm this is intended.
t0 = time.time()

df_train = feature_selection(df_train)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the 70/30 split (train_test_split also prints both row counts).
t0 = time.time()

train, test = train_test_split(df_train)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the class-weight computation; only the training split gets the classWeights column.
t0 = time.time()

train = get_balance_weight_ratio_data(train)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the logistic regression fit; it is given the training split only and prints training metrics.
t0 = time.time()

run_logistic_regression(train)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the random forest: fit on train, evaluate on test.
t0 = time.time()

run_random_forest_algorithm(train, test)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the gradient-boosted trees: fit on train, evaluate on test.
t0 = time.time()

run_gradient_boost(train, test)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))

In [None]:
# Time the linear SVC: fit on train, evaluate on test.
t0 = time.time()

run_lsvc(train, test)

t1 = time.time()
# Wall-clock seconds spent in the call above.
print('Runtime: %f seconds' % (float(t1 - t0)))