In [6]:
# import the goodies
import numpy as np
import pandas as pd
import os
import csv
import datetime
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,RandomForestClassificationModel

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import NGram
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr



In [8]:
findspark.init('/home/polya/Downloads/spark-2.2.0-bin-hadoop2.7')
s =  SparkSession.builder.getOrCreate()

In [9]:
sessionWindow = W.partitionBy(F.col('household_id')).orderBy(F.col('session_start').asc())

In [10]:
sessions = (s.read.format("com.databricks.spark.csv").option("inferschema","true").option("header","true").option("delimiter", "\t")
    .load('/home/polya/Downloads/code/kaggle/libertyglobal/data/viewership_extract.csv')
    #.filter("dob != '1900-01-01 00:00:00'")
    .withColumn('day_of_month', F.dayofmonth('session_start'))
    .withColumn('day_of_week',  F.date_format('session_start', 'EEEE'))
    .withColumn('hour', F.hour('session_start'))
    .withColumn('minute', F.minute('session_start'))
    .withColumn('session_duration',F.unix_timestamp('session_end') - F.unix_timestamp('session_start'))
    .withColumn('overlapped',F.when((F.unix_timestamp('session_start') <=  F.unix_timestamp('original_broadcast_start')) & (F.unix_timestamp('session_end') >=  F.unix_timestamp('original_broadcast_start')),1)
            .otherwise(0))
    .withColumn('age', F.year('session_start') - F.year('dob'))
    .withColumn('age_band', F.when(F.col('age') < 25, 'less_than_25').when(((F.col('age') >= 25) & (F.col('age') < 35)),'25to34')
                .when(((F.col('age') >= 35) & (F.col('age') < 45)),'35to44')
                .when(((F.col('age') >= 45) & (F.col('age') < 55)),'45to54')
                .when(((F.col('age') >= 55) & (F.col('age') < 65)),'55to64')
                .otherwise('greater_than_65') )        
            
    .withColumn('hit_nr',F.row_number().over(sessionWindow)))      
           

In [11]:
sessions.count()

3863142

In [13]:
sessions_grouping =  sessions.groupBy(['household_id'])\
                    .agg(F.max('hit_nr').alias('max_hit_nr'),
                    F.round(F.avg('session_duration'),1).alias('avg_session_duration'),
                    F.max('session_duration').alias('max_session_duration'),
                    F.min('session_duration').alias('min_session_duration'),
                    F.round(F.stddev('session_duration'),2).alias('sd_session_duration'),
                    F.round(F.skewness('session_duration'),2).alias('skewnews_ssession_duration'),
                    F.sum('session_duration').alias('sum_session_duration'),
                    F.sum('overlapped').alias('sum_overlaps'),
                    F.round(F.avg('overlapped'),2).alias('avg_overlaps'),
                    F.round(F.avg('playback_speed'),2).alias('avg_playback_speed'),
                    F.first('age_band').alias('age_band'),
                    F.first('gender').alias('gender')     
                    )






In [14]:
sessions_grouping.show()

+------------+----------+--------------------+--------------------+--------------------+-------------------+--------------------------+--------------------+------------+------------+------------------+---------------+--------------+
|household_id|max_hit_nr|avg_session_duration|max_session_duration|min_session_duration|sd_session_duration|skewnews_ssession_duration|sum_session_duration|sum_overlaps|avg_overlaps|avg_playback_speed|       age_band|        gender|
+------------+----------+--------------------+--------------------+--------------------+-------------------+--------------------------+--------------------+------------+------------+------------------+---------------+--------------+
|   432214580|        51|               914.9|                3900|                   0|            1301.22|                      1.19|               46659|          46|         0.9|            980.39|greater_than_65|Please Specify|
|   432215859|       644|               863.0|                9593| 

In [15]:
channels_per_household = (sessions
                          .select('household_id', 'channel_name')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','channel_name'])
                         .agg(F.count('channel_name').alias('channel_counts'))
                         .orderBy(['household_id','channel_counts'],ascending=[0,0]))

In [16]:
channels_per_household.show(10)

+------------+--------------+--------------+
|household_id|  channel_name|channel_counts|
+------------+--------------+--------------+
|   432229515|          Surf|           240|
|   432229515|       RTE2 HD|           177|
|   432229515|     Channel 4|           171|
|   432229515|       BBC1 HD|           144|
|   432229515|         BBC 2|           108|
|   432229515|        FOX HD|            99|
|   432229515|            E4|            66|
|   432229515|    RTE One HD|            50|
|   432229515|            E!|            45|
|   432229515|UTV Ireland HD|            44|
+------------+--------------+--------------+
only showing top 10 rows



In [17]:
#Top 5 channels by time spend for each family
from pyspark.sql.window import Window
from pyspark.sql.functions import concat_ws, lit


