In [1]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from delta.tables import *
import datetime
import pytz

# Spark session configured for Delta Lake:
# - spark.sql.extensions registers Delta's SQL extension. The fully-qualified
#   class is io.delta.sql.DeltaSparkSessionExtension; a misspelled class name
#   cannot be loaded, leaving the session without Delta SQL support.
# - spark_catalog is replaced by DeltaCatalog so Delta tables resolve through
#   the session catalog.
spark = (
    SparkSession
    .builder
    .appName('Lakehouse_OlhoVivo_Raw_To_Trusted')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .getOrCreate()
)

In [2]:
# Display the active SparkSession as this cell's output (sanity check that the
# session above was created).
spark

In [31]:
# Run timestamp converted to São Paulo time. datetime.today() is naive host-local
# time, and astimezone() interprets it as such before converting.
date = datetime.datetime.today()
d = date.astimezone(pytz.timezone('America/Sao_Paulo'))

# Paths point at the dataset roots, not at a per-day folder.
# To restrict a run to a single day's folder, append f"{d:%Y-%m-%d}/" to the path.
path_raw = 's3a://raw/olhovivo/linha/buscar/'
path_trusted = 's3a://trusted/olhovivo/linha/buscar/'

Path RAW: s3a://raw/olhovivo/linha/buscar/
Path TRUSTED: s3a://trusted/olhovivo/linha/buscar/


In [33]:
# Echo both resolved locations, one per line, so the run log records where
# data is read from and written to.
print(f"Path RAW: {path_raw}", f"Path TRUSTED: {path_trusted}", sep="\n")

Path RAW    : s3a://raw/olhovivo/linha/buscar/
Path TRUSTED: s3a://trusted/olhovivo/linha/buscar/


In [54]:
# Load the raw JSON under path_raw; Spark infers the schema from the files.
df_raw = spark.read.format('json').load(path_raw)
df_raw.show(n=1000)

+-----+-----+----+---+---+--------------------+------------+
|   cl|   lc|  lt| sl| tl|                  tp|          ts|
+-----+-----+----+---+---+--------------------+------------+
| 2506|false|8000|  1|  1|PÇA. RAMOS DE AZE...|  TERM. LAPA|
|35274|false|8000|  2|  1|PÇA. RAMOS DE AZE...|  TERM. LAPA|
| 1273|false|8000|  1| 10|PÇA. RAMOS DE AZE...|  TERM. LAPA|
|34041|false|8000|  2| 10|PÇA. RAMOS DE AZE...|  TERM. LAPA|
|  798|false|8010|  1| 10|                LAPA|       PERUS|
|33566|false|8010|  2| 10|                LAPA|       PERUS|
| 1737| true|8010|  1| 21|     SÍTIO BOTUQUARA|  CPTM PERUS|
|34505| true|8010|  2| 21|     SÍTIO BOTUQUARA|  CPTM PERUS|
| 2562|false|8030|  1| 10|     TERM. VL. SÔNIA|PARAISÓPOLIS|
|35330|false|8030|  2| 10|     TERM. VL. SÔNIA|PARAISÓPOLIS|
| 1726| true|8020|  1| 10|       SHOP. MORUMBI|     BUTANTÃ|
|34494| true|8020|  2| 10|       SHOP. MORUMBI|     BUTANTÃ|
| 1726| true|8020|  1| 10|       SHOP. MORUMBI|     BUTANTÃ|
|34494| true|8020|  2| 1

In [37]:
# Print the schema Spark inferred for the raw JSON.
df_raw.printSchema()

root
 |-- cl: long (nullable = true)
 |-- lc: boolean (nullable = true)
 |-- lt: string (nullable = true)
 |-- sl: long (nullable = true)
 |-- tl: long (nullable = true)
 |-- tp: string (nullable = true)
 |-- ts: string (nullable = true)



In [55]:
# Project the raw data down to the two columns carried into the trusted table:
# `cl` (the merge key used below) and `lc`.
df_merge = df_raw.select(col('cl'), col('lc'))
df_merge.show()

