# Gerar Silver

In [1]:
import os
# Set in its own cell so it is in place before `pyspark.pandas` is imported in the next cell.
# NOTE(review): presumably silences the pandas-on-Spark PyArrow timezone warning - confirm.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [2]:
import ipywidgets as widgets
import matplotlib.pyplot as plt
import pyspark.pandas as ps
import seaborn as sns

from delta import *

from matplotlib import dates
from pyspark.sql.avro.functions import *
from pyspark.sql.functions import col, to_date, date_format, current_timestamp, lag, to_timestamp, when, lit
from pyspark.sql.types import StringType, DateType, StructType, DoubleType, IntegerType, LongType, TimestampType
from pyspark.sql.window import Window
#
# Spark application name.
# This notebook builds the silver layer, so the name must not be the
# "GerarBronze" value left over from the bronze notebook.
# NOTE(review): presumably read by StartSpark.ipynb when it creates the session - confirm.
#
APP_NAME="GerarSilver"
# Limit pandas-on-Spark displays to 10 rows so cell outputs stay short.
ps.options.display.max_rows = 10

In [3]:
# Execute StartSpark.ipynb inside this kernel.
# NOTE(review): presumably this is what defines the `spark` session used by every cell below - confirm.
%run StartSpark.ipynb

In [4]:
# Turn on the SQL magic's `lazy_execution` option.
# NOTE(review): assumes the SQL extension was already loaded (e.g. by StartSpark.ipynb) - confirm.
%config SqlMagic.lazy_execution = True

In [5]:
# NOTE(review): presumably registers the `spark` session as the connection used by `%sql` - confirm.
%sql spark

# FK

In [6]:
from urllib.parse import urlparse

def delta_exists(delta_path, tier, table):
    url = urlparse(delta_path)

    match url:
        case "abfss":
            print("Blob Storage")
        case "s3":
                pass
        case "s3a":
            print("S3 Compatible")
    

In [7]:
from urllib.parse import urlparse

def delta_exists(delta_path, tier, table):
    """Classify the storage backend of a Delta path by its URL scheme.

    Args:
        delta_path (str): URL or path of the Delta data.
        tier (str): Accepted but not read by this function.
        table (str): Accepted but not read by this function.

    Returns:
        str | None: "Blob Storage" for the ``abfss`` scheme, "S3 Compatible"
        for ``s3`` and ``s3a``, and None for any other scheme.
    """
    # Lookup table from URL scheme to the label reported to the caller.
    storage_by_scheme = {
        "abfss": "Blob Storage",
        "s3": "S3 Compatible",
        "s3a": "S3 Compatible",
    }
    # dict.get yields None for schemes that are not listed.
    return storage_by_scheme.get(urlparse(delta_path).scheme)

In [8]:
def table_path(bucket:str, tier:str, table_name:str, storage:str="s3a", base_dir:str="lakehouse"):
    """Build the location of a lakehouse table and of its checkpoint directory.

    Args:
        bucket: Bucket (or container) name.
        tier: Lakehouse tier segment, e.g. "bronze" or "silver". It is not
            validated, so an empty tier yields an empty path segment.
        table_name: Final path segment identifying the table.
        storage: URL scheme placed before ``://``.
        base_dir: Directory inside the bucket that holds all tiers.

    Returns:
        A ``(table_location, checkpoint_location)`` tuple, where the checkpoint
        location is the table location followed by ``/_checkpoint/``.
    """
    table_root = "{}://{}/{}/{}/{}".format(storage, bucket, base_dir, tier, table_name)
    checkpoint_root = table_root + "/_checkpoint/"
    return table_root, checkpoint_root

In [9]:
# Locations of the bronze source table, the silver target table, and the
# "_dup" silver copy, each paired with its checkpoint directory.
stock_bronze, stock_bronze_checkpoint_dir = table_path(
    bucket="nemesys-demo1", tier="bronze", table_name="stocks_intraday"
)
stock_silver, stock_silver_checkpoint_dir = table_path(
    bucket="nemesys-demo1", tier="silver", table_name="stocks_intraday"
)
stock_dup_silver, stock_dup_silver_checkpoint_dir = table_path(
    bucket="nemesys-demo1", tier="silver", table_name="stocks_intraday_dup"
)

