In [0]:
# Echo the active SparkSession. `spark` is never created in this notebook;
# presumably it is the session pre-defined by the Databricks runtime (TODO confirm).
spark

### Import Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# ADLS Gen2 storage account that hosts the bronze / silver / schema containers.
storage_account = "joecryptostorage01"

# Bronze container: raw JSON files read by the Auto Loader stream.
bronze_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/"
# Silver Delta table written by the streaming query at the end of the notebook.
silver_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/silver_crypto"
# Location passed to Auto Loader as cloudFiles.schemaLocation.
schema_path = f"abfss://schema@{storage_account}.dfs.core.windows.net/"

### Auto Loader Setup

In [0]:
# Auto Loader ("cloudFiles") stream over the bronze container:
#   cloudFiles.format              - source files are JSON.
#   cloudFiles.schemaLocation      - where Auto Loader keeps the tracked schema.
#   cloudFiles.schemaEvolutionMode - "addNewColumns": newly seen fields are added.
#   cloudFiles.schemaHints         - roi is pinned to a struct and every other
#                                    listed field to STRING; typed parsing is
#                                    done explicitly in the transform cells.
# Every row is stamped with ingested_time = current_timestamp().
crypto_schema_hints = """
            roi STRUCT<times:DOUBLE, currency:STRING, percentage:DOUBLE>,
            ath STRING,
            ath_change_percentage STRING,
            ath_date STRING,
            atl STRING,
            atl_change_percentage STRING,
            atl_date STRING,
            circulating_supply STRING,
            current_price STRING,
            high_24h STRING,
            id STRING,
            last_updated STRING,
            low_24h STRING,
            market_cap STRING,
            market_cap_change_24h STRING,
            market_cap_change_percentage_24h STRING,
            market_cap_rank STRING,
            max_supply STRING,
            name STRING,
            price_change_24h STRING,
            price_change_percentage_24h STRING,
            symbol STRING,
            total_supply STRING,
            total_volume STRING
            """

crypto_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.schemaHints", crypto_schema_hints)
)

df_crypto = crypto_reader.load(bronze_path).withColumn("ingested_time", current_timestamp())

In [0]:
# Inspect the schema produced by Auto Loader (schema hints applied) plus ingested_time.
df_crypto.printSchema()

### Transforming Data using PySpark

In [0]:
# Flatten the roi struct: expose roi.times, roi.currency and roi.percentage as
# top-level columns roi_time, roi_currency and roi_percentage, then drop roi.
roi_field_map = {
    "roi_time": "roi.times",
    "roi_currency": "roi.currency",
    "roi_percentage": "roi.percentage",
}

df_parsed = df_crypto
for flat_name, struct_path in roi_field_map.items():
    df_parsed = df_parsed.withColumn(flat_name, col(struct_path))
df_parsed = df_parsed.drop("roi")

In [0]:
# Confirm roi was replaced by the flat roi_time / roi_currency / roi_percentage columns.
df_parsed.printSchema()

In [0]:
# Preview the flattened stream. This cell must use df_parsed: df_transformed3 is
# only defined in a later cell, so referencing it here raises NameError on a
# fresh Restart & Run All.
df_parsed.display()

