<a href="https://www.kaggle.com/code/c14210099/ctr-prediction?scriptVersionId=180427998" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import RobustScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# List every file available under the Kaggle input mount.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    full_paths = [os.path.join(folder, name) for name in names]
    for path in full_paths:
        print(path)

## Install and Initialize PySpark

In [None]:
# Install PySpark quietly into the notebook environment.
# NOTE(review): the first cell already imports from pyspark, so on a fresh kernel this
# install has to run before that cell — consider moving it to the top of the notebook.
!pip install -q pyspark

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that runs on every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Avazu CTR')
    .getOrCreate()
)

## Load the Training Data

Total rows in the full training set: 40,428,967

In [None]:
# Row count of the full training file.
total_data = 40428967

# Keep only 10% of the rows so the notebook stays tractable.
# NOTE(review): limit() is not a random sample — confirm that the row order of the file
# does not bias the retained subset.
train_df = spark.read.csv(
    '/kaggle/input/avazu-ctr-prediction/train.gz',
    header=True,
    inferSchema=True,
).limit(int(total_data * 0.1))

In [None]:
# Print the column names and the types Spark inferred while reading the CSV.
train_df.printSchema()

In [None]:
# Count the rows that were actually loaded (this triggers a Spark job).
train_df.count()

### Exploratory Data Analysis

In [None]:
# Exploratory data analysis: per-column summary statistics, collected into pandas for display.
train_df.describe().toPandas()

In [None]:
# Class balance of the loaded sample: clicked vs. not-clicked rows.
clicked_rows = train_df.filter(train_df['click'] == 1)
print(f'Number of click: {clicked_rows.count()}')

not_clicked_rows = train_df.filter(train_df['click'] == 0)
print(f'Number of non-click: {not_clicked_rows.count()}')

In [None]:
# Row count per site id, most frequent first.
(
    train_df
    .groupBy('site_id')
    .count()
    .sort('count', ascending=False)
    .toPandas()
)

In [None]:
# Row count per site domain, most frequent first.
(
    train_df
    .groupBy('site_domain')
    .count()
    .sort('count', ascending=False)
    .toPandas()
)

In [None]:
# Row count per site category, most frequent first.
(
    train_df
    .groupBy('site_category')
    .count()
    .sort('count', ascending=False)
    .toPandas()
)

In [None]:
# Row count per app id, most frequent first.
(
    train_df
    .groupBy('app_id')
    .count()
    .sort('count', ascending=False)
    .toPandas()
)

In [None]:
# Row count per app domain, most frequent first.
(
    train_df
    .groupBy('app_domain')
    .count()
    .sort('count', ascending=False)
    .toPandas()
)

In [None]:
# Count, per column, the values that are NaN or null.
from pyspark.sql.functions import isnan, when, count, col

missing_exprs = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in train_df.columns
]
# The aggregation yields a single row holding one counter per column.
missing_by_column = train_df.select(missing_exprs).first().asDict()
print(f'Number of missing value: {missing_by_column}')

## Data Preprocessing

In [None]:
# Partition the (name, dtype) pairs by Spark dtype: 'int' and 'double' count as
# numerical, 'string' as categorical. Any other dtype lands in neither list.
numerical_columns = [field for field in train_df.dtypes if field[1] in ('int', 'double')]
categorical_columns = [field for field in train_df.dtypes if field[1] == 'string']

In [None]:
# Display the (name, dtype) pairs classified as numerical.
numerical_columns

In [None]:
# Display the (name, dtype) pairs classified as categorical.
categorical_columns

### Class Balancing (Undersampling)

In [None]:
# Class sizes, used below for the bar chart and for undersampling.
click_count = train_df.filter(train_df['click'] == 1).count()
no_click_count = train_df.filter(train_df['click'] == 0).count()

In [None]:
# Bar chart of the class imbalance.
class_labels = ['Click', 'Non-Click']
class_sizes = [click_count, no_click_count]

