# 1. Basic Environment Setup 
- Include libraries / packages
- Manage Spark Session
- Import data with basic preview

In [1]:
# Import packages
import os
from datetime import datetime
from dateutil import tz
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count, hour, isnan, json_tuple, lpad, \
                                  max, rank, udf, when
from pyspark.sql.types import StringType

In [2]:
# Setup spark session
spark = SparkSession.builder \
      .master('local') \
      .getOrCreate()

In [3]:
# Read snappy downloaded locally
df = spark.read.parquet("part0.parquet")

df = df.union(sqlContext.read.parquet("part1.parquet")) \
  .union(sqlContext.read.parquet("part2.parquet")) \
  .union(sqlContext.read.parquet("part3.parquet")) 

In [4]:
# View basic data behavior for validation
print("Number of data:\n{0}\n".format(df.count()))
print("Data Schema:")
print(df.printSchema())

Number of data:
1303198

Data Schema:
root
 |-- server_ts: long (nullable = true)
 |-- e_n: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- device: string (nullable = true)

None


In [5]:
# View first few lines of data to get some feeling...
df.show(5)

+-------------+------------------+--------+-------+
|    server_ts|               e_n|     uid| device|
+-------------+------------------+--------+-------+
|1526835750863| {"article_id":57}|e9b0971f| mobile|
|1526835751136|{"article_id":849}|b808aba2| mobile|
|1526835751188|{"article_id":871}|27f07360| mobile|
|1526835751215|{"article_id":150}|2abe5349|desktop|
|1526835751211|{"article_id":806}|338d71ea|desktop|
+-------------+------------------+--------+-------+
only showing top 5 rows



# 2. Data Validation and Cleaning (if needed)

## Validation
- Check for any null value
- Verifiy number of unique values in "device"

## Cleansing
- Clean any row with null value (optional)
- Change server_ts: long -> datetime
- Change en: json_string -> string

In [6]:
# Data validation on null value
df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()

+---------+---+---+------+
|server_ts|e_n|uid|device|
+---------+---+---+------+
|        0|  0|119|     0|
+---------+---+---+------+



In [7]:
# """
# uid empty sounds be acceptable anyway?...
# 
# Prepared cleaning but optional
# """
# df = df.filter(df['uid'].isNotNull())
# df.select([
#     count(when(col(c).isNull(), c)).alias(c)
#     for c in df.columns
# ]).show()

In [8]:
# Validating distinct value of devices...
df.select('device').distinct().show()

+-------+
| device|
+-------+
|desktop|
| mobile|
+-------+



In [9]:
# Change event time time
gmt_timezone = tz.gettz('GMT')
hk_timezone = tz.gettz('Asia/Hong_Kong')

def long_to_datetime(time_long):
    return datetime.fromtimestamp(float(time_long)/1000) \
      .replace(tzinfo=hk_timezone) \
      .astimezone(gmt_timezone) \
      .strftime('%Y-%m-%d %H:%M:%S.%f')

# Validate if function work as expected!
long_to_datetime(1526835750863)

'2018-05-20 17:02:30.863000'

In [10]:
# Apply long_to_datetime as a UDF...
long_to_datetime_udf = udf(long_to_datetime, StringType())
df = df.withColumn(
    'server_dt', 
    long_to_datetime_udf(df['server_ts'])
)

In [11]:
# Apply json_tuple in e_n. json_tuple is useful!
df = df.withColumn(
    'article_id',
    json_tuple(df['e_n'], 'article_id')
)

In [12]:
# Double Check if any null values for article_id...
df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()

+---------+---+---+------+---------+----------+
|server_ts|e_n|uid|device|server_dt|article_id|
+---------+---+---+------+---------+----------+
|        0|  0|119|     0|        0|        31|
+---------+---+---+------+---------+----------+



In [13]:
# Ouch ...
df.filter(df['article_id'].isNull()).show(5)

+-------------+---+--------+------+--------------------+----------+
|    server_ts|e_n|     uid|device|           server_dt|article_id|
+-------------+---+--------+------+--------------------+----------+
|1526829577338| {}|74143400|mobile|2018-05-20 15:19:...|      null|
|1526837193820| {}|577cbbc9|mobile|2018-05-20 17:26:...|      null|
|1526823633405| {}|656e375c|mobile|2018-05-20 13:40:...|      null|
|1526824766497| {}|7c0a3a8b|mobile|2018-05-20 13:59:...|      null|
|1526830116875| {}|69b067fe|mobile|2018-05-20 15:28:...|      null|
+-------------+---+--------+------+--------------------+----------+
only showing top 5 rows



In [14]:
# Better remove again
# df = df.filter(df['article_id'].isNotNull())
# df.filter(df['article_id'].isNull()).show(5)

# 3. Basic View Analysis
Doing Question 1

In [15]:
# 1a)
print("1a): {0}".format(df.count()))

1a): 1303198


In [16]:
# 1b)
print("1b):\n")
df.groupBy('article_id') \
      .count() \
      .orderBy('count', ascending=False) \
      .show(1)

1b):

+----------+-----+
|article_id|count|
+----------+-----+
|       827|49249|
+----------+-----+
only showing top 1 row