In [10]:
# Print every table location next to its checkpoint directory, one pair per line.
for location, checkpoint_dir in (
    (stock_bronze, stock_bronze_checkpoint_dir),
    (stock_silver, stock_silver_checkpoint_dir),
    (stock_dup_silver, stock_dup_silver_checkpoint_dir),
):
    print(location, checkpoint_dir)

s3a://nemesys-demo1/lakehouse/bronze/stocks_intraday s3a://nemesys-demo1/lakehouse/bronze/stocks_intraday/_checkpoint/
s3a://nemesys-demo1/lakehouse/silver/stocks_intraday s3a://nemesys-demo1/lakehouse/silver/stocks_intraday/_checkpoint/
s3a://nemesys-demo1/lakehouse/silver/stocks_intraday_dup s3a://nemesys-demo1/lakehouse/silver/stocks_intraday_dup/_checkpoint/


In [11]:
# spark.sql(f"""
# create table stock_intraday (
#     ticker string,
#     timestamp string,
#     open double,
#     high double,
#     low double,
#     close double,
#     volume long
# )
# using delta location '{stock_dup_silver}'
# """)

In [12]:
# Bootstrap: when no Delta table exists at the silver location, write an empty
# one with the expected schema, partitioned by ticker and year ("ano").
if not DeltaTable.isDeltaTable(spark, stock_silver):
    print("Criar tabela")
    silver_columns = [
        ("ticker", StringType()),
        ('ano', IntegerType()),
        ("timestamp", TimestampType()),
        ("open", DoubleType()),
        ("high", DoubleType()),
        ("low", DoubleType()),
        ("close", DoubleType()),
        ("volume", LongType()),
        ("_capture_time_kafka", TimestampType()),
        ("_capture_time_bronze", TimestampType()),
        ("_capture_time_silver", TimestampType()),
    ]
    schema = StructType()
    for column_name, column_type in silver_columns:
        schema = schema.add(column_name, column_type)
    emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
    silver_writer = emptyDF.write.format('delta').mode('overwrite')
    silver_writer.partitionBy("ticker", "ano").save(stock_silver)

# Handle on the silver table, reused later by the upsert stream.
deltaTable = DeltaTable.forPath(spark, stock_silver)

Criar tabela


In [13]:
%%time
# Append the bronze table as-is (plus a silver capture timestamp) to the
# "_dup" silver table, i.e. without any de-duplication.
# NOTE(review): 'startingOffsets' is a Kafka-source option name; confirm that
# the Delta source actually honours it.
bronze_dup_source = (
    spark.readStream
    .format("delta")
    .option('startingOffsets', 'earliest')
    .load(stock_bronze)
)
bronze_dup_stamped = bronze_dup_source.withColumn("_capture_time_silver", current_timestamp())
silver_dup_query = (
    bronze_dup_stamped.writeStream
    .format('delta')
    .outputMode('append')
    .option('mergeSchema', 'true')
    .option('checkpointLocation', stock_bronze_checkpoint_dir + "silver_dup")
    .trigger(once=True)
    .start(stock_dup_silver)
)
# trigger(once=True) processes the available data and stops; block until then.
silver_dup_query.awaitTermination()

CPU times: user 21.2 ms, sys: 1.11 ms, total: 22.3 ms
Wall time: 35.4 s


