## NASA Web Server Log Analysis with Apache Spark

### STEP 1 : Introduction and Imports

* We analyze the NASA web server access logs for July and August 1995.


* Folder Structure
    - log-anaysis-using-pyspark
        - dataset
            - log_aug.gz
            - log_jul.gz
        - Notebook.ipynb
        - DockerFile


* log_jul.gz - contains the logs for July
* log_aug.gz - contains the logs for August

#### Library Imports

In [1]:
# importing libraries

# SparkSession
from pyspark.sql import SparkSession

# Regular expressions
import re

# Pandas
import pandas as pd

# creating sparkSession
spark = SparkSession.builder \
                    .appName("Log Analysis") \
                    .getOrCreate()
spark

### STEP 2 : Exploratory Data Analysis

#### Loading the log file

In [2]:
# reading data as dataframe
data = spark.read.format("csv") \
               .option("inferSchema", "true") \
               .option("header","false") \
               .load("dataset/*.gz")
print('Data type : ', type(data))

Data type :  <class 'pyspark.sql.dataframe.DataFrame'>


In [3]:
# counting number of records
print('Total rows and columns : ')
print((data.count(), len(data.columns)))

Total rows and columns : 
(3461613, 1)


In [4]:
# renaming default column name
data = data.withColumnRenamed("_c0", "log") 

# viewing schema of dataset
data.printSchema()

root
 |-- log: string (nullable = true)



In [5]:
# viewing dataset
data.show(5, truncate=False)

# each line holds : host name, timestamp, request, status code, content size

+-----------------------------------------------------------------------------------------------------------------------+
|log                                                                                                                    |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------------------------------------------------------------------------------------------------------+

#### Parsing the log file

In [6]:
# getting sample log for operations
sample_log_data = []
for item in data.take(10):
    sample_log_data.append(item["log"])
print('Sample log data is : \n', sample_log_data)

Sample log data is : 
 ['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245', 'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985', '199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085', 'burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0', '199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179', 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0', 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0', '205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985', 'd104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985', '129.94.144.15

#### Fetching status code

In [7]:
# regex pattern for status code
# \s        matches a single whitespace character (equal to [\r\n\t\f\v ])
# (\d{3})   capture group: exactly 3 digits ([0-9]{3}), followed by another whitespace
status_code_regx = r'\s(\d{3})\s'
status = []
for item in sample_log_data:
    status.append(re.search(status_code_regx, item).group(1))
print(status)

['200', '200', '200', '304', '200', '304', '200', '200', '200', '200']
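The loop above calls `.group(1)` directly, which works for these ten sample lines but raises `AttributeError` on any line the pattern does not match, since `re.search` then returns `None`. Spark's `regexp_extract` instead returns an empty string when there is no match. The sketch below mirrors that convention in plain Python; `extract_or_empty` is a hypothetical helper, and the malformed line is copied from the null-status rows displayed later in this notebook.

```python
import re

status_code_regx = r'\s(\d{3})\s'

def extract_or_empty(pattern, text, group=1):
    """Hypothetical helper: return the captured group, or '' when the pattern does not match
    (the same convention Spark's regexp_extract follows)."""
    match = re.search(pattern, text)
    return match.group(group) if match else ''

good_line = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
# Copied from the null-status rows: nothing follows the request, so there is no status code.
bad_line = '205.189.154.54 - - [01/Jul/1995:00:01:06 -0400] "GET /cgi-bin/imagemap/countdown?99'

print(extract_or_empty(status_code_regx, good_line))        # 200
print(repr(extract_or_empty(status_code_regx, bad_line)))   # ''
```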


#### Fetching content size

In [8]:
# regex pattern for content size
# \s      matches a single whitespace character (equal to [\r\n\t\f\v ])
# (\d+)   capture group: one or more digits, as many as possible
# $       asserts position at the end of the line, so only the last field is captured
content_size_regx = r'\s(\d+)$'
content_size = []
for item in sample_log_data:
    content_size.append(re.search(content_size_regx, item).group(1))
print(content_size)

['6245', '3985', '4085', '0', '4179', '0', '0', '3985', '3985', '7074']
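The `$` anchor is what makes this pattern capture the last number on the line. The plain-Python sketch below shows that without it the first whitespace-preceded digits win, which on these lines is the status code, and that a line whose last field is not numeric does not match at all. The dash-terminated line is a hypothetical example, not a line taken from the dataset; in the Spark version such a non-match yields an empty string, which is one way a row can end up with a null `content_size` after the integer cast.

```python
import re

content_size_regx = r'\s(\d+)$'
line = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'

# Anchored at the end of the line: captures the trailing byte count.
print(re.search(content_size_regx, line).group(1))   # 6245

# Without the $ anchor, the first match is the status code instead.
print(re.search(r'\s(\d+)', line).group(1))          # 200

# Hypothetical line whose size field is '-' rather than a number: no match at all.
dash_line = 'host.example.com - - [01/Jul/1995:00:01:12 -0400] "GET /missing.html HTTP/1.0" 404 -'
print(re.search(content_size_regx, dash_line))       # None
```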


#### Fetching host name

In [9]:
# regex pattern for host name
# ^          asserts position at the start of the line
# \S+\.      one or more non-whitespace characters followed by a literal dot
# [\S+\.]+   one or more characters from the class {non-whitespace, '+', '.'} (effectively \S, which already covers '+' and '.')
# \S+        one or more non-whitespace characters
# \s         the whitespace that ends the host field (outside the capture group)
host_name_regx = r'(^\S+\.[\S+\.]+\S+)\s'
host_name = []
for item in sample_log_data:
    host_name.append(re.search(host_name_regx, item).group(1))
print(host_name)

['199.72.81.55', 'unicomp6.unicomp.net', '199.120.110.21', 'burger.letters.com', '199.120.110.21', 'burger.letters.com', 'burger.letters.com', '205.212.115.106', 'd104.aa.net', '129.94.144.152']
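This pattern requires at least one dot in the host field, so a host name without a dot would not match. In Spark, `regexp_extract` would then return an empty string rather than `null`, so the null checks in the data wrangling step would not flag such rows. The sketch below, using a hypothetical dotless line, compares it with a simpler alternative, `^(\S+)\s`, that takes the first whitespace-delimited token; the alternative is a suggestion, not what this notebook uses.

```python
import re

host_name_regx = r'(^\S+\.[\S+\.]+\S+)\s'
simple_host_regx = r'^(\S+)\s'   # alternative: first whitespace-delimited token

dotted = 'burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0'
# Hypothetical line from a host name without any dot.
dotless = 'localhost - - [01/Jul/1995:00:00:11 -0400] "GET / HTTP/1.0" 304 0'

print(re.search(host_name_regx, dotted).group(1))     # burger.letters.com
print(re.search(host_name_regx, dotless))             # None
print(re.search(simple_host_regx, dotless).group(1))  # localhost
```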


#### Fetching time stamp

In [10]:
# regex pattern for time stamp
# \[ ... ]             the timestamp is enclosed in square brackets (outside the capture group)
# \d{2}/\w{3}/\d{4}    day, three-letter month, year, e.g. 01/Jul/1995 (\w matches [a-zA-Z0-9_])
# :\d{2}:\d{2}:\d{2}   hour, minute, second
# -\d{4}               a negative UTC offset such as -0400 (a positive offset would not match)
time_stamp_regx = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
time_stamp = []
for item in sample_log_data:
    time_stamp.append(re.search(time_stamp_regx, item).group(1))
print(time_stamp)

['01/Jul/1995:00:00:01 -0400', '01/Jul/1995:00:00:06 -0400', '01/Jul/1995:00:00:09 -0400', '01/Jul/1995:00:00:11 -0400', '01/Jul/1995:00:00:11 -0400', '01/Jul/1995:00:00:12 -0400', '01/Jul/1995:00:00:12 -0400', '01/Jul/1995:00:00:12 -0400', '01/Jul/1995:00:00:13 -0400', '01/Jul/1995:00:00:13 -0400']


#### Fetching methods, urls and protocol

In [11]:
# regex pattern for combined methods, urls and protocol
# \"         the opening quote of the request string
# (\S+)      1st group: the method, one or more non-whitespace characters (equal to [^\r\n\t\f\v ])
# \s(\S+)    2nd group: the endpoint, after a single whitespace
# \s*(\S*)   3rd group: the protocol, zero or more characters, so it may be empty
# \"         the closing quote
methods_urls_protocol_regx = r'\"(\S+)\s(\S+)\s*(\S*)\"'
methods_urls_protocol = []
for item in sample_log_data:
    methods_urls_protocol.append(re.search(methods_urls_protocol_regx, item).groups())
print(methods_urls_protocol)

[('GET', '/history/apollo/', 'HTTP/1.0'), ('GET', '/shuttle/countdown/', 'HTTP/1.0'), ('GET', '/shuttle/missions/sts-73/mission-sts-73.html', 'HTTP/1.0'), ('GET', '/shuttle/countdown/liftoff.html', 'HTTP/1.0'), ('GET', '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'HTTP/1.0'), ('GET', '/images/NASA-logosmall.gif', 'HTTP/1.0'), ('GET', '/shuttle/countdown/video/livevideo.gif', 'HTTP/1.0'), ('GET', '/shuttle/countdown/countdown.html', 'HTTP/1.0'), ('GET', '/shuttle/countdown/', 'HTTP/1.0'), ('GET', '/', 'HTTP/1.0')]
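Because the last group is `\s*(\S*)`, the protocol is optional: a request string containing only a method and a path still matches, with an empty protocol. The sketch below checks both cases; the two-token request is a hypothetical example.

```python
import re

methods_urls_protocol_regx = r'\"(\S+)\s(\S+)\s*(\S*)\"'

full_request = '"GET /history/apollo/ HTTP/1.0"'
# Hypothetical request with no protocol token.
short_request = '"GET /"'

print(re.search(methods_urls_protocol_regx, full_request).groups())   # ('GET', '/history/apollo/', 'HTTP/1.0')
print(re.search(methods_urls_protocol_regx, short_request).groups())  # ('GET', '/', '')
```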


#### Combining it together

In [12]:
# importing regexp_extract to extract regex-based fields into columns
from pyspark.sql.functions import regexp_extract

processed_data = data.select(regexp_extract('log', host_name_regx, 1).alias('host'),
                            regexp_extract('log', time_stamp_regx, 1).alias('timestamp'),
                            regexp_extract('log', methods_urls_protocol_regx, 1).alias('method'),
                            regexp_extract('log', methods_urls_protocol_regx, 2).alias('endpoint'),
                            regexp_extract('log', methods_urls_protocol_regx, 3).alias('protocol'),
                            regexp_extract('log', status_code_regx, 1).cast('integer').alias('status'),
                            regexp_extract('log', content_size_regx, 1).cast('integer').alias('content_size'))
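The select above makes seven `regexp_extract` passes, three of them with the same pattern. An alternative, not used in this notebook, is to describe the whole line with one pattern and named groups. The plain-Python sketch below assumes a combined pattern, `combined_regx`, assembled from the individual patterns above (with the host part simplified to `\S+`), and assumes that a missing size is written as `-`; `parse_line` is a hypothetical helper.

```python
import re

# Assumed combined pattern: host, two unused fields, timestamp, request, status, size.
combined_regx = re.compile(
    r'^(?P<host>\S+)\s\S+\s\S+\s'
    r'\[(?P<timestamp>\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})\]\s'
    r'"(?P<method>\S+)\s(?P<endpoint>\S+)\s*(?P<protocol>\S*)"\s'
    r'(?P<status>\d{3})\s(?P<content_size>\d+|-)$'
)

def parse_line(line):
    """Hypothetical helper: return a dict of fields, or None if the line has a different shape."""
    match = combined_regx.match(line)
    if match is None:
        return None
    row = match.groupdict()
    row['status'] = int(row['status'])
    # Assumed '-' for a missing size; mapped to 0, the default this notebook later applies with na.fill.
    row['content_size'] = 0 if row['content_size'] == '-' else int(row['content_size'])
    return row

sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
print(parse_line(sample))
```

Python's named-group syntax `(?P<name>...)` is not accepted by Java's regex engine, which Spark uses, so the pattern would need `(?<name>...)` groups before being passed to `regexp_extract`, where groups are still selected by index.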

In [13]:
# viewing new dataframe 
processed_data.show(10, truncate=True)

+--------------------+--------------------+------+--------------------+--------+------+------------+
|                host|           timestamp|method|            endpoint|protocol|status|content_size|
+--------------------+--------------------+------+--------------------+--------+------+------------+
|        199.72.81.55|01/Jul/1995:00:00...|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   200|    

In [14]:
# schema of dataset
processed_data.printSchema()

root
 |-- host: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- content_size: integer (nullable = true)



In [15]:
# count of rows of new dataframe
print('Total rows and columns : ')
print((processed_data.count(), len(processed_data.columns)))

# now we have 7 columns

Total rows and columns : 
(3461613, 7)


### STEP 3 : Data Wrangling

In [16]:
# checking null rows in original dataset
print('Count of null row : ', data.filter(data.log.isNull()).count())

Count of null row :  0


In [17]:
# counting rows that have a null value in at least one column of the processed dataset
isnull_rows_df = processed_data.filter(processed_data['host'].isNull()| 
                                       processed_data['timestamp'].isNull() | 
                                       processed_data['method'].isNull() |
                                       processed_data['endpoint'].isNull() |
                                       processed_data['status'].isNull() |
                                       processed_data['content_size'].isNull()|
                                       processed_data['protocol'].isNull())
print('Rows with missing values : ', isnull_rows_df.count())

Rows with missing values :  94533


In [19]:
# importing col and sum 
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as col_sum

def count_null_cell(col_name):
    return col_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# list of column expressions
all_columns_exp = []
for col_names in processed_data.columns:
    all_columns_exp.append(count_null_cell(col_names))

# the * operator unpacks the list so each expression is passed to agg() as a separate argument
processed_data.agg(*all_columns_exp).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0| 60679|       94529|
+----+---------+------+--------+--------+------+------------+
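The per-column counts overlap with the row count from In [17]. Since only `status` and `content_size` have nulls, the 94533 rows with at least one null give 60679 + 94529 - 94533 = 60675 rows missing both values. This is consistent with the 33854 null `content_size` values that remain once the null-status rows are removed (94529 - 60675 = 33854). A toy illustration of the two counting styles, using made-up rows rather than the dataset:

```python
# Made-up rows (not from the dataset): None marks a missing value.
rows = [
    {'status': 200, 'content_size': 6245},
    {'status': None, 'content_size': None},   # both missing
    {'status': 404, 'content_size': None},    # only content_size missing
    {'status': None, 'content_size': 0},      # only status missing
]

# Per-column null counts, like sum(isNull().cast('integer')) in the cell above.
per_column = {name: sum(row[name] is None for row in rows) for name in ('status', 'content_size')}

# Rows with at least one null, like the OR-ed isNull() filter in In [17].
rows_with_any_null = sum(any(value is None for value in row.values()) for row in rows)

print(per_column)           # {'status': 2, 'content_size': 2}
print(rows_with_any_null)   # 3
```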



In [30]:
# checking rows of the original log whose text contains no status code
null_status_df = data.filter(~data['log'].rlike(status_code_regx))
print('Null value for status code : ', null_status_df.count())

Null value for status code :  60679


In [34]:
# viewing dataset for null status
null_status_df.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------+
|log                                                                                            |
+-----------------------------------------------------------------------------------------------+
|205.189.154.54 - - [01/Jul/1995:00:01:06 -0400] "GET /cgi-bin/imagemap/countdown?99            |
|remote27.compusmart.ab.ca - - [01/Jul/1995:00:01:53 -0400] "GET /cgi-bin/imagemap/countdown?102|
|onyx.southwind.net - - [01/Jul/1995:00:02:27 -0400] "GET /cgi-bin/imagemap/countdown?103       |
|gater3.sematech.org - - [01/Jul/1995:00:02:41 -0400] "GET /cgi-bin/imagemap/countdown?99       |
|onyx.southwind.net - - [01/Jul/1995:00:03:00 -0400] "GET /cgi-bin/imagemap/countdown?102       |
+-----------------------------------------------------------------------------------------------+
only showing top 5 rows
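Every row above stops right after an imagemap query such as `countdown?99`. One plausible explanation, an assumption not verified against the raw files, is that these requests carried an `x,y` coordinate pair, and because the files were loaded with the CSV reader, everything after the comma went into a second column that was dropped. The plain-Python sketch below reproduces the effect with the standard `csv` module on a hypothetical full line; the `,176` coordinate and the trailing `302 110` are invented for illustration.

```python
import csv
import re

status_code_regx = r'\s(\d{3})\s'

# Hypothetical raw line: the ",176" coordinate and the trailing "302 110" are invented for illustration.
raw_line = ('205.189.154.54 - - [01/Jul/1995:00:01:06 -0400] '
            '"GET /cgi-bin/imagemap/countdown?99,176 HTTP/1.0" 302 110')

# The default CSV dialect treats the comma inside the URL as a field delimiter.
fields = next(csv.reader([raw_line]))
print(len(fields))   # 2
print(fields[0])     # ends with 'countdown?99', like the rows shown above

# Only the first field would survive in a one-column DataFrame, and it has no status code.
print(re.search(status_code_regx, fields[0]))          # None
print(re.search(status_code_regx, raw_line).group(1))  # 302
```

If that is the cause, loading the files with `spark.read.text("dataset/*.gz")`, which returns each whole line in a single string column named `value`, should keep these requests intact, and the null counts in this section would change accordingly.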



In [36]:
# removing null status rows
processed_data = processed_data.filter(processed_data['status'].isNotNull())
print('Records with null status removed : ', processed_data.count())

Records with null status removed :  3400934


In [37]:
# viewing null counts after wrangling status code
processed_data.agg(*all_columns_exp).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|       33854|
+----+---------+------+--------+--------+------+------------+



In [39]:
# checking null value for content size
null_content_size = processed_data.filter(processed_data['content_size'].isNull())
print('Null value for content size : ', null_content_size.count())

Null value for content size :  33854


In [40]:
# viewing dataset for null content size
null_content_size.show(5, truncate=False)

+------------------------+--------------------------+------+--------------------------------------------------------------------+--------+------+------------+
|host                    |timestamp                 |method|endpoint                                                            |protocol|status|content_size|
+------------------------+--------------------------+------+--------------------------------------------------------------------+--------+------+------------+
|dd15-062.compuserve.com |01/Jul/1995:00:01:12 -0400|GET   |/news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt|HTTP/1.0|404   |null        |
|dynip42.efn.org         |01/Jul/1995:00:02:14 -0400|GET   |/software                                                           |HTTP/1.0|302   |null        |
|ix-or10-06.ix.netcom.com|01/Jul/1995:00:02:40 -0400|GET   |/software/winvn                                                     |HTTP/1.0|302   |null        |
|ix-or10-06.ix.netcom.com|01/Jul/1995:00:03:24

In [43]:
# filling null content size with 0 value
processed_data = processed_data.na.fill({'content_size': 0})
print('Records after replacing it by 0 : ', processed_data.count())

Records after replacing it by 0 :  3400934


In [44]:
# viewing dataset after wrangling content_size
processed_data.agg(*all_columns_exp).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|           0|
+----+---------+------+--------+--------+------+------------+



NOTE : After data cleaning, there are no missing values left in the processed dataset.

#### Changing timestamp field

In [45]:
# viewing schema
processed_data.printSchema()

root
 |-- host: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- content_size: integer (nullable = false)



NOTE : The timestamp column is still of string type, so we have to convert it into the timestamp type.
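As a plain-Python sketch of that conversion, `datetime.strptime` with the format below turns the string into a timezone-aware datetime. In Spark, the analogous step would presumably be `to_timestamp` with a datetime pattern such as `dd/MMM/yyyy:HH:mm:ss Z`; treat that pattern as an assumption to check against the Spark version in use.

```python
from datetime import datetime, timezone

timestamp_str = '01/Jul/1995:00:00:01 -0400'

# %b expects an English month abbreviation (locale-dependent); %z parses the -0400 offset.
parsed = datetime.strptime(timestamp_str, '%d/%b/%Y:%H:%M:%S %z')

print(parsed.isoformat())               # 1995-07-01T00:00:01-04:00
print(parsed.astimezone(timezone.utc))  # 1995-07-01 04:00:01+00:00
```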