# Cognitivo.AI - Atividade Técnica
<sub>*Engenheiro de Dados*</sub>

**Para realizar essa atividade utilizei o Google Colab, por ser um ambiente em nuvem que provê uso de GPU do Google para processamento de dados e que já contêm diversas bibliotecas instaladas.**




Para trabalhar com spark no Google Colab é preciso realizar alguns procedimentos instalando as dependencias e configurando as variáveis de ambientes, conforme os procedimentos abaixo descrevem. 

In [2]:
# instalando as dependencias necessárias para usar o pyspark no google colab
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
# configuração das variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
 
# sequencia para fazer o pyspark ser "importável" para o google colab
import findspark
findspark.init('spark-3.0.1-bin-hadoop3.2')

In [4]:
# iniciar uma sessão local e importar dados
from pyspark.sql import SparkSession, Row
sc = SparkSession.builder.master('local[*]').getOrCreate()
 
# download do http para arquivo local
!wget --quiet --show-progress https://raw.githubusercontent.com/zorrex82/cognitivo_ai_eng_dados/master/load.csv
 
# carregando os dados
df_spark = sc.read.csv("./load.csv", inferSchema=True, header=True)
 
# ver algumas informações sobre os tipos de dados de cada coluna
df_spark.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



Processo de conversão de um dataframe em um arquivo Parquet, optei por usar o Apache Parquet pelos motivos:

1.   Armazenamento Eficiente (a compressão por colunas é mais eficaz, sendo que em cenários de big data optimizar o armazenamento diminui os custos que por vezes pode invalidar ou viabilizar um projeto.
2.   Algoritmo de compressão que pode ser espeficicado por coluna.




In [5]:
# Primeiro salvei o dataframe em um arquivo parquet
df_spark.write.parquet("load.parquet")

# Carreguei o arquivo Parquet criado para realizar algumas consultas e verificar que todas as informações estão preservadas.
parquet_df = sc.read.parquet("load.parquet")

# Então eu crio uma View temporária para executar alguns comandos SQL.
parquet_df.createOrReplaceTempView("loadView")

# Listei todos os itens salvos no Parquet para conferir com o dataframe inicial.
load_view = sc.sql("SELECT * FROM loadView order by id")
load_view.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9998|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-04-14 17:09:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9997|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-03-03 18:47:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 91234-5678|124 Conch Street,...

Processo de deduplicação dos dados contidos no arquivo.
Primeiro fiz uma breve investigação da coluna update_date e em seguida optei por usar as functions do PySpark para selecionar a coluna e remover as entradas duplicadas mantendo apenas a mais recente atualizada.

In [6]:
# Investigando melhor a coluna update_date que será usada para a deduplicação dos dados
load_view.describe(['update_date']).show()

+-------+--------------------+
|summary|         update_date|
+-------+--------------------+
|  count|                   6|
|   mean|                null|
| stddev|                null|
|    min|2018-03-03 18:47:...|
|    max|2018-05-23 10:13:...|
+-------+--------------------+



In [7]:
# Importando as funções do Pyspark e removendo os dados duplicados com base nos criterios pré estabelecidos
from pyspark.sql.functions import * 

df = load_view.orderBy(col('update_date').desc()).dropDuplicates(['id'])
df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



*Fiz um map do arquivo json para utilizar na formatação do resultado usando as funções read.format do spark passando o arquivo json como option e por fim carregando o arquivo csv com os campos selecionados.*

In [8]:
# Importando as bibliotecas necessarias
import json
import urllib.request

# Carregando o arquivo json
url = "https://raw.githubusercontent.com/zorrex82/cognitivo_ai_eng_dados/master/types_mapping.json"

json_url = urllib.request.urlopen(url)

# Criando o map do pipeline
pipeline = json.loads(json_url.read())


In [9]:
# Criei uma variavel para conter os nomes das colunas desejadas
colunas = ['age', 'create_date', 'update_date']

# Gerei um novo dataframe somente com os campos informados
new_df = df[colunas]

# Novo arquivo csv para gerar um arquivo final
new_df.write.csv("new_load.csv")

In [12]:
# Gerendo o arquivo com as transformações dos campos.
df = sc.read.format('com.databricks.spark.csv').option("pipeline", pipeline).load("new_load.csv")

In [13]:
df.toDF(*colunas).printSchema()

root
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)

