In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [12]:
spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

stream = spark.\
    readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()  # `option("schema", "ticker")` selects the channel to subscribe to

query = stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time").\
    writeStream.\
    format("console").\
    outputMode("append").\
    option("truncate", "false").\
    start()

query.awaitTermination(10) # Let's wait for 10 seconds.
query.stop() # Let's stop the query
stream.printSchema()
#spark.stop() # And stop the whole session

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)


Starting the stream with `start()` launches a websocket in a separate daemon that streams the results. If an error occurs on the front-end side (for example, a parse error in a later line of Python), Spark is not notified and the socket stays open! Remember to close the stream every time with the `stop()` method (`query.stop()` in the example above). If you lose the reference to the query, stop the whole session, also with `stop()` (`spark.stop()` in the example above).

In [3]:
# Panic button - press only if you messed up opening new websocket and lost reference to it

# query.stop()
# spark.stop()
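
When the reference to a query is lost, stopping the whole session may not be the only way out: in PySpark, `spark.streams.active` lists the streaming queries still running in the session. The helper below is a minimal sketch built on that property. `stop_all_queries` is a hypothetical name, not part of this notebook, and the function only assumes an object that exposes `streams.active`, whose items have `name` and `stop()`.

```python
def stop_all_queries(session):
    """Stop every streaming query still active in `session`.

    `session` is assumed to behave like a SparkSession: `session.streams.active`
    yields the running queries, each with a `name` attribute and a `stop()` method.
    Returns the names of the stopped queries (None for unnamed ones).
    """
    stopped = []
    # Copy the collection first, because stopping a query may change the active set.
    for query in list(session.streams.active):
        query.stop()
        stopped.append(query.name)
    return stopped
```

In the notebook this would be called as `stop_all_queries(spark)` before falling back to `spark.stop()`.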

# Task 1

**Analysis of the CoinBase data stream (3 pts).** Write a query that prints the mean of a chosen parameter (e.g. `price`) over sliding time windows based on the transaction time (column `time`), grouped by the exchange pair (which currency is traded for which, column `product_id`).

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .load()
)

# Group by 60-second windows that start every 20 seconds, so consecutive windows overlap.
query = (
    stream
    .select("product_id", "price", "time")
    .groupBy(window("time", "60 seconds", "20 seconds").alias("time_window"), "product_id")
    .mean("price")
    .orderBy("product_id","time_window")
    .writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(15)
query.stop()

Example output:

```
2024-11-14 20:30:13 
2024-11-14 20:30:14 -------------------------------------------
2024-11-14 20:30:14 Batch: 1
2024-11-14 20:30:14 -------------------------------------------
2024-11-14 20:30:14 +------------------------------------------+----------+----------+
2024-11-14 20:30:14 |time_window                               |product_id|avg(price)|
2024-11-14 20:30:14 +------------------------------------------+----------+----------+
2024-11-14 20:30:14 |{2024-11-14 19:29:20, 2024-11-14 19:30:20}|BTC-EUR   |84905.77  |
2024-11-14 20:30:14 |{2024-11-14 19:29:40, 2024-11-14 19:30:40}|BTC-EUR   |84905.77  |
2024-11-14 20:30:14 |{2024-11-14 19:30:00, 2024-11-14 19:31:00}|BTC-EUR   |84905.77  |
2024-11-14 20:30:14 |{2024-11-14 19:29:20, 2024-11-14 19:30:20}|BTC-USD   |89576.8   |
2024-11-14 20:30:14 |{2024-11-14 19:29:40, 2024-11-14 19:30:40}|BTC-USD   |89576.8   |
2024-11-14 20:30:14 |{2024-11-14 19:30:00, 2024-11-14 19:31:00}|BTC-USD   |89576.8   |
2024-11-14 20:30:14 |{2024-11-14 19:28:00, 2024-11-14 19:29:00}|ETH-BTC   |0.03508   |
2024-11-14 20:30:14 |{2024-11-14 19:28:20, 2024-11-14 19:29:20}|ETH-BTC   |0.03508   |
2024-11-14 20:30:14 |{2024-11-14 19:28:40, 2024-11-14 19:29:40}|ETH-BTC   |0.03508   |
2024-11-14 20:30:14 |{2024-11-14 19:29:20, 2024-11-14 19:30:20}|ETH-USD   |3141.12   |
2024-11-14 20:30:14 |{2024-11-14 19:29:40, 2024-11-14 19:30:40}|ETH-USD   |3141.12   |
2024-11-14 20:30:14 |{2024-11-14 19:30:00, 2024-11-14 19:31:00}|ETH-USD   |3141.12   |
2024-11-14 20:30:14 +------------------------------------------+----------+----------+

```
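
Each price in the output above appears in three rows because, with a 60-second window that slides every 20 seconds, every event falls into 60 / 20 = 3 overlapping windows. The sketch below enumerates those windows in plain Python. `sliding_windows` is a hypothetical helper, it assumes window starts aligned to the Unix epoch (the default when `window` is called without a start offset), and the event time 19:30:05 is an illustrative value, not one read from the stream.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(event_time, duration_s, slide_s):
    """Return the [start, end) windows containing event_time, earliest first.

    Assumption: windows start every `slide_s` seconds, aligned to the Unix epoch.
    """
    ts = int(event_time.timestamp())
    start = ts - ts % slide_s  # latest window start that is <= the event time
    starts = []
    while start > ts - duration_s:  # the window must still cover the event
        starts.append(start)
        start -= slide_s
    return [
        (
            datetime.fromtimestamp(s, tz=timezone.utc),
            datetime.fromtimestamp(s, tz=timezone.utc) + timedelta(seconds=duration_s),
        )
        for s in sorted(starts)
    ]


# Illustrative event time (assumption), with the window sizes used in the query.
event = datetime(2024, 11, 14, 19, 30, 5, tzinfo=timezone.utc)
for start, end in sliding_windows(event, duration_s=60, slide_s=20):
    print(start.strftime("%H:%M:%S"), "->", end.strftime("%H:%M:%S"))
```

For this event time the three windows start at 19:29:20, 19:29:40 and 19:30:00, the same pattern as the BTC-EUR rows in the output.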

# Task 2

**Watermarking and late data (3 pts).** 
Modify the query from Task 1 to demonstrate the watermark mechanism and the handling of late data. The console should show the relevant rows of the result table being updated (update mode), in particular earlier time windows being updated after late data arrives. **Attach sample output to the solution of this task, together with a description that explains, on a concrete example, how the watermark and late data behave.** 

For this exercise you can use the script in the `/mock` directory, written in [Scala-cli](https://scala-cli.virtuslab.org), which serves as a controlled source of CoinBase data over a websocket. 

The script can be run with Docker:

```
make image
make run
```

This creates a websocket server at `ws://mock:8025`.

Once the server is running, execute the cell below, in which the query reads data from the newly created websocket. The script sends sample messages in CoinBase format every 10 seconds:

- The first series sends messages with timestamps 0s, 14s, 7s  
- The second series sends messages with timestamps 15s, 8s, 21s  
- The third series sends messages with timestamps 4s, 17s  

For this data, the time window can be set to a 10-second interval. The script can also be modified to send different data. 


In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window  # needed for the sliding window below


spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .option("url", "ws://mock:8025")
    .load()
)


query = (
    stream
    .select("product_id", "price", "time")
    .withWatermark("time", "10 seconds")
    .groupBy(window("time", "10 seconds", "2 seconds").alias("time_window"), "product_id").mean("price")
    .writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(60)
query.stop()

Received output:

```
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+----------+----------+
|time_window|product_id|avg(price)|
+-----------+----------+----------+
+-----------+----------+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----------+-----------------+
|time_window                               |product_id|avg(price)       |
+------------------------------------------+----------+-----------------+
|{2021-10-31 23:59:52, 2021-11-01 00:00:02}|ETH-USD   |992.7133912209973|
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |992.7133912209973|
|{2021-10-31 23:59:56, 2021-11-01 00:00:06}|ETH-USD   |992.7133912209973|
|{2021-10-31 23:59:54, 2021-11-01 00:00:04}|ETH-USD   |992.7133912209973|
|{2021-10-31 23:59:58, 2021-11-01 00:00:08}|ETH-USD   |992.7133912209973|
+------------------------------------------+----------+-----------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----------+------------------+
|time_window                               |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:06, 2021-11-01 00:00:16}|ETH-USD   |279.417360963366  |
|{2021-11-01 00:00:04, 2021-11-01 00:00:14}|ETH-USD   |312.68207468644704|
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |652.6977329537222 |
|{2021-11-01 00:00:08, 2021-11-01 00:00:18}|ETH-USD   |246.15264724028495|
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |246.15264724028495|
|{2021-11-01 00:00:12, 2021-11-01 00:00:22}|ETH-USD   |246.15264724028495|
|{2021-10-31 23:59:58, 2021-11-01 00:00:08}|ETH-USD   |652.6977329537222 |
|{2021-11-01 00:00:14, 2021-11-01 00:00:24}|ETH-USD   |246.15264724028495|
|{2021-11-01 00:00:02, 2021-11-01 00:00:12}|ETH-USD   |312.68207468644704|
+------------------------------------------+----------+------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+----------+----------+
|time_window|product_id|avg(price)|
+-----------+----------+----------+
+-----------+----------+----------+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----------+------------------+
|time_window                               |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:06, 2021-11-01 00:00:16}|ETH-USD   |304.5743498820808 |
|{2021-11-01 00:00:20, 2021-11-01 00:00:30}|ETH-USD   |465.68713493590053|
|{2021-11-01 00:00:04, 2021-11-01 00:00:14}|ETH-USD   |284.8564308578284 |
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |520.8087509788846 |
|{2021-11-01 00:00:18, 2021-11-01 00:00:28}|ETH-USD   |465.68713493590053|
|{2021-11-01 00:00:08, 2021-11-01 00:00:18}|ETH-USD   |301.8717749472921 |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |324.2922689063333 |
|{2021-11-01 00:00:12, 2021-11-01 00:00:22}|ETH-USD   |371.423890916189  |
|{2021-11-01 00:00:16, 2021-11-01 00:00:26}|ETH-USD   |465.68713493590053|
|{2021-11-01 00:00:14, 2021-11-01 00:00:24}|ETH-USD   |371.423890916189  |
|{2021-11-01 00:00:02, 2021-11-01 00:00:12}|ETH-USD   |284.8564308578284 |
+------------------------------------------+----------+------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+----------+----------+
|time_window|product_id|avg(price)|
+-----------+----------+----------+
+-----------+----------+----------+

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+----------+------------------+
|time_window                               |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:04, 2021-11-01 00:00:14}|ETH-USD   |499.83149854513914|
|{2021-11-01 00:00:08, 2021-11-01 00:00:18}|ETH-USD   |331.4024007219104 |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |356.1929386194773 |
|{2021-11-01 00:00:12, 2021-11-01 00:00:22}|ETH-USD   |383.56648769858305|
|{2021-11-01 00:00:16, 2021-11-01 00:00:26}|ETH-USD   |442.8407064908329 |
|{2021-11-01 00:00:14, 2021-11-01 00:00:24}|ETH-USD   |383.56648769858305|
|{2021-11-01 00:00:02, 2021-11-01 00:00:12}|ETH-USD   |499.83149854513914|
+------------------------------------------+----------+------------------+
```
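
The received output can be read as follows. The watermark trails the largest event time seen so far by 10 seconds and takes effect from the next micro-batch on. After Batch 4 the largest timestamp is 21s, so the watermark is 11s. In Batch 6 the late message with timestamp 4s arrives: the windows 00:00:02 to 00:00:12 and 00:00:04 to 00:00:14 end after the watermark and are still updated (their average changes from 284.86 to 499.83), while 00:00:00 to 00:00:10 and the earlier windows end at or before the watermark, so the message is dropped for them and the 520.81 from Batch 4 is never revised. By contrast, the late message 8s in Batch 4 arrived while the watermark was only 4s, so it updated all five of its windows.

The plain-Python sketch below replays this under a simplified model of the watermark, not Spark's implementation. `windows_for` and `simulate` are hypothetical helpers, the batches are grouped as they appear in the output (the first series was split across Batch 1 and Batch 2), and times are seconds after 2021-11-01 00:00:00.

```python
def windows_for(ts, duration, slide):
    """Starts of all epoch-aligned [start, start + duration) windows containing ts."""
    starts = []
    start = ts - ts % slide  # latest window start that is <= ts
    while start > ts - duration:
        starts.append(start)
        start -= slide
    return sorted(starts)


def simulate(batches, duration, slide, delay):
    """Return, per batch, (watermark in effect, starts of the windows updated)."""
    watermark = None  # nothing is late before the first data has been seen
    max_seen = None
    report = []
    for batch in batches:
        updated = set()
        for ts in batch:
            for start in windows_for(ts, duration, slide):
                # Model assumption: a window ending at or before the watermark
                # is final, so rows arriving for it are dropped.
                if watermark is None or start + duration > watermark:
                    updated.add(start)
        report.append((watermark, sorted(updated)))
        if batch:
            # The next batch sees a watermark derived from the max event time so far.
            max_seen = max(batch) if max_seen is None else max(max_seen, *batch)
            watermark = max_seen - delay
    return report


# Non-empty batches 1, 2, 4 and 6 of the received output.
batches = [[0], [14, 7], [15, 8, 21], [4, 17]]
for (wm, starts), batch in zip(simulate(batches, duration=10, slide=2, delay=10), batches):
    print(f"events={batch} watermark={wm} updated window starts={starts}")
```

For the last batch the model reports a watermark of 11 and updated windows starting at 2, 4, 8, 10, 12, 14 and 16 seconds, which are the seven rows printed in Batch 6.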

# Task 3

**Joining streams (3 pts).** Using a stream join, combine the data from the `ticker` channel (buy transactions, `side="buy"`) with the transaction data arriving every second on the `heartbeat` channel, matching on `trade_id` and the corresponding id in the heartbeat channel. Print the joined data stream.

As of the creation of this task (15.11.2023), the `heartbeat` channel returns incorrect date values (e.g. `1970-01-04 13:53:57.645339`). Joining with the `ticker` channel makes it possible to obtain the correct information. What a wonderful use of a join!


In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)



stream_ticker = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .load()
)

# type  |trade_id |sequence   |time                   |product_id|price   |side|last_size|best_bid|best_ask
buy = (
    stream_ticker
    .select("side", "trade_id", "time")
    .filter(stream_ticker.side == "buy")
)



stream_heartbeat = (
    spark
    .readStream
    .format("ws")
    .option("schema", "heartbeat")
    .load()
)

# |type     |sequence   |last_trade_id|product_id|time                      |
heartbeat = (
    stream_heartbeat
    .select("type", "sequence", "last_trade_id","product_id", "time")
    .withColumnRenamed("time", "invalid_time")
)



joined = (
    buy.join(
        heartbeat,
        expr(
            """
            trade_id = last_trade_id
            """
        )
    ).select("type", "sequence", "last_trade_id","product_id", "time")
)

query = (
    joined
    .writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(30)
query.stop()

Example outputs:

```
2024-11-14 21:22:07 -------------------------------------------
2024-11-14 21:22:07 Batch: 0
2024-11-14 21:22:07 -------------------------------------------
2024-11-14 21:22:07 +----+--------+-------------+----------+----+
2024-11-14 21:22:07 |type|sequence|last_trade_id|product_id|time|
2024-11-14 21:22:07 +----+--------+-------------+----------+----+
2024-11-14 21:22:07 +----+--------+-------------+----------+----+
2024-11-14 21:22:07 
2024-11-14 21:22:09 -------------------------------------------
2024-11-14 21:22:09 Batch: 1
2024-11-14 21:22:09 -------------------------------------------
2024-11-14 21:22:09 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:09 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:09 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:09 |heartbeat|32476135688|85912304     |BTC-EUR   |2024-11-14 20:22:02.6  |
2024-11-14 21:22:09 |heartbeat|32476136055|85912304     |BTC-EUR   |2024-11-14 20:22:02.6  |
2024-11-14 21:22:09 |heartbeat|69728358993|567222266    |ETH-USD   |2024-11-14 20:22:04.977|
2024-11-14 21:22:09 |heartbeat|69728359593|567222266    |ETH-USD   |2024-11-14 20:22:04.977|
2024-11-14 21:22:09 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:09 
2024-11-14 21:22:11 -------------------------------------------
2024-11-14 21:22:11 Batch: 2
2024-11-14 21:22:11 -------------------------------------------
2024-11-14 21:22:11 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:11 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:11 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:11 |heartbeat|32476136508|85912306     |BTC-EUR   |2024-11-14 20:22:07.172|
2024-11-14 21:22:11 |heartbeat|69728360069|567222273    |ETH-USD   |2024-11-14 20:22:07.084|
2024-11-14 21:22:11 |heartbeat|69728362041|567222285    |ETH-USD   |2024-11-14 20:22:08.026|
2024-11-14 21:22:11 |heartbeat|91518086991|717972972    |BTC-USD   |2024-11-14 20:22:06.906|
2024-11-14 21:22:11 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:11 
2024-11-14 21:22:13 -------------------------------------------
2024-11-14 21:22:13 Batch: 3
2024-11-14 21:22:13 -------------------------------------------
2024-11-14 21:22:13 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:13 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:13 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:13 |heartbeat|69728365922|567222313    |ETH-USD   |2024-11-14 20:22:11.082|
2024-11-14 21:22:13 |heartbeat|32476138632|85912309     |BTC-EUR   |2024-11-14 20:22:10.113|
2024-11-14 21:22:13 |heartbeat|69728365196|567222308    |ETH-USD   |2024-11-14 20:22:09.982|
2024-11-14 21:22:13 |heartbeat|69728364120|567222306    |ETH-USD   |2024-11-14 20:22:09.058|
2024-11-14 21:22:13 |heartbeat|91518100327|717973020    |BTC-USD   |2024-11-14 20:22:10.38 |
2024-11-14 21:22:13 |heartbeat|91518098195|717973017    |BTC-USD   |2024-11-14 20:22:09.735|
2024-11-14 21:22:13 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:13 
2024-11-14 21:22:16 -------------------------------------------
2024-11-14 21:22:16 Batch: 4
2024-11-14 21:22:16 -------------------------------------------
2024-11-14 21:22:16 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:16 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:16 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:16 |heartbeat|32476138912|85912310     |BTC-EUR   |2024-11-14 20:22:10.543|
2024-11-14 21:22:16 |heartbeat|91518102986|717973026    |BTC-USD   |2024-11-14 20:22:11.334|
2024-11-14 21:22:16 |heartbeat|91518104315|717973032    |BTC-USD   |2024-11-14 20:22:12.701|
2024-11-14 21:22:16 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:16 
2024-11-14 21:22:18 -------------------------------------------
2024-11-14 21:22:18 Batch: 5
2024-11-14 21:22:18 -------------------------------------------
2024-11-14 21:22:18 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:18 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:18 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:18 |heartbeat|91518105098|717973036    |BTC-USD   |2024-11-14 20:22:13.576|
2024-11-14 21:22:18 |heartbeat|69728368938|567222340    |ETH-USD   |2024-11-14 20:22:14.845|
2024-11-14 21:22:18 |heartbeat|91518106739|717973055    |BTC-USD   |2024-11-14 20:22:14.879|
2024-11-14 21:22:18 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:18 
2024-11-14 21:22:20 -------------------------------------------
2024-11-14 21:22:20 Batch: 6
2024-11-14 21:22:20 -------------------------------------------
2024-11-14 21:22:20 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:20 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:20 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:20 |heartbeat|69728369654|567222348    |ETH-USD   |2024-11-14 20:22:15.911|
2024-11-14 21:22:20 |heartbeat|69728371970|567222354    |ETH-USD   |2024-11-14 20:22:16.807|
2024-11-14 21:22:20 |heartbeat|91518108346|717973060    |BTC-USD   |2024-11-14 20:22:15.395|
2024-11-14 21:22:20 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:20 
2024-11-14 21:22:22 -------------------------------------------
2024-11-14 21:22:22 Batch: 7
2024-11-14 21:22:22 -------------------------------------------
2024-11-14 21:22:22 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:22 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:22 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:22 |heartbeat|69728373830|567222355    |ETH-USD   |2024-11-14 20:22:17.836|
2024-11-14 21:22:22 |heartbeat|69728374635|567222355    |ETH-USD   |2024-11-14 20:22:17.836|
2024-11-14 21:22:22 |heartbeat|32476141254|85912318     |BTC-EUR   |2024-11-14 20:22:18.872|
2024-11-14 21:22:22 |heartbeat|69728376552|567222361    |ETH-USD   |2024-11-14 20:22:19.982|
2024-11-14 21:22:22 |heartbeat|91518116721|717973119    |BTC-USD   |2024-11-14 20:22:19.59 |
2024-11-14 21:22:22 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:22 
2024-11-14 21:22:25 -------------------------------------------
2024-11-14 21:22:25 Batch: 8
2024-11-14 21:22:25 -------------------------------------------
2024-11-14 21:22:25 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:25 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:25 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:25 |heartbeat|91518123296|717973178    |BTC-USD   |2024-11-14 20:22:21.176|
2024-11-14 21:22:25 |heartbeat|32476142736|85912332     |BTC-EUR   |2024-11-14 20:22:20.396|
2024-11-14 21:22:25 |heartbeat|32476143088|85912332     |BTC-EUR   |2024-11-14 20:22:20.396|
2024-11-14 21:22:25 |heartbeat|69728380330|567222378    |ETH-USD   |2024-11-14 20:22:21.819|
2024-11-14 21:22:25 |heartbeat|32476142142|85912331     |BTC-EUR   |2024-11-14 20:22:20.269|
2024-11-14 21:22:25 |heartbeat|91518120388|717973171    |BTC-USD   |2024-11-14 20:22:20.553|
2024-11-14 21:22:25 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:25 
2024-11-14 21:22:27 -------------------------------------------
2024-11-14 21:22:27 Batch: 9
2024-11-14 21:22:27 -------------------------------------------
2024-11-14 21:22:27 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:27 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:27 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:27 |heartbeat|32476143608|85912332     |BTC-EUR   |2024-11-14 20:22:20.396|
2024-11-14 21:22:27 |heartbeat|69728381588|567222380    |ETH-USD   |2024-11-14 20:22:22.469|
2024-11-14 21:22:27 |heartbeat|91518127834|717973189    |BTC-USD   |2024-11-14 20:22:23.541|
2024-11-14 21:22:27 |heartbeat|91518126378|717973182    |BTC-USD   |2024-11-14 20:22:22.196|
2024-11-14 21:22:27 |heartbeat|69728382876|567222406    |ETH-USD   |2024-11-14 20:22:24.021|
2024-11-14 21:22:27 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:27 
2024-11-14 21:22:29 -------------------------------------------
2024-11-14 21:22:29 Batch: 10
2024-11-14 21:22:29 -------------------------------------------
2024-11-14 21:22:29 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:29 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:29 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:29 |heartbeat|91518141003|717973255    |BTC-USD   |2024-11-14 20:22:26.785|
2024-11-14 21:22:29 |heartbeat|91518133246|717973220    |BTC-USD   |2024-11-14 20:22:24.294|
2024-11-14 21:22:29 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:29 
2024-11-14 21:22:31 -------------------------------------------
2024-11-14 21:22:31 Batch: 11
2024-11-14 21:22:31 -------------------------------------------
2024-11-14 21:22:31 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:31 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:31 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:31 |heartbeat|91518146853|717973268    |BTC-USD   |2024-11-14 20:22:28.651|
2024-11-14 21:22:31 |heartbeat|91518144057|717973263    |BTC-USD   |2024-11-14 20:22:27.768|
2024-11-14 21:22:31 |heartbeat|69728390088|567222439    |ETH-USD   |2024-11-14 20:22:28.558|
2024-11-14 21:22:31 |heartbeat|32476146265|85912338     |BTC-EUR   |2024-11-14 20:22:27.073|
2024-11-14 21:22:31 |heartbeat|32476146788|85912338     |BTC-EUR   |2024-11-14 20:22:27.073|
2024-11-14 21:22:31 |heartbeat|32476146967|85912338     |BTC-EUR   |2024-11-14 20:22:27.073|
2024-11-14 21:22:31 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:31 
2024-11-14 21:22:34 -------------------------------------------
2024-11-14 21:22:34 Batch: 12
2024-11-14 21:22:34 -------------------------------------------
2024-11-14 21:22:34 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:34 |type     |sequence   |last_trade_id|product_id|time                   |
2024-11-14 21:22:34 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:34 |heartbeat|69728391112|567222445    |ETH-USD   |2024-11-14 20:22:29.567|
2024-11-14 21:22:34 |heartbeat|32476147383|85912338     |BTC-EUR   |2024-11-14 20:22:27.073|
2024-11-14 21:22:34 |heartbeat|32476148373|85912338     |BTC-EUR   |2024-11-14 20:22:27.073|
2024-11-14 21:22:34 |heartbeat|91518148401|717973277    |BTC-USD   |2024-11-14 20:22:29.892|
2024-11-14 21:22:34 +---------+-----------+-------------+----------+-----------------------+
2024-11-14 21:22:34 
```
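
In the output above some trades appear more than once (for example `last_trade_id` 85912304 in Batch 1), because every heartbeat that still points at the same last trade matches the same ticker row, and heartbeats whose last trade was not a buy produce no row at all. The plain-Python sketch below mimics the inner join on `trade_id = last_trade_id` to show both effects. `join_heartbeat_with_buys` is a hypothetical helper, it assumes `trade_id` values are unique, and the sell trade 85912305, the sequence 32476136300 and the `invalid_time` values are made-up illustrations; the remaining values come from the output.

```python
def join_heartbeat_with_buys(ticker_rows, heartbeat_rows):
    """Inner-join heartbeats with buy trades on trade_id = last_trade_id."""
    # Index the buy trades by id (assumes trade_id is unique across the rows).
    buys = {row["trade_id"]: row for row in ticker_rows if row["side"] == "buy"}
    joined = []
    for hb in heartbeat_rows:
        trade = buys.get(hb["last_trade_id"])
        if trade is None:
            continue  # no matching buy trade: the inner join drops the heartbeat
        joined.append({
            "type": hb["type"],
            "sequence": hb["sequence"],
            "last_trade_id": hb["last_trade_id"],
            "product_id": hb["product_id"],
            "time": trade["time"],  # valid timestamp taken from the ticker row
        })
    return joined


ticker = [
    {"side": "buy", "trade_id": 85912304, "time": "2024-11-14 20:22:02.6"},
    {"side": "sell", "trade_id": 85912305, "time": "2024-11-14 20:22:05.0"},  # made up
]
heartbeats = [
    {"type": "heartbeat", "sequence": 32476135688, "last_trade_id": 85912304,
     "product_id": "BTC-EUR", "invalid_time": "1970-01-04 13:53:57.645339"},
    {"type": "heartbeat", "sequence": 32476136055, "last_trade_id": 85912304,
     "product_id": "BTC-EUR", "invalid_time": "1970-01-04 13:53:58.645339"},
    {"type": "heartbeat", "sequence": 32476136300, "last_trade_id": 85912305,
     "product_id": "BTC-EUR", "invalid_time": "1970-01-04 13:53:59.645339"},  # made up
]
for row in join_heartbeat_with_buys(ticker, heartbeats):
    print(row)
```

Because the streaming join in the cell above matches on the trade id alone, adding `product_id` to the join condition may be worth considering if trade ids can repeat across products.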