#### Start PySpark session

In [31]:
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.types import StructType
from pyspark.sql.window import Window

# Build the SparkSession used by the rest of the notebook; getOrCreate()
# returns the already-running session when one exists.
# NOTE(review): "spark.some.config.option" looks like a placeholder value --
# confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("Cognitivo.ai Test")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

#### Importando arquivo de configuração com o tipo dos campos

In [25]:
# Load the JSON mapping of column name -> type name that the cast step reads.
json_file_path = "config/types_mapping.json"

with open(json_file_path, 'r') as mapping_file:
    contents = json.load(mapping_file)

#### Lendo CSV, alocando em um dataframe e atribuindo o tipo a alguns campos

In [26]:
# Read the users CSV (first line is the header), then cast the typed columns
# to the type names given in the JSON mapping. The remaining columns are left
# as read.
df = spark.read.csv("data/input/users/load.csv", header=True)

for typed_column in ('age', 'create_date', 'update_date'):
    df = df.withColumn(typed_column, col(typed_column).cast(contents[typed_column]))

In [27]:
# Print the column names and types to confirm the casts were applied.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



In [28]:
# Preview the loaded rows.
df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9997|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-03-03 18:47:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9998|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-04-14 17:09:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 91234-5678|124 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 04:07:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...

#### Deduplicando os dados, mantendo apenas a última atualização de cada ID

In [32]:
# Order the rows so the most recent update_date comes first.
df = df.orderBy(col("update_date").desc())

In [33]:
# Preview the rows after sorting by update_date (descending).
df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 91234-5678|124 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 04:07:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9998|Mulholland Drive,...

In [34]:
# Keep exactly one row per id: the one with the greatest update_date.
# dropDuplicates(['id']) is not used here because Spark does not guarantee
# which of the duplicate rows it keeps (row order is not preserved across the
# shuffle it triggers), so a prior sort cannot be relied on. Ranking rows
# inside each id partition makes the choice explicit and deterministic.
latest_first = Window.partitionBy("id").orderBy(col("update_date").desc())

df = (
    df.withColumn("_row_rank", row_number().over(latest_first))
    .filter(col("_row_rank") == 1)  # rank 1 == most recent update for that id
    .drop("_row_rank")              # helper column must not reach the output
)

In [39]:
# Preview the rows left after dropping duplicate ids.
df.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+



#### Exportando dataframe em formato parquet

In [36]:
# Persist the DataFrame as parquet, replacing any previous output at this path.
df.write.mode('overwrite').parquet("data/output/load.parquet")

#### Testando a leitura do arquivo parquet

In [37]:
# Read the parquet output back to verify the export.
# Parquet files carry their own schema, so the CSV-only `header` option that
# was passed here had no effect and has been removed.
df_p = spark.read.parquet("data/output/load.parquet")

In [38]:
# Preview the rows read back from the parquet output.
df_p.show()

+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
| id|                name|               email|          phone|             address|age|         create_date|         update_date|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+
|  3|spongebob.squarep...|Spongebob Squarep...|(11) 98765-4321|122 Conch Street,...| 13|2018-05-19 04:07:...|2018-05-19 05:08:...|
|  2|sherlock.holmes@c...|     Sherlock Holmes|(11) 94815-1623|221B Baker Street...| 34|2018-04-21 20:21:...|2018-04-21 20:21:...|
|  1|david.lynch@cogni...|         David Lynch|(11) 99999-9999|Mulholland Drive,...| 72|2018-03-03 18:47:...|2018-05-23 10:13:...|
+---+--------------------+--------------------+---------------+--------------------+---+--------------------+--------------------+

