In [2]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
# import pyspark
# from delta import *

# spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0").getOrCreate()

In [4]:
data = spark.range(0, 5)
data.write.format("delta").mode("append").save("delta_table1")

24/03/11 21:53:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [5]:
df = spark.read.format("delta").load("delta_table1")
df.show()

+---+
| id|
+---+
|  2|
|  3|
|  1|
|  4|
|  3|
|  1|
|  1|
|  2|
|  4|
|  0|
|  1|
|  3|
|  3|
|  2|
|  4|
|  0|
|  2|
|  0|
|  4|
|  0|
+---+


In [2]:
# Create a DataFrame that reads from the input Kafka topic name src-topic
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("subscribe", "real_time_data_options").option("startingOffsets", "latest").load()

In [4]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, StructType

# Define the schema of your JSON data
schema = StructType([
    StructField("jsonrpc", StringType()),
    StructField("method", StringType()),
    StructField("params", StructType([
        StructField("channel", StringType()),
        StructField("data", StructType([
            StructField("funding_8h", DoubleType()),
            StructField("current_funding", DoubleType()),
            StructField("estimated_delivery_price", DoubleType()),
            StructField("best_bid_amount", DoubleType()),
            StructField("best_ask_amount", DoubleType()),
            StructField("best_bid_price", DoubleType()),
            StructField("best_ask_price", DoubleType()),
            StructField("interest_value", DoubleType()),
            StructField("open_interest", LongType()),
            StructField("max_price", DoubleType()),
            StructField("min_price", DoubleType()),
            StructField("last_price", DoubleType()),
            StructField("settlement_price", DoubleType()),
            StructField("instrument_name", StringType()),
            StructField("mark_price", DoubleType()),
            StructField("index_price", DoubleType()),
            StructField("stats", StructType([
                StructField("volume_notional", DoubleType()),
                StructField("volume_usd", DoubleType()),
                StructField("volume", DoubleType()),
                StructField("price_change", DoubleType()),
                StructField("low", DoubleType()),
                StructField("high", DoubleType()),
            ])),
            StructField("state", StringType()),
            StructField("timestamp", LongType()),
        ])),
    ])),
])

parsed_df = df.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

In [5]:
from pyspark.sql.functions import col
from pyspark.sql.functions import col, from_unixtime, to_timestamp

flattened_df = parsed_df.select(
    col("parsed_value.jsonrpc"),
    col("parsed_value.method"),
    col("parsed_value.params.channel"),
    col("parsed_value.params.data.funding_8h").alias("funding_8h"),
    col("parsed_value.params.data.current_funding").alias("current_funding"),
    col("parsed_value.params.data.estimated_delivery_price").alias("estimated_delivery_price"),
    col("parsed_value.params.data.best_bid_amount").alias("best_bid_amount"),
    col("parsed_value.params.data.best_ask_amount").alias("best_ask_amount"),
    col("parsed_value.params.data.best_bid_price").alias("best_bid_price"),
    col("parsed_value.params.data.best_ask_price").alias("best_ask_price"),
    col("parsed_value.params.data.interest_value").alias("interest_value"),
    col("parsed_value.params.data.open_interest").alias("open_interest"),
    col("parsed_value.params.data.max_price").alias("max_price"),
    col("parsed_value.params.data.min_price").alias("min_price"),
    col("parsed_value.params.data.last_price").alias("last_price"),
    col("parsed_value.params.data.settlement_price").alias("settlement_price"),
    col("parsed_value.params.data.instrument_name").alias("instrument_name"),
    col("parsed_value.params.data.mark_price").alias("mark_price"),
    col("parsed_value.params.data.index_price").alias("index_price"),
    # col("parsed_value.params.data.stats.volume_notional").alias("volume_notional"),
    # col("parsed_value.params.data.stats.volume_usd").alias("volume_usd"),
    # col("parsed_value.params.data.stats.volume").alias("volume"),
    # col("parsed_value.params.data.stats.price_change").alias("price_change"),
    # col("parsed_value.params.data.stats.low").alias("low"),
    # col("parsed_value.params.data.stats.high").alias("high"),
    # col("parsed_value.params.data.state").alias("state"),
    col("parsed_value.params.data.timestamp").alias("timestamp"),
    to_timestamp(from_unixtime(col("parsed_value.params.data.timestamp") / 1000)).alias("datetimestamp")
)

In [18]:
# Collect and display a small batch of streaming data (for demonstration only)
import time
query = flattened_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("my_streaming_table") \
    .start()

time.sleep(10)  # Wait a few seconds for data to be processed

# Now you can query the in-memory table as a static DataFrame
display(spark.sql("SELECT * FROM my_streaming_table LIMIT 20").toPandas())

query.stop()  # Don't forget to stop the streaming query

Unnamed: 0,jsonrpc,method,channel,funding_8h,current_funding,estimated_delivery_price,best_bid_amount,best_ask_amount,best_bid_price,best_ask_price,...,open_interest,max_price,min_price,last_price,settlement_price,instrument_name,mark_price,index_price,timestamp,datetimestamp
0,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001512,67195.51,275580.0,289780.0,67319.0,67319.5,...,861293560,68320.42,66301.1,67319.0,66755.31,BTC-PERPETUAL,67313.94,67195.51,1709724220802,2024-03-06 11:23:40
1,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001512,67193.33,34320.0,216680.0,67318.5,67319.0,...,861110250,68318.31,66299.05,67318.5,66755.31,BTC-PERPETUAL,67311.74,67193.33,1709724221809,2024-03-06 11:23:41
2,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001506,67193.33,536520.0,57300.0,67304.5,67305.0,...,861058050,68318.17,66298.92,67305.0,66755.31,BTC-PERPETUAL,67311.29,67193.33,1709724222745,2024-03-06 11:23:42
3,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001511,67193.94,66840.0,22230.0,67315.0,67315.5,...,861029690,68319.06,66299.78,67312.5,66755.31,BTC-PERPETUAL,67312.24,67193.94,1709724223738,2024-03-06 11:23:43
4,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001515,67204.02,428530.0,25200.0,67328.0,67328.5,...,861122390,68329.55,66309.96,67328.0,66755.31,BTC-PERPETUAL,67322.64,67204.02,1709724224796,2024-03-06 11:23:44
5,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001521,67206.24,319380.0,168010.0,67330.5,67331.0,...,861208050,68332.1,66312.43,67330.5,66755.31,BTC-PERPETUAL,67325.24,67206.24,1709724225824,2024-03-06 11:23:45
6,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001523,67209.49,311330.0,198160.0,67330.5,67331.0,...,861239260,68335.57,66315.8,67331.0,66755.31,BTC-PERPETUAL,67328.63,67209.49,1709724226796,2024-03-06 11:23:46
7,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001524,67206.38,51070.0,372740.0,67330.5,67331.0,...,861238950,68332.54,66312.86,67330.5,66755.31,BTC-PERPETUAL,67325.59,67206.38,1709724227711,2024-03-06 11:23:47
8,2.0,subscription,ticker.BTC-PERPETUAL.agg2,0.000659,0.001526,67200.65,389400.0,141900.0,67318.5,67319.0,...,861195830,68326.9,66307.38,67318.5,66755.31,BTC-PERPETUAL,67320.0,67200.65,1709724228811,2024-03-06 11:23:48


In [19]:
df2 = spark.sql("SELECT * FROM my_streaming_table LIMIT 20")

In [20]:
type(df2)

pyspark.sql.dataframe.DataFrame

In [25]:
# Writing to a Delta table
df2.write.format("parquet").mode("overwrite").save("/home/jovyan/work/tmp/parquet_sql.parquet")