# ADZD - lab 4

## Setup

In [1]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

# Streaming DataFrame backed by the custom "ws" source, using its "ticker" schema.
# NOTE(review): presumably a WebSocket connector for the CoinBase ticker channel --
# confirm against the connector's documentation.
stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .load()
)

# Echo a subset of the ticker columns to the console, one micro-batch at a time.
query = (
    stream
    .select("side", "product_id", "last_size", "best_bid", "best_ask", "time")
    .writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

# Let the query run for at most 10 seconds. The stop() call sits in `finally`
# so the stream is also shut down when the wait raises or the cell is
# interrupted; otherwise it would keep running in the background.
try:
    query.awaitTermination(10)
finally:
    query.stop()

stream.printSchema()

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



## Zadanie nr 1

Analiza strumienia danych CoinBase (3p). Napisz zapytanie, które wypisuje średnią wartość wybranego parametru (np. price) w przesuwnych oknach czasowych względem czasu transakcji (kolumna time), grupując po relacji wymiany (z jakiej waluty na jaką walutę - kolumna product_id).

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

# Streaming DataFrame backed by the custom "ws" source, using its "ticker" schema.
stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .load()
)


# Mean price per product_id over 2-minute event-time windows that slide every
# minute. Kept under its own name so that `query` always refers to the running
# StreamingQuery and never to a DataFrame.
windowed_avg = (
    stream
    .select("product_id", "price", "time")
    .groupBy(F.window("time", "2 minutes", "1 minutes"), "product_id")
    .mean("price")
    # Sorting a streaming aggregate is only supported in "complete" output mode,
    # which is the mode used by the sink below.
    .orderBy("window")
)

# "complete" mode re-prints the whole result table on every trigger.
query = (
    windowed_avg
    .writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)

# Let the query run for at most 10 seconds. The stop() call sits in `finally`
# so the stream is also shut down when the wait raises or the cell is
# interrupted; otherwise it would keep running in the background.
try:
    query.awaitTermination(10)
finally:
    query.stop()

<pre>-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+---------+--------+--------+----+
|side|product_id|last_size|best_bid|best_ask|time|
+----+----------+---------+--------+--------+----+
+----+----------+---------+--------+--------+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |0.00785501|1270.96 |1271.09 |2022-12-03 13:19:07.177|
|sell|ETH-USD   |0.37817826|1270.99 |1271.1  |2022-12-03 13:19:08.052|
|buy |ETH-BTC   |1.37293   |0.07498 |0.07501 |2022-12-03 13:19:04.558|
|sell|ETH-USD   |0.01817596|1270.99 |1271.1  |2022-12-03 13:19:08.052|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|buy |ETH-USD   |0.07524612|1271.01 |1271.1  |2022-12-03 13:19:08.962|
|buy |ETH-USD   |0.00535612|1271.05 |1271.2  |2022-12-03 13:19:09.361|
|buy |ETH-USD   |0.11225   |1271.05 |1271.2  |2022-12-03 13:19:09.361|
|buy |ETH-USD   |0.04705899|1271.06 |1271.18 |2022-12-03 13:19:09.619|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|buy |ETH-USD   |1.30869813|1271.08 |1271.19 |2022-12-03 13:19:10.363|
|buy |ETH-USD   |2.24703548|1271.08 |1271.2  |2022-12-03 13:19:10.363|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|buy |ETH-USD   |0.29315   |1271.12 |1271.25 |2022-12-03 13:19:10.458|
|buy |ETH-USD   |0.59706643|1271.12 |1271.3  |2022-12-03 13:19:10.458|
|buy |ETH-USD   |0.760406  |1271.12 |1271.31 |2022-12-03 13:19:10.458|
|buy |ETH-USD   |0.65      |1271.12 |1271.28 |2022-12-03 13:19:10.458|
|buy |ETH-USD   |8.62E-6   |1271.12 |1271.31 |2022-12-03 13:19:10.458|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |0.18342573|1271.16 |1271.24 |2022-12-03 13:19:11.421|
|sell|ETH-USD   |0.29199427|1271.16 |1271.24 |2022-12-03 13:19:11.421|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |5.73E-6   |1271.15 |1271.23 |2022-12-03 13:19:11.434|
|sell|ETH-USD   |0.26262   |1271.17 |1271.19 |2022-12-03 13:19:11.548|
|sell|ETH-USD   |0.24354427|1271.15 |1271.23 |2022-12-03 13:19:11.434|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 7
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |0.02784   |1271.16 |1271.22 |2022-12-03 13:19:11.725|
|sell|ETH-USD   |0.03341963|1271.06 |1271.18 |2022-12-03 13:19:11.908|
|sell|ETH-USD   |0.01173037|1271.06 |1271.18 |2022-12-03 13:19:11.908|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 8
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |4.8E-6    |1270.99 |1271.1  |2022-12-03 13:19:12.009|
|sell|ETH-USD   |0.24029625|1270.99 |1271.08 |2022-12-03 13:19:12.065|
|sell|ETH-USD   |0.01673   |1270.99 |1271.0  |2022-12-03 13:19:12.268|
|sell|ETH-USD   |0.01114769|1270.99 |1271.08 |2022-12-03 13:19:12.062|
|sell|ETH-USD   |0.03911911|1270.91 |1271.08 |2022-12-03 13:19:12.065|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 9
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |0.23748662|1270.88 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |0.17609276|1270.86 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |0.31813921|1270.8  |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |0.27891639|1270.78 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |3.37464912|1270.78 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |0.01956124|1270.88 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |1.0       |1270.86 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |0.5       |1270.78 |1270.96 |2022-12-03 13:19:13.531|
|sell|ETH-USD   |2.80830772|1270.78 |1270.96 |2022-12-03 13:19:13.531|
+----+----------+----------+--------+--------+-----------------------+

-------------------------------------------
Batch: 10
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|sell|ETH-USD   |0.59706643|1270.79 |1270.92 |2022-12-03 13:19:13.683|
|sell|ETH-USD   |1.16186409|1270.78 |1270.92 |2022-12-03 13:19:13.683|
|sell|ETH-USD   |0.25424943|1270.8  |1270.88 |2022-12-03 13:19:13.72 |
|sell|ETH-USD   |2.39617485|1270.69 |1270.88 |2022-12-03 13:19:13.72 |
|sell|ETH-USD   |1.79910842|1270.78 |1270.92 |2022-12-03 13:19:13.683|
|sell|ETH-USD   |2.06398383|1270.78 |1270.92 |2022-12-03 13:19:13.683|
|sell|ETH-USD   |0.15640433|1270.78 |1270.88 |2022-12-03 13:19:13.72 |
+----+----------+----------+--------+--------+-----------------------+
</pre>

## Zadanie nr 2

Watermarking i dane opóźnione (3p). Zmodyfikuj zapytanie z zadania 1 tak, aby zademonstrować mechanizm znaków wodnych (watermarks) i obsługi danych opóźnionych. W konsoli powinno być widać, że aktualizują się odpowiednie wiersze tabeli wynikowej (tryb update), w szczególności aktualizacja wcześniejszych okien czasowych po przybyciu danych opóźnionych. Do rozwiązania tego zadania proszę dołączyć przykładowy output i jego opis wyjaśniający na konkretnym przykładzie działanie znaku wodnego i danych opóźnionych.


* W pierwszej serii wysyłane są wiadomości o znacznikach czasowych 0s, 14s, 7s
* W drugiej serii wysyłane są wiadomości o znacznikach czasowych 15s, 8s, 21s
* W trzeciej serii wysyłane są wiadomości o znacznikach czasowych 4s, 17s

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

# Same "ws" ticker source as before, but pointed at the mock server so that
# the event timestamps are controlled.
stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .option("url", "ws://mock:8025")
    .load()
)


# Mean price per product_id over 10-second event-time windows sliding every
# 5 seconds, so each event contributes to two overlapping windows.
# The watermark trails the newest event time seen so far by 30 seconds; rows
# older than the watermark may be dropped and finished windows evicted.
# NOTE(review): the mock timestamps listed in the task description span only
# 0-21 s, so presumably no event ever falls behind a 30 s watermark and late
# rows are always accepted, never dropped -- confirm against the mock feed.
windowed_avg = (
    stream
    .select("product_id", "price", "time")
    .withWatermark("time", "30 seconds")
    .groupBy(F.window("time", "10 seconds", "5 seconds"), "product_id")
    .mean("price")
)

# "update" mode prints only the windows whose aggregate changed in the trigger,
# which makes updates of earlier windows by late rows visible.
query = (
    windowed_avg
    .writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

# Let the query run for at most 40 seconds. The stop() call sits in `finally`
# so the stream is also shut down when the wait raises or the cell is
# interrupted; otherwise it would keep running in the background.
try:
    query.awaitTermination(40)
finally:
    query.stop()

<pre>-------------------------------------------                                     
Batch: 0
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:05, 2021-11-01 00:00:15}|ETH-USD   |467.35938412487957|
|{2021-11-01 00:00:20, 2021-11-01 00:00:30}|ETH-USD   |10.449063507192879|
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |676.3155958423453 |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |97.41027825173232 |
|{2021-10-31 23:59:55, 2021-11-01 00:00:05}|ETH-USD   |773.2059697461615 |
|{2021-11-01 00:00:15, 2021-11-01 00:00:25}|ETH-USD   |29.466142708446498|
+------------------------------------------+----------+------------------+

-------------------------------------------                                     
Batch: 2
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+

-------------------------------------------                                     
Batch: 3
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |722.7747719341368 |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |196.95469604221879|
|{2021-10-31 23:59:55, 2021-11-01 00:00:05}|ETH-USD   |817.6791349778364 |
|{2021-11-01 00:00:15, 2021-11-01 00:00:25}|ETH-USD   |151.65860568002824|
+------------------------------------------+----------+------------------+
</pre>

Jako przykład działania weźmy pod uwagę Batch 1 i wkład pierwszej serii wiadomości (znaczniki czasowe 0s, 14s, 7s) w poszczególne okna:

2021-10-31 23:59:55, 2021-11-01 00:00:05 -> 0s, timestamp update

2021-11-01 00:00:00, 2021-11-01 00:00:10 -> 0s, 7s timestamps updates

2021-11-01 00:00:05, 2021-11-01 00:00:15 -> 7s, 14s timestamps updates

2021-11-01 00:00:10, 2021-11-01 00:00:20 -> 14s timestamp update

2021-11-01 00:00:15, 2021-11-01 00:00:25 -> no update

2021-11-01 00:00:20, 2021-11-01 00:00:30 -> no update

## Zadanie nr 3

Łączenie strumieni (3p). Rozdziel sztucznie dane CoinBase z kanału ticker na dwa strumienie (wykorzystując filtrowanie subskrypcji): jeden strumień dla side="sell", drugi dla side="buy". Następnie stwórz zapytanie, które łączy te strumienie i wypisuje transakcje dla danego product_id, które występowały po sobie w ciągu 1s.

In [5]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

# Streaming DataFrame backed by the custom "ws" source, using its "ticker" schema.
stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .load()
)


def _side_trades(ticker, side):
    """Return the trades of one side with every column prefixed by that side.

    Args:
        ticker: streaming DataFrame with at least the columns
            ``side``, ``product_id``, ``price`` and ``time``.
        side: value of the ``side`` column to keep (``"sell"`` or ``"buy"``).

    Returns:
        A DataFrame with the columns ``<side>_side``, ``<side>_product_id``,
        ``<side>_price`` and ``<side>_time``, in that order. The prefixes keep
        the column names unambiguous once both sides are joined.
    """
    columns = ("side", "product_id", "price", "time")
    return (
        ticker
        .filter(F.col("side") == side)
        .select(*[F.col(name).alias(f"{side}_{name}") for name in columns])
    )


sell = _side_trades(stream, "sell")
buy = _side_trades(stream, "buy")


# Inner join on the product, restricted to pairs where the buy happens at or
# after the sell and no more than 1 second later. A sell that follows a buy is
# not matched by this condition.
joined = (
    sell.join(
        buy,
        F.expr(
            """
            buy_product_id = sell_product_id AND
            buy_time >= sell_time AND
            buy_time <= sell_time + interval 1 seconds
            """
        )
    )
)

query = (
    joined
    .writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

# Let the query run for at most 60 seconds. The stop() call sits in `finally`
# so the stream is also shut down when the wait raises or the cell is
# interrupted; otherwise it would keep running in the background.
try:
    query.awaitTermination(60)
finally:
    query.stop()


<pre>-------------------------------------------                                     
Batch: 0
-------------------------------------------
+---------+---------------+----------+---------+--------+--------------+---------+--------+
|sell_side|sell_product_id|sell_price|sell_time|buy_side|buy_product_id|buy_price|buy_time|
+---------+---------------+----------+---------+--------+--------------+---------+--------+
+---------+---------------+----------+---------+--------+--------------+---------+--------+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+---------+---------------+----------+-----------------------+--------+--------------+---------+-----------------------+
|sell_side|sell_product_id|sell_price|sell_time              |buy_side|buy_product_id|buy_price|buy_time               |
+---------+---------------+----------+-----------------------+--------+--------------+---------+-----------------------+
|sell     |ETH-USD        |1271.03   |2022-12-03 14:12:56.522|buy     |ETH-USD       |1271.12  |2022-12-03 14:12:57.245|
|sell     |ETH-USD        |1270.97   |2022-12-03 14:12:57.297|buy     |ETH-USD       |1271.03  |2022-12-03 14:12:57.684|
|sell     |ETH-USD        |1270.96   |2022-12-03 14:12:57.297|buy     |ETH-USD       |1271.03  |2022-12-03 14:12:57.684|
|sell     |ETH-USD        |1270.98   |2022-12-03 14:12:57.297|buy     |ETH-USD       |1271.03  |2022-12-03 14:12:57.684|
|sell     |ETH-USD        |1270.97   |2022-12-03 14:12:57.297|buy     |ETH-USD       |1271.03  |2022-12-03 14:12:57.684|
|sell     |ETH-USD        |1271.07   |2022-12-03 14:13:04.361|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:04.591|
|sell     |ETH-USD        |1271.07   |2022-12-03 14:13:04.361|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:04.591|
|sell     |ETH-USD        |1271.03   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.01   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.0    |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.02   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.01   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1270.98   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.08  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.03   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.17  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.01   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.17  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.0    |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.17  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.02   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.17  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.01   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.17  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1270.98   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.17  |2022-12-03 14:13:06.289|
|sell     |ETH-USD        |1271.03   |2022-12-03 14:13:05.289|buy     |ETH-USD       |1271.2   |2022-12-03 14:13:06.289|
+---------+---------------+----------+-----------------------+--------+--------------+---------+-----------------------+
only showing top 20 rows
</pre>