### Qual o objetivo do comando cache em Spark?

O cache do Spark pode armazenar o resultado de quaisquer dados de subconsulta e dados armazenados em formatos diferentes (como CSV, JSON, parquet e ORC) em RDD. Isto permite melhorar a eficiência do código pois possibilita que resultados intermediários de operações lazy (operações que não são executadas quando são chamadas, mas somente quando uma operação de Ação é chamada ou disparada) possam ser armazenados e reutilizados repetidamente.

### O mesmo código implementado em Spark é normalmente mais rápido que a implementação equivalente em MapReduce. Por quê

No MapReduce uma nova instância JVM é iniciada para cada job executado e o processamento dos dados ocorre em etapas que partem da leitura dos dados até a escrita do resultado dos mesmos em disco (gerando os dados intermediários) para que estes sejam utilizados na próxima operação (job) (leitura – processamento operação 1 – escrita operação 1 – leitura resultado operação 1 – processamento operação 2 – escrita operação 2, ...)
Já o Spark opera a partir de todo o conjunto de dados sendo os resultados intermediários de todas as operações passados diretamente entre elas através do caching dos mesmos em memória (leitura – processamento – escrita). Além disto, o Spark mantém a JVM constantemente em execução em cada um dos nós sendo necessário apenas iniciar uma nova thread. 
Porém, vale ressaltar que devem ser avaliadas todas as variáveis existentes por exemplo algoritimo a ser implementado, memória RAM disponível e propósito do aplicação e decidir qual framework é mais adequado ao objetivo esperado e ao custo envolvido tendo em vista que operações em MapReduce são indicadas para movimentação batch de grandes volumes de dados enquanto o Spark tem uma implementação mais ampla ao permitir o processamento de grandes volumes de dados tanto em Batch quanto em streaming.



### Qual é a função do SparkContext?

Esta função serve para criar uma conexão com um cluster Spark e ativar serviços internos do ambiente Spark com o auxilio do Resource Manager configurado que aloca os recursos através das aplicações conforme configurações/parâmetros estabelecidos. Ele estabelece configurações de memória e processamento de worknodes. Pode ser utilizado, com o auxílio do Resource Manager, para criação de RDD, acumuladores e publicar variáveis nos clusters além de possibilitar obtenção do status atual, definir configurações e disparar/cancelar um job por exemplo.

### Explique com suas palavras o que é Resilient Distributed Datasets (RDD)?

RDD é a estrutura de dados do SPARK caracterizada por possibilitar a distribuição (particionamento) de dados de maneira segura em diferentes nós em um cluster e executar operações com estes sendo esta distribuição possível devido à característica de resiliência uma vez que, possibilita que os dados na memória que por ventura venham a ser perdidos por possíveis falhas possam ser recriados da mesma forma como foram originados (imutáveis).
Para garantir estas características os RDDs suportam dois tipos de operações: transformação e ação. A operação de transformação garante a resiliência e imutabilidade dos dados uma vez que ao serem recriados dados perdidos os mesmos são recriados em novas RDDs sem que as RDDs “originais”sejam modificadas.  
Toda vez que existir a necessidade de output de dados é executada uma operação de Ação, desta forma, uma ação de transformação não é iniciada sem que uma operação de ação seja solicitada. (Lazy Evaluation conforme descrito na primeira questão)


### GroupByKey é menos eficiente que reduceByKey em grandes datasets. Por quê?

Enquanto o GroupByKey agrupa os dados com base na chave num processo de redução único no RDD redutor (reducer), o reduceByKey realiza o agrupamento pela chave localmente no RDD mapper usando as funções associativa e comutativa antes de enviar os dados ao RDD redutor de forma similar ao método “combiner” do MapReduce.
O GroupByKey transfere todo Dataset pela rede, enquanto o reduceByBey calcula somas locais para cada chave em cada partição e combina essas somas locais em somas maiores após o shuffle de forma que um conjunto substancialmente menor de dados e movimentado consumindo menos recursos de memoria e rede.


### Explique o que o código Scala abaixo faz.

![alt text](Picture1.png "Title")

### Explicação: 

![alt text](Picture2.png "Title")

### Carrega um arquivo de texto de um determinado diretorio do HDFS para a variavel textFile num RDD usando contexto Spark

val textFile = sc.textFile("hdfs://...")

### Quebra cada linha em uma sequencia de palavras usando o criterio de espaco em branco e armazena o conjunto sequencial de palavras na variavel counts (novo RDD) e em seguida executa as duas próximas instruções abaixo

val counts = textFile.flatMap(line.split(""))

