In [1]:
# Load the raw CSV from S3: the first row supplies the column names and
# Spark infers each column's type from the data.
df = sqlContext.read.csv(
    's3a://sparkdemonstration/10M.csv',
    header=True,
    inferSchema=True,
)

from pyspark.sql.types import IntegerType
from math import floor
from pyspark.sql.functions import rand
from pyspark.sql.functions import col

def stratifiedSample(df, N, labelCol="y"):
    """Draw roughly N shuffled rows from df, preserving the 0/1 label ratio.

    The share of each label value in ``df`` is measured first; each class then
    contributes ``floor(share * N)`` randomly chosen rows. Because both quotas
    are rounded down, the result can hold slightly fewer than N rows.

    Args:
        df: Spark DataFrame containing the label column.
        N: Target number of rows in the sample.
        labelCol: Name of the label column. Rows are selected by comparing
            this column to 1 (positive) and 0 (negative); rows with any other
            value are never sampled.

    Returns:
        A DataFrame with the sampled positive and negative rows, in random
        order.
    """
    total = df.count()
    ctx = df.groupby(labelCol).count()
    ctx = ctx.withColumn('frac', col("count") / total)
    # Select by labelCol (not a hard-coded "y") so a custom label column works.
    frac = ctx.select(labelCol, "frac").rdd.collectAsMap()
    # A class that is absent from df gets a quota of 0 instead of a KeyError.
    pos = int(floor(frac.get(1, 0.0) * N))
    neg = int(floor(frac.get(0, 0.0) * N))
    # orderBy(rand()) shuffles each class so limit() takes a random subset.
    posDF = df.filter(col(labelCol) == 1).orderBy(rand()).limit(pos)
    negDF = df.filter(col(labelCol) == 0).orderBy(rand()).limit(neg)
    # Final shuffle so the two classes are interleaved in the result.
    return posDF.unionAll(negDF).orderBy(rand())

# Add an integer label column "y" by casting the "click" column.
df = df.withColumn("y", col("click").cast(IntegerType()))

# Work on a stratified sample targeting 500,000 rows.
xdf = stratifiedSample(df, N=500_000)

