In [1]:
# Create the PySpark entry points: the SparkSession (`spark`) and its SparkContext (`sc`)
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
# Default Spark configuration object.
# NOTE(review): `conf` is not passed to the session builder in the code visible
# here -- confirm whether it is used elsewhere or can be dropped.
conf = SparkConf()
# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# SparkContext behind the session; used below for the RDD API (sc.textFile).
sc = spark.sparkContext

import re
import datetime
from pyspark.sql import Row
import os
import sys
from test_helper import Test
import matplotlib.pyplot as plt

# Three-letter English month abbreviation -> month number.
month_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12,
}


def parse_apache_time(s):
    """ Convert an Apache log timestamp into a Python datetime object
    Args:
        s (str): timestamp laid out as 'dd/Mon/yyyy:hh:mm:ss ...'; only the
                 first 20 characters are read, so a trailing UTC offset is ignored
    Returns:
        datetime.datetime: the date and time read from the string (no tzinfo)
    """
    # Every field sits at a fixed offset, so plain slicing is sufficient.
    year = int(s[7:11])
    month = month_map[s[3:6]]
    day = int(s[0:2])
    hour = int(s[12:14])
    minute = int(s[15:17])
    second = int(s[18:20])
    return datetime.datetime(year, month, day, hour, minute, second)

# A regular expression pattern to extract fields from the log line.
# Written as a raw string so the regex escapes (\S, \d, \[ ...) are not
# interpreted as (invalid) Python string escapes; the pattern text is unchanged.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s?" (\d{3}) (\S+)'

def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a Row containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0. A line counts as invalid
               when it does not match APACHE_ACCESS_LOG_PATTERN, or when it
               matches but its size or timestamp field cannot be converted.
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    try:
        # '-' means no content size was logged; it is stored as 0.
        size = 0 if size_field == '-' else int(size_field)
        date_time = parse_apache_time(match.group(4))
    except (ValueError, KeyError):
        # The regex is looser than the converters (e.g. the size group is \S+),
        # so a matching line can still carry unusable values. Report it as an
        # invalid line instead of raising.
        return (logline, 0)
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = date_time,
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size), 1)

# Load the access log, parse every line, and report how many lines parsed.

# Location of the log file to analyse (hard-coded HDFS URI).
logFile = "hdfs://192.168.242.128:9000/input/access_log_Jul95"
# Each element is the 2-tuple returned by parseApacheLogLine:
# (Row, 1) for a parsed line or (original line, 0) for a failed one.
# Cached because both filters below and the count read from it.
parsed_logs = sc.textFile(logFile).map(parseApacheLogLine).cache()
# Parsed Rows only (flag == 1); cached as well.
access_logs = parsed_logs.filter(lambda s: s[1] == 1).map(lambda s: s[0]).cache()
# Raw text of the lines that failed to parse (flag == 0); not cached.
failed_logs = parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0])
# Each count() is a separate Spark action.
parsed_logs_count = parsed_logs.count()
access_logs_count = access_logs.count()
failed_logs_count = failed_logs.count()
if failed_logs_count > 0:
    print('Number of invalid logline: %d' % failed_logs_count)
    # Show at most 20 of the failing lines for inspection.
    for line in failed_logs.take(20):
        print(line)

# One-line summary: total lines read, lines parsed, lines that failed.
print('Read %d lines, successfully parsed %d lines, failed to parse %d lines' %(parsed_logs_count, access_logs_count, failed_logs_count))


Number of invalid logline: 849
204.120.229.63 - - [01/Jul/1995:04:29:05 -0400] "GET /history/history.html                                                 hqpao/hqpao_home.html HTTP/1.0" 200 1502
nccse.gsfc.nasa.gov - - [01/Jul/1995:07:36:13 -0400] "GET /shuttle/missions/missions.html Shuttle Launches from Kennedy Space Center HTTP/1.0" 200 8677
ix-nbw-nj1-22.ix.netcom.com - - [01/Jul/1995:10:42:09 -0400] "GET /finger @net.com HTTP/1.0" 404 -
gpotterpc.llnl.gov - - [01/Jul/1995:22:26:51 -0400] "GET /htbin/wais.pl?orbit sts71 HTTP/1.0" 200 317
wxs6-7.worldaccess.nl - - [02/Jul/1995:08:09:27 -0400] "GET / /   HTTP/1.0" 200 7074
wxs6-7.worldaccess.nl - - [02/Jul/1995:08:11:20 -0400] "GET / /facts/facts.html HTTP/1.0" 200 7074
s29.abqslip.indirect.com - - [02/Jul/1995:14:24:26 -0400] "GET /htbin/wais.pl?Wake Shield HTTP/1.0" 200 7020
pipe3.nyc.pipeline.com - - [02/Jul/1995:22:24:41 -0400] "GET /shuttle/countdown/dy �?shuttle%20tracking HTTP/1.0" 404 -
pipe3.nyc.pipeline.com - - [02/Jul/1995