plt.bar(class_labels, class_sizes)
plt.title('Click vs. Non-Click')
plt.xlabel('Click')
plt.ylabel('Count')
plt.show()

In [None]:
def undersample_data(df, click_count, no_click_count, seed=11):
    """Balance the two ``click`` classes by randomly undersampling the larger one.

    The smaller class is kept in full and the larger class is sampled at the
    ratio ``smaller / larger``, so both end up with roughly the same number
    of rows. ``sampleBy`` samples each row independently, so the resulting
    sizes are approximate rather than exact.

    Args:
        df: DataFrame with a ``click`` column holding the labels 0 and 1.
        click_count: Number of rows in ``df`` with ``click == 1``.
        no_click_count: Number of rows in ``df`` with ``click == 0``.
        seed: Random seed forwarded to ``sampleBy`` for reproducibility.

    Returns:
        The stratified sample of ``df``.
    """
    majority_count = max(click_count, no_click_count)
    # With no rows at all there is nothing to thin out; this also avoids dividing by zero.
    keep_ratio = min(click_count, no_click_count) / majority_count if majority_count else 1.0

    # Sampling fractions must stay within [0, 1], so always thin the larger class
    # instead of assuming that non-clicks are the majority.
    if no_click_count >= click_count:
        fractions = {0: keep_ratio, 1: 1.0}
    else:
        fractions = {0: 1.0, 1: keep_ratio}

    return df.sampleBy('click', fractions=fractions, seed=seed)

In [None]:
# Balance the classes, then report the class sizes of the balanced data.
preprocessed_df = undersample_data(train_df, click_count, no_click_count)

balanced_clicks = preprocessed_df.filter(preprocessed_df['click'] == 1)
print(f'Number of click: {balanced_clicks.count()}')

balanced_non_clicks = preprocessed_df.filter(preprocessed_df['click'] == 0)
print(f'Number of non-click: {balanced_non_clicks.count()}')

### Categorical Label Encoding

In [None]:
def labelEncoding(df):
    """Replace every string column of ``df`` with a numeric ``<name>_index`` column.

    Each string column is encoded with ``StringIndexer`` and the original
    column is dropped. Non-string columns are left untouched and keep their
    position; the new index columns are appended in the order of the
    original string columns.

    Args:
        df: DataFrame whose string-typed columns should be label-encoded.

    Returns:
        A DataFrame without string columns; ``df`` itself when it has none.
    """
    string_columns = [name for name, dtype in df.dtypes if dtype == 'string']
    if not string_columns:
        return df

    index_columns = [f'{name}_index' for name in string_columns]
    # A single multi-column indexer fits all columns together instead of
    # launching a separate fit for every column.
    indexer = StringIndexer(inputCols=string_columns, outputCols=index_columns)
    return indexer.fit(df).transform(df).drop(*string_columns)

In [None]:
# Encode every string column of the balanced data as a numeric *_index column.
preprocessed_df = labelEncoding(preprocessed_df)

In [None]:
# Preview the first 10 rows of the encoded data in pandas.
preprocessed_df.limit(10).toPandas()

### Get Correlation Matrix

In [None]:
def get_corr(df):
    """Return the correlation matrix of all columns of ``df`` as a NumPy array.

    Every column of ``df`` is packed into one vector column, which is then
    passed to ``Correlation.corr`` with its default method. Rows and columns
    of the result follow the order of ``df.columns``.
    """
    packed_col = 'corr_features'
    packed = VectorAssembler(inputCols=df.columns, outputCol=packed_col).transform(df)

    # corr() returns a one-row DataFrame whose only cell holds the matrix.
    first_row = Correlation.corr(packed.select(packed_col), packed_col).collect()[0]
    return np.array(first_row[0].toArray())

In [None]:
# Correlation matrix over all columns of the encoded data (still including `id` and `click`).
corr = get_corr(preprocessed_df)

In [None]:
# Render the correlation matrix as a heatmap labelled with the column names.
column_names = preprocessed_df.columns
tick_positions = range(len(column_names))

