In [1]:
# Read the 10M.csv dataset from the S3 bucket.
# Note: no explicit SparkSession setup is done in this notebook. Running the
# first Spark command starts the Spark application, and the session is exposed
# as `spark` (see the "SparkSession available as 'spark'" message in the output).

csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('s3a://.../10M.csv')
df.printSchema()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1589506821050_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- click: boolean (nullable = true)
 |-- C1: integer (nullable = true)
 |-- banner_pos: integer (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: integer (nullable = true)
 |-- device_conn_type: integer (nullable = true)
 |-- C14: integer (nullable = true)
 |-- C15: integer (nullable = true)
 |-- C16: integer (nullable = true)
 |-- C17: integer (nullable = true)
 |-- C18: integer (nullable = true)
 |-- C19: integer (nullable = true)
 |-- C20: integer (nullable = true)
 |-- C21: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- 

In [2]:
# Cast the boolean click column to an integer label column named "y".

from pyspark.sql.types import IntegerType

click_as_int = df["click"].cast(IntegerType())
df = df.withColumn("y", click_as_int)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Stratified Sampling

In [3]:
#A user defined function for sampling the data. 

from math import floor
from pyspark.sql.functions import rand
from pyspark.sql.functions import col

def stratifiedSample(df, N, labelCol="y", seed=None):
    """Draw about N rows from df while preserving the 0/1 label proportions.

    The share of each label value is measured on the full DataFrame, each
    class gets a quota of floor(share * N) rows, and that many rows are taken
    from a random ordering of the class. Because both quotas are rounded
    down, the result can hold slightly fewer than N rows.

    Args:
        df: Spark DataFrame that contains the label column.
        N: Target number of rows in the sample.
        labelCol: Name of the label column. Rows are selected by comparing
            this column with 1 (positives) and 0 (negatives).
        seed: Optional integer seed passed to rand(). None (the default)
            keeps the unseeded random ordering.

    Returns:
        A DataFrame made of the sampled positive rows followed by the sampled
        negative rows. It is empty when df has no rows.
    """
    # A single grouped aggregation gives every class count; their sum is the
    # total row count, so no separate df.count() pass over the data is needed.
    counts = df.groupby(labelCol).count().rdd.collectAsMap()
    total = sum(counts.values())
    if total == 0:
        return df.limit(0)

    # .get(..., 0) keeps a DataFrame with only one class from raising KeyError.
    pos = int(floor(counts.get(1, 0) / total * N))
    neg = int(floor(counts.get(0, 0) / total * N))

    def shuffle_key():
        # A fresh rand() column per class, seeded only when a seed was given.
        return rand() if seed is None else rand(seed)

    posDF = df.filter(col(labelCol) == 1).orderBy(shuffle_key()).limit(pos)
    negDF = df.filter(col(labelCol) == 0).orderBy(shuffle_key()).limit(neg)
    # union() is the current name of the deprecated unionAll() alias.
    return posDF.union(negDF)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Use the sampling function to draw a 5M-row stratified sample from the complete dataset.
# The sample is cached: stratifiedSample orders each class by rand() before
# limiting, and xdf is reused by many later cells. Without cache() every later
# action on xdf would re-run that sampling from scratch.

xdf = stratifiedSample(df, 5_000_000).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Check the distribution of class labels in the sampled dataset.

label_counts = xdf.groupBy("y").count()
label_counts.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-------+
|  y|  count|
+---+-------+
|  1| 849028|
|  0|4150971|
+---+-------+

In [6]:
# Calculate the fraction of each class label in the sampled dataset.

class_counts = xdf.groupby("y").count()
n_rows = xdf.count()
class_counts.withColumn("frac", col("count") / n_rows).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-------+------------------+
|  y|  count|              frac|
+---+-------+------------------+
|  1| 849028|0.1698056339611268|
|  0|4150971|0.8301943660388732|
+---+-------+------------------+

## Feature Extraction / Transformation

### One-Hot-Encoding

In [7]:
# Explore the banner position variable by printing its first rows.

banner_pos_df = xdf.select("banner_pos")
banner_pos_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|banner_pos|
+----------+
|         0|
|         1|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
|         0|
|         0|
+----------+
only showing top 20 rows

In [8]:
# List the unique values found in the banner_pos column.

xdf.select("banner_pos").dropDuplicates().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|banner_pos|
+----------+
|         1|
|         3|
|         4|
|         5|
|         2|
|         7|
|         0|
+----------+

In [9]:
# One-hot encoding of banner_pos.
# Import the one-hot encoder.
from pyspark.ml.feature import OneHotEncoderEstimator
# Create the encoder object and configure its input/output columns.
ohe = (
    OneHotEncoderEstimator()
    .setInputCols(['banner_pos'])
    .setOutputCols(['banner_posEnc'])
)
# Fit the object to the dataframe.
oh_encoder = ohe.fit(xdf)
# Transform the dataframe, which adds the encoded banner_posEnc column.
encoded = oh_encoder.transform(xdf)

banner_pos_encoded = encoded.select("banner_posEnc")
banner_pos_encoded.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|banner_posEnc|
+-------------+
|(7,[0],[1.0])|
|(7,[1],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
|(7,[1],[1.0])|
|(7,[0],[1.0])|
|(7,[0],[1.0])|
+-------------+
only showing top 20 rows

### String Indexing + One-Hot Encoding

In [10]:
# Explore the site_category column: count its distinct values.
site_categories = encoded.select("site_category").distinct()
site_categories.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

24

In [11]:
# Preview five values of the site_category column.
site_category_preview = encoded.select("site_category").limit(5)
site_category_preview.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|site_category|
+-------------+
|     50e219e0|
|     f028772b|
|     50e219e0|
|     50e219e0|
|     28905ebd|
+-------------+

In [12]:
# Import the string indexer.
from pyspark.ml.feature import StringIndexer
# This cell reassigns `encoded` from itself, so after one run the frame already
# holds the two output columns below and the indexer/encoder would reject them
# on a second run. Dropping them first makes the cell safe to re-execute;
# drop() is a no-op on the first run, when the columns do not exist yet.
encoded = encoded.drop('site_category_ix', 'site_category_ixEnc')
# As with the one-hot encoder: create a string indexer object, fit it to the
# dataframe, and use the fitted object to transform the dataset.
si = StringIndexer(inputCol='site_category', outputCol='site_category_ix')
encoded = si.fit(encoded).transform(encoded)
# Use the output of the string indexer as the input to the one-hot encoder.
ohe = OneHotEncoderEstimator(inputCols=['site_category_ix'], outputCols=['site_category_ixEnc'])
oh_encoder = ohe.fit(encoded)
encoded = oh_encoder.transform(encoded)

encoded.select("site_category_ixEnc").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+
|site_category_ixEnc|
+-------------------+
|     (23,[0],[1.0])|
|     (23,[1],[1.0])|
|     (23,[0],[1.0])|
|     (23,[0],[1.0])|
|     (23,[2],[1.0])|
|     (23,[0],[1.0])|
|     (23,[1],[1.0])|
|     (23,[2],[1.0])|
|     (23,[2],[1.0])|
|     (23,[0],[1.0])|
|     (23,[1],[1.0])|
|     (23,[0],[1.0])|
|     (23,[0],[1.0])|
|     (23,[3],[1.0])|
|     (23,[1],[1.0])|
|     (23,[3],[1.0])|
|     (23,[0],[1.0])|
|     (23,[1],[1.0])|
|     (23,[0],[1.0])|
|     (23,[0],[1.0])|
+-------------------+
only showing top 20 rows

### Collecting Encoded Features - VectorAssembler

In [13]:
# Import the vector assembler.
from pyspark.ml.feature import VectorAssembler

# Create the assembler object: it concatenates the two encoded vectors and the
# month/dayofweek/day/hour columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=["site_category_ixEnc", "banner_posEnc",
                                       "month", "dayofweek", "day", "hour"],
                            outputCol="features")
