In [2]:
import os
import numpy as np

In [3]:
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [14]:
# Execute the companion configuration notebook.
# NOTE(review): `profile_attributes` and `even_attributes`, used by the export loops
# near the end, are not defined anywhere in this notebook - presumably they are
# defined by the configuration notebook; confirm.
%run configuration.ipynb

## build the spark context

In [4]:
# Build (or reuse) the local Spark session named "churn".
# Constructing SparkContext("local", "churn") directly raises
# "Cannot run multiple SparkContexts at once" when this cell is re-run in a live
# kernel; the builder's getOrCreate() hands back the existing session instead.
# Despite its name, `sqlContext` is a SparkSession.
sqlContext = SparkSession.builder.master("local").appName("churn").getOrCreate()
# Same underlying context the original cell created explicitly.
sc = sqlContext.sparkContext

22/09/12 05:34:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
def sql(sql_commend):
    """Run a Spark SQL statement and print its result table.

    Args:
        sql_commend: SQL text passed unchanged to ``sqlContext.sql``.

    Returns:
        Whatever ``show()`` returns; the rows are printed as a side effect.
    """
    result_frame = sqlContext.sql(sql_commend)
    return result_frame.show()

In [6]:
def sql_to_df(sql_commend):
    """Run a Spark SQL statement and hand back the result without printing it.

    Args:
        sql_commend: SQL text passed unchanged to ``sqlContext.sql``.

    Returns:
        The object returned by ``sqlContext.sql`` for that statement.
    """
    return sqlContext.sql(sql_commend)

In [6]:
# Bare expression: the notebook displays the session object's representation.
sqlContext

see the spark jobs at 

http://localhost:4040/jobs/

## load the data

### profile

The profiles dataset contains user profiles with the following columns:

player_id (integer) - unique identifier of the player

registration_date (yyyy-MM-dd) - date when the player 1st played the game

country_code (integer) - country of the user

operating_system (integer) - operating system of the user

device_type (integer) - type of device used by the player

In [6]:
# Explicit column layout for the profiles data, one entry per column in order.
# The third argument is the nullable flag; operating_system is the only column
# declared non-nullable.
profile_schema = (
    StructType()
    .add('player_id', IntegerType(), True)
    .add('registration_date', DateType(), True)
    .add('country_code', IntegerType(), True)
    .add('operating_system', IntegerType(), False)
    .add('device_type', IntegerType(), True)
)

In [7]:
# Read the profiles CSV (no header row) using the explicit schema.
profiles = (
    sqlContext.read
    .option("header", False)
    .schema(profile_schema)
    .csv('data/profiles')
)

In [8]:
# Make the DataFrame queryable from Spark SQL under the name `profiles`.
profiles.createOrReplaceTempView('profiles')

In [15]:
# Preview three rows of the profiles view.
sql(u"""
select * from profiles limit 3;
""")

+---------+-----------------+------------+----------------+-----------+
|player_id|registration_date|country_code|operating_system|device_type|
+---------+-----------------+------------+----------------+-----------+
|  1442781|       2017-09-02|           0|               0|       1583|
|  6438953|       2017-10-18|           8|               0|       1230|
|  2155741|       2017-09-03|          32|               0|       1826|
+---------+-----------------+------------+----------------+-----------+



### activity

The activity dataset contains information on players' daily visits to the game. E.g., if the player with ID 123 plays the game at least once on 2018-09-02, then there is a row with those values in the dataset. The complete schema of the activity dataset contains the following columns:

event_date (yyyy-MM-dd)

player_id (integer) - unique identifier of the player

money_spent (float) - Total money spent during the day

session_count (integer) - Number of game sessions for the day

purchase_count (integer) - Number of purchases during the day

time_spent_seconds (integer) - Total time spent playing during the day

ads_impressions (integer) - Total number of seen ads during the day

ads_clicks (integer) - Total number of clicked ads during the day

In [9]:
# Explicit column layout for the activity data, one entry per column in order.
# The third argument is the nullable flag; purchase_count is the only column
# declared non-nullable.
activity_schema = (
    StructType()
    .add('event_date', DateType(), True)
    .add('player_id', IntegerType(), True)
    .add('money_spent', FloatType(), True)
    .add('session_count', IntegerType(), True)
    .add('purchase_count', IntegerType(), False)
    .add('time_spent_seconds', IntegerType(), True)
    .add('ads_impressions', IntegerType(), True)
    .add('ads_clicks', IntegerType(), True)
)

