## Questionário

## 1. Qual o objetivo do comando cache em Spark?


O comando cache() salva o resultado de uma operação em memória. Este comando é útil quando se deseja usar o resultado de alguma operação mais de uma vez, evitando reprocessamento.

## 2. O mesmo código implementado em Spark é normalmente mais rápido que a implementação equivalente em MapReduce. Por quê?


Um dos principais motivos é o fato de o Spark conseguir diminuir a quantidade de leitura e escrita em disco, utilizando a memória de maneira mais eficiente que um MapReduce.
Quando executamos um comando em Spark, ele monta uma espécie de plano de execução sobre os dados (DAG). O plano é construído de modo a minimizar a transferência de dados e a leitura e escrita em disco. A ordem lógica de execuções é também levada em consideração, verificando-se as dependências entre cada etapa, o que também otimiza a execução. O plano de ação também determina a melhor maneira de paralelizar as tarefas dentro do cluster, o que é uma das grandes vantagens em se usar o Spark.



## 3. Qual é a função do SparkContext?


O SparkContext é a interface de interação com o ambiente Spark. Através dele determinam-se diversas configurações de ambiente local, cluster, sistema de arquivos e etc. Ele acaba sendo uma espécie de orquestrador de uma aplicação Spark.

## 4. Explique com suas palavras o que é Resilient Distributed Datasets(RDD).


O RDD é a estrutura de dados base do Spark. É a representação lógica usada para representar os datasets a serem manipulados em um ambiente Spark. O RDD determina como os dados são distribuídos através do cluster e suas partições.

## 5. GroupByKey é menos eficiente que reduceByKey em grandes dataset. Por quê?


No ReduceByKey, o Spark faz uma espécie de join dos dados de saída com as chaves correspondentes em cada partição antes de transferir os dados entre partições. Já GroupByKey, todos os pares chave-valor são transferidos entre as partições, sem remover nenhuma redundância. Assim, com o GroupByKey há muito mais transferência de dados entre o cluster e mais I/O, e para grandes datasets, ambos acabam sendo um gargalo de performance.

## 6. Explique o que o código Scala abaixo faz.


```scala
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                .map(word => (word, 1)) 
                .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
```

É basicamente um word count, usando map reduce.

Primeiro, carrega um arquivo do hdfs para o RDD. Em seguinda, quebra o texto a cada caracter de espaço(' '), ou seja, está separando cada palavra individual do texto.
O map armazena na variável counts a quantidade de vezes em que cada palavra única aparece no texto.
O último passo é escrever em um arquivo hdfs o resultado dessa operação, ou seja, a frequência com que cada palavra aparece no texto.


## Desafio dataset "HTTP requests to the NASA Kennedy Space Center WWW server"

### Bibliotecas necessárias

In [1]:
import re

from pyspark import SparkContext, SparkConf
from pyspark.sql import Row

### Carregando e limpando o dataset

In [6]:
conf = SparkConf().setAppName('nasa-http-requests-dataset').setMaster('local').set('spark.executor.cores','1')
sc = SparkContext.getOrCreate(conf=conf)

raw_log_files = sc.textFile("NASA_access_log_*")

def parseLog(line):
    
    # A regex abaixo considera linhas com o seguinte padrão (exemplo retirado do próprio dataset):
    # 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
    # Qualquer linha que não dê match com a regex será considerada inválida
    
    regex = '^(\S+) (-) (-) \[(.*?)\] "(.*?)" (\d{3}) (\S+)'

    match = re.search(regex, line)
    if match is None:
        return (line, 0)
    return (Row(
        hostname       = match.group(1),
        #dash1         = match.group(2),
        #dash2         = match.group(3),
        date           = match.group(4).split(':')[0], #o comando split remove horário e timezone, já que estes não são necessários para o teste
        request        = match.group(5),
        return_code    = match.group(6),
        bytesize       = match.group(7)
    ), 1)


parsed_lines = raw_log_files.map(lambda line: parseLog(line)).cache()

print('Número total de linhas: {}'.format(parsed_lines.count()))

clean_lines = (parsed_lines
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

print('Número de linhas dentro do formato válido: {}'.format(clean_lines.count()))


failed_lines = (parsed_lines
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))

print('Número de linhas inválidas: {}'.format(failed_lines.count()))

Número total de linhas: 3461613
Número de linhas dentro do formato válido: 3461612
Número de linhas inválidas: 1


Daqui em diante consideraremos apenas as linhas que estão no formato esperado.

## Questão 1: Qual é o número de hosts únicos?

In [7]:
hosts = clean_lines.map(lambda log: (log.hostname, 1))
uniqueHosts = hosts.distinct()

print('Número de hosts únicos: {}'.format(uniqueHosts.count()))

Número de hosts únicos: 137978


## Questão 2: qual é a quantidade total de erros 404?

In [9]:
code_404 = (clean_lines.filter(lambda log: log.return_code == '404')).cache()

print('Quantidade de erros 404: {}'.format(code_404.count()))

Quantidade de erros 404: 20901


## Questão 3: Quais são os 5 requests que mais causaram erro 404?

In [7]:
requests_resulting_in_404_code = code_404.map(lambda log: (log.request, 1))
sum_by_request_404 = requests_resulting_in_404_code.reduceByKey(lambda a, b: a + b)
top5_requests = sum_by_request_404.takeOrdered(5, lambda s: -1 * s[1])
print('Os 5 Requests que mais causaram erro 404 : ')
top5_requests

Os 5 Requests que mais causaram erro 404 : 


[('GET /pub/winvn/readme.txt HTTP/1.0', 2004),
 ('GET /pub/winvn/release.txt HTTP/1.0', 1732),
 ('GET /shuttle/missions/STS-69/mission-STS-69.html HTTP/1.0', 682),
 ('GET /shuttle/missions/sts-68/ksc-upclose.gif HTTP/1.0', 426),
 ('GET /history/apollo/a-001/a-001-patch-small.gif HTTP/1.0', 384)]

## Questão 4: Qual é a quantidade de erros 404 por dia?

In [8]:
dates = code_404.map(lambda log: (log.date, 1))
date_sum = dates.reduceByKey(lambda a, b: a + b)

print('Quantidade de erros 404 por dia: ')
date_sum.collect()

NameError: name 'code_404' is not defined

## Questão 5: Qual é o total de bytes retornados?

In [37]:
filter_bitesize_zero = (clean_lines.filter(lambda log: log.bytesize != '-')).cache()
df = spark.createDataFrame(filter_bitesize_zero)
total_bytes = df.agg({'bytesize':'sum'}).collect()

print('Total de bytes retornados: {}'.format(total_bytes[0][0]))


Total de bytes retornados: 65524314915.0