# Transform the data frame using the assembler object.
# A "features" column left behind by an earlier run of this cell is dropped
# first, because the assembler would otherwise fail on the already existing
# output column; drop() is a no-op when the column is absent.
encoded = assembler.transform(encoded.drop("features"))
encoded.select("features").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|            features|
+--------------------+
|(34,[0,23,30,31,3...|
|(34,[1,24,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[2,23,30,32,3...|
|(34,[0,23,30,31,3...|
|(34,[1,23,30,31,3...|
|(34,[2,23,30,31,3...|
|(34,[2,23,30,32,3...|
|(34,[0,23,30,31,3...|
|(34,[1,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[3,23,30,31,3...|
|(34,[1,23,30,31,3...|
|(34,[3,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[1,24,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[0,23,30,31,3...|
+--------------------+
only showing top 20 rows

## Putting it all together - pipelines

In [14]:
# Import the Pipeline API.
from pyspark.ml import Pipeline

# Create the objects for all the steps to be chained in the pipeline:
# string indexing -> one-hot encoding -> feature vector assembly.
feature_cols = ["site_category_ixEnc", "banner_posEnc",
                "month", "dayofweek", "day", "hour"]
si = StringIndexer(inputCol='site_category', outputCol='site_category_ix')
ohe = OneHotEncoderEstimator(
    inputCols=['site_category_ix', 'banner_pos'],
    outputCols=['site_category_ixEnc', 'banner_posEnc'],
)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Create the pipeline object from the ordered stages.
pipeline = Pipeline(stages=[si, ohe, assembler])

# Fit the pipeline on the sample, then use the fitted model to transform it.
pipeline_model = pipeline.fit(xdf)
encoded = pipeline_model.transform(xdf)

encoded.select("features").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|            features|
+--------------------+
|(34,[0,23,30,31,3...|
|(34,[1,24,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[2,23,30,32,3...|
|(34,[0,23,30,31,3...|
|(34,[1,23,30,31,3...|
|(34,[2,23,30,31,3...|
|(34,[2,23,30,32,3...|
|(34,[0,23,30,31,3...|
|(34,[1,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[3,23,30,31,3...|
|(34,[1,23,30,31,3...|
|(34,[3,23,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[1,24,30,31,3...|
|(34,[0,23,30,31,3...|
|(34,[0,23,30,31,3...|
+--------------------+
only showing top 20 rows