## Cleaning the data

In [222]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

import pandas as pd
import numpy as np
import boto3

sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

In [223]:
# s3 = boto3.resource('s3')
# bucket = s3.Bucket("msds-630-finalproject")

# client = boto3.client("s3", aws_access_key_id=access_key, aws_secret_acess_key=secret_key)

# obj = client.get_object(Bucket="msds-630-finalproject", Key="sessions.csv")
# sessions = pd.read_csv(obj["Body"])

In [224]:
#RUN ON FULL DATA
df = ss.read.csv("session_100000.csv", header=True)
#df = ss.read.csv("s3a://msds-630-finalproject/sessions.csv", header=True)

In [225]:
df.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- timezone_offset: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- is_developer: string (nullable = true)
 |-- is_wau: string (nullable = true)
 |-- is_mau: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- user_id_hash: string (nullable = true)



In [226]:
df.show(2)

+----------------+-------------------+---------------+-----------+---------------+--------------------------+----------------------+---------------------+----------+------------+------+------+-------+------+-----------+------------------+------------------+------+----------+-------------+--------------------+--------------------+
|          app_id|         session_id|start_timestamp|   timezone|timezone_offset|previous_sessions_duration|user_created_timestamp|is_user_first_session|is_session|is_developer|is_wau|is_mau|country|region|       city|          latitude|         longitude|locale|   os_name|session_index|           device_id|        user_id_hash|
+----------------+-------------------+---------------+-----------+---------------+--------------------------+----------------------+---------------------+----------+------------+------+------+-------+------+-----------+------------------+------------------+------+----------+-------------+--------------------+--------------------+
|472

Dropping the columns

In [227]:
df = df.drop('timezone_offset', 'device_id')
df.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- is_developer: string (nullable = true)
 |-- is_wau: string (nullable = true)
 |-- is_mau: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- user_id_hash: string (nullable = true)



Dropping is developer true and is_mau and is_wau true

In [228]:
df = df.filter('is_developer==false').drop('is_developer')
df = df.filter('is_mau==false').drop('is_mau')
df = df.filter('is_wau==false').drop('is_wau')
df.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- user_id_hash: string (nullable = true)



Handling missing values

seems like if city is ? it is just an unknown, we will treat this as others

In [229]:
df = df.replace(["?"], ["Other"], ["city"])
df.select("city", "country", "locale").filter(df.city == "Other").show()

+-----+-------+------+
| city|country|locale|
+-----+-------+------+
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
|Other|     US| en_US|
+-----+-------+------+
only showing top 20 rows



In [230]:
df.select("city", "country", "locale", "latitude", "longitude", "timezone", "region").filter(df.city.isNull()).show()

+----+-------+------+--------+---------+-------------+------+
|city|country|locale|latitude|longitude|     timezone|region|
+----+-------+------+--------+---------+-------------+------+
|null|     ZZ| in_ID|    null|     null|Asia/Makassar|  null|
|null|     ZZ| in_ID|    null|     null|Asia/Makassar|  null|
|null|     ZZ| in_ID|    null|     null|Asia/Makassar|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|   null|  null|    null|     null|         null|  null|
|null|  

In [231]:
df.select("city", "country", "locale", "latitude", "longitude", "timezone", "region").filter(df.city.isNull() & df.timezone.isNotNull()).show()

+----+-------+--------+--------+---------+-----------------+------+
|city|country|  locale|latitude|longitude|         timezone|region|
+----+-------+--------+--------+---------+-----------------+------+
|null|     ZZ|   in_ID|    null|     null|    Asia/Makassar|  null|
|null|     ZZ|   in_ID|    null|     null|    Asia/Makassar|  null|
|null|     ZZ|   in_ID|    null|     null|    Asia/Makassar|  null|
|null|     ZZ|   en_US|    null|     null|  America/Jamaica|  null|
|null|     ZZ|   ru_RU|    null|     null|      Asia/Almaty|  null|
|null|     ZZ|   ru_RU|    null|     null|      Asia/Almaty|  null|
|null|     ZZ|   ru_RU|    null|     null|      Asia/Almaty|  null|
|null|     ZZ|   ru_RU|    null|     null|      Asia/Almaty|  null|
|null|     ZZ|   ru_RU|    null|     null|      Asia/Almaty|  null|
|null|     ZZ|   ru_RU|    null|     null|      Asia/Almaty|  null|
|null|     ZZ|   en_GB|    null|     null|     Asia/Kolkata|  null|
|null|     ZZ|   te_IN|    null|     null|    As