### Transforma cada palavra criando uma relação de chave-valor entre a palavra e o número 1

        .map(word => (word, 1))
    
### Gera agrupamento por palavras de forma a somar a quantidade de incidências 

        .reduceByKey(_+_)
    
### Grava o resultado com a contagem de palavras do RDD criado em formato texto no HDFS    

counts.saveASTextFile("hdfs://...") 

# Questões - datasets NASA 

# Datasets:
### 	Julho -> ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
### 	Agosto -> ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz


In [None]:
import patoolib
import os
import urllib.request
import pandas as pd

In [None]:
os.chdir("C:\\Users\\Usuario\\Desktop\\Desafio")


In [None]:
os.listdir()

In [None]:
urllib.request.urlretrieve(
    "ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz", 
    "C:\\Users\\Usuario\\Desktop\\Desafio\\Jul.gz")

In [None]:
urllib.request.urlretrieve(
    "ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz ", 
    "C:\\Users\\Usuario\\Desktop\\Desafio\\Ago.gz")

In [None]:
patoolib.extract_archive("Ago.gz", outdir="C:\\Users\\Usuario\\Desktop\\Desafio")

patoolib.extract_archive("Jul.gz", outdir="C:\\Users\\Usuario\\Desktop\\Desafio")


In [218]:
jul = sc.textFile('access_log_Jul95',use_unicode=True)
jul = jul.cache()

In [226]:
ago = sc.textFile('access_log_Aug95',use_unicode=True)
ago = ago.cache()

## 1. Número de hosts únicos.

In [227]:
# Número de hosts únicos
jul_count = jul.flatMap(lambda line: str(line.encode('utf-8')).split(' ')[0]).distinct().count()
ago_count = ago.flatMap(lambda line: str(line.encode('utf-8')).split(' ')[0]).distinct().count()
print('Número de hosts únicos em Julho: %s' % jul_count)
print('Número de hosts únicos em Agosto: %s' % ago_count)

Número de hosts únicos em Julho: 56
Número de hosts únicos em Agosto: 54


## 2.	O total de erros 404.

In [138]:
# Total de erros 404
def response_code_404(line):
    try:
        code = line.split(' ')[-2]
        if code == '404':
            return True
    except:
        pass
    return False

In [233]:
jul_404 = jul.filter(response_code_404).cache()
ago_404 = ago.filter(lambda line: str(line.encode('utf-8')).split(' ')[-2] == '404').cache()

print('Total de erros 404 em Julho: %s' % format(jul_404.count(), ",").replace(",", "."))
print('Total de erros 404 em Agosto: %s' % format(ago_404.count(), ",").replace(",", "."))

Total de erros 404 em Julho: 10.845
Total de erros 404 em Agosto: 10.056


## 3.	Os 5 URLs que mais causaram erro 404.

In [253]:
# 5 URLs que mais causaram erro 404
def top5_endpoints(rdd):
    endpoints = rdd.map(lambda line: str(line.encode('utf-8')).split('"')[1].split(' ')[1])
    counts = endpoints.map(lambda endpoint: (endpoint, 1)).reduceByKey(add)
    top = counts.sortBy(lambda pair: -pair[1]).take(5)
    
    print('\nTop 5 URLs que mais causaram erro 404: \n')
    for endpoint, count in top:
        print(endpoint, count)

    return top

print("Julho: ")
top5_endpoints(july_404)
print("\nAgosto: ")
top5_endpoints(august_404)

Julho: 

Top 5 URLs que mais causaram erro 404: 

/pub/winvn/readme.txt 667
/pub/winvn/release.txt 547
/history/apollo/apollo-13.html 286
/shuttle/resources/orbiters/atlantis.gif 232
/history/apollo/a-001/a-001-patch-small.gif 230

Agosto: 

Top 5 URLs que mais causaram erro 404: 

/pub/winvn/readme.txt 1337
/pub/winvn/release.txt 1185
/shuttle/missions/STS-69/mission-STS-69.html 683
/images/nasa-logo.gif 319
/shuttle/missions/sts-68/ksc-upclose.gif 253


[('/pub/winvn/readme.txt', 1337),
 ('/pub/winvn/release.txt', 1185),
 ('/shuttle/missions/STS-69/mission-STS-69.html', 683),
 ('/images/nasa-logo.gif', 319),
 ('/shuttle/missions/sts-68/ksc-upclose.gif', 253)]

## 4.	Quantidade de erros 404 por dia.

In [282]:
# Total de erros 404 diários
def daily_count(rdd):
    days = rdd.map(lambda line: line.split('[')[1].split(':')[0])
    counts = days.map(lambda day: (day, 1)).reduceByKey(add).collect()
    

    for day, count in counts:
        print(day, count)
        
    return counts

