In [13]:
import re

from pyspark.sql import Row

In [27]:
# Regular expression for the leading fields of an Apache access-log line
# (Common Log Format): host, identd, user, [timestamp], "request line",
# status code and content size. There is no trailing `$`, so any extra text
# after the size field is ignored.
# Written as a raw string so sequences such as \S, \d and \[ reach the regex
# engine unchanged instead of being treated as (invalid) Python string
# escapes, which newer Python versions warn about. The resulting value is
# identical to the previous non-raw literal.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)?\s*(\S+)?\s*(\S+)?\s*" (\d{3}) (\d+|-)'

# Compiled once and reused by parse_apache_log_line for every line.
APACHE_ACCESS_LOG_REGEX = re.compile(APACHE_ACCESS_LOG_PATTERN)


def parse_apache_log_line(logline):
    """Parse a single Apache access-log line into a pyspark ``Row``.

    Args:
        logline: one raw line of the log, e.g. an element of
            ``sc.textFile(...)``.

    Returns:
        A ``Row`` with fields ip_address, client_identd, user_id, date_time,
        method, endpoint, protocol (strings; method/endpoint/protocol are
        None when the quoted request line lacks that part), response_code
        (int) and content_size (int; 0 when the log records '-').

    Raises:
        ValueError: if the line does not match APACHE_ACCESS_LOG_PATTERN.
            ValueError subclasses Exception, so existing ``except Exception``
            handlers still catch it.
    """
    match = APACHE_ACCESS_LOG_REGEX.search(logline)
    if match is None:
        raise ValueError("Invalid logline: %s" % logline)
    size_field = match.group(9)
    return Row(
        ip_address    = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = match.group(4),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        # int() instead of long(): long does not exist on Python 3, and on
        # Python 2 int() promotes to long automatically for large values.
        content_size  = int(size_field) if size_field != '-' else 0
    )

In [28]:
# Load the sample log, turn every line into a Row via parse_apache_log_line,
# and cache the parsed RDD because each analysis cell below scans it again.
# The parser raises on a line it cannot match, so a malformed line makes the
# first action over access_logs fail.
access_logs = sc.textFile("sample_access_log").map(parse_apache_log_line).cache()

In [29]:
# Response Code to Count: number of requests per response code, returned as
# a local list of at most 100 (code, count) pairs.
responseCodeToCount = access_logs.map(
    lambda entry: (entry.response_code, 1)
).reduceByKey(
    lambda left, right: left + right
).take(100)
print("Response Code Counts: %s" % (responseCodeToCount))

Response Code Counts: [(200, 1274), (304, 137), (408, 1), (404, 5), (302, 6), (401, 123)]


In [30]:
# Clients (the ip_address field, which may hold a hostname) that made more
# than ACCESS_THRESHOLD requests. At most 100 of them are returned.
ACCESS_THRESHOLD = 10
ipAddresses = (access_logs
               .map(lambda log: (log.ip_address, 1))
               .reduceByKey(lambda a, b: a + b)
               .filter(lambda pair: pair[1] > ACCESS_THRESHOLD)
               .keys()
               .take(100))
print("IpAddresses that have accessed more than %d times: %s"
      % (ACCESS_THRESHOLD, ipAddresses))

IpAddresses that have accessed more then 10 times: [u'10.0.0.153', u'208-38-57-205.ip.cal.radiant.net', u'mail.geovariances.fr', u'ip68-228-43-49.tc.ph.cox.net', u'proxy0.haifa.ac.il', u'lhr003a.dhl.com', u'200-55-104-193.dsl.prima.net.ar', u'h24-70-69-74.ca.shawcable.net', u'195.246.13.119', u'p213.54.168.132.tisdip.tiscali.de', u'203.147.138.233', u'h24-71-236-129.ca.shawcable.net', u'212.92.37.62', u'216-160-111-121.tukw.qwest.net', u'cr020r01-3.sac.overture.com', u'ns.wtbts.org', u'prxint-sxb3.e-i.net', u'pc3-registry-stockholm.telia.net', u'market-mail.panduit.com', u'ts04-ip92.hevanet.com', u'64.242.88.10', u'128.227.88.79', u'ogw.netinfo.bg', u'2-110.cnc.bc.ca', u'207.195.59.160', u'ts05-ip44.hevanet.com']


In [31]:
# Top Endpoints: the ten most requested endpoints, busiest first
# (takeOrdered sorts ascending, so the key negates the hit count).
topEndpoints = access_logs.map(
    lambda entry: (entry.endpoint, 1)
).reduceByKey(
    lambda left, right: left + right
).takeOrdered(10, key=lambda pair: -pair[1])
print("Top Endpoints: %s" % (topEndpoints))

Top Endpoints: [(u'/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif', 64), (u'/', 47), (u'/twiki/bin/view/Main/WebHome', 41), (u'/icons/mailman.jpg', 37), (u'/icons/gnu-head-tiny.jpg', 37), (u'/icons/PythonPowered.png', 37), (u'/favicon.ico', 28), (u'/robots.txt', 27), (u'/razor.html', 26), (u'/twiki/bin/view/Main/SpamAssassinTaggingOnly', 18)]


In [32]:
# Average Content Size: mean of content_size over every parsed request.
# Lines whose size field was '-' were stored as 0 by the parser, so they are
# included here and pull the average down.
averageContentSize = access_logs.map(lambda entry: entry.content_size).mean()
print("Avg Content Size: %s" % (averageContentSize))

Avg Content Size: 7078.54075032
