In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [2]:
spark = SparkSession.builder.getOrCreate()

# Loading dataset

In [3]:
df = spark.read.text('data/NASA_access_log_Jul95.gz')

In [4]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [5]:
pd.set_option('display.max_colwidth', 200)

In [6]:
df.limit(2).toPandas()

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"


In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# Parsing

In [8]:
@udf(MapType(StringType(),StringType()))
def parseUDFbetter(line):
    import re
    PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return (line, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
    return {
        "host"          : match.group(1), 
        "client_identd" : match.group(2), 
        "user_id"       : match.group(3), 
        "date_time"     : match.group(4), 
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [9]:
dfParsed = df.withColumn('parsed', parseUDFbetter('value'))
dfParsed.limit(5).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72..."
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni..."
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us..."
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","{'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/liftoff.html', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', 'user_id': '-', 'ho..."
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'content_size': '4179', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', ..."


# Building separate columns

In [10]:
fields = ["host", "client_identd","user_id", "date_time", "method", "endpoint", "protocol", "response_code", "content_size"]
exprs = [ "parsed['{}'] as {}".format(field,field) for field in fields]
exprs

["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]

In [11]:
dfClean = dfParsed.selectExpr(*exprs)
dfClean.limit(2).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985


# Converting date_time column to timestamp format
- below is the function that the mentor gave me

In [12]:
from pyspark.sql.functions import *

In [15]:
dfCleanDate = dfClean.withColumn('date_time2', to_timestamp('date_time', "dd/MMM/yyyy:HH:mm:ss Z"))

In [16]:
dfCleanDate.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- content_size: string (nullable = true)
 |-- date_time2: timestamp (nullable = true)



In [17]:
dfCleanDate.limit(2).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,date_time2
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245,1995-07-01 04:00:01
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,1995-07-01 04:00:06
