In [1]:
from pathlib import Path
import os
import sys
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


CURRENT_DIR = Path(os.path.dirname(''))
UTILS_DIR = CURRENT_DIR / '../utils'
DATA_DIR = CURRENT_DIR / '../data'
# add UTILS_DIR to system path so we can use it
sys.path.append(UTILS_DIR.absolute().as_posix())


In [2]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().set("spark.executor.memory", "4g").set('spark.driver.memory', '10g').\
                    setMaster('local[20]').setAppName('WebLogAnalysis')
sc = SparkContext(conf = conf)

# import spark.sql
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [3]:
# unpack the data if not already done
comp_data_path = DATA_DIR / '2015_07_22_mktplace_shop_web_log_sample.log.gz'
comp_data_path_str = comp_data_path.absolute().as_posix()
! gunzip $comp_data_path_str
data_path = (DATA_DIR / '2015_07_22_mktplace_shop_web_log_sample.log').absolute().as_posix()

In [4]:
column_names = ['timestamp', 'elb', 'client_port', 'backend_port', 'request_processing_time',
               'backend_processing_time', 'response_processing_time', 'elb_status_code',
                'backend_status_code', 'received_bytes', 'sent_bytes', 'request',
                'user_agent', 'ssl_cipher', 'ssl_protocol'
               ]
colname_to_idx = dict(zip(column_names, range(len(column_names))))

from pyspark.sql.types import *

schema = StructType([
    StructField('timestamp', TimestampType(), False),
    StructField('elb', StringType(), False),
    StructField('client_port', StringType(), False),
    StructField('backend_port', StringType(), False),
    StructField('request_processing_time', DoubleType(), False),
    StructField('backend_processing_time', DoubleType(), False),
    StructField('response_processing_time', DoubleType(), False),
    StructField('elb_status_code', StringType(), False),
    StructField('backend_status_code', StringType(), False),
    StructField('received_bytes', IntegerType(), False),
    StructField('sent_bytes', IntegerType(), False),
    StructField('request', StringType(), False),
    StructField('user_agent', StringType(), False),
    StructField('ssl_cipher', StringType(), False),
    StructField('ssl_protocol', StringType(), False)
])

In [5]:
import dateutil.parser
import re

def parse_line(line):

    pattern = r'^(.*?)"(.*?)" "(.*?)"(.*?)$'
    parts = re.findall(pattern,line)[0]
    all_data_fields = parts[0].strip().split() + [parts[1].strip()] + [parts[2].strip()] + parts[3].strip().split()

    # convert timestamp to datetime
    all_data_fields[colname_to_idx['timestamp']] = dateutil.parser.parse(all_data_fields[colname_to_idx['timestamp']])
    # convert request_processing_time, backend_processing_time, response_processing_time to float
    all_data_fields[colname_to_idx['request_processing_time']] = float(all_data_fields[colname_to_idx['request_processing_time']])
    all_data_fields[colname_to_idx['backend_processing_time']] = float(all_data_fields[colname_to_idx['backend_processing_time']])
    all_data_fields[colname_to_idx['response_processing_time']] = float(all_data_fields[colname_to_idx['response_processing_time']])
    # convert bytes to integer
    all_data_fields[colname_to_idx['received_bytes']] = int(all_data_fields[colname_to_idx['received_bytes']])
    all_data_fields[colname_to_idx['sent_bytes']] = int(all_data_fields[colname_to_idx['sent_bytes']])    
    
    return all_data_fields


In [6]:
raw_text_file = sc.textFile(data_path)
parts = raw_text_file.map(lambda line: parse_line(line))
raw_data_df = spark.createDataFrame(parts, schema)


In [7]:
raw_data_df.count()

1158500

# Sessionize dataset
Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a session

In [54]:
import pyspark.sql as pysql
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [55]:
# add IP to columns
fn = F.udf(lambda x:x.split(':')[0], StringType())
df_with_ip = raw_data_df.withColumn('IP', fn(raw_data_df.client_port))

In [56]:
# add URL to columns
def get_url_from_request(request):
    if request=='- - - ':
        return request
    else:
        return request.split()[1]

url_udf = F.udf(lambda x:get_url_from_request(x), StringType())
complete_df = df_with_ip.withColumn('URL', url_udf(df_with_ip.request))

In [57]:
# session time is set to 15 mins
session_time_seconds = 15*60

# define session window function!
time_diff_fn = pysql.Window.partitionBy('IP').orderBy('timestamp')

# add previous timestamp to each row
df_temp1 = complete_df.withColumn('prevtimestamp', F.lag('timestamp',1).over(time_diff_fn))