In [14]:
def config_upsert(delta):
    """Build a ``foreachBatch`` handler that upserts each micro-batch into ``delta``.

    Args:
        delta: Target table handle exposing ``alias(...).merge(...)``
            (a ``DeltaTable`` in this notebook).

    Returns:
        A function ``(microbatchdf, batchId) -> None`` suitable for
        ``DataStreamWriter.foreachBatch``. For every batch it prints the
        incoming row count, merges the de-duplicated rows on
        ``(ticker, timestamp)`` (update when matched, insert otherwise) and
        prints the number of de-duplicated rows.
    """
    def upsertToDelta(microbatchdf, batchId):
        print(f'Batch {batchId} com {microbatchdf.count()} linhas')

        # Cast the key columns to the target types *before* de-duplicating.
        # De-duplicating on the raw values first would keep rows whose keys
        # only become equal after the cast (e.g. two string spellings of the
        # same instant); those rows would then match the same target row in
        # the MERGE below.
        microbatchdf_clean = (
            microbatchdf
            .withColumn("ticker", col("ticker").cast("string"))
            .withColumn("timestamp", col("timestamp").cast("timestamp"))
            .dropDuplicates(["ticker", "timestamp"])
        )

        # Upsert: rows whose key already exists are overwritten, new keys are inserted.
        (
            delta.alias("t")
            .merge(
                microbatchdf_clean.alias("s"),
                "s.ticker = t.ticker and s.timestamp = t.timestamp",
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        print(f'Exportadas {microbatchdf_clean.count()} linhas')

    return upsertToDelta

In [15]:
%%time
# Stream the bronze table into the silver table, upserting each micro-batch
# through the handler built by config_upsert.
# NOTE(review): 'startingOffsets' is a Kafka-source option name; confirm that
# the Delta source actually honours it.
bronze_source = (
    spark.readStream
    .format("delta")
    .option('startingOffsets', 'earliest')
    .load(stock_bronze)
)
# Derive the parsed timestamp, the year partition column and the silver capture time.
silver_candidates = (
    bronze_source
    .withColumn('timestamp_real', to_timestamp("timestamp"))
    .withColumn('ano', date_format('timestamp', 'yyyy').cast(IntegerType()))
    .withColumn("_capture_time_silver", current_timestamp())
)
silver_upsert_query = (
    silver_candidates.writeStream
    .format('delta')
    .foreachBatch(config_upsert(deltaTable))
    .outputMode('update')
    .option('checkpointLocation', stock_bronze_checkpoint_dir + "silver")
    .trigger(once=True)
    .start()
)
# trigger(once=True) processes the available data and stops; block until then.
silver_upsert_query.awaitTermination()

Batch 0 com 4397528 linhas
Exportadas 11148 linhas
CPU times: user 26.2 ms, sys: 20.2 ms, total: 46.3 ms
Wall time: 56.5 s


## Avaliar processo

In [19]:
# Batch-read the silver table so the upsert result can be inspected.
df = spark.read.load(stock_silver, format="delta")

In [20]:
%%time
# Total number of rows currently in the silver table.
silver_row_count = df.count()
print("Count.........:", silver_row_count)

Count.........: 27790
CPU times: user 2.69 ms, sys: 817 µs, total: 3.51 ms
Wall time: 3.58 s


In [21]:
%%time
# Row count after de-duplicating on the merge key; it should equal the total
# row count when the upsert left no duplicate (ticker, timestamp) pairs.
merge_key_columns = ["ticker", "timestamp"]
df.dropDuplicates(merge_key_columns).count()

CPU times: user 5.63 ms, sys: 0 ns, total: 5.63 ms
Wall time: 2.83 s


27790

In [22]:
# Print the column names and types of the silver table loaded into `df`.
df.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- _capture_time_kafka: timestamp (nullable = true)
 |-- _capture_time_bronze: timestamp (nullable = true)
 |-- _capture_time_silver: timestamp (nullable = true)



In [23]:
%%time
# Rows with a timestamp after 2024-06-27 10:30 (UTC-03:00), ordered by ticker
# and time, shown through the pandas-on-Spark API.
rows_after_cutoff = df.filter("timestamp > '2024-06-27T10:30:00-03:00'")
rows_after_cutoff.orderBy("ticker", "timestamp").pandas_api()

CPU times: user 15.2 ms, sys: 857 µs, total: 16.1 ms
Wall time: 65.7 ms


Unnamed: 0,ticker,ano,timestamp,open,high,low,close,volume,_capture_time_kafka,_capture_time_bronze,_capture_time_silver
0,AAPL,2024,2024-06-27 13:31:00,214.830002,215.035004,214.729996,214.820007,372083,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
1,AAPL,2024,2024-06-27 14:31:00,213.669998,213.804993,213.610001,213.789993,95609,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
2,AAPL,2024,2024-06-27 14:32:00,213.794998,213.850006,213.690002,213.850006,132342,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
3,AAPL,2024,2024-06-27 14:33:00,213.850006,213.939697,213.759995,213.789993,139063,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
4,AAPL,2024,2024-06-27 14:34:00,213.779999,213.783997,213.669998,213.710007,153720,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
5,AAPL,2024,2024-06-27 14:35:00,213.699997,213.699997,213.399994,213.399994,146203,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
6,AAPL,2024,2024-06-27 14:36:00,213.399994,213.710007,213.399994,213.679993,210017,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
7,AAPL,2024,2024-06-27 14:37:00,213.669998,213.690002,213.529999,213.580002,113738,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
8,AAPL,2024,2024-06-27 14:38:00,213.580002,213.710007,213.580002,213.703995,93405,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920
9,AAPL,2024,2024-06-27 14:39:00,213.705002,213.899994,213.695007,213.889999,111381,2024-06-27 23:59:05.720,2024-06-27 23:59:20.496,2024-06-27 23:59:32.920


In [34]:
%%time
# Number of rows per ticker and calendar day ("dia") since 2024-06-01 (UTC-03:00).
rows_since_june = df.filter("timestamp > '2024-06-01T00:00:00-03:00'")
rows_per_ticker_day = (
    rows_since_june
    .withColumn("dia", date_format("timestamp", "yyyy-MM-dd"))
    .groupBy("ticker", "dia")
    .count()
)
rows_per_ticker_day.orderBy("ticker", "dia").pandas_api()

CPU times: user 14.9 ms, sys: 620 µs, total: 15.5 ms
Wall time: 78.1 ms


Unnamed: 0,ticker,dia,count
0,AAPL,2024-06-13,390
1,AAPL,2024-06-14,388
2,AAPL,2024-06-17,390
3,AAPL,2024-06-18,387
4,AAPL,2024-06-20,386
5,AAPL,2024-06-21,76
6,AAPL,2024-06-24,374
7,AAPL,2024-06-25,381
8,AAPL,2024-06-26,388
9,AAPL,2024-06-27,904


# Otimizar Camada Silver

In [39]:
# Compact the files of the silver table.
deltaTable = DeltaTable.forPath(spark, stock_silver)
silver_optimizer = deltaTable.optimize()
# Note: `df` is rebound here to the DataFrame returned by the compaction,
# replacing the silver-table DataFrame used in the evaluation cells.
df = silver_optimizer.executeCompaction()

In [40]:
# Show `df`, which at this point holds the result of executeCompaction()
# (path and metrics columns), not the silver table rows.
df.show()

+--------------------+--------------------+
|                path|             metrics|
+--------------------+--------------------+
|s3a://nemesys-dem...|{6, 36, {93853, 1...|
+--------------------+--------------------+



In [41]:
# WARNING: vacuum(0) uses a zero-hour retention, so files that are no longer
# referenced by the current table version are removed immediately. Delta's
# retention safety check has to be switched off for that call to be accepted.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"
previous_retention_check = spark.conf.get(retention_check_key, "true")
spark.conf.set(retention_check_key, "False")
try:
    vacuum_result = deltaTable.vacuum(0)
finally:
    # Restore the previous setting so the safety check is not left disabled
    # for every later vacuum in this Spark session.
    spark.conf.set(retention_check_key, previous_retention_check)
vacuum_result

DataFrame[]