In [1]:
import sys
import gzip
import shutil
import pyspark
import urllib.request
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import split, regexp_extract, year, month, dayofmonth, udf

## Realizando Download dos arquivos

In [2]:
def download_file(source_url, destination_file):
    try:
        print("Downloading: {} to {}".format(source_url, destination_file))
        urllib.request.urlretrieve(source_url, destination_file)
    except Exception as e:
        print(e)
        sys.exit(1)

In [3]:
nasa_log_jul95_destination_file = 'data/NASA_access_log_Jul95.gz'
nasa_log_aug95_destination_file = 'data/NASA_access_log_Aug95.gz'
nasa_log_jul95_source_url = 'ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz'
nasa_log_aug95_source_url = 'ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz'

In [4]:
download_file(nasa_log_jul95_source_url, nasa_log_jul95_destination_file)
download_file(nasa_log_aug95_source_url, nasa_log_aug95_destination_file)

Downloading: ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz to data/NASA_access_log_Jul95.gz
Downloading: ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz to data/NASA_access_log_Aug95.gz


## Descompactando Arquivos

In [8]:
nasa_log_jul95_destination_raw_file = 'data/access_log_jul95'
nasa_log_aug95_destination_raw_file = 'data/access_log_aug95'

In [None]:
def decompress_file(source_compressed_file, destination_raw_file):
    print('Decompressing file {} to {}'.format(source_compressed_file, destination_raw_file))
    with gzip.open(source_compressed_file, 'rb') as c:
        with open(destination_raw_file, 'wb') as f:
            shutil.copyfileobj(c, f)

In [None]:
decompress_file(nasa_log_jul95_destination_file, nasa_log_jul95_destination_raw_file)
decompress_file(nasa_log_aug95_destination_file, nasa_log_aug95_destination_raw_file)

### Realizando a leitura dos arquivos

In [6]:
spark = SparkSession.builder \
        .master("local") \
        .appName("NASA Web Log Analysis") \
        .getOrCreate()

In [9]:
df_nasa_log_jul95 = spark.read.text(nasa_log_jul95_destination_raw_file)
df_nasa_log_jul95.printSchema()
df_nasa_log_jul95.count()


root
 |-- value: string (nullable = true)



1891715

In [10]:
df_nasa_log_aug95 = spark.read.text(nasa_log_jul95_destination_raw_file)
df_nasa_log_aug95.printSchema()
df_nasa_log_aug95.count()

root
 |-- value: string (nullable = true)



1891715

### Concatenando dataframes

In [11]:
df_raw_nasa_log = df_nasa_log_aug95.union(df_nasa_log_jul95)
df_raw_nasa_log.printSchema()
df_raw_nasa_log.count()

root
 |-- value: string (nullable = true)



3783430

In [12]:
df_raw_nasa_log.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                         |
+------------------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                        |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                             |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085          |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0                      |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small

### Realizando o Parse dos logs do servidor web

In [30]:
df_nasa_log = df_raw_nasa_log.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
                          regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
                          regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
                          regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
                          regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
df_nasa_log.show(truncate=False)
df_nasa_log.cache()

+--------------------------+--------------------------+-------------------------------------------------+------+------------+
|host                      |timestamp                 |path                                             |status|content_size|
+--------------------------+--------------------------+-------------------------------------------------+------+------------+
|199.72.81.55              |01/Jul/1995:00:00:01 -0400|/history/apollo/                                 |200   |6245        |
|unicomp6.unicomp.net      |01/Jul/1995:00:00:06 -0400|/shuttle/countdown/                              |200   |3985        |
|199.120.110.21            |01/Jul/1995:00:00:09 -0400|/shuttle/missions/sts-73/mission-sts-73.html     |200   |4085        |
|burger.letters.com        |01/Jul/1995:00:00:11 -0400|/shuttle/countdown/liftoff.html                  |304   |0           |
|199.120.110.21            |01/Jul/1995:00:00:11 -0400|/shuttle/missions/sts-73/sts-73-patch-small.gif  |200   |4179  

DataFrame[host: string, timestamp: string, path: string, status: int, content_size: int]

### Checagem de dados

In [40]:
from pyspark.sql.functions import isnan, when, count, col
df_nasa_log.select([count(when(col(c).isNull(), c)).alias(c) for c in df_nasa_log.columns]).show()

+----+---------+----+------+------------+
|host|timestamp|path|status|content_size|
+----+---------+----+------+------------+
|   0|        0|   0|     0|           0|
+----+---------+----+------+------------+



### Retirando registros em 'status'

In [33]:
df_nasa_log = df_nasa_log.filter(~df_nasa_log.status.isNull())

### Checando 'content-size' vazio

In [36]:
df_empty_content_size = df_nasa_log.filter(df_nasa_log.content_size.isNull())
df_empty_content_size.show()

