In [2]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession

In [3]:
from pyspark.sql.functions import regexp_extract

In [4]:
import glob

In [5]:
import re
import pandas as pd

In [6]:
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

In [7]:
raw_data_files = glob.glob('*.gz')
raw_data_files

['NASA_access_log_Jul95.gz', 'NASA_access_log_Aug95.gz']

In [8]:
base_df = spark.read.text(raw_data_files)

In [23]:
sample_logs = [item['value'] for item in base_df.take(15)]

In [51]:
ts_pattern = r'\[(\d{2}/\w{3}/\d{4})'
timestamps = [re.search(ts_pattern, item).group(1) for item in sample_logs]
timestamps

['01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995',
 '01/Jul/1995']

In [52]:
base_df.take(5)

[Row(value='199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'),
 Row(value='unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985'),
 Row(value='199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0'),
 Row(value='199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179')]

In [53]:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
ts_pattern = r'\[(\d{2}/\w{3}/\d{4})'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'
content_size_pattern = r'\s(\d+)$'

In [54]:
logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                         regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

In [55]:
logs_df.take(5)

[Row(host='199.72.81.55', timestamp='01/Jul/1995', method='GET', endpoint='/history/apollo/', protocol='HTTP/1.0', status=200, content_size=6245),
 Row(host='unicomp6.unicomp.net', timestamp='01/Jul/1995', method='GET', endpoint='/shuttle/countdown/', protocol='HTTP/1.0', status=200, content_size=3985),
 Row(host='199.120.110.21', timestamp='01/Jul/1995', method='GET', endpoint='/shuttle/missions/sts-73/mission-sts-73.html', protocol='HTTP/1.0', status=200, content_size=4085),
 Row(host='burger.letters.com', timestamp='01/Jul/1995', method='GET', endpoint='/shuttle/countdown/liftoff.html', protocol='HTTP/1.0', status=304, content_size=0),
 Row(host='199.120.110.21', timestamp='01/Jul/1995', method='GET', endpoint='/shuttle/missions/sts-73/sts-73-patch-small.gif', protocol='HTTP/1.0', status=200, content_size=4179)]

In [56]:
logs_df.select("host").distinct().show()
print("A quantidade de hosts distintos: %s" % (logs_df.select("host").distinct().count()))

+--------------------+
|                host|
+--------------------+
|ppp3_136.bekkoame...|
|    ppp31.texoma.com|
|ix-wc7-20.ix.netc...|
|nb1-du5.polarnet....|
|    ttyb5.shasta.com|
|dialup00004.cinet...|
|dd14-025.compuser...|
|nigrlpr.actrix.ge...|
| uckm001.pn.itnet.it|
|      queulen.puc.cl|
|pipe2.nyc.pipelin...|
|      198.53.164.131|
|asdsun.larc.nasa.gov|
|pm45-52.smartlink...|
|     193.166.184.116|
|       194.20.34.120|
|dd09-021.compuser...|
|     leo.racsa.co.cr|
| freenet.carleton.ca|
|  enigma.idirect.com|
+--------------------+
only showing top 20 rows

A quantidade de hosts distintos: 137933


In [57]:
logs_404 = logs_df.select("status").where(logs_df.status == 404)

In [58]:
from pyspark.sql.functions import col

In [59]:
logs_404.count()

20899

In [60]:
logs_df.select("endpoint").where(logs_df.status == 404).groupby("endpoint").count().sort(col("count").desc()).show(5)

+--------------------+-----+
|            endpoint|count|
+--------------------+-----+
|/pub/winvn/readme...| 2004|
|/pub/winvn/releas...| 1732|
|/shuttle/missions...|  683|
|/shuttle/missions...|  428|
|/history/apollo/a...|  384|
+--------------------+-----+
only showing top 5 rows



In [61]:
from pyspark.sql.functions import to_date

In [65]:
logs_df.select(["status", "timestamp"]).where(logs_df.status == 404).groupby("timestamp").count().sort(col("timestamp").asc()).show(60)

+-----------+-----+
|  timestamp|count|
+-----------+-----+
|01/Aug/1995|  243|
|01/Jul/1995|  316|
|02/Jul/1995|  291|
|03/Aug/1995|  304|
|03/Jul/1995|  474|
|04/Aug/1995|  346|
|04/Jul/1995|  359|
|05/Aug/1995|  236|
|05/Jul/1995|  497|
|06/Aug/1995|  373|
|06/Jul/1995|  640|
|07/Aug/1995|  537|
|07/Jul/1995|  570|
|08/Aug/1995|  391|
|08/Jul/1995|  300|
|09/Aug/1995|  279|
|09/Jul/1995|  348|
|10/Aug/1995|  315|
|10/Jul/1995|  398|
|11/Aug/1995|  263|
|11/Jul/1995|  471|
|12/Aug/1995|  196|
|12/Jul/1995|  471|
|13/Aug/1995|  216|
|13/Jul/1995|  532|
|14/Aug/1995|  287|
|14/Jul/1995|  413|
|15/Aug/1995|  327|
|15/Jul/1995|  254|
|16/Aug/1995|  259|
|16/Jul/1995|  257|
|17/Aug/1995|  271|
|17/Jul/1995|  406|
|18/Aug/1995|  256|
|18/Jul/1995|  465|
|19/Aug/1995|  209|
|19/Jul/1995|  639|
|20/Aug/1995|  312|
|20/Jul/1995|  428|
|21/Aug/1995|  305|
|21/Jul/1995|  334|
|22/Aug/1995|  288|
|22/Jul/1995|  192|
|23/Aug/1995|  345|
|23/Jul/1995|  233|
|24/Aug/1995|  420|
|24/Jul/1995|  328|


In [88]:
logs_df = logs_df.na.fill({'content_size': 0})

In [92]:
import numpy as np 

In [100]:
from pyspark.sql import functions as F

In [112]:
total = logs_df.groupBy().agg(F.sum("content_size")).collect()[0][0]