# Display the column names and types of the sample.
xdf.printSchema()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1589649955619_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- click: boolean (nullable = true)
 |-- C1: integer (nullable = true)
 |-- banner_pos: integer (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: integer (nullable = true)
 |-- device_conn_type: integer (nullable = true)
 |-- C14: integer (nullable = true)
 |-- C15: integer (nullable = true)
 |-- C16: integer (nullable = true)
 |-- C17: integer (nullable = true)
 |-- C18: integer (nullable = true)
 |-- C19: integer (nullable = true)
 |-- C20: integer (nullable = true)
 |-- C21: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- 

In [5]:
# Candidate categorical columns: the named fields first, then C14 through C21.
cCols = [
    'C1',
    'banner_pos',
    'site_domain',
    'site_category',
    'app_domain',
    'app_category',
    'device_model',
    'device_type',
    'device_conn_type',
]
cCols.extend('C%d' % i for i in range(14, 22))
#xdf.select(*cCols).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

# String-valued columns that need a numeric index before one-hot encoding.
stringCols = [
    'site_domain',
    'site_category',
    'app_domain',
    'app_category',
    'device_model',
]

# Fit one StringIndexer per column and append its output as "<column>_ix".
for name in stringCols:
    indexer = StringIndexer(inputCol=name, outputCol='{}_ix'.format(name))
    xdf = indexer.fit(xdf).transform(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Clamp negative C20 values to 0, leaving non-negative values unchanged.
# The return type is declared explicitly: without it the UDF output defaults
# to a string column, forcing an int -> string -> int round trip.
# Nulls pass through as null instead of raising inside the lambda on `None < 0`.
posMapper = udf(lambda x: None if x is None else max(x, 0), IntegerType())
xdf = xdf.withColumn('C20_1', posMapper(xdf['C20']))
# C20_1 is already an integer; this cast only keeps the C20_1int column name
# that the feature lists in later cells refer to.
xdf = xdf.withColumn("C20_1int", xdf['C20_1'].cast(IntegerType()))

# Columns to one-hot encode: the indexed string columns ("<name>_ix")
# followed by the integer-coded columns.
categoricalCols = [
    '{}_ix'.format(name)
    for name in ('site_domain', 'site_category', 'app_domain', 'app_category', 'device_model')
]
categoricalCols.extend(['C1', 'C14', 'C15', 'C16', 'C17', 'C19', 'C20_1int', 'C21'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# One-hot encode every categorical column; each output column is named
# "<input>Enc".
ohe = OneHotEncoderEstimator(
    inputCols=categoricalCols,
    outputCols=['{}Enc'.format(name) for name in categoricalCols],
)
enc_model = ohe.fit(xdf)
xdf = enc_model.transform(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Model inputs: every one-hot encoded column plus the hour/day/dayofweek fields.
trainCols = ['{}Enc'.format(name) for name in categoricalCols] + ['hour', 'day', 'dayofweek']

# List the distinct values of the month column in the sample.
(
    xdf.select('month')
    .distinct()
    .show()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+
|month|
+-----+
|   10|
+-----+

In [10]:
# Combine all training columns into a single vector column named "features".
assembler = VectorAssembler(
    inputCols=trainCols,
    outputCol='features',
)
xdf = assembler.transform(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression with default regularization settings.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='y',
)
model = lr.fit(xdf)
# The summary is computed on xdf, the same frame the model was fit on.
result = model.evaluate(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Display the areaUnderROC value of the evaluation summary held in `result`.
result.areaUnderROC

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.754245328919811

In [15]:
# Display the per-label recall values of the evaluation summary in `result`.
result.recallByLabel

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.9879955769374387, 0.09568679183058114]

## Using L1 Regularization
### For L1 (lasso), set `elasticNetParam = 1`; for L2 (ridge), set `elasticNetParam = 0`. The penalty strength is set by `regParam`.

In [16]:
# L1-penalized logistic regression (elasticNetParam=1) with regParam=0.001.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='y',
    elasticNetParam=1,
    regParam=0.001,
)
model = lr.fit(xdf)
# The summary is computed on xdf, the same frame the model was fit on.
result = model.evaluate(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Display the per-label recall values of the evaluation summary in `result`.
result.recallByLabel

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.9923054129516715, 0.0600692563190502]

In [20]:
# L1-penalized logistic regression (elasticNetParam=1) with regParam=0.0001.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='y',
    elasticNetParam=1,
    regParam=0.0001,
)
model = lr.fit(xdf)
# The summary is computed on xdf, the same frame the model was fit on.
result = model.evaluate(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Display the per-label recall values of the evaluation summary in `result`.
result.recallByLabel

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.9886026639556538, 0.09189418388259404]

In [22]:
# L1-penalized logistic regression (elasticNetParam=1) with regParam=0.01.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='y',
    elasticNetParam=1,
    regParam=0.01,
)
model = lr.fit(xdf)
# The summary is computed on xdf, the same frame the model was fit on.
result = model.evaluate(xdf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Display the per-label recall values of the evaluation summary in `result`.
result.recallByLabel

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.9994483217175745, 0.006678287908412052]

In [34]:
# Small stratified sample (target of 100 rows) used for a spot check.
# NOTE(review): it is drawn from xdf, the frame the models were fit on, so
# these rows are not held out from training.
test = stratifiedSample(xdf, N=100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Evaluate the sample with whichever model `model` currently refers to.
# NOTE(review): by the execution counts shown, that should be the
# regParam=0.01 fit — confirm against the actual kernel state.
test_results = model.evaluate(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# Print rows of the predictions DataFrame attached to the evaluation summary.
test_results.predictions.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+----+------+---+-----+---------+---+----+---+--------------+----------------+-------------+---------------+---------------+------+--------+------------------+-----------------+-------------------+----------------+------------------+------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|click|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18| C19|   C20|C21|month|dayofweek|day|hour|  y|site_domain_ix|site_category_ix|app_domain_ix|app_category_ix|device_model_ix| C20_1|C20_1int|            C15Enc|           C21Enc|     