In [232]:
df.select("city", "country", "locale", "latitude", "longitude", "timezone", "region").filter(df.city.isNull() & df.timezone.isNotNull()).count()

31

In [233]:
df.select("city", "country", "locale", "latitude", "longitude", "timezone", "region").filter(df.city.isNull() & df.timezone.isNull()).count()

566

seems like when we have null country, the city is also null, locale, latitude and longitute doesnt help to figure out the location. time zone is available fot some of the missing values.
I will leave it here for now, but basically we can find the most common country amd city for the timezone and replace null city and counntry with it. For the rest we need to see if we can impute with most common one or we will ignore for now

In [234]:
df.groupBy(df["city"]).count()\
.orderBy("count", ascending=False).show()

+------------+-----+
|        city|count|
+------------+-----+
|      london| 1837|
|      dallas| 1656|
|    new york| 1571|
|     chicago| 1235|
|   melbourne| 1137|
| quezon city| 1035|
|     houston| 1013|
|      mumbai|  891|
|       lagos|  859|
|     orlando|  850|
|     atlanta|  849|
|kuala lumpur|  839|
|  chandigarh|  828|
|     jakarta|  812|
| los angeles|  733|
|    budapest|  731|
|       Other|  725|
|        pune|  710|
|    kingston|  683|
|     phoenix|  672|
+------------+-----+
only showing top 20 rows



In [235]:
df.select("city").filter(df.city == "?").count()

0

In [236]:
df.select("city").filter(df.city.isNull()).count()

597

In [237]:
df.select("country").filter(df.country == "?").count()

0

In [238]:
df.select("country").filter(df.country.isNull()).count()

566

In [239]:
df.select("os_name").describe().show()

+-------+----------+
|summary|   os_name|
+-------+----------+
|  count|     99127|
|   mean|      null|
| stddev|      null|
|    min|Android OS|
|    max| iPhone OS|
+-------+----------+



In [240]:
df.select("os_name").filter(df.os_name == "?").count()

0

In [241]:
df.select("os_name").filter(df.os_name.isNull()).count()

870

In [242]:
from pyspark.sql.functions import isnan, when, count, col

In [243]:
missing = df.select([count(when(isnan(c) | col(c)  \
                      .isNull(), c)).alias(c) for c in df.columns])
missing.show()

+------+----------+---------------+--------+--------------------------+----------------------+---------------------+----------+-------+------+----+--------+---------+------+-------+-------------+------------+
|app_id|session_id|start_timestamp|timezone|previous_sessions_duration|user_created_timestamp|is_user_first_session|is_session|country|region|city|latitude|longitude|locale|os_name|session_index|user_id_hash|
+------+----------+---------------+--------+--------------------------+----------------------+---------------------+----------+-------+------+----+--------+---------+------+-------+-------------+------------+
|     0|         0|              0|     566|                         0|                     0|                    0|         0|    566|   597| 597|     597|      597|   566|    870|            0|           0|
+------+----------+---------------+--------+--------------------------+----------------------+---------------------+----------+-------+------+----+--------+--------

In [244]:
missing.select("country", "city", "os_name").show()

+-------+----+-------+
|country|city|os_name|
+-------+----+-------+
|    566| 597|    870|
+-------+----+-------+



We dont have any other missing values other then this three categories, the country and city we wil irgnore for now. As mentioned above, any other regional information doesnt really help to identify the missing values

In [245]:
#dropping regional info:

In [246]:
df = df.drop("locale", "latitude", "longitude", "timezone", "region")

1. We can train the model to predict the os_name for the missing values, i can do it on total

In [247]:
df.count()

99997

In [248]:
df_ml = df

In [249]:
# converting to categorical

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

def toIntSafe(v):
    try:
        return float(v)
    except ValueError:
        return str(v)   

def indexStringColumns(df, cols):
    newdf = df   
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:   
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf


df_ml = indexStringColumns(df_ml, ["app_id"])
df_ml = indexStringColumns(df_ml, ["session_id"])
df_ml = indexStringColumns(df_ml, ["is_user_first_session"])

In [250]:
df_ml.show(2)

