In [1]:
## Start (or reuse) the Spark session used by every cell below
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Dataset_Prep")
    .getOrCreate()
)

In [2]:
from pathlib import Path

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# The CSV inputs live in a `data` directory that sits next to the notebook's
# working directory (i.e. under the parent of the current working directory).
data_folder = Path.cwd().parents[0] / 'data'

In [4]:
## Load the members CSV (header row, comma-separated, no schema inference)
members_df = (
    spark.read.format("csv")
    .option("inferSchema", "False")
    .option("header", "True")
    .option("sep", ",")
    .load(str(data_folder / 'members_v3.csv'))
)

In [5]:
# Print the column names and types of the members frame as loaded.
members_df.printSchema()

root
 |-- msno: string (nullable = true)
 |-- city: string (nullable = true)
 |-- bd: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- registration_init_time: string (nullable = true)



In [6]:
# Preview the first five member rows.
members_df.show(5)

+--------------------+----+---+------+--------------+----------------------+
|                msno|city| bd|gender|registered_via|registration_init_time|
+--------------------+----+---+------+--------------+----------------------+
|Rb9UwLQTrxzBVwCB6...|   1|  0|  null|            11|              20110911|
|+tJonkh+O1CA796Fm...|   1|  0|  null|             7|              20110914|
|cV358ssn7a0f7jZOw...|   1|  0|  null|            11|              20110915|
|9bzDeJP6sQodK73K5...|   1|  0|  null|            11|              20110915|
|WFLY3s7z4EZsieHCt...|   6| 32|female|             9|              20110915|
+--------------------+----+---+------+--------------+----------------------+
only showing top 5 rows



In [7]:
## Print number of null values in each column
#{col:(members_df.filter(members_df[col].isNull() | isnan(members_df[col]))).count() for col in members_df.columns}

In [8]:
## Load the transactions CSV (header row, comma-separated, no schema inference)
transactions_df = (
    spark.read.format("csv")
    .option("inferSchema", "False")
    .option("header", "True")
    .option("sep", ",")
    .load(str(data_folder / 'transactions.csv'))
)

In [9]:
# Print the column names and types of the transactions frame as loaded.
transactions_df.printSchema()

root
 |-- msno: string (nullable = true)
 |-- payment_method_id: string (nullable = true)
 |-- payment_plan_days: string (nullable = true)
 |-- plan_list_price: string (nullable = true)
 |-- actual_amount_paid: string (nullable = true)
 |-- is_auto_renew: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- membership_expire_date: string (nullable = true)
 |-- is_cancel: string (nullable = true)



In [10]:
# Preview the first five transaction rows.
transactions_df.show(5)

+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|                msno|payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|transaction_date|membership_expire_date|is_cancel|
+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|YyO+tlZtAXYXoZhNr...|               41|               30|            129|               129|            1|        20150930|              20151101|        0|
|AZtu6Wl0gPojrEQYB...|               41|               30|            149|               149|            1|        20150930|              20151031|        0|
|UkDFI97Qb6+s2LWci...|               41|               30|            129|               129|            1|        20150930|              20160427|        0|
|M1C56ijxozNaGD0t2...|               39|            

In [11]:
## Print number of null values in each column
#{col:(transactions_df.filter(transactions_df[col].isNull() | isnan(transactions_df[col]))).count() for col in transactions_df.columns}

In [12]:
## Load the user logs CSV (header row, comma-separated, no schema inference)
user_logs_df = (
    spark.read.format("csv")
    .option("inferSchema", "False")
    .option("header", "True")
    .option("sep", ",")
    .load(str(data_folder / 'user_logs.csv'))
)

In [13]:
# Print the column names and types of the user logs frame as loaded.
user_logs_df.printSchema()

root
 |-- msno: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_25: string (nullable = true)
 |-- num_50: string (nullable = true)
 |-- num_75: string (nullable = true)
 |-- num_985: string (nullable = true)
 |-- num_100: string (nullable = true)
 |-- num_unq: string (nullable = true)
 |-- total_secs: string (nullable = true)



In [14]:
# Preview the first five user log rows.
user_logs_df.show(5)

+--------------------+--------+------+------+------+-------+-------+-------+----------+
|                msno|    date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+--------+------+------+------+-------+-------+-------+----------+
|rxIP2f2aN0rYNp+to...|20150513|     0|     0|     0|      0|      1|      1|  280.3350|
|rxIP2f2aN0rYNp+to...|20150709|     9|     1|     0|      0|      7|     11| 1658.9480|
|yxiEWwE9VR5utpUec...|20150105|     3|     3|     0|      0|     68|     36|17364.9560|
|yxiEWwE9VR5utpUec...|20150306|     1|     0|     1|      1|     97|     27|24667.3170|
|yxiEWwE9VR5utpUec...|20150501|     3|     0|     0|      0|     38|     38| 9649.0290|
+--------------------+--------+------+------+------+-------+-------+-------+----------+
only showing top 5 rows



