In [5]:
from pyspark.sql import SparkSession

## Bronze: Ingesting Raw Data

In [6]:
# Bronze layer: reuse the active Spark session (or start one) and load the
# raw trade records from the bronze parquet directory.
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("../data/bronze/crypto_trades")

# Report the total number of bronze rows before previewing them.
bronze_row_count = df.count()
print("rows:", bronze_row_count)

# Preview the 10 rows with the highest offsets; truncation is disabled so the
# wide value_raw column is printed in full.
bronze_latest_first = df.sort("offset", ascending=False)
bronze_latest_first.show(n=10, truncate=False)

rows: 515
+-------------+---------+------+-----------------------+-------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic        |partition|offset|ts_kafka               |ts_type|key     |value_raw                                                                                                                                                          |
+-------------+---------+------+-----------------------+-------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|crypto.trades|0        |514   |2025-09-07 21:34:51.809|0      |XBT/USDT|{"exchange":"kraken","symbol":"XBT/USDT","price":111188.7,"size":0.00068434,"side":"sell","order_type":"market","ts_event":1757306091722,"ts_ingest":1757306091807}|
|crypto.trades|0        |513   |2025-0

## Silver: Processing Raw Data

In [7]:
# Silver layer: load the processed trades and report how many rows there are.
sdf = spark.read.parquet("../data/silver/trades")
print("rows:", sdf.count())

# Show the 10 most recent trades by event_time, restricted to the main trade
# fields and printed without column truncation.
(
    sdf
    .select("symbol", "price", "size", "side", "event_time", "ingest_time")
    .orderBy("event_time", ascending=False)
    .show(n=10, truncate=False)
)


rows: 513
+--------+--------+----------+----+-----------------------+-----------------------+
|symbol  |price   |size      |side|event_time             |ingest_time            |
+--------+--------+----------+----+-----------------------+-----------------------+
|XBT/USDT|111188.7|6.8434E-4 |sell|2025-09-07 21:34:51.722|2025-09-07 21:34:51.807|
|XBT/USDT|111139.5|0.00360344|sell|2025-09-07 21:31:19.493|2025-09-07 21:31:19.576|
|XBT/USDT|111127.9|8.999E-5  |buy |2025-09-07 21:30:55.83 |2025-09-07 21:30:55.935|
|XBT/USDT|111116.4|0.00449946|buy |2025-09-07 21:27:59.02 |2025-09-07 21:27:59.14 |
|XBT/USDT|111103.3|3.2434E-4 |sell|2025-09-07 21:23:49.743|2025-09-07 21:23:50.02 |
|XBT/USDT|111067.5|7.199E-5  |buy |2025-09-07 21:17:23.868|2025-09-07 21:17:23.986|
|XBT/USDT|111067.5|0.01823581|buy |2025-09-07 21:17:23.103|2025-09-07 21:17:23.176|
|XBT/USDT|111067.5|0.02683319|buy |2025-09-07 21:17:23.103|2025-09-07 21:17:23.175|
|XBT/USDT|111067.4|9.0124E-4 |sell|2025-09-07 21:16:50.429|2025-09

## Gold: Aggregating to 1-Minute Intervals

In [8]:
# Gold layer: load the 1-minute bars and report how many rows there are.
g = spark.read.parquet("../data/gold/bars_1m")
print(f'rows: {g.count()}')

# Show the 10 latest bars (newest bar_start first) with the OHLC, volume,
# VWAP and trade-count columns, printed without truncation.
(
    g
    .orderBy("bar_start", ascending=False)
    .select(
        "symbol",
        "bar_start",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "vwap",
        "trades",
    )
    .show(n=10, truncate=False)
)


rows: 173
+--------+-------------------+--------+--------+--------+--------+----------+------------------+------+
|symbol  |bar_start          |open    |high    |low     |close   |volume    |vwap              |trades|
+--------+-------------------+--------+--------+--------+--------+----------+------------------+------+
|XBT/USDT|2025-09-07 21:31:00|111139.5|111139.5|111139.5|111139.5|0.00360344|111139.5          |1     |
|XBT/USDT|2025-09-07 21:30:00|111127.9|111127.9|111127.9|111127.9|8.999E-5  |111127.9          |1     |
|XBT/USDT|2025-09-07 21:27:00|111116.4|111116.4|111116.4|111116.4|0.00449946|111116.4          |1     |
|XBT/USDT|2025-09-07 21:23:00|111103.3|111103.3|111103.3|111103.3|3.2434E-4 |111103.30000000002|1     |
|XBT/USDT|2025-09-07 21:17:00|111067.5|111067.5|111067.5|111067.5|0.04514099|111067.50000000003|3     |
|XBT/USDT|2025-09-07 21:16:00|111067.4|111067.4|111067.4|111067.4|9.0124E-4 |111067.4          |1     |
|XBT/USDT|2025-09-07 21:15:00|111067.5|111067.5|111067