In [10]:
# Read the activity CSV (no header row) using the explicit schema.
activity = (
    sqlContext.read
    .option("header", False)
    .schema(activity_schema)
    .csv('data/activity')
)

# Write a Parquet copy, then rebind `activity` to that copy so the queries
# below read the Parquet files rather than the CSV.
activity.write.mode('Overwrite').parquet('data/activity.parquet')
activity = sqlContext.read.parquet('data/activity.parquet')

In [11]:
# Make the DataFrame queryable from Spark SQL under the name `activity`.
activity.createOrReplaceTempView('activity')

In [50]:
# Show the three rows with the most recent event_date.
sql(u"""
select * from activity
order by event_date desc
limit 3;
""")



+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+
|event_date|player_id|money_spent|session_count|purchase_count|time_spent_seconds|ads_impressions|ads_clicks|
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+
|2017-11-01|  6928404|        0.0|            1|             0|               927|              2|         1|
|2017-11-01|  4168636|        0.0|            1|             0|              2710|              0|         0|
|2017-11-01|  3950188|        0.0|            1|             0|               786|              0|         0|
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+



                                                                                

The last event date is in 2017, so there is no need to worry about the current date still being less than 7 days after a player's registration.

## calculate the label

Implement a churn (3) prediction model, and report its accuracy using the metric you think is the most fitting one	

3. Predict which players are not seen after 7th day from the registration


In [17]:
# List the columns and data types of the activity view.
sql(u"""
desc activity
""")

+------------------+---------+-------+
|          col_name|data_type|comment|
+------------------+---------+-------+
|        event_date|     date|   null|
|         player_id|      int|   null|
|       money_spent|    float|   null|
|     session_count|      int|   null|
|    purchase_count|      int|   null|
|time_spent_seconds|      int|   null|
|   ads_impressions|      int|   null|
|        ads_clicks|      int|   null|
+------------------+---------+-------+



### find the last activity date

In [39]:
# Per player: earliest and latest event_date in the activity view, saved as Parquet.
(
    sql_to_df("""
    select player_id, 
    max(event_date) as last_event_date,
    min(event_date) as first_event_date
    from activity
    group by player_id
    """)
    .write
    .mode('Overwrite')
    .parquet('/data/player_last_event_date.parquet')
)

                                                                                

In [40]:
# Read the saved Parquet output back and register it as the SQL view `player_last_event_date`.
sqlContext.read.parquet('/data/player_last_event_date.parquet').createOrReplaceTempView('player_last_event_date')

In [43]:
# Spot-check three players, adding the number of days between first and last event.
sql(u"""
select *,
datediff(last_event_date, first_event_date) activity_duration
from player_last_event_date limit 3;
""")

+---------+---------------+----------------+-----------------+
|player_id|last_event_date|first_event_date|activity_duration|
+---------+---------------+----------------+-----------------+
|  1583070|     2017-09-24|      2017-09-02|               22|
|  6253908|     2017-11-01|      2017-10-12|               20|
|  2398918|     2017-10-08|      2017-09-04|               34|
+---------+---------------+----------------+-----------------+



### check if the last event date is more than 7 days later than the registration date

In [44]:
# List the columns and data types of the profiles view.
sql(u"""
desc profiles
""")

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|        player_id|      int|   null|
|registration_date|     date|   null|
|     country_code|      int|   null|
| operating_system|      int|   null|
|      device_type|      int|   null|
+-----------------+---------+-------+



In [73]:
# Label every profile: 1 = churned (not seen after day 7 from registration), 0 = retained.
# active_days counts the registration day as day 1, so a last event on the
# 7th day gives active_days = 7 and still counts as churned.
# The left join keeps profiles with no activity rows at all; their
# last_event_date is null, and previously the null comparison fell through to
# `else 0`, labelling them as retained. They are now labelled as churned.
# NOTE(review): players who registered fewer than 7 days before the last event
# date in the data cannot be observed past day 7 and are labelled churned
# here - confirm whether they should be excluded.
sql_to_df("""
    select *,
    case
        when last_event_date is null then 1
        when active_days <= 7 then 1
        else 0
    end as label
    from (
        select p.*,
        l.last_event_date,
        datediff(l.last_event_date, p.registration_date)+1 as active_days
        from profiles as p
        left join player_last_event_date as l
        on l.player_id = p.player_id
        )
    """).write.mode('Overwrite').parquet('/data/player_label.parquet')

