In [1]:
import datetime
import os
import re

import findspark
import matplotlib
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType,TimestampType, IntegerType

## Start the Session

In [2]:
# Point findspark at the Spark installation. Prefer the SPARK_HOME environment
# variable so the notebook runs on any machine; the original author's local
# install path is kept as the fallback so existing behaviour is unchanged there.
findspark.init(os.environ.get("SPARK_HOME", "/home/kbaafi/spark"))
# Reuse an already-running session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

## Load the Dataset
Use spark to load the compressed file into a dataframe

In [3]:
#Data Source: ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
data_path = "in-data/nasa-weblogs/NASA_access_log_Jul95.gz"
# Read the log as plain text: one row per line, in a single string column named
# "value". The CSV reader used previously splits every line on commas, so a
# request whose URL contains a comma was truncated to its first piece and could
# no longer be parsed.
dfLog = spark.read.text(data_path)

In [4]:
# Widen pandas' column display so a whole log line is visible in the preview.
# The fully-qualified option name is used; the bare 'max_colwidth' only works
# through pandas' pattern matching of option names.
pd.set_option('display.max_colwidth', 250)
dfLog.select('value').limit(2).toPandas()

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"


## Parsing the data
We need to parse this data into:
    * host
    * date time
    * request-method
    * endpoint
    * protocol
    * response
    * user id
    * content-size
    * client id
We have two functions that do the same parsing job: **parseUDF** and **parseUDFv2**. The difference is that **parseUDF** returns an unqueryable string, whereas **parseUDFv2** returns a queryable dictionary (map). **parseUDFv2** is better because it allows us to separate out all the fields in each line. We will see more of this later.

In [5]:
# One access-log line:
#   host identd user [timestamp zone] "method endpoint protocol" status size
# Raw string so the regex escapes (\S, \[, \d, ...) are not treated as
# (invalid) Python string escapes.
_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'


def _parse_log_line(line):
    """Split one access-log line into its named fields.

    Returns a dict mapping field name to its string value, or None when the
    line is None or does not match the expected layout. All values are kept
    as strings so the dict matches a string-to-string map; a content size
    logged as '-' is reported as '0'.
    """
    if line is None:
        return None
    match = re.search(_LOG_PATTERN, line)
    if match is None:
        # Unparseable line: yield a null instead of a value of the wrong shape.
        return None
    size_field = match.group(9)
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : match.group(8),
        "content_size"  : "0" if size_field == "-" else size_field,
    }


@udf
def parseUDF(line):
    """Parse a log line; declared with the default (string) UDF return type.

    Kept for comparison with parseUDFv2: because the result column is a plain
    string, individual fields cannot be selected from it by key.
    """
    return _parse_log_line(line)


@udf(MapType(StringType(),StringType()))
def parseUDFv2(line):
    """Parse a log line into a string-to-string map column.

    Fields can be selected from the result with parsed['<field>']. Lines that
    cannot be parsed produce a null map.
    """
    return _parse_log_line(line)

In [6]:
# Keep the raw line and append the parsed map next to it as column "parsed".
dfParsed = dfLog.select(
    "*",
    parseUDFv2(F.col("value")).alias("parsed"),
)
dfParsed.limit(2).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72.81.55', 'client_identd': '-'}"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'unicomp6.unicomp.net', 'client_identd': '-'}"


#### Now we can select the data we want from the **parsed** dictionary field like so:

In [7]:
# Pull two individual fields out of the parsed map by key.
dfParsed.selectExpr(
    "parsed['host'] as host",
    "parsed['date_time'] as date",
).show(2)

+--------------------+--------------------+
|                host|                date|
+--------------------+--------------------+
|        199.72.81.55|01/Jul/1995:00:00...|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|
+--------------------+--------------------+
only showing top 2 rows



### Extracting the fields

In [8]:
# Every key produced by the parser, in the column order wanted for the table.
fields = [
    "host",
    "client_identd",
    "user_id",
    "date_time",
    "method",
    "endpoint",
    "protocol",
    "response_code",
    "content_size",
]
# One SQL expression per field: look the key up in the map and alias it to the same name.
exprs = list(map(lambda name: "parsed['{}'] as {}".format(name, name), fields))
exprs

