## Analysing User Clicks (Kelkoo.com)

Source: https://archive.ics.uci.edu/ml/datasets/KASANDR

Spark runs on a default HDInsight cluster in Azure (2 head nodes, 4 workers); the data is loaded into *'hdfs://{path}/lukasb23_dir/{file}'*.

Setup:
https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-jupyter-spark-sql-use-portal
Setting the executor memory to 20 GB, which model fitting needs:

In [1]:
%%configure -f
{"executorMemory": "20G"}

### Data Loading

In [2]:
#Define active namenode 
path = 'hn1-sparky.4r0ycwlvbjpuxisejhaosb1dyb.ax.internal.cloudapp.net'

#Define csv file 
file = 'train_de.csv'

In [250]:
#Read data
df = spark.read.option("delimiter", "\t")\
          .option("header", "True")\
          .option("inferSchema", "True")\
          .csv('hdfs://{}/lukasb23_dir/{}'.format(path,file))

In [27]:
df.show(5)

+--------------------+--------------------+-----------+---------+--------------------+-------------------+------+
|              userid|             offerid|countrycode| category|            merchant|            utcdate|rating|
+--------------------+--------------------+-----------+---------+--------------------+-------------------+------+
|fa937b779184527f1...|c5f63750c2b5b0166...|         de|100020213|f3c93baa0cf443084...|2016-06-14 17:28:47|     0|
|f6c8958b9bc2d6033...|19754ec121b3a99ff...|         de|100020213|21a509189fb0875c3...|2016-06-14 17:28:48|     0|
|02fe7ccf1de19a387...|5ac4398e4d8ad4167...|         de|   125801|b042951fdb45ddef8...|2016-06-14 17:28:50|     0|
|9de5c06d0a16256b1...|be83df9772ec47fd2...|         de|   125801|4740b6c83b6e12e42...|2016-06-14 17:29:19|     0|
|8d26ade603ea5473c...|3735290a415dc236b...|         de|   125801|8bf8f87492a799528...|2016-06-14 17:29:31|     0|
+--------------------+--------------------+-----------+---------+--------------------+-------------------+------+

In [28]:
df.printSchema()

root
 |-- userid: string (nullable = true)
 |-- offerid: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- category: integer (nullable = true)
 |-- merchant: string (nullable = true)
 |-- utcdate: timestamp (nullable = true)
 |-- rating: integer (nullable = true)

In [31]:
df.head(1)

[Row(userid='fa937b779184527f12e2d71c711e6411236d1ab59f8597d7494af26d194f0979', offerid='c5f63750c2b5b0166e55511ee878b7a3', countrycode='de', category=100020213, merchant='f3c93baa0cf4430849611cedb3a40ec4094d1d370be8417181da5d13ac99ef3d', utcdate=datetime.datetime(2016, 6, 14, 17, 28, 47), rating=0)]

In [38]:
#Shape
print((df.count(), len(df.columns)))

(15844717, 7)

In [153]:
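#Target variable
target = df.groupBy('rating').count()
target.show()

#groupBy() guarantees no row order, so look the counts up by label instead of by position
counts = {row['rating']: row['count'] for row in target.collect()}
total_yes = counts[1]
total_no = counts[0]

#Ratio of positives to negatives (the odds), not the share of positives among all rows
print('Target 1 to 0: {:.2f}%'.format(total_yes / total_no * 100))

+------+--------+
|rating|   count|
+------+--------+
|     1|  705447|
|     0|15139270|
+------+--------+

Target 1 to 0: 4.66%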
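The 4.66% above is the ratio of positives to negatives, which is slightly larger than the share of positives among all rows. A quick plain-Python check with the counts from the table separates the two and shows the accuracy a trivial model would get by always predicting 0. That baseline is one reason to judge the model by `areaUnderROC` rather than by accuracy.

```python
# Label counts from the groupBy('rating') output above
total_yes = 705447      # rating == 1
total_no = 15139270     # rating == 0
total = total_yes + total_no

odds = total_yes / total_no           # what the cell prints as "Target 1 to 0"
positive_rate = total_yes / total     # share of rows with rating == 1
all_zero_accuracy = total_no / total  # accuracy of always predicting 0

print("rows: {}".format(total))
print("1-to-0 ratio: {:.2%}".format(odds))
print("positive rate: {:.2%}".format(positive_rate))
print("accuracy of always predicting 0: {:.2%}".format(all_zero_accuracy))
```

The row total matches the 15,844,717 rows reported by the shape cell.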

In [34]:
df.select('countrycode').distinct().show()

+-----------+
|countrycode|
+-----------+
|         de|
+-----------+

### Data Cleaning

In [44]:
df.columns

['userid', 'offerid', 'countrycode', 'category', 'merchant', 'utcdate', 'rating']

In [43]:
#Check for missing values (nulls or empty strings); loop variable named c so it does not shadow pyspark's col
for c in df.columns:
    print(c, ': ', df.filter((df[c] == "") | df[c].isNull()).count())

userid :  0
offerid :  0
countrycode :  0
category :  0
merchant :  0
utcdate :  0
rating :  0

### Data Exploration

In [191]:
#Spark SQL functions used below (min/max here shadow Python's built-ins)
from pyspark.sql.functions import col, min, max

In [53]:
#Dates 
min_date, max_date = df.select(min("utcdate"), max("utcdate")).first()
min_date, max_date

(datetime.datetime(2016, 6, 1, 2, 0, 17), datetime.datetime(2016, 6, 14, 23, 52, 51))

In [66]:
#Unique values (loop variable named c so it does not shadow pyspark's col)
for c in ['userid', 'offerid', 'category', 'merchant']:
    print(c, ': ', df.select(c).distinct().count())

userid :  291485
offerid :  2158859
category :  271
merchant :  703

In [160]:
df_yes = df.filter(df['rating'] == 1)
df_no = df.filter(df['rating'] == 0)
df_yes.show(5)

+--------------------+--------------------+---------+--------------------+-------------------+------+-------+----+
|              userid|             offerid| category|            merchant|            utcdate|rating|weekday|hour|
+--------------------+--------------------+---------+--------------------+-------------------+------+-------+----+
|5bafdc0592dff7fa8...|b24526a20f4e4412d...|   138001|9f6c66333880924b1...|2016-06-02 17:50:48|     1|      5|  17|
|6eb2fe43a01f9daf1...|2bfd670e616b8f088...|100354123|ab8863ef55e574c00...|2016-06-02 17:51:20|     1|      5|  17|
|7c61831be00442150...|05aa287e0a53f6a5c...|100091613|418899436d9314d6a...|2016-06-02 17:53:50|     1|      5|  17|
|7735de9a62b5bdd43...|9674edeb73f6e51ad...|   142101|c26503aa822d9652c...|2016-06-02 17:54:47|     1|      5|  17|
|2014a976a7e6775f7...|9157b97ae21f55ed3...|   164401|b042951fdb45ddef8...|2016-06-02 17:55:45|     1|      5|  17|
+--------------------+--------------------+---------+--------------------+-------------------+------+-------+----+

#### Categories

In [197]:
#Categories Grouping 
cats = df.groupBy("category").agg({'rating': 'mean', 'category': 'count'})

cats = cats.withColumn('count(positive)', (col('avg(rating)') * col('count(category)')).cast('int'))
cats = cats.sort("avg(rating)", ascending=False)
cats.show(5)

+---------+------------------+---------------+---------------+
| category|       avg(rating)|count(category)|count(positive)|
+---------+------------------+---------------+---------------+
|100020813|               1.0|             54|             54|
|   121201|0.9902912621359223|            103|            102|
|100345723|0.9705882352941176|            170|            165|
|   128101|               0.9|             20|             18|
|100333423|0.8773584905660378|            212|            186|
+---------+------------------+---------------+---------------+
only showing top 5 rows

In [213]:
cats.filter(cats['count(positive)'] > 10000).show()
print('Total Yes: ', total_yes)

#The top 4 categories by click rate together account for >30% of all clicks

+---------+--------------------+---------------+---------------+
| category|         avg(rating)|count(category)|count(positive)|
+---------+--------------------+---------------+---------------+
|100010713|  0.5184821498016645|         102856|          53329|
|   125801|  0.1227365557895503|         439706|          53968|
|100020213| 0.08024719256855496|         663774|          53266|
|100354123| 0.07432864699549051|         769044|          57162|
|   130401|0.050088623711211526|         200285|          10032|
|100434023|0.049709758354654014|         202073|          10045|
|   134101| 0.04908641060584009|         226743|          11130|
|100091613| 0.04691772227138838|         302828|          14208|
|   142101| 0.04081416620078929|         302297|          12337|
|   168001| 0.03791937619216083|         274187|          10397|
|100232023| 0.03331888597215748|         366849|          12222|
|   113501|0.028335950614609844|         461675|          13082|
|   108301| 0.02050122659
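To back the ">30%" comment with numbers, the four categories at the top of the filtered table can be compared on both impressions and clicks. A plain-Python check using the counts copied from the output above (the `count(positive)` values carry the notebook's int truncation):

```python
# (impressions, clicks) for the four categories at the top of the filtered table
top4 = {
    100010713: (102856, 53329),
    125801: (439706, 53968),
    100020213: (663774, 53266),
    100354123: (769044, 57162),
}
total_rows = 15844717   # all rows in train_de.csv
total_clicks = 705447   # rows with rating == 1

impressions = sum(imp for imp, _ in top4.values())
clicks = sum(clk for _, clk in top4.values())

print("share of impressions: {:.1%}".format(impressions / total_rows))
print("share of clicks:      {:.1%}".format(clicks / total_clicks))
```

These four categories receive about 12.5% of the impressions but about 30.9% of the clicks.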

#### Merchants

In [204]:
#Merchants Grouping 
mercs = df.groupBy('merchant').agg({'rating': 'mean', 'merchant': 'count'})

mercs = mercs.withColumn('count(positive)', (col('avg(rating)') * col('count(merchant)')).cast('int'))
mercs = mercs.sort("avg(rating)", ascending=False)
mercs.show(5)

+--------------------+------------------+---------------+---------------+
|            merchant|       avg(rating)|count(merchant)|count(positive)|
+--------------------+------------------+---------------+---------------+
|245d2c7b8e6fc4de4...|               1.0|             54|             54|
|3e0c8ff0db6c0ba0a...|               1.0|             31|             31|
|36e2130a3c07037b1...|0.9523809523809523|             21|             20|
|ea0c486f7e2afef05...|0.9354838709677419|             31|             29|
|cde1bb72b28d4a887...|0.9230769230769231|             13|             12|
+--------------------+------------------+---------------+---------------+
only showing top 5 rows

In [215]:
mercs.filter(mercs['count(positive)'] > 10000).show()
print('Total Yes: ', total_yes)

#The top 3 merchants by click rate account for ~22% of all clicks

+--------------------+-------------------+---------------+---------------+
|            merchant|        avg(rating)|count(merchant)|count(positive)|
+--------------------+-------------------+---------------+---------------+
|66863da8db7e6c51b...| 0.6265489999207334|          75694|          47425|
|a7b2f269064dbe77e...|0.30314010305378086|         187669|          56890|
|ac26975cf46eae989...| 0.1618780124357797|         314737|          50949|
|70ea724342fb2d118...| 0.1512924051551128|          82714|          12514|
|eb49b22a1bbd88fbd...|0.07760938578329883|         181125|          14057|
|ab8863ef55e574c00...|0.06480676778336472|         579924|          37583|
|5878d16d0c0691283...| 0.0536400383381256|         239970|          12872|
|fca91704667a53350...|0.04780318920597915|         286529|          13697|
|8497a9dd86ab3b7f1...|0.04425939471683428|         277523|          12283|
|154f65f908a740682...| 0.0244791419290489|         484249|          11854|
+--------------------+-------------------+---------------+---------------+

**Does merchant no. 1 sell category no. 1?**

In [220]:
merch_id = mercs.filter(mercs['count(positive)'] > 10000).head(1)[0][0]

df.filter(df['merchant'] == merch_id).groupby('category').count().show()

+---------+-----+
| category|count|
+---------+-----+
|100010713|75694|
+---------+-----+

Yes, exclusively: all 75,694 of its impressions fall in category 100010713.

**How about merchant no. 2?**

In [224]:
merch_id = mercs.filter(mercs['count(positive)'] > 10000).head(2)[1][0]

df.filter(df['merchant'] == merch_id).groupby('category').count().sort('count', ascending=False).show(10)

+---------+-----+
| category|count|
+---------+-----+
|100020213|68673|
|   125801|64063|
|   113501| 4770|
|   142101| 4712|
|   143101| 4562|
|   133301| 3953|
|   120901| 2963|
|     6513| 2770|
|100046613| 2180|
|100367723| 2136|
+---------+-----+
only showing top 10 rows

Heavily involved in categories no. 2 and 3 (125801 and 100020213), which together make up about 71% of its impressions.
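The group tables above already contain enough to quantify how tightly these merchants and categories overlap, without another Spark job. A plain-Python check with counts copied from the outputs (click counts include the notebook's int truncation of `count(positive)`):

```python
# Merchant no. 1: all 75,694 impressions fall into category 100010713 (category no. 1)
m1_impressions, m1_clicks = 75694, 47425
cat1_impressions, cat1_clicks = 102856, 53329

# Merchant no. 2: impressions in its two largest categories vs. its total
m2_total = 187669
m2_top_two = 68673 + 64063   # categories 100020213 and 125801

share_cat1_impressions = m1_impressions / cat1_impressions
share_cat1_clicks = m1_clicks / cat1_clicks
share_m2_top_two = m2_top_two / m2_total

print("merchant 1 share of category 1 impressions: {:.1%}".format(share_cat1_impressions))
print("merchant 1 share of category 1 clicks:      {:.1%}".format(share_cat1_clicks))
print("merchant 2 impressions in its top two categories: {:.1%}".format(share_m2_top_two))
```

Merchant no. 1 thus accounts for roughly 74% of category no. 1's impressions and roughly 89% of its clicks, so the high click rate of that category is largely this one merchant.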

#### Users

In [226]:
#Users Grouping
users = df.groupBy('userid').agg({'rating': 'mean', 'userid': 'count'})

users = users.withColumn('count(positive)', (col('avg(rating)') * col('count(userid)')).cast('int'))
users = users.sort("avg(rating)", ascending=False)
users.show(5)

+--------------------+-----------+-------------+---------------+
|              userid|avg(rating)|count(userid)|count(positive)|
+--------------------+-----------+-------------+---------------+
|b30b99b5627c4b0f6...|        1.0|            6|              6|
|9e3dfcddda9bfd607...|        1.0|            5|              5|
|bfc7f5a747eec2abd...|        1.0|            2|              2|
|b112ac315de5042e0...|        1.0|            7|              7|
|84c3e6783aee36656...|        1.0|            2|              2|
+--------------------+-----------+-------------+---------------+
only showing top 5 rows

In [235]:
users.filter(users['count(positive)'] > 1000).show(10)

+--------------------+-------------------+-------------+---------------+
|              userid|        avg(rating)|count(userid)|count(positive)|
+--------------------+-------------------+-------------+---------------+
|6e2ab4134ce6b24d1...|0.30091743119266057|         4360|           1312|
|7511572a7068fe6e7...|                0.3|         3550|           1065|
|cad7e4a68117616ba...| 0.2992623814541623|         3796|           1136|
|7625efac4a89c43c4...|0.29914529914529914|         6318|           1890|
|4ba6cb76318d7db81...|0.29873150105708246|         4730|           1413|
|cd5fc9305c30dfd50...|0.29870441458733205|         4168|           1245|
|f0fbac7eb4c2c0ded...|0.29798870853916726|         5668|           1689|
|314dc010def122b88...| 0.2978501045088086|         6698|           1995|
|c08bfca37471d9c79...| 0.2968835429196282|         3658|           1086|
|ce57395c3fe3f1037...|0.29661354581673305|         5020|           1489|
+--------------------+-------------------+-------------+---------------+

There are some power users here: each has more than 1,000 clicks at a click rate of around 30%, versus roughly 4.5% overall.

#### Offers

In [246]:
#Offers Grouping
offers = df.groupBy('offerid').agg({'rating': 'mean', 'offerid': 'count'})

offers = offers.withColumn('count(positive)', (col('avg(rating)') * col('count(offerid)')).cast('int'))
offers = offers.sort("avg(rating)", ascending=False)
offers.show(10)

+--------------------+-----------+--------------+---------------+
|             offerid|avg(rating)|count(offerid)|count(positive)|
+--------------------+-----------+--------------+---------------+
|3c0b61ef6ae1c918f...|        1.0|             1|              1|
|67b9ac169bbd6506f...|        1.0|             4|              4|
|9e6ed5473e4f27ded...|        1.0|             1|              1|
|436394f91445351b2...|        1.0|             1|              1|
|f02cb8e4667b90dfc...|        1.0|             2|              2|
|9cb582f6f1d01fab8...|        1.0|             1|              1|
|2e72138ae92fc046f...|        1.0|             1|              1|
|30a07c874c2dac337...|        1.0|             1|              1|
|990beb5468802a385...|        1.0|             4|              4|
|4d9c77966a04b0b14...|        1.0|             2|              2|
+--------------------+-----------+--------------+---------------+
only showing top 10 rows
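Sorting by raw `avg(rating)` puts offers with one or two impressions at the top, which is why the next cell filters on `count(positive)`. An alternative, not used in the original analysis, is additive smoothing (a Bayesian-average style estimate) that shrinks each group's rate toward the global click rate. A minimal sketch; `PRIOR_WEIGHT = 100` is a hypothetical tuning value, not something derived from the data.

```python
# Overall click rate from the target counts above: 705447 positives in 15844717 rows
GLOBAL_RATE = 705447 / 15844717
# Hypothetical prior strength: "virtual impressions" at the global rate added to every group
PRIOR_WEIGHT = 100


def smoothed_rate(clicks: int, impressions: int,
                  prior_rate: float = GLOBAL_RATE,
                  prior_weight: float = PRIOR_WEIGHT) -> float:
    """Additive smoothing of a click rate toward prior_rate."""
    return (clicks + prior_rate * prior_weight) / (impressions + prior_weight)


# (clicks, impressions) for two offers from the tables above
examples = {
    "single impression, avg 1.0": (1, 1),
    "offer e27570f3348a9fec4...": (679, 687),
}
for name, (clk, imp) in examples.items():
    print(name, "raw:", round(clk / imp, 3), "smoothed:", round(smoothed_rate(clk, imp), 3))
```

With smoothing, the offer with 679 clicks out of 687 impressions ranks far above the single-impression offer, while a group with no data at all falls back to the global rate.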

In [248]:
offers.filter(offers['count(positive)'] > 100).show(10)

+--------------------+------------------+--------------+---------------+
|             offerid|       avg(rating)|count(offerid)|count(positive)|
+--------------------+------------------+--------------+---------------+
|a066572754a00f7a0...|0.9955156950672646|           223|            222|
|ae19aab375ac925bd...|0.9913793103448276|           232|            230|
|61ba4686f0a6b704a...|0.9911764705882353|           340|            337|
|c07db2553dc287e0d...|0.9902912621359223|           103|            102|
|e27570f3348a9fec4...|0.9883551673944687|           687|            679|
|5c2f1d277d6922bb7...|0.9870689655172413|           232|            229|
|ea268a1ab5ba20bdd...| 0.981203007518797|           266|            261|
|a1cb8243ccdbfda93...|0.9809523809523809|           105|            103|
|01c0d039ff17aa7f4...|0.9752066115702479|           121|            118|
|30a81c7e560b76f51...|0.9635036496350365|           137|            132|
+--------------------+------------------+--------------+---------------+

### A tiny little bit of feature engineering 

In [255]:
from pyspark.sql.functions import hour, dayofweek
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame

df = df.withColumn('weekday', dayofweek('utcdate'))\
       .withColumn('hour', hour('utcdate'))\
       .drop("countrycode")
df.show(5)

+--------------------+--------------------+---------+--------------------+-------------------+------+-------+----+
|              userid|             offerid| category|            merchant|            utcdate|rating|weekday|hour|
+--------------------+--------------------+---------+--------------------+-------------------+------+-------+----+
|fa937b779184527f1...|c5f63750c2b5b0166...|100020213|f3c93baa0cf443084...|2016-06-14 17:28:47|     0|      3|  17|
|f6c8958b9bc2d6033...|19754ec121b3a99ff...|100020213|21a509189fb0875c3...|2016-06-14 17:28:48|     0|      3|  17|
|02fe7ccf1de19a387...|5ac4398e4d8ad4167...|   125801|b042951fdb45ddef8...|2016-06-14 17:28:50|     0|      3|  17|
|9de5c06d0a16256b1...|be83df9772ec47fd2...|   125801|4740b6c83b6e12e42...|2016-06-14 17:29:19|     0|      3|  17|
|8d26ade603ea5473c...|3735290a415dc236b...|   125801|8bf8f87492a799528...|2016-06-14 17:29:31|     0|      3|  17|
+--------------------+--------------------+---------+--------------------+-------------------+------+-------+----+

In [262]:
#Weekdays Grouping 
weekdays = df.groupBy('weekday').agg({'rating': 'mean', 'weekday': 'count'})

weekdays = weekdays.withColumn('count(positive)', (col('avg(rating)') * col('count(weekday)')).cast('int'))
weekdays = weekdays.sort("avg(rating)", ascending=False)
weekdays.show()

+-------+--------------------+--------------+---------------+
|weekday|         avg(rating)|count(weekday)|count(positive)|
+-------+--------------------+--------------+---------------+
|      3| 0.10241874599458782|       2103404|         215428|
|      4|0.041626950360412696|       1955536|          81403|
|      2|0.039021738813080464|       2055034|          80191|
|      6| 0.03882920266295414|       1923578|          74691|
|      5| 0.03401875987435719|       2403321|          81758|
|      1| 0.03237889335234592|       2991702|          96868|
|      7|0.031137470347931424|       2412142|          75108|
+-------+--------------------+--------------+---------------+
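Spark's `dayofweek` numbers days from 1 = Sunday to 7 = Saturday, so weekday 3 in the table is Tuesday. The data covers 2016-06-01 to 2016-06-14, so each weekday occurs on exactly two dates, and a single unusual day can move a weekday's average noticeably; weekday 3 covers 7 and 14 June. A plain-Python sketch that reproduces the numbering and the per-weekday date counts (the helper name `spark_dayofweek` is hypothetical):

```python
from collections import Counter
from datetime import date, timedelta


def spark_dayofweek(d: date) -> int:
    """Day-of-week numbering used by Spark's dayofweek(): 1 = Sunday, ..., 7 = Saturday."""
    # isoweekday() is Monday = 1 ... Sunday = 7; shift Sunday to 1 and the rest up by one
    return d.isoweekday() % 7 + 1


# Date range of the training data (see the min/max utcdate cell above)
start, end = date(2016, 6, 1), date(2016, 6, 14)
days = [start + timedelta(days=i) for i in range((end - start).days + 1)]

per_weekday = Counter(spark_dayofweek(d) for d in days)
print(sorted(per_weekday.items()))
print(spark_dayofweek(date(2016, 6, 14)))  # the weekday shown for the first rows above
```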

In [211]:
#Hour Grouping 
hours = df.groupBy('hour').agg({'rating': 'mean', 'hour': 'count'})

hours = hours.withColumn('count(positive)', (col('avg(rating)') * col('count(hour)')).cast('int'))
hours = hours.sort("avg(rating)", ascending=False)
hours.show(24)

+----+-----------+--------------------+---------------+
|hour|count(hour)|         avg(rating)|count(positive)|
+----+-----------+--------------------+---------------+
|   0|      27285|  0.3720725673446949|          10152|
|  20|     124057|  0.3546514908469494|          43997|
|   1|      19329| 0.32019245693000153|           6189|
|  21|     140220| 0.31691627442590214|          44438|
|  22|     128301| 0.30285812269584805|          38857|
|  23|     139157|  0.2634003319991089|          36654|
|  19|     264503| 0.16856519585789198|          44586|
|  18|     581059| 0.08287970756842249|          48158|
|  17|    1275499| 0.04213723413346463|          53746|
|   7|     499146| 0.03726164288604937|          18599|
|  16|    1558179| 0.03268302293895631|          50926|
|  10|    1348360| 0.03207600344121748|          43250|
|   8|     771206|0.031104529788409323|          23988|
|   9|    1022504|0.029534358789794466|          30199|
|  11|    1330400| 0.02866882140709561|         
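`hour('utcdate')` returns the hour as written in the data. Assuming `utcdate` really is UTC and most users are in Germany (this is the `de` file), local time in June 2016 was CEST (UTC+2), so the high-click hours 20 to 1 UTC correspond to roughly 22:00 to 03:59 local time. A plain-Python sketch using a fixed UTC+2 offset, which is only valid for summer dates such as these:

```python
from datetime import datetime, timedelta, timezone

# Assumption: German local time throughout June 2016 was CEST, i.e. UTC+2
CEST = timezone(timedelta(hours=2), "CEST")


def to_local_hour(hour_utc: int) -> int:
    """Convert an hour of day in UTC to the German local hour for a June 2016 date."""
    ts = datetime(2016, 6, 14, hour_utc, tzinfo=timezone.utc)
    return ts.astimezone(CEST).hour


# The hours with the highest click rate in the table above
top_hours_utc = [0, 20, 1, 21, 22, 23]
print({h: to_local_hour(h) for h in top_hours_utc})
```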

### Model Building from Scratch

In [21]:
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.classification import LogisticRegression
#OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler
from pyspark.sql import DataFrame
from pyspark.sql.functions import hour, dayofweek

In [3]:
#Define active namenode 
path = 'hn1-sparky.4r0ycwlvbjpuxisejhaosb1dyb.ax.internal.cloudapp.net'

#Define csv file 
file = 'train_de.csv'

In [4]:
#Read data raw
df = spark.read.option("delimiter", "\t")\
          .option("header", "True")\
          .option("inferSchema", "True")\
          .csv('hdfs://{}/lukasb23_dir/{}'.format(path,file))

In [83]:
# Custom Transformer
class CustomTransformer(Transformer):

    def __init__(self):
        super(CustomTransformer, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        
        df = df.withColumn('weekday', dayofweek('utcdate'))\
               .withColumn('hour', hour('utcdate'))\
               .drop("countrycode")
        return df
    
#String Indexer 
indexer_cat = StringIndexer(inputCol="category", outputCol="category_index")
#indexer_user = StringIndexer(inputCol="userid", outputCol="userid_index") 
indexer_merc = StringIndexer(inputCol="merchant", outputCol="merchant_index")

#One Hot Encoder
vectors = ["category_vec", "merchant_vec"]
encoder = OneHotEncoderEstimator(
    inputCols=["category_index", "merchant_index"],  
    outputCols=vectors
)

#Feature Assembler
vectorizer = VectorAssembler(inputCols=vectors+['weekday', 'hour'], outputCol='features')

#Scaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

#Modelling 
lr = LogisticRegression(featuresCol = 'scaledFeatures', labelCol = 'rating', maxIter=10)

One-hot encoding `userid` would add roughly 291,000 columns (one per distinct user) across ~15.8M rows, so it was dropped for feasibility.
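The width of the assembled `features` vector follows from the distinct counts in the "Unique Values" cell, assuming the indexers learn exactly those labels from the training file. With its default `dropLast=True`, `OneHotEncoderEstimator` emits vectors one shorter than the number of labels. A plain-Python sketch (the helper `onehot_width` is hypothetical):

```python
# Distinct values per column, from the "Unique Values" cell above (training file)
DISTINCT = {"category": 271, "merchant": 703, "userid": 291485}


def onehot_width(n_labels: int, drop_last: bool = True) -> int:
    """Length of a one-hot vector; OneHotEncoderEstimator drops the last category by default."""
    return n_labels - 1 if drop_last else n_labels


# category_vec + merchant_vec + the two numeric columns weekday and hour
width_used = onehot_width(DISTINCT["category"]) + onehot_width(DISTINCT["merchant"]) + 2
# The same vector if userid were one-hot encoded as well
width_with_user = width_used + onehot_width(DISTINCT["userid"])

print(width_used, width_with_user)
```

Adding `userid` would make the feature vector, and the logistic regression coefficient vector with it, about 300 times wider.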

In [87]:
#Modelling via Pipeline
pipeline = Pipeline(stages=[indexer_cat, indexer_merc, encoder, 
                         CustomTransformer(), vectorizer, scaler, lr])

model = pipeline.fit(df)

Training time: approx. 30 seconds; YARN memory usage above 50%.

In [88]:
trainingSummary = model.stages[-1].summary

trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

+--------------------+-------------------+
|                 FPR|                TPR|
+--------------------+-------------------+
|                 0.0|                0.0|
|0.002153736606850925|0.14141246613849093|
|0.006552825862805...| 0.1973684770081948|
|0.009952065059940143|0.23335558872601345|
| 0.01351405979284338| 0.2465160387669095|
| 0.01695947030471086|0.25820649885817076|
| 0.02018327171653587|0.27248822377868215|
| 0.02094559381000537|0.27578825907545146|
|0.023791107497257134|0.29007140153689787|
| 0.02712151906928141|0.30493006561796987|
|  0.0308175361163385|  0.318758177439269|
| 0.03403057082673075|0.33181939961471235|
| 0.03839762419191942|0.34629674518425907|
|0.043001941308927046|0.35940191112868863|
| 0.04885473341845412| 0.3722462495410711|
|0.054973786714947286| 0.3847815640296153|
| 0.05512438842823993|0.38511752123121934|
| 0.06098907014671117| 0.3976216498191927|
| 0.06752194788784399|0.40964523203018793|
| 0.07589441234616993| 0.4224626371648047|
+--------------------+-------------------+
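`areaUnderROC` is the area under the curve traced by these (FPR, TPR) points, which, as far as I know, Spark's `BinaryClassificationMetrics` computes with the trapezoidal rule. Note that `trainingSummary` is computed on the training data, so it is an optimistic estimate. A plain-Python sketch of the trapezoidal rule (the function name `trapezoid_auc` is hypothetical):

```python
from typing import Sequence


def trapezoid_auc(fpr: Sequence[float], tpr: Sequence[float]) -> float:
    """Area under a ROC curve given as (FPR, TPR) points sorted by FPR, via the trapezoidal rule."""
    if len(fpr) != len(tpr):
        raise ValueError("fpr and tpr must have the same length")
    area = 0.0
    for i in range(1, len(fpr)):
        # Each segment contributes its width times the average of its two heights
        area += (fpr[i] - fpr[i - 1]) * (tpr[i] + tpr[i - 1]) / 2.0
    return area


# Sanity checks: a random classifier's diagonal and a perfect classifier
print(trapezoid_auc([0.0, 1.0], [0.0, 1.0]))
print(trapezoid_auc([0.0, 0.0, 1.0], [0.0, 1.0, 1.0]))
```

In the notebook, the full curve could be fetched with `trainingSummary.roc.collect()` and split into two lists; the 20 rows shown above are only the start of the curve and are not enough to reproduce the reported value.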

#### Test Data

In [94]:
#Read test data raw
df_test = spark.read.option("delimiter", "\t")\
          .option("header", "True")\
          .option("inferSchema", "True")\
          .csv('hdfs://{}/lukasb23_dir/test_de.csv'.format(path))

In [95]:
predictions = model.transform(df_test)

In [99]:
predictions

DataFrame[userid: string, offerid: string, category: int, merchant: string, utcdate: timestamp, rating: int, category_index: double, merchant_index: double, category_vec: vector, merchant_vec: vector, weekday: int, hour: int, features: vector, scaledFeatures: vector, rawPrediction: vector, probability: vector, prediction: double]

This follows the example in https://spark.apache.org/docs/2.3.0/ml-pipeline.html. 
However, accessing the predictions fails. Since `transform` is lazy, the error only surfaces once an action such as `show()` runs:

In [100]:
predictions.show(1)

An error occurred while calling o5386.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 326.0 failed 4 times, most recent failure: Lost task 0.3 in stage 326.0 (TID 2858, wn5-sparky.4r0ycwlvbjpuxisejhaosb1dyb.ax.internal.cloudapp.net, executor 3): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$9: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