22/09/11 20:57:45 WARN CacheManager: Asked to cache already cached data.
                                                                                

In [74]:
# Read the saved labels back and register them as the SQL view `player_label`.
sqlContext.read.parquet('/data/player_label.parquet').createOrReplaceTempView('player_label')

In [79]:
# Inspect five labelled players together with the dates the label was derived from.
sql(u"""
select player_id, 
registration_date,
last_event_date,
active_days,
label
from player_label
limit 5
""")

+---------+-----------------+---------------+-----------+-----+
|player_id|registration_date|last_event_date|active_days|label|
+---------+-----------------+---------------+-----------+-----+
|   812220|       2017-09-01|     2017-09-02|          2|    1|
|   812226|       2017-09-01|     2017-09-12|         12|    0|
|   812504|       2017-09-01|     2017-09-01|          1|    1|
|   812544|       2017-09-01|     2017-09-18|         18|    0|
|   812734|       2017-09-01|     2017-09-22|         22|    0|
+---------+-----------------+---------------+-----------+-----+



In [77]:
# Class balance: number of players for each label value.
sql(u"""
select label, count(*)
from player_label
group by label
""")

                                                                                

+-----+--------+
|label|count(1)|
+-----+--------+
|    1| 3667088|
|    0| 2452263|
+-----+--------+



## feature generation

### keep only the events that fall within 7 days of the registration

In [80]:
# Attach each event's registration_date and keep only events less than 7 days
# after it (day offsets below 7); the result is saved as Parquet.
(
    sql_to_df("""
    select a.*, p.registration_date
    from activity as a
    join profiles as p
    on p.player_id = a.player_id    
    where datediff(
        a.event_date,
        p.registration_date
        ) < 7
    """)
    .write
    .mode('Overwrite')
    .parquet('/data/activity_within_7_days.parquet')
)

22/09/11 21:00:17 WARN CacheManager: Asked to cache already cached data.
                                                                                

In [81]:
# Read the filtered events back and register them as the SQL view `activity_within_7_days`.
sqlContext.read.parquet('/data/activity_within_7_days.parquet').createOrReplaceTempView('activity_within_7_days')

#### check the correctness

In [82]:
# Sanity check on a single player: list the events that survived the 7-day filter.
sql(u"""
select *
from activity_within_7_days
where player_id = 812226
order by event_date asc
""")



+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+-----------------+
|event_date|player_id|money_spent|session_count|purchase_count|time_spent_seconds|ads_impressions|ads_clicks|registration_date|
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+-----------------+
|2017-09-01|   812226|        0.0|            1|             0|                63|              0|         0|       2017-09-01|
|2017-09-02|   812226|        0.0|            5|             0|               350|              0|         0|       2017-09-01|
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+-----------------+





In [83]:
# The same player in the unfiltered activity view, for comparison.
sql(u"""
select *
from activity
where player_id = 812226
order by event_date asc
""")



+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+
|event_date|player_id|money_spent|session_count|purchase_count|time_spent_seconds|ads_impressions|ads_clicks|
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+
|2017-09-01|   812226|        0.0|            1|             0|                63|              0|         0|
|2017-09-02|   812226|        0.0|            5|             0|               350|              0|         0|
|2017-09-08|   812226|        0.0|            2|             0|               612|              0|         0|
|2017-09-09|   812226|        0.0|            2|             0|              3095|              3|         0|
|2017-09-10|   812226|        0.0|            1|             0|               960|              0|         0|
|2017-09-12|   812226|        0.0|            1|             0|               351|              0|         0|
+---------

                                                                                

### generate the sequence of activities for players

In [104]:
# Collapse each player's filtered events into one array of structs. Each struct
# carries the daily metrics plus the day offset from registration and the
# weekday of the event. The result is saved as Parquet.
(
    sql_to_df("""
select player_id,
collect_list(
    struct(
        event_date,
        money_spent,
        session_count,
        purchase_count,
        time_spent_seconds,
        ads_impressions,
        ads_clicks,
        datediff(event_date, registration_date) as days_from_registration,
        dayofweek(event_date) as event_day_of_week
    )
) as event_sequence
from activity_within_7_days
group by player_id
""")
    .write
    .mode('Overwrite')
    .parquet('/data/player_event_sequence.parquet')
)

                                                                                