windowSpec = Window.partitionBy(channels_per_household['household_id']).orderBy(channels_per_household['channel_counts'].desc())
converted_channels = (channels_per_household
    .select('household_id', 'channel_name', 'channel_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 5)
    .orderBy('household_id', 'dense_rank'))

In [18]:
converted_channels_appended = converted_channels.withColumn("dummy", lit("channel"))\
                              .withColumn("channel_rank", concat_ws("-","dummy","dense_rank"))
                                 


In [19]:
converted_channels_appended.show(5)

+------------+--------------+--------------+----------+-------+------------+
|household_id|  channel_name|channel_counts|dense_rank|  dummy|channel_rank|
+------------+--------------+--------------+----------+-------+------------+
|   432214532|    RTE One HD|           304|         1|channel|   channel-1|
|   432214532|          Surf|            59|         2|channel|   channel-2|
|   432214532|            3e|            37|         3|channel|   channel-3|
|   432214532|        TV3 HD|            35|         4|channel|   channel-4|
|   432214532|UTV Ireland HD|            32|         5|channel|   channel-5|
+------------+--------------+--------------+----------+-------+------------+
only showing top 5 rows



In [20]:
converted_channels.where(F.col('household_id') == 432219578).show()

+------------+--------------+--------------+----------+
|household_id|  channel_name|channel_counts|dense_rank|
+------------+--------------+--------------+----------+
|   432219578|            E!|            67|         1|
|   432219578|          ITV2|            23|         2|
|   432219578|           TLC|            20|         3|
|   432219578|UTV Ireland HD|            19|         4|
|   432219578|       RTE2 HD|            16|         5|
+------------+--------------+--------------+----------+



In [21]:

channels_pivot = converted_channels_appended.groupBy("household_id")\
                           .pivot("channel_rank")\
                           .agg(expr("coalesce(first(channel_name),5)"))
                           
        
       

In [22]:
channels_pivot.show(5)

+------------+-----------+--------------+----------+---------+----------+
|household_id|  channel-1|     channel-2| channel-3|channel-4| channel-5|
+------------+-----------+--------------+----------+---------+----------+
|   432214580|      BBC 2|  Nat Geo Wild|RTE One HD|     Surf|   BBC1 HD|
|   432215859|       Surf|    RTE One HD|    TV3 HD|    BBC 2|    TG4 HD|
|   432216992|       Dave|         BBC 2|        E4| BBC FOUR|BBC TWO HD|
|   432217982|       Surf|UTV Ireland HD| Channel 4|  BBC1 HD|    TV3 HD|
|   432218817|Nick Junior|          Surf|    TV3 HD|  RTE2 HD|RTE One HD|
+------------+-----------+--------------+----------+---------+----------+
only showing top 5 rows



In [23]:
genre_per_household = (sessions
                          .select('household_id', 'genre')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','genre'])
                         .agg(F.count('genre').alias('genre_counts'))
                         .orderBy(['household_id','genre_counts'],ascending=[0,0]))



In [24]:
windowSpec = Window.partitionBy(genre_per_household['household_id']).orderBy(genre_per_household['genre_counts'].desc())
converted_genre = (genre_per_household
    .select('household_id', 'genre', 'genre_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 5)
    .orderBy('household_id', 'dense_rank'))

In [25]:
converted_genre_appended = converted_genre.withColumn("dummy", lit("genre"))\
                              .withColumn("genre_rank", concat_ws("-","dummy","dense_rank"))
  

In [26]:

genre_pivot = converted_genre_appended.groupBy("household_id")\
                           .pivot("genre_rank")\
                           .agg(expr("coalesce(first(genre),5)"))

In [27]:
genre_pivot.show(5)

+------------+--------------+--------------+-------+----------------+------------+
|household_id|       genre-1|       genre-2|genre-3|         genre-4|     genre-5|
+------------+--------------+--------------+-------+----------------+------------+
|   432214580|          News|  Unclassified|Leisure|Social/Political|       Sport|
|   432215859|  Unclassified|          News|  Drama|  Show/Game Show|       Sport|
|   432216992|Show/Game Show|         Drama|   News|         Leisure|Unclassified|
|   432217982|  Unclassified|Show/Game Show|   News|           Drama|  Kids/Youth|
|   432218817|  Unclassified|    Kids/Youth|  Sport|           Movie|       Drama|
+------------+--------------+--------------+-------+----------------+------------+
only showing top 5 rows



In [28]:
sub_genre_per_household = (sessions
                          .select('household_id', 'sub_genre')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','sub_genre'])
                         .agg(F.count('sub_genre').alias('sub_genre_counts'))
                         .orderBy(['household_id','sub_genre_counts'],ascending=[0,0]))

windowSpec = Window.partitionBy(sub_genre_per_household['household_id']).orderBy(sub_genre_per_household['sub_genre_counts'].desc())
converted_sub_genre = (sub_genre_per_household
    .select('household_id', 'sub_genre', 'sub_genre_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 5)
    .orderBy('household_id', 'dense_rank'))
converted_sub_genre_appended = converted_sub_genre.withColumn("dummy", lit("sub_genre"))\
                              .withColumn("sub_genre_rank", concat_ws("-","dummy","dense_rank"))

sub_genre_pivot = converted_sub_genre_appended.groupBy("household_id")\
                           .pivot("sub_genre_rank")\
                           .agg(expr("coalesce(first(sub_genre),5)"))
sub_genre_pivot.show(5)        
        

+------------+------------+-------------+-----------+------------+-------------+
|household_id| sub_genre-1|  sub_genre-2|sub_genre-3| sub_genre-4|  sub_genre-5|
+------------+------------+-------------+-----------+------------+-------------+
|   432214580|      Nature|    Undefined|Documentary|Unclassified|    Melodrama|
|   432215859|Unclassified|    Undefined|Documentary|Variety Show|    Talk Show|
|   432216992|   Undefined|       Comedy|Documentary|    Motoring| Unclassified|
|   432217982|Unclassified|    Undefined|Documentary|Variety Show|For ages 6-14|
|   432218817|Unclassified|For ages 6-14|   Football|   Undefined|       Comedy|
+------------+------------+-------------+-----------+------------+-------------+
only showing top 5 rows



In [29]:
hour_per_household = (sessions
                          .select('household_id', 'hour')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','hour'])
                         .agg(F.count('hour').alias('hour_counts'))
                         .orderBy(['household_id','hour_counts'],ascending=[0,0]))

windowSpec = Window.partitionBy(hour_per_household['household_id']).orderBy(hour_per_household['hour_counts'].desc())
converted_hour = (hour_per_household
    .select('household_id', 'hour', 'hour_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 5)
    .orderBy('household_id', 'dense_rank'))
converted_hour_appended = converted_hour.withColumn("dummy", lit("hour"))\
                              .withColumn("hour_rank", concat_ws("-","dummy","dense_rank"))

hour_pivot = converted_hour_appended.groupBy("household_id")\
                           .pivot("hour_rank")\
                           .agg(expr("coalesce(first(hour),5)"))
hour_pivot.show(5)        
        

+------------+------+------+------+------+------+
|household_id|hour-1|hour-2|hour-3|hour-4|hour-5|
+------------+------+------+------+------+------+
|   432220484|    21|    22|    20|    19|    17|
|   432225320|    21|    22|    20|    18|    19|
|   432219237|    22|    19|    16|    23|    20|
|   432219578|    21|    20|    22|    19|    17|
|   432217982|    21|    22|    23|    19|     0|
+------------+------+------+------+------+------+
only showing top 5 rows



In [30]:
dow_per_household = (sessions
                          .select('household_id', 'day_of_week')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','day_of_week'])
                         .agg(F.count('day_of_week').alias('day_of_week_counts'))
                         .orderBy(['household_id','day_of_week_counts'],ascending=[0,0]))

windowSpec = Window.partitionBy(dow_per_household['household_id']).orderBy(dow_per_household['day_of_week_counts'].desc())
converted_dow = (dow_per_household
    .select('household_id', 'day_of_week', 'day_of_week_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 5)
    .orderBy('household_id', 'dense_rank'))

converted_dow_appended = converted_dow.withColumn("dummy", lit("dow"))\
                              .withColumn("dow_rank", concat_ws("-","dummy","dense_rank"))

dow_pivot = converted_dow_appended.groupBy("household_id")\
                           .pivot("dow_rank")\
                           .agg(expr("coalesce(first(day_of_week),5)"))
dow_pivot.show(5)        
        

+------------+---------+--------+--------+--------+---------+
|household_id|    dow-1|   dow-2|   dow-3|   dow-4|    dow-5|
+------------+---------+--------+--------+--------+---------+
|   432214580|Wednesday|Thursday|  Friday| Tuesday| Saturday|
|   432215859|   Sunday|Saturday| Tuesday|Thursday|   Friday|
|   432216992|   Sunday| Tuesday|Saturday|  Friday|   Monday|
|   432217982|   Sunday|Saturday|Thursday|  Friday|   Monday|
|   432218817|   Sunday|  Monday|  Friday|Saturday|Wednesday|
+------------+---------+--------+--------+--------+---------+
only showing top 5 rows



In [31]:
st_per_household = (sessions
                          .select('household_id', 'session_type')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','session_type'])
                         .agg(F.count('session_type').alias('session_type_counts'))
                         .orderBy(['household_id','session_type_counts'],ascending=[0,0]))

windowSpec = Window.partitionBy(st_per_household['household_id']).orderBy(st_per_household['session_type_counts'].desc())
converted_st = (st_per_household
    .select('household_id', 'session_type', 'session_type_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 2)
    .orderBy('household_id', 'dense_rank'))

converted_st_appended = converted_st.withColumn("dummy", lit("st"))\
                              .withColumn("st_rank", concat_ws("-","dummy","dense_rank"))

st_pivot = converted_st_appended.groupBy("household_id")\
                           .pivot("st_rank")\
                           .agg(expr("coalesce(first(session_type),5)"))
st_pivot.show(5)   

+------------+---------+---------+
|household_id|     st-1|     st-2|
+------------+---------+---------+
|   432214580|     LIVE|TIMESHIFT|
|   432215859|     LIVE|TIMESHIFT|
|   432216992|TIMESHIFT|     LIVE|
|   432217982|     LIVE|TIMESHIFT|
|   432218817|     LIVE|TIMESHIFT|
+------------+---------+---------+
only showing top 5 rows



In [32]:
sst_per_household = (sessions
                          .select('household_id', 'session_sub_type')
                          #.where(~ F.col("channel_name").like("%unknown%"))
                         .groupBy(['household_id','session_sub_type'])
                         .agg(F.count('session_sub_type').alias('session_sub_type_counts'))
                         .orderBy(['household_id','session_sub_type_counts'],ascending=[0,0]))

windowSpec = Window.partitionBy(sst_per_household['household_id']).orderBy(sst_per_household['session_sub_type_counts'].desc())
converted_sst = (sst_per_household
    .select('household_id', 'session_sub_type', 'session_sub_type_counts', F.dense_rank().over(windowSpec).alias('dense_rank'))
    .filter(F.col('dense_rank') <= 3)
    .orderBy('household_id', 'dense_rank'))

converted_sst_appended = converted_sst.withColumn("dummy", lit("sst"))\
                              .withColumn("sst_rank", concat_ws("-","dummy","dense_rank"))

sst_pivot = converted_sst_appended.groupBy("household_id")\
                           .pivot("sst_rank")\
                           .agg(expr("coalesce(first(session_sub_type),5)"))
sst_pivot.show(5) 

+------------+-------------------+-------------------+----------------+
|household_id|              sst-1|              sst-2|           sst-3|
+------------+-------------------+-------------------+----------------+
|   432214580|               LIVE|             BUFFER|               5|
|   432215859|               LIVE|             BUFFER|               5|
|   432216992|SERIES LINK BOOKING|             BUFFER|SINGLE RECORDING|
|   432217982|               LIVE|SERIES LINK BOOKING|          BUFFER|
|   432218817|               LIVE|             BUFFER|SINGLE RECORDING|
+------------+-------------------+-------------------+----------------+
only showing top 5 rows



In [33]:
sessions_final = sessions_grouping.join(channels_pivot, sessions_grouping.household_id == channels_pivot.household_id,'inner' ).drop(sessions_grouping.household_id)\
.join(genre_pivot, 'household_id','inner')\
.drop(channels_pivot.household_id)\
.join(sub_genre_pivot, 'household_id','inner')\
.drop(genre_pivot.household_id)\
.join(hour_pivot, 'household_id','inner')\
.drop(sub_genre_pivot.household_id)\
.join(dow_pivot, 'household_id','inner')\
.drop(hour_pivot.household_id)\
.join(st_pivot, 'household_id','inner')\
.drop(dow_pivot.household_id)\
.join(sst_pivot, 'household_id','inner')\
.drop(st_pivot.household_id)


In [34]:
sessions_final.show()

+------------+----------+--------------------+--------------------+--------------------+-------------------+--------------------------+--------------------+------------+------------+------------------+---------------+--------------+---------------+------------------+------------------+--------------+---------------+--------------+--------------+--------------+----------------+--------------+-------------+-------------+------------+------------+-------------+------+------+------+------+------+---------+---------+--------+---------+---------+---------+---------+-------------------+-------------------+----------------+
|household_id|max_hit_nr|avg_session_duration|max_session_duration|min_session_duration|sd_session_duration|skewnews_ssession_duration|sum_session_duration|sum_overlaps|avg_overlaps|avg_playback_speed|       age_band|        gender|      channel-1|         channel-2|         channel-3|     channel-4|      channel-5|       genre-1|       genre-2|       genre-3|         genre

In [35]:
sessions_final.select('age_band').distinct().show()

+---------------+
|       age_band|
+---------------+
|greater_than_65|
|   less_than_25|
|         45to54|
|         55to64|
|         25to34|
|         35to44|
+---------------+



In [36]:
sessions_final.coalesce(1).write.option("header", "true").csv("sample_file.csv")