print('\nTotal de erros 404 retornados no mês de Julho: \n')
daily_count(jul_404)
print('\nTotal de erros 404 retornados no mês de Agosto: \n')
daily_count(ago_404)


Total de erros 404 retornados no mês de Julho: 

13/Jul/1995 532
21/Jul/1995 334
25/Jul/1995 461
09/Jul/1995 348
15/Jul/1995 254
16/Jul/1995 257
18/Jul/1995 465
17/Jul/1995 406
07/Jul/1995 570
12/Jul/1995 471
19/Jul/1995 639
22/Jul/1995 192
23/Jul/1995 233
03/Jul/1995 474
05/Jul/1995 497
10/Jul/1995 398
14/Jul/1995 413
01/Jul/1995 316
02/Jul/1995 291
04/Jul/1995 359
06/Jul/1995 640
08/Jul/1995 302
11/Jul/1995 471
20/Jul/1995 428
24/Jul/1995 328
26/Jul/1995 336
27/Jul/1995 336
28/Jul/1995 94

Total de erros 404 retornados no mês de Agosto: 

01/Aug/1995 243
07/Aug/1995 537
09/Aug/1995 279
10/Aug/1995 315
21/Aug/1995 305
27/Aug/1995 370
30/Aug/1995 571
03/Aug/1995 304
06/Aug/1995 373
08/Aug/1995 391
16/Aug/1995 259
20/Aug/1995 312
05/Aug/1995 236
11/Aug/1995 263
12/Aug/1995 196
13/Aug/1995 216
15/Aug/1995 327
17/Aug/1995 271
22/Aug/1995 288
23/Aug/1995 345
24/Aug/1995 420
26/Aug/1995 366
28/Aug/1995 410
14/Aug/1995 287
18/Aug/1995 256
19/Aug/1995 209
31/Aug/1995 526
04/Aug/1995 346
25/A

[('01/Aug/1995', 243),
 ('07/Aug/1995', 537),
 ('09/Aug/1995', 279),
 ('10/Aug/1995', 315),
 ('21/Aug/1995', 305),
 ('27/Aug/1995', 370),
 ('30/Aug/1995', 571),
 ('03/Aug/1995', 304),
 ('06/Aug/1995', 373),
 ('08/Aug/1995', 391),
 ('16/Aug/1995', 259),
 ('20/Aug/1995', 312),
 ('05/Aug/1995', 236),
 ('11/Aug/1995', 263),
 ('12/Aug/1995', 196),
 ('13/Aug/1995', 216),
 ('15/Aug/1995', 327),
 ('17/Aug/1995', 271),
 ('22/Aug/1995', 288),
 ('23/Aug/1995', 345),
 ('24/Aug/1995', 420),
 ('26/Aug/1995', 366),
 ('28/Aug/1995', 410),
 ('14/Aug/1995', 287),
 ('18/Aug/1995', 256),
 ('19/Aug/1995', 209),
 ('31/Aug/1995', 526),
 ('04/Aug/1995', 346),
 ('25/Aug/1995', 415),
 ('29/Aug/1995', 420)]

## 5.	O total de bytes retornados.

In [266]:
def format_size(B):   
   B = float(B)
   KB = float(1024)
   MB = float(KB ** 2) # 1,048,576
   GB = float(KB ** 3) # 1,073,741,824
   TB = float(KB ** 4) # 1,099,511,627,776

   if B < KB:
      return '{0} {1}'.format(B,'Bytes' if 0 == B > 1 else 'Byte')
   elif KB <= B < MB:
      return '{0:.2f} KB'.format(B/KB)
   elif MB <= B < GB:
      return '{0:.2f} MB'.format(B/MB)
   elif GB <= B < TB:
      return '{0:.2f} GB'.format(B/GB)
   elif TB <= B:
      return '{0:.2f} TB'.format(B/TB)

In [269]:
# Total de bytes retornados
def accumulated_byte_count(rdd):
    def byte_count(line):
        try:
            count = int(line.split(" ")[-1])
            if count < 0:
                raise ValueError()
            return count
        except:
            return 0
        
    count = rdd.map(byte_count).reduce(add)
    return count


print('Total de bytes retornados em Julho: %s' % format_size(accumulated_byte_count(jul)))
print('Total de bytes retornados em Agosto: %s' % format_size(accumulated_byte_count(ago)))

Total de bytes retornados em Julho: 36.04 GB
Total de bytes retornados em Agosto: 24.99 GB


In [None]:
sc.stop()