In [105]:
# Read the per-player sequences back and register them as the SQL view `player_event_sequence`.
sqlContext.read.parquet('/data/player_event_sequence.parquet').createOrReplaceTempView('player_event_sequence')

In [116]:
# Look for filtered activity rows whose money_spent value is missing.
sql(u"""
select *
from activity_within_7_days
where money_spent is null
""")



+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+-----------------+
|event_date|player_id|money_spent|session_count|purchase_count|time_spent_seconds|ads_impressions|ads_clicks|registration_date|
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+-----------------+
+----------+---------+-----------+-------------+--------------+------------------+---------------+----------+-----------------+





### sort the sequence according to the event date

In [106]:
# Order every player's event array by event_date, ascending, and save as Parquet.
# The comparator now returns 0 when two dates are equal. The previous version
# answered 1 for "equal" in both argument orders, which breaks the
# negative / zero / positive contract array_sort expects from its comparator.
sql_to_df("""
select player_id,
array_sort(event_sequence,
    (left, right) -> case
        when left.event_date < right.event_date then -1
        when left.event_date > right.event_date then 1
        else 0
    end
) as event_sequence
from player_event_sequence
""").write.mode('Overwrite').parquet('/data/player_event_sequence_sorted.parquet')

                                                                                

In [107]:
# Read the sorted sequences back and register them as the SQL view `player_event_sequence_sorted`.
sqlContext.read.parquet('/data/player_event_sequence_sorted.parquet').createOrReplaceTempView('player_event_sequence_sorted')

In [110]:
# Print ten sorted sequences in full; truncate=False stops long cells being cut off.
sql_to_df("""
select *
from player_event_sequence_sorted
limit 10;
""").show(truncate=False)