+--------------------+--------------------+--------------------+------+------------+
|                host|           timestamp|                path|status|content_size|
+--------------------+--------------------+--------------------+------+------------+
|dd15-062.compuser...|01/Jul/1995:00:01...|/news/sci.space.s...|   404|        null|
|    dynip42.efn.org |01/Jul/1995:00:02...|           /software|   302|        null|
|ix-or10-06.ix.net...|01/Jul/1995:00:02...|     /software/winvn|   302|        null|
|ix-or10-06.ix.net...|01/Jul/1995:00:03...|           /software|   302|        null|
|link097.txdirect....|01/Jul/1995:00:05...|            /shuttle|   302|        null|
|ix-war-mi1-20.ix....|01/Jul/1995:00:05...|/shuttle/missions...|   302|        null|
|ix-war-mi1-20.ix....|01/Jul/1995:00:05...|/shuttle/missions...|   302|        null|
|  netport-27.iu.net |01/Jul/1995:00:10...|/pub/winvn/readme...|   404|        null|
|  netport-27.iu.net |01/Jul/1995:00:10...|/pub/winvn/readme...| 

In [38]:
df_empty_content_size.groupBy('status').count().show()

+------+-----+
|status|count|
+------+-----+
|   501|   28|
|   400|   10|
|   403|  108|
|   404|21688|
|   200|  166|
|   302|17452|
+------+-----+



### Substituindo 'content-size' nulo para '0'

In [39]:
df_nasa_log = df_nasa_log.na.fill({'content_size': 0})

### Transformação timestamp

In [23]:
months = {
    'Jan':1,
    'Feb':2,
    'Mar':3,
    'Apr':4,
    'May':5,
    'Jun':6,
    'Jul':7,
    'Aug':8,
    'Sep':9,
    'Oct':10,
    'Nov':11,
    'Dec':12
}

def parse_timestamp(s):
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
      int(s[7:11]),
      months[s[3:6]],
      int(s[0:2]),
      int(s[12:14]),
      int(s[15:17]),
      int(s[18:20])
    )

u_parse_timestamp = udf(parse_timestamp, StringType())


df_nasa_log = df_nasa_log.select('*', u_parse_timestamp(df_nasa_log['timestamp']) \ 
                                 .cast('timestamp') \
                                 .alias('time')) \
                .drop('timestamp')

### Encontrando o número de hosts unicos

In [14]:
distinct_hosts = df_nasa_log.select('host') \
                .distinct() \
                .count()
print("Número de Hosts Unicos: {}".format(distinct_hosts))

Número de Hosts Unicos: 81983


### Encontrando número de respostas 404 (Not Found)

In [15]:
not_found = df_nasa_log.filter('status = 404').count()
print("Número de Respostas 404: {}".format(not_found))

Número de Respostas 404: 21690


### Encontrando TOP 5 URLs não encontradas

In [16]:
urls = df_nasa_log.filter('status = 404') \
        .groupBy('path') \
        .count() \
        .sort("count", ascending=False)
urls.show(5, truncate=False)

+-------------------------------------------+-----+
|path                                       |count|
+-------------------------------------------+-----+
|/pub/winvn/readme.txt                      |1334 |
|/pub/winvn/release.txt                     |1094 |
|/history/apollo/apollo-13.html             |572  |
|/shuttle/resources/orbiters/atlantis.gif   |460  |
|/history/apollo/a-001/a-001-patch-small.gif|460  |
+-------------------------------------------+-----+
only showing top 5 rows



### Encontrando número de Erros 404 por dia

#### Convertendo coluna timestamp

In [27]:
not_found_per_day = df_nasa_log.select(month('time').alias('month'), \
                                       dayofmonth('time').alias('day')) \
                    .filter('status = 404') \
                    .groupBy('month', 'day') \
                    .count() \
                    .sort("count", ascending=False)
not_found_per_day.show(truncate=False)


+-----+---+-----+
|month|day|count|
+-----+---+-----+
|7    |6  |1280 |
|7    |19 |1278 |
|7    |7  |1140 |
|7    |13 |1064 |
|7    |5  |994  |
|7    |3  |948  |
|7    |12 |942  |
|7    |11 |942  |
|7    |18 |930  |
|7    |25 |922  |
|7    |20 |856  |
|7    |14 |826  |
|7    |17 |812  |
|7    |10 |796  |
|7    |4  |718  |
|7    |9  |696  |
|7    |26 |672  |
|7    |27 |672  |
|7    |21 |668  |
|7    |24 |656  |
+-----+---+-----+
only showing top 20 rows



In [None]:
df_nasa_log.show(truncate=False)

### Encontrando o número de bytes trafegados

In [48]:
response_bytes_size = df_nasa_log.agg(sum('content_size')).collect()[0][0]
print('Total de bytes trafegados: {}'.format(response_bytes_size))

Total de bytes trafegados: 77391946982
