## Scalable Web Server Log Analytics with Apache Spark
> This is the Scala version of the excellent work by [dipanjanS](https://github.com/dipanjanS/data_science_for_all/blob/master/tds_scalable_log_analytics/Scalable_Log_Analytics_Spark.ipynb) which was implemented in Python (PySpark). Kindly visit his [Github](https://github.com/dipanjanS/data_science_for_all/blob/master/tds_scalable_log_analytics/Scalable_Log_Analytics_Spark.ipynb) page for more details on the project.

This project uses Scala 2.12.8, Spark 2.4.0 and plotly (in place of the Seaborn/Matplotlib used in the original post).

### Prerequisites
The following need to be set up/installed:
* [Almond](https://almond.sh/) - a Scala kernel for Jupyter.
* [Plotly](https://plot.ly/scala/) for Scala

### Part 1 - Setting up Dependencies

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`sh.almond::almond-spark:0.5.0`
import $ivy.`org.plotly-scala::plotly-almond:0.7.0`
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._

import $ivy.$
import $ivy.$
import $ivy.$
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.log4j.{Level, Logger}

In [3]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._

In [4]:
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


spark: SparkSession = org.apache.spark.sql.SparkSession@75c958a7


### Part 2 - Loading and Viewing the NASA Log Dataset

##### Load data

In [6]:
val baseDF = spark.read.text("data/*.gz")

baseDF: DataFrame = [value: string]

In [7]:
baseDF.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
baseDF.getClass.getName  // Scala counterpart of Python's type(baseDF)

##### Viewing sample data in our dataframe

In [8]:
baseDF.show(10, truncate = false)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
|burger.letters.com - - 

##### Print schema

### Part 3 - Data Wrangling

##### Data Parsing and Extraction with Regular Expressions

In [9]:
println(baseDF.count(), baseDF.columns.length)

(3461613,1)


In [10]:
val sampleLogs = baseDF.take(15)
sampleLogs

sampleLogs: Array[Row] = Array(
  [199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245],
  [unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985],
  [199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085],
  [burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0],
  [199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179],
  [burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0],
  [burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0],
  [205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985],
  [d104.aa.net - - [01/Jul/1995:00:00:13 -

##### Extracting host names

In [11]:
val hostPattern = """(^\S+\.[\S+\.]+\S+)\s"""
val hosts = baseDF.withColumn("value", regexp_extract(baseDF("value"), hostPattern, 1))
hosts

hostPattern: String = "(^\\S+\\.[\\S+\\.]+\\S+)\\s"
hosts: DataFrame = [value: string]
res10_2: DataFrame = [value: string]

##### Extracting timestamps

In [12]:
val tsPattern = """\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]"""
val timeStamps = baseDF.withColumn("value", regexp_extract(baseDF("value"), tsPattern, 1))
timeStamps

tsPattern: String = "\\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} -\\d{4})]"
timeStamps: DataFrame = [value: string]
res11_2: DataFrame = [value: string]

##### Extracting HTTP Request Method, URIs and Protocol

In [13]:
val methodURIProtocolPattern = """\"(\S+)\s(\S+)\s*(\S*)\""""
val methodURIProtocol = baseDF.withColumn("value", regexp_extract(baseDF("value"), methodURIProtocolPattern, 0))
methodURIProtocol

methodURIProtocolPattern: String = "\\\"(\\S+)\\s(\\S+)\\s*(\\S*)\\\""
methodURIProtocol: DataFrame = [value: string]
res12_2: DataFrame = [value: string]

##### Extracting HTTP Status Codes

In [14]:
val statusPattern = """\s(\d{3})\s"""
val status = baseDF.withColumn("value", regexp_extract(baseDF("value"), statusPattern, 1))
status

statusPattern: String = "\\s(\\d{3})\\s"
status: DataFrame = [value: string]
res13_2: DataFrame = [value: string]

##### Extracting HTTP Response Content Size

In [15]:
val contentSizePattern = """\s(\d+)$"""
val contentSize = baseDF.withColumn("value", regexp_extract(baseDF("value"), contentSizePattern, 1))
contentSize

contentSizePattern: String = "\\s(\\d+)$"
contentSize: DataFrame = [value: string]
res14_2: DataFrame = [value: string]

#### Putting it all together
Let's now use all the regular expression patterns we built above, together with the `regexp_extract(...)` function, to build a DataFrame with each log attribute extracted into its own column.
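Before combining the patterns in Spark, it can help to check them against a single log line in plain Scala. The sketch below is only an illustration: the object `PatternCheck` and its `extract` helper are hypothetical names, and `extract` merely imitates the documented behaviour of `regexp_extract` (first match, empty string when nothing matches) using `scala.util.matching.Regex`. The pattern strings and the sample line are copied from the cells above.

```scala
import scala.util.matching.Regex

object PatternCheck {
  // Pattern strings copied from the notebook cells above.
  val hostPattern = """(^\S+\.[\S+\.]+\S+)\s"""
  val tsPattern = """\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]"""
  val methodURIProtocolPattern = """\"(\S+)\s(\S+)\s*(\S*)\""""
  val statusPattern = """\s(\d{3})\s"""
  val contentSizePattern = """\s(\d+)$"""

  // First log line shown in the sample output earlier in this notebook.
  val sample: String =
    """199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245"""

  // Hypothetical helper: returns the requested group of the first match,
  // or "" when the pattern does not match at all.
  def extract(line: String, pattern: String, group: Int): String =
    new Regex(pattern)
      .findFirstMatchIn(line)
      .flatMap(m => Option(m.group(group)))
      .getOrElse("")

  def main(args: Array[String]): Unit = {
    println(extract(sample, hostPattern, 1))              // 199.72.81.55
    println(extract(sample, tsPattern, 1))                // 01/Jul/1995:00:00:01 -0400
    println(extract(sample, methodURIProtocolPattern, 1)) // GET
    println(extract(sample, methodURIProtocolPattern, 2)) // /history/apollo/
    println(extract(sample, methodURIProtocolPattern, 3)) // HTTP/1.0
    println(extract(sample, statusPattern, 1))            // 200
    println(extract(sample, contentSizePattern, 1))       // 6245
  }
}
```

In an Almond cell, `PatternCheck.main(Array.empty)` prints the seven extracted fields for the sample line.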

In [16]:
val logsDF = baseDF.select(regexp_extract(baseDF("value"), hostPattern, 1).alias("host"),
      regexp_extract(baseDF("value"), tsPattern, 1).alias("timestamp"),
      regexp_extract(baseDF("value"), methodURIProtocolPattern, 1).alias("method"),
      regexp_extract(baseDF("value"), methodURIProtocolPattern, 2).alias("endpoint"),
      regexp_extract(baseDF("value"), methodURIProtocolPattern, 3).alias("protocol"),
      regexp_extract(baseDF("value"), statusPattern, 1).cast("integer").alias("status"),
      regexp_extract(baseDF("value"), contentSizePattern, 1).cast("integer").alias("content_size")
    )
logsDF

logsDF: DataFrame = [host: string, timestamp: string ... 5 more fields]
res15_1: DataFrame = [host: string, timestamp: string ... 5 more fields]

##### Finding Missing Values

In [17]:
val badRowsDF = logsDF.filter(logsDF("host").isNull ||
      logsDF("timestamp").isNull ||
      logsDF("method").isNull ||
      logsDF("endpoint").isNull ||
      logsDF("protocol").isNull ||
      logsDF("status").isNull ||
      logsDF("content_size").isNull
    )
badRowsDF

badRowsDF: Dataset[Row] = [host: string, timestamp: string ... 5 more fields]
res16_1: Dataset[Row] = [host: string, timestamp: string ... 5 more fields]

##### Finding Null Counts

In [18]:
def countNull(colName: Column) = sum(colName.isNull.cast("integer"))
val exprs = logsDF.select(logsDF.columns map (c => countNull(col(c)).alias(c)): _*)
exprs.show

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     1|       33905|
+----+---------+------+--------+--------+------+------------+



defined function countNull
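The `countNull` helper works because casting the Boolean result of `isNull` to an integer gives 1 for a null and 0 otherwise, so summing the column yields the number of nulls. A plain-Scala analogue of that idiom is sketched below, with `None` standing in for a SQL null; the object name and the sample values are illustrative assumptions, not results from the dataset.

```scala
object NullCountSketch {
  // Maps each missing value to 1 and each present value to 0, then sums,
  // mirroring sum(col.isNull.cast("integer")).
  def countNull[A](column: Seq[Option[A]]): Int =
    column.map(v => if (v.isEmpty) 1 else 0).sum

  def main(args: Array[String]): Unit = {
    // Made-up status values, one of them missing.
    val statuses: Seq[Option[Int]] = Seq(Some(200), None, Some(304))
    println(countNull(statuses)) // 1
  }
}
```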

##### Handling nulls in HTTP status
Our original parsing regular expression for the status column was:

```regexp_extract(baseDF("value"), statusPattern, 1).cast("integer").alias("status")```
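A null status arises in two steps: `regexp_extract` returns an empty string when the pattern does not match, and casting that empty string to an integer produces null. The following plain-Scala sketch imitates both steps without Spark. The names `StatusNullSketch`, `extract` and `castToInt` are hypothetical, and `None` is used to model null; the malformed line `alyssa.p` is the record that the next cell finds in the data.

```scala
import scala.util.Try
import scala.util.matching.Regex

object StatusNullSketch {
  // Same pattern string as statusPattern in the notebook.
  val statusPattern = """\s(\d{3})\s"""

  // Imitates regexp_extract: "" when the pattern does not match.
  def extract(line: String, pattern: String, group: Int): String =
    new Regex(pattern)
      .findFirstMatchIn(line)
      .flatMap(m => Option(m.group(group)))
      .getOrElse("")

  // Imitates .cast("integer"): a non-numeric string becomes None (null).
  def castToInt(s: String): Option[Int] = Try(s.toInt).toOption

  def status(line: String): Option[Int] =
    castToInt(extract(line, statusPattern, 1))

  def main(args: Array[String]): Unit = {
    val goodLine =
      """199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245"""
    println(status(goodLine))   // Some(200)
    println(status("alyssa.p")) // None
  }
}
```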

In [19]:
val nullStatusDF = baseDF.filter(!baseDF("value").rlike("""\s(\d{3})\s"""))
nullStatusDF.show

+--------+
|   value|
+--------+
|alyssa.p|
+--------+



nullStatusDF: Dataset[Row] = [value: string]

In [20]:
val badStatusDF = nullStatusDF.select(regexp_extract(baseDF("value"), hostPattern, 1).alias("host"),
      regexp_extract(baseDF("value"), tsPattern, 1).alias("timestamp"),
      regexp_extract(baseDF("value"), methodURIProtocolPattern, 1).alias("method"),
      regexp_extract(baseDF("value"), methodURIProtocolPattern, 2).alias("endpoint"),
      regexp_extract(baseDF("value"), methodURIProtocolPattern, 3).alias("protocol"),
      regexp_extract(baseDF("value"), statusPattern, 1).cast("integer").alias("status"),
      regexp_extract(baseDF("value"), contentSizePattern, 1).cast("integer").alias("content_size")
    )
badStatusDF.show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|    |         |      |        |        |  null|        null|
+----+---------+------+--------+--------+------+------------+



badStatusDF: DataFrame = [host: string, timestamp: string ... 5 more fields]

In [21]:
logsDF.count()

res20: Long = 3461613L

In [23]:
val logsDF1 = logsDF.filter(logsDF("status").isNotNull)
logsDF1.count()

logsDF1: Dataset[Row] = [host: string, timestamp: string ... 5 more fields]
res22_1: Long = 3461612L

In [24]:
val exprs = logsDF1.select(logsDF1.columns map (c => countNull(col(c)).alias(c)): _*)
exprs.show

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|       33904|
+----+---------+------+--------+--------+------+------------+



exprs: DataFrame = [host: bigint, timestamp: bigint ... 5 more fields]

##### Handling nulls in HTTP content size
Our original parsing expression for the content_size column was:

```regexp_extract(baseDF("value"), contentSizePattern, 1).cast("integer").alias("content_size")```

Could there be missing data in our original dataset itself? Let's try and find out!

##### Find out the records in our base data frame with potential missing content sizes

In [25]:
val nullContentSizeDF = baseDF.filter(!baseDF("value").rlike("""\s\d+$"""))
nullContentSizeDF.count()

nullContentSizeDF: Dataset[Row] = [value: string]
res24_1: Long = 33905L

##### Display the top ten records of your data frame having missing content sizes

In [26]:
nullContentSizeDF.take(10)

res25: Array[Row] = Array(
  [dd15-062.compuserve.com - - [01/Jul/1995:00:01:12 -0400] "GET /news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt HTTP/1.0" 404 -],
  [dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -],
  [ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:02:40 -0400] "GET /software/winvn HTTP/1.0" 302 -],
  [ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:03:24 -0400] "GET /software HTTP/1.0" 302 -],
  [link097.txdirect.net - - [01/Jul/1995:00:05:06 -0400] "GET /shuttle HTTP/1.0" 302 -],
  [ix-war-mi1-20.ix.netcom.com - - [01/Jul/1995:00:05:13 -0400] "GET /shuttle/missions/sts-78/news HTTP/1.0" 302 -],
  [ix-war-mi1-20.ix.netcom.com - - [01/Jul/1995:00:05:58 -0400] "GET /shuttle/missions/sts-72/news HTTP/1.0" 302 -],
  [netport-27.iu.net - - [01/Jul/1995:00:10:19 -0400] "GET /pub/winvn/readme.txt HTTP/1.0" 404 -],
  [netport-27.iu.net - - [01/Jul/1995:00:10:28 -0400] "GET /pub/winvn/readme.txt

It is quite evident that the bad raw data records correspond to responses where no content was sent back (the 302 and 404 responses shown above), and the server emitted a "-" for the content_size field.

Since we don't want to discard those rows from our analysis, let's fill them with 0.
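The effect of filling with 0 can be sketched in plain Scala, independently of Spark. In the sketch below a missing size is modelled as `None` and replaced by 0 with `getOrElse`. The object `ContentSizeFill` and its methods are hypothetical names used only for illustration; the two sample lines are taken from the outputs above.

```scala
import scala.util.Try
import scala.util.matching.Regex

object ContentSizeFill {
  // Same regular expression as contentSizePattern in the notebook.
  val contentSizePattern: Regex = new Regex("""\s(\d+)$""")

  // None models a null content_size, e.g. when the line ends in "-".
  def contentSize(line: String): Option[Int] =
    contentSizePattern
      .findFirstMatchIn(line)
      .flatMap(m => Try(m.group(1).toInt).toOption)

  // Plain-Scala analogue of filling a null content_size with 0.
  def filled(line: String): Int = contentSize(line).getOrElse(0)

  def main(args: Array[String]): Unit = {
    val withSize =
      """199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245"""
    val withDash =
      """dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -"""
    println(contentSize(withSize)) // Some(6245)
    println(contentSize(withDash)) // None
    println(filled(withDash))      // 0
  }
}
```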

##### Fix the rows with null content_size
In Scala, you can pass a `Map` to `na.fill`, where each key is a column name and each value is an Int, Long, Float, Double, String or Boolean.

See this [Stack Overflow](https://stackoverflow.com/questions/51073493/spark-dataframe-na-fill-boolean-column-type) answer for more details.

```df.na.fill(Map("age" -> 50, "name" -> "unknown")).show()```

| age        | height           | name  |
| ------------- |:-------------:| -----:|
| 10      | 80 | Alice |
| 5      | null      |   Bob |
| 50 | null      |    unknown |

Now we use this function and fill all the missing values in the content_size field with 0!

In [27]:
val logsDF3 = logsDF1.na.fill(Map("content_size" -> 0))

logsDF3: DataFrame = [host: string, timestamp: string ... 5 more fields]

Assuming everything we have done so far worked, we should have no missing values / nulls in our dataset. Let's verify this!

In [None]:
val exprs = logsDF3.select(logsDF3.columns map (c => countNull(col(c)).alias(c)): _*)
exprs.show

If everything worked, every column in the resulting table should report zero nulls.

##### Handling Temporal Fields (Timestamp)
Now that we have a clean, parsed DataFrame, we have to parse the timestamp field into an actual timestamp. The Common Log Format time is somewhat non-standard.

First, let's check the format of the timestamp values in our dataset:

In [None]:
val checkTime = logsDF3.select(logsDF3.col("timestamp"))
checkTime.take(5).foreach(println(_))

*Instead of using a user-defined function to convert the date (as in the Python version by [dipanjanS](https://github.com/dipanjanS/data_science_for_all/blob/master/tds_scalable_log_analytics/Scalable_Log_Analytics_Spark.ipynb)), we use the `unix_timestamp` function that Spark provides through `import org.apache.spark.sql.functions._`.*

In [None]:
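// Parse the string column into a proper timestamp column named "time",
// then drop the original string column.
val finalLogsDF = logsDF3
  .select(
    logsDF3.col("*"),
    unix_timestamp(logsDF3("timestamp"), "dd/MMM/yyyy:HH:mm:ss").cast("timestamp").alias("time")
  )
  .drop("timestamp")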
finalLogsDF.show(10)
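The format string passed to `unix_timestamp` above covers only the date and time portion of the Common Log Format timestamp; the trailing `-0400` offset is not part of that pattern. As a hedged illustration of the full layout, the sketch below parses one timestamp, offset included, with `java.time` on the plain JVM. It does not use Spark's own parser, and the `Z` pattern letter, the `Locale.ENGLISH` choice and the object name `ClfTimestamp` are assumptions made for this example.

```scala
import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter
import java.util.Locale

object ClfTimestamp {
  // "Z" matches an offset written as +HHMM or -HHMM. Locale.ENGLISH lets
  // "Jul" parse regardless of the JVM's default locale.
  val formatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  def parse(ts: String): OffsetDateTime = OffsetDateTime.parse(ts, formatter)

  def main(args: Array[String]): Unit = {
    val parsed = parse("01/Jul/1995:00:00:01 -0400")
    // The same moment expressed in UTC is four hours later.
    println(parsed.toInstant == Instant.parse("1995-07-01T04:00:01Z")) // true
  }
}
```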

Let's now cache `finalLogsDF`, since we will be using it extensively in the data analysis section in the next part, and confirm its schema.

In [None]:
finalLogsDF.cache()
finalLogsDF.printSchema()

### Part 4 - Data Analysis on our Web Logs