In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import gzip

# Single Spark session for the notebook; every later cell reaches Spark through `spark`.
spark = SparkSession.builder.appName('LogAnalyzer').getOrCreate()

# Set column names (the log file has no header row, so names are supplied here)
col_names = ["timestamp", "elb", "client:port", "backend:port", "request_processing_time", "backend_processing_time",
                "response_processing_time", "elb_status_code", "backend_status_code", "received_bytes", "sent_bytes", 
                "request", "user_agent", "ssl_cipher", "ssl_protocol"]

# Import data
# sep=r'\s+' is the documented equivalent of delim_whitespace=True, which is
# deprecated since pandas 2.2 and emits a FutureWarning.
df = pd.read_csv('data/2015_07_22_mktplace_shop_web_log_sample.log.gz', sep=r'\s+', names=col_names, compression='gzip')

# Ensure record count is consistent to file.
# DataFrame.count() reports non-null values per column, so a column with
# missing entries shows a smaller number than the row count.
print(df.count())

timestamp                   1158500
elb                         1158500
client:port                 1158500
backend:port                1158500
request_processing_time     1158500
backend_processing_time     1158500
response_processing_time    1158500
elb_status_code             1158500
backend_status_code         1158500
received_bytes              1158500
sent_bytes                  1158500
request                     1158500
user_agent                  1158495
ssl_cipher                  1158500
ssl_protocol                1158500
dtype: int64


In [2]:
from pyspark.sql.functions import to_timestamp

# Build the frame used for sessionizing in one chain:
#   1. keep only the three fields the analysis needs,
#   2. derive client_ip as the part of "client:port" before the colon
#      (the "unique user IP"),
#   3. parse the timestamp strings into datetimes,
#   4. drop the now-redundant "client:port" column,
#   5. sort rows by timestamp.
# Resulting column order: request, timestamp, client_ip.
session_source_columns = ["client:port", "request", "timestamp"]

df_sessions = (
    df.loc[:, session_source_columns]
    .assign(
        client_ip=lambda frame: frame["client:port"].str.split(":").str[0],
        timestamp=lambda frame: pd.to_datetime(frame["timestamp"]),
    )
    .drop(columns="client:port")
    .sort_values(by="timestamp")
)

In [3]:
from pyspark.sql.types import *

