In [None]:
# Configure Spark with the Data Lake account key.
# The key is read from Key Vault through the Synapse linked service rather
# than being written into the notebook.
keyvault_name = "cryptoanalyticssynapse"
secret_name = "cryptoanalyticslake"
linked_service_name = "keyvault"

adls_token = mssparkutils.credentials.getSecret(
    keyvault_name,
    secret_name,
    linked_service_name,
)

# Register the key for the storage account's dfs endpoint so that the
# abfss:// paths used later in the notebook can be accessed.
spark.conf.set(
    "fs.azure.account.key.cryptoanalyticslake.dfs.core.windows.net",
    adls_token,
)

In [None]:
# Set Day Month Year (all in UTC) used to build the source and destination paths
from datetime import datetime, timedelta, timezone

# datetime.utcnow() is deprecated since Python 3.12. Take the timezone-aware
# "now" in UTC instead and drop tzinfo, so `today` stays a naive UTC datetime
# exactly as before.
today = datetime.now(timezone.utc).replace(tzinfo=None)
year = today.year
month = today.month
day = today.day

In [None]:
# Load every Avro file captured for the current day.
# The `*` path segment is a wildcard (per the original note: one folder per
# Event Hub partition), and recursiveFileLookup descends into sub-folders.
sourcefolderpath = f"abfss://crypto-quotes@cryptoanalyticslake.dfs.core.windows.net/ehns-quote-streams/eh-crypto-stream/*/{year}/{month:0>2d}/{day:0>2d}"
print(sourcefolderpath)

# NOTE(review): "header" looks like a CSV reader option; it is kept here
# unchanged -- confirm whether it is needed for the Avro source.
df = (
    spark.read
    .format("avro")
    .options(recursiveFileLookup="true", header="true")
    .load(sourcefolderpath)
)

# Inspect the raw rows and the schema that was read.
display(df)
df.printSchema()

In [None]:
# Decode the binary Body column to text, then parse that text as JSON
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, DoubleType, StructType, StructField

# Shape of one JSON message: Symbol is declared non-nullable, the other
# two fields nullable. PriceTimeStamp is kept as a plain string.
sourceSchema = (
    StructType()
    .add("Symbol", StringType(), False)
    .add("Price", DoubleType(), True)
    .add("PriceTimeStamp", StringType(), True)
)

# NOTE(review): the schema has no date-typed field, so this dateFormat
# option presumably has no effect on parsing -- confirm before relying on it.
jsonOptions = {"dateFormat" : "yyyy-MM-dd HH:mm:ss.SSS"}

# StringBody holds the decoded text; JsonBody holds the parsed struct.
df = (
    df
    .withColumn("StringBody", col("Body").cast("string"))
    .withColumn("JsonBody", from_json(col("StringBody"), sourceSchema, jsonOptions))
)

display(df)

In [None]:
# Flatten the Body JSON field: promote each field of the JsonBody struct
# to a top-level column of the same name.
for field_name in df.schema["JsonBody"].dataType.fieldNames():
    df = df.withColumn(field_name, col(f"JsonBody.{field_name}"))

In [None]:
# Keep only rows with a strictly positive Price (drops 0-priced assets)
df = df.where("Price > 0")

In [None]:
# Order the rows by Symbol, then by PriceTimeStamp within each symbol.
# PriceTimeStamp is a string column in the declared schema, so this ordering is lexicographic.
df = df.orderBy("Symbol", "PriceTimeStamp")

In [None]:
# Render the current state of df for inspection before the export columns are selected
display(df)

In [None]:
# Keep just the three quote columns that are exported to the Bronze data zone;
# the raw Body, the intermediate StringBody/JsonBody and other columns are dropped.
exportDF = df.select(["Symbol", "Price", "PriceTimeStamp"])

In [None]:
# Write the Parquet output into the bronze crypto data zone, under a
# year/month/day folder. "overwrite" mode replaces whatever already exists
# at that path, so re-running the notebook for the same day replaces the
# earlier output.
destinationfolderpath = f"abfss://crypto-bronze@cryptoanalyticslake.dfs.core.windows.net/quotes-by-day/{year}/{month:0>2d}/{day:0>2d}"
print(destinationfolderpath)

exportDF.write.parquet(destinationfolderpath, mode="overwrite")