### Import necessary libraries

In [19]:
from pyspark.sql import SparkSession

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorIndexer

#### For Model building and evaluation 
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor

from pyspark.ml.evaluation import RegressionEvaluator



## Build the SparkSession


In [1]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Weblog_Analysis") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()
sc = spark.sparkContext

## Load the data by creating RDD


In [2]:
rdd = sc.textFile('E:/paytm/WeblogChallenge-master/data/2015_07_22_mktplace_shop_web_log_sample.log')
# split the data into columns
rdd = rdd.map(lambda line: line.split(" "))

## Map the RDD to a DF 

In [3]:

mainDF = rdd.map(lambda line: Row(timestamp=line[0], ipaddress=line[2].split(':')[0],url=line[12])).toDF()
mainDF.show(10)

+---------------+--------------------+--------------------+
|      ipaddress|           timestamp|                 url|
+---------------+--------------------+--------------------+
|123.242.248.130|2015-07-22T09:00:...|https://paytm.com...|
|  203.91.211.44|2015-07-22T09:00:...|https://paytm.com...|
|    1.39.32.179|2015-07-22T09:00:...|https://paytm.com...|
| 180.179.213.94|2015-07-22T09:00:...|https://paytm.com...|
| 120.59.192.208|2015-07-22T09:00:...|https://paytm.com...|
| 117.239.195.66|2015-07-22T09:00:...|https://paytm.com...|
|  101.60.186.26|2015-07-22T09:00:...|https://paytm.com...|
|   59.183.41.47|2015-07-22T09:00:...|https://paytm.com...|
| 117.239.195.66|2015-07-22T09:00:...|https://paytm.com...|
|  183.83.237.83|2015-07-22T09:00:...|https://paytm.com...|
+---------------+--------------------+--------------------+
only showing top 10 rows



## convert timestamps from string to timestamp datatype


In [4]:
mainDF = mainDF.withColumn('timestamp', mainDF['timestamp'].cast(TimestampType()))

## sessionizing data based on 10 min fixed window time(threshold for a session) - Time Oriented approach
### assign an sessionId to each session

In [17]:
SessionDF = mainDF.select(window("timestamp", "10 minutes").alias('FixedTimeWindow'),'timestamp',"ipaddress").groupBy('FixedTimeWindow','ipaddress').count().withColumnRenamed('count', 'NumberHitsInSessionForIp')
SessionDF = SessionDF.withColumn("SessionId", monotonically_increasing_id())
SessionDF.show(10,False)