In [15]:
## Print number of null values in each column
#{col:(user_logs_df.filter(user_logs_df[col].isNull() | isnan(user_logs_df[col]))).count() for col in user_logs_df.columns}

In [16]:
## Load the ground-truth (train) CSV: one churn label per msno
train_df = (
    spark.read.format("csv")
    .option("inferSchema", False)
    .option("header", True)
    .option("delimiter", ",")
    .load(str(data_folder / 'train.csv'))
)

In [17]:
# Preview the first five labelled rows.
train_df.show(5)

+--------------------+--------+
|                msno|is_churn|
+--------------------+--------+
|waLDQMmcOu2jLDaV1...|       1|
|QA7uiXy8vIbUSPOkC...|       1|
|fGwBva6hikQmTJzrb...|       1|
|mT5V8rEpa+8wuqi6x...|       1|
|XaPhtGLk/5UvvOYHc...|       1|
+--------------------+--------+
only showing top 5 rows



## In transaction data, select only customers in train data

In [18]:
## Step 1: Restrict transactions to customers (msno) that appear in the train data
transactions_df = transactions_df.join(
    train_df.select('msno'),
    on='msno',
    how='inner',
)

In [19]:
## Step 2: For each msno, keep only the latest transaction
# Parse the 'yyyyMMdd' string into a real date so ordering is chronological.
# `F.` is used instead of the star-imported names so this cell keeps working
# even if a later cell rebinds a bare name such as `col`.
transactions_df = transactions_df.withColumn(
    'transaction_date',
    F.to_date(transactions_df['transaction_date'], 'yyyyMMdd'),
)

# sort() followed by dropDuplicates() does NOT guarantee which row per msno
# survives: dropDuplicates keeps an arbitrary row for each key, so the sort
# order is not honoured. Rank the rows inside each msno partition instead and
# keep only the top-ranked one.
# Ties on transaction_date are broken by membership_expire_date (still a
# 'yyyyMMdd' string here, so string order matches date order) to make the
# choice repeatable when a customer has several transactions on the same day.
latest_first = Window.partitionBy('msno').orderBy(
    F.col('transaction_date').desc(),
    F.col('membership_expire_date').desc(),
)
transactions_df = (
    transactions_df
    .withColumn('_row_rank', F.row_number().over(latest_first))
    .filter(F.col('_row_rank') == 1)
    .drop('_row_rank')
)

In [20]:
# Preview five rows of the per-customer transactions after deduplication.
transactions_df.show(5)

+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|                msno|payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|transaction_date|membership_expire_date|is_cancel|
+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|++4RuqBw0Ss6bQU4o...|               41|               30|            149|               149|            1|      2017-02-13|              20170313|        0|
|+/HS8LzrRGXolKbxR...|               40|               30|            149|               149|            1|      2017-02-17|              20170316|        0|
|+/namlXq+u3izRjHC...|               34|               30|            149|               149|            1|      2017-02-28|              20170331|        0|
|+0/X9tkmyHyet9X80...|               34|            

## In user logs data, select only customers in train data

In [21]:
## Step 1: Restrict user logs to customers (msno) that appear in the train data
user_logs_df = user_logs_df.join(
    train_df.select('msno'),
    on='msno',
    how='inner',
)

In [22]:
## Step 2: Parse the 'yyyyMMdd' date strings into a DateType column
user_logs_df = user_logs_df.withColumn(
    'date',
    to_date(user_logs_df['date'], 'yyyyMMdd'),
)

In [23]:
## Step 3: Memberships in the train set expire in February, so keep only the
## last month of activity: logs dated January 30 through February 28 (inclusive).
window_start = lit('2017-01-30').cast(DateType())
window_end = lit('2017-02-28').cast(DateType())
in_window = (user_logs_df['date'] >= window_start) & (user_logs_df['date'] <= window_end)
user_logs_df = user_logs_df.filter(in_window)

In [24]:
# Preview five user log rows after the train-set join and the date filter.
user_logs_df.show(5)

