# Teste Apache Spark - Willian Henrique Barbosa Rocha
<hr>

## Qual o objetivo do comando cache em Spark?
O comando cache tem como objetivo colocar os data-sets in memory atraves dos cluster, para que o acesso a esses dados fiquem mais rapidos sem a necessidade de ler novamente em disco. Caso nao seja possivel alocar o data-set na memoria,o Spark ira fazer o split desses dados no disco.

## O mesmo código implementado em Spark é normalmente mais rápido que a implementação equivalente em MapReduce. Por quê?
O Spark foi concebido para trabalhar com os dados in-memory, ou seja, utilizando o acesso a memoria, enquanto o Hadoop utiliza-se do acesso ao disco para processar as consultas aos dados. Porem o hardware para processar Spark precisa ser mais robusto, devido ao maior utilizacao de memoria.

## Qual é a função do *SparkContext* ?
O SparkContext representa a conexao, um objeto do cluster Spark, e pode ser utilizado para criar os RDDs, acumuladores e variáveis de broadcast no cluster Spark.

## Explique com suas palavras o  que é Resilient Distributed Datasets (RDD).
RDD's são as estruturas de dados fundamentais do Spark, distribuições imutáveis e tolerantes a falhas das coleções de objetos que são paralelizados através do cluster. Existem 2 tipos de operações: Transformations, Actions.

## *GroupByKey* é menos eficiente que *reduceByKey* em grandes dataset. Por quê?
O comando **GroupByKey** realiza o Suffle dos dados através da rede para formar a lista (K, V), isso é particularmente custoso quando estamos trabalhando com um conjunto de dados de grande volume, entretanto o comando **reduceByKey** realiza o suffle dos dados somente nos resultados da operação reduce, isso resultado em significante redução de tráfico na rede.

## Explique o  que o  código Scala abaixo faz.
```scala
val textFile = sc.textFile("hdfs://...")
val counts   = textFile.flatMap(line => line.split(""))
               .map(word => (word,1))
               .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
```

Codigo de word count, realiza o mapping das palavras, e a realiza a contagem de quantas vezes aquela palavra apareceu.

## HTTP requests to the NASA Kennedy Space Center WWW server
<hr>

**Fonte oficial do dateset:** [http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html](http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html)

**Dados:**
- [Jul​ ​ 01​ ​ to​ ​ Jul​ ​ 31,​ ​ ASCII​ ​ format,​ ​ 20.7​ ​ MB​ ​ gzip​ ​ compressed](ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz)​ , ​ ​ 205.2​ ​ MB.
- [Aug​ ​ 04​ ​ to​ ​ Aug​ ​ 31,​ ​ ASCII​ ​ format,​ ​ 21.8​ ​ MB​ ​ gzip​ ​ compressed](ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz)​ , ​ ​ 167.8​ ​ MB.

**Sobre o dataset**​ : Esses dois conjuntos de dados possuem todas as requisições HTTP para o servidor da NASA Kennedy
Space​ ​ Center​ ​ WWW​ ​ na​ ​ Flórida​ ​ para​ ​ um​ ​ período​ ​ específico.

Os​ ​ logs​ ​ estão​ ​ em​ ​ arquivos​ ​ ASCII​ ​ com​ ​ uma​ ​ linha​ ​ por​ ​ requisição​ ​ com​ ​ as​ ​ seguintes​ ​ colunas:
- **Host fazendo a requisição**​ . Um hostname quando possível, caso contrário o endereço de internet se o nome
não​ ​ puder​ ​ ser​ ​ identificado.
- **Timestamp**​ ​ no​ ​ formato​ ​ "DIA/MÊS/ANO:HH:MM:SS​ ​ TIMEZONE"
- **Requisição​ ​ (entre​ ​ aspas)**
- **Código​ ​ do​ ​ retorno​ ​ HTTP**
- **Total​ ​ de​ ​ bytes​ ​ retornados**

### Data Wrangling

Fontes de consultas utilizadas

