In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType, DateType
import sys
import os
from delta import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
from delta.tables import *
import io
import json

In [0]:
def create_spark_session(app_name: str = "File Streaming Demo", master: str = "local[3]") -> SparkSession:
    """Build (or reuse, via ``getOrCreate``) a SparkSession configured for Delta Lake.

    Parameters
    ----------
    app_name : str
        Spark application name (default kept from the original notebook).
    master : str
        Spark master URL; defaults to a local run with 3 worker threads.

    Returns
    -------
    SparkSession
        The active/default session with Delta and Hive support enabled.

    Notes
    -----
    NOTE(review): outside Databricks the Delta jars must also be on the
    classpath (e.g. via ``delta.configure_spark_with_delta_pip``) — confirm
    how this environment provides them.
    """
    return (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        # Let MERGE evolve the target schema when the source has extra columns.
        .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
        # Delta Lake's setup docs require BOTH the SQL extension and the catalog
        # plugin; the original only set the catalog.
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .enableHiveSupport()
        .getOrCreate()
    )

In [0]:
def create_deltaTable_insert_update_rows(spark: SparkSession, columns: list, location: str, merge_condition: str, df: DataFrame):
    """Upsert ``df`` into the Delta table at ``location``, creating the table first if needed.

    Parameters
    ----------
    spark : SparkSession
        Session used to check for, create and open the Delta table.
    columns : list
        Column definitions (``StructField`` objects) passed to
        ``DeltaTable.create(...).addColumns``; only used when no Delta table
        exists at ``location`` yet.
    location : str
        Storage path of the Delta table.
    merge_condition : str
        SQL join predicate for the MERGE. The target is aliased ``tgt`` and
        the source ``src`` (e.g. ``"tgt.id = src.id"``).
    df : DataFrame
        Source rows. Matched target rows get every column updated
        (``whenMatchedUpdateAll``); unmatched source rows are inserted
        (``whenNotMatchedInsertAll``).

    Notes
    -----
    Per the Delta Lake MERGE documentation, the operation can fail when more
    than one source row matches the same target row, so ``df`` should be
    unique on the columns referenced by ``merge_condition``.
    """
    if DeltaTable.isDeltaTable(spark, location):
        print('tabela delta existente')
    else:
        print('tabela delta inexistente')
        # Create an empty table with the declared schema so the MERGE below
        # has a target to write into.
        (
            DeltaTable
            .create(spark)
            .addColumns(columns)
            .location(location)
            .execute()
        )

    # Both branches end with the same upsert; it lives in one place now.
    _merge_upsert(DeltaTable.forPath(spark, location), df, merge_condition)


def _merge_upsert(delta_table: DeltaTable, df: DataFrame, merge_condition: str) -> None:
    """MERGE ``df`` (alias ``src``) into ``delta_table`` (alias ``tgt``): update all on match, insert otherwise."""
    (
        delta_table.alias('tgt')
        .merge(df.alias('src'), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
# Path of the bronze-layer Delta table with the rainfall ("chuvas") data.
location_bronze = "/FileStore/bronze/dados_degue/chuvas"

#### Leitura da camada bronze em um DataFrame Spark

In [0]:
# On Databricks `spark` is injected into the notebook namespace; elsewhere it
# does not exist, so build one to keep Restart & Run All working.
if "spark" not in globals():
    spark = create_spark_session()

# Bronze is stored as Delta, which carries its own schema: the CSV-reader
# options previously passed here (`header`, `Sep`, `InferSchema`) have no
# effect on a Delta read, so only the path is given.
df_chuva = spark.read.format('delta').load(location_bronze)

In [0]:
# Preview the raw bronze rows. `.display()` comes from the Databricks notebook
# runtime (not OSS PySpark), so this cell assumes a Databricks kernel.
df_chuva.display()

data,mm,uf
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA
2015-09-02,-9999,PA


#### Transformações da camada silver

In [0]:
from pyspark.sql.functions import year, month, dayofmonth

In [0]:
# Split the `data` column into calendar parts: ano (year), mes (month), dia (day).
df_chuva = (
    df_chuva
    .withColumn("ano", f.year("data"))
    .withColumn("mes", f.month("data"))
    .withColumn("dia", f.dayofmonth("data"))
)

df_chuva.display()

data,mm,uf,ano,mes,dia
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2


In [0]:
# Rename bronze columns to their silver names; `mm` already has its final
# name, so it needs no rename.
# NOTE(review): the displayed samples show mm = -9999, which looks like a
# missing-measurement sentinel — confirm, and consider mapping it to null
# in this silver step.
df_chuva = (
    df_chuva
    .withColumnRenamed("data", "data_medicao")
    .withColumnRenamed("uf", "estado")
)

In [0]:
# Check the renamed (silver-shaped) columns before merging. `.display()` comes
# from the Databricks notebook runtime, not OSS PySpark.
df_chuva.display()

data_medicao,mm,estado,ano,mes,dia
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2
2015-09-02,-9999,PA,2015,9,2


In [0]:
# Destination path of the silver-layer Delta table for the rainfall data.
path_silver_chuva = "/FileStore/silver/dados_degue/chuvas"

#### Processo de merge (upsert) para a camada silver

In [0]:
# A silver row is keyed by measurement date + state; build the MERGE predicate
# from those keys (target alias `tgt`, source alias `src`).
# NOTE(review): the displayed data contains several identical (data, uf) rows.
# Delta MERGE can fail when multiple source rows match one target row, so a
# second run may break — confirm the intended grain (aggregate per day/state,
# or add a station/id key).
merge_condition = " and ".join(
    f"tgt.{key} = src.{key}" for key in ("data_medicao", "estado")
)

In [0]:
# Schema of the silver table, used only if the table must be created first.
columns = [
    StructField(name, dtype, True)
    for name, dtype in (
        ('data_medicao', DateType()),
        ('mm', IntegerType()),
        ('estado', StringType()),
        ('dia', IntegerType()),
        ('mes', IntegerType()),
        ('ano', IntegerType()),
    )
]

In [0]:
# Upsert the transformed rainfall data into the silver Delta table.
create_deltaTable_insert_update_rows(
    spark,
    columns=columns,
    location=path_silver_chuva,
    merge_condition=merge_condition,
    df=df_chuva,
)

tabela delta inexistente