In [17]:
# 1c)
df = df.withColumn(
    'hour',
    lpad(hour(df['server_dt']), 2, '0')
)

In [18]:
# Double Group by as preparation ...
grouped_df = df.groupBy('hour', 'article_id').count()

# Setup partitionBy-window ...
hour_window = Window.partitionBy(grouped_df['hour']).orderBy(grouped_df['count'].desc())

# Rank over window and save as temporary column ...
grouped_df = grouped_df.withColumn(
     'rank',
    rank().over(hour_window)
)

# Filter and show
print("1c):\n")
grouped_df.filter(grouped_df['rank'] == 1) \
      .select('hour', 'article_id', 'count') \
      .orderBy('hour').show(24)

1c):

+----+----------+-----+
|hour|article_id|count|
+----+----------+-----+
|  00|       830| 3105|
|  01|       155| 4933|
|  02|       830| 2914|
|  03|       374| 5404|
|  04|       374| 6139|
|  05|       897| 4418|
|  06|       813| 3214|
|  07|       740| 2517|
|  08|       712| 3045|
|  09|       712| 1937|
|  10|       827| 6702|
|  11|       827| 7198|
|  12|       827| 6428|
|  13|       827| 5518|
|  14|       827| 4806|
|  15|        67| 6942|
|  16|       827| 3139|
|  17|       871| 2274|
|  18|       827| 1072|
|  19|       827|  696|
|  20|       827|  391|
|  21|       827|  430|
|  22|        85| 1636|
|  23|        85| 4229|
+----+----------+-----+



In [19]:
# 3.4 AVG / MEDIAN for user

# Sort for median
df_grp_by_uid = df.groupBy('uid').count().orderBy("count")

print("1d):\n")
# Average
print("Average:\n")
df_grp_by_uid.agg({
    'count': 'avg'
}).show()

# Approximate Median
print("Median:\n")
df_grp_by_uid.approxQuantile("count", [0.5], 0.25)

1d):

Average:

+----------------+
|      avg(count)|
+----------------+
|2.39954557339137|
+----------------+

Median:



[1.0]

# 4. Session Analysis
Doing question 2

In [20]:
# Define session generating function
SESSION_LIMIT = 30 * 60 * 1000
def session_formation(ts_list):
    ts_list = sorted(list(ts_list))
    
    session_length_list = []
    session_start_temp = 0
    
    for ts in ts_list:
        session_diff = ts - session_start_temp
    
        # When new session is found...
        if session_diff > SESSION_LIMIT:
            session_start_temp = ts
            session_length_list.append(0)
        
        # else, Extending session length 
        else:
            session_length_list[-1] = session_diff
        
    # Generating session index at the same time...
    return [(index + 1, session) for index, session in enumerate(session_length_list)]


# Rough validating if function work as expected!
session_formation([1526782447295, 1526782500043, 1526793444112, 1526793657771, 1526793877913, 1526797801606])

[(1, 52748), (2, 433801), (3, 0)]

In [21]:
# groupByKey and flatMapValues looks more clean
session_df = df.rdd.map(lambda x: (x['uid'], x['server_ts'])) \
      .groupByKey() \
      .flatMapValues(session_formation) \
      .map(lambda x: (x[0], x[1][0], x[1][1])) \
      .toDF(['uid', 'session_id', 'session_length'])

session_df.show()

+--------+----------+--------------+
|     uid|session_id|session_length|
+--------+----------+--------------+
|600cc101|         1|             0|
|f2b99545|         1|         95475|
|3f277bd6|         1|             0|
|7f08a3d3|         1|             0|
|9bb3e690|         1|             0|
|504d03bc|         1|             0|
|550c8d71|         1|             0|
|2a53a1f4|         1|             0|
|e508c8b1|         1|             0|
|809cece6|         1|             0|
|d168378f|         1|             0|
|d6fcc9da|         1|        191104|
|7511ed8f|         1|             0|
|a84893d5|         1|             0|
|149eb5a6|         1|        827957|
|149eb5a6|         2|           233|
|f6965068|         1|             0|
|a13d2299|         1|             0|
|e47b2288|         1|             0|
|e5e8baa8|         1|             0|
+--------+----------+--------------+
only showing top 20 rows



In [22]:
# 2a
print("2a): {0}".format(session_df.count()))

2a): 774587


In [23]:
# 2b
print("2b):\n")
session_df.groupBy('uid') \
      .count() \
      .orderBy('count', ascending=False) \
      .show(1)

2b):

+--------+-----+
|     uid|count|
+--------+-----+
|c82275f3|   39|
+--------+-----+
only showing top 1 row



In [24]:
# 4.3 AVG / MEDIAN for session 

print("2c):\n")
# Average
print("Average:\n")
session_df.agg({
    'session_length': 'avg'
}).show()

# Approximate Median
print("Median:\n")
session_df.orderBy('session_length') \
      .approxQuantile("session_length", [0.5], 0.25)

2c):

Average:

+-------------------+
|avg(session_length)|
+-------------------+
| 144024.08804304746|
+-------------------+

Median:



[0.0]