# Intro

The most common measure of the effectiveness of online ads is the **click-through rate (CTR)**. It is the ratio of the number of clicks on a specific ad to its total number of views. The higher the CTR, the better targeted an ad is, and the more successful an online advertising campaign is.
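As a quick illustration of the definition, the sketch below computes CTR from hypothetical click and view counts. The numbers and the helper name `click_through_rate` are made up for illustration.

```python
def click_through_rate(clicks: int, views: int) -> float:
    """Return the ratio of clicks on an ad to its total number of views."""
    if views <= 0:
        raise ValueError("views must be positive")
    return clicks / views


# Hypothetical campaign: 25 clicks out of 1,000 views
ctr = click_through_rate(25, 1000)
print(f"CTR = {ctr:.1%}")  # CTR = 2.5%
```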

# Imports

In [100]:
import gzip
import random
import timeit
import numpy as np
import pandas as pd
import xgboost as xgb

from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, StructType, IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler  # note: this rebinds OneHotEncoder, shadowing scikit-learn's
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import warnings
warnings.filterwarnings('ignore')

In [73]:
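# `spark` is only created later, so check the installed PySpark version instead
import pyspark
pyspark.__version__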

'3.4.0'

# Data

We will use the dataset from a Kaggle machine learning competition, Click-Through Rate Prediction (https://www.kaggle.com/c/avazu-ctr-prediction). The dataset can be downloaded from https://www.kaggle.com/c/avazu-ctr-prediction/data.

Only the `train.gz` file contains labeled samples, so it is the only file we need to download. There is no need to unzip it, because `gzip.open` lets pandas read the compressed file directly. We will start with only the first 300,000 samples of the training file.

In [3]:
n_rows = 300000
with gzip.open('../datasets/avazu-ctr-prediction/train.gz') as f:
    df = pd.read_csv(f, nrows=n_rows)

In [4]:
df.head(5)

| | id | click | hour | C1 | banner_pos | site_id | site_domain | site_category | app_id | app_domain | ... | device_type | device_conn_type | C14 | C15 | C16 | C17 | C18 | C19 | C20 | C21 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.000009e+18 | 0 | 14102100 | 1005 | 0 | 1fbe01fe | f3845767 | 28905ebd | ecad2386 | 7801e8d9 | ... | 1 | 2 | 15706 | 320 | 50 | 1722 | 0 | 35 | -1 | 79 |
| 1 | 1.000017e+19 | 0 | 14102100 | 1005 | 0 | 1fbe01fe | f3845767 | 28905ebd | ecad2386 | 7801e8d9 | ... | 1 | 0 | 15704 | 320 | 50 | 1722 | 0 | 35 | 100084 | 79 |
| 2 | 1.000037e+19 | 0 | 14102100 | 1005 | 0 | 1fbe01fe | f3845767 | 28905ebd | ecad2386 | 7801e8d9 | ... | 1 | 0 | 15704 | 320 | 50 | 1722 | 0 | 35 | 100084 | 79 |
| 3 | 1.000064e+19 | 0 | 14102100 | 1005 | 0 | 1fbe01fe | f3845767 | 28905ebd | ecad2386 | 7801e8d9 | ... | 1 | 0 | 15706 | 320 | 50 | 1722 | 0 | 35 | 100084 | 79 |
| 4 | 1.000068e+19 | 0 | 14102100 | 1005 | 1 | fe8cc448 | 9166c161 | 0569f928 | ecad2386 | 7801e8d9 | ... | 1 | 0 | 18993 | 320 | 50 | 2161 | 0 | 35 | -1 | 157 |


Note that many values are anonymized and hashed. These columns are categorical features. Each value corresponds to a real, meaningful category, but it is presented this way for privacy reasons.

In [5]:
# the target variable is the click column:
Y = df['click'].values

In [6]:
# drop irrelevant columns
X = df.drop(['click', 'id', 'hour', 'device_id', 'device_ip'], axis=1).values

The samples are in chronological order, as indicated in the `hour` field. Obviously, we cannot use future samples to predict the past ones. Hence, we take the first 90% as training samples and the rest as testing samples:

In [7]:
n_train = int(n_rows * 0.9)
X_train, Y_train = X[:n_train], Y[:n_train]
X_test, Y_test = X[n_train:], Y[n_train:]

# Predicting CTR with a Decision Tree

Tree-based algorithms in scikit-learn require categorical features to be encoded as numerical values before they can be used as input to the algorithms.

There are a few common approaches to encoding categorical features for tree-based algorithms:

1. **Label Encoding**: In this approach, each unique category is assigned a numerical label. For example, "red" could be encoded as 0, "blue" as 1, and "green" as 2. Scikit-learn provides the `LabelEncoder` class for this purpose. Note that `LabelEncoder` is designed for target labels; for input features, `OrdinalEncoder` does the same job.

2. **One-Hot Encoding**: This approach creates a binary column for each category, indicating the presence or absence of that category in the original feature. For example, if the original feature has three categories ("red", "blue", "green"), three binary columns are created, each holding 1 or 0 to indicate whether that category is present. Scikit-learn provides the `OneHotEncoder` class for one-hot encoding, and pandas provides the `get_dummies()` function.

3. **Ordinal Encoding**: This approach assigns numerical values to categories based on their order or rank. For example, if the categories are "low," "medium," and "high," they could be encoded as 0, 1, and 2, respectively. Scikit-learn provides the `OrdinalEncoder` class for this. Pass its `categories` parameter to control the order, because by default categories are sorted. The `category_encoders` package also offers an ordinal encoder.
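The approaches above can be compared on a toy color column. This minimal sketch uses scikit-learn's `OrdinalEncoder` and `OneHotEncoder` and pandas' `get_dummies()`, and the data is made up. The scikit-learn encoder is imported as `SkOneHotEncoder` (an alias chosen here) so it does not clash with PySpark's `OneHotEncoder` imported at the top of this notebook.

```python
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import OneHotEncoder as SkOneHotEncoder

colors = pd.DataFrame({"color": ["red", "blue", "green", "blue"]})

# Ordinal encoding with an explicit category order: red -> 0, blue -> 1, green -> 2
ordinal = OrdinalEncoder(categories=[["red", "blue", "green"]])
ordinal_codes = ordinal.fit_transform(colors)
print(ordinal_codes.ravel())  # [0. 1. 2. 1.]

# One-hot encoding: one binary column per category (categories are sorted)
one_hot = SkOneHotEncoder()
one_hot_matrix = one_hot.fit_transform(colors).toarray()  # sparse -> dense for display
print(one_hot.categories_[0])  # ['blue' 'green' 'red']
print(one_hot_matrix)

# The pandas equivalent of one-hot encoding
print(pd.get_dummies(colors["color"]))
```

With ordinal codes, a tree can split on thresholds such as "code < 1.5". One-hot columns instead let each category be isolated by a single split.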

Once the categorical features are encoded as numerical values, you can use them as input to tree-based algorithms in scikit-learn, such as decision trees, random forests, or gradient boosting models.

We will now transform string-based categorical features into one-hot encoded vectors using the OneHotEncoder module from scikit-learn.

In the following line of code, the `OneHotEncoder` class is instantiated with the `handle_unknown='ignore'` parameter. This parameter specifies how to handle categories that were not seen during fitting. With `'ignore'`, an unknown category in the test set is encoded as all zeros in that feature's one-hot columns instead of raising an error.
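The behavior of `handle_unknown='ignore'` can be checked on a tiny example with made-up data. The category "c" appears only at transform time, so its row comes out as all zeros. As above, scikit-learn's encoder is imported under the alias `SkOneHotEncoder` to avoid the name clash with PySpark.

```python
import numpy as np
from sklearn.preprocessing import OneHotEncoder as SkOneHotEncoder

train = np.array([["a"], ["b"], ["a"]])
test = np.array([["b"], ["c"]])  # "c" never appears in the training data

enc_demo = SkOneHotEncoder(handle_unknown="ignore")
enc_demo.fit(train)  # learns the categories ["a", "b"]
encoded = enc_demo.transform(test).toarray()
print(encoded)
# [[0. 1.]
#  [0. 0.]]   <- the unseen "c" becomes an all-zero row
```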

In [8]:
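# The PySpark import in the first cell rebinds the name OneHotEncoder,
# so import scikit-learn's encoder under its own name here
from sklearn.preprocessing import OneHotEncoder as SkOneHotEncoder
enc = SkOneHotEncoder(handle_unknown='ignore')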

Now we fit the encoder on the training data, transform the training data into its encoded form, and then use the learned encoding to transform the test data.

In [9]:
X_train_enc = enc.fit_transform(X_train)
X_test_enc = enc.transform(X_test)

In [10]:
X_train_enc[0]

<1x8204 sparse matrix of type '<class 'numpy.float64'>'
	with 19 stored elements in Compressed Sparse Row format>

We will train a decision tree model using grid search. 

We set `scoring='roc_auc'` because this is an imbalanced binary classification problem: only 51,211 of the 300,000 samples are clicks, a positive CTR of about 17%:

In [11]:
len(df[df.click == 1])/n_rows

0.17070333333333335
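To see why accuracy would be a misleading metric here, consider a trivial model that never predicts a click. On synthetic labels with the same 17% positive rate, it is 83% accurate. Its ROC AUC, however, is only 0.5, because it cannot rank any sample above another. The labels below are made up for illustration.

```python
import numpy as np
from sklearn.metrics import accuracy_score, roc_auc_score

# Synthetic labels: 17 clicks out of 100 impressions
y_true = np.array([1] * 17 + [0] * 83)

# A trivial "model" that gives every impression the same score and predicts no click
always_no_click = np.zeros(len(y_true), dtype=int)

acc = accuracy_score(y_true, always_no_click)
auc = roc_auc_score(y_true, always_no_click)
print(f"accuracy: {acc:.2f}, ROC AUC: {auc:.2f}")  # accuracy: 0.83, ROC AUC: 0.50
```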

In [12]:
parameters = {'max_depth': [3, 10, None]}
decision_tree = DecisionTreeClassifier(criterion='gini',
                                       min_samples_split=30)

In [14]:
grid_search = GridSearchCV(decision_tree, 
                           parameters,
                           n_jobs=-1, # use all of the available CPU processors
                           cv=3, # three-fold cross validation
                           scoring='roc_auc')

In [15]:
grid_search.fit(X_train_enc, Y_train)
print(grid_search.best_params_)

{'max_depth': 10}


In [16]:
decision_tree_best = grid_search.best_estimator_

In [17]:
pos_prob = decision_tree_best.predict_proba(X_test_enc)[:, 1]

In [18]:
print(f'The ROC AUC on testing set is: {roc_auc_score(Y_test, pos_prob):.3f}')

The ROC AUC on testing set is: 0.719


An ROC AUC of 0.719 on the testing set may not seem very high. However, clicks depend on many intricate human factors, which makes them hard to predict. We could optimize the hyperparameters further, but an AUC of 0.72 is actually quite good. For comparison, randomly selecting 17% of the samples as clicks yields an AUC of about 0.5 (0.496 in this run):

In [19]:
percent = len(df[df.click == 1])/n_rows
pos_prob = np.zeros(len(Y_test))

np.random.seed(42)  # seed NumPy's generator, which np.random.choice uses (random.seed does not affect it)
click_index = np.random.choice(len(Y_test), int(len(Y_test) * percent),
                               replace=False)
pos_prob[click_index] = 1

print(f'The ROC AUC on testing set is: {roc_auc_score(Y_test, pos_prob):.3f}')

The ROC AUC on testing set is: 0.496
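The 0.5 baseline follows from what ROC AUC measures. It is the probability that a randomly chosen positive sample receives a higher score than a randomly chosen negative one, with ties counting as one half. The sketch below computes AUC directly from that definition by comparing every positive-negative pair, and checks the result against `roc_auc_score`. The helper `pairwise_auc` is hypothetical, and its cost grows with the product of the class sizes, so it is only meant for small inputs.

```python
import numpy as np
from sklearn.metrics import roc_auc_score


def pairwise_auc(y_true, scores):
    """ROC AUC as P(score of a positive > score of a negative); ties count 1/2."""
    y_true = np.asarray(y_true)
    scores = np.asarray(scores, dtype=float)
    pos = scores[y_true == 1]
    neg = scores[y_true == 0]
    # Compare every positive score with every negative score via broadcasting
    greater = (pos[:, None] > neg[None, :]).sum()
    ties = (pos[:, None] == neg[None, :]).sum()
    return (greater + 0.5 * ties) / (len(pos) * len(neg))


y = [0, 0, 1, 1, 0, 1]
s = [0.1, 0.4, 0.35, 0.8, 0.2, 0.6]
print(pairwise_auc(y, s))   # 8 of 9 pairs ranked correctly -> 0.888...
print(roc_auc_score(y, s))  # same value
```

Random scores rank a positive above a negative about half the time, which is why the random baseline lands near 0.5.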


# Predicting CTR with a Random Forest

In [20]:
random_forest = RandomForestClassifier(n_estimators=100,
                                       criterion='gini',
                                       min_samples_split=30,
                                       n_jobs=-1)

In [21]:
# fine tune the max_depth hyperparameter
grid_search = GridSearchCV(random_forest, 
                           parameters,
                           n_jobs=-1, 
                           cv=3, 
                           scoring='roc_auc')

In [22]:
grid_search.fit(X_train_enc, Y_train)
print(grid_search.best_params_)
print(grid_search.best_score_)

{'max_depth': None}
0.7356234156699046


In [23]:
random_forest_best = grid_search.best_estimator_
pos_prob = random_forest_best.predict_proba(X_test_enc)[:, 1]
print(f'The ROC AUC on testing set is: {roc_auc_score(Y_test, pos_prob):.3f}')

The ROC AUC on testing set is: 0.759


The random forest improves the ROC AUC on the testing set from 0.719 (decision tree) to 0.759.

# Predicting CTR with Gradient Boosting

In gradient boosted trees (GBT), also called gradient boosting machines, individual trees are trained in succession. Each new tree aims to correct the errors made by the trees trained before it.
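The idea can be sketched from scratch for binary classification with log loss. Start from a constant log-odds prediction. Then repeatedly fit a small regression tree to the residuals `y - p`, which are the negative gradient of the log loss with respect to the log-odds, and add a scaled-down copy of the tree's output. This is a simplified illustration built on scikit-learn's `DecisionTreeRegressor`, and the helper names are hypothetical. It is not how XGBoost is implemented: XGBoost also uses second-order gradient information and regularization.

```python
import numpy as np
from sklearn.metrics import roc_auc_score
from sklearn.tree import DecisionTreeRegressor


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def fit_gbt(X, y, n_trees=50, learning_rate=0.1, max_depth=3):
    """Minimal gradient boosting for labels y in {0, 1} with log loss."""
    p0 = np.clip(y.mean(), 1e-6, 1 - 1e-6)
    base = np.log(p0 / (1 - p0))          # start from the constant log-odds
    raw = np.full(len(y), base)           # current ensemble output (log-odds)
    trees = []
    for _ in range(n_trees):
        residual = y - sigmoid(raw)       # negative gradient of log loss w.r.t. raw
        tree = DecisionTreeRegressor(max_depth=max_depth)
        tree.fit(X, residual)             # the new tree models the remaining errors
        raw += learning_rate * tree.predict(X)
        trees.append(tree)
    return base, learning_rate, trees


def predict_proba_gbt(model, X):
    base, learning_rate, trees = model
    raw = np.full(len(X), base)
    for tree in trees:
        raw += learning_rate * tree.predict(X)
    return sigmoid(raw)


# Usage on synthetic data: the label depends on the sum of two features
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(500, 2))
y_demo = (X_demo[:, 0] + X_demo[:, 1] > 0).astype(float)
gbt_demo = fit_gbt(X_demo, y_demo)
prob_demo = predict_proba_gbt(gbt_demo, X_demo)
print(f"training AUC: {roc_auc_score(y_demo, prob_demo):.3f}")
```

The leaf values here are plain residual means. Production libraries compute leaf values more carefully, but the sequential, error-correcting structure is the same.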

We will use the XGBoost package (https://xgboost.readthedocs.io/en/latest/) to implement GBT. Check [XGBClassifier](https://xgboost.readthedocs.io/en/latest/python/python_api.html#xgboost.XGBClassifier) to see what other hyperparameters we can tweak.

We first install the XGBoost Python API via the following command:

In [22]:
# pip install xgboost

In [24]:
model = xgb.XGBClassifier(learning_rate=0.1,
                          max_depth=10,
                          n_estimators=1000)

In [25]:
model.fit(X_train_enc, Y_train)

In [26]:
pos_prob = model.predict_proba(X_test_enc)[:, 1]
print(f'The ROC AUC on testing set is: {roc_auc_score(Y_test, pos_prob):.3f}')

The ROC AUC on testing set is: 0.771


The XGBoost GBT model reaches an ROC AUC of 0.771 on the testing set, the best result among the tree-based models so far.

# Predicting CTR with Logistic Regression

In logistic regression trained with (batch) gradient descent, all training samples are used to update the weights in every single iteration. Hence, if the number of training samples is large, the whole training process becomes very time-consuming and computationally expensive.

In stochastic gradient descent (SGD), each weight update consumes only one training sample instead of the complete training set. The model takes a step based on the error calculated on that single sample. Once all samples have been used, one iteration (epoch) finishes. SGD usually needs only a few iterations, whereas batch gradient descent typically needs a large number of them to converge.
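The update rule can be sketched in NumPy. For logistic regression with log loss, the gradient for a single sample (x, y) is (sigmoid(w·x + b) − y)·x, so each SGD step moves the weights against that single-sample gradient. This minimal, unregularized sketch uses a constant learning rate and roughly mirrors the spirit of `SGDClassifier(loss='log_loss', penalty=None, learning_rate='constant')`. It is not scikit-learn's implementation, and the function name and synthetic data are hypothetical.

```python
import numpy as np


def sgd_logistic_regression(X, y, eta=0.01, n_epochs=10, seed=0):
    """Train logistic regression with plain SGD: one sample per weight update."""
    rng = np.random.default_rng(seed)
    n_samples, n_features = X.shape
    w = np.zeros(n_features)
    b = 0.0
    for _ in range(n_epochs):                          # one epoch = one pass over all samples
        for i in rng.permutation(n_samples):           # visit samples in random order
            p = 1.0 / (1.0 + np.exp(-(X[i] @ w + b)))  # predicted click probability
            error = p - y[i]                           # gradient of log loss w.r.t. w·x + b
            w -= eta * error * X[i]                    # step using this single sample only
            b -= eta * error
    return w, b


# Usage on synthetic, linearly separable data
rng = np.random.default_rng(42)
X_demo = rng.normal(size=(1000, 3))
true_w = np.array([2.0, -1.0, 0.5])
y_demo = (X_demo @ true_w > 0).astype(float)
w, b = sgd_logistic_regression(X_demo, y_demo, eta=0.1)
accuracy = ((X_demo @ w + b > 0) == (y_demo == 1)).mean()
print(f"training accuracy: {accuracy:.3f}")
```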

In [27]:
# SGD logistic regression model without regularization
sgd_lr = SGDClassifier(loss='log_loss', 
                       penalty=None,
                       fit_intercept=True, 
                       max_iter=10,
                       learning_rate='constant', 
                       eta0=0.01)

In [28]:
sgd_lr.fit(X_train_enc.toarray(), Y_train)

In [29]:
pred = sgd_lr.predict_proba(X_test_enc.toarray())[:, 1]
print(f'Training samples: {n_train}, AUC on testing set: {roc_auc_score(Y_test, pred):.3f}')

Training samples: 270000, AUC on testing set: 0.767


In [30]:
# SGD logistic regression model with L1 regularization
sgd_lr_l1 = SGDClassifier(loss='log_loss', 
                          penalty='l1', 
                          alpha=0.0001, 
                          fit_intercept=True, 
                          max_iter=10, 
                          learning_rate='constant', 
                          eta0=0.01)
sgd_lr_l1.fit(X_train_enc.toarray(), Y_train)

In [31]:
pred = sgd_lr_l1.predict_proba(X_test_enc.toarray())[:, 1]
print(f'Training samples: {n_train}, AUC on testing set: {roc_auc_score(Y_test, pred):.3f}')

Training samples: 270000, AUC on testing set: 0.750


# Scale up SGD with online learning

We can scale up SGD further with online learning techniques. In online learning, new training data becomes available sequentially or in real time, as opposed to all at once in an offline learning environment. A relatively small chunk of data is loaded and preprocessed for training at a time, which frees the memory that would otherwise hold the entire large dataset. Besides better computational feasibility, online learning is also used because it adapts to cases where new data is generated in real time and is needed to keep the model up to date.

Online learning allows the model to continue training on newly arriving data. In offline learning, by contrast, we have to retrain the whole model on the new data together with the old data. This matters for click-through prediction, because the models need to include the most recent data reflecting users' latest behaviors and tastes.

The `SGDClassifier` class in scikit-learn implements online learning with the `partial_fit` method. We will train the model on 1,000,000 samples, feeding in 100,000 samples at a time to simulate an online learning environment. We then test the trained model on another 100,000 samples.

In [33]:
n_rows = 1100000
with gzip.open('../datasets/avazu-ctr-prediction/train.gz') as f:
    df = pd.read_csv(f, nrows=n_rows)

In [34]:
X = df.drop(['click', 'id', 'hour', 'device_id', 'device_ip'], axis=1).values
Y = df['click'].values

In [35]:
n_train = 1000000
X_train, Y_train = X[:n_train], Y[:n_train]
X_test, Y_test = X[n_train:], Y[n_train:]

In [36]:
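# Use scikit-learn's encoder explicitly; the PySpark import rebinds the name OneHotEncoder
from sklearn.preprocessing import OneHotEncoder as SkOneHotEncoder
enc = SkOneHotEncoder(handle_unknown='ignore')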
enc.fit(X_train)

In [37]:
X_train.shape

(1000000, 19)

In [38]:
sgd_lr_online = SGDClassifier(loss='log_loss', 
                              penalty=None, 
                              fit_intercept=True, 
                              max_iter=1, 
                              learning_rate='constant',
                              eta0=0.01)

In [39]:
start_time = timeit.default_timer()

batch_size = 100000
num_samples = len(X_train)

for i in range(0, num_samples, batch_size):
    x_train = X_train[i:i+batch_size]
    y_train = Y_train[i:i+batch_size]
    x_train_enc = enc.transform(x_train)
    sgd_lr_online.partial_fit(x_train_enc.toarray(), y_train, classes=[0, 1])

print(f"--- {timeit.default_timer() - start_time:.3f} seconds ---")

--- 128.244 seconds ---


With online learning, training based on a total of 1 million samples only takes 128 seconds.


In [40]:
x_test_enc = enc.transform(X_test)

In [41]:
pred = sgd_lr_online.predict_proba(x_test_enc.toarray())[:, 1]
print(f'Training samples: {n_train}, AUC on testing set: {roc_auc_score(Y_test, pred):.3f}')

Training samples: 1000000, AUC on testing set: 0.758


# Scale up with PySpark

Next, we will see how to efficiently train a model on the entire dataset of about 40 million samples. To scale up our solution, we use Apache Spark through its Python API, PySpark.

PySpark requires a Java runtime. You can either install [Apache Spark](https://spark.apache.org/downloads.html) itself or install the `pyspark` package from PyPI, which bundles Spark for local use:

In [73]:
# pip install pyspark

In [42]:
spark = SparkSession \
    .builder \
    .appName("CTR") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/26 15:48:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


With a Spark session, `spark`, a DataFrame object can be created by reading a file. In the following example, we will create a DataFrame object, `df`, from a CSV file. First, we define the schema:

In [43]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("click", IntegerType(), True),
    StructField("hour", IntegerType(), True),
    StructField("C1", StringType(), True),
    StructField("banner_pos", StringType(), True),
    StructField("site_id", StringType(), True),
    StructField("site_domain", StringType(), True),
    StructField("site_category", StringType(), True),
    StructField("app_id", StringType(), True),
    StructField("app_domain", StringType(), True),
    StructField("app_category", StringType(), True),
    StructField("device_id", StringType(), True),
    StructField("device_ip", StringType(), True),
    StructField("device_model", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("device_conn_type", StringType(), True),
    StructField("C14", StringType(), True),
    StructField("C15", StringType(), True),
    StructField("C16", StringType(), True),
    StructField("C17", StringType(), True),
    StructField("C18", StringType(), True),
    StructField("C19", StringType(), True),
    StructField("C20", StringType(), True),
    StructField("C21", StringType(), True),
])

In [44]:
df = spark.read.csv('../datasets/avazu-ctr-prediction/train.gz', header=True, schema=schema)

In [45]:
df.show(5)

                                                                                

+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
|                  id|click|    hour|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18|C19|   C20|C21|
+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
| 1000009418151094273|    0|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| ddd2926e|    44956a24|          1|               2|15706|320| 50|1722|  0| 35|    -1| 79|
|10000169349117863715|    0|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    

In [46]:
df.count()

                                                                                

40428967

In [47]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- click: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- C1: string (nullable = true)
 |-- banner_pos: string (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_conn_type: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string (nullable = true)
 |-- C16: string (nullable = true)
 |-- C17: string (nullable = true)
 |-- C18: string (nullable = true)
 |-- C19: string (nullable = true)
 |-- C20: string (nullable = true)
 |-- C21: string (nullable = true)



In [48]:
df = df.drop('id').drop('hour').drop('device_id').drop('device_ip')
df = df.withColumnRenamed("click", "label")

In [49]:
df.show(5)

+-----+----+----------+--------+-----------+-------------+--------+----------+------------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
|label|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18|C19|   C20|C21|
+-----+----+----------+--------+-----------+-------------+--------+----------+------------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22|    44956a24|          1|               2|15706|320| 50|1722|  0| 35|    -1| 79|
|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22|    711ee120|          1|               0|15704|320| 50|1722|  0| 35|100084| 79|
|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22|    8a4875bd|          1|               0|15704|3

In [50]:
df.columns

['label',
 'C1',
 'banner_pos',
 'site_id',
 'site_domain',
 'site_category',
 'app_id',
 'app_domain',
 'app_category',
 'device_model',
 'device_type',
 'device_conn_type',
 'C14',
 'C15',
 'C16',
 'C17',
 'C18',
 'C19',
 'C20',
 'C21']

In [51]:
df.select("label").show()

+-----+
|label|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    1|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



We split the data into a training set and a testing set, using 70% of the samples for training and the remaining 30% for testing. For this, we use PySpark's [randomSplit()](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.randomSplit.html) method. Note that, unlike the chronological split used earlier, this split is random.





In [52]:
df_train, df_test = df.randomSplit([0.7, 0.3], seed=42)

## Caching the Data

In Spark, caching and persistence are optimization techniques that reduce computation overhead. They save the intermediate results of RDD or DataFrame operations in memory and/or on disk. Without caching or persistence, whenever an intermediate DataFrame is needed, it is recomputed from scratch according to how it was originally created. Depending on the storage level, persistence behaves differently:

* MEMORY_ONLY: The object is only stored in memory. Partitions that do not fit in memory are not cached and are recomputed each time they are needed.
* DISK_ONLY: The object is only kept on disk. A persisted object can be read directly from storage without being recomputed.
* MEMORY_AND_DISK: The object is stored in memory, and partitions that do not fit in memory are stored on disk instead of being recomputed every time they are needed. `DataFrame.cache()` uses a memory-and-disk storage level by default, while `RDD.cache()` defaults to MEMORY_ONLY. This level combines the fast retrieval of in-memory storage with the larger capacity of disk storage.

Let's cache both the training and testing DataFrames:

In [53]:
df_train.cache()

DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]

In [54]:
df_test.cache()

DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]

## One-hot encoding categorical features

In [65]:
categorical = df_train.columns

In [66]:
categorical.remove('label')

In [74]:
indexers = [StringIndexer(inputCol=c,outputCol="{0}_indexed".format(c)).setHandleInvalid("keep") for c in categorical]

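`StringIndexer` maps each category of a column to a numerical index. Calling `setHandleInvalid("keep")` ensures that the application won't crash if a categorical value not seen during fitting occurs. Such values are put into an extra bucket instead of raising an error.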
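The indexing can be mimicked in plain Python. This is an illustration of the documented behavior, not Spark's code, and the function names are hypothetical. By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`) and breaks ties alphabetically. With `handleInvalid="keep"`, an unseen value goes to an extra index equal to the number of labels.

```python
from collections import Counter


def fit_string_index(values):
    """Map labels to indices: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """Unseen labels go to an extra bucket at index len(mapping), as with 'keep'.

    Spark stores these indices as doubles; plain ints are used here for clarity.
    """
    unknown_index = len(mapping)
    return [mapping.get(v, unknown_index) for v in values]


mapping = fit_string_index(["b", "a", "b", "c", "a", "b"])
print(mapping)                                   # {'b': 0, 'a': 1, 'c': 2}
print(transform_keep(mapping, ["a", "z", "c"]))  # [1, 3, 2]
```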

We perform one-hot encoding on each individual indexed categorical column:

In [76]:
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers])

We concatenate all sets of generated binary vectors into a single one using the `VectorAssembler` module. This creates the final encoded vector column called `features`.
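Conceptually, assembling places the one-hot blocks of all columns side by side. A category's position in the final vector is therefore its index within its own block plus the total size of all preceding blocks. The sketch below shows that offset arithmetic in plain Python, with hypothetical block sizes and a hypothetical helper name. It is an illustration, not `VectorAssembler`'s code. The result has the same (size, indices) shape as the sparse vectors Spark prints, such as `(31490,[5,7,2794,...`.

```python
def assemble_one_hot(block_sizes, active_indices):
    """Concatenate one-hot blocks into a single sparse vector.

    block_sizes[j] is the length of column j's one-hot block, and
    active_indices[j] is the position of the 1 in that block (None if all zeros).
    Returns (total_size, list of global indices holding a 1).
    """
    offset = 0
    global_indices = []
    for size, idx in zip(block_sizes, active_indices):
        if idx is not None:
            global_indices.append(offset + idx)  # shift by the sizes of earlier blocks
        offset += size
    return offset, global_indices


# Hypothetical: three columns with blocks of size 4, 3 and 5
print(assemble_one_hot([4, 3, 5], [2, 0, 4]))     # (12, [2, 4, 11])
print(assemble_one_hot([4, 3, 5], [2, None, 1]))  # (12, [2, 8])
```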

In [78]:
assembler = VectorAssembler(inputCols=encoder.getOutputCols(), outputCol="features")

In [83]:
stages = indexers + [encoder, assembler]
pipeline = Pipeline(stages=stages)

In [84]:
one_hot_encoder = pipeline.fit(df_train)

23/06/26 16:33:13 WARN MemoryStore: Not enough space to cache rdd_23_0 in memory! (computed 123.0 MiB so far)
23/06/26 16:33:13 WARN BlockManager: Persisting block rdd_23_0 to disk instead.
                                                                                

In [85]:
df_train_encoded = one_hot_encoder.transform(df_train)

In [86]:
df_train_encoded = df_train_encoded.select( ["label", "features"])

In [87]:
df_train_encoded.show()

23/06/26 16:38:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/06/26 16:39:00 WARN DAGScheduler: Broadcasting large task binary with size 1220.8 KiB


+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,3990,...|
|    0|(31490,[5,7,4005,...|
|    0|(31490,[5,7,3744,...|
|    0|(31490,[5,7,3744,...|
|    0|(31490,[5,7,4061,...|
|    0|(31490,[5,7,3765,...|
|    0|(31490,[5,7,3765,...|
|    0|(31490,[5,7,3776,...|
+-----+--------------------+
only showing top 20 rows



In [88]:
df_train_encoded.cache()

DataFrame[label: int, features: vector]

In [91]:
df_test_encoded = one_hot_encoder.transform(df_test)
df_test_encoded = df_test_encoded.select(["label", "features"])
df_test_encoded.show()

23/06/26 16:43:03 WARN DAGScheduler: Broadcasting large task binary with size 1220.8 KiB
23/06/26 16:48:27 WARN MemoryStore: Not enough space to cache rdd_185_0 in memory! (computed 43.1 MiB so far)
23/06/26 16:48:27 WARN BlockManager: Persisting block rdd_185_0 to disk instead.

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,2794,...|
|    0|(31490,[5,7,4005,...|
|    0|(31490,[5,7,4005,...|
|    0|(31490,[5,7,3744,...|
|    0|(31490,[5,7,4061,...|
|    0|(31490,[5,7,4061,...|
|    0|(31490,[5,7,4530,...|
|    0|(31490,[5,7,3765,...|
|    0|(31490,[5,7,4530,...|
|    0|(31490,[5,7,3776,...|
|    0|(31490,[5,7,3028,...|
|    0|(31490,[5,7,3028,...|
|    0|(31490,[5,7,3028,...|
+-----+--------------------+
only showing top 20 rows



                                                                                

In [92]:
df_test_encoded.cache()

DataFrame[label: int, features: vector]

To release some space, we uncache `df_train` and `df_test`:


In [90]:
df_train.unpersist()

DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]

In [93]:
df_test.unpersist()

DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]

## Training and testing a [logistic regression model](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html)

In [95]:
classifier = LogisticRegression(maxIter=20, 
                                regParam=0.001,
                                elasticNetParam=0.001)
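For reference, Spark's `LogisticRegression` combines L1 and L2 penalties as an elastic net. Here `regParam` is λ and `elasticNetParam` is α, so α = 0 gives pure L2 and α = 1 gives pure L1. With the values above, λ = 0.001 and α = 0.001, so the penalty is almost entirely L2. Ignoring details such as the feature standardization Spark applies by default, the objective is the average log loss plus this penalty. Here σ is the sigmoid function and the intercept b is not penalized:

$$
\min_{\mathbf{w},\, b}\; \frac{1}{n}\sum_{i=1}^{n} \Big[-y_i \log \sigma(\mathbf{w}^\top \mathbf{x}_i + b) - (1 - y_i)\log\big(1 - \sigma(\mathbf{w}^\top \mathbf{x}_i + b)\big)\Big] + \lambda\Big(\alpha\,\lVert \mathbf{w} \rVert_1 + \frac{1-\alpha}{2}\,\lVert \mathbf{w} \rVert_2^2\Big)
$$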

In [96]:
lr_model = classifier.fit(df_train_encoded)

23/06/26 16:56:01 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
23/06/26 17:00:26 WARN MemoryStore: Not enough space to cache rdd_195_0 in memory! (computed 331.9 MiB so far)
23/06/26 17:00:26 WARN BlockManager: Persisting block rdd_195_0 to disk instead.
23/06/26 17:08:05 WARN MemoryStore: Not enough space to cache rdd_195_0 in memory! (computed 331.9 MiB so far)
23/06/26 17:08:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/26 17:08:55 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
23/06/26 17:08:56 WARN MemoryStore: Not enough space to cache rdd_195_0 in memory! (computed 331.9 MiB so far)
23/06/26 17:08:57 WARN MemoryStore: Not enough space to cache rdd_213_0 in memory! (computed 65.0 MiB so far)
23/06/26 17:08:57 WARN BlockManager: Persisting block rdd_213_0 to disk instead.
23/06/26 17:10:11 WARN MemoryStore: Not enough space to cache rdd_213_0 in memory! (computed 272.8 MiB so far)
23/06

In [97]:
predictions = lr_model.transform(df_test_encoded)

In [98]:
predictions.cache()

DataFrame[label: int, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [99]:
predictions.show()

23/06/26 17:28:54 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/06/26 17:33:16 WARN MemoryStore: Not enough space to cache rdd_259_0 in memory! (computed 190.9 MiB so far)
23/06/26 17:33:16 WARN BlockManager: Persisting block rdd_259_0 to disk instead.
23/06/26 17:36:31 WARN MemoryStore: Not enough space to cache rdd_259_0 in memory! (computed 331.9 MiB so far)
23/06/26 17:36:31 WARN MemoryStore: Not enough space to cache rdd_265_0 in memory! (computed 67.4 MiB so far)
23/06/26 17:36:31 WARN BlockManager: Persisting block rdd_265_0 to disk instead.
23/06/26 17:37:13 WARN MemoryStore: Not enough space to cache rdd_265_0 in memory! (computed 257.8 MiB so far)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|(31490,[5,7,2794,...|[5.62405163440240...|[0.99640300001753...|       0.0|
|    0|(31490,[5,7,2794,...|[5.63177753732283...|[0.99643058420230...|       0.0|
|    0|(31490,[5,7,2794,...|[5.76519716496787...|[0.99687501888506...|       0.0|
|    0|(31490,[5,7,2794,...|[5.42726668833494...|[0.99562413975328...|       0.0|
|    0|(31490,[5,7,2794,...|[5.42726668833494...|[0.99562413975328...|       0.0|
|    0|(31490,[5,7,2794,...|[5.42726668833494...|[0.99562413975328...|       0.0|
|    0|(31490,[5,7,2794,...|[5.42726668833494...|[0.99562413975328...|       0.0|
|    0|(31490,[5,7,2794,...|[5.43499259125538...|[0.99565767072626...|       0.0|
|    0|(31490,[5,7,4005,...|[-2.8232800910379...|[0.05607905157199...|       1.0|
|    0|(31490,[5

                                                                                

In [101]:
ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
print(ev.evaluate(predictions))

23/06/26 17:38:13 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/06/26 17:38:13 WARN MemoryStore: Not enough space to cache rdd_265_0 in memory! (computed 257.8 MiB so far)

0.7490537166477022


                                                                                