# Aprender Spark na Prática
## Você foi contratado como Engenheiro de Dados na Empresa AllWeDoWithSpark e sua primeira tarefa será executar as seguintes atividades em um sample de um dataset que foi disponibilizado pelo cliente:
- Conversão do formato dos arquivos: Converter o arquivo CSV presente no diretório data/input/users/load.csv, para um formato colunar de alta performance de leitura de sua escolha. Justificar brevemente a escolha do formato;
- Deduplicação dos dados convertidos: No conjunto de dados convertidos haverão múltiplas entradas para um mesmo registro, variando apenas os valores de alguns dos campos entre elas. Será necessário realizar um processo de deduplicação destes dados, a fim de apenas manter a última entrada de cada registro, usando como referência o id para identificação dos registros duplicados e a data de atualização (update_date) para definição do registro mais recente;
- Conversão do tipo dos dados deduplicados: No diretório config haverá um arquivo JSON de configuração (types_mapping.json), contendo os nomes dos campos e os respectivos tipos desejados de output. Utilizando esse arquivo como input, realizar um processo de conversão dos tipos dos campos descritos, no conjunto de dados deduplicados;

### Para esta atividade, está sendo utilizada uma Data Science Virtual Machine no MS Azure. Leia mais sobre ela em https://docs.microsoft.com/en-us/azure/machine-learning/data-science-virtual-machine/

In [1]:
# Realizando todos os imports necessários
import pandas as pd
import pyspark
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

In [2]:
# Definindo os caminhos
SOURCE_FILE = r'C:\Users\paulo\Downloads\teste-eng-dados\data\input\users\load.csv'
WRITE_DIR = r'C:\Users\paulo\Downloads\teste-eng-dados\data\output'
CONFIG_FILE =  r'C:\Users\paulo\Downloads\teste-eng-dados\config\types_mapping.json'

In [3]:
# Explorando o arquivo de origem com Pandas
pd_sourcefile = pd.read_csv(SOURCE_FILE, sep=',')
pd_sourcefile.head()

Unnamed: 0,id,name,email,phone,address,age,create_date,update_date
0,1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03 18:47:01.954752,2018-03-03 18:47:01.954752
1,1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03 18:47:01.954752,2018-04-14 17:09:48.558151
2,2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21 20:21:24.364752,2018-04-21 20:21:24.364752
3,3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19 04:07:06.854752,2018-05-19 04:07:06.854752
4,1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03 18:47:01.954752,2018-05-23 10:13:59.594752


### Dado os requisitos, realizaremos o processamento ETL com Apache Spark na seguinte ordem:
1. Carregamento do arquivo para um dataframe do Spark
2. Deduplicação dos dados
3. Conversão dos tipos de dados a partir dos tipos sugeridos no arquivo de configuração
4. Escrita do arquivo do diretório de origem

In [4]:
# Construindo uma Sessão Spark e Executando a Carga do Arquivo de Origem:
spark = SparkSession.builder.appName("A Simple Spark ETL Processing Example").enableHiveSupport().getOrCreate()
df_sourcefile = spark.read.format("csv").option("header", "true").load(SOURCE_FILE)

In [5]:
# Desduplicando os Dados no Dataframe utilizando Window Functions
windowSpec = Window.partitionBy(df_sourcefile['id']).orderBy(df_sourcefile['update_date'].desc())
df_deduped = df_sourcefile.withColumn("rank_id", rank().over(windowSpec)).filter(col('rank_id') == 1).drop(col("rank_id")).sort(col('id').asc())

In [6]:
# Verificando os dados desduplicados
df_deduped.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



In [7]:
# Convertendo os tipos de dados baseado no arquivo de configuração
# Primeiro, vamos dar uma olhada no schema atual
df_deduped.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



In [8]:
# Checando o arquivo de configuração, devemos alterar três colunas: age, create_date e update_date
def cast_from_file(json_file, df):
    with open(json_file) as types_mapping:
        data = json.load(types_mapping)
        for d in data:
            df = df.withColumn(d,col(d).cast(data[d]))
    return df

In [9]:
# Aplicando a função definida para conversão
df_converted = cast_from_file(CONFIG_FILE, df_deduped)

In [10]:
# Opcional: considerando o dataset, vamos adicionalmente converter a coluna id
df_converted = df_converted.withColumn('id', col('id').cast('int'))

In [11]:
# Validando o schema após a função
df_converted.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



### O Parquet foi escolhido como formato de saída porque, além de ser um formato colunar de alta performance de leitura, é otimizado para o Spark, comumente adotado quando se precisa armazenar terabytes de dados em um Data Lake e trabalha melhor a soluções como AWS Athena, Google BigQuery etc

In [12]:
# Escrevendo no diretório de saída no formato Parquet. Opcionalmente pode se adotar no futuro o partitionBy
df_converted.coalesce(1).write.mode("overwrite").parquet(WRITE_DIR)