+---------------+--------------------------+----------------------+----------+-------+-----------+----------+-------------+--------------------+------+----------+---------------------+
|start_timestamp|previous_sessions_duration|user_created_timestamp|is_session|country|       city|   os_name|session_index|        user_id_hash|app_id|session_id|is_user_first_session|
+---------------+--------------------------+----------------------+----------+-------+-----------+----------+-------------+--------------------+------+----------+---------------------+
|  1542215364580|                  25837591|         1538874289458|      true|     PH|     makati|Android OS|           30|9943447915df3a45f...|   0.0|   93536.0|                  0.0|
|  1543712977293|                  35050130|         1538874289458|      true|     PH|quezon city|Android OS|           47|9943447915df3a45f...|   0.0|   21561.0|                  0.0|
+---------------+--------------------------+----------------------+--------

In [251]:
os_not_null = df_ml.filter(df_ml.os_name.isNotNull())

In [252]:
os_not_null = indexStringColumns(os_not_null, ["os_name"])

In [253]:
os_not_null.groupBy("os_name").count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|    0.0|67300|
|    1.0|31127|
|    2.0|  700|
+-------+-----+



In [254]:
os_0 = os_not_null.filter("os_name = 0.0").limit(700)

In [255]:
os_1 = os_not_null.filter("os_name = 1.0").limit(700)

In [256]:
os_2 = os_not_null.filter("os_name = 2.0")

In [257]:
from pyspark.sql.functions import *


os = os_0.union(os_1)
os_not_null_red = os.union(os_2)

In [258]:
# creating a feature vector:

from pyspark.ml.feature import VectorAssembler

input_cols=["app_id", "session_id",
           "is_user_first_session"]

va = VectorAssembler(outputCol="features", inputCols=input_cols)
#lpoints - labeled data.
lpoints = va.transform(os_not_null_red).select("features", "os_name")\
.withColumnRenamed("os_name", "label")

In [259]:
#splitting the data

splits = lpoints.randomSplit([0.8, 0.2])
train = splits[0].cache()
valid = splits[1].cache()

In [260]:
train.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  549|
|  1.0|  570|
|  2.0|  574|
+-----+-----+



In [261]:
predict = df_ml.filter(df_ml.os_name.isNull())

In [262]:
predict.groupBy('os_name').count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|   null|  870|
+-------+-----+



In [263]:
predict = predict.withColumn('os_name_1', lit(0))

In [264]:
predict = predict.drop("os_name").withColumnRenamed('os_name_1', 'os_name')

In [265]:
predict.groupBy('os_name').count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|      0|  870|
+-------+-----+



In [266]:
input_cols=["app_id", "session_id",
           "is_user_first_session"]

va = VectorAssembler(outputCol="features", inputCols=input_cols)

predict_df = va.transform(predict).select("features", "os_name")\
.withColumnRenamed("os_name", "label")

In [267]:
lpoints.show(2)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[0.0,93536.0,0.0]|  0.0|
|[0.0,21561.0,0.0]|  0.0|
+-----------------+-----+
only showing top 2 rows



In [268]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(regParam=0.01, maxIter=1000, fitIntercept=True)
lrmodel = lr.fit(train)

In [269]:
validpredicts = lrmodel.transform(valid)
validpredicts.groupby("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  165|
|       1.0|   54|
|       2.0|  188|
+----------+-----+



In [270]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predicts = lrmodel.transform(valid)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predicts)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.601966


Not great but we will take it!

