<a href="https://colab.research.google.com/github/lakshayydua/WeblogChallenge/blob/master/WebLogAnalytics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

WITH DICTIONARY

In [0]:
# To set up Pyspark environment in Colab notebook
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WebLogAnalytics").master("local[*]").getOrCreate()

In [0]:
# spark env configuration
sc = spark.sparkContext
print("LocalProperty: ", sc.getLocalProperty)
print("defaultParallelism: ", sc.defaultParallelism)
print("ExecutorMemoryStatus: ", sc._jsc.sc().getExecutorMemoryStatus().size())

('LocalProperty: ', <bound method SparkContext.getLocalProperty of <SparkContext master=local[*] appName=WebLogAnalytics>>)
('defaultParallelism: ', 2)
('ExecutorMemoryStatus: ', 1)


In [0]:
from pyspark.sql import SparkSession, types, Window
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.functions import split, explode, struct
from datetime import datetime, date, timedelta
import pandas as pd

In [0]:
# creating a pyspark.sql.dataframe.DataFrame object
explicit_schema = types.StructType([
    types.StructField("timestamp", types.TimestampType(), True),
    types.StructField("elb", types.StringType(), True),
    types.StructField("client_port", types.StringType(), True),
    types.StructField("backend_port", types.StringType(), True),
    types.StructField("request_processing_time", types.DoubleType(), True),
    types.StructField("backend_processing_time", types.DoubleType(), True),
    types.StructField("response_processing_time", types.DoubleType(), True),
    types.StructField("elb_status_code", types.IntegerType(), True),
    types.StructField("backend_status_code", types.IntegerType(), True),
    types.StructField("received_bytes", types.DoubleType(), True),
    types.StructField("sent_bytes", types.DoubleType(), True),
    types.StructField("request", types.StringType(), True),
    types.StructField("user_agent", types.StringType(), True),
    types.StructField("ssl_cipher", types.StringType(), True),
    types.StructField("ssl_protocol", types.StringType(), True)
])

log_df = spark.read.format("csv") \
              .schema(explicit_schema) \
              .option("header", "false") \
              .option("delimiter", " ") \
              .load("/content/data/")

In [0]:
log_df.show()
log_df.createOrReplaceTempView('log_df')

+--------------------+----------------+--------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+--------------+----------+--------------------+--------------------+--------------------+------------+
|           timestamp|             elb|         client_port| backend_port|request_processing_time|backend_processing_time|response_processing_time|elb_status_code|backend_status_code|received_bytes|sent_bytes|             request|          user_agent|          ssl_cipher|ssl_protocol|
+--------------------+----------------+--------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+--------------+----------+--------------------+--------------------+--------------------+------------+
|2015-07-22 09:00:...|marketpalce-shop|123.242.248.130:5...|10.0.6.158:80|                 2.2E-5|               0.026109|                  2.

## **Processing & Analytical Goals**

In [0]:
log_df_summary = spark.sql("""
        SELECT COUNT(client_port) total_records_count, 
        COUNT(DISTINCT CONCAT(client_port, '_', user_agent)) unique_visitor_count, 
        MIN(timestamp) min_timestamp, 
        MAX(timestamp) max_timestamp
        FROM log_df
        """)

log_df_summary.show(truncate=False)

+-------------------+--------------------+-----------------------+-----------------------+
|total_records_count|unique_visitor_count|min_timestamp          |max_timestamp          |
+-------------------+--------------------+-----------------------+-----------------------+
|1158500            |413200              |2015-07-22 02:40:16.121|2015-07-22 21:27:03.632|
+-------------------+--------------------+-----------------------+-----------------------+



In [0]:
derived_df = spark.sql("""
                      SELECT CONCAT(client_port, '_', user_agent) visitor_id,
                      request,
                      timestamp
                      FROM log_df
                      """)

derived_df = derived_df.repartition("visitor_id").orderBy("timestamp")
derived_df.createOrReplaceTempView('derived_df')
derived_df.show()

+--------------------+--------------------+--------------------+
|          visitor_id|             request|           timestamp|
+--------------------+--------------------+--------------------+
|119.81.61.166:460...|GET https://paytm...|2015-07-22 02:40:...|
|1.39.62.163:34315...|GET https://paytm...|2015-07-22 02:40:...|
|1.39.60.241:50549...|GET https://paytm...|2015-07-22 02:40:...|
|119.81.61.166:449...|GET https://paytm...|2015-07-22 02:40:...|
|117.221.185.222:5...|GET https://paytm...|2015-07-22 02:40:...|
|119.81.61.166:458...|GET https://paytm...|2015-07-22 02:40:...|
|119.81.61.166:456...|GET https://paytm...|2015-07-22 02:40:...|
|119.81.61.166:460...|GET https://paytm...|2015-07-22 02:40:...|
|27.6.176.251:4950...|POST https://payt...|2015-07-22 02:40:...|
|27.107.95.49:1266...|GET https://paytm...|2015-07-22 02:40:...|
|182.71.61.18:3977...|GET https://paytm...|2015-07-22 02:40:...|
|115.99.105.216:50...|GET https://paytm...|2015-07-22 02:40:...|
|54.169.106.125:45...|GET

In [0]:
visitor_min_timestamp = spark.sql("""
                                  SELECT visitor_id,
                                  MIN(timestamp) last_session_timestamp
                                  FROM derived_df
                                  GROUP BY visitor_id
                                  """)