plt.figure(figsize=(10, 10))
plt.matshow(corr, fignum=1)  # draw into the figure created above
plt.title('Correlation Heatmap')
plt.xticks(tick_positions, column_names, rotation='vertical')
plt.yticks(tick_positions, column_names)
plt.colorbar()
plt.show()

### Drop unnecessary columns

In [None]:
def drop_columns(df, collumns_to_drop):
    """Return ``df`` without the listed columns.

    Args:
        df: DataFrame to remove columns from.
        collumns_to_drop: Iterable of column names to drop.

    Returns:
        A DataFrame without those columns; ``df`` itself when the iterable
        is empty.
    """
    names = list(collumns_to_drop)
    if not names:
        return df
    # One variadic drop() replaces the chain of single-column drops.
    return df.drop(*names)

In [None]:
# Remove `id` before the features are assembled.
# NOTE(review): presumably `id` is a row identifier with no predictive value — confirm.
collumns_to_drop = ['id']
preprocessed_df = drop_columns(preprocessed_df, collumns_to_drop)

In [None]:
# Check the schema that remains after dropping columns.
preprocessed_df.printSchema()

### Features Columns

In [None]:
def add_features_column(df):
    """Pack every column except ``click`` into a single ``features`` vector column.

    The individual feature columns are dropped afterwards, so the result
    holds only ``click`` and ``features``.
    """
    feature_names = [name for name in df.columns if name != 'click']
    assembled = VectorAssembler(inputCols=feature_names, outputCol='features').transform(df)
    return assembled.drop(*feature_names)

In [None]:
# Collapse all non-label columns into one `features` vector column.
preprocessed_df = add_features_column(preprocessed_df)

In [None]:
# Preview the first 5 rows after feature assembly.
preprocessed_df.limit(5).toPandas()

### Normalize Features

In [None]:
def normalize_features(df):
    """Rescale the ``features`` vector column of ``df`` with ``RobustScaler``.

    The scaler is fitted on ``df`` itself with its default settings. The
    scaled vectors replace the original column, so the result still exposes
    them under the name ``features``.
    """
    fitted_scaler = RobustScaler(inputCol='features', outputCol='scaled_features').fit(df)
    return (
        fitted_scaler.transform(df)
        .drop('features')
        .withColumnRenamed('scaled_features', 'features')
    )

In [None]:
# Rescale the assembled feature vectors; the scaler is fitted on the full balanced data.
preprocessed_df = normalize_features(preprocessed_df)

In [None]:
# Preview the first 5 rows after scaling.
preprocessed_df.limit(5).toPandas()

## Model Training

### Split Data

In [None]:
# 80/20 train/test split with a fixed seed. `train_df` is rebound here and no longer
# refers to the raw data loaded at the start of the notebook.
# NOTE(review): the string indexers and the scaler were fitted before this split, so the
# test rows influenced the preprocessing — confirm this is acceptable.
train_df, test_df = preprocessed_df.randomSplit([0.8, 0.2], seed=11)

In [None]:
# Report the size of each split (each count() runs a Spark job).
print(f'Number of train data: {train_df.count()}')
print(f'Number of test data: {test_df.count()}')

### Model Building

In [None]:
# Fit a decision tree on the training split, then score the held-out split.
tree_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='click')
model = tree_estimator.fit(train_df)
result = model.transform(test_df)

### Model Evaluation

In [None]:
# Score the predictions with one evaluator, overriding the metric name per call.
evaluator = MulticlassClassificationEvaluator(labelCol='click', metricName='accuracy')
metric_names = ('accuracy', 'weightedPrecision', 'weightedRecall', 'f1')
accuracy, precision, recall, f1 = [
    evaluator.evaluate(result, {evaluator.metricName: metric_name})
    for metric_name in metric_names
]

In [None]:
# Report the test-set metrics; precision and recall are the weighted variants
# ("weightedPrecision" / "weightedRecall") requested from the evaluator.
print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1: {f1}')