+-----+-----+
|   cl|   lc|
+-----+-----+
| 2506|false|
|35274|false|
| 1273|false|
|34041|false|
|  798|false|
|33566|false|
| 1737| true|
|34505| true|
| 2562|false|
|35330|false|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
+-----+-----+
only showing top 20 rows



In [56]:
# Row count of the projected raw data, before deduplication on `cl`.
total_records = df_merge.count()
print(f"{total_records}")

26


In [None]:
# NOTE(review): disabled plain-append write of the undeduplicated frame. The
# merge cell below writes to path_trusted instead; consider deleting this cell.
#df_merge.write.format('delta').mode('append').save(path_trusted)

In [57]:
# Deduplicate the source on the merge key: a Delta MERGE fails when several
# source rows match the same target row.
# NOTE(review): dropDuplicates(['cl']) keeps an arbitrary row per `cl`. The raw
# sample shows duplicates with identical `lc`; if they can ever disagree, pick a
# deterministic winner (e.g. latest ingestion) instead.
df_deduplicated = df_merge.select('cl', 'lc').dropDuplicates(['cl'])

if not DeltaTable.isDeltaTable(spark, path_trusted):
    # First run: no Delta table at the trusted path yet, so create it from the
    # deduplicated snapshot.
    df_deduplicated.write.format('delta').mode('append').save(path_trusted)
    print('TRUSTED criado com sucesso.')
    # Bind the handle on this branch too, so later cells that use `delta_table`
    # also work on the very first (table-creating) run.
    delta_table = DeltaTable.forPath(spark, path_trusted)
else:
    delta_table = DeltaTable.forPath(spark, path_trusted)
    # Upsert keyed on `cl`: refresh `lc` for existing keys, insert new keys.
    (
        delta_table.alias('trusted')
        .merge(df_deduplicated.alias('raw'), 'trusted.cl = raw.cl')
        .whenMatchedUpdate(set={
            'trusted.lc': col('raw.lc'),
        })
        .whenNotMatchedInsert(values={
            'trusted.cl': col('raw.cl'),
            'trusted.lc': col('raw.lc'),
        })
        .execute()
    )
    print('MERGE executado com sucesso.')

MERGE executado com sucesso.


In [59]:
# Side-by-side check: the projected raw rows, then the same rows after
# deduplication on `cl`.
for frame in (df_merge, df_deduplicated):
    frame.show(1000)

+-----+-----+
|   cl|   lc|
+-----+-----+
| 2506|false|
|35274|false|
| 1273|false|
|34041|false|
|  798|false|
|33566|false|
| 1737| true|
|34505| true|
| 2562|false|
|35330|false|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
| 1726| true|
|34494| true|
|  418|false|
|33186|false|
+-----+-----+

+-----+-----+
|   cl|   lc|
+-----+-----+
| 1737| true|
|33566|false|
| 2562|false|
| 1273|false|
| 2506|false|
|35274|false|
|34505| true|
|34041|false|
|  798|false|
|35330|false|
|34494| true|
| 1726| true|
|  418|false|
|33186|false|
+-----+-----+



In [60]:
# Row count of the trusted Delta table. The table is re-opened from its path
# rather than reusing `delta_table` from the merge cell, which may not be bound
# (e.g. when that cell took the table-creation branch).
print(DeltaTable.forPath(spark, path_trusted).toDF().count())

14


In [62]:
# Read the trusted table back as Delta (not plain parquet) to inspect the
# result of the create/merge step.
df_trusted = spark.read.load(path_trusted, format='delta')
df_trusted.show(1000)



+-----+-----+
|   cl|   lc|
+-----+-----+
|  418|false|
|  798|false|
| 1273|false|
| 1726| true|
| 1737| true|
| 2506|false|
| 2562|false|
|33186|false|
|33566|false|
|34041|false|
|34494| true|
|34505| true|
|35274|false|
|35330|false|
+-----+-----+

