In [0]:
# Pull the AWS credentials from the "aws-creds" secret scope so they never
# appear in the notebook source.
access_key = dbutils.secrets.get(scope="aws-creds", key="access-key")
secret_key = dbutils.secrets.get(scope="aws-creds", key="secret-key")

# Apply the S3A connector settings to the session's Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3a.access.key", access_key),
    ("fs.s3a.secret.key", secret_key),
    ("fs.s3a.endpoint", "s3.amazonaws.com"),
):
    hadoop_conf.set(conf_key, conf_value)

In [0]:

# Create the bronze/silver/gold schemas in the `local_stream_s3` catalog.
# `IF NOT EXISTS` makes the cell safe to re-run while still surfacing real
# failures (missing catalog, insufficient privileges, ...) instead of hiding
# them behind a bare `except:`.
for layer in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS local_stream_s3.{layer}")
     

In [0]:
%python
# Source bucket to ingest from, and the location where the `cloudFiles`
# source persists the schema it tracks for the stream.
s3_path = "s3a://localstreams3/"
schema_location = "dbfs:/local_stream_s3/bronze_schema"

# Streaming reader over the JSON files under `s3_path`.
bronze_df = (
    spark.readStream.format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": schema_location,
    })
    .load(s3_path)
)


In [0]:
# Append the raw stream to the bronze Delta table; the checkpoint directory
# stores the query's progress so it can resume after a restart.
bronze_writer = (
    bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/local_stream_s3/bronze/transactions")
)
bronze_writer.toTable("local_stream_s3.bronze.transactions")

In [0]:
from pyspark.sql.types import *

# Target columns and types for the silver layer. This is the single source of
# truth for the projection below, so adding or retyping a column only needs a
# change here.
silver_schema = StructType([
    StructField("amount", DoubleType(), True),
    StructField("currency", StringType(), True),
    StructField("username", StringType(), True),
])

# Stream the bronze table and keep only the silver columns, each cast to the
# type declared in `silver_schema` (e.g. "CAST(amount AS double) AS amount").
silver_df = (
    spark.readStream
    .table("local_stream_s3.bronze.transactions")
    .selectExpr(*[
        f"CAST({field.name} AS {field.dataType.simpleString()}) AS {field.name}"
        for field in silver_schema.fields
    ])
)

# Append the typed records to the silver Delta table, tracking progress in a
# dedicated checkpoint directory.
silver_writer = (
    silver_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/local_stream_s3/silver/cls_transactions")
)
silver_writer.toTable("local_stream_s3.silver.cls_transactions")

In [0]:
from pyspark.sql.functions import sum, count, window

# Aggregate expressions computed for every currency.
total_amount = sum("amount").alias("total_amount")
# NOTE(review): count("username") counts rows with a non-null username, not
# distinct users -- confirm that "total_users" is meant to be a row count.
total_users = count("username").alias("total_users")

# Running per-currency totals over the silver transactions stream.
gold_df = (
    spark.readStream.table("local_stream_s3.silver.cls_transactions")
    .groupBy("currency")
    .agg(total_amount, total_users)
)

# Write the aggregated result to the gold Delta table. "complete" output mode
# rewrites the whole result table on every trigger. The stray
# `.format('memory')` call was dropped because the later `.format("delta")`
# overrode it, so it had no effect.
gold_df.writeStream \
    .option("checkpointLocation", "/mnt/local_stream_s3/gold/agg_transactions") \
    .outputMode("complete") \
    .format("delta") \
    .toTable("local_stream_s3.gold.agg_transactions")

In [0]:
%sql
-- Inspect the current contents of the gold aggregate table.
SELECT *
FROM local_stream_s3.gold.agg_transactions

In [0]:
# ...1379429
# ...1644559