# Prepare schema for the temp view.
# Fields are listed in the same order as the df_sessions columns
# (request, timestamp, client_ip); all three are nullable.
fields = [
    StructField("request", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("client_ip", StringType(), True),
]
schema = StructType(fields)

# Apply schema while converting the pandas frame to a Spark DataFrame
sparkDfSessions = spark.createDataFrame(df_sessions, schema)

# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0. The view name is what the SQL cells query.
sparkDfSessions.createOrReplaceTempView("weblogSessions")

In [4]:
# Sanity check: print the first rows of the registered view
# (show() truncates long cell values by default).
weblog_preview_df = spark.sql('SELECT * FROM weblogSessions')
weblog_preview_df.show()

+--------------------+--------------------+---------------+
|             request|           timestamp|      client_ip|
+--------------------+--------------------+---------------+
|GET https://paytm...|2015-07-21 22:40:...| 106.51.235.133|
|GET https://paytm...|2015-07-21 22:40:...| 115.250.16.146|
|GET https://paytm...|2015-07-21 22:40:...| 106.51.235.133|
|GET https://paytm...|2015-07-21 22:40:...|   52.74.219.71|
|GET https://paytm...|2015-07-21 22:40:...|  27.97.124.172|
|GET https://paytm...|2015-07-21 22:40:...| 106.78.125.179|
|GET https://paytm...|2015-07-21 22:40:...|   112.79.36.98|
|GET https://paytm...|2015-07-21 22:40:...|  119.81.61.166|
|GET https://paytm...|2015-07-21 22:40:...|117.197.179.139|
|GET https://paytm...|2015-07-21 22:40:...|    1.39.14.113|
|POST https://payt...|2015-07-21 22:40:...| 49.206.246.124|
|GET https://paytm...|2015-07-21 22:40:...|  119.81.61.166|
|GET https://paytm...|2015-07-21 22:40:...|  119.81.61.166|
|GET https://paytm...|2015-07-21 22:40:.

In [5]:
# For every request, fetch the timestamp of the same client's previous request.
# LAG runs over a window partitioned by client_ip and ordered by timestamp
# ascending, so the first request of each client gets a NULL previous_timestamp.

weblogSessions_timestamps_df = spark.sql("""
    SELECT client_ip, request, timestamp, 
        LAG(timestamp) OVER (PARTITION BY client_ip ORDER BY timestamp) AS previous_timestamp
    FROM weblogSessions""")

# createOrReplaceTempView replaces registerTempTable (deprecated since Spark 2.0)
weblogSessions_timestamps_df.createOrReplaceTempView("weblogSessions_timestamps")
spark.sql('select client_ip, timestamp, previous_timestamp from weblogSessions_timestamps').show(truncate=False)

+------------+--------------------------+--------------------------+
|client_ip   |timestamp                 |previous_timestamp        |
+------------+--------------------------+--------------------------+
|1.186.143.37|2015-07-22 12:14:36.308131|null                      |
|1.186.143.37|2015-07-22 12:14:44.846873|2015-07-22 12:14:36.308131|
|1.187.164.29|2015-07-21 22:43:31.135201|null                      |
|1.187.164.29|2015-07-21 22:43:42.735304|2015-07-21 22:43:31.135201|
|1.187.164.29|2015-07-21 22:43:47.685225|2015-07-21 22:43:42.735304|
|1.187.164.29|2015-07-21 22:44:19.075268|2015-07-21 22:43:47.685225|
|1.187.164.29|2015-07-21 22:44:20.084871|2015-07-21 22:44:19.075268|
|1.187.164.29|2015-07-21 22:44:21.995557|2015-07-21 22:44:20.084871|
|1.187.164.29|2015-07-21 22:44:23.254646|2015-07-21 22:44:21.995557|
|1.187.164.29|2015-07-21 22:44:23.295011|2015-07-21 22:44:23.254646|
|1.187.164.29|2015-07-21 22:44:40.245266|2015-07-21 22:44:23.295011|
|1.22.41.76  |2015-07-22 12:42:59.

# Part 1. Sessionize the web log by IP.

In [6]:
# Flag the first request of every session.
# A request opens a new session when it is the client's first request
# (previous_timestamp IS NULL) or when the gap to the client's previous
# request is at least the session threshold.
# Threshold: 15 minutes = 60 * 15 seconds (the standard 30-minute interval
# can be tried as well).
# NOTE(review): the gap is computed via unix_timestamp(), i.e. in seconds -
# confirm that sub-second precision is not needed at the threshold boundary.

weblogSessions_sessions_df = spark.sql("""    
    SELECT *,
        CASE 
            WHEN unix_timestamp(timestamp) - unix_timestamp(previous_timestamp) >= (60 * 15) 
            OR previous_timestamp IS NULL
        THEN 1 ELSE 0 END AS is_new_session
    FROM weblogSessions_timestamps""")
# createOrReplaceTempView replaces registerTempTable (deprecated since Spark 2.0)
weblogSessions_sessions_df.createOrReplaceTempView("weblogSessions_sessions")
# spark.sql("select client_ip, timestamp, previous_timestamp, is_new_session from weblogSessions_sessions").show(truncate=False)

In [1]:
# Create an id per session: the running total of is_new_session within each
# client_ip, ordered by timestamp. Ids therefore start at 1 for every client
# and increase by one each time a new session starts.
#
# This cell needs the `spark` session and the "weblogSessions_sessions" view
# from the cells above; run on a fresh kernel it fails with
# "NameError: name 'spark' is not defined". Use Restart & Run All.

weblogSessions_sessionIds_df = spark.sql("""    
    SELECT *, SUM(is_new_session) OVER (PARTITION BY client_ip ORDER BY timestamp) AS session_id
    FROM weblogSessions_sessions
    """)
# createOrReplaceTempView replaces registerTempTable (deprecated since Spark 2.0)
weblogSessions_sessionIds_df.createOrReplaceTempView("weblogSessions_sessionIds")

NameError: name 'spark' is not defined

# Part 2. Determine the average session time.

In [8]:
# Average session time across all sessions.
#   innermost query : duration of each (client_ip, session_id) session in
#                     seconds, as last request time minus first request time
#   middle query    : per client_ip, total session time and number of sessions
#                     (MAX(session_id) equals the session count because ids
#                     are a running total starting at 1 per client)
#   outer query     : grand totals, and total time / total sessions in minutes
# The innermost query carries no ORDER BY: the rows are aggregated again
# right away, so sorting them cannot affect the result and only adds work.

spark.sql("""
SELECT 
    SUM(total_session_time_per_ip) AS amount_of_time_in_seconds, 
    SUM(num_of_sessions_per_ip) AS num_of_sessions,
    (SUM(total_session_time_per_ip) / SUM(num_of_sessions_per_ip)) / 60 AS average_session_time_in_mins
FROM (
    SELECT client_ip, MAX(session_id) AS num_of_sessions_per_ip, SUM(session_time) AS total_session_time_per_ip
    FROM (
        SELECT client_ip, session_id, MAX(unix_timestamp(timestamp)) - MIN(unix_timestamp(timestamp)) AS session_time
        FROM weblogSessions_sessionIds
        GROUP BY client_ip, session_id 
    ) grouped_sessions
    GROUP BY client_ip
) average_sessions
""").show()


+-------------------------+---------------+----------------------------+
|amount_of_time_in_seconds|num_of_sessions|average_session_time_in_mins|
+-------------------------+---------------+----------------------------+
|                 11161579|         110841|          1.6783168382337463|
+-------------------------+---------------+----------------------------+



# Part 3. Determine unique URL visits per session.

In [9]:
# Unique requests per session, averaged per client.
# The inner query keeps one row per distinct (client_ip, session_id, request).
# The outer query then reports, for each client_ip:
#   total_sessions_per_ip     - highest session_id of that client
#   total_unique_visits       - number of distinct (session, request) rows
#   unique_visits_per_session - the ratio of the two

unique_visits_query = """    
    SELECT
        client_ip, MAX(session_id) AS total_sessions_per_ip, COUNT(request) AS total_unique_visits,
        COUNT(request) / MAX(session_id) AS unique_visits_per_session
    FROM (
        SELECT DISTINCT client_ip, session_id, request
        FROM weblogSessions_sessionIds  
    ) sessions
    GROUP BY client_ip
    """

unique_visits_df = spark.sql(unique_visits_query)
unique_visits_df.show()

+------------+---------------------+-------------------+-------------------------+
|   client_ip|total_sessions_per_ip|total_unique_visits|unique_visits_per_session|
+------------+---------------------+-------------------+-------------------------+
|1.186.143.37|                    1|                  2|                      2.0|
|1.187.164.29|                    1|                  8|                      8.0|
|  1.22.41.76|                    1|                  5|                      5.0|
| 1.23.208.26|                    2|                  6|                      3.0|
| 1.23.36.184|                    1|                  4|                      4.0|
|   1.38.19.8|                    1|                  1|                      1.0|
|  1.38.20.34|                    1|                 14|                     14.0|
|  1.39.13.13|                    1|                  2|                      2.0|
| 1.39.32.249|                    2|                  6|                      3.0|
|  1

# Part 4. Find the most engaged users, ie the IPs with the longest session times

In [10]:
# Most engaged users: one row per (client_ip, session_id) with the session
# length in minutes (last request time minus first request time, divided
# by 60), listed longest first.

longest_sessions_query = """
    SELECT client_ip, session_id, (MAX(unix_timestamp(timestamp)) - MIN(unix_timestamp(timestamp))) / 60 AS session_time_in_mins
    FROM weblogSessions_sessionIds
    GROUP BY client_ip, session_id
    ORDER BY session_time_in_mins DESC
"""

longest_sessions_df = spark.sql(longest_sessions_query)
longest_sessions_df.show()


+---------------+----------+--------------------+
|      client_ip|session_id|session_time_in_mins|
+---------------+----------+--------------------+
|  119.81.61.166|         5|  34.483333333333334|
|   52.74.219.71|         5|  34.483333333333334|
|  106.186.23.95|         5|  34.483333333333334|
|   125.20.39.66|         4|   34.46666666666667|
|   125.19.44.66|         5|   34.46666666666667|
| 180.211.69.209|         4|               34.45|
|   192.8.190.10|         3|               34.45|
|  54.251.151.39|         5|               34.45|
| 180.179.213.70|         5|   34.43333333333333|
|  122.15.156.64|         3|   34.43333333333333|
| 203.191.34.178|         2|   34.43333333333333|
| 203.189.176.14|         5|   34.43333333333333|
| 180.151.80.140|         4|  34.416666666666664|
| 125.16.218.194|         1|  34.416666666666664|
| 103.29.159.138|         1|  34.416666666666664|
|213.239.204.204|         2|  34.416666666666664|
|    78.46.60.71|         1|                34.4|


# Additional Notes:

IP addresses do not guarantee distinct users, but this is a limitation of the data. The notes below consider what additional data would help make better analytical conclusions.

There are additional methods for identifying visitors, sessions and paths:

1. IP + user agent: in addition to the IP address, the user-agent (browser) information can be used to determine unique sessions. This helps detect multiple users behind one IP address.

2. Username data if the website is configured to require authentication.

3. Session ID: if present in the URL query string or stored in a cookie, it allows more accurate visitor, session, and path analysis.

In addition, we can filter out entries that do not add value to the analysis.
Requests generated by automated activity, such as bots, can be filtered out.
Requests for components of a web page, such as images, may also be irrelevant.