+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|player_id|event_sequence                                                                                                                                                                                                        |
+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|812293   |[[2017-09-01, 0.0, 1, 0, 137, 0, 0, 0, 6]]                                                                                                                                                                            |
|812413   |[[2017-09-01, 0.0, 1, 0, 34, 0, 0, 0, 6], [2017-09-02, 0.0, 2, 0, 37, 3, 1, 1, 7]

                                                                                

## join the event features and label to the profile

In [131]:
# One row per profile with its sorted event sequence and churn label attached.
# Both joins are left joins, so every profile is kept even when it has no
# sequence or label row. The result is saved as Parquet.
(
    sql_to_df("""
    select p.*,
    s.event_sequence,
    l.label
    from profiles as p
    left join player_event_sequence_sorted as s on s.player_id = p.player_id
    left join player_label as l on l.player_id = p.player_id
    """)
    .write
    .mode('Overwrite')
    .parquet('/data/profile_feature_label.parquet')
)

                                                                                

In [7]:
# Read the joined table back and register it as the SQL view `profile_feature_label`.
sqlContext.read.parquet('/data/profile_feature_label.parquet').createOrReplaceTempView('profile_feature_label')

In [13]:
# Compare the row count with the distinct player count; equal values mean one row per player.
sql(u"""
select count(*), count(distinct player_id)
from profile_feature_label
""")



+--------+-------------------------+
|count(1)|count(DISTINCT player_id)|
+--------+-------------------------+
| 6119351|                  6119351|
+--------+-------------------------+





### transform to arrays

In [11]:
def dict_array_to_float_array(input, key):
    """Extract one field from every struct of an array column, for float fields.

    Args:
        input: sequence of struct rows (such as ``event_sequence``), or None
            when the column is null for the row.
        key: name of the struct field to read from each element.

    Returns:
        A list with ``element[key]`` for every element, in the original order,
        or None when ``input`` is None.
    """
    # event_sequence comes from a left join, so it can be null for a profile.
    if input is None:
        return None
    return [a[key] for a in input]

# Register the float extractor itself. The original passed dict_array_to_int_array
# here, which is not yet defined at this point on a fresh kernel.
sqlContext.udf.register("dict_array_to_float_array", dict_array_to_float_array, ArrayType(FloatType()))

def dict_array_to_int_array(input, key):
    """Extract one field from every struct of an array column, for integer fields.

    Args:
        input: sequence of struct rows (such as ``event_sequence``), or None
            when the column is null for the row.
        key: name of the struct field to read from each element.

    Returns:
        A list with ``element[key]`` for every element, in the original order,
        or None when ``input`` is None.
    """
    # event_sequence comes from a left join, so it can be null for a profile.
    if input is None:
        return None
    return [a[key] for a in input]

sqlContext.udf.register("dict_array_to_int_array", dict_array_to_int_array, ArrayType(IntegerType()))

22/09/11 21:43:53 WARN SimpleFunctionRegistry: The function dict_array_to_int_array replaced a previously registered function.


<function __main__.dict_array_to_int_array(input, key)>

In [9]:
# Print the schema of the joined table, including the nested event struct.
sql_to_df(u"""
select * from profile_feature_label
""").printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- operating_system: integer (nullable = true)
 |-- device_type: integer (nullable = true)
 |-- event_sequence: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- event_date: date (nullable = true)
 |    |    |-- money_spent: float (nullable = true)
 |    |    |-- session_count: integer (nullable = true)
 |    |    |-- purchase_count: integer (nullable = true)
 |    |    |-- time_spent_seconds: integer (nullable = true)
 |    |    |-- ads_impressions: integer (nullable = true)
 |    |    |-- ads_clicks: integer (nullable = true)
 |    |    |-- days_from_registration: integer (nullable = true)
 |    |    |-- event_day_of_week: integer (nullable = true)
 |-- label: integer (nullable = true)



In [22]:
# Flatten the table for export: scalar profile columns plus one array column per
# event field, each pulled out of event_sequence by the registered UDFs.
# NOTE(review): purchase_count is stored in the event struct but is not
# extracted here - confirm that leaving it out is intentional.
(
    sql_to_df("""
select 
player_id,
label,
dayofweek(registration_date) as registration_date_day_of_week,
country_code,
operating_system,
device_type,
dict_array_to_int_array(event_sequence, 'days_from_registration') as days_from_registration,
dict_array_to_int_array(event_sequence, 'event_day_of_week') as event_day_of_week,
dict_array_to_float_array(event_sequence, 'money_spent') as money_spent,
dict_array_to_int_array(event_sequence, 'session_count') as session_count,
dict_array_to_int_array(event_sequence, 'time_spent_seconds') as time_spent_seconds,
dict_array_to_int_array(event_sequence, 'ads_impressions') as ads_impressions,
dict_array_to_int_array(event_sequence, 'ads_clicks') as ads_clicks
from profile_feature_label
""")
    .write
    .mode('Overwrite')
    .parquet('/data/profile_data_array.parquet')
)

                                                                                

In [7]:
# Load the flattened per-player table back from Parquet as a DataFrame.
profile_data_array = sqlContext.read.parquet('/data/profile_data_array.parquet')

### save to npy arrays

# Print each column name as a quoted, comma-terminated list entry.
for column_name in profile_data_array.columns:
    print(f'\'{column_name}\',')

In [9]:
# Directory that receives the exported .npy files.
npy_directory = '/data/npy_data'

In [11]:
# Export every scalar column (one value per player) as its own .npy file.
# np.save does not create missing directories, so make sure the target exists.
os.makedirs(npy_directory, exist_ok=True)
# NOTE(review): the per-column files only line up row by row if every
# collect() returns the rows in the same order - confirm.
for c in profile_attributes:
    print(f'saving {c}')
    rows = profile_data_array.select(c).collect()
    # Each collected row holds a single selected column; keep its value.
    x = np.array([row[0] for row in rows])
    np.save(f'{npy_directory}/{c}.npy', x)

saving player_id


                                                                                

saving label


                                                                                

saving registration_date_day_of_week




saving country_code


                                                                                

saving operating_system


                                                                                

saving device_type


                                                                                

In [13]:
# Export every per-event column (one variable-length array per player) as its own .npy file.
# np.save does not create missing directories, so make sure the target exists.
os.makedirs(npy_directory, exist_ok=True)
for c in even_attributes:
    print(f'saving {c}')
    rows = profile_data_array.select(c).collect()
    # Players have different numbers of events, so the result is ragged.
    # np.array() on ragged input relied on implicit object-array creation, which
    # NumPy deprecated and newer releases reject, so build the 1-D object array
    # explicitly and fill it one player at a time.
    x = np.empty(len(rows), dtype=object)
    for i, row in enumerate(rows):
        x[i] = np.asarray(row[0])
    np.save(f'{npy_directory}/{c}.npy', x)

saving days_from_registration


  x = np.array(x)


saving event_day_of_week


                                                                                

saving money_spent


                                                                                

saving session_count


                                                                                

saving time_spent_seconds


                                                                                

saving ads_impressions


                                                                                

saving ads_clicks


                                                                                

## end