["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]

#### Using our generated expression we can create a new tabular dataframe as shown below:

In [9]:
# Flatten the parsed map into one ordinary column per field.
dfClean = dfParsed.selectExpr(exprs)
dfClean.limit(2).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985


### Cleaning the datetime field
We need a UDF to extract the datetime in an appropriate format.

In [10]:
@udf (TimestampType())
def parseTimeUdf(time_str):
    """Convert a log timestamp such as '01/Jul/1995:00:00:01 -0400' to a datetime.

    The UTC offset after the whitespace is discarded, so the result is the
    wall-clock time as written in the log (a naive datetime). Returns None for
    a None or blank input.
    """
    if time_str is None:
        return None
    # Split on whitespace rather than on "-": this drops the offset whatever its
    # sign, whereas splitting on "-" left a "+hhmm" offset in place and made
    # strptime fail.
    parts = time_str.split()
    if not parts:
        return None
    format_str = "%d/%b/%Y:%H:%M:%S"
    return datetime.datetime.strptime(parts[0], format_str)

In [11]:
# Add a proper timestamp column alongside the original string timestamp.
dfClean = dfClean.withColumn(
    "date_time_2",
    parseTimeUdf(F.col("date_time")),
)

In [12]:
# Preview the first two rows, now including the date_time_2 timestamp column.
dfClean.limit(2).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,date_time_2
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245,1995-07-01 00:00:01
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,1995-07-01 00:00:06


In [13]:
# Show column names and types; every field taken from the map is still a string here.
dfClean.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- content_size: string (nullable = true)
 |-- date_time_2: timestamp (nullable = true)



## Now some analysis
At this point, the data engineer's work is done. Data analysts and scientists can pick up from here. The dataframe can be saved as a Parquet file, a Redshift table, or any other store for later use.

I prefer to use Spark SQL for analysis, so let's register a temp table or view. First, we cast `content_size` to an integer so that it sorts numerically.

In [19]:
# Replace the string content_size with an integer version. The column is
# re-selected last, so it ends up at the end of the schema.
other_columns = [name for name in dfClean.columns if name != "content_size"]
dfClean = dfClean.select(
    *other_columns,
    F.col("content_size").cast(IntegerType()).alias("content_size"),
)
dfClean.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- date_time_2: timestamp (nullable = true)
 |-- content_size: integer (nullable = true)



**Before analysis, let's save to Parquet**

In [15]:
file_path = "out-data/nasa-weblogs-clean/nasa_clean_logs.parquet"
# Overwrite any previous export so the notebook can be re-run from a fresh
# kernel; the default save mode raises an error when the path already exists.
dfClean.write.mode("overwrite").parquet(file_path)

In [20]:
# Reload the cleaned logs from the parquet export and expose them to
# Spark SQL under the view name "weblogs".
dfClean = spark.read.format("parquet").load(file_path)
dfClean.createOrReplaceTempView("weblogs")

In [25]:
# Print the schema of the DataFrame that was read back from parquet.
dfClean.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- date_time_2: timestamp (nullable = true)
 |-- content_size: integer (nullable = true)



#### * Top 10 inbound hosts

In [22]:
# Count requests per host and keep the ten hosts with the most requests.
top_hosts_query = """
        select host,count(host) as count
        from weblogs
        group by host
        order by count desc
        limit 10
"""
spark.sql(top_hosts_query).toPandas()

Unnamed: 0,host,count
0,piweba3y.prodigy.com,16955
1,piweba4y.prodigy.com,11198
2,piweba1y.prodigy.com,9472
3,alyssa.prodigy.com,7580
4,siltb10.orl.mmc.com,7573
5,piweba2y.prodigy.com,5731
6,edams.ksc.nasa.gov,5357
7,163.206.89.4,4782
8,news.ti.com,4725
9,disarray.demon.co.uk,4213


#### * Top 10 popular content

In [23]:
# Count requests per endpoint and keep the ten most requested ones.
top_endpoints_query = """
        select endpoint,count(endpoint) as count
        from weblogs
        group by endpoint
        order by count desc
        limit 10
"""
spark.sql(top_endpoints_query).toPandas()

Unnamed: 0,endpoint,count
0,/images/NASA-logosmall.gif,111328
1,/images/KSC-logosmall.gif,89636
2,/images/MOSAIC-logosmall.gif,60466
3,/images/USA-logosmall.gif,60012
4,/images/WORLD-logosmall.gif,59487
5,/images/ksclogo-medium.gif,58800
6,/images/launch-logo.gif,40869
7,/shuttle/countdown/,40277
8,/ksc.html,40225
9,/images/ksclogosmall.gif,33583


#### * Top 10 largest files downloaded

In [24]:
# The ten requests with the largest response size. Rows are individual
# requests, not distinct files, so the same endpoint can appear several times.
largest_downloads_query = """
        select endpoint,content_size
        from weblogs
        order by content_size desc
        limit 10
"""
spark.sql(largest_downloads_query).toPandas()

Unnamed: 0,endpoint,content_size
0,/shuttle/countdown/video/livevideo.jpeg,6823936
1,/statistics/1995/bkup/Mar95_full.html,3155499
2,/statistics/1995/bkup/Mar95_full.html,3155499
3,/statistics/1995/bkup/Mar95_full.html,3155499
4,/statistics/1995/bkup/Mar95_full.html,3155499
5,/statistics/1995/bkup/Mar95_full.html,3155499
6,/statistics/1995/bkup/Mar95_full.html,3155499
7,/statistics/1995/bkup/Mar95_full.html,3155499
8,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350
9,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350
