In [1]:
# !pip install dateparser

# create pyspark context and session
from pyspark import SparkContext
from pyspark.sql import SparkSession

from dateparser import parse
import calendar

In [2]:
# Reuse an already-running session when there is one; otherwise start a new
# one registered under the application name "accessLogAnalysis".
session_builder = SparkSession.builder.appName("accessLogAnalysis")
spark = session_builder.getOrCreate()

# SparkContext of that session, used below for the RDD API.
sc = spark.sparkContext


In [3]:
# Location of the access log to analyse.
path = "./data/access_log.txt"

# Names assigned, in order, to the whitespace-separated tokens of a log line.
columns = [
    "ip_address",
    "1",
    "2",
    "datetime",
    "timezone",
    "method",
    "path",
    "type",
    "response_code",
    "response_size",
]

In [4]:
# Parse each log line into a dict by pairing its whitespace-separated tokens
# with `columns`. zip() stops at the shorter input, so a line with fewer
# tokens than columns yields a dict that lacks the trailing keys.
# The parsed RDD is reused by every analysis in this notebook, so it is marked
# for caching to avoid re-reading and re-splitting the file on each action.
rdd = sc.textFile(path).map(
    lambda line: dict(zip(columns, line.split()))
).cache()

## Questão 1) Identifique as 10 maiores origens de acesso (Client IP) por quantidade de acessos.

In [5]:
# Count accesses per client IP and keep the 10 largest counts.
# Pairs are flipped to (count, ip) so they order by count; top() returns the
# largest elements in descending order without a full distributed sort of
# every distinct IP (which sortByKey would perform).
rdd.map(
    lambda record: (record["ip_address"], 1)
).reduceByKey(
    lambda left, right: left + right
).map(
    lambda ip_count: (ip_count[1], ip_count[0])
).top(10)

[(158614, '10.216.113.172'),
 (51942, '10.220.112.1'),
 (47503, '10.173.141.213'),
 (43592, '10.240.144.183'),
 (37554, '10.41.69.177'),
 (22516, '10.169.128.121'),
 (20866, '10.211.47.159'),
 (19667, '10.96.173.111'),
 (18878, '10.203.77.198'),
 (18721, '10.31.77.18')]

## Questão 2) Liste os 6 endpoints mais acessados, desconsiderando aqueles que representam arquivos

In [6]:
# Count accesses per request path, ignoring paths that contain a "." (treated
# here as file requests), and keep the 6 largest counts.
# Records without a "path" key (lines with too few tokens) are skipped instead
# of raising a TypeError on `"." not in None`.
# Pairs are flipped to (count, path) so top() can order them by count and
# return them in descending order without a full distributed sort.
rdd.filter(
    lambda record: "path" in record and "." not in record["path"]
).map(
    lambda record: (record["path"], 1)
).reduceByKey(
    lambda left, right: left + right
).map(
    lambda path_count: (path_count[1], path_count[0])
).top(6)


[(99303, '/'),
 (25937, '/release-schedule/'),
 (23055, '/search/'),
 (18940, '/release-schedule'),
 (8415, '/release-schedule/?p=1&r=&l=&o=&rpp=10'),
 (7505, '/news/')]

## Questão 3) Qual a quantidade de Client IPs distintos?

In [7]:
# Project every record onto its client IP, then count the unique values.
client_ips = rdd.map(lambda record: record["ip_address"])
client_ips.distinct().count()

333923

## Questão 4) Quantos dias de dados estão representados no arquivo?

In [8]:
# The text before the first ":" of the "datetime" field is used as the day
# key; count how many different day keys occur.
day_keys = rdd.map(lambda record: record["datetime"].split(":")[0])
day_keys.distinct().count()

793

## Questão 5) Com base no tamanho (em bytes) do conteúdo das respostas, faça a seguinte análise:

In [9]:
def _try_cast(obj, type):
    try:
        return isinstance(type(obj), type)
    except:
        return False

