In [2]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

In [4]:
# Build (or reuse) the 'LakeHouse' Spark session with the Delta Lake SQL
# extension and the Delta catalog registered as `spark_catalog`.
_session_builder = SparkSession.builder.appName('LakeHouse')
_session_builder = _session_builder.config(
    'spark.sql.extensions',
    'io.delta.sql.DeltaSparkSessionExtension',
)
_session_builder = _session_builder.config(
    'spark.sql.catalog.spark_catalog',
    'org.apache.spark.sql.delta.catalog.DeltaCatalog',
)
spark = _session_builder.getOrCreate()

In [5]:
# Bare expression: the notebook displays the session object as the cell output.
spark

In [6]:
# S3 locations used by the pipeline: `raw` is the CSV input that gets read,
# `trusted` is the path where the Delta table is written / merged.
raw, trusted = (
    's3a://camada-bronze-aula5/futebol',
    's3a://camada-prata-aula5/futebol',
)

In [7]:
# Read the CSV data under `raw`, taking column names from the header line.
# No schema is supplied here, so numeric columns are cast explicitly later.
df = spark.read.option('header', True).csv(raw)

In [8]:
# Print a preview of the raw rows to check the parsed header and values.
df.show()

+----------+----------------+-------------+--------------+--------------+--------------+---------+-------------------+-----------+
|      date|   home_teamName|away_teamName|home_scoreHome|away_scoreAway|tournamentName| cityCity|     countryCountry|neutralTRUE|
+----------+----------------+-------------+--------------+--------------+--------------+---------+-------------------+-----------+
|1872-11-30|        Scotland|      England|             0|             0|      Friendly|  Glasgow|           Scotland|      FALSE|
|1873-03-08|         England|     Scotland|             4|             2|      Friendly|   London|            England|      FALSE|
|1874-03-07|        Scotland|      England|             2|             1|      Friendly|  Glasgow|           Scotland|      FALSE|
|1875-03-06|         England|     Scotland|             2|             2|      Friendly|   London|            England|      FALSE|
|1876-03-04|        Scotland|      England|             3|             0|      Frie

In [11]:
from pyspark.sql.functions import col

# Normalise the raw match data: drop the neutral-venue flag, rename every
# column to its upper-case Portuguese name, and cast both score columns to int.
partidas = (
    df
    # Spelled exactly as in the CSV header ('neutralTRUE'). DataFrame.drop
    # silently ignores names it cannot resolve, so the previous spelling
    # ('neutralTrue') only worked while spark.sql.caseSensitive was false.
    .drop('neutralTRUE')
    .withColumnRenamed('date','DATA')
    .withColumnRenamed('home_teamName','MANDANTE')
    .withColumnRenamed('away_teamName','VISITANTE')
    # Scores are cast to int; values that cannot be parsed become NULL.
    .withColumnRenamed('home_scoreHome','GOL_MANDANTE')
    .withColumn('GOL_MANDANTE',col('GOL_MANDANTE').cast('int'))
    .withColumnRenamed('away_scoreAway','GOL_VISITANTE')
    .withColumn('GOL_VISITANTE',col('GOL_VISITANTE').cast('int'))
    .withColumnRenamed('tournamentName','TORNEIO')
    .withColumnRenamed('cityCity','CIDADE')
    .withColumnRenamed('countryCountry','PAIS')
)

In [25]:
# Remove rows that are identical across every column before writing/merging.
partidas = partidas.dropDuplicates()

In [29]:
# Load `partidas` into the Delta table at `trusted`:
#   - first run (no Delta table at that path yet): write the full DataFrame;
#   - later runs: upsert, updating rows that already exist and inserting new ones.
if not DeltaTable.isDeltaTable(spark, trusted):
    print('Delta nao existe')
    partidas.write.format('delta').mode('overwrite').save(trusted)
else:
    print('Delta existe')
    delta_table = DeltaTable.forPath(spark, trusted)

    # A row is "the same match" when date, both teams and the home score agree.
    # The home score must be compared with the stored home score: comparing it
    # with trusted.GOL_VISITANTE only matched draws, so every other match was
    # re-inserted as a duplicate on each run.
    # NOTE(review): `=` never matches NULL; if a score can be missing, consider
    # the null-safe `<=>` operator for the score comparison - confirm with data.
    merge_condition = (
        "raw.DATA = trusted.DATA "
        "and raw.VISITANTE = trusted.VISITANTE "
        "and raw.MANDANTE = trusted.MANDANTE "
        "and raw.GOL_MANDANTE = trusted.GOL_MANDANTE"
    )

    (
        delta_table.alias('trusted')
        .merge(partidas.alias('raw'), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
        

Delta nao existe
