## PayPay Data Engineer Challenge - Raja

In [1]:
# import Necessary Modules
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.window import Window


import pandas as pd
import re

import warnings
warnings.filterwarnings('ignore')

import shutup
shutup.please()

In [2]:
# Create Spark Session
spark.stop()

spark = SparkSession.Builder() \
    .appName('payPayApp') \
    .master('local[2]') \
    .config('spark.scheduler.mode', 'FAIR') \
    .config('spark.sql.shuffle.partitions', 4) \
    .getOrCreate()

spark

22/05/29 15:05:35 WARN FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.


In [3]:
# Read Log Data

logFilePath = "../data/2015_07_22_mktplace_shop_web_log_sample.log.gz"
df = spark.read.text(logFilePath)
df.printSchema()

root
 |-- value: string (nullable = true)



In [4]:
# Reg Ex Pattern and parse analytical data

v_new_regex = """(\S+) (\S+) (\S+):(\d+) (\S+):(\d+) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) (\d+) (\d+) "([^ ]*) ([^ ]*) (- |[^ ]*)" (\"[^\"]*\") ([A-Z0-9-]+) ([A-Za-z0-9.-]*)$"""


# Function to parse each line with RegEx Pattern
def fnParseLogLineValue(logline, pattern=v_new_regex):
    value = logline.value
    match = re.search(pattern, value)
    
    parsedValues = []
    if match is None:
        parsedValues =  [None for i in range(1,20)]
    else:
        parsedValues = [match.group(i).strip() for i in range(1,20)]
    
    returnValue = tuple(i for i in parsedValues) 
    return returnValue

In [5]:
# Parsing Values
# ts:String,
# domain_nm:String,
# fe_ip:String,
# bcknd_ip:String,
# fe_prcss_time:Double,
# bcknd_prcss_time:Double,
# clnt_resp_tm:Double,
# fe_resp_cd:String,
# bcknd_resp_cd:String,
# received_bytes:Long,
# sent_bytes:Long,
# rq_type:String,
# url:String,
# protocol:String,
# usr_agnt:String,
# ssl1:String,
# ssl2:String,

#Additional cols

# delta : time_spend
# flag: unique_ssn_flg
# sessionid: ssn_id

col_names = ['ts', 'domain_nm', 'fe_ip', 'fe_port', 'bcknd_ip', 'bcknd_port', 'fe_prcss_time', 'bcknd_prcss_time', 'clnt_resp_tm', 'fe_resp_cd', 'bcknd_resp_cd', 'received_bytes', 'sent_bytes', 'rq_type', 'url', 'protocol', 'usr_agnt', 'ssl', 'ssl2']

df_parsed = df.rdd.map(fnParseLogLineValue)
df_parsed = df_parsed.toDF(col_names)

                                                                                

In [6]:
# Adding additional calculative columns

v_max_active_ssn_seconds = 900 # i.e. 15 mins
v_window_fn = Window.partitionBy("fe_ip", "bcknd_ip")
v_window_time_diff = v_window_fn.orderBy(asc("ts"))
v_window_session = v_window_fn.orderBy([asc("fe_ip"), asc("bcknd_ip"), asc("time_spend")])


pvs_ts = lag("ts",1).over(v_window_time_diff)                           
df_parsed = df_parsed.withColumn('ts', to_timestamp('ts')) \
    .withColumn('pvs_ts', pvs_ts) \
    .withColumn('time_spend', unix_timestamp('ts') - unix_timestamp('pvs_ts')).na.fill(0) \
    .withColumn('unique_ssn_flg', when(col('time_spend') < lit(v_max_active_ssn_seconds), lit(0)).otherwise(lit(1))) \
    .withColumn('ssn_id', sum(col('unique_ssn_flg')).over(v_window_session))


In [7]:
# Select only the columns needed for analytics

lst_of_cols = ['ts', "fe_ip", "bcknd_ip", 'pvs_ts', 'url', 'time_spend', 'unique_ssn_flg', 'ssn_id']
df_log = df_parsed.select(lst_of_cols)