In [0]:
#Replacing the Z or +00:00 with empty string and converting the last_updated column to timestamp
from pyspark.sql.functions import regexp_replace, to_timestamp, to_date, when
df_transformed1 = df_parsed \
    .withColumn("last_updated_clean", regexp_replace(col("last_updated"), "(Z|\\+00:00)$", "")) \
    .withColumn(
        "last_updated_ts",
        when(
            col("last_updated_clean").isNotNull(),
            to_timestamp(col("last_updated_clean"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
        )
        .otherwise(current_timestamp())
    )\
    .withColumn("date_partition", to_date(col("last_updated_ts"))) \
    .drop("last_updated", "last_updated_clean")
                

In [0]:

df_transformed2 = df_transformed1 \
    .withColumn("atl_date_clean", regexp_replace(col("atl_date"), "(Z|\\+00:00)$", "")) \
    .withColumn(
        "atl_date_ts",
        when(
            col("atl_date_clean").isNotNull(),
            to_timestamp(col("atl_date_clean"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
        )
        .otherwise(current_timestamp())
    )\
    .withColumn("atl_date_partition", to_date(col("atl_date_ts"))) \
    .drop("atl_date", "atl_date_clean")


df_transformed3 = df_transformed2 \
    .withColumn("ath_date_clean", regexp_replace(col("ath_date"), "(Z|\\+00:00)$", "")) \
    .withColumn(
        "ath_date_ts",
        when(
            col("ath_date_clean").isNotNull(),
            to_timestamp(col("ath_date_clean"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
        )
        .otherwise(current_timestamp())
    )\
    .withColumn("ath_date_partition", to_date(col("ath_date_ts"))) \
    .drop("ath_date", "ath_date_clean")




In [0]:
# Confirm the *_ts timestamp and *_partition date columns replaced the raw date strings.
df_transformed3.printSchema()

In [0]:
# Derive numeric market metrics from the string-typed source columns:
#   market_cap_double / total_volume_double - the raw strings cast to double.
#   market_cap_billions        - market cap in billions; 0.0 when market cap is null.
#   volume_to_market_cap_ratio - total_volume / market_cap; 0.0 unless volume is
#                                present and market cap is present and positive.
market_cap_num = col("market_cap_double")
volume_num = col("total_volume_double")

has_market_cap = market_cap_num.isNotNull()
ratio_is_defined = volume_num.isNotNull() & has_market_cap & (market_cap_num > 0)

df_metrics = (
    df_transformed3
    .withColumn("market_cap_double", col("market_cap").cast("double"))
    .withColumn("total_volume_double", col("total_volume").cast("double"))
    .withColumn(
        "market_cap_billions",
        when(has_market_cap, market_cap_num / 1e9).otherwise(0.0),
    )
    .withColumn(
        "volume_to_market_cap_ratio",
        when(ratio_is_defined, volume_num / market_cap_num).otherwise(0.0),
    )
)

In [0]:
# Check the double casts and the derived market_cap_billions / ratio columns.
df_metrics.printSchema()

In [0]:
# Drop rows missing any field the silver table relies on, then de-duplicate on
# (id, last_updated_ts) so a coin's snapshot at a given update time is written once.
# (market_cap_billions is defaulted to 0.0 upstream, so its check is defensive.)
#
# In a streaming query, dropDuplicates without a watermark keeps every key it
# has ever seen in checkpointed state, and that state grows without bound
# across runs. A watermark on last_updated_ts, which is one of the dedup keys,
# lets Spark evict old keys. Rows whose last_updated_ts is more than
# DEDUP_WATERMARK behind the newest value already seen are treated as late and
# are dropped, so keep the threshold generous.
# NOTE(review): confirm the existing checkpoint restarts cleanly after adding
# the watermark; otherwise start from a fresh checkpoint location.
DEDUP_WATERMARK = "7 days"

df_final = (
    df_metrics
    .filter(
        col("symbol").isNotNull()
        & col("last_updated_ts").isNotNull()
        & col("date_partition").isNotNull()
        & col("market_cap_billions").isNotNull()
        & col("id").isNotNull()
    )
    .withWatermark("last_updated_ts", DEDUP_WATERMARK)
    .dropDuplicates(["id", "last_updated_ts"])
)

In [0]:
# Schema of the rows that will be appended to the silver Delta table.
df_final.printSchema()

In [0]:
%sql
-- Diagnostic: list the Unity Catalog external locations; presumably used to
-- confirm the abfss containers referenced above are registered (TODO confirm).
SHOW EXTERNAL LOCATIONS;

### Writing to Silver Layer

In [0]:
# Write df_final to the silver Delta table as an incremental batch run:
#   - Delta format: ACID commits and table versioning.
#   - "append" output mode: only new rows are added; existing rows are never rewritten.
#   - The checkpoint records the stream's progress, including dedup state, so a
#     rerun resumes where the previous run stopped.
#   - availableNow=True: process everything available when the query starts
#     (possibly in several micro-batches), then stop. It replaces the
#     deprecated trigger(once=True).
silver_checkpoint_path = "abfss://silver@joecryptostorage01.dfs.core.windows.net/_checkpoints/crypto_market"

cryptodataout = (
    df_final.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", silver_checkpoint_path)
    .trigger(availableNow=True)
    .start(silver_path)
)

# Block until the run finishes. Failures then surface in this cell, and any
# later cell or job task sees the completed write.
cryptodataout.awaitTermination()