In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from datetime import datetime
import re

In [19]:
# Create (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Internal Access Log Analysis')
    .getOrCreate()
)

# Handle to the underlying SparkContext, used for the RDD API.
sc = spark.sparkContext

In [131]:
# Regular expression for one line of the web-server access log.
# Capture groups, in order, named after the Row fields they feed:
#   1 ipAddress, 2 clientIdentd, 3 userId,
#   4 dateTime (YYYY-MM-DD HH:MM:SS +ZZZZ),
#   5 method, 6 endpoint, 7 protocol,
#   8 responseCode, 9 port, 10 field_10,
#   11 platform, 12 access_tool,
#   13 in_byte, 14 out_byte, 15 us_byte.
# Written as a raw string so the backslash escapes reach the regex engine
# verbatim instead of being (invalid) Python string escapes.
ACCESS_LOG_PATTERN = r'(\S+) (\S+) (\S+) (\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\s[+\-]\d{4}) "(\S+) (\S+)\s*(\S*)\s*" (\S+) (\S+) "(\S+)" "(\S+)\s*(\S*\s*\S*)" (\S+) (\S+) (\S+)'
# strptime() format (not a regex, despite the name) for capture group 4.
DATETIME_RE = '%Y-%m-%d %H:%M:%S %z'


def parse_apache_log_line(logline):
    """Parse one access-log line into a ``Row``.

    Args:
        logline: A single line of the access log, as a string.

    Returns:
        A ``Row`` holding the 15 captured fields as strings, exactly as they
        appear in the line, plus two derived fields: ``timestamp`` (POSIX
        seconds as a float) and ``date`` (``'YYYY-MM-DD'``).
        ``None`` if the line does not match ``ACCESS_LOG_PATTERN`` or if its
        timestamp is not a valid calendar date/time, so callers can drop bad
        lines with a simple ``is not None`` filter.
    """
    match = re.search(ACCESS_LOG_PATTERN, logline)
    if match is None:
        print("No match")
        return None

    try:
        date_obj = datetime.strptime(match.group(4), DATETIME_RE)
    except ValueError:
        # The pattern only checks digit counts, so e.g. month 13 or hour 99
        # gets this far. Treat such a line as unparseable rather than letting
        # one bad record abort the whole job.
        return None

    return Row(
        ipAddress    = match.group(1),
        clientIdentd = match.group(2),
        userId       = match.group(3),
        dateTime     = match.group(4),
        timestamp    = date_obj.timestamp(),
        date         = date_obj.strftime('%Y-%m-%d'),
        method       = match.group(5),
        endpoint     = match.group(6),
        protocol     = match.group(7),
        responseCode = match.group(8),
        port         = match.group(9),
        field_10     = match.group(10),
        platform     = match.group(11),
        access_tool  = match.group(12),
        in_byte      = match.group(13),
        out_byte     = match.group(14),
        us_byte      = match.group(15))

In [132]:
# Smoke-test the parser on a single sample line before applying it to the full log file.
test_line = 'localhost - - 2016-12-13 12:52:47 +0100 "GET /server-status?auto HTTP/1.1" rc:200 438 "-" "collectd/5.5.1" in:94 out:636 us:390'
test_row = parse_apache_log_line(test_line)
print(test_row)

Row(access_tool='', clientIdentd='-', date='2016-12-13', dateTime='2016-12-13 12:52:47 +0100', endpoint='/server-status?auto', field_10='-', in_byte='in:94', ipAddress='localhost', method='GET', out_byte='out:636', platform='collectd/5.5.1', port='438', protocol='HTTP/1.1', responseCode='rc:200', timestamp=1481629967.0, us_byte='us:390', userId='-')


In [133]:
# Run the raw regex on the sample line so individual capture groups can be inspected.
# NOTE(review): this repeats the ACCESS_LOG_PATTERN definition from the parser cell and
# rebinds the same name; the two must stay identical. Raw string so the regex escapes
# are not treated as (invalid) Python string escapes.
ACCESS_LOG_PATTERN = r'(\S+) (\S+) (\S+) (\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\s[+\-]\d{4}) "(\S+) (\S+)\s*(\S*)\s*" (\S+) (\S+) "(\S+)" "(\S+)\s*(\S*\s*\S*)" (\S+) (\S+) (\S+)'
match = re.search(ACCESS_LOG_PATTERN, test_line)


In [134]:
# Capture group 13 is the value the parser stores in the `in_byte` field.
match.group(13)

'in:94'

In [135]:
# Load the access log as an RDD of strings, one element per line of the text file.
access_logs_raw = sc.textFile('access_logs_201612.txt')


In [136]:
# Total number of raw lines, before any parsing or filtering.
access_logs_raw.count()


2529245

In [137]:
# Parse every raw line; the parser returns None for lines it rejects, and those are dropped.
access_logs_parsed = (
    access_logs_raw
    .map(parse_apache_log_line)
    .filter(lambda parsed_row: parsed_row is not None)
)

# Number of lines that parsed successfully.
access_logs_parsed.count()


2275168

In [138]:
# Convert the RDD of Rows into a DataFrame; no schema is passed, so column types are inferred from the Row values.
access_logs_df = access_logs_parsed.toDF()


In [139]:
# Mark the parsed DataFrame for caching so the queries that follow reuse it
# instead of re-running the regex parse over the raw file each time.
access_logs_df.cache()

DataFrame[access_tool: string, clientIdentd: string, date: string, dateTime: string, endpoint: string, field_10: string, in_byte: string, ipAddress: string, method: string, out_byte: string, platform: string, port: string, protocol: string, responseCode: string, timestamp: double, us_byte: string, userId: string]

In [140]:
# Distinct values of the ipAddress column (lazy: nothing is computed until an action is called on the result).
distinct_ip_address = access_logs_df.select('ipAddress').distinct()


In [146]:
# Count parsed log lines per ipAddress; show() prints up to 20 groups, in no guaranteed order.
access_logs_df.groupBy("ipAddress").count().show()

+---------------+-------+
|      ipAddress|  count|
+---------------+-------+
| 147.214.81.243|      8|
|      localhost| 141701|
|147.214.176.155|   4214|
|  147.214.64.44|  11097|
| 147.214.64.148|  17058|
|    153.88.5.88|     16|
| 147.214.17.153|   5711|
| 134.138.165.73|     17|
| 147.214.18.128|2093165|
| 147.214.17.154|   2181|
+---------------+-------+