+--------------------+----------+------+------+------+-------+-------+-------+----------+
|                msno|      date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+----------+------+------+------+-------+-------+-------+----------+
|++4RuqBw0Ss6bQU4o...|2017-02-20|     3|     0|     0|      1|      4|      7| 1368.1910|
|+/namlXq+u3izRjHC...|2017-01-31|    21|     2|     2|      0|      7|     32| 2378.9580|
|+/namlXq+u3izRjHC...|2017-02-01|     4|     0|     0|      1|      4|      9| 1378.2200|
|+/namlXq+u3izRjHC...|2017-02-02|    23|     3|     0|      2|      7|     35| 2599.7370|
|+/namlXq+u3izRjHC...|2017-02-03|    13|     0|     0|      0|     27|     40| 7097.6090|
+--------------------+----------+------+------+------+-------+-------+-------+----------+
only showing top 5 rows



## For members data, subset only records which are present in train set

In [25]:
# Keep only members whose msno appears in the train data.
members_df = members_df.join(
    train_df.select('msno'),
    on='msno',
    how='inner',
)

In [26]:
# Preview five member rows after restricting to the train set.
members_df.show(5)

+--------------------+----+---+------+--------------+----------------------+
|                msno|city| bd|gender|registered_via|registration_init_time|
+--------------------+----+---+------+--------------+----------------------+
|++4RuqBw0Ss6bQU4o...|   1|  0|  null|             7|              20140714|
|+/namlXq+u3izRjHC...|  15| 31|  male|             9|              20060603|
|+0/X9tkmyHyet9X80...|   9| 31|  male|             9|              20040330|
|+09YGn842g6h2EZUX...|  15| 29|  male|             9|              20080322|
|+0RJtbyhoPAHPa+34...|  13| 29|female|             3|              20120612|
+--------------------+----+---+------+--------------+----------------------+
only showing top 5 rows



## Next step
1. For each user, aggregate the last 30 days of user logs (and add a new column with the number of days on which the user used the service)

In [35]:
## Convert required columns to numeric/float type
int_col = ["num_25", "num_50", "num_75", "num_985", "num_100", "num_unq"]
float_col = ["total_secs"]

# The loop variable is deliberately NOT named `col`: a top-level loop variable
# stays bound in the notebook namespace after the loop, and `col` would replace
# the `col` function star-imported from pyspark.sql.functions. Any later
# `col(...)` call, or a re-run of an earlier cell that uses it, would then fail
# with "'str' object is not callable".
for column_name in int_col:
    user_logs_df = user_logs_df.withColumn(column_name, user_logs_df[column_name].cast(IntegerType()))

# NOTE(review): FloatType is single precision; consider DoubleType if the
# precision of total_secs matters downstream.
for column_name in float_col:
    user_logs_df = user_logs_df.withColumn(column_name, user_logs_df[column_name].cast(FloatType()))

In [37]:
## Collapse the daily logs to one row per msno: the mean of every numeric
## column plus the number of log rows with a non-null num_25.
# NOTE(review): number_of_days_used equals "days used" only if the logs hold
# at most one row per msno per date - confirm against the source data.
user_logs_df = user_logs_df.groupBy("msno").agg(
    *[
        mean(name).alias("avg_" + name)
        for name in ("num_25", "num_50", "num_75", "num_985", "num_100", "num_unq", "total_secs")
    ],
    count("num_25").alias("number_of_days_used"),
)

In [39]:
# Show five aggregated rows without truncating the column values.
user_logs_df.show(5, truncate=False)

+--------------------------------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|msno                                        |avg_num_25        |avg_num_50        |avg_num_75        |avg_num_985       |avg_num_100       |avg_num_unq       |avg_total_secs    |number_of_days_used|
+--------------------------------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|++4RuqBw0Ss6bQU4oMxaRlbBPoWzoEiIZaxPM04Y4+U=|3.0               |0.0               |0.0               |1.0               |4.0               |7.0               |1368.1910400390625|1                  |
|+/namlXq+u3izRjHCFJV4MgqcXcLidZYszVsROOq/y4=|24.846153846153847|1.1923076923076923|1.0769230769230769|1.1538461538461537|27.923076923076923|51.80769230769231 |7806.437842735877 |26                 |


In [40]:
# Row count of the aggregated frame (one row per msno after the groupBy).
user_logs_df.count()

767168

In [None]:
# Build the modelling table: start from the labelled train rows and left-join
# the member, transaction and aggregated user-log features on msno, so every
# train row is kept even when a feature table has no match.
model_dataset = (
    train_df
    .join(members_df, on='msno', how='left')
    .join(transactions_df, on='msno', how='left')
    .join(user_logs_df, on='msno', how='left')
)