### Importando a biblioteca pyspark e criando uma sessão e contexto do spark para utilização

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("etl") \
    .getOrCreate()

### Importando o arquivo de mapeamento para conversão de data types

In [None]:
mappings = spark.read.option("multiline","true").json('config/types_mapping.json')

In [None]:
mappings.show()

In [None]:
mappings = mappings.select("age", "create_date", "update_date").first()

### Importando o arquivo .csv de input

In [None]:
df = spark.read.csv('data/input/load.csv', header='true')

### Ordenando o dataframe para Utilizar o update_date como filtro para obter os dados mais novos

In [None]:
from pyspark.sql.functions import col, max as max_
df_ordenado = df.withColumn("update_date", col("update_date").cast("timestamp")).groupBy("id", "name", "email", "phone", "address", "age", "create_date", "update_date").agg(max_("update_date"))

### Coletando apenas o ultimo registro de cada ID com o filtro de data mais recente da coluna Update_date e convertendo os datatypes fornecidos no arquivo JSON de mapeamento que foi carregado no inicio do processo.

In [None]:
from pyspark.sql.functions import col, max as max_

df_transformado = df_ordenado.withColumn("update_date", col("update_date").cast(mappings.update_date)) \
    .withColumn("create_date", col("create_date").cast(mappings.create_date)) \
    .withColumn("age", col("age").cast(mappings.age)) \
    .groupBy("id", "name", "email", "phone", "address", "age", "create_date").agg(max_("update_date")) \
    .dropDuplicates(["id"]).orderBy("id")

In [None]:
df_transformado.printSchema()

### Renomeando a coluna para update_date

In [None]:
df_transformado = df_transformado.withColumnRenamed("max(update_date)", "update_date")

### Gravando o arquivo no formato parquet na pasta de saida

In [None]:
df_transformado.coalesce(1).write.mode('overwrite').parquet("data/output/cadastro/")

In [None]:
dt_parquet = spark.read.parquet('data/output/cadastro/')
dt_parquet.show()