In [271]:
predict_on_missing = lrmodel.transform(predict_df)
predict_on_missing.show()

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[0.0,95901.0,0.0]|    0|[-0.2235651856262...|[0.26265640007752...|       2.0|
|[0.0,55236.0,1.0]|    0|[-0.2805032404383...|[0.24371312540990...|       1.0|
|[0.0,37583.0,0.0]|    0|[0.03571404276013...|[0.34531081250746...|       0.0|
|[0.0,83417.0,0.0]|    0|[-0.1680618781195...|[0.27933099776396...|       2.0|
|[0.0,88299.0,1.0]|    0|[-0.4274998646047...|[0.20649896618707...|       1.0|
|[0.0,77036.0,0.0]|    0|[-0.1396922365623...|[0.28808249752722...|       2.0|
|[0.0,29987.0,0.0]|    0|[0.06948552015687...|[0.35687622497794...|       0.0|
|[0.0,56000.0,0.0]|    0|[-0.0461671182740...|[0.31796385090025...|       2.0|
|[0.0,76718.0,0.0]|    0|[-0.1382784227376...|[0.28852257118479...|       2.0|
|[0.0,24638.0,0.0]|    0|[0.09326693571705...|[0.365

In [272]:
predicticted_os = predict_on_missing.select("prediction")
predicticted_os.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  316|
|       1.0|  144|
|       2.0|  410|
+----------+-----+



In [273]:
#merge with nulls and merge with other to put in full df
df_nulls = df.filter(df.os_name.isNull())
df_nulls.groupBy("os_name").count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|   null|  870|
+-------+-----+



In [274]:
df1 = df_nulls.withColumn("id", monotonically_increasing_id())
df2 = predicticted_os.withColumn("id", monotonically_increasing_id())

df_pred_nulls = df1.join(df2, "id", "outer").drop("id")

In [275]:
df_imputed = df_pred_nulls.drop("os_name").withColumnRenamed("prediction", "os_name")

In [276]:
df_imputed.groupBy("os_name").count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|    0.0|  316|
|    1.0|  144|
|    2.0|  410|
+-------+-----+



In [277]:
df_imputed.count()

870

In [278]:
df_not_null = df.filter(df_ml.os_name.isNotNull())
df_not_null
df_not_null.count()

99127

In [279]:
os_not_null.groupBy("os_name").count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|    0.0|67300|
|    1.0|31127|
|    2.0|  700|
+-------+-----+



In [280]:
os_not_null.count()

99127

In [281]:
os_not_null.printSchema()

root
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- user_id_hash: string (nullable = true)
 |-- app_id: double (nullable = false)
 |-- session_id: double (nullable = false)
 |-- is_user_first_session: double (nullable = false)
 |-- os_name: double (nullable = false)



In [282]:
870+99127

99997

In [283]:
os_not_null = os_not_null.select("app_id", "session_id", "start_timestamp", "previous_sessions_duration",
                                 "user_created_timestamp","is_user_first_session","is_session", "country", "city",
                                "session_index", "user_id_hash","os_name")
os_not_null.printSchema()

root
 |-- app_id: double (nullable = false)
 |-- session_id: double (nullable = false)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_user_first_session: double (nullable = false)
 |-- is_session: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- user_id_hash: string (nullable = true)
 |-- os_name: double (nullable = false)



In [284]:
df_imputed.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- user_id_hash: string (nullable = true)
 |-- os_name: double (nullable = true)



In [285]:
df_t = os_not_null.union(df_imputed)

In [286]:
df_t.groupBy("os_name").count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|    0.0|67616|
|    1.0|31271|
|    2.0| 1110|
+-------+-----+



In [287]:
df_t.count()

99997

2. after done i will tranform it to categorical and save to cvs and run test  train split, transform to one hot the 

In [288]:
df_t.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- user_id_hash: string (nullable = true)
 |-- os_name: double (nullable = true)



In [289]:
df_t.select("session_index").show(2)

+-------------+
|session_index|
+-------------+
|           30|
|           47|
+-------------+
only showing top 2 rows



In [290]:
# converting strings to categorical values:

df_n = indexStringColumns(df_t, ["is_user_first_session"]) # its a boolean
df_n = indexStringColumns(df_n, ["is_session"]) # its a boolean

#what are we doing with, one hot?: I will leave it for now, we can do target encoding on that later
#country
#city

# os_name for sure categorical but normal or one_hot?
df_n = indexStringColumns(df_n, ["os_name"])

In [291]:
df_n.show(2)

+------+----------+---------------+--------------------------+----------------------+-------+-----------+-------------+--------------------+---------------------+----------+-------+
|app_id|session_id|start_timestamp|previous_sessions_duration|user_created_timestamp|country|       city|session_index|        user_id_hash|is_user_first_session|is_session|os_name|
+------+----------+---------------+--------------------------+----------------------+-------+-----------+-------------+--------------------+---------------------+----------+-------+
|   0.0|   93536.0|  1542215364580|                  25837591|         1538874289458|     PH|     makati|           30|9943447915df3a45f...|                  0.0|       0.0|    0.0|
|   0.0|   21561.0|  1543712977293|                  35050130|         1538874289458|     PH|quezon city|           47|9943447915df3a45f...|                  0.0|       0.0|    0.0|
+------+----------+---------------+--------------------------+----------------------+-----

In [292]:
from pyspark.sql.types import IntegerType

df_n = df_n.withColumn("previous_sessions_duration", df_n["previous_sessions_duration"].cast(IntegerType()))
df_n = df_n.withColumn("session_index", df_n["session_index"].cast(IntegerType()))

In [293]:
df_n.select("app_id", "session_id").show(2)

+------+----------+
|app_id|session_id|
+------+----------+
|   0.0|   93536.0|
|   0.0|   21561.0|
+------+----------+
only showing top 2 rows



In [294]:
#df_n = df_n.withColumn("app_id", df_n["app_id"].cast(IntegerType())) #if i cast them they got nulled
#df_n = df_n.withColumn("session_id", df_n["session_id"].cast(IntegerType())) #if i cast them they got nulled
#df_n = df_n.withColumn("start_timestamp", df_n["start_timestamp"].cast(IntegerType())) #not casting since we use it below as str
#df_n = df_n.withColumn("user_created_timestamp", df_n["user_created_timestamp"].cast(IntegerType())) #not casting since we use it below as str

In [295]:
df_n = df_n.withColumn("is_user_first_session", df_n["is_user_first_session"].cast(IntegerType()))
df_n = df_n.withColumn("is_session", df_n["is_session"].cast(IntegerType()))
df_n = df_n.withColumn("os_name", df_n["os_name"].cast(IntegerType()))
df_n.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: integer (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: integer (nullable = true)
 |-- user_id_hash: string (nullable = true)
 |-- is_user_first_session: integer (nullable = true)
 |-- is_session: integer (nullable = true)
 |-- os_name: integer (nullable = true)



In [296]:
df_n.show(2)

+------+----------+---------------+--------------------------+----------------------+-------+-----------+-------------+--------------------+---------------------+----------+-------+
|app_id|session_id|start_timestamp|previous_sessions_duration|user_created_timestamp|country|       city|session_index|        user_id_hash|is_user_first_session|is_session|os_name|
+------+----------+---------------+--------------------------+----------------------+-------+-----------+-------------+--------------------+---------------------+----------+-------+
|   0.0|   93536.0|  1542215364580|                  25837591|         1538874289458|     PH|     makati|           30|9943447915df3a45f...|                    0|         0|      0|
|   0.0|   21561.0|  1543712977293|                  35050130|         1538874289458|     PH|quezon city|           47|9943447915df3a45f...|                    0|         0|      0|
+------+----------+---------------+--------------------------+----------------------+-----

In [298]:
df_n.repartition(1).write.format("com.databricks.spark.csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ",")\
  .save("sessions_cleaned.csv")


## Spliting into test and train: 

In [299]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("csv").load('sessions_cleaned.csv', header=True, inferSchema="true")

In [300]:
from datetime import datetime
import pandas as pd

In [301]:
df = df.toPandas()

In [302]:
df.head()

Unnamed: 0,app_id,session_id,start_timestamp,previous_sessions_duration,user_created_timestamp,country,city,session_index,user_id_hash,is_user_first_session,is_session,os_name
0,0.0,93536.0,1542215364580,25837591,1538874289458,PH,makati,30,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
1,0.0,21561.0,1543712977293,35050130,1538874289458,PH,quezon city,47,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
2,0.0,92180.0,1539215568666,11343848,1538874289458,PH,makati,10,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
3,0.0,27200.0,1540120743010,13499724,1538874289458,PH,davao city,13,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
4,0.0,42027.0,1542671625528,32788010,1538874289458,PH,makati,41,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0


In [303]:
df['start_timestamp'] = pd.to_datetime(df['start_timestamp'], unit='ms')
df['user_created_timestamp'] = pd.to_datetime(df['user_created_timestamp'], unit='ms')

df.info(memory_usage='deep')

dec1_cutoff = datetime(2018, 12, 1, 0, 0, 0)
dec14_cutoff = datetime(2018, 12, 14, 0, 0, 0)

df_train = df[df['start_timestamp'] < dec1_cutoff]

df_dec1_dec14 = df[df['start_timestamp'] > dec1_cutoff]

#df_test is dec1
df_test = df_dec1_dec14[df_dec1_dec14['start_timestamp'] < dec14_cutoff]
#df_dec1_dec14 = df_dec1_dec14[df_dec1_dec14['start_timestamp'] > dec14_cutoff]

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 99997 entries, 0 to 99996
Data columns (total 12 columns):
app_id                        99997 non-null float64
session_id                    99997 non-null float64
start_timestamp               99997 non-null datetime64[ns]
previous_sessions_duration    99997 non-null int32
user_created_timestamp        99997 non-null datetime64[ns]
country                       99431 non-null object
city                          99400 non-null object
session_index                 99997 non-null int32
user_id_hash                  99997 non-null object
is_user_first_session         99997 non-null int32
is_session                    99997 non-null int32
os_name                       99997 non-null int32
dtypes: datetime64[ns](2), float64(2), int32(5), object(3)
memory usage: 28.3 MB


In [304]:
df_train.head()

Unnamed: 0,app_id,session_id,start_timestamp,previous_sessions_duration,user_created_timestamp,country,city,session_index,user_id_hash,is_user_first_session,is_session,os_name
0,0.0,93536.0,2018-11-14 17:09:24.580,25837591,2018-10-07 01:04:49.458,PH,makati,30,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
2,0.0,92180.0,2018-10-10 23:52:48.666,11343848,2018-10-07 01:04:49.458,PH,makati,10,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
3,0.0,27200.0,2018-10-21 11:19:03.010,13499724,2018-10-07 01:04:49.458,PH,davao city,13,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
4,0.0,42027.0,2018-11-19 23:53:45.528,32788010,2018-10-07 01:04:49.458,PH,makati,41,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0
5,0.0,81362.0,2018-10-08 11:25:13.013,5872534,2018-10-07 01:04:49.458,PH,davao city,4,9943447915df3a45fd6720a026af905b6da6b56a37701b...,0,0,0


In [305]:
df_train.to_csv("train.csv", sep=",")

In [306]:
df_test.to_csv("test.csv", sep=",")

In [307]:
# df_train.write.format("csv").save('train.csv')
# df_test.write.format("csv").save('test.csv')

## 3. Adding feature os_freq on test and train separatly 

In [308]:
df = ss.read.csv("train.csv", header=True)

In [309]:
df = df.drop("_c0")

In [310]:
df.groupBy("os_name").count().show()

+-------+-----+
|os_name|count|
+-------+-----+
|      0|58878|
|      1|26518|
|      2|  925|
+-------+-----+



In [311]:
from pyspark.sql.functions import col, expr, when

os_t = df.count()
os_0 = df.filter("os_name=0").count()
os_1 = df.filter("os_name=1").count()
os_2 = df.filter("os_name=2").count()

new_col = when(col("os_name")==0, os_0/os_t).when(col("os_name")==1, os_1/os_t).when(col("os_name")==2, os_2/os_t)

df = df.withColumn("os_freq", new_col)

df.filter("os_name=0").select("os_freq").show(2)

+------------------+
|           os_freq|
+------------------+
|0.6820819962697374|
|0.6820819962697374|
+------------------+
only showing top 2 rows



In [312]:
df1 = ss.read.csv("test.csv", header=True)
df1.groupBy("os_name").count().show()
df1 = df1.drop("_c0")

+-------+-----+
|os_name|count|
+-------+-----+
|      0| 8228|
|      1| 4436|
|      2|  181|
+-------+-----+



In [313]:
os_t = df1.count()
os_0 = df1.filter("os_name=0").count()
os_1 = df1.filter("os_name=1").count()
os_2 = df1.filter("os_name=2").count()

new_col = when(col("os_name")==0, os_0/os_t).when(col("os_name")==1, os_1/os_t).when(col("os_name")==2, os_2/os_t)

df1 = df1.withColumn("os_freq", new_col)

df1.filter("os_name=0").select("os_freq").show(2)

+------------------+
|           os_freq|
+------------------+
|0.6405605293888673|
|0.6405605293888673|
+------------------+
only showing top 2 rows



## 4. group by user_hash and create total num of sessions created on train and on test and
## 5. group by user_hash and create avg previous session durationon train and on test (both below)

In [314]:
df.count()

86321

In [315]:
number_sessions = df.groupBy("user_id_hash").agg(count("session_id"))
df = df.join(number_sessions, "user_id_hash", "left")
df.count()

86321

In [316]:
df = df.withColumnRenamed("count(session_id)", "number_sessions")
df.printSchema()

root
 |-- user_id_hash: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- os_freq: double (nullable = true)
 |-- number_sessions: long (nullable = true)



In [317]:
avg_prev_session_dur = df.groupBy("user_id_hash").agg(avg("previous_sessions_duration"))
df = df.join(avg_prev_session_dur, "user_id_hash", "left")
df.printSchema()

root
 |-- user_id_hash: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- os_freq: double (nullable = true)
 |-- number_sessions: long (nullable = true)
 |-- avg(previous_sessions_duration): double (nullable = true)



In [318]:
df = df.withColumnRenamed("avg(previous_sessions_duration)", "avg_prev_session_dur")
df.printSchema()

root
 |-- user_id_hash: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- os_freq: double (nullable = true)
 |-- number_sessions: long (nullable = true)
 |-- avg_prev_session_dur: double (nullable = true)



In [319]:
number_sessions = df1.groupBy("user_id_hash").agg(count("session_id"))
df1 = df1.join(number_sessions, "user_id_hash", "left")
df1 = df1.withColumnRenamed("count(session_id)", "number_sessions")
avg_prev_session_dur = df1.groupBy("user_id_hash").agg(avg("previous_sessions_duration"))
df1 = df1.join(avg_prev_session_dur, "user_id_hash", "left")
df1 = df1.withColumnRenamed("avg(previous_sessions_duration)", "avg_prev_session_dur")

get country and city info:

In [320]:
a = df.groupBy("user_id_hash")

In [321]:
a.count().show()

+--------------------+-----+
|        user_id_hash|count|
+--------------------+-----+
|000d22a1e4e1abdd0...|  210|
|030b7aaefeeebc223...|    2|
|03663d0eda3bf33a2...|    2|
|04b6836e16e856579...|    2|
|09028ad40995ddae7...|   13|
|09580bf48092fd097...|    2|
|0de90f3a2a04b4d4b...|    2|
|1acd4823fae34d820...|    2|
|2aed29211c979f9df...|    2|
|32f1ae64f27d330b0...|    7|
|3818994b1a66a47a4...|    5|
|3b1be5c17d58562da...|   22|
|448a7de13d61c19a1...|    7|
|4541c8aad5a9c69cc...|    8|
|470195b52fda2a7bf...|    3|
|4bf7f1a538c09c3ca...|    2|
|5cf6a15acc79d6bb0...|    1|
|71f0ebc9b9c7b0747...|    3|
|78797634a127768e3...|    2|
|7be48439102699b30...|    9|
+--------------------+-----+
only showing top 20 rows



In [322]:
from pyspark.sql.functions import col, countDistinct

df.groupBy("user_id_hash", "country").count().show()

+--------------------+-------+-----+
|        user_id_hash|country|count|
+--------------------+-------+-----+
|000d22a1e4e1abdd0...|     NG|  210|
|030b7aaefeeebc223...|     SG|    2|
|03663d0eda3bf33a2...|     EE|    2|
|04b6836e16e856579...|     JM|    2|
|09028ad40995ddae7...|     US|   13|
|09580bf48092fd097...|     PR|    2|
|0de90f3a2a04b4d4b...|     US|    2|
|1acd4823fae34d820...|     US|    2|
|2aed29211c979f9df...|     US|    2|
|32f1ae64f27d330b0...|     IN|    7|
|3818994b1a66a47a4...|     US|    5|
|3b1be5c17d58562da...|     US|   22|
|448a7de13d61c19a1...|     ES|    7|
|4541c8aad5a9c69cc...|     IN|    8|
|470195b52fda2a7bf...|     US|    3|
|4bf7f1a538c09c3ca...|     GR|    2|
|5cf6a15acc79d6bb0...|     SA|    1|
|71f0ebc9b9c7b0747...|     AU|    3|
|78797634a127768e3...|     ID|    2|
|7be48439102699b30...|     IN|    9|
+--------------------+-------+-----+
only showing top 20 rows



In [323]:
from pyspark.sql import Window

grouped = df.groupBy("user_id_hash", "country", "city", "os_name", "os_freq", "number_sessions", "avg_prev_session_dur").count()

window = Window.partitionBy("user_id_hash").orderBy(desc("count"))

gropuped = grouped\
    .withColumn('order', row_number().over(window))\
    .where(col('order') == 1).drop("order")

grouped

DataFrame[user_id_hash: string, country: string, city: string, os_name: string, os_freq: double, number_sessions: bigint, avg_prev_session_dur: double, count: bigint]

In [324]:
grouped.repartition(1).write.format("com.databricks.spark.csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ",")\
  .save("train_features.csv")

In [328]:
df1.printSchema()

root
 |-- user_id_hash: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- start_timestamp: string (nullable = true)
 |-- previous_sessions_duration: string (nullable = true)
 |-- user_created_timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- session_index: string (nullable = true)
 |-- is_user_first_session: string (nullable = true)
 |-- is_session: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- os_freq: double (nullable = true)
 |-- number_sessions: long (nullable = true)
 |-- avg_prev_session_dur: double (nullable = true)



In [329]:
grouped1 = df1.groupBy("user_id_hash", "country", "city", "os_name", "os_freq", "number_sessions", 
                       "avg_prev_session_dur").count()

In [330]:
window1 = Window.partitionBy("user_id_hash").orderBy(desc("count"))

grouped1\
    .withColumn('order', row_number().over(window1))\
    .where(col('order') == 1).drop("order")

grouped1.repartition(1).write.format("com.databricks.spark.csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ",")\
  .save("test_features.csv")

to do after:
6. I have train and test data, they need to be both merged and
7. merge with labels and do target encoding on city and country

In [333]:
labels = pd.read_csv("features_train.csv")

In [334]:
labels.head()

Unnamed: 0.1,Unnamed: 0,user_id_hash,user_purchase_binary_7_days,user_purchase_binary_14_days,num_purchase,value_purchase
0,0,e469dfaed039ead9110165d9bc457acb11609ca34057dc...,0.0,0.0,0.0,0.0
1,1,afcc639a324b6c598ef83d360450afa011cb2dd1358bf9...,0.0,0.0,0.0,0.0
2,2,fd5a7cf211d08e3e00f7be6a9df6e6ea3d2e5c22a5d9c3...,0.0,0.0,0.0,0.0
3,3,00bfff98b9d0329f014c2eeac7ce47cd18b2bc6e10d608...,0.0,0.0,0.0,0.0
4,4,0d298f3638c43e915c119d4935e1ce8d168f81b5e3e8c1...,0.0,0.0,0.0,0.0


In [325]:
sesh_labels = pd.read_csv("train_features.csv/part-00000-d33b4915-8780-41d1-9e40-8c96a1251785-c000.csv")

In [326]:
sesh_labels.head()

Unnamed: 0,user_id_hash,country,city,os_name,os_freq,number_sessions,avg_prev_session_dur,count
0,000d22a1e4e1abdd0dcfce208299ebcf3d708456024ee9...,NG,lagos,0,0.682082,210,67301640.0,208
1,000d22a1e4e1abdd0dcfce208299ebcf3d708456024ee9...,NG,owerri,0,0.682082,210,67301640.0,2
2,030b7aaefeeebc223136cddf0d0a9f785d1e8e85554190...,SG,singapore,1,0.307202,2,530442.0,2
3,03663d0eda3bf33a23ae98d01e3533537d21383f9f7dcb...,EE,valga,0,0.682082,2,1294854.0,2
4,04b6836e16e8565796440451c969cd81187f9c97de9692...,JM,kingston,0,0.682082,2,266792.0,2


In [337]:
features_labels = labels.merge(sesh_labels, left_on='user_id_hash', right_on='user_id_hash')

In [338]:
features_labels.head()

Unnamed: 0.1,Unnamed: 0,user_id_hash,user_purchase_binary_7_days,user_purchase_binary_14_days,num_purchase,value_purchase,country,city,os_name,os_freq,number_sessions,avg_prev_session_dur,count
0,154,ff85e474e26272486fc9c379a344b68d779085e9753a8f...,0.0,0.0,0.0,0.0,DE,wuppertal,0,0.682082,4,4040769.75,4
1,274,6c338ad0a43d6d8b32ad185e349ea5032c86bcd4caa004...,0.0,0.0,0.0,0.0,PK,karachi,0,0.682082,2,1037072.5,2
2,333,e3e520438c89cb10c3a6f4f2a772f46fedf10df74d4e0d...,0.0,0.0,0.0,0.0,PE,arequipa,1,0.307202,10,1062377.1,10
3,358,5cec909f078fc6f35208b7166d2fa3500032fad798d046...,0.0,0.0,0.0,0.0,US,los angeles,0,0.682082,2,44849.5,2
4,366,63f98b74f066b34346662a5d002bfac15f09b8a4648bf0...,0.0,0.0,0.0,0.0,GB,bramhall,1,0.307202,144,73349722.5,143


In [439]:
sc.stop()