# Spark - Schema on Read: Loading NASA Weblog

This walkthrough demonstrates different Spark commands along with some common issues and errors.
The goal is to show how we can work with unstructured data without loading it into a database first.

## Import libraries, load data
Data Source: http://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [2]:
spark = SparkSession.builder.getOrCreate()

In [4]:
# Read in as text
dfLog = spark.read.text("data/NASA_access_log_Jul95.gz")

## Quick inspection of data

In [5]:
# See schema (it's one big string)
dfLog.printSchema()

root
 |-- value: string (nullable = true)



In [8]:
# Count the number of lines
dfLog.count()

1891715

In [6]:
# What's in there? Show 5 top rows
dfLog.show(5)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows



In [7]:
# Try to see a little more?
dfLog.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------------------------------------------------------------------------------------------------------+

In [10]:
# Hard to read ... pandas to the rescue
pd.set_option('max_colwidth', 200)
dfLog.limit(5).toPandas()  

# NOTE: Call .limit() before .toPandas() so that only the first five rows are collected
# into the pandas DataFrame. Calling .toPandas() first would load the entire dataset.

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179"


## Parse the data
### First attempt:  simple parsing with split

This won't help much ...

In [9]:
from pyspark.sql.functions import split
dfArrays = dfLog.withColumn("tokenized", split("value"," "))
dfArrays.limit(10).toPandas()

Unnamed: 0,value,tokenized
0,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,"[199.72.81.55, -, -, [01/Jul/1995:00:00:01, -0..."
1,unicomp6.unicomp.net - - [01/Jul/1995:00:00:06...,"[unicomp6.unicomp.net, -, -, [01/Jul/1995:00:0..."
2,199.120.110.21 - - [01/Jul/1995:00:00:09 -0400...,"[199.120.110.21, -, -, [01/Jul/1995:00:00:09, ..."
3,burger.letters.com - - [01/Jul/1995:00:00:11 -...,"[burger.letters.com, -, -, [01/Jul/1995:00:00:..."
4,199.120.110.21 - - [01/Jul/1995:00:00:11 -0400...,"[199.120.110.21, -, -, [01/Jul/1995:00:00:11, ..."
5,burger.letters.com - - [01/Jul/1995:00:00:12 -...,"[burger.letters.com, -, -, [01/Jul/1995:00:00:..."
6,burger.letters.com - - [01/Jul/1995:00:00:12 -...,"[burger.letters.com, -, -, [01/Jul/1995:00:00:..."
7,205.212.115.106 - - [01/Jul/1995:00:00:12 -040...,"[205.212.115.106, -, -, [01/Jul/1995:00:00:12,..."
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""...","[d104.aa.net, -, -, [01/Jul/1995:00:00:13, -04..."
9,129.94.144.152 - - [01/Jul/1995:00:00:13 -0400...,"[129.94.144.152, -, -, [01/Jul/1995:00:00:13, ..."
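
To see why splitting on spaces is not enough, here is a small plain-Python sketch (no Spark involved) using the first log line shown earlier. It assumes that Python's `str.split(" ")` behaves like the Spark `split` call above for a single-space separator, which holds for this line.

```python
# First line of the log, copied from the inspection output above.
sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'

# Split on single spaces, as in the Spark example.
tokens = sample.split(" ")

for position, token in enumerate(tokens):
    print(position, token)
```

The timestamp ends up in two tokens (positions 3 and 4) and the quoted request in three (positions 5 to 7), with brackets and quotes still attached. Positional access to the fields would therefore be fragile.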


### Second attempt: Let's build a custom parsing UDF 

We are using the standard regex pattern for weblogs (you can google this one).
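
Before wrapping the pattern in a UDF, it can help to try it on a single line in plain Python. The sketch below uses the same pattern as the notebook, written as a raw string, and the first log line from the inspection output. The variable names `sample` and `groups` are only illustrative.

```python
import re

# Same pattern as in the notebook, written as a raw string so the
# backslashes reach the regex engine unchanged.
PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

# First line of the log, copied from the inspection output above.
sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'

match = re.search(PATTERN, sample)
groups = match.groups()

# Print each capture group with its 1-based index, as used by match.group(n).
for index, value in enumerate(groups, start=1):
    print(index, value)
```

The nine groups correspond, in order, to host, client identd, user id, timestamp, method, endpoint, protocol, response code, and content size.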

In [10]:
from pyspark.sql.functions import udf

@udf
def parseUDF(line):
    import re
    PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return (line, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
    return {
        "host"          : match.group(1), 
        "client_identd" : match.group(2), 
        "user_id"       : match.group(3), 
        "date_time"     : match.group(4), 
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [11]:
# Let's start from the beginning
dfParsed= dfLog.withColumn("parsed", parseUDF("value"))
dfParsed.limit(10).toPandas()

Unnamed: 0,value,parsed
0,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,"{response_code=200, protocol=HTTP/1.0, endpoin..."
1,unicomp6.unicomp.net - - [01/Jul/1995:00:00:06...,"{response_code=200, protocol=HTTP/1.0, endpoin..."
2,199.120.110.21 - - [01/Jul/1995:00:00:09 -0400...,"{response_code=200, protocol=HTTP/1.0, endpoin..."
3,burger.letters.com - - [01/Jul/1995:00:00:11 -...,"{response_code=304, protocol=HTTP/1.0, endpoin..."
4,199.120.110.21 - - [01/Jul/1995:00:00:11 -0400...,"{response_code=200, protocol=HTTP/1.0, endpoin..."
5,burger.letters.com - - [01/Jul/1995:00:00:12 -...,"{response_code=304, protocol=HTTP/1.0, endpoin..."
6,burger.letters.com - - [01/Jul/1995:00:00:12 -...,"{response_code=200, protocol=HTTP/1.0, endpoin..."
7,205.212.115.106 - - [01/Jul/1995:00:00:12 -040...,"{response_code=200, protocol=HTTP/1.0, endpoin..."
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""...","{response_code=200, protocol=HTTP/1.0, endpoin..."
9,129.94.144.152 - - [01/Jul/1995:00:00:13 -0400...,"{response_code=200, protocol=HTTP/1.0, endpoin..."


In [12]:
# That would look good, but we actually did not return a dict as we intended ... it's a string ...
dfParsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: string (nullable = true)



### Third attempt: let's fix our UDF

We have to declare the output type in the udf decorator; the parsing logic stays the same.

In [13]:
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def parseUDFbetter(line):
    import re
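    # Raw string, so the backslashes reach the regex engine unchanged
    PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        # Unparseable line: return None (a tuple does not fit the declared MapType)
        return None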
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
    return {
        "host"          : match.group(1), 
        "client_identd" : match.group(2), 
        "user_id"       : match.group(3), 
        "date_time"     : match.group(4), 
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [14]:
# This is the same as before
dfParsed= dfLog.withColumn("parsed", parseUDFbetter("value"))
dfParsed.limit(2).toPandas()

Unnamed: 0,value,parsed
0,199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...,"{'response_code': '200', 'protocol': 'HTTP/1.0..."
1,unicomp6.unicomp.net - - [01/Jul/1995:00:00:06...,"{'response_code': '200', 'protocol': 'HTTP/1.0..."


In [15]:
# Bingo! We got a column of type map with the fields parsed
dfParsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



## Build separate columns

We use Spark's selectExpr function, which selects from a DataFrame using a set of SQL expressions.

https://spark.apache.org/docs/1.6.0/api/R/selectExpr.html

In [18]:
# Try one column
dfParsed.selectExpr("parsed['host'] as host").limit(5).show(5)

+--------------------+
|                host|
+--------------------+
|        199.72.81.55|
|unicomp6.unicomp.net|
|      199.120.110.21|
|  burger.letters.com|
|      199.120.110.21|
+--------------------+



In [19]:
# Do so for all keys
fields = ["host", "client_identd","user_id", "date_time", "method", "endpoint", "protocol", "response_code", "content_size"]
exprs = [ "parsed['{}'] as {}".format(field,field) for field in fields]
exprs

["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]

In [20]:
dfClean = dfParsed.selectExpr(*exprs)
dfClean.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179


## Analyze data

In [21]:
# Show most popular hosts

from pyspark.sql.functions import desc
dfClean.groupBy("host").count().orderBy(desc("count")).limit(5).toPandas()

Unnamed: 0,host,count
0,piweba3y.prodigy.com,17572
1,piweba4y.prodigy.com,11591
2,piweba1y.prodigy.com,9868
3,alyssa.prodigy.com,7852
4,siltb10.orl.mmc.com,7573


In [22]:
# Show largest files -- oops, the size is still stored as a string ...

dfClean.createOrReplaceTempView("cleanlog")
spark.sql("""
select endpoint, content_size
from cleanlog 
order by content_size desc
""").limit(10).toPandas()

Unnamed: 0,endpoint,content_size
0,/images/cdrom-1-95/img0007.jpg,99981
1,/shuttle/missions/sts-71/movies/sts-71-launch.mpg,999424
2,/shuttle/missions/sts-71/movies/sts-71-launch.mpg,999424
3,/history/apollo/apollo-13/images/index.gif,99942
4,/history/apollo/apollo-13/images/index.gif,99942
5,/history/apollo/apollo-13/images/index.gif,99942
6,/history/apollo/apollo-13/images/index.gif,99942
7,/history/apollo/apollo-13/images/index.gif,99942
8,/history/apollo/apollo-13/images/index.gif,99942
9,/history/apollo/apollo-13/images/index.gif,99942
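
The odd ordering above comes from comparing the sizes as strings, character by character. A minimal plain-Python sketch of the same effect, using a few size values taken from the outputs in this notebook:

```python
# Content sizes as strings, as they are stored in the map column.
sizes = ["6245", "999424", "99981", "99942", "0"]

# Descending sort on strings compares character by character.
as_strings = sorted(sizes, reverse=True)

# Descending sort on the numeric value.
as_ints = sorted(sizes, key=int, reverse=True)

print(as_strings)
print(as_ints)
```

In the string sort, "99981" lands ahead of "999424" because the fourth character "8" is greater than "4". The numeric sort puts 999424 first, which is the ordering we want for the largest files.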


In [23]:
# Cast the content size to int ...
from pyspark.sql.functions import expr
dfCleanTyped = dfClean.withColumn("content_size_bytes", expr("cast(content_size as int)"))
dfCleanTyped.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,content_size_bytes
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,4179


In [None]:
# ... and show again
dfCleanTyped.createOrReplaceTempView("cleantypedlog")
spark.sql("""
select endpoint, content_size
from cleantypedlog 
order by content_size_bytes desc
""").limit(10).toPandas()

In [196]:
# Left for you: clean the date column :)
# 1- Create a UDF that parses that weird format
# 2- Create a new column with a date-time string that Spark would understand
# 3- Add a new, properly typed date-time column
# 4- Print your schema
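
As a starting point for step 1, here is a minimal plain-Python sketch of the parsing logic only. It is not a full solution. The helper name `parse_log_time` and the constant `LOG_FORMAT` are hypothetical, and `%b` assumes an English (or C) locale for the month abbreviation.

```python
from datetime import datetime

# Format of the log timestamps, e.g. "01/Jul/1995:00:00:01 -0400".
# Assumption: month abbreviations are English, so %b matches them.
LOG_FORMAT = "%d/%b/%Y:%H:%M:%S %z"

def parse_log_time(raw):
    """Hypothetical helper: turn a log timestamp into a timezone-aware datetime."""
    return datetime.strptime(raw, LOG_FORMAT)

parsed = parse_log_time("01/Jul/1995:00:00:01 -0400")

# ISO 8601 string, a form that is easier to cast to a timestamp later on.
iso = parsed.isoformat()
print(iso)
```

Wrapping a function like this in a UDF with a string return type would cover steps 1 and 2. The cast to a proper timestamp type and the schema check remain as the exercise.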