df_log.to_pandas_on_spark().head()

22/05/29 15:05:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:05:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:06:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:06:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Unnamed: 0,ts,fe_ip,bcknd_ip,pvs_ts,url,time_spend,unique_ssn_flg,ssn_id
0,2015-07-22 16:17:06.893987,1.186.101.79,10.0.4.176,NaT,https://paytm.com:443/shop/summary/1116587591,0,0,0
1,2015-07-22 16:31:15.474375,1.186.101.79,10.0.4.176,2015-07-22 16:17:06.893987,https://paytm.com:443/shop/action,849,0,0
2,2015-07-22 16:17:01.782695,1.186.101.79,10.0.4.225,NaT,https://paytm.com:443/api/v1/expresscart/check...,0,0,0
3,2015-07-22 16:34:33.419972,1.186.101.79,10.0.4.225,2015-07-22 16:34:33.269296,https://paytm.com:443/shop/wallet/balance?chan...,0,0,0
4,2015-07-22 16:17:38.076241,1.186.101.79,10.0.4.225,2015-07-22 16:17:01.782695,https://paytm.com:443/shop/orderdetail/1116587...,37,0,0


### Log Analytics using Spark
#### 1.Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a session.

In [8]:
# Group by IP and session ID and get distinct URLs


group_by_cols = ['fe_ip', 'ssn_id']
df_sessionise = df_log.groupBy(group_by_cols).agg(count('url').alias('ip_hits'))

df_sessionise.to_pandas_on_spark().head()

22/05/29 15:06:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:06:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Unnamed: 0,fe_ip,ssn_id,ip_hits
0,1.186.101.79,1,1
1,1.186.103.78,0,4
2,1.186.108.242,0,3
3,1.186.108.28,0,80
4,1.186.135.123,0,15


#### 2. Determine the average session time


In [9]:
group_by_cols = ['fe_ip', 'ssn_id']

# Group by Ip and Session ID and difference the max timestamp with min timestamp and get average across all
df_avg_ssn_tm = df_log.groupBy(group_by_cols).agg(max('ts').alias('max_ts'), min('ts').alias('min_ts')) \
            .withColumn('session_time', unix_timestamp('max_ts') - unix_timestamp('min_ts')) \
            .groupBy().agg(avg('session_time').alias('average_ssn_time'))

df_avg_ssn_tm.to_pandas_on_spark().head()

22/05/29 15:07:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Unnamed: 0,average_ssn_time
0,2910.048425


#### 3. Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.


In [10]:
group_by_cols = ['fe_ip', 'ssn_id']

# Group By IP and Session and Get Unique URL page
df_unique_url = df_log.groupBy(group_by_cols).agg(count_distinct('url').alias('unique_url_per_ssn'))

df_unique_url.to_pandas_on_spark().head()


22/05/29 15:07:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

Unnamed: 0,fe_ip,ssn_id,unique_url_per_ssn
0,1.186.108.242,0,3
1,1.186.108.28,0,80
2,1.186.135.123,0,11
3,1.186.146.89,0,8
4,1.186.180.183,0,4


#### 4.Find the most engaged users, ie the IPs with the longest session times


In [11]:
group_by_cols = ['fe_ip', 'ssn_id']

# Group By IP and session id and get Time difference of start and end of the session and get max of time Spend
df_long_ssn_ip = df_log.groupBy(group_by_cols).agg(max('ts').alias('max_ts'), min('ts').alias('min_ts')) \
            .withColumn('session_time', unix_timestamp('max_ts') - unix_timestamp('min_ts')) \
            .groupBy('fe_ip').agg(max('session_time').alias('long_ssn_time_per_ip'))

df_long_ssn_ip.to_pandas_on_spark().head()

22/05/29 15:07:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:07:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:08:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:08:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:08:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/29 15:08:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

Unnamed: 0,fe_ip,long_ssn_time_per_ip
0,1.186.103.78,9
1,1.186.108.242,3
2,1.186.108.28,6
3,1.186.146.89,114
4,1.186.180.183,17330
