In [1]:
val sqlC = new org.apache.spark.sql.SQLContext(sc)
import scala.util.matching.Regex
import sqlC.implicits._
import org.apache.spark.sql.functions._
import java.sql.Timestamp

In [2]:
/*
Implicit safe conversions from String to numeric types and to java.sql.Timestamp.
Each conversion returns the supplied default value when the string cannot be parsed.
*/
implicit class StringConversion(val s: String) {

  private def toTypeOrElse[T](convert: String=>T, defaultVal: T) = try {
    if (s matches "[\\+\\-0-9.e]+") convert(s)
    else defaultVal
  } catch {
    case _: NumberFormatException => defaultVal
  }

  def toShortOrElse(defaultVal: Short = 0) = toTypeOrElse[Short](_.toShort, defaultVal)
  def toByteOrElse(defaultVal: Byte = 0) = toTypeOrElse[Byte](_.toByte, defaultVal)
  def toIntOrElse(defaultVal: Int = 0) = toTypeOrElse[Int](_.toInt, defaultVal)
  def toDoubleOrElse(defaultVal: Double = 0D) = toTypeOrElse[Double](_.toDouble, defaultVal)
  def toLongOrElse(defaultVal: Long = 0L) = toTypeOrElse[Long](_.toLong, defaultVal)
  def toFloatOrElse(defaultVal: Float = 0F) = toTypeOrElse[Float](_.toFloat, defaultVal)
  
  val defaultTimestamp = new Timestamp(0)
  def toTimestampOrElse(defaultVal: Timestamp = defaultTimestamp) = try {
      Timestamp.valueOf(s.replace("T"," ").replace("Z", " "))
  } catch {
      // fall back to the caller-supplied default, not the class-level one
      case _: IllegalArgumentException => defaultVal
  }
}

In [3]:
val rawRDD = sc.textFile("/resources/data/2015_07_22_mktplace_shop_web_log_sample.log.gz")

In [4]:
rawRDD.count

1158500

## Loading and Cleaning the Data

A log record is a single string like the one below. My first thought was to use a regex to split on every space that is not inside quotes, but that turned out to be very slow. Instead, I split each record on double quotes, which breaks a typical record into 5 parts. Some records split differently, such as the following one, which has quotes nested inside the quoted request. These 22 records are filtered out of the analysis.
>Array(2015-07-22T16:10:38.028609Z marketpalce-shop 106.51.132.54:4841 10.0.4.227:80 0.000022 0.000989 0.00002 400 400 0 166 "GET https://paytm.com:443/'"\'\");|]*{%0d%0a<%00>/about/ HTTP/1.1" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)" DHE-RSA-AES128-SHA TLSv1)
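
To make the quote-based filter concrete, here is a minimal sketch in plain Scala (no Spark needed). Both sample lines are shortened, made-up records in the same layout rather than rows from the dataset, and `quoteParts` is a hypothetical helper name.

```scala
// Count the pieces produced by splitting a log line on double quotes.
def quoteParts(line: String): Int = line.split("\"").length

// Well-formed record: prefix, request, separator space, user agent, suffix.
val typical =
  """2015-07-22T09:00:28.019143Z elb 1.2.3.4:5555 10.0.0.1:80 0.1 0.2 0.3 200 200 0 699 "GET https://example.com:443/ HTTP/1.1" "Mozilla/5.0" ECDHE-RSA-AES128-SHA TLSv1"""

// Malformed record: a stray quote inside the request adds an extra piece.
val malformed =
  """2015-07-22T09:00:28.019143Z elb 1.2.3.4:5555 10.0.0.1:80 0.1 0.2 0.3 400 400 0 166 "GET https://example.com:443/a"b HTTP/1.1" "Mozilla/5.0" DHE-RSA-AES128-SHA TLSv1"""

println(quoteParts(typical))   // 5
println(quoteParts(malformed)) // 6
```

Any record whose piece count is not 5 is dropped by the filter in the next cell.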

In [5]:
val cleanRawRDD = rawRDD.filter(_.split("\"").size == 5)

In [6]:
cleanRawRDD.count

1158478

In [7]:
// Split each line into its 15 fields: 11 before the quoted request, the request, the user agent, and 2 after.
// Can be replaced with the regex "( (?=([^\"]*\"[^\"]*\")*[^\"]*$))", but that is much slower.
def splitLine(line: String) = {
    val firstSplit = line.split("\"").filter(_ != " ")
    val firstPart = firstSplit(0).trim.split(" ")
    val lastPart = firstSplit(3).trim.split(" ")
    (firstPart :+ firstSplit(1) :+ firstSplit(2)) ++ lastPart
}

In [8]:
val splitRDD = cleanRawRDD.map(line => splitLine(line))

In [9]:
val cleanSplitRDD = splitRDD.filter(line => line.size == 15)

In [10]:
cleanSplitRDD.count

1158478

In [11]:
// represents a log line
// will be used as the schema of the spark DataFrame.
case class WeblogRecord(
    timestamp: java.sql.Timestamp,
    elb: String,
    clientIP: String,
    clientPort: Int,
    backendIPPort: String,
    requestProcTime: Double,
    backendProcTime: Double,
    responseProcTime: Double,
    elbStatus: Int,
    backendStatus: Int,
    recvBytes: Int,
    sendBytes: Int,
    reqMethod: String,
    reqURL: String,
    reqVer: String,
    userAgent: String,
    sslCipher: String,
    sslProt: String
)

// convert a split line to an instance of WeblogRecord, using the safe conversions defined above
def rowToWeblogRecord(row: Array[String]) = {
    val urlBreak = row(11).replace("\"", "").split(" ")
    val clientBreak = row(2).split(":")
    WeblogRecord(
        row(0).toTimestampOrElse(),
        row(1),
        clientBreak(0),
        clientBreak.lift(1).getOrElse("").toIntOrElse(-1), // port is the part after ':'
        row(3),
        row(4).toDoubleOrElse(-1.0),
        row(5).toDoubleOrElse(-1.0),
        row(6).toDoubleOrElse(-1.0),
        row(7).toIntOrElse(-1),
        row(8).toIntOrElse(-1),
        row(9).toIntOrElse(-1),
        row(10).toIntOrElse(-1),
        urlBreak.lift(0).getOrElse(""),
        urlBreak.lift(1).getOrElse(""),
        urlBreak.lift(2).getOrElse(""),
        row(12),
        row(13),
        row(14)
    )
}

In [12]:
val logDF = cleanSplitRDD.map(row => rowToWeblogRecord(row)).toDF

In [13]:
logDF.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- elb: string (nullable = true)
 |-- clientIP: string (nullable = true)
 |-- clientPort: integer (nullable = false)
 |-- backendIPPort: string (nullable = true)
 |-- requestProcTime: double (nullable = false)
 |-- backendProcTime: double (nullable = false)
 |-- responseProcTime: double (nullable = false)
 |-- elbStatus: integer (nullable = false)
 |-- backendStatus: integer (nullable = false)
 |-- recvBytes: integer (nullable = false)
 |-- sendBytes: integer (nullable = false)
 |-- reqMethod: string (nullable = true)
 |-- reqURL: string (nullable = true)
 |-- reqVer: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- sslCipher: string (nullable = true)
 |-- sslProt: string (nullable = true)



### Q1: Sessional Page Hits
For this analysis, a session is defined as all requests made by a user (identified here by client IP) within a particular time period (**time window**). The table below shows the page hits per session for a time window of 30 minutes. It is equivalent to registering a temporary table and running the following SQL query:
```scala
logDF.registerTempTable("webLog")
sqlC.sql("""
    SELECT floor((unix_timestamp(timestamp) - 1437532806) / (60 * 30)) AS timewin, clientIP,
           COUNT(*) AS page_hits
    FROM webLog
    GROUP BY floor((unix_timestamp(timestamp) - 1437532806) / (60 * 30)), clientIP
    ORDER BY timewin, clientIP
""")
```
To generate the time windows, an origin time point is needed; here it is the minimum timestamp of the whole dataset. The window index of a record is then computed as *floor((current timestamp - minimum timestamp) / (60 sec x window width in min))*.
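
The window formula can be sketched in plain Scala as follows. `windowIndex` and its parameter names are hypothetical, introduced only for illustration; the origin value is the minimum timestamp that appears in the query above.

```scala
// Index of the fixed-width window that a Unix timestamp (in seconds) falls into.
// originSec is the earliest timestamp in the data; widthMin is the window width in minutes.
def windowIndex(tsSec: Long, originSec: Long, widthMin: Int): Long =
  Math.floorDiv(tsSec - originSec, 60L * widthMin)

val origin = 1437532806L // minimum timestamp of the dataset

println(windowIndex(origin, origin, 30))                 // 0
println(windowIndex(origin + 1799, origin, 30))          // 0
println(windowIndex(origin + 1800, origin, 30))          // 1
println(windowIndex(origin + 16 * 1800 + 5, origin, 30)) // 16
```

With a 30-minute width, every block of 1800 seconds after the origin gets the next index, so window 16 starts exactly 8 hours after the first record.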

In [14]:
/*
calculate minimum timestamp (unix style) to subtract it from all the timestamps,
so zero represents the beginning of the analyzed period
*/
val min_timestamp = logDF.select(min(unix_timestamp($"timestamp"))).collect()
println(min_timestamp(0))

[1437532806]


In [17]:
logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*30)).as("timewin"), $"clientIP").groupBy($"timewin", $"clientIP").count().orderBy($"timewin", $"clientIP").show()

+-------+-------------+-----+
|timewin|     clientIP|count|
+-------+-------------+-----+
|      0| 1.186.247.60|    4|
|      0|  1.186.41.10|    3|
|      0|  1.186.44.42|    1|
|      0|  1.186.78.17|    4|
|      0|   1.186.78.9|    9|
|      0| 1.187.140.24|    1|
|      0|1.187.164.121|    2|
|      0| 1.187.164.29|    9|
|      0|1.187.168.128|    4|
|      0|1.187.186.198|    4|
|      0| 1.187.187.45|    1|
|      0| 1.187.193.22|    1|
|      0| 1.187.202.23|    1|
|      0|  1.187.208.2|    6|
|      0| 1.187.233.50|    3|
|      0|1.187.249.228|   22|
|      0| 1.187.250.28|    4|
|      0| 1.187.250.52|    7|
|      0|  1.187.43.74|    3|
|      0|  1.187.49.35|    1|
+-------+-------------+-----+
only showing top 20 rows



### Q2: Average Session Time
To calculate the average session time, the time span of each session is needed. The time span of a session (in seconds) is defined as _the last request time - the first request time_ within that session. For clarity, the following table is stored as *timespanDF_30* for the subsequent calculation of the overall average session time. The result shows that users spend an average of __99.46__ seconds per session.
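
As a rough illustration of this definition, the sketch below computes session time spans and their average on a handful of made-up hits using plain Scala collections. The `Hit` class, the sample values, and the variable names are assumptions for the example; timestamps are given as seconds since the origin.

```scala
// One page hit: client IP and seconds since the origin (toy data, not from the log).
case class Hit(clientIP: String, tsSec: Long)

val hits = Seq(
  Hit("10.0.0.1", 0L), Hit("10.0.0.1", 40L), Hit("10.0.0.1", 100L), // window 0
  Hit("10.0.0.1", 1900L),                                           // window 1
  Hit("10.0.0.2", 10L), Hit("10.0.0.2", 30L)                        // window 0
)

val windowSec = 30 * 60L

// Session key = (time window, client IP); time span = last hit - first hit.
val timespans: Map[(Long, String), Long] =
  hits.groupBy(h => (h.tsSec / windowSec, h.clientIP)).map { case (key, hs) =>
    val ts = hs.map(_.tsSec)
    key -> (ts.max - ts.min)
  }

// Average over all sessions, including single-hit sessions with a span of 0.
val avgSessionSec = timespans.values.sum.toDouble / timespans.size

println(timespans((0L, "10.0.0.1"))) // 100
println(avgSessionSec)               // 40.0
```

Note that a session with a single hit contributes a span of 0, which pulls the average down.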

In [18]:
logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*30)).as("timewin"), $"clientIP", $"timestamp").groupBy($"timewin", $"clientIP").agg((max(unix_timestamp($"timestamp"))-min(unix_timestamp($"timestamp"))).alias("timespan")).orderBy($"timewin").show()

+-------+---------------+--------+
|timewin|       clientIP|timespan|
+-------+---------------+--------+
|      0| 101.60.167.204|      76|
|      0| 117.253.105.27|      30|
|      0| 106.76.176.223|       1|
|      0|  117.249.186.1|       0|
|      0|107.167.107.125|       0|
|      0|112.133.215.242|      27|
|      0|107.167.107.120|     199|
|      0| 136.185.171.11|       0|
|      0|   49.248.85.79|     243|
|      0|180.151.208.141|       5|
|      0|  117.244.52.67|       1|
|      0|    1.39.46.165|      35|
|      0|  183.82.147.94|       2|
|      0|117.255.253.154|       0|
|      0|175.100.175.178|       0|
|      0|  43.230.38.170|       0|
|      0|   117.203.8.98|      16|
|      0|  37.228.105.65|      67|
|      0|  117.196.56.70|     178|
|      0|  60.243.144.23|     127|
+-------+---------------+--------+
only showing top 20 rows



In [19]:
val timespanDF_30=logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*30)).as("timewin"), $"clientIP", $"timestamp").groupBy($"timewin", $"clientIP").agg((max(unix_timestamp($"timestamp"))-min(unix_timestamp($"timestamp"))).alias("timespan")).orderBy($"timewin")

In [20]:
timespanDF_30.select(avg($"timespan")).show()

+-----------------+
|    avg(timespan)|
+-----------------+
|99.46373757313842|
+-----------------+



### Optimizing Time Window
How to define the time window is often debated. For this dataset, I tried time windows of 15, 30 and 60 minutes and assessed their effect on the average time span per time window. A larger time window is expected to capture longer sessions, which increases the average time span.
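
The effect of the window width can be seen on a toy example. The sketch below is plain Scala with made-up data: one client that is active for 20 minutes. `avgTimespan` and `visits` are hypothetical names, and timestamps are seconds since the origin.

```scala
// Average per-session time span (seconds) for a given window width,
// where a session is all hits by one client inside one fixed window.
def avgTimespan(visits: Seq[(String, Long)], widthMin: Int): Double = {
  val widthSec = widthMin * 60L
  val spans = visits
    .groupBy { case (ip, ts) => (ts / widthSec, ip) }
    .values
    .map(group => group.map(_._2).max - group.map(_._2).min)
  spans.sum.toDouble / spans.size
}

// Toy data: a single client with hits at 0, 10 and 20 minutes.
val visits = Seq(("10.0.0.1", 0L), ("10.0.0.1", 600L), ("10.0.0.1", 1200L))

Seq(15, 30, 60).foreach { w =>
  println(s"$w min -> ${avgTimespan(visits, w)} s")
}
// 15 min -> 300.0 s
// 30 min -> 1200.0 s
// 60 min -> 1200.0 s
```

The 15-minute window cuts the 20-minute visit into two sessions, while the 30- and 60-minute windows both capture it whole, which is the pattern to look for when comparing the tables for the real data.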

In [21]:
val timespanDF_15=logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*15)).as("timewin"), $"clientIP", $"timestamp").groupBy($"timewin", $"clientIP").agg((max(unix_timestamp($"timestamp"))-min(unix_timestamp($"timestamp"))).alias("timespan")).orderBy($"timewin")
val timespanDF_30=logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*30)).as("timewin"), $"clientIP", $"timestamp").groupBy($"timewin", $"clientIP").agg((max(unix_timestamp($"timestamp"))-min(unix_timestamp($"timestamp"))).alias("timespan")).orderBy($"timewin")
val timespanDF_60=logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*60)).as("timewin"), $"clientIP", $"timestamp").groupBy($"timewin", $"clientIP").agg((max(unix_timestamp($"timestamp"))-min(unix_timestamp($"timestamp"))).alias("timespan")).orderBy($"timewin")

val avgTimeSpanDF_15 = timespanDF_15.select($"timewin", $"timespan").groupBy($"timewin").agg(avg($"timespan"))
val avgTimeSpanDF_30 = timespanDF_30.select($"timewin", $"timespan").groupBy($"timewin").agg(avg($"timespan"))
val avgTimeSpanDF_60 = timespanDF_60.select($"timewin", $"timespan").groupBy($"timewin").agg(avg($"timespan"))

In [22]:
avgTimeSpanDF_15.show(40)

+-------+------------------+
|timewin|     avg(timespan)|
+-------+------------------+
|      0| 41.39601353892441|
|      9|               0.0|
|     10|49.036207717960934|
|     16|               0.0|
|     17| 49.87924071082391|
|     25| 53.48273578658594|
|     31| 99.61538461538461|
|     32|  57.4678470668353|
|     33|56.532908089301245|
|     37|            36.125|
|     41|49.166666666666664|
|     42|               7.0|
|     52|              31.5|
|     53|               0.0|
|     54|110.76200461312644|
|     55|               0.0|
|     56| 54.85549189084199|
|     59|               0.0|
|     60| 58.58223583460949|
|     61| 49.28644314868804|
|     67|             149.5|
|     73| 45.15597569209993|
|     74|            4.1625|
+-------+------------------+



In [23]:
avgTimeSpanDF_30.show()

+-------+------------------+
|timewin|     avg(timespan)|
+-------+------------------+
|      0| 41.39601353892441|
|      4|               0.0|
|      5|49.036207717960934|
|      8|  49.9294656077534|
|     12| 53.48273578658594|
|     15| 99.61538461538461|
|     16| 153.1666307393835|
|     18|            36.125|
|     20|49.166666666666664|
|     21|               7.0|
|     26| 32.18181818181818|
|     27|110.85610945117156|
|     28| 54.85549189084199|
|     29|               0.0|
|     30|136.73887355052457|
|     33|             149.5|
|     36| 45.15597569209993|
|     37|            4.1625|
+-------+------------------+



When comparing the average time spans in the 15-min and 30-min time windows (the two tables above), I noticed that the values in the 30-min windows 10:40-11:10 (*16th*) and 17:40-18:10 (*30th*) are much larger than in the corresponding 15-min windows: 153.17 s versus 57.47 s and 56.53 s (15-min windows 32 and 33), and 136.74 s versus 58.58 s and 49.29 s (15-min windows 60 and 61). This suggests that users in these periods had longer sessions (>15 min), so the 30-min time window is more suitable for defining sessions.


The following table shows the average time span in 60-min time windows. Extending the time window to 60 minutes left the average time spans essentially unchanged (for example, 60-min windows 8 and 15 have the same averages as 30-min windows 16 and 30), indicating that users stayed engaged for less than 30 minutes per session. As a result, the 30-min time window is the best of the three tested, and the subsequent analysis uses only the 30-min time window for sessionization.

In [24]:
avgTimeSpanDF_60.show()

+-------+------------------+
|timewin|     avg(timespan)|
+-------+------------------+
|      0| 41.39601353892441|
|      2| 49.06876290296967|
|      4|  49.9294656077534|
|      6| 53.48273578658594|
|      7| 99.61538461538461|
|      8| 153.1666307393835|
|      9|            36.125|
|     10|             50.25|
|     13|110.94042441708147|
|     14|  55.1799378520101|
|     15|136.73887355052457|
|     16|             149.5|
|     18| 48.20437017994858|
+-------+------------------+



### Q3: Sessional Unique URL
The number of unique URLs per session can be calculated by counting the distinct URLs in each session. The following table shows the unique URL counts in 30-min time windows.
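
A minimal sketch of the distinct count with plain Scala collections, on made-up requests (the tuple layout and the names `requests`, `winSec` and `uniqueUrls` are assumptions for the example):

```scala
// (client IP, seconds since origin, request URL); toy data, not from the log.
val requests = Seq(
  ("10.0.0.1", 5L, "/home"),
  ("10.0.0.1", 20L, "/cart"),
  ("10.0.0.1", 50L, "/home"),   // repeated URL within the same session
  ("10.0.0.1", 2000L, "/home"), // falls into the next 30-min window
  ("10.0.0.2", 7L, "/home")
)

val winSec = 30 * 60L

// Session key = (time window, client IP); value = number of distinct URLs.
val uniqueUrls: Map[(Long, String), Int] =
  requests
    .groupBy { case (ip, ts, _) => (ts / winSec, ip) }
    .map { case (session, rs) => session -> rs.map(_._3).distinct.size }

println(uniqueUrls((0L, "10.0.0.1"))) // 2
println(uniqueUrls((1L, "10.0.0.1"))) // 1
```

The first session has three hits but only two distinct URLs, which is why the unique URL count can be lower than the page-hit count for the same session.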

In [25]:
logDF.select(floor((unix_timestamp($"timestamp")-1437532806)/(60*30)).as("timewin"), $"clientIP", $"reqURL").groupBy($"timewin", $"clientIP").agg(countDistinct($"reqURL").alias("uniqueURL")).orderBy($"timewin", $"clientIP").show()

+-------+-------------+---------+
|timewin|     clientIP|uniqueURL|
+-------+-------------+---------+
|      0| 1.186.247.60|        4|
|      0|  1.186.41.10|        3|
|      0|  1.186.44.42|        1|
|      0|  1.186.78.17|        4|
|      0|   1.186.78.9|        7|
|      0| 1.187.140.24|        1|
|      0|1.187.164.121|        2|
|      0| 1.187.164.29|        8|
|      0|1.187.168.128|        4|
|      0|1.187.186.198|        4|
|      0| 1.187.187.45|        1|
|      0| 1.187.193.22|        1|
|      0| 1.187.202.23|        1|
|      0|  1.187.208.2|        3|
|      0| 1.187.233.50|        3|
|      0|1.187.249.228|        8|
|      0| 1.187.250.28|        4|
|      0| 1.187.250.52|        6|
|      0|  1.187.43.74|        3|
|      0|  1.187.49.35|        1|
+-------+-------------+---------+
only showing top 20 rows



### Q4: Most Engaged Users
The most engaged users are the IPs/users with the longest session times. The first table lists the users with the longest session times in a single 30-min time window. The second table lists the users with the largest sum of session times over all time windows. Interestingly, the top three users are the same in both tables, and the fourth user in the second table (54.251.151.39) ranks fifth in the first, which supports the view that these users are genuinely the most engaged.
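
The two rankings can be sketched on toy data with plain Scala collections. The session tuples and the names `sessions`, `byLongest` and `byTotal` are made up for the example.

```scala
// (time window, client IP, session time span in seconds); toy data.
val sessions = Seq(
  (0L, "10.0.0.1", 900L), (1L, "10.0.0.1", 800L),
  (0L, "10.0.0.2", 1500L),
  (0L, "10.0.0.3", 100L), (2L, "10.0.0.3", 50L)
)

// Ranking 1: users ordered by their longest single session.
val byLongest: Seq[String] = sessions.sortBy(-_._3).map(_._2).distinct

// Ranking 2: users ordered by total session time over all windows.
val byTotal: Seq[(String, Long)] =
  sessions
    .groupBy(_._2)
    .map { case (ip, ss) => ip -> ss.map(_._3).sum }
    .toSeq
    .sortBy(-_._2)

println(byLongest)          // List(10.0.0.2, 10.0.0.1, 10.0.0.3)
println(byTotal.map(_._1))  // List(10.0.0.1, 10.0.0.2, 10.0.0.3)
```

In this toy data the two rankings disagree on first place, so agreement between them on the real data is informative rather than automatic.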

In [26]:
timespanDF_30.orderBy(desc("timespan")).show()

+-------+---------------+--------+
|timewin|       clientIP|timespan|
+-------+---------------+--------+
|     30|  220.226.206.7|    1505|
|     30|   52.74.219.71|    1499|
|     30|  119.81.61.166|    1499|
|     30|  182.48.232.41|    1498|
|     30|  54.251.151.39|    1497|
|     30|   46.236.24.53|    1496|
|     30| 182.18.177.214|    1496|
|     30| 180.179.213.70|    1494|
|     30|  54.169.191.85|    1494|
|     30| 122.248.183.22|    1493|
|     30|  49.204.55.206|    1492|
|     30|  66.249.71.110|    1492|
|     30|   49.205.90.93|    1492|
|     30|168.235.197.212|    1492|
|     30|   103.42.88.34|    1490|
|     30|  66.249.71.118|    1490|
|     30|  54.244.52.204|    1490|
|     30|117.207.121.176|    1489|
|     30|   66.102.6.250|    1488|
|     30|   54.228.16.12|    1488|
+-------+---------------+--------+
only showing top 20 rows



In [27]:
timespanDF_30.select($"clientIP", $"timespan").groupBy($"clientIP").agg(sum($"timespan").alias("sum_timespan")).orderBy(desc("sum_timespan")).show()

+--------------+------------+
|      clientIP|sum_timespan|
+--------------+------------+
| 220.226.206.7|        6426|
|  52.74.219.71|        5817|
| 119.81.61.166|        5808|
| 54.251.151.39|        5787|
| 106.186.23.95|        5555|
|121.58.175.128|        5383|
|  125.19.44.66|        5233|
| 54.169.191.85|        5176|
|180.179.213.94|        5067|
| 54.244.52.204|        5033|
|54.250.253.236|        5008|
|180.179.213.70|        4991|
|54.252.254.204|        4926|
|  207.46.13.22|        4914|
|180.179.213.71|        4879|
|122.252.231.14|        4751|
|176.34.159.236|        4748|
| 54.245.168.44|        4723|
| 54.243.31.236|        4718|
| 54.251.31.140|        4609|
+--------------+------------+
only showing top 20 rows



### Bonus: Distinct User Identification
IP does not guarantee distinct users, as several users can share the same external IP address.
Based on the available data, we can use the client user-agent, combined with the IP address, to identify a distinct user.
This method still does not guarantee that the identified users are distinct: several users behind the same IP can share a user-agent, and one user can have multiple user-agents (e.g., by using multiple browsers). It does, however, improve the identification.
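
The difference between the two identification keys can be sketched in plain Scala. The pairs below are made up, and `seen`, `usersByIp` and `usersByIpAndAgent` are hypothetical names.

```scala
// (client IP, user agent) for each request; toy data.
val seen = Seq(
  ("10.0.0.1", "Firefox"),
  ("10.0.0.1", "Chrome"),
  ("10.0.0.1", "Firefox"), // same IP and agent as the first request
  ("10.0.0.2", "Chrome")
)

// Key 1: IP address only.
val usersByIp = seen.map(_._1).distinct.size

// Key 2: IP address combined with user agent.
val usersByIpAndAgent = seen.distinct.size

println(usersByIp)         // 2
println(usersByIpAndAgent) // 3
```

The combined key can only ever find the same number of users or more, since it splits each IP into one group per user agent.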

Below are the number of distinct users based on IP address alone (90,544) and the number based on both IP address and user agent (112,594). The latter method distinguishes more users.

In [28]:
logDF.agg(countDistinct($"clientIP")).collect

Array([90544])

In [29]:
logDF.agg(countDistinct($"clientIP", $"userAgent")).collect

Array([112594])

The following table counts the user agents per IP. A single IP address can have as many as 49 different user agents, which shows how much this method helps in further separating distinct users behind one IP.

In [30]:
logDF.groupBy($"clientIP").agg(countDistinct($"userAgent") as "cntUserAgents").orderBy(desc("cntUserAgents")).show()

+---------------+-------------+
|       clientIP|cntUserAgents|
+---------------+-------------+
|   125.19.44.66|           49|
|  116.50.59.180|           31|
| 122.252.231.14|           31|
|   59.144.58.37|           30|
|  123.63.74.210|           28|
|168.235.197.212|           26|
| 203.143.186.45|           25|
|   125.20.9.248|           24|
| 117.239.35.226|           23|
|   112.121.55.9|           23|
|168.235.197.151|           22|
|203.145.131.164|           21|
|   117.239.53.7|           20|
|   103.12.119.2|           20|
| 103.22.172.130|           19|
|168.235.197.149|           19|
| 115.111.223.43|           19|
|    61.246.57.5|           19|
|  14.139.241.84|           19|
|168.235.197.238|           19|
+---------------+-------------+
only showing top 20 rows

