# NASA webserver Log Analytics using Apache Spark

Apache Spark is a well-suited framework for wrangling, analyzing, and modeling structured and unstructured data at scale. In this notebook, we focus on one of the most popular case studies in the industry: log analytics.

Server logs are a very common data source in enterprises and often contain a gold mine of actionable insights. Log data comes from many sources, such as web, client, and compute servers, applications, user-generated content, and flat files. It can be used for monitoring servers, improving business and customer intelligence, building recommendation systems, detecting fraud, and much more.

Spark allows you to dump and store your logs in files on disk cheaply, while still providing rich APIs to perform data analysis at scale. This hands-on notebook shows how to use Apache Spark on real-world production logs from NASA, covering data wrangling and basic yet powerful techniques in exploratory data analysis.



## Step 1: Setting up the Dependencies

In [1]:
# Initialize FindSpark
import findspark
findspark.init()

In [2]:
# Load up Spark and fire up its session variables
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
    
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

In [3]:
# Load up other dependencies
import re
import pandas as pd

### Test basic Regular Expressions

In [14]:
m = re.finditer(r'.*?(spark).*?', "I'm searching for a spark in PySpark", re.I)
for match in m:
    print(match, match.start(), match.end())

<re.Match object; span=(0, 25), match="I'm searching for a spark"> 0 25
<re.Match object; span=(25, 36), match=' in PySpark'> 25 36


## Step 2: Loading and Viewing the NASA Log Dataset
The log files sit in the notebook's working directory as gzip archives, so let's load them into a DataFrame in steps. First, we collect the file names with glob and pass them to spark.read.text() (or the older sqlContext.read.text()). This produces a DataFrame with a single string column called value.

In [4]:
import glob

raw_data_files = glob.glob('*.gz')
raw_data_files

['NASA_access_log_Jul95.gz']

In [5]:
base_df = spark.read.text(raw_data_files)
base_df.printSchema()

print((base_df.count(), len(base_df.columns)))

root
 |-- value: string (nullable = true)

(1891715, 1)


In [18]:
type(base_df)

pyspark.sql.dataframe.DataFrame

**You can also convert a DataFrame to an RDD if needed**

It is a one-step process: access the *.rdd* attribute of the DataFrame.

In [19]:
base_df_rdd = base_df.rdd
type(base_df_rdd)

pyspark.rdd.RDD

### Check preview sample of the Data

This will allow us to visualize what aspects of the data need to be parsed and wrangled.

In [20]:
base_df.show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
|burger.letters.com - - 

In [6]:
# Extract and take a look at Sample Logs

sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
 'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085',
 'burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0',
 '199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179',
 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0',
 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0',
 '205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985',
 'd104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '129.94.144.152 - - [01/Jul/

In [6]:
# Building regular expression sets for lookup
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'
content_size_pattern = r'\s(\d+)$'
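
Before applying these patterns to the full dataset, it can help to check each one against a known log line in plain Python. The following is a minimal sketch using the `re` module on the first sample line shown above; the `patterns` dictionary and the `extracted` variable are illustrative names, not part of the original notebook. Spark's regexp_extract uses Java regular expressions, which should behave the same as Python's for these simple patterns, but that equivalence is an assumption worth re-checking for more complex ones.

```python
import re

# First log line from the sample output above.
sample_line = ('199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] '
               '"GET /history/apollo/ HTTP/1.0" 200 6245')

# Same patterns as in the notebook cell, keyed by the field they extract.
patterns = {
    'host': r'(^\S+\.[\S+\.]+\S+)\s',
    'timestamp': r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]',
    'request': r'\"(\S+)\s(\S+)\s*(\S*)\"',
    'status': r'\s(\d{3})\s',
    'content_size': r'\s(\d+)$',
}

extracted = {}
for name, pattern in patterns.items():
    match = re.search(pattern, sample_line)
    # groups() returns every captured group; the request pattern has three
    # (method, endpoint, protocol), the others have one.
    extracted[name] = match.groups() if match else None

for name, groups in extracted.items():
    print(name, groups)
```

If any field prints `None` or an unexpected value, the corresponding pattern needs adjusting before it is handed to regexp_extract.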



## Putting it all together

Let's now take the regular expression patterns built above and use the regexp_extract(...) function to build a dataframe with each log attribute extracted into its own column.

In [7]:
from pyspark.sql.functions import regexp_extract

logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                         regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+--------------------+--------------------+------+--------------------+--------+------+------------+
|                host|           timestamp|method|            endpoint|protocol|status|content_size|
+--------------------+--------------------+------+--------------------+--------+------+------------+
|        199.72.81.55|01/Jul/1995:00:00...|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   200|    

## Finding Missing Values

Missing and null values are the bane of data analysis and machine learning. Let's see how well our data parsing and extraction logic worked. First, let's verify that there are no null rows in the original dataframe.

In [9]:
(base_df
    .filter(base_df['value']
                .isNull())
    .count())

0

#### Zero null rows in Logs

There were ZERO null rows in our log file, which means we have at least something captured in every row. However, we could still have null values in some columns of our dataframe, as not all rows necessarily have a value for every column.

To test this, use:

In [57]:
logs_df.filter(logs_df['endpoint'].isNull()).count()

0

In [8]:
bad_rows_df = logs_df.filter(logs_df['host'].isNull() |
                             logs_df['timestamp'].isNull() |
                             logs_df['method'].isNull() |
                             logs_df['endpoint'].isNull() |
                             logs_df['status'].isNull() |
                             logs_df['content_size'].isNull() |
                             logs_df['protocol'].isNull())
bad_rows_df.count()

19727

### Identifying missing values in each column

Here we count the missing values in each column. Spark offers several ways to do this; two common ones are:

1. A helper function that builds one expression per column, run with the agg() method
2. A combination of SQL functions with the isnan() and isNull() functions

In [10]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import count, isnan, lit, when

# Method 1: Use a helper function (plain Python, not a Spark UDF) to build an expression for each column, then run them all with agg().
def count_null(col_name):
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.
exprs = [count_null(col_name) for col_name in logs_df.columns]

# Run the aggregation. The *exprs converts the list of expressions into variable function arguments.
logs_df.agg(*exprs).show()

# Method 2: Combine the isnan() and isNull() functions with count() and when() from Spark SQL.
logs_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in logs_df.columns]).show()


+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     1|       19727|
+----+---------+------+--------+--------+------+------------+

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     1|       19727|
+----+---------+------+--------+--------+------+------------+



From what we uncovered here, only one value is missing in the 'status' column, while a large number (19,727) are missing from the 'content_size' column. In the next step, we investigate the issue further.
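
One detail helps when reading these counts: regexp_extract returns an empty string, not null, when a pattern does not match, so the string columns report zero nulls even for a malformed line. Only the columns cast to integer turn a failed match into null. The pure-Python sketch below mimics that behaviour on two lines taken from this notebook; the helper names `extract` and `to_int` are hypothetical, and the null-on-cast behaviour assumes Spark's non-ANSI cast semantics.

```python
import re

# Patterns copied from the notebook.
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
status_pattern = r'\s(\d{3})\s'
content_size_pattern = r'\s(\d+)$'

def extract(pattern, line, group=1):
    """Mimic regexp_extract: return the captured group, or '' on no match."""
    match = re.search(pattern, line)
    return match.group(group) if match else ''

def to_int(text):
    """Mimic cast('integer') without ANSI mode: non-numeric text becomes None."""
    return int(text) if text.isdigit() else None

lines = [
    '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
    'alyssa.p',  # the malformed line that appears later in this notebook
]

rows = [
    {
        'host': extract(host_pattern, line),
        'status': to_int(extract(status_pattern, line)),
        'content_size': to_int(extract(content_size_pattern, line)),
    }
    for line in lines
]

# Count None values per column, analogous to the null counts above.
null_counts = {key: sum(row[key] is None for row in rows) for key in rows[0]}
print(rows[1])
print(null_counts)
```

In this sketch the malformed line yields an empty host but a missing status and content size, which is why checking string columns for empty strings can be a useful complement to the null counts.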

## Handling NULLs in HTTP Status column

While parsing the logs, we developed a regular expression to extract the STATUS values. The expression was:

**regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer').alias('status')**

There are two possibilities for the error:
1. There is more to the status code pattern than we accounted for.
2. The data point is bad.

To find which values do not match this regular expression, we can invert the match, i.e., extract the rows where the value does not match the pattern.
> **Logic:** Filter the dataframe to the rows where the STATUS regular expression does NOT match. In the expression below, ~ means "not".

In [11]:
# Use rlike() to find the rows whose value does not match the status pattern
null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
null_status_df.count()

1

In [12]:
null_status_df.show()

+--------+
|   value|
+--------+
|alyssa.p|
+--------+



In [42]:
base_df.show(10)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
|burger.letters.co...|
|burger.letters.co...|
|205.212.115.106 -...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
+--------------------+
only showing top 10 rows



In [55]:
from pyspark.sql import functions as F

# F.array(F.lit("Retail"), F.lit("SME"),)

F.array(
    F.lit("Retail"),
    F.lit("SME"),
    F.lit("Cor"),
  ).getItem(
    (F.rand()*3).cast("int")
  )

F.rand()*3

Column<b'(rand(4154673701051223795) * 3)'>

In [15]:
Xer = spark.createDataFrame([[1,2], [3,4]], ['a', 'b'])
Xer.show()

boo = spark.createDataFrame([['']], ['a'])
boo.show()

myValues = [('BAKEL','BAKEL','1 341 2323 01415'),('BAKEL','BAKEL','2 272 7729 00307'),
            ('BAKEL','BAKEL','2 341 1224 00549'),('BAKEL','BAKEL','2 341 1200 01194'),
            ('BAKEL','BAKEL','1 845 0112 101159'),]
df = sqlContext.createDataFrame(myValues,['A','B','Num'])

+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

+---+
|  a|
+---+
|   |
+---+



In [41]:
display((myValues))

[('BAKEL', 'BAKEL', '1 341 2323 01415'),
 ('BAKEL', 'BAKEL', '2 272 7729 00307'),
 ('BAKEL', 'BAKEL', '2 341 1224 00549'),
 ('BAKEL', 'BAKEL', '2 341 1200 01194'),
 ('BAKEL', 'BAKEL', '1 845 0112 101159')]

In [19]:
from pyspark.sql import functions as F

(df.withColumn(
  "business_vertical",
  F.array(
    F.lit("Retail"),
    F.lit("SME"),
    F.lit("Cor"),
  ).getItem(
    (F.rand()*3).cast("int")
  )
)).show()

+-----+-----+-----------------+-----------------+
|    A|    B|              Num|business_vertical|
+-----+-----+-----------------+-----------------+
|BAKEL|BAKEL| 1 341 2323 01415|              SME|
|BAKEL|BAKEL| 2 272 7729 00307|              SME|
|BAKEL|BAKEL| 2 341 1224 00549|              SME|
|BAKEL|BAKEL| 2 341 1200 01194|              Cor|
|BAKEL|BAKEL|1 845 0112 101159|           Retail|
+-----+-----+-----------------+-----------------+



In [121]:
df.show()

+-----+-----+-----------------+
|    A|    B|              Num|
+-----+-----+-----------------+
|BAKEL|BAKEL| 1 341 2323 01415|
|BAKEL|BAKEL| 2 272 7729 00307|
|BAKEL|BAKEL| 2 341 1224 00549|
|BAKEL|BAKEL| 2 341 1200 01194|
|BAKEL|BAKEL|1 845 0112 101159|
+-----+-----+-----------------+



In [120]:
from pyspark.sql.types import StringType, StructType

sample_logs_less = [item['value'] for item in base_df.take(df.count())]

sample_logs_df = sqlContext.createDataFrame(sample_logs_less, StringType())

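# Helper (plain Python, not a Spark UDF) that pairs the first column of two DataFrames row by row.
# RDD.zip() expects both sides to have the same number of partitions and elements per partition.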
def zipDataFrame(l, r):
    return l.rdd.zip(r.rdd).map(lambda x: (x[0][0],x[1][0])).toDF(StructType([l.schema[0],r.schema[0]]))

(zipDataFrame(df, sample_logs_df).show())

+-----+--------------------+
|    A|               value|
+-----+--------------------+
|BAKEL|199.72.81.55 - - ...|
|BAKEL|unicomp6.unicomp....|
|BAKEL|199.120.110.21 - ...|
|BAKEL|burger.letters.co...|
|BAKEL|199.120.110.21 - ...|
+-----+--------------------+



In [118]:
sample_logs_df = sqlContext.createDataFrame(sample_logs_less, StringType())

sample_logs_df.show()

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+



In [105]:
minDf = sc.parallelize(['2016-11-01','2016-11-02','2016-11-03']).map(lambda x: (x, )).toDF(['date_min'])
maxDf = sc.parallelize(['2016-12-01','2016-12-02','2016-12-03']).map(lambda x: (x, )).toDF(['date_max'])

from pyspark.sql.types import StructType

def zipDataFrame(l, r):
    return l.rdd.zip(r.rdd).map(lambda x: (x[0][0],x[1][0])).toDF(StructType([l.schema[0],r.schema[0]]))

combined = zipDataFrame(minDf, maxDf.select('date_max'))
combined.show()

+----------+----------+
|  date_min|  date_max|
+----------+----------+
|2016-11-01|2016-12-01|
|2016-11-02|2016-12-02|
|2016-11-03|2016-12-03|
+----------+----------+



In [93]:
maxDf.select('date_max')

DataFrame[date_max: string]

In [96]:
myFloatDF.select('val')

DataFrame[val: string]

In [102]:
(minDf.rdd.zip(maxDf.rdd)).map(lambda x: (x[0][0] , x[1][0])).toDF().show()

+----------+----------+
|        _1|        _2|
+----------+----------+
|2016-11-01|2016-12-01|
|2016-11-02|2016-12-02|
|2016-11-03|2016-12-03|
+----------+----------+



In [104]:
(zipDataFrame(minDf, maxDf)).show()

+----------+----------+
|  date_min|  date_max|
+----------+----------+
|2016-11-01|2016-12-01|
|2016-11-02|2016-12-02|
|2016-11-03|2016-12-03|
+----------+----------+



***

# Other Practice Exercises

## Exploring ways to create DataFrame

We can initialize a new DataFrame from flat values using either of the following:
1. parallelize() method
2. createDataFrame() method

### Create DataFrame from an existing column

There are three good ways to do so:

1. Call createDataFrame() on a list (pass the type as an argument to avoid a schema inference error).
2. Convert the list to a Pandas DataFrame and then call PySpark's createDataFrame() [**this avoids the need to define a schema**].
3. Create an RDD from the list, map() each element to a Row, and then convert the result to a DataFrame.

In [114]:
from pyspark.sql.types import StringType
from pyspark.sql import Row


# take a subset of base_df, a total of 5 rows/tuples
sample_logs_less = [item['value'] for item in base_df.take(5)]
# display (type(sample_logs_less)) # LIST type variable

# METHOD 1 : createDataFrame() method
df_values_m1 = sqlContext.createDataFrame(sample_logs_less, StringType())

df_values_m1.show(truncate=False)
df_values_m1.printSchema()


# METHOD 2 : using Pandas Dataframe to createDataFrame() in PySpark
new_pd_df = pd.DataFrame(sample_logs_less) # Create Pandas Dataframe from list
df_values_m2 = sqlContext.createDataFrame(new_pd_df, ["value"])

df_values_m2.show(truncate=False)
df_values_m2.printSchema()


# METHOD 3 : create RDD out of list, then MAP() to column and use toDF() method to return DataFrame
df_values_Rdd = sc.parallelize(sample_logs_less)
row = Row("value") # define column name
df_values_m3 = df_values_Rdd.map(row).toDF()

df_values_m3.show(truncate=False)
df_values_m3.printSchema()

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------