# Exploring a public web log data set using PySpark

This notebook uses Apache Spark to explore a publicly available web log data set. 

This is the first part, which does the ETL: it reads the raw log file, parses each record, and loads the result into the columnar Parquet format. The Parquet data store is then available for further analysis.

The dataset is available from [The Internet Traffic Archive](http://ita.ee.lbl.gov/index.html).

#### Dataset source: http://ita.ee.lbl.gov/html/contrib/Sask-HTTP.html

#### Dataset description:

This trace contains seven months’ worth of all HTTP requests to the University of Saskatchewan's WWW server. The University of Saskatchewan is located in Saskatoon, Saskatchewan, Canada.

The log contains over **_2,400,000_** lines from **_June to December 1995_** - the early days of the World Wide Web.

## Part 1: Parse the log file and load its contents into Parquet format.

### Initialize the Apache Spark context & config

In [1]:
## Initialize
#
import findspark
import os
findspark.init()

import pyspark
sc = pyspark.SparkContext()

In [2]:
# Check the Spark context & config
#
sc.version, sc._conf.getAll()

(u'1.5.0',
 [(u'spark.driver.memory', u'6g'),
  (u'spark.rdd.compress', u'True'),
  (u'spark.master', u'local[8]'),
  (u'spark.serializer.objectStreamReset', u'100'),
  (u'spark.submit.deployMode', u'client'),
  (u'spark.app.name', u'pyspark-shell')])

### Python imports

In [3]:
import re
import datetime
import os
from pyspark import StorageLevel          # used to persist the parsed RDD
from pyspark.sql import Row, SQLContext   # Row for parsed records, SQLContext for the DataFrame

### Data file location settings

In [4]:
DATA_DIR = './data'
LOG_FILE_NAME = 'UofS_access_log'

logFileName = os.path.join(DATA_DIR, LOG_FILE_NAME)
print logFileName

./data/UofS_access_log


In [5]:
# Using the Unix wc command to get the number of lines in the log file
#
!wc -l {logFileName}

2408625 ./data/UofS_access_log
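`!wc -l` only works inside IPython on a system that has the `wc` command. A minimal pure-Python alternative, assuming the file sits on local disk, counts newline bytes the same way `wc -l` does, so a final line without a trailing newline is not counted. The helper name `count_lines` and the sample file are hypothetical.

```python
import os
import tempfile


def count_lines(path):
    """Count newline bytes in a file, matching what `wc -l` reports.

    Reads 1 MiB binary chunks so a large log never has to fit in memory.
    """
    count = 0
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            count += chunk.count(b'\n')
    return count


# Usage on a small throwaway file (hypothetical content, not the real log).
tmp_dir = tempfile.mkdtemp()
sample_path = os.path.join(tmp_dir, 'sample_log')
with open(sample_path, 'wb') as f:
    f.write(b'line one\nline two\nline three\n')
print(count_lines(sample_path))  # 3
```

Reading in binary avoids any decoding cost or errors on odd bytes in a 1995-era log.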


### Regular expression to use for parsing a log line

In [6]:
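# Expects Common Log Format-style lines:
#   host ident user [dd/Mon/yyyy:hh:mm:ss zone] "VERB url ..." status size
# Groups: 1 host, 2 ident, 3 user, 4 day, 5 month, 6 year, 7 hour, 8 minute,
#         9 second, 10 HTTP verb, 11 URL, 12 status, 13 response size.
# The time-zone offset inside the brackets is matched by ".*" but not captured.
p_log_line = re.compile(r'^(\S+)\s(\S+)\s(\S+)\s\[(\d*)/([A-Za-z]+)/(\d+):(\d*):(\d*):(\d*) .*\]\s"([A-Z]+) +(\S+)[^"]*" (\d+) (\S+)')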
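To see which capture group holds which field, the pattern can be run on a single line outside Spark. The sample line below is hypothetical: it is modeled on the first parsed record printed by cell 9, and its `-0600` time-zone offset is an assumption (the pattern skips the offset anyway).

```python
import re

# Same pattern as p_log_line in the cell above.
p_log_line = re.compile(r'^(\S+)\s(\S+)\s(\S+)\s\[(\d*)/([A-Za-z]+)/(\d+):(\d*):(\d*):(\d*) .*\]\s"([A-Z]+) +(\S+)[^"]*" (\d+) (\S+)')

# Hypothetical log line; the "-0600" offset is an assumed value.
sample = '202.32.92.47 - - [01/Jun/1995:00:00:59 -0600] "GET /~scottp/publish.html" 200 271'

# Illustrative names for the 13 capture groups, in order.
group_names = ['remote_host', 'ident', 'user_id', 'day', 'month', 'year',
               'hour', 'minute', 'second', 'http_verb', 'url', 'http_status',
               'resp_size']

m = p_log_line.match(sample)
fields = dict(zip(group_names, m.groups()))
for name in group_names:
    print(name, '=', fields[name])

# A line that does not follow the format simply fails to match.
print(p_log_line.match('this is not a log line'))  # None
```

Note that `(\S+)` for the URL would also swallow the closing quote when there is no protocol version; the regex engine backtracks one character so that `[^"]*"` can still match, which is why the URL comes out clean.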

### Function to parse a log line

In [7]:
month2number_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
    'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12,
}

def parse_log_line(line):
    """Return (Row, 1) for a line that matches p_log_line, else (line, 0)."""
    m = p_log_line.match(line)
    if m:
        # Naive datetime: the regex does not capture the time-zone offset,
        # so epo_seconds below treats the server's local time as UTC.
        dtm = datetime.datetime(int(m.group(6)),               # year
                                month2number_map[m.group(5)],  # month
                                int(m.group(4)),               # day
                                int(m.group(7)),               # hour
                                int(m.group(8)),               # minute
                                int(m.group(9)))               # second
        epo_seconds = (dtm - datetime.datetime(1970, 1, 1)).total_seconds()
        # A '-' in the size field means no size was logged; record it as 0.
        respsize = 0
        if m.group(13) != '-':
            respsize = int(m.group(13))
        return (Row(
            remote_host = m.group(1),         # host name or IP the request came from
            user_id     = m.group(3),
            date_time   = dtm,
            week_day    = dtm.weekday() + 1,  # 1 = Monday ... 7 = Sunday
            epo_seconds = epo_seconds,
            http_verb   = m.group(10),
            url         = m.group(11),
            http_status = int(m.group(12)),
            resp_size   = respsize,
        ), 1)             ### line parsed successfully
    return (line, 0)      ### not parsed successfully
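The date arithmetic in `parse_log_line` can be checked in isolation. This sketch pulls it out into a hypothetical helper, `to_fields`, and feeds it the timestamp of the first parsed record from cell 9's output (1 June 1995, 00:00:59); it reproduces that record's `epo_seconds=801964859.0` and `week_day=4`.

```python
import calendar
import datetime

# Same month lookup as month2number_map above.
month2number_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
    'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12,
}


def to_fields(day, month, year, hour, minute, second):
    """Turn the regex's string groups into (datetime, epo_seconds, week_day)."""
    dtm = datetime.datetime(int(year), month2number_map[month], int(day),
                            int(hour), int(minute), int(second))
    # Naive datetime: no time-zone offset is applied, so local time is
    # treated as if it were UTC.
    epo_seconds = (dtm - datetime.datetime(1970, 1, 1)).total_seconds()
    # weekday() returns 0 for Monday, so week_day runs 1 (Mon) to 7 (Sun).
    week_day = dtm.weekday() + 1
    return dtm, epo_seconds, week_day


dtm, epo, wd = to_fields('01', 'Jun', '1995', '00', '00', '59')
print(dtm, epo, wd)  # 1995-06-01 00:00:59 801964859.0 4

# calendar.timegm gives the same seconds-since-epoch for a naive
# datetime interpreted as UTC.
assert calendar.timegm(dtm.timetuple()) == int(epo)
```

The check with `calendar.timegm` shows that subtracting the 1970 epoch from a naive datetime is equivalent to treating the timestamp as UTC.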


### Check for lines that failed to parse

In [8]:
## Uncomment this to check for lines that failed to parse.
## About 45 lines do not parse (2,408,625 lines per wc -l minus the 2,408,580
## parsed in the next cell). They have a really strange format, so we simply drop them.
#
# not_parsed_rdd = (sc
#                    .textFile(logFileName)
#                    .map(parse_log_line)
#                    .filter(lambda s: s[1] == 0)
#                    .map(lambda s: s[0]) )

# print "not parsed: ", not_parsed_rdd.count(), ' lines'
# for x in not_parsed_rdd.take(10):
#     print x
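This cell and cell 9 rely on one convention: `parse_log_line` returns a `(value, flag)` pair, so a single `map` can feed both a success branch (`flag == 1`) and a failure branch (`flag == 0`). The pattern can be tried without Spark on a plain Python list; `parse_or_tag` and its toy `"name number"` format are hypothetical stand-ins for the real parser.

```python
def parse_or_tag(line):
    """Hypothetical parser: ((name, number), 1) on success, (line, 0) on failure."""
    parts = line.split()
    if len(parts) == 2 and parts[1].isdigit():
        return (parts[0], int(parts[1])), 1
    return line, 0


lines = ['a 1', 'b 2', 'bad line here', 'c 3', '???']
tagged = [parse_or_tag(l) for l in lines]           # like .map(parse_log_line)
parsed = [v for v, ok in tagged if ok == 1]         # like .filter(s[1] == 1).map(s[0])
not_parsed = [v for v, ok in tagged if ok == 0]     # like .filter(s[1] == 0).map(s[0])
print(len(parsed), len(not_parsed))  # 3 2
```

Keeping the raw line in the failure branch is what makes it possible to print the unparsed lines for inspection.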

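### Get the successfully parsed lines into an RDD.

Then check the number of lines and look at the first record. The RDD is persisted (in memory, spilling to disk) because it is reused: by `count()` and `first()` here, and again to build the DataFrame that is written to Parquet, so the raw file is not re-read and re-parsed each time.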

In [9]:
parsed_rdd = (sc
               .textFile(logFileName)
               .map(parse_log_line)
               .filter(lambda s: s[1] == 1)
               .map(lambda s: s[0])
               .persist(StorageLevel.MEMORY_AND_DISK))
%time count_parsed = parsed_rdd.count()
print "parsed successfully: ", count_parsed, ' lines'
print parsed_rdd.first()

CPU times: user 6.93 ms, sys: 1.84 ms, total: 8.77 ms
Wall time: 29.2 s
parsed successfully:  2408580  lines
Row(date_time=datetime.datetime(1995, 6, 1, 0, 0, 59), epo_seconds=801964859.0, http_status=200, http_verb=u'GET', remote_host=u'202.32.92.47', resp_size=271, url=u'/~scottp/publish.html', user_id=u'-', week_day=4)


### Now save the log data in Parquet format for further analysis.
#### Do this only once: the write raises an error if the output (a directory, not a single file) already exists.

In [10]:
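## Save to Parquet format.
## Do this only once: by default the write fails if the output directory already exists
## (weblogDf.write.mode('overwrite').parquet(...) would replace it instead).
#
sqlContext = SQLContext(sc)
# Schema is inferred from the Row fields; in Spark 1.x, Row sorts keyword
# fields alphabetically, hence the column order in the schema below.
weblogDf   = sqlContext.createDataFrame(parsed_rdd)
#
weblogDf.write.parquet(os.path.join(DATA_DIR, 'UofS_access_log.parquet'))
weblogDf.printSchema()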

root
 |-- date_time: timestamp (nullable = true)
 |-- epo_seconds: double (nullable = true)
 |-- http_status: long (nullable = true)
 |-- http_verb: string (nullable = true)
 |-- remote_host: string (nullable = true)
 |-- resp_size: long (nullable = true)
 |-- url: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- week_day: long (nullable = true)
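The write above is meant to run only once. One way to make the cell safe to re-run, sketched here as an assumption rather than the notebook's own approach, is a small guard that skips the write when the output path already exists. `write_once` is a hypothetical helper, and `os.path.exists` only sees the local filesystem: that matches this notebook's `local[8]` master and local `./data` path, but would not work for HDFS or S3 outputs. In the notebook it could be called as `write_once(path, weblogDf.write.parquet)`; a stand-in writer is used below so the sketch runs without Spark.

```python
import os
import tempfile


def write_once(path, write_fn):
    """Call write_fn(path) only if nothing exists at path yet.

    Returns True if write_fn was called, False if the path was already there.
    """
    if os.path.exists(path):
        return False
    write_fn(path)
    return True


# Stand-in writer: os.makedirs creates a directory, much as Spark's
# Parquet writer produces a directory rather than a single file.
target = os.path.join(tempfile.mkdtemp(), 'UofS_access_log.parquet')
print(write_once(target, os.makedirs))  # True: first run writes
print(write_once(target, os.makedirs))  # False: second run is skipped
```

Unlike `mode('overwrite')`, which replaces existing output, this guard leaves the first run's data untouched.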



### Load done. The data will be read and explored further in another notebook.