In [11]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_extract

import glob
import re

In [None]:
raw_data_files = glob.glob('/home/cloudera/Documents/nasa-project/data/*.gz')
raw_data_files

In [12]:
base_df = sqlContext.read.text('file:///home/cloudera/Documents/nasa-project/data/*.gz')
base_df.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
type(base_df)

In [None]:
print((base_df.count(), len(base_df.columns)))
base_length = base_df.count()

base_length

In [40]:
sample_logs = [item['value'] for item in base_df.take(15)]
type(sample_logs)

list

In [41]:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = [re.search(host_pattern, item).group(1)
           if re.search(host_pattern, item)
           else 'no match'
           for item in sample_logs]
type(hosts)

list

In [15]:
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
timestamps = [re.search(ts_pattern, item).group(1) for item in sample_logs]
timestamps

[u'01/Aug/1995:00:00:01 -0400',
 u'01/Aug/1995:00:00:07 -0400',
 u'01/Aug/1995:00:00:08 -0400',
 u'01/Aug/1995:00:00:08 -0400',
 u'01/Aug/1995:00:00:08 -0400',
 u'01/Aug/1995:00:00:09 -0400',
 u'01/Aug/1995:00:00:10 -0400',
 u'01/Aug/1995:00:00:10 -0400',
 u'01/Aug/1995:00:00:10 -0400',
 u'01/Aug/1995:00:00:11 -0400',
 u'01/Aug/1995:00:00:12 -0400',
 u'01/Aug/1995:00:00:12 -0400',
 u'01/Aug/1995:00:00:13 -0400',
 u'01/Aug/1995:00:00:14 -0400',
 u'01/Aug/1995:00:00:16 -0400']

In [16]:
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()
               if re.search(method_uri_protocol_pattern, item)
               else 'no match'
              for item in sample_logs]
method_uri_protocol

[(u'GET', u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', u'HTTP/1.0'),
 (u'GET', u'/', u'HTTP/1.0'),
 (u'GET', u'/images/ksclogo-medium.gif', u'HTTP/1.0'),
 (u'GET', u'/images/MOSAIC-logosmall.gif', u'HTTP/1.0'),
 (u'GET', u'/images/USA-logosmall.gif', u'HTTP/1.0'),
 (u'GET', u'/images/launch-logo.gif', u'HTTP/1.0'),
 (u'GET', u'/images/WORLD-logosmall.gif', u'HTTP/1.0'),
 (u'GET', u'/history/skylab/skylab.html', u'HTTP/1.0'),
 (u'GET', u'/images/launchmedium.gif', u'HTTP/1.0'),
 (u'GET', u'/history/skylab/skylab-small.gif', u'HTTP/1.0'),
 (u'GET', u'/images/ksclogosmall.gif', u'HTTP/1.0'),
 (u'GET', u'/history/apollo/images/apollo-logo1.gif', u'HTTP/1.0'),
 (u'GET', u'/history/apollo/images/apollo-logo.gif', u'HTTP/1.0'),
 (u'GET', u'/images/NASA-logosmall.gif', u'HTTP/1.0'),
 (u'GET', u'/shuttle/missions/sts-69/mission-sts-69.html', u'HTTP/1.0')]

In [42]:
status_pattern = r'\s(\d{3})\s'
status = [re.search(status_pattern, item).group(1) for item in sample_logs]
status

[u'200',
 u'304',
 u'304',
 u'304',
 u'304',
 u'200',
 u'304',
 u'200',
 u'200',
 u'200',
 u'200',
 u'200',
 u'200',
 u'304',
 u'200']

In [18]:
content_size_pattern = r'\s(\d+)$'
content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]
print(content_size)

[u'1839', u'0', u'0', u'0', u'0', u'1713', u'0', u'1687', u'11853', u'9202', u'3635', u'1173', u'3047', u'0', u'10566']


In [20]:
logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                         regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+--------------------+--------------------+------+--------------------+--------+------+------------+
|                host|           timestamp|method|            endpoint|protocol|status|content_size|
+--------------------+--------------------+------+--------------------+--------+------+------------+
|   in24.inetnebr.com|01/Aug/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        1839|
|     uplherc.upl.com|01/Aug/1995:00:00...|   GET|                   /|HTTP/1.0|   304|           0|
|     uplherc.upl.com|01/Aug/1995:00:00...|   GET|/images/ksclogo-m...|HTTP/1.0|   304|           0|
|     uplherc.upl.com|01/Aug/1995:00:00...|   GET|/images/MOSAIC-lo...|HTTP/1.0|   304|           0|
|     uplherc.upl.com|01/Aug/1995:00:00...|   GET|/images/USA-logos...|HTTP/1.0|   304|           0|
|ix-esc-ca2-07.ix....|01/Aug/1995:00:00...|   GET|/images/launch-lo...|HTTP/1.0|   200|        1713|
|     uplherc.upl.com|01/Aug/1995:00:00...|   GET|/images/WORLD-log...|HTTP/1.0|   304|    

In [22]:
logs_df.select(F.countDistinct("host")).show()

+-----------+
|count(host)|
+-----------+
|     137933|
+-----------+



In [25]:
logs_df.filter(logs_df.status == 404).count()

20899

In [35]:
logs_df.filter(logs_df.status == 404).groupBy("endpoint").count().sort('count', ascending=False).show()

+--------------------+-----+
|            endpoint|count|
+--------------------+-----+
|/pub/winvn/readme...| 2004|
|/pub/winvn/releas...| 1732|
|/shuttle/missions...|  683|
|/shuttle/missions...|  428|
|/history/apollo/a...|  384|
|/history/apollo/s...|  383|
|/://spacelink.msf...|  381|
|/images/crawlerwa...|  374|
|/elv/DELTA/uncons...|  372|
|/history/apollo/p...|  359|
|/images/nasa-logo...|  319|
|/shuttle/resource...|  314|
|/history/apollo/a...|  304|
|/shuttle/resource...|  263|
|/shuttle/missions...|  190|
|/shuttle/resource...|  170|
|/shuttle/missions...|  158|
|/history/apollo/i...|  150|
| /images/lf-logo.gif|  143|
|/history/apollo/p...|  140|
+--------------------+-----+
only showing top 20 rows



In [38]:
logs_df.filter(logs_df.status == 404).groupBy("timestamp").count().show()

+--------------------+-----+
|           timestamp|count|
+--------------------+-----+
|04/Aug/1995:14:53...|    1|
|06/Aug/1995:02:38...|    1|
|07/Aug/1995:01:57...|    1|
|07/Aug/1995:09:25...|    1|
|07/Aug/1995:09:58...|    1|
|07/Aug/1995:13:12...|    1|
|08/Aug/1995:02:40...|    1|
|08/Aug/1995:16:06...|    1|
|09/Aug/1995:09:02...|    1|
|09/Aug/1995:15:24...|    2|
|09/Aug/1995:16:12...|    1|
|10/Aug/1995:13:55...|    1|
|11/Aug/1995:07:28...|    1|
|11/Aug/1995:15:53...|    1|
|12/Aug/1995:08:13...|    1|
|13/Aug/1995:20:07...|    1|
|14/Aug/1995:13:01...|    1|
|14/Aug/1995:17:52...|    1|
|15/Aug/1995:10:19...|    1|
|15/Aug/1995:10:37...|    1|
+--------------------+-----+
only showing top 20 rows



In [39]:
logs_df.groupBy().agg(F.sum('content_size')).show()

+-----------------+
|sum(content_size)|
+-----------------+
|      65524314915|
+-----------------+

