In [0]:
from pyspark.sql.functions import col, expr, from_unixtime, window, min, max
import plotly.graph_objs as go

# Fully-qualified name of the table every later cell reads from.
table = "acm_1.default.glbx_mdp_3_20250224_mbp_1"

# Symbol -> numeric instrument_id for the contracts this notebook can plot.
selected = dict(
    ZFH5=42072515,
    ZNH5=42002219,
    ZTH5=42111442,
    SR1H5=42177485,
    SR3H5=254257,
)

# Reverse lookup: numeric instrument_id -> symbol.
selection = dict(zip(selected.values(), selected.keys()))

In [0]:
# Resolve the symbol chosen in the "instrument" widget to its numeric id;
# fail fast if the symbol is not one of the configured contracts.
instrument = dbutils.widgets.get("instrument")

instr_id = selected.get(instrument)
if instr_id is None:
    raise ValueError(f"Instrument '{instrument}' not found in the selected dictionary")

# Keep only the chosen instrument's rows, and only the columns used downstream.
# Filtering against df.columns means a column missing from the table is
# skipped silently instead of raising.
df0 = spark.table(table)
df = df0.filter(col('instrument_id') == instr_id)
df2 = df.select(
    [c for c in df.columns if c in ['instrument_id', 'ts_event', 'action', 'side', 'price', 'bid_px_00', 'ask_px_00']]
)

# SQL CASE expression translating a numeric instrument_id into its symbol.
# `selected` is keyed by symbol, so the id belongs in WHEN and the (quoted)
# symbol in THEN; the previous version had the two swapped.
case_expr = "CASE `instrument_id` " + " ".join(
    [f"WHEN {instr} THEN '{symbol}'" for symbol, instr in selected.items()]
) + " END"

# Build a real timestamp column for windowing. from_unixtime() would return a
# seconds-resolution *string*, dropping sub-second precision and relying on an
# implicit string -> timestamp cast later; casting the epoch seconds directly
# keeps the fractional part.
# NOTE(review): assumes ts_event is nanoseconds since the Unix epoch (implied
# by the division by 1E9) -- confirm against the table schema.
df2 = df2.withColumn("event_time", (col('ts_event') / 1E9).cast("timestamp"))

In [0]:
# Window width for the resampling, read from the "resample_freq" widget and
# passed straight to window() as its duration.
frequency = dbutils.widgets.get("resample_freq")

# Grouping key and aggregates, named up front so the pipeline below reads
# linearly. `min` / `max` here are the pyspark.sql.functions aggregates
# imported at the top of the notebook, not the Python builtins.
time_bucket = window(col("event_time"), frequency)
lowest_bid = min("bid_px_div").alias("min_bid_px")
highest_ask = max("ask_px_div").alias("max_ask_px")

# Rescale the raw top-of-book prices by 1E9.
# NOTE(review): presumably the source stores prices as integers scaled by
# 1e9 -- confirm against the table's documentation.
df2_scaled = (
    df2
    .withColumn("bid_px_div", col("bid_px_00") / 1E9)
    .withColumn("ask_px_div", col("ask_px_00") / 1E9)
)

# One row per (time window, instrument): lowest bid and highest ask seen in
# the window, plus the window's end time as a plain column.
df2_grouped = (
    df2_scaled
    .groupBy(time_bucket, col("instrument_id"))
    .agg(lowest_bid, highest_ask)
    .withColumn("window_end", col("window").getField("end"))
)

In [0]:
# Collect the per-window aggregates for the selected instrument to the driver.
# Sorting by window_end gives the pandas frame (and the exported figure) a
# deterministic, chronological row order; Spark does not otherwise guarantee
# row order after a groupBy.
df_pandas = (
    df2_grouped
    .filter(col("instrument_id") == instr_id)
    .select("window_end", "min_bid_px", "max_ask_px")
    .orderBy("window_end")
    .toPandas()
)

# One marker series per aggregate, sharing the window-end time axis.
fig = go.Figure()

fig.add_trace(go.Scatter(x=df_pandas['window_end'], y=df_pandas['min_bid_px'], name='Min Bid Price', mode='markers'))
fig.add_trace(go.Scatter(x=df_pandas['window_end'], y=df_pandas['max_ask_px'], name='Max Ask Price', mode='markers'))

# The title names what is actually plotted: the per-window minimum bid and
# maximum ask (the earlier wording said "average", which no trace shows).
# Kept as the last expression so the figure renders as the cell output.
fig.update_layout(
    title=f'{instrument}: Min Bid and Max Ask Prices, resampled at {frequency}',
    xaxis_title='Window End Time',
    yaxis_title='Price',
    legend_title='Price Type'
)

In [0]:
import plotly.tools as tls
import json
import plotly.utils

# Destination for the exported figure, written through dbutils.fs.
path_to_save = "/tmp/fig1.json"

# Serialise the figure with Plotly's JSON encoder so Plotly-specific values
# are converted into plain JSON.
fig_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

# Overwrite any previous export; the call's return value is the cell output.
dbutils.fs.put(path_to_save, fig_json, overwrite=True)

Wrote 105013 bytes.


True