In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Every layer lives in the same storage account, so its name is kept in one
# place instead of being repeated inside each URI literal.
_STORAGE_ACCOUNT = "cryptostoragesy01"


def _abfss_uri(container, path=""):
    """Return the abfss:// URI of `path` inside `container` of the storage account.

    Args:
        container: Name of the storage container (e.g. "bronze").
        path: Optional path below the container root; empty means the root itself.

    Returns:
        The fully qualified URI as a string.
    """
    return f"abfss://{container}@{_STORAGE_ACCOUNT}.dfs.core.windows.net/{path}"


# Raw JSON landing zone read by the loaders below.
bronze_path = _abfss_uri("bronze")
# Target location of the silver Delta table.
silver_path = _abfss_uri("silver", "silver_crypto")
# Passed to Auto Loader as cloudFiles.schemaLocation.
schema_path = _abfss_uri("schema")
# Passed to the streaming writer as checkpointLocation.
checkpoint_path = _abfss_uri("silver", "_checkpoints/crypto_market")

# Using Auto Loader

In [0]:
# One-off batch read of the bronze JSON with the multiline option enabled.
# The result is not assigned, so it is only shown as this cell's output.
(
    spark.read
    .format('json')
    .option("multiline", True)
    .load(bronze_path)
)

In [0]:
# Type hints handed to Auto Loader: `roi` is declared as a struct, every other
# listed field is read as STRING (numeric/timestamp conversion happens later).
_SCHEMA_HINTS = """
            roi STRUCT<times:DOUBLE, currency:STRING, percentage:DOUBLE>,
            ath STRING,
            ath_change_percentage STRING,
            ath_date STRING,
            atl STRING,
            atl_change_percentage STRING,
            atl_date STRING,
            circulating_supply STRING,
            current_price STRING,
            high_24h STRING,
            id STRING,
            last_updated STRING,
            low_24h STRING,
            market_cap STRING,
            market_cap_change_24h STRING,
            market_cap_change_percentage_24h STRING,
            market_cap_rank STRING,
            max_supply STRING,
            name STRING,
            price_change_24h STRING,
            price_change_percentage_24h STRING,
            symbol STRING,
            total_supply STRING,
            total_volume STRING
            """

# NOTE(review): the batch read of the same path sets "multiline" to True, but
# this stream does not -- confirm which setting matches the bronze file layout.
_autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": schema_path,
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.schemaHints": _SCHEMA_HINTS,
}

# Streaming read of the bronze JSON files; each row is stamped with the
# processing-time timestamp in `ingested_time`.
df = (
    spark.readStream
    .format("cloudFiles")
    .options(**_autoloader_options)
    .load(bronze_path)
    .withColumn("ingested_time", current_timestamp())
)

## Transforming our data using PySpark

In [0]:
# Flatten the `roi` struct: promote its three fields to top-level columns and
# then discard the struct itself.
df_parsed = (
    df
    .withColumn("roi_time", col("roi.times"))
    .withColumn("roi_currency", col("roi.currency"))
    .withColumn("roi_percentage", col("roi.percentage"))
    .drop("roi")
)

In [0]:
from pyspark.sql.functions import regexp_replace, to_timestamp, to_date

In [0]:
# Timestamp layout expected once the trailing UTC designator has been removed.
_TS_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"
# Matches a trailing "Z" or "+00:00".
_UTC_SUFFIX = "(Z|\\+00:00)$"


def _parse_utc_timestamp(column_name, default=None):
    """Build a Column that parses an ISO-style timestamp string.

    A trailing "Z" / "+00:00" is stripped and the remainder is parsed with
    `_TS_FORMAT`.

    NOTE(review): because the UTC suffix is discarded before parsing, the
    value is presumably interpreted in the Spark session time zone -- confirm
    the session runs in UTC.

    Args:
        column_name: Name of the string column holding the timestamp text.
        default: Optional Column used when the source value is null. When
            omitted, a null source value yields a null result.

    Returns:
        A timestamp Column expression; it does not add or drop any columns.
    """
    stripped = regexp_replace(col(column_name), _UTC_SUFFIX, "")
    parsed = when(stripped.isNotNull(), to_timestamp(stripped, _TS_FORMAT))
    if default is None:
        return parsed
    return parsed.otherwise(default)


# Replace the three timestamp strings with typed *_ts columns and derive the
# calendar date of the last update. Only `last_updated` falls back to the
# current timestamp when the source value is null.
df_transformed = (
    df_parsed
    .withColumn(
        "last_updated_ts",
        _parse_utc_timestamp("last_updated", default=current_timestamp()),
    )
    .drop("last_updated")
    .withColumn("date_partition", to_date(col("last_updated_ts")))
    .withColumn("ath_date_ts", _parse_utc_timestamp("ath_date"))
    .drop("ath_date")
    .withColumn("atl_date_ts", _parse_utc_timestamp("atl_date"))
    .drop("atl_date")
)
    
                   

In [0]:
# Column handles for the numeric casts created in the chain below.
_market_cap = col("market_cap_double")
_total_volume = col("total_volume_double")

# The ratio is only computed when both operands exist and the divisor is positive.
_ratio_is_defined = (
    _total_volume.isNotNull()
    & _market_cap.isNotNull()
    & (_market_cap > 0)
)

# Cast the string-typed market cap / volume to doubles and derive two metrics.
# Both metrics fall back to 0.0 when they cannot be computed.
df_metrics = (
    df_transformed
    .withColumn("market_cap_double", col("market_cap").cast("double"))
    .withColumn("total_volume_double", col("total_volume").cast("double"))
    .withColumn(
        "market_cap_billions",
        when(_market_cap.isNotNull(), _market_cap / 1_000_000_000.0).otherwise(0.0),
    )
    .withColumn(
        "volume_to_market_cap_ratio",
        when(_ratio_is_defined, _total_volume / _market_cap).otherwise(0.0),
    )
)

In [0]:
# Columns that must be populated for a row to reach the silver table.
# NOTE(review): `market_cap_billions` is built with otherwise(0.0) upstream,
# so its null check looks like it can never reject a row -- confirm intent.
_REQUIRED_COLUMNS = [
    "symbol",
    "last_updated_ts",
    "date_partition",
    "market_cap_billions",
    "id",
]

# AND together one IS NOT NULL predicate per required column.
_all_required_present = col(_REQUIRED_COLUMNS[0]).isNotNull()
for _required in _REQUIRED_COLUMNS[1:]:
    _all_required_present = _all_required_present & col(_required).isNotNull()

# Keep complete rows only, then keep one row per (id, last_updated_ts).
# NOTE(review): no watermark is defined on this stream, so the deduplication
# state is presumably retained indefinitely -- confirm that is acceptable.
df_final = (
    df_metrics
    .filter(_all_required_present)
    .dropDuplicates(["id", "last_updated_ts"])
)

## Writing to our silver layer

In [0]:
# Append the cleaned rows to the silver Delta table, processing everything
# that is currently available and then stopping.
query = (
    df_final.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    # The source stream uses schemaEvolutionMode=addNewColumns; without
    # mergeSchema the Delta sink rejects a batch that carries a new column.
    .option("mergeSchema", "true")
    # availableNow supersedes the deprecated once=True trigger: it still
    # stops after the backlog is consumed.
    .trigger(availableNow=True)
    .start(silver_path)
)

# Block until the run finishes so that a failed stream raises in this cell
# instead of going unnoticed.
query.awaitTermination()