# Exercise 1: Schema on Read

In [2]:
import findspark

In [4]:
# path to spark file
findspark.init("/usr/local/Cellar/apache-spark/3.0.1/libexec")

In [5]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [6]:
spark = SparkSession.builder.getOrCreate()

# Load the dataset

In [7]:
#Data Source: http://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
dfLog = spark.read.text("NASA_access_log_Jul95.gz")

# Quick inspection of  the data set

In [8]:
# see the schema
dfLog.printSchema()

root
 |-- value: string (nullable = true)



In [9]:
# number of lines
dfLog.count()

1891715

In [10]:
#what's in there? 
dfLog.show(5)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows



In [11]:
#Show first 5 rows
dfLog.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------

In [12]:
#Use Pandas to widen columns
pd.set_option('max_colwidth', 200)
dfLog.limit(5).toPandas()

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179"


# Let' try simple parsing with split

In [13]:
from pyspark.sql.functions import split

In [18]:
# split using a space as delimiter, name the column tokenized and show top 10
dfLog.withColumn("tokenized", split("value", " ")).limit(10).toPandas()

Unnamed: 0,value,tokenized
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","[199.72.81.55, -, -, [01/Jul/1995:00:00:01, -0400], ""GET, /history/apollo/, HTTP/1.0"", 200, 6245]"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[unicomp6.unicomp.net, -, -, [01/Jul/1995:00:00:06, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","[199.120.110.21, -, -, [01/Jul/1995:00:00:09, -0400], ""GET, /shuttle/missions/sts-73/mission-sts-73.html, HTTP/1.0"", 200, 4085]"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/countdown/liftoff.html, HTTP/1.0"", 304, 0]"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","[199.120.110.21, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/missions/sts-73/sts-73-patch-small.gif, HTTP/1.0"", 200, 4179]"
5,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /images/NASA-logosmall.gif HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /images/NASA-logosmall.gif, HTTP/1.0"", 304, 0]"
6,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/video/livevideo.gif HTTP/1.0"" 200 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /shuttle/countdown/video/livevideo.gif, HTTP/1.0"", 200, 0]"
7,"205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/countdown.html HTTP/1.0"" 200 3985","[205.212.115.106, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /shuttle/countdown/countdown.html, HTTP/1.0"", 200, 3985]"
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[d104.aa.net, -, -, [01/Jul/1995:00:00:13, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
9,"129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] ""GET / HTTP/1.0"" 200 7074","[129.94.144.152, -, -, [01/Jul/1995:00:00:13, -0400], ""GET, /, HTTP/1.0"", 200, 7074]"


# Second attempt, let's build a custom parsing UDF 

In [None]:
# regex for web logs
# ^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S+)?\s*" (\d{3}) (\S+)

In [23]:
from pyspark.sql.functions import udf

@udf
def parseUDF(line):
    import re
    #create patterns
    PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S+)?\s*" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    
    #if no match, return line:0 
    if match is None:
        return (line,0)
    
    #set size_field to match.group(9)
    size_field = match.group(9)
    
    #if size field is missing, set to 0. Otherwise set to value
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
        
    #return dictionary of items
    return {
        "host" : match.group(1),
        "client_identd" : match.group(2),
        "user_id" : match.group(3),
        "date_time" : match.group(4),
        "method" : match.group(5),
        "endpoint" : match.group(6),
        "protocol" : match.group(7),
        "reponse_code" : int(match.group(8)),
        "content_size" : size
    }

In [21]:
dfParsed = dfLog.withColumn("parsed", parseUDF("value"))

In [22]:
dfParsed.limit(10).toPandas()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/Cellar/apache-spark/3.0.1/libexec/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-20-8da4be9e5571>", line 17, in parseUDF
NameError: name 'NONE' is not defined


In [None]:
dfParsed.printSchema()

# Third attempt, let's fix our UDF

In [None]:
#from pyspark.sql.functions import udf # already imported
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def parseUDFbetter(line):
    # Todo

In [None]:
#Let's start from the beginning
# Todo

In [None]:
#Let's start from the beginning
# Todo

In [None]:
#Bingo!! we'got a column of type map with the fields parsed
dfParsed.printSchema()

In [None]:
dfParsed.select("parsed").limit(10).toPandas()

# Let's build separate columns

In [None]:
dfParsed.selectExpr("parsed['host'] as host").limit(5).show(5)

In [None]:
dfParsed.selectExpr(["parsed['host']", "parsed['date_time']"]).show(5)

In [None]:
fields = ["host", "client_identd","user_id", "date_time", "method", "endpoint", "protocol", "response_code", "content_size"]
exprs = # Todo
exprs


In [None]:
dfClean = # Todo
dfClean.limit(5).toPandas()

## Popular hosts

In [None]:
from pyspark.sql.functions import desc
# Todo

## Popular content

In [None]:
from pyspark.sql.functions import desc
# Todo

## Large Files

In [None]:
dfClean.createOrReplaceTempView("cleanlog")
spark.sql("""
select endpoint, content_size
from cleanlog 
order by content_size desc
""").limit(10).toPandas()

In [None]:
from pyspark.sql.functions import expr
dfCleanTyped = # Todo

In [None]:
dfCleanTyped.createOrReplaceTempView("cleantypedlog")
spark.sql("""
select endpoint, content_size
from cleantypedlog 
order by content_size_bytes desc
""").limit(10).toPandas()

In [None]:
# Left for you, clean the date column :)
# 1- Create a udf that parses that weird format,
# 2- Create a new column with a data tiem string that spark would understand
# 3- Add a new date-time column properly typed
# 4- Print your schema