# Simple Weblog Analytics - The Batch Way
In this notebook, we walk through a batch process over a series of weblog files to obtain the top trending pages per day.

In [3]:
// Location of the unpacked dataset files. Update this path to match your environment.
// You can unpack the provided dataset with:
// tar xvf datasets/NASA-weblogs/nasa_dataset_july_1995.tgz -C /tmp/data/
val logsDirectory = "/home/ldi/nasa_dataset_july_1995"
val rawLogs = spark.read.json(logsDirectory)


logsDirectory: String = /home/ldi/nasa_dataset_july_1995
rawLogs: org.apache.spark.sql.DataFrame = [bytes: bigint, host: string ... 3 more fields]


## We define a schema for the data in the logs
Following the formal description of the dataset ([NASA-HTTP](http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html)), the log is structured as follows:

> The logs are an ASCII file with one line per request, with the following columns:
> - host making the request. A hostname when possible, otherwise the Internet address if the name could not be looked up.
> - timestamp in the format "DAY MON DD HH:MM:SS YYYY", where DAY is the day of the week, MON is the name of the month, DD is the day of the month, HH:MM:SS is the time of day using a 24-hour clock, and YYYY is the year. The timezone is -0400.
> - request given in quotes.
> - HTTP reply code.
> - bytes in the reply.

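The dataset provided for this exercise offers this data in JSON format, one JSON object per line (the layout `spark.read.json` expects by default). In these JSON files the timestamp is already an ISO-8601 string (values starting with `1995-07-13T00:00` in the outputs below) rather than the `DAY MON DD HH:MM:SS YYYY` layout described above.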
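If you work from the original ASCII logs instead of the JSON export, the timestamp has to be parsed first. A minimal sketch using `java.time`, assuming the `DAY MON DD HH:MM:SS YYYY` layout and the fixed `-0400` offset quoted above; `NasaTimestamp` is a hypothetical helper, not part of the notebook:

```scala
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical helper (not part of the notebook): parses timestamps laid out
// as "DAY MON DD HH:MM:SS YYYY", e.g. "Thu Jul 13 00:00:01 1995".
object NasaTimestamp {
  // Locale.US so the English abbreviations ("Thu", "Jul") parse regardless
  // of the JVM's default locale.
  private val formatter =
    DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss yyyy", Locale.US)

  // The dataset description states that the timezone is -0400.
  private val offset = ZoneOffset.ofHours(-4)

  def parse(raw: String): OffsetDateTime =
    LocalDateTime.parse(raw, formatter).atOffset(offset)
}

val launchDay = NasaTimestamp.parse("Thu Jul 13 00:00:01 1995")
println(launchDay)               // 1995-07-13T00:00:01-04:00
println(launchDay.getDayOfMonth) // 13
```

Attaching the offset explicitly keeps the original local time intact, so day-of-month values match what the log writer recorded.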

In [4]:
import java.sql.Timestamp
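// Field names match the JSON column names, which .as[WebLog] uses to map columns to fields
case class WebLog(host: String,
                  timestamp: String, // ISO-8601 string, as found in the JSON
                  request: String,
                  http_reply: Int,
                  bytes: Long
                 )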

import java.sql.Timestamp
defined class WebLog


## We convert the raw data to structured logs

In [5]:
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
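// Cast http_reply to int so that it matches the Int field of WebLog (required by .as[WebLog] below)
val preparedLogs = rawLogs.withColumn("http_reply", $"http_reply".cast(IntegerType))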


import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
preparedLogs: org.apache.spark.sql.DataFrame = [bytes: bigint, host: string ... 3 more fields]


In [6]:
preparedLogs.show(5)
preparedLogs.schema.treeString

+-------+----------------+----------+--------------------+--------------------+
|  bytes|            host|http_reply|             request|           timestamp|
+-------+----------------+----------+--------------------+--------------------+
|      0| nntp1.reach.com|       304|GET /images/NASA-...|1995-07-13T00:00:...|
|  32252|webgate1.mot.com|       200|GET /shuttle/tech...|1995-07-13T00:00:...|
|    751|webgate1.mot.com|       200|GET /htbin/cdt_cl...|1995-07-13T00:00:...|
|1121554|  204.157.128.52|       200|GET /shuttle/miss...|1995-07-13T00:00:...|
|      0| nntp1.reach.com|       304|GET /images/KSC-l...|1995-07-13T00:00:...|
+-------+----------------+----------+--------------------+--------------------+
only showing top 5 rows



res0: String =
"root
 |-- bytes: long (nullable = true)
 |-- host: string (nullable = true)
 |-- http_reply: integer (nullable = true)
 |-- request: string (nullable = true)
 |-- timestamp: string (nullable = true)
"


In [7]:
val weblogs = preparedLogs.as[WebLog] // convert the preparedLogs DataFrame into a typed Dataset[WebLog]
weblogs.schema.treeString

weblogs: org.apache.spark.sql.Dataset[WebLog] = [bytes: bigint, host: string ... 3 more fields]
res1: String =
"root
 |-- bytes: long (nullable = true)
 |-- host: string (nullable = true)
 |-- http_reply: integer (nullable = true)
 |-- request: string (nullable = true)
 |-- timestamp: string (nullable = true)
"


## Now that we have the data in a structured format, we can start asking the questions that interest us.
We have imported the data and transformed it using a known schema. We can use this structured data to create queries that provide insights into the behavior of the users.

### As a first step, we would like to know how many records are contained in our dataset.

In [8]:
val recordCount = weblogs.count

recordCount: Long = 1871988


### A common question: what was the most popular URL per day?
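We first reduce the timestamp to the day of the month (the dataset covers July 1995 only, so the day of the month identifies each day). We then group by this new `dayOfMonth` column and the request URL, and count the requests in each group. Finally, we sort by count in descending order to get the top URLs first. Note that this ranks (day, URL) pairs across the whole month; it does not pick a single top URL for each day.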
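The same group, count and sort steps can be sketched on plain Scala collections, which makes the logic easy to check on a handful of records. The `Hit` type, the `topDailyUrls` function and the sample rows are hypothetical, and the day is read from the date part of the ISO timestamp string as a simplifying assumption:

```scala
import java.time.LocalDate

// Hypothetical minimal record with only the two fields the query uses.
final case class Hit(timestamp: String, request: String)

// Day of month from the date part of an ISO-8601 string, e.g. "1995-07-13T00:00:01" -> 13.
def dayOfMonth(ts: String): Int = LocalDate.parse(ts.take(10)).getDayOfMonth

// Group by (day, request), count each group and sort by descending count,
// mirroring groupBy(...).agg(count(...)).orderBy(desc("count")).
// Ties are broken by day, then request, so the order is deterministic.
def topDailyUrls(hits: Seq[Hit]): Seq[(Int, String, Long)] =
  hits
    .groupBy(h => (dayOfMonth(h.timestamp), h.request))
    .map { case ((day, request), group) => (day, request, group.size.toLong) }
    .toSeq
    .sortBy { case (day, request, count) => (-count, day, request) }

// Made-up sample rows for illustration.
val sample = Seq(
  Hit("1995-07-13T00:00:01", "GET /images/NASA-logosmall.gif HTTP/1.0"),
  Hit("1995-07-13T00:00:05", "GET /images/NASA-logosmall.gif HTTP/1.0"),
  Hit("1995-07-13T00:00:09", "GET /htbin/cdt_main.pl HTTP/1.0"),
  Hit("1995-07-12T10:15:00", "GET /images/NASA-logosmall.gif HTTP/1.0")
)

topDailyUrls(sample).foreach(println)
```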

In [9]:
val topDailyURLsStep1 = weblogs.withColumn("dayOfMonth", dayofmonth($"timestamp"))
topDailyURLsStep1.show(5)

+-------+----------------+----------+--------------------+--------------------+----------+
|  bytes|            host|http_reply|             request|           timestamp|dayOfMonth|
+-------+----------------+----------+--------------------+--------------------+----------+
|      0| nntp1.reach.com|       304|GET /images/NASA-...|1995-07-13T00:00:...|        13|
|  32252|webgate1.mot.com|       200|GET /shuttle/tech...|1995-07-13T00:00:...|        13|
|    751|webgate1.mot.com|       200|GET /htbin/cdt_cl...|1995-07-13T00:00:...|        13|
|1121554|  204.157.128.52|       200|GET /shuttle/miss...|1995-07-13T00:00:...|        13|
|      0| nntp1.reach.com|       304|GET /images/KSC-l...|1995-07-13T00:00:...|        13|
+-------+----------------+----------+--------------------+--------------------+----------+
only showing top 5 rows



topDailyURLsStep1: org.apache.spark.sql.DataFrame = [bytes: bigint, host: string ... 4 more fields]


In [10]:
val topDailyURLsStep2 = topDailyURLsStep1
    .select($"request",$"dayOfMonth")
    .groupBy($"dayOfMonth", $"request")
    .agg(count($"request").alias("count"))
topDailyURLsStep2.show(5)

+----------+--------------------+-----+
|dayOfMonth|             request|count|
+----------+--------------------+-----+
|        13|GET /icons/sound....|   98|
|        13|GET /cgi-bin/imag...|    2|
|        13|GET /cgi-bin/imag...|    2|
|        13|GET /htbin/wais.p...|   14|
|        13|GET /statistics/1...|    1|
+----------+--------------------+-----+
only showing top 5 rows



topDailyURLsStep2: org.apache.spark.sql.DataFrame = [dayOfMonth: int, request: string ... 1 more field]


In [11]:
val topDailyURLs = weblogs.withColumn("dayOfMonth", dayofmonth($"timestamp"))
                          .select($"request", $"dayOfMonth")
                          .groupBy($"dayOfMonth", $"request")
                          .agg(count($"request").alias("count"))
                          .orderBy(desc("count"))

topDailyURLs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [dayOfMonth: int, request: string ... 1 more field]


In [12]:
topDailyURLs.take(10)


res4: Array[org.apache.spark.sql.Row] = Array([13,GET /images/NASA-logosmall.gif HTTP/1.0,12476], [13,GET /htbin/cdt_main.pl HTTP/1.0,7471], [12,GET /images/NASA-logosmall.gif HTTP/1.0,7143], [13,GET /htbin/cdt_clock.pl HTTP/1.0,6237], [6,GET /images/NASA-logosmall.gif HTTP/1.0,6112], [5,GET /images/NASA-logosmall.gif HTTP/1.0,5865], [13,GET /images/KSC-logosmall.gif HTTP/1.0,5662], [7,GET /images/NASA-logosmall.gif HTTP/1.0,5651], [3,GET /images/NASA-logosmall.gif HTTP/1.0,5356], [6,GET /images/KSC-logosmall.gif HTTP/1.0,5126])


In [13]:
topDailyURLs.show(10) // show renders the rows as a formatted table, easier to read than take

+----------+--------------------+-----+
|dayOfMonth|             request|count|
+----------+--------------------+-----+
|        13|GET /images/NASA-...|12476|
|        13|GET /htbin/cdt_ma...| 7471|
|        12|GET /images/NASA-...| 7143|
|        13|GET /htbin/cdt_cl...| 6237|
|         6|GET /images/NASA-...| 6112|
|         5|GET /images/NASA-...| 5865|
|        13|GET /images/KSC-l...| 5662|
|         7|GET /images/NASA-...| 5651|
|         3|GET /images/NASA-...| 5356|
|         6|GET /images/KSC-l...| 5126|
+----------+--------------------+-----+
only showing top 10 rows
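In the output above, day 13 alone fills four of the top ten rows. To keep only the most requested URL for each day, the counted rows can be reduced per day. A sketch on plain Scala collections with made-up rows and a hypothetical `topUrlPerDay` helper; in Spark itself, numbering rows with `row_number()` over `Window.partitionBy($"dayOfMonth").orderBy(desc("count"))` and keeping row 1 is one way to express the same idea (not run here):

```scala
// Made-up (dayOfMonth, request, count) rows, shaped like the aggregation output.
val counted: Seq[(Int, String, Long)] = Seq(
  (13, "GET /page-a.html HTTP/1.0", 9L),
  (13, "GET /page-b.html HTTP/1.0", 4L),
  (12, "GET /page-a.html HTTP/1.0", 2L),
  (12, "GET /page-c.html HTTP/1.0", 7L)
)

// Hypothetical helper: for each day keep the row with the highest count,
// breaking ties by the lexicographically smallest request; days in ascending order.
def topUrlPerDay(rows: Seq[(Int, String, Long)]): Seq[(Int, String, Long)] =
  rows
    .groupBy { case (day, _, _) => day }
    .values
    .map(rowsForDay => rowsForDay.minBy { case (_, request, count) => (-count, request) })
    .toSeq
    .sortBy { case (day, _, _) => day }

topUrlPerDay(counted).foreach(println)
```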



### Top hits are all images. What now?
It's not unusual to see that the top URLs are images commonly used across a site.

Our real interest lies in the content pages that generate the most traffic. To find those, we first filter for HTML content and then apply the same top aggregation we used above.

The request field is a string of the form `[HTTP_VERB] URL [HTTP_VERSION]`. We extract the URL from `GET` requests and keep only those ending in `.html`, `.htm`, or with no extension (directories). This is a simplification for the purpose of the exercise.
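The extraction rule can be tried out on a few request strings before running it over the whole dataset. A minimal sketch as a plain Scala predicate that mirrors the `contentPageLogs` filter, wrapped in a hypothetical `ContentPages` object; the only change is that the dot in `HTTP/\d\.\d` is escaped so it matches a literal dot:

```scala
// Hypothetical helper mirroring the contentPageLogs filter, usable outside Spark.
object ContentPages {
  // Captures the URL of a GET request; the version dot is escaped here.
  private val urlExtractor = """^GET (.+) HTTP/\d\.\d""".r
  // "" stands for "no extension", e.g. directory URLs ending in '/'.
  private val allowedExtensions = Set(".html", ".htm", "")

  def isContentPage(request: String): Boolean = request match {
    case urlExtractor(url) =>
      // Same heuristic as the notebook: the "extension" starts at the first '.'
      // within the last five characters of the URL (empty if there is none).
      val ext = url.takeRight(5).dropWhile(c => c != '.')
      allowedExtensions.contains(ext)
    case _ => false // non-GET requests and malformed lines are dropped
  }
}

Seq(
  "GET /shuttle/countdown/liftoff.html HTTP/1.0",
  "GET /shuttle/countdown/ HTTP/1.0",
  "GET /images/NASA-logosmall.gif HTTP/1.0",
  "HEAD /shuttle/countdown/ HTTP/1.0"
).foreach(r => println(s"$r -> ${ContentPages.isContentPage(r)}"))
```

With this heuristic, a URL whose last five characters contain no dot (for example one ending in a query string) is also treated as having no extension.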

In [14]:
weblogs.show(10)

+-------+--------------------+----------+--------------------+--------------------+
|  bytes|                host|http_reply|             request|           timestamp|
+-------+--------------------+----------+--------------------+--------------------+
|      0|     nntp1.reach.com|       304|GET /images/NASA-...|1995-07-13T00:00:...|
|  32252|    webgate1.mot.com|       200|GET /shuttle/tech...|1995-07-13T00:00:...|
|    751|    webgate1.mot.com|       200|GET /htbin/cdt_cl...|1995-07-13T00:00:...|
|1121554|      204.157.128.52|       200|GET /shuttle/miss...|1995-07-13T00:00:...|
|      0|     nntp1.reach.com|       304|GET /images/KSC-l...|1995-07-13T00:00:...|
|      0|    webgate1.mot.com|       304|GET /images/NASA-...|1995-07-13T00:00:...|
| 190269|    webgate1.mot.com|       200|GET /shuttle/miss...|1995-07-13T00:00:...|
|   7008|    darkstar.isi.edu|       200|GET /facilities/l...|1995-07-13T00:00:...|
|  57344|dd08-048.compuser...|       200|GET /shuttle/coun...|1995-07-13T00:

In [15]:
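val urlExtractor = """^GET (.+) HTTP/\d.\d""".r   // captures the URL of GET requests
val allowedExtensions = Set(".html",".htm", "")   // "" means no extension (directories)
val contentPageLogs = weblogs.filter {log => 
  log.request match {
    case urlExtractor(url) => 
      // "extension": from the first '.' within the last five characters of the URL
      val ext = url.takeRight(5).dropWhile(c => c != '.')
      allowedExtensions.contains(ext)
    case _ => false // non-GET or malformed requests are dropped
  }
}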

urlExtractor: scala.util.matching.Regex = ^GET (.+) HTTP/\d.\d
allowedExtensions: scala.collection.immutable.Set[String] = Set(.html, .htm, "")
contentPageLogs: org.apache.spark.sql.Dataset[WebLog] = [bytes: bigint, host: string ... 3 more fields]


In [16]:
contentPageLogs.show(100)

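org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 31) (ubuntu20ldi1 executor driver): java.lang.ClassCastException: $iw cannot be cast to $iw

This `ClassCastException` most likely comes from the notebook's Scala REPL, which wraps each cell in generated `$iw` classes, rather than from the filter logic itself: it typically shows up when a typed Dataset lambda captures values defined in another cell (here `urlExtractor` and `allowedExtensions`). Possible workarounds are defining those values inside the lambda, or expressing the filter with built-in column functions such as `regexp_extract`, so that no Scala closure from the REPL has to be serialized to the executors. As a result, the remaining cells were not executed in this run.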

In [None]:
val topContentPages = contentPageLogs.withColumn("dayOfMonth", dayofmonth($"timestamp"))
                          .select($"request", $"dayOfMonth")
                          .groupBy($"dayOfMonth", $"request")
                          .agg(count($"request").alias("count"))
                          .orderBy(desc("count"))

In [None]:
topContentPages.show(10)

The most popular content page that month was `liftoff.html`, corresponding to the coverage of the launch of the Space Shuttle Discovery (mission STS-70), as documented in the NASA archives: https://www.nasa.gov/mission_pages/shuttle/shuttlemissions/archives/sts-70.html.

It is closely followed by `countdown/` in the days prior to the launch.