+------------------------------------------+---------------+------------------------+---------+
|FixedTimeWindow                           |ipaddress      |NumberHitsInSessionForIp|SessionId|
+------------------------------------------+---------------+------------------------+---------+
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|113.193.196.15 |8                       |0        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|116.202.36.65  |98                      |1        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|217.137.241.138|2                       |2        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|59.91.123.165  |28                      |3        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|175.101.10.107 |56                      |4        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|15.219.201.75  |30                      |5        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:00]|59.95.144.250  |9                       |6        |
|[2015-07-22 14:30:00, 2015-07-22 14:40:

In [18]:
# join the time stamps and url to the Sessionized DF
dfWithTimeStamps = mainDF.select(window("timestamp", "10 minutes").alias('FixedTimeWindow'),'timestamp',"ipaddress","url")
SessionDF = dfWithTimeStamps.join(SessionDF,['FixedTimeWindow','ipaddress'])
SessionDF.show(10)

+--------------------+---------------+--------------------+--------------------+------------------------+---------+
|     FixedTimeWindow|      ipaddress|           timestamp|                 url|NumberHitsInSessionForIp|SessionId|
+--------------------+---------------+--------------------+--------------------+------------------------+---------+
|[2015-07-22 08:10...|     1.38.17.27|2015-07-22 08:14:...|https://paytm.com...|                       1|      202|
|[2015-07-22 08:10...| 101.62.130.166|2015-07-22 08:10:...|https://paytm.com...|                       4|      190|
|[2015-07-22 08:10...| 101.62.130.166|2015-07-22 08:10:...|https://paytm.com...|                       4|      190|
|[2015-07-22 08:10...| 101.62.130.166|2015-07-22 08:10:...|https://paytm.com...|                       4|      190|
|[2015-07-22 08:10...| 101.62.130.166|2015-07-22 08:11:...|https://paytm.com...|                       4|      190|
|[2015-07-22 08:10...|106.215.143.117|2015-07-22 08:11:...|https://paytm

In [14]:
# Finding the first hit time of each ip for each session and join in to session df
FirstHitTimeStamps = SessionDF.groupBy("SessionId").agg(min("timestamp").alias('FristHitTime'))
SessionDF = FirstHitTimeStamps.join(SessionDF,['SessionId'])


SessionDF.select(col("SessionId"),col("ipaddress"),col("FristHitTime")).show(10)

+---------+--------------+--------------------+
|SessionId|     ipaddress|        FristHitTime|
+---------+--------------+--------------------+
|       26|  218.248.82.9|2015-07-22 14:32:...|
|       26|  218.248.82.9|2015-07-22 14:32:...|
|       26|  218.248.82.9|2015-07-22 14:32:...|
|       26|  218.248.82.9|2015-07-22 14:32:...|
|       29|  27.62.30.188|2015-07-22 14:32:...|
|       29|  27.62.30.188|2015-07-22 14:32:...|
|       29|  27.62.30.188|2015-07-22 14:32:...|
|       29|  27.62.30.188|2015-07-22 14:32:...|
|      474|101.221.128.95|2015-07-22 16:05:...|
|      474|101.221.128.95|2015-07-22 16:05:...|
+---------+--------------+--------------------+
only showing top 10 rows



### 2. Determine the average session time

In [13]:
# Among all the hits in a session the last one has the max diff with first hit
# we define the time difference of first and last hit in a session to be the duration of a session for an ip
# if there is only one hit in a session the duration is zero
timeDiff = (unix_timestamp(SessionDF.timestamp)-unix_timestamp(SessionDF.FristHitTime))
SessionDF = SessionDF.withColumn("timeDiffwithFirstHit", timeDiff)
tmpdf = SessionDF.groupBy("SessionId").agg(max("timeDiffwithFirstHit").alias("SessionDuration"))
SessionDF = SessionDF.join(tmpdf,['SessionId'])

SessionDF.select(col("SessionId"),col("ipaddress"),col("SessionDuration")).show(10)

+---------+--------------+---------------+
|SessionId|     ipaddress|SessionDuration|
+---------+--------------+---------------+
|       26|  218.248.82.9|             13|
|       26|  218.248.82.9|             13|
|       26|  218.248.82.9|             13|
|       26|  218.248.82.9|             13|
|       29|  27.62.30.188|             33|
|       29|  27.62.30.188|             33|
|       29|  27.62.30.188|             33|
|       29|  27.62.30.188|             33|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
+---------+--------------+---------------+
only showing top 10 rows



In [9]:
# showing the mean session duration
# the printed number is secconds
meandf = SessionDF.groupBy().avg('SessionDuration')
meandf.show()

+--------------------+
|avg(SessionDuration)|
+--------------------+
|  141.58578161415625|
+--------------------+



### 3.) Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session


In [10]:
dfURL = SessionDF.groupBy("SessionId","URL").count().distinct().withColumnRenamed('count', 'hitURLcount')
dfURL.show(10)

+---------+--------------------+-----------+
|SessionId|                 URL|hitURLcount|
+---------+--------------------+-----------+
|       26|https://paytm.com...|          2|
|       26|http://www.paytm....|          2|
|       29|https://paytm.com...|          1|
|       29|https://paytm.com...|          1|
|       29|https://paytm.com...|          1|
|       29|https://paytm.com...|          1|
|      474|https://paytm.com...|          2|
|      474|https://paytm.com...|          2|
|      474|https://paytm.com...|          2|
|      474|https://paytm.com...|          5|
+---------+--------------------+-----------+
only showing top 10 rows



### 4.) Find the most engaged users, ie the IPs with the longest session times


In [12]:
EngagedUsers = SessionDF.select("ipaddress","SessionID","SessionDuration").sort(col("SessionDuration").desc()).distinct()
EngagedUsers.show(10)

+---------------+-------------+---------------+
|      ipaddress|    SessionID|SessionDuration|
+---------------+-------------+---------------+
| 164.100.96.254| 249108103236|            847|
| 111.119.199.22| 283467841590|            839|
|117.220.186.227| 755914244158|            804|
|  15.211.153.75|  17179869239|            596|
| 119.235.53.134| 197568495681|            594|
|   116.50.79.74|1606317769130|            579|
|   52.74.219.71|1417339208087|            559|
|  103.36.251.10|1279900254642|            559|
|  106.186.23.95|1159641170327|            559|
|  14.139.85.180|1039382086048|            558|
+---------------+-------------+---------------+
only showing top 10 rows