visitor_min_timestamp.show()

+--------------------+----------------------+
|          visitor_id|last_session_timestamp|
+--------------------+----------------------+
|70.39.187.150:378...|  2015-07-22 02:40:...|
|117.241.18.31:537...|  2015-07-22 02:41:...|
|119.81.61.166:516...|  2015-07-22 02:41:...|
|106.186.23.95:411...|  2015-07-22 02:41:...|
|1.39.34.167:63679...|  2015-07-22 02:41:...|
|54.169.0.163:4366...|  2015-07-22 02:42:...|
|119.81.61.166:423...|  2015-07-22 02:42:...|
|54.169.79.101:409...|  2015-07-22 02:42:...|
|115.31.129.114:32...|  2015-07-22 02:42:...|
|119.81.61.166:592...|  2015-07-22 02:42:...|
|119.81.61.166:356...|  2015-07-22 02:43:...|
|119.81.61.166:473...|  2015-07-22 02:43:...|
|115.250.119.245:3...|  2015-07-22 02:44:...|
|119.81.61.166:373...|  2015-07-22 02:44:...|
|14.141.251.102:50...|  2015-07-22 02:45:...|
|106.76.232.9:5988...|  2015-07-22 02:45:...|
|1.39.40.16:4497_M...|  2015-07-22 02:45:...|
|182.71.128.138:42...|  2015-07-22 02:45:...|
|119.81.61.166:456...|  2015-07-22

In [0]:
visitor_min_timestamp_dict = visitor_min_timestamp  \
                                    .toPandas() \
                                    .set_index('visitor_id')['last_session_timestamp'] \
                                    .to_dict()

In [0]:
visitor_session_dict = {}

for visitor_key in visitor_min_timestamp_dict.keys():
  visitor_session_dict[visitor_key] = 0

In [0]:
# Inactivity threshold is set to 15 minutes or 15*60*1000 milliseconds
THRESHOLD = 15*60*1000
    
def square_array_right(visitor_record):
    visitor_id = visitor_record[0]
    timestamp = visitor_record[1]

    if timestamp > visitor_min_timestamp_dict[visitor_id] + timedelta(milliseconds=THRESHOLD):
      visitor_session_dict[visitor_id] = visitor_session_dict[visitor_id] + 1
    
    visitor_min_timestamp_dict[visitor_id] = timestamp

    return visitor_session_dict[visitor_id]

spark_square_array_right = udf(square_array_right, types.IntegerType())

In [0]:
zz = derived_df.withColumn('visitor_session_id', spark_square_array_right(struct('visitor_id','timestamp')))

In [0]:
zz.show()
zz.createOrReplaceTempView("zz")


+--------------------+--------------------+--------------------+------------------+
|          visitor_id|             request|           timestamp|visitor_session_id|
+--------------------+--------------------+--------------------+------------------+
|119.81.61.166:460...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|1.39.62.163:34315...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|1.39.60.241:50549...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|119.81.61.166:449...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|117.221.185.222:5...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|119.81.61.166:458...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|119.81.61.166:456...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|119.81.61.166:460...|GET https://paytm...|2015-07-22 02:40:...|                 0|
|27.6.176.251:4950...|POST https://payt...|2015-07-22 02:40:...|            

In [0]:
zz.orderBy("visitor_session_id", ascending=False).show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+-----------------------+------------------+
|visitor_id                                                                                                                        |request                                                                                       |timestamp              |visitor_session_id|
+----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+-----------------------+------------------+
|203.191.34.178:10400_Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36|GET https://paytm.com:443/shop/cart?channel=web&version

In [0]:
n = spark.sql("""select visitor_id, count(*), max(visitor_session_id) from zz group by visitor_id""")
n.show()

KeyboardInterrupt: ignored

In [0]:
# new_df = spark.sql("""SELECT * 
#                       FROM agg_timestamp_df 
#                       WHERE unique_visitor = '119.81.61.166:45641_"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/534.55.3 (KHTML, like Gecko) Version/5.1.3 Safari/534.53.10"'
#                   """)

# new_df.show(truncate=False)


new_df = spark.sql("""SELECT * 
                      FROM log_df 
                      WHERE request like '%,%'
                  """)

new_df.show(truncate=False)

In [0]:
# Inactivity threshold is set to 15 minutes or 15*60*1000 milliseconds
THRESHOLD = 15*60*1000

pandas_udf("string", PandasUDFType.SCALAR)
 
def calculate_session_count_udf(visitor_timestamps):
  
  timestamp_list = visitor_timestamps

  last_active = timestamp_list[0]
  session_count = 1
  
  # early exit for a visitor with only one timestamp
  if len(timestamp_list) == 1:
    return session_count

  for timestamp in timestamp_list[1:]:
    if timestamp < last_active + timedelta(milliseconds=THRESHOLD):
      last_active = timestamp
    else:
      last_active = timestamp
      session_count = session_count + 1

  return session_count
    
spark.udf.register("calculate_session_count", calculate_session_count_udf)  

result_df = spark.sql("""SELECT visitor_id, timestamp_count, 
                         calculate_session_count(timestamp_list) visitor_session_count 
                         FROM agg_timestamp_df 
                         ORDER BY visitor_session_count DESC, timestamp_count DESC
                         """)

result_df.show(truncate=False)

## **1. Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a session**

## **2. Determine the average session time**

## **3. Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session**

## **4. Find the most engaged users, ie the IPs with the longest session times**