# compute the difference between timestamps for each row
df_temp2 = df_temp1.withColumn('diff',\
                 F.when(F.isnull(F.unix_timestamp(df_temp1.timestamp)-F.unix_timestamp(df_temp1.prevtimestamp)),0)\
                 .otherwise(F.unix_timestamp(df_temp1.timestamp)-F.unix_timestamp(df_temp1.prevtimestamp)))

# set flag for each row if new session is detected!
df_temp3 = df_temp2.withColumn('is_new_session', F.when(df_temp2.diff > 15*60, 1).otherwise(0))

# create a session_id column for each user (IP). We need to create a window to look at all rows (for each IP)
# from the beggining till the current row. Note that rows are sorted by timestamp.
new_sess_window_fn = pysql.Window.partitionBy('IP').orderBy('timestamp').\
                  rowsBetween(pysql.Window.unboundedPreceding,pysql.Window.currentRow)

df_with_session_id = df_temp3.withColumn('session_id', F.sum('is_new_session').over(new_sess_window_fn))

In [11]:
df_with_session_id.show()

+--------------------+----------------+------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+--------------+----------+--------------------+--------------------+--------------------+------------+------------+--------------------+----+--------------+----------+
|           timestamp|             elb|       client_port| backend_port|request_processing_time|backend_processing_time|response_processing_time|elb_status_code|backend_status_code|received_bytes|sent_bytes|             request|          user_agent|          ssl_cipher|ssl_protocol|          IP|       prevtimestamp|diff|is_new_session|session_id|
+--------------------+----------------+------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+--------------+----------+--------------------+--------------------+--------------------+------------+------------+------

# Average session time
Determine the average session time

In [12]:
df_with_session_id.columns

['timestamp',
 'elb',
 'client_port',
 'backend_port',
 'request_processing_time',
 'backend_processing_time',
 'response_processing_time',
 'elb_status_code',
 'backend_status_code',
 'received_bytes',
 'sent_bytes',
 'request',
 'user_agent',
 'ssl_cipher',
 'ssl_protocol',
 'IP',
 'prevtimestamp',
 'diff',
 'is_new_session',
 'session_id']

In [58]:
truncated_df = df_with_session_id.select('IP','timestamp','session_id','diff')

In [70]:
user_session_window_fn = pysql.Window.partitionBy('IP','session_id')
temp = truncated_df.withColumn('session_time', F.sum('diff').over(user_session_window_fn))
user_session_time_df = temp.drop('diff','timestamp').drop_duplicates(['IP','session_id'])
user_session_time_df.agg({'session_time':'avg'}).collect()

[Row(avg(session_time)=2175.9683673929717)]

# Determine unique URL visits per session
Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.



In [61]:
url_df = df_with_session_id.select('IP','session_id','URL')

In [69]:
url_window_fn = pysql.Window.partitionBy('IP','session_id')
temp = url_df.withColumn('unique_urls', F.collect_set('URL').over(url_window_fn))
temp.withColumn('unique_url_count', F.size(temp.unique_urls)).show()

+------------+----------+--------------------+--------------------+----------------+
|          IP|session_id|                 URL|         unique_urls|unique_url_count|
+------------+----------+--------------------+--------------------+----------------+
|1.186.143.37|         0|https://paytm.com...|[https://paytm.co...|               2|
|1.186.143.37|         0|https://paytm.com...|[https://paytm.co...|               2|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co...|               8|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co...|               8|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co...|               8|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co...|               8|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co...|               8|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co...|               8|
|1.187.164.29|         0|https://paytm.com...|[https://paytm.co..

# Find the most engaged users
Find the most engaged users, ie the IPs with the longest session times

In [73]:
user_session_time_df.orderBy('session_time',ascending=False).show(40)

+---------------+----------+------------+
|             IP|session_id|session_time|
+---------------+----------+------------+
|   27.120.106.3|         1|       66299|
|117.255.253.155|         1|       57423|
|   14.139.69.64|         1|       57386|
| 98.230.153.173|         1|       57314|
|  103.24.125.26|         1|       57286|
| 150.228.40.140|         1|       57214|
|  66.249.82.186|         1|       57127|
| 37.228.107.126|         1|       55503|
| 168.235.194.47|         1|       55476|
|    1.39.63.157|         1|       55353|
|     1.39.35.89|         1|       55350|
|  163.47.14.170|         1|       55349|
|  163.47.12.254|         1|       55325|
| 107.167.109.55|         1|       55313|
|     1.39.32.67|         1|       55295|
|107.167.107.108|         1|       55293|
|    1.39.12.226|         1|       55289|
| 107.167.107.41|         1|       55187|
| 37.228.104.174|         1|       55183|
| 192.20.246.138|         1|       55163|
|    59.177.1.75|         1|      