- [Manual de referência do SparkSQL](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- [exemplo regex](https://stackoverflow.com/questions/53090003/split-string-in-a-spark-dataframe-column-by-regular-expressions-capturing-groups)
- [exemplo de split column](https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns)

In [158]:
# Configurando a sessao spark
from pyspark.sql import SparkSession

# importando as lib que serao utilizadas para fazer a limpeza dos dados
import pyspark.sql.functions as f
from pyspark.sql.functions import trim
from pyspark.sql.functions import regexp_replace

spark = SparkSession.builder.appName("Teste Técnico de Apache Spark").getOrCreate()

In [159]:
def data_clean_log_file(file_name):
    # https://spark.apache.org/docs/2.2.1/sql-programming-guide.html#generic-loadsave-functions
    df_temp = spark.sql("SELECT * FROM csv.`" + file_name + "`")
    
    df_temp = df_temp.select("_c0",
                             f.split("_c0", " - - ")[0].alias("host_requisicao"),
                             f.split("_c0", " - - ")[1].alias("_c1"))
    
    df_temp = df_temp.select("host_requisicao",
                             f.split(f.regexp_replace("_c1","\[([^]]+)\]",r"$1,"),",")[0]
                             .alias("timestamp_requisicao"),
                             f.split(f.regexp_replace("_c1","\[([^]]+)\]",r"$1,"),",")[1]
                             .alias("_c2"))
    
    df_temp = df_temp.select("host_requisicao","timestamp_requisicao",
                             trim(f.split(f.regexp_replace("_c2","\"([^]]+)\"",r"$1,"),",")[0])
                             .alias("requisicao"),
                             trim(f.split(f.regexp_replace("_c2","\"([^]]+)\"",r"$1,"),",")[1])
                             .alias("_c3"))
    
    df_temp = df_temp.select("host_requisicao","timestamp_requisicao","requisicao",
                             (f.split("_c3", " ")[0]).alias("codigo_retorno_http"),
                             regexp_replace(f.split("_c3", " ")[1],"-","0").alias("bytes_retorno"))
    
    return df_temp

In [173]:
df_jul = data_clean_log_file("access_log_Jul95")
df_ago = data_clean_log_file("access_log_Aug95")

In [174]:
df_ago.show()

+--------------------+--------------------+--------------------+-------------------+-------------+
|     host_requisicao|timestamp_requisicao|          requisicao|codigo_retorno_http|bytes_retorno|
+--------------------+--------------------+--------------------+-------------------+-------------+
|   in24.inetnebr.com|01/Aug/1995:00:00...|GET /shuttle/miss...|                200|         1839|
|     uplherc.upl.com|01/Aug/1995:00:00...|      GET / HTTP/1.0|                304|            0|
|     uplherc.upl.com|01/Aug/1995:00:00...|GET /images/ksclo...|                304|            0|
|     uplherc.upl.com|01/Aug/1995:00:00...|GET /images/MOSAI...|                304|            0|
|     uplherc.upl.com|01/Aug/1995:00:00...|GET /images/USA-l...|                304|            0|
|ix-esc-ca2-07.ix....|01/Aug/1995:00:00...|GET /images/launc...|                200|         1713|
|     uplherc.upl.com|01/Aug/1995:00:00...|GET /images/WORLD...|                304|            0|
|slppp6.in

### Questões
Responda​ ​ as​ ​ seguintes​ ​ questões​ ​ devem​ ​ ser​ ​ desenvolvidas​ ​ em​ ​ Spark​ ​ utilizando​ ​ a ​ ​ sua​ ​ linguagem​ ​ de​ ​ preferência.

#### 1. Número​ ​ de​ ​ hosts​ ​ únicos.

In [172]:
df_jul.createOrReplaceTempView("log_jul")
df_ago.createOrReplaceTempView("log_ago")
sql = """select count(distinct QTD) as QTD 
            from (SELECT host_requisicao as QTD 
                    FROM log_jul
                   union all
                  SELECT host_requisicao FROM log_ago)"""
sqlDF = spark.sql(sql)
sqlDF.show()

+------+
|   QTD|
+------+
|137979|
+------+



#### 2. O​ ​ total​ ​ de​ ​ erros​ ​ 404.

In [182]:
sql = """select count(*) as QTD
            from (SELECT *
                    FROM log_jul
                   union all
                  SELECT * FROM log_ago)
         where codigo_retorno_http = '404'
       """
sqlDF = spark.sql(sql)
sqlDF.show()

+-----+
|  QTD|
+-----+
|20847|
+-----+



#### 3. Os​ ​ 5 ​ ​ URLs​ ​ que​ ​ mais​ ​ causaram​ ​ erro​ ​ 404.

In [187]:
sql = """
        select * from (
        select count(*) as QTD, host_requisicao
            from (SELECT *
                    FROM log_jul
                   union all
                  SELECT * FROM log_ago)
         where codigo_retorno_http = '404'
         group by host_requisicao) 
         order by qtd desc 
         limit 5
       """
sqlDF = spark.sql(sql)
sqlDF.show()

+---+--------------------+
|QTD|     host_requisicao|
+---+--------------------+
|251|hoohoo.ncsa.uiuc.edu|
|157|piweba3y.prodigy.com|
|132|jbiagioni.npt.nuw...|
|114|piweba1y.prodigy.com|
| 91|www-d4.proxy.aol.com|
+---+--------------------+



#### 4. Quantidade​ ​ de​ ​ erros​ ​ 404​ ​ por​ ​ dia.

In [192]:
sql = """
        select count(*) as QTD, substr(timestamp_requisicao,1,2) as dia
            from (SELECT *
                    FROM log_jul
                   union all
                  SELECT * FROM log_ago)
         where codigo_retorno_http = '404'
         group by substr(timestamp_requisicao,1,2)
         order by dia
       """
sqlDF = spark.sql(sql)
sqlDF.show(31)

+----+---+
| QTD|dia|
+----+---+
| 557| 01|
| 291| 02|
| 775| 03|
| 704| 04|
| 732| 05|
|1010| 06|
|1103| 07|
| 688| 08|
| 626| 09|
| 713| 10|
| 734| 11|
| 666| 12|
| 740| 13|
| 698| 14|
| 581| 15|
| 515| 16|
| 673| 17|
| 721| 18|
| 848| 19|
| 730| 20|
| 639| 21|
| 477| 22|
| 577| 23|
| 748| 24|
| 875| 25|
| 702| 26|
| 706| 27|
| 504| 28|
| 420| 29|
| 571| 30|
| 523| 31|
+----+---+



#### 5. O​ ​ total​ ​ de​ ​ bytes​ ​ retornados.

In [193]:
sql = """
        select sum(bytes_retorno) as TOTAL_BYTES
            from (SELECT *
                    FROM log_jul
                   union all
                  SELECT * FROM log_ago)
       """
sqlDF = spark.sql(sql)
sqlDF.show(31)

+--------------+
|   TOTAL_BYTES|
+--------------+
|6.551669721E10|
+--------------+

