# EML4 - Aula 04 - Processamento de Massivo de Dados

Principais links:
* [Spark docs](https://spark.apache.org/docs/latest/)
* [Dataset Adult Pre-processed](https://drive.google.com/file/d/1WLKTZUnhcVVwwBDvwe84TRSKdyHd-0uh/view?usp=sharing)
  * Dados do censo de 1994
  * [Link Repositório Original](https://archive.ics.uci.edu/ml/datasets/Adult)
    *  Formato: csv

Nos notebooks da Databricks, o Spark Session fica disponivel pela variável global `spark`

In [1]:
spark

In [2]:
# Cole aqui o caminho gerado para o seu dataset
dataset_location = "adult-preprocessed.data"

### Upload do Dataset
* Faça upload do conjunto de dados "Adult" preprocessado que disponibilizamos junto ao notebook da aula
* Obtenha o caminho para DFS onde foi armazenado o arquivo e coloque na variável "dataset_location" no próxima célula

Uma fez feito o upload, o arquivo será divido, replicado e colocado em um DFS (Distributed File System). Para acessá-lo precisaremos utilizar uma estrutura que o abstraia.

![Unified Engine](https://files.training.databricks.com/images/105/unified-engine.png)

Inicialmente, a estrutura básica do Spark foi definida como RDD

* **R**esilient: Fault-tolerant
* **D**istributed: Across multiple nodes
* **D**ataset: Collection of partitioned data

RDDs são imutáveis quando criados e mantém registros de sua linhagem para recuperação de falhas! Estas estruturas permitem o acesso ao conjunto de dados distribuído, mas utilizam memória principal sempre que possível. Outras abstrações do Spark utilizam um RDD para acesso aos dados armazenados.

Contudo, existem abstrações de alto nível, mais fáceis de usar e que possibilitam um resultado com melhor performance como os DataFrames!

![RDD vs DataFrames](https://files.training.databricks.com/images/105/rdd-vs-dataframes.png)

### DataFrame
Através da sessão, vamos importar o dataset para um DataFrame

In [3]:
dataset = spark.read.format('csv') \
               .option('inferSchema', True) \
               .option('header', False) \
               .option('sep', ',') \
               .load(dataset_location)

display(dataset)

DataFrame[_c0: int, _c1: string, _c2: int, _c3: string, _c4: int, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: int, _c11: int, _c12: int, _c13: string, _c14: string]

É possível obter o esquema de dados de um DataFrame usando o método `printSchema()`.

In [4]:
dataset.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)



O data set foi carregado como um DataFrame.

Uma vez que o dataset não possuía header, não foi possível atribuir o nome das colunas automaticamente. 

Agora, vamos atribuir manualmente o nome às colunas

In [5]:
dataset1 = dataset \
            .withColumnRenamed('_c0', 'idade') \
            .withColumnRenamed('_c1', 'classe_trabalho') \
            .withColumnRenamed('_c2', 'final_weight') \
            .withColumnRenamed('_c3', 'escolaridade') \
            .withColumnRenamed('_c4', 'escolaridade_num') \
            .withColumnRenamed('_c5', 'estado_civil') \
            .withColumnRenamed('_c6', 'ocupacao') \
            .withColumnRenamed('_c7', 'relacionamento_householder') \
            .withColumnRenamed('_c8', 'raca') \
            .withColumnRenamed('_c9', 'sexo') \
            .withColumnRenamed('_c10', 'ganho_capital') \
            .withColumnRenamed('_c11', 'perda_capital') \
            .withColumnRenamed('_c12', 'jornada_trabalho') \
            .withColumnRenamed('_c13', 'nacionalidade') \
            .withColumnRenamed('_c14', 'renda_anual')

display(dataset1)

DataFrame[idade: int, classe_trabalho: string, final_weight: int, escolaridade: string, escolaridade_num: int, estado_civil: string, ocupacao: string, relacionamento_householder: string, raca: string, sexo: string, ganho_capital: int, perda_capital: int, jornada_trabalho: int, nacionalidade: string, renda_anual: string]

In [6]:
dataset1.printSchema()

root
 |-- idade: integer (nullable = true)
 |-- classe_trabalho: string (nullable = true)
 |-- final_weight: integer (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- escolaridade_num: integer (nullable = true)
 |-- estado_civil: string (nullable = true)
 |-- ocupacao: string (nullable = true)
 |-- relacionamento_householder: string (nullable = true)
 |-- raca: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- ganho_capital: integer (nullable = true)
 |-- perda_capital: integer (nullable = true)
 |-- jornada_trabalho: integer (nullable = true)
 |-- nacionalidade: string (nullable = true)
 |-- renda_anual: string (nullable = true)



In [7]:
# Selecionando colunas específicas
dataset1.select('idade', 'nacionalidade').show(5)

+-----+-------------+
|idade|nacionalidade|
+-----+-------------+
|   39|United-States|
|   50|United-States|
|   38|United-States|
|   53|United-States|
|   28|         Cuba|
+-----+-------------+
only showing top 5 rows



In [8]:
# Operando colunas
dataset1.select( \
                'ganho_capital', 'perda_capital', \
                dataset1['ganho_capital'] - dataset1['perda_capital'] \
               ).show(5)

+-------------+-------------+-------------------------------+
|ganho_capital|perda_capital|(ganho_capital - perda_capital)|
+-------------+-------------+-------------------------------+
|         2174|            0|                           2174|
|            0|            0|                              0|
|            0|            0|                              0|
|            0|            0|                              0|
|            0|            0|                              0|
+-------------+-------------+-------------------------------+
only showing top 5 rows



In [9]:
# Utilizando expressões
from pyspark.sql.functions import expr
dataset1.select(\
     'ganho_capital', 'perda_capital',\
     expr('ganho_capital - perda_capital as capital_liquido')\
).show(5)

+-------------+-------------+---------------+
|ganho_capital|perda_capital|capital_liquido|
+-------------+-------------+---------------+
|         2174|            0|           2174|
|            0|            0|              0|
|            0|            0|              0|
|            0|            0|              0|
|            0|            0|              0|
+-------------+-------------+---------------+
only showing top 5 rows



In [10]:
# Utilizando expressões com agregação
dataset1.selectExpr('avg(idade)').show()

+-----------------+
|       avg(idade)|
+-----------------+
|38.58164675532078|
+-----------------+



#### Exercício 1
Obtenha o valor máximo de capital líquido (ganho de capital - perda de capital)

In [11]:
dataset1.selectExpr("max(ganho_capital - perda_capital)").show()

+------------------------------------+
|max((ganho_capital - perda_capital))|
+------------------------------------+
|                               99999|
+------------------------------------+



In [12]:
# Filtrando registros
dataset1.filter('idade < 30').show(2)
dataset1.where('idade > 30').show(2)

+-----+---------------+------------+------------+----------------+------------------+--------------+--------------------------+-----+------+-------------+-------------+----------------+-------------+-----------+
|idade|classe_trabalho|final_weight|escolaridade|escolaridade_num|      estado_civil|      ocupacao|relacionamento_householder| raca|  sexo|ganho_capital|perda_capital|jornada_trabalho|nacionalidade|renda_anual|
+-----+---------------+------------+------------+----------------+------------------+--------------+--------------------------+-----+------+-------------+-------------+----------------+-------------+-----------+
|   28|        Private|      338409|   Bachelors|              13|Married-civ-spouse|Prof-specialty|                      Wife|Black|Female|            0|            0|              40|         Cuba|      <=50K|
|   23|        Private|      122272|   Bachelors|              13|     Never-married|  Adm-clerical|                 Own-child|White|Female|            

In [13]:
# Filtrando registros
dataset1\
  .filter('idade < 30')\
  .where(dataset1['estado_civil'] == 'Never-married')\
  .filter(dataset1['nacionalidade'] != 'United-States')\
  .show(5)

+-----+---------------+------------+------------+----------------+-------------+---------------+--------------------------+------------------+------+-------------+-------------+----------------+-------------+-----------+
|idade|classe_trabalho|final_weight|escolaridade|escolaridade_num| estado_civil|       ocupacao|relacionamento_householder|              raca|  sexo|ganho_capital|perda_capital|jornada_trabalho|nacionalidade|renda_anual|
+-----+---------------+------------+------------+----------------+-------------+---------------+--------------------------+------------------+------+-------------+-------------+----------------+-------------+-----------+
|   18|        Private|      226956|     HS-grad|               9|Never-married|  Other-service|                 Own-child|             White|Female|            0|            0|              30|            ?|      <=50K|
|   27|        Private|      213921|     HS-grad|               9|Never-married|  Other-service|                 Own

In [14]:
#ordenando por idade e filtrando por sexo
dataset1\
  .sort(dataset1['idade'].desc())\
  .filter(dataset1['sexo'] == 'Male')

#esse código tem boa performance?  

DataFrame[idade: int, classe_trabalho: string, final_weight: int, escolaridade: string, escolaridade_num: int, estado_civil: string, ocupacao: string, relacionamento_householder: string, raca: string, sexo: string, ganho_capital: int, perda_capital: int, jornada_trabalho: int, nacionalidade: string, renda_anual: string]

Porque os resultados não são mostrados na célula acima?

Ordenação e filtros são transformações, que são avaliadas de forma *lazy* pelo Spark.

Isso gera várias vantagens, dentre elas: impede a leitura desnecessária do conjunto de dados; facilita o paralelismo; possibilita otimização!
  
Para saber mais sobre o otimizador do Spark **Catalyst** leia [esse blog!](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)
  
![Catalyst](https://files.training.databricks.com/images/105/catalyst-diagram.png)

#### Exercício 2

Obtenha a idade média das pessoas viúvas com jornada de trabalho acima de 20 horas semanais

In [15]:
#Faça aqui o código do exercício 2

dataset1 \
    .filter("jornada_trabalho > 20") \
    .filter(dataset1["estado_civil"] == "Widowed") \
    .selectExpr("avg(idade)") \
    .show()

+------------------+
|        avg(idade)|
+------------------+
|56.394101876675606|
+------------------+



In [16]:
# Obtendo valores distintos
dataset1.select('estado_civil').distinct().show()

+--------------------+
|        estado_civil|
+--------------------+
|           Separated|
|       Never-married|
|Married-spouse-ab...|
|            Divorced|
|             Widowed|
|   Married-AF-spouse|
|  Married-civ-spouse|
+--------------------+



#### Exercício 3
Obtendo valores distintos para a combinação de sexo e raça para pessoas com idade acima de 60 anos

In [17]:
#Faça aqui o código do exercício 3
dataset1 \
    .filter("idade > 60") \
    .select("sexo", "raca") \
    .distinct() \
    .show()

+------+------------------+
|  sexo|              raca|
+------+------------------+
|  Male|             White|
|Female|Asian-Pac-Islander|
|Female|             White|
|Female|Amer-Indian-Eskimo|
|  Male|             Other|
|  Male|             Black|
|  Male|Asian-Pac-Islander|
|  Male|Amer-Indian-Eskimo|
|Female|             Other|
|Female|             Black|
+------+------------------+



In [18]:
# Obtendo valores agregados, dado um agrupamento
from pyspark.sql.functions \
    import count, sum, max, min, avg

dataset1 \
    .groupBy('sexo') \
    .agg(avg('idade')) \
    .show()

+------+-----------------+
|  sexo|       avg(idade)|
+------+-----------------+
|Female|36.85823043357163|
|  Male|39.43354749885268|
+------+-----------------+



In [19]:
# Obtendo valores agregados, dado vários agrupamentos
dataset1 \
    .groupBy('sexo', 'estado_civil') \
    .agg(avg('idade'), max('idade'), min('idade')) \
    .show()

+------+--------------------+------------------+----------+----------+
|  sexo|        estado_civil|        avg(idade)|max(idade)|min(idade)|
+------+--------------------+------------------+----------+----------+
|  Male|   Married-AF-spouse|31.444444444444443|        47|        26|
|Female|   Married-AF-spouse|33.142857142857146|        75|        19|
|  Male|            Divorced|42.688311688311686|        83|        18|
|  Male|             Widowed|61.666666666666664|        90|        18|
|  Male|Married-spouse-ab...|41.568075117370896|        79|        18|
|  Male|  Married-civ-spouse|43.708386515504166|        90|        17|
|Female|           Separated| 39.59904912836767|        90|        19|
|Female|             Widowed| 58.42787878787879|        90|        22|
|Female|            Divorced| 43.27582335329341|        90|        19|
|Female|  Married-civ-spouse| 39.54375377187689|        90|        17|
|  Male|           Separated| 38.96192893401015|        74|        18|
|Femal

#### Exercício 4
Obtenha a média de capital líquido (ganho de capital - perda de capital) por escolaridade de pessoas com idade acima de 30 anos

In [20]:
#Faça aqui o código do exercício 4

dataset1 \
    .filter('idade > 30') \
    .groupBy('escolaridade') \
    .agg(avg(expr('ganho_capital - perda_capital'))) \
    .show()

+------------+------------------------------------+
|escolaridade|avg((ganho_capital - perda_capital))|
+------------+------------------------------------+
|     Masters|                  2643.3674540682414|
|        10th|                   533.1254612546126|
|     5th-6th|                  132.26141078838174|
|  Assoc-acdm|                   732.9663072776281|
|   Assoc-voc|                   655.4662638469285|
|     7th-8th|                  196.75375939849624|
|         9th|                   430.9438202247191|
|     HS-grad|                   638.0167474048443|
|   Bachelors|                  2117.6493677555322|
|        11th|                   262.4105461393597|
|     1st-4th|                   94.40579710144928|
|   Preschool|                   72.71794871794872|
|        12th|                  468.90909090909093|
|   Doctorate|                   4751.202046035805|
|Some-college|                   776.4566966398486|
| Prof-school|                  10928.190291262135|
+-----------

In [21]:
# Obtendo as 5 ocupações com maior ganho de capital médio

from pyspark.sql.functions import asc, desc
dataset1 \
    .groupBy('ocupacao') \
    .agg(avg('ganho_capital').alias('ganho_medio')) \
    .orderBy(desc('ganho_medio')) \
    .limit(5) \
    .show()

+---------------+------------------+
|       ocupacao|       ganho_medio|
+---------------+------------------+
| Prof-specialty|2726.6995169082124|
|Exec-managerial|2262.7729955730447|
|          Sales| 1319.829315068493|
|Protective-serv| 708.0986132511556|
|   Tech-support| 673.5528017241379|
+---------------+------------------+



#### Exercício 5
Obtenha a combinação de escolaridade e ocupação com menor jornada de trabalho média que tenham renda maior de 50 mil dólares por ano

In [22]:
#Faça aqui o código do exercício 5

dataset1 \
    .filter(dataset1['renda_anual'] ==  '>50K') \
    .groupBy('escolaridade', 'ocupacao') \
    .agg(avg('jornada_trabalho').alias('jornada_media')) \
    .orderBy('jornada_media') \
    .limit(1) \
    .show()

+------------+-------------+-------------+
|escolaridade|     ocupacao|jornada_media|
+------------+-------------+-------------+
|     Masters|Other-service|         15.0|
+------------+-------------+-------------+



In [23]:
# Usando collect, take ou first para obter os valores calculados

resultado =  dataset1 \
    .groupBy('estado_civil')\
    .agg(avg('idade')) \
    .collect()

print(resultado)

print(resultado[0]['avg(idade)'])

[Row(estado_civil='Separated', avg(idade)=39.35414634146341), Row(estado_civil='Never-married', avg(idade)=28.150987550313584), Row(estado_civil='Married-spouse-absent', avg(idade)=40.578947368421055), Row(estado_civil='Divorced', avg(idade)=43.04163853252307), Row(estado_civil='Widowed', avg(idade)=58.97583081570997), Row(estado_civil='Married-AF-spouse', avg(idade)=32.47826086956522), Row(estado_civil='Married-civ-spouse', avg(idade)=43.24759615384615)]
39.35414634146341


In [24]:
#Obter a idade média por ocupação, substituindo o agrupamento de dados por um laço iterativo (não é uma estratégia recomendada)

ocupacoes = dataset1.select('ocupacao').distinct().collect()

print(ocupacoes)

for i in ocupacoes:
  ocupacao = i['ocupacao']
  
  idade_media = dataset1 \
      .filter(dataset1['ocupacao'] == ocupacao) \
      .selectExpr('avg(idade)') \
      .first()
  
  print("%s: %f" % (ocupacao, idade_media['avg(idade)']))

[Row(ocupacao='Sales'), Row(ocupacao='Exec-managerial'), Row(ocupacao='Prof-specialty'), Row(ocupacao='Handlers-cleaners'), Row(ocupacao='Farming-fishing'), Row(ocupacao='Craft-repair'), Row(ocupacao='Transport-moving'), Row(ocupacao='Priv-house-serv'), Row(ocupacao='Protective-serv'), Row(ocupacao='Other-service'), Row(ocupacao='Tech-support'), Row(ocupacao='Machine-op-inspct'), Row(ocupacao='Armed-Forces'), Row(ocupacao='?'), Row(ocupacao='Adm-clerical')]
Sales: 37.353973
Exec-managerial: 42.169208
Prof-specialty: 40.517633
Handlers-cleaners: 32.165693
Farming-fishing: 41.211268
Craft-repair: 39.031471
Transport-moving: 40.197871
Priv-house-serv: 41.724832
Protective-serv: 38.953775
Other-service: 34.949621
Tech-support: 37.022629
Machine-op-inspct: 37.715285
Armed-Forces: 30.222222
?: 40.882800
Adm-clerical: 36.964456


### API SQL

In [25]:
# Para utilizar a API, vamos disponibilizar nosso DataFrame em formato de tabela para a API SQL

temp_table_name = 'dataset'
dataset1.createOrReplaceTempView(temp_table_name)

In [26]:
# A partir de agora, é possível fazer consultas SQL sobre a tabela recém-criada (dataset).

spark.sql('''
  select sexo, avg(idade) as idade_media
  from dataset
  group by sexo
''').show()

+------+-----------------+
|  sexo|      idade_media|
+------+-----------------+
|Female|36.85823043357163|
|  Male|39.43354749885268|
+------+-----------------+



In [33]:
# Isto funciona somente no databricks:
# %sql
# -- Consulta a partir de uma célula SQL
# select sexo, avg(idade) as idade_media
# from dataset
# group by sexo

#### Exercício 6
O atributo final_weight representa quantas vezes uma determinada leitura do censo é repetida, ou seja, o peso daquela linha no conjunto de dados. Itere sobre a lista das 5 maiores nacionalidades (baseado na soma do final weight) e para cada uma, obtenha os três estados civis com maior jornada de trabalho médio. Utilize a API SQL.

In [34]:
#Faça aqui o código do exercício 6

In [36]:
nacionalidades = spark.sql('''
    select nacionalidade, sum(final_weight) qtde
    from dataset
    group by nacionalidade
    order by qtde desc
    limit 5
''').collect()

for row in nacionalidades:
    nacionalidade = row["nacionalidade"]
    print(nacionalidade)

    res = spark.sql('''
        select estado_civil, avg(jornada_trabalho) as jornada_media
        from dataset
        where nacionalidade = '%s'
        group by estado_civil
        order by jornada_media desc
        limit 3
    ''' % nacionalidade).collect()

    for row2 in res:
        print(" %s: %d" % row2["estado_civil", row2["jornada_media"]])


United-States
Mexico
?
Philippines
El-Salvador