In [10]:
# Integer response sizes of the records whose response code starts with "2"
# and whose size field can be converted to int; every other record is dropped.
sized_2xx_records = rdd.filter(
    lambda record: record.get("response_code", "").startswith("2")
    and _try_cast(record.get("response_size", ""), int)
)
question_5_rdd = sized_2xx_records.map(
    lambda record: int(record.get("response_size"))
)

### O volume total de dados retornado.

In [11]:
# Total of all response sizes, printed in bytes and then in each larger unit
# (every step divides by a further factor of 1024).
question_5_rdd_sum = question_5_rdd.sum()
print(question_5_rdd_sum, "Bytes")

scaled_sum = question_5_rdd_sum / 1024
for unit_label in ("Megabytes", "Gigabytes", "Terabytes"):
    scaled_sum = scaled_sum / 1024
    print(round(scaled_sum, 2), unit_label)

805193340248 Bytes
767892.21 Megabytes
749.89 Gigabytes
0.73 Terabytes


### O maior volume de dados em uma única resposta.

In [12]:
# Largest single response size, shown in bytes and converted to megabytes.
question_5_rdd_max = question_5_rdd.max()
max_in_megabytes = round(question_5_rdd_max / 1024 / 1024, 2)

print(question_5_rdd_max, "Bytes")
print(max_in_megabytes, "MegaBytes")

80215074 Bytes
76.5 MegaBytes


### O menor volume de dados em uma única resposta.

In [13]:
# Smallest single response size, shown in bytes and converted to megabytes.
question_5_rdd_min = question_5_rdd.min()
min_in_megabytes = round(question_5_rdd_min / 1024 / 1024, 2)

print(question_5_rdd_min, "Bytes")
print(min_in_megabytes, "MegaBytes")

1 Bytes
0.0 MegaBytes


### O volume médio de dados retornado

In [14]:
# Mean response size, shown in bytes and converted to megabytes.
question_5_rdd_mean = question_5_rdd.mean()
mean_in_megabytes = round(question_5_rdd_mean / 1024 / 1024, 2)

print(question_5_rdd_mean, "Bytes")
print(mean_in_megabytes, "MegaBytes")

201076.29656152302 Bytes
0.19 MegaBytes


## Questão 6) Qual o dia da semana com o maior número de erros do tipo "HTTP Client Error"?

In [15]:
# Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status#client_error_responses
# Weekday with the most responses whose status code starts with "4".
#
# Errors are first counted per distinct date string, so the comparatively
# expensive dateparser `parse` call runs once per date instead of once per
# matching log record. The per-date counts are then folded into per-weekday
# totals, which gives the same totals as parsing every record individually.
rdd.filter(
    lambda record: record.get("response_code", "").startswith("4")
).map(
    # Date portion of the field: text before the first ":" without the "[".
    lambda record: (record["datetime"].split(":")[0].replace("[", ""), 1)
).reduceByKey(
    lambda left, right: left + right
).map(
    lambda date_count: (
        calendar.day_name[parse(date_count[0]).weekday()],
        date_count[1],
    )
).reduceByKey(
    lambda left, right: left + right
).map(
    # Flip to (count, weekday) so the pairs can be ordered by count.
    lambda day_count: (day_count[1], day_count[0])
).sortByKey(
    False
).take(1)[0][1]

'Friday'

In [16]:
# Total number of parsed records (one per line read from the log file).
rdd.count()

4477843

In [None]:
# The 10 most frequent values of the "response_size" field, as
# (count, response_size) pairs in descending order of count.
# NOTE: despite its name, `rdd_initial` holds a plain Python list, not an RDD.
#
# Records without a "response_size" key (lines with fewer tokens than
# `columns`) are skipped; indexing them directly would raise KeyError.
# top() is used instead of a full sortByKey since only 10 items are needed.
rdd_initial = rdd.filter(
    lambda record: "response_size" in record
).map(
    lambda record: (record["response_size"], 1)
).reduceByKey(
    lambda left, right: left + right
).map(
    lambda size_count: (size_count[1], size_count[0])
).top(10)
rdd_initial