In [39]:
from pyspark.sql import SparkSession
# Open (or reuse) a Spark Connect session against the server at localhost:15002.
spark = (
    SparkSession.builder
    .remote("sc://localhost:15002")
    .getOrCreate()
)
# Bare expression as the last line of the cell so the notebook renders the session.
spark

## Verify that Spark Connect is set up correctly
Run a "hello world" DataFrame example to confirm the session works.

In [41]:
from datetime import datetime, date
from pyspark.sql import Row

# Three sample records, one value per column type: int, float, string, date, timestamp.
sample_rows = [
    Row(a=a, b=b, c=c, d=d, e=e)
    for a, b, c, d, e in (
        (1, 2.0, "string1", date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3.0, "string2", date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (4, 5.0, "string3", date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
    )
]

# Round-trip the rows through the Spark session and print the resulting table.
df = spark.createDataFrame(sample_rows)
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 01:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 01:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 01:00:00|
+---+---+-------+----------+-------------------+



## Hello world - QuestDB Connect
From the [QuestDB Spark Connector](https://questdb.io/docs/third-party-tools/spark/) documentation, we can see that the connector is available in the Maven Central Repository.

Note that for the Spark JDBC connection to work, the PostgreSQL JDBC driver was added to the Spark stack with `--packages org.postgresql:postgresql:42.5.1`.

In [42]:
# Subquery pushed down to QuestDB: per-minute (SAMPLE BY 1m) volume and mid price
# for the BTC-USD symbol, exposed to Spark under the alias `mid_prices`.
mid_prices_subquery = (
    "(SELECT symbol, sum(amount) as volume, "
    "round((max(price)+min(price))/2, 2) as mid, "
    "timestamp as ts "
    "FROM trades WHERE symbol = 'BTC-USD' "
    "SAMPLE BY 1m ALIGN to CALENDAR) AS mid_prices"
)

# JDBC reader configured for the QuestDB endpoint via the PostgreSQL driver.
questdb_reader = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host.docker.internal:8812/questdb")
    .option("driver", "org.postgresql.Driver")
    .option("user", "admin")
    .option("password", "quest")
    .option("dbtable", mid_prices_subquery)
)

# Split the read into 3 partitions on the `ts` column between the two bounds.
partitioned_reader = (
    questdb_reader
    .option("partitionColumn", "ts")
    .option("numPartitions", "3")
    .option("lowerBound", "2023-03-01T00:00:00.000000Z")
    .option("upperBound", "2023-03-04T00:00:00.000000Z")
)

# load 1-minute aggregated trade data into the dataframe
df = partitioned_reader.load()
df.show()

+-------+------------------+------------------+-------------------+
| symbol|            volume|               mid|                 ts|
+-------+------------------+------------------+-------------------+
|BTC-USD|2.1604038145086886|           5947.66|2023-03-01 00:00:00|
|BTC-USD|3.4010091992904536|            5237.6|2023-03-01 00:01:00|
|BTC-USD| 3.097252125417988|            6417.6|2023-03-01 00:02:00|
|BTC-USD| 3.321120621981665|4929.6900000000005|2023-03-01 00:03:00|
|BTC-USD|2.8235243777240364|           6584.03|2023-03-01 00:04:00|
|BTC-USD| 4.632750296707132|           5453.74|2023-03-01 00:05:00|
|BTC-USD|3.8039631314824702|           5122.54|2023-03-01 00:06:00|
|BTC-USD|3.2126240899145113|           7673.62|2023-03-01 00:07:00|
|BTC-USD|1.8925783094879787|           3867.31|2023-03-01 00:08:00|
|BTC-USD|3.2591621183734603|           4817.47|2023-03-01 00:09:00|
|BTC-USD|2.2492996767206943|           4622.86|2023-03-01 00:10:00|
|BTC-USD|1.4936611049063955|           5240.25|2

In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, stddev, when

# extract new features, clean data
#
# The window is ordered by `ts`: without an explicit ordering, which rows fall
# into a ROWS frame is not deterministic, so the moving statistics could change
# from run to run.
# The frame covers the current row plus the 9 preceding rows, i.e. 10 samples,
# matching the `ma10` column name (a lower bound of -10 would span 11 rows).
window_10 = (
    Window.partitionBy(df.symbol)
    .orderBy(df.ts)
    .rowsBetween(-9, Window.currentRow)
)
df = df.withColumn("ma10", avg(df.mid).over(window_10))
df = df.withColumn("std", stddev(df.mid).over(window_10))
# Replace NULL standard deviations (e.g. a frame holding a single row) with 0.0.
df = df.withColumn("std", when(df.std.isNull(), 0.0).otherwise(df.std))

# save the data as 'trades_enriched', overwrite if already exists
# `truncate` makes the overwrite empty an existing table instead of dropping and
# recreating it; `createTableColumnTypes` fixes the column types used when the
# table has to be created.
(
    df.write.format("jdbc")
    .option("url", "jdbc:postgresql://host.docker.internal:8812/questdb")
    .option("driver", "org.postgresql.Driver")
    .option("user", "admin")
    .option("password", "quest")
    .option("dbtable", "trades_enriched")
    .option("truncate", True)
    .option(
        "createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE"
    )
    .save(mode="overwrite")
)

In [44]:
# JDBC reader for the QuestDB endpoint (PostgreSQL driver).
enriched_reader = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host.docker.internal:8812/questdb")
    .option("driver", "org.postgresql.Driver")
    .option("user", "admin")
    .option("password", "quest")
)

# Load every row of the `trades_enriched` table into the dataframe and print it.
df = enriched_reader.option("query", "SELECT * FROM trades_enriched").load()
df.show()

+-------+------------------+------------------+-------------------+-----------------+------------------+
| symbol|            volume|               mid|                 ts|             ma10|               std|
+-------+------------------+------------------+-------------------+-----------------+------------------+
|BTC-USD|2.1604038145086886|           5947.66|2023-03-01 00:00:00|          5947.66|               0.0|
|BTC-USD|3.4010091992904536|            5237.6|2023-03-01 00:01:00|          5592.63|502.08824104931955|
|BTC-USD| 3.097252125417988|            6417.6|2023-03-01 00:02:00|          5867.62| 594.0579106450818|
|BTC-USD| 3.321120621981665|4929.6900000000005|2023-03-01 00:03:00|5633.137500000001| 674.6836582367075|
|BTC-USD|2.8235243777240364|           6584.03|2023-03-01 00:04:00|5823.316000000001| 722.6602509685998|
|BTC-USD| 4.632750296707132|           5453.74|2023-03-01 00:05:00|          5761.72|  663.742927615805|
|BTC-USD|3.8039631314824702|           5122.54|2023-03-

## Spark Streaming - QuestDB
The following code will create a streaming dataframe from the rate source, and write the data to QuestDB.

The `writeStream` API does not support JDBC as a sink, so we use the `foreachBatch` method to write each micro-batch to QuestDB through the batch JDBC writer.

To avoid concurrent write issues and lock contention, we repartition each batch DataFrame to a single partition before writing.



In [51]:
from pyspark.sql.functions import expr
from pyspark.sql.utils import AnalysisException

# Target table name and the checkpoint directory derived from it.
TABLE_NAME = "spark_rate"
SPARK_WAREHOUSE_BASE_CHECKPOINT = (
    f"/opt/bitnami/spark/spark-warehouse/checkpoint/{TABLE_NAME}"
)

# Streaming source: the built-in "rate" generator configured for 10000 rows per second.
rate_reader = spark.readStream.format("rate").option("rowsPerSecond", "10000")
streamingDF = rate_reader.load()

# Rename the generated columns: value -> id, timestamp -> event_time.
transformedDF = streamingDF.selectExpr(
    "value AS id",
    "timestamp AS event_time",
)



In [52]:
# Write the transformed stream to QuestDB using the JDBC format
def write_to_questdb(batch_df, batch_id):
    """Append one streaming micro-batch to the QuestDB table ``TABLE_NAME``.

    Intended to be passed to ``writeStream.foreachBatch``.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch supplied by ``foreachBatch``;
            not used here.
    """
    (
        batch_df
        # A single partition means a single JDBC writer per batch.
        .repartition(1)
        .write.format("jdbc")
        .option("url", "jdbc:postgresql://host.docker.internal:8812/questdb")
        .option("driver", "org.postgresql.Driver")
        .option("user", "admin")
        .option("password", "quest")
        .option("dbtable", TABLE_NAME)
        # No `truncate` option: it only applies to overwrite mode and would
        # wrongly suggest the table is emptied on every batch.
        .save(mode="append")
    )


import time

# Start the streaming query: every second, hand the accumulated micro-batch to
# write_to_questdb, tracking progress in the checkpoint directory.
query = (
    transformedDF.writeStream.trigger(processingTime="1 seconds")
    .foreachBatch(write_to_questdb)
    .option("checkpointLocation", SPARK_WAREHOUSE_BASE_CHECKPOINT)
    .start()
)

try:
    # Let the stream run for 30 seconds
    time.sleep(30)
finally:
    # Stop the streaming query even if the wait is interrupted, so the stream
    # is not left running in the background.
    query.stop()