In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import split, regexp_extract
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)
logJul95 = sqlContext.read.text("/home/vinicius/Desktop/datasets_teste/access_log_Jul95")
logAug95 = sqlContext.read.text("/home/vinicius/Desktop/datasets_teste/access_log_Aug95")
log_file = logJul95.union(logAug95)

In [142]:
log_file.show(10)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
|burger.letters.co...|
|burger.letters.co...|
|205.212.115.106 -...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
+--------------------+
only showing top 10 rows



In [143]:
log_file.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------

In [144]:
split_df = log_file.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host_requisicao'),
                        regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
                        regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('requisicao'),
                        regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('cod_http'),
                        regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('bytes_total'))
split_df.show(2, truncate=False)

+---------------------+--------------------------+-------------------+--------+-----------+
|host_requisicao      |timestamp                 |requisicao         |cod_http|bytes_total|
+---------------------+--------------------------+-------------------+--------+-----------+
|199.72.81.55         |01/Jul/1995:00:00:01 -0400|/history/apollo/   |200     |6245       |
|unicomp6.unicomp.net |01/Jul/1995:00:00:06 -0400|/shuttle/countdown/|200     |3985       |
+---------------------+--------------------------+-------------------+--------+-----------+
only showing top 2 rows



In [145]:
split_df.cache()

DataFrame[host_requisicao: string, timestamp: string, requisicao: string, cod_http: int, bytes_total: int]

In [146]:
split_df.columns

['host_requisicao', 'timestamp', 'requisicao', 'cod_http', 'bytes_total']

In [147]:
log_file.filter(log_file["value"].isNull()).count()

0

In [148]:
cleaned_df = split_df.na.fill( 0 )
cleaned_df.cache()

DataFrame[host_requisicao: string, timestamp: string, requisicao: string, cod_http: int, bytes_total: int]

In [149]:
cleaned_df.filter( cleaned_df["bytes_total"].isNull() ).count()

0

In [18]:
hosts_unicos = cleaned_df.select('host_requisicao').distinct().count()
hosts_unicos

137979

In [24]:
erros = cleaned_df.filter(cleaned_df['cod_http']==404).count()
erros

20901

In [150]:
from pyspark.sql.functions import desc

top_5_erros = cleaned_df.filter(cleaned_df['cod_http']==404).groupby('requisicao').count().sort(desc('count'))
top_5_erros.show(5,truncate=False)

+--------------------------------------------+-----+
|requisicao                                  |count|
+--------------------------------------------+-----+
|/pub/winvn/readme.txt                       |2004 |
|/pub/winvn/release.txt                      |1732 |
|/shuttle/missions/STS-69/mission-STS-69.html|682  |
|/shuttle/missions/sts-68/ksc-upclose.gif    |426  |
|/history/apollo/a-001/a-001-patch-small.gif |384  |
+--------------------------------------------+-----+
only showing top 5 rows



In [177]:
from pyspark.sql.functions import dayofmonth

cleaned_df.withColumn('timestamp', cleaned_df['timestamp'].cast('timestamp'))
erros_per_day = cleaned_df.filter(cleaned_df['cod_http']==404).groupby(cleaned_df['timestamp']).count().sort(desc('count'))
erros_per_day.show()


+--------------------+-----+
|           timestamp|count|
+--------------------+-----+
|28/Aug/1995:11:56...|    7|
|11/Aug/1995:12:05...|    7|
|12/Jul/1995:10:35...|    5|
|12/Jul/1995:10:24...|    5|
|11/Aug/1995:12:05...|    5|
|28/Aug/1995:17:14...|    5|
|17/Aug/1995:16:55...|    5|
|12/Jul/1995:10:35...|    5|
|12/Jul/1995:10:21...|    5|
|12/Jul/1995:10:35...|    5|
|12/Jul/1995:10:20...|    5|
|28/Aug/1995:17:14...|    5|
|11/Jul/1995:14:08...|    5|
|28/Aug/1995:17:14...|    4|
|12/Jul/1995:10:35...|    4|
|12/Jul/1995:10:21...|    4|
|28/Aug/1995:01:05...|    4|
|13/Aug/1995:14:28...|    4|
|20/Jul/1995:07:21...|    4|
|04/Aug/1995:18:45...|    4|
+--------------------+-----+
only showing top 20 rows



In [208]:
from pyspark.sql import functions as F
total = cleaned_df.groupBy().agg(F.sum("bytes_total")).collect()
total

[Row(sum(bytes_total)=65524314915)]