# <font color='blue'>Semantix</font>
## <font color='blue'>Desafio Engenheiro de Dados</font>

### Qual o objetivo do comando <u>cache</u> em Spark?
Possibilita a persistencia dos dados em memória, permitindo que sejam reutilizados em etapas seguintes evitando um novo processamento.

### O mesmo código implementado em Spark é normalmente mais rápido que a implementação equivalente em MapReduce. Por quê?
O MapReduce utiliza-se do disco (HDFS) para realizar a gravação dos resultados intermediários em uma atividade de processamento, ao passo que o Spark utiliza-se da memória.
O processamento em memória é até 100x mais rápido.

### Qual é a função do <u>SparkContext</u>?
Estabelecer a conexão com o ambiente de execução do Spark, permitindo acesso à todas as suas funcionalidades.

### Explique com suas palavras  o que é <u>Resilient Distributed Datasets (RDD)</u>.
É uma coleção de elementos de dados particionados distribuída e imutável.

### <u>GroupByKey</u> é menos eficiente que <u>reduceByKey</u> em grandes dataset. Por quê?
O GroupByKey realiza o agrupamento de todos os dados de diferentes partições para só então realizar a operação.
O reduceByKey realiza o agrupamento dos dados por partição para depois realizar a operação. Isso implica em um volume menor de dados trafegando na rede.


### Explique o que o código Scala abaixo faz.

In [None]:
val textFile = sc.textFile("hdfs://...") #CRIA UM RDD A PARTIR DE UM ARQUIVO DO HDFS
val counts = textFile.flatMap(line => line.split(" ") #APLICA O SPLIT PARA QUEBRAR O RDD EM PALAVRAS
    .map(word => (word, 1)) #FAZ UM MAPEAMENTO "CHAVE, VALOR" DE CADA PALAVRA ATRIBUINDO O VALOR 1
    .reduceByKey(_ + _) #FAZ REDUÇÃO SUMARIZANDO OS VALORES POR PALAVRAS, RESULTANDO NA QUANTIDADE TOTAL DE CADA PALAVRA
counts.saveAsTextFile("hdfs://...") #SALVA O ARQUIVO COM A CONTAGEM DE REPETIÇÃO DAS PALAVRAS NO HDFS


# <font color='blue'>HTTP requests to the NASA Kennedy Space Center WWW server</font>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row

In [None]:
# Criando o Spark Session
spSession = SparkSession.builder.master("local").appName("Semantix-Desafio").config("spark.some.config.option", "some-value").getOrCreate()

In [None]:
# Criando o SQL Context
sqlContext = SQLContext(sc)

In [None]:
# Criando os RDDs
Jul95 = sc.textFile("access_log_Jul95.csv")
Aug95 = sc.textFile("access_log_Aug95.csv")

In [None]:
# Unindo os RDDs e fazendo o tratamento de limpeza e estruturação
nasa1 = Jul95.union(Aug95)
nasa2 = nasa1.map(lambda x: x.replace(" - - [","|"))
nasa3 = nasa2.map(lambda x: x.replace(" -","|"))
nasa4 = nasa3.map(lambda x: x.replace('] "',"|"))
nasa5 = nasa4.map(lambda x: x.replace('" ',"|"))
nasa6 = nasa5.map(lambda line: line.split("|"))

In [None]:
# Criando linhas independentes 
nasaFinal = nasa6.map(lambda p: Row(HOST = str(p[0]), DATA = str(p[1].split(":")[0]), REQUISICAO = str(p[3]), RETORNO = int(p[4].split(" ")[0]), BYTES = int(p[4].split(" ")[1])))

In [None]:
# Criando um Data Frame
nasaDF = spSession.createDataFrame(nasaFinal)

In [None]:
# Criando uma tabela temporária
nasaDF.createOrReplaceTempView("nasaTB")

In [None]:
# Executando SQL
spSession.sql("select * from nasaTB").show()

## Questões
<b>Responda as seguintes questões devem ser desenvolvidas em Spark utilizando a sua linguagem de preferência.</b>

### 1. Número de hosts únicos.

### 2. O total de erros 404.

### 3. Os 5 URLs que mais causaram erro 404.

### 4. Quantidade de erros 404 por dia.

### 5. O total de bytes retornados.