In [9]:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

In [10]:
import time
import json, os, shutil

In [11]:
import ast

In [12]:
project_number = 1072423212419
project_id = "crypto-busting-375023"
location = "europe-central2"
subscription_id = "bda-coinbase-topic-sub"
topic_id = "bda-coinbase-topic"
timeout = 5.0

message_dir = "/home/bda_crypto_busters/messages"
spark_dir = f"/Local Disk/{message_dir}"

In [13]:
messages = []

In [14]:
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()
    messages.append(message.data)

In [15]:
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

In [16]:
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Listening for messages on projects/crypto-busting-375023/subscriptions/bda-coinbase-topic-sub..

Received Message {
  data: b'{"side":"","price":"0.08734","product_id":"DOGE-US...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"side":"","price":"23201.38","product_id":"BTC-US...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"side":"","price":"0.3761","product_id":"ADA-USD"...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"side":"","price":"25.54","product_id":"SOL-USD",...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"side":"","price":"1657.31","product_id":"ETH-USD...'
  ordering_key: ''
  attributes: {}
}.


In [17]:
decoded_messages = [ast.literal_eval(message.decode("UTF-8")) for message in messages]
all_messages = os.listdir(message_dir)

with open(os.path.join(message_dir, f"{len(all_messages)}.json"), "w") as outfile:
    json.dump(decoded_messages, outfile)
    
    

In [18]:
len(os.listdir(message_dir))

1

In [19]:
print(decoded_messages[0])

{'side': '', 'price': '0.08734', 'product_id': 'DOGE-USD', 'time': '2023-01-21T16:56:32.936989Z'}


In [20]:
from pyspark.sql.types import *

In [21]:
spark = SparkSession.builder.appName("Read Pub/Sub Stream to JSON").master("yarn").getOrCreate()

In [31]:
JSONschema = StructType([ 
    StructField("side", StringType(), True), 
    StructField("price", FloatType(), False), 
    StructField("product_id", StringType(), False),
    StructField("time", StringType(), False), 
])

In [32]:
sdf = (
    spark.readStream
    .schema(JSONschema)
    .format("json")
    .load(os.path.join(spark_dir, "*.json"))
)

In [33]:
print(sdf.isStreaming)

True


In [34]:
sdf.printSchema()

root
 |-- side: string (nullable = true)
 |-- price: float (nullable = true)
 |-- product_id: string (nullable = true)
 |-- time: string (nullable = true)



In [35]:
query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

23/01/21 17:04:27 WARN org.apache.spark.sql.streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b237ec97-573a-4ef4-9e39-e3f51532df7a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/01/21 17:04:27 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [26]:
df = sdf.groupBy("product_id").count()

In [27]:
(
    df.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("products_count")
    .start()
)

23/01/21 17:03:23 WARN org.apache.spark.sql.streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0cfcc57b-c04d-407c-a676-15435d93b87a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/01/21 17:03:23 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f664d022820>

In [28]:
spark.sql("select * from products_count").show()

+----------+-----+
|product_id|count|
+----------+-----+
+----------+-----+

