<a href="https://colab.research.google.com/github/lucprosa/dataeng-basic-course/blob/main/spark_streaming/examples/coinbase_producer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<a href="https://colab.research.google.com/github/lucprosa/dataeng-basic-course/blob/main/spark_streaming/dataproc/producer_collab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



```
# This is formatted as code
```

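## Coinbase producer

- To be executed in Google Colab
- Authenticate to Google Cloud (gcloud)
- Fetch ticker data from Coinbase via WebSocket
- Copy the data to a Google Cloud Storage (GCS) bucket
- Read the landed files back with Spark (batch and Structured Streaming)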

In [1]:
from google.colab import auth
auth.authenticate_user()

project_id = 'data-eng-dev-437916'
!gcloud config set project {project_id}

Updated property [core/project].


In [None]:
!pip install websocket-client

In [None]:
!rm -r /content/btc_stream
!mkdir -p /content/btc_stream/landing

In [None]:
import json
import os
import subprocess
import time
from datetime import datetime, timezone

import websocket

OUTPUT_DIR = "/content/btc_stream/landing"
GCS_LANDING_DIR = "gs://edit-data-eng-dev/datalake/landing/btc"
PRODUCT_IDS = ["BTC-USD", "ETH-USD", "DOGE-USD"]
# Keep at most one ticker message per product every UPLOAD_INTERVAL_S seconds.
UPLOAD_INTERVAL_S = 5

os.makedirs(OUTPUT_DIR, exist_ok=True)

# product_id -> time.monotonic() value of the last ticker saved for that product.
last_saved_at = {}


def save_and_upload(data):
    """Write one ticker message to OUTPUT_DIR as JSON and copy that file to GCS_LANDING_DIR."""
    # Timezone-aware UTC (datetime.utcnow() is deprecated); same filename format as before.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
    filename = f"{OUTPUT_DIR}/btc_{timestamp}.json"
    with open(filename, "w") as f:
        json.dump(data, f)

    landing_path = f"{GCS_LANDING_DIR}/btc_{timestamp}.json"
    # Argument list, no shell: paths are passed verbatim. Report failures instead of ignoring them.
    result = subprocess.run(
        ["gsutil", "cp", filename, landing_path],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(f"gsutil cp failed for {filename}: {result.stderr.strip()}")


def on_message(ws, message):
    """Handle one WebSocket message; persist throttled ``ticker`` messages.

    Throttling skips messages rather than sleeping. A sleep inside this callback blocks the
    WebSocket read loop, so messages queue up and the saved prices drift further and further
    behind the live feed.
    """
    data = json.loads(message)
    msg_type = data.get("type")
    if msg_type == "error":
        # NOTE(review): presumably sent for e.g. a rejected subscription — surface it instead of dropping it.
        print(f"Coinbase error message: {data}")
        return
    if msg_type != "ticker":
        return  # e.g. the subscription acknowledgement

    product_id = data.get("product_id")
    now = time.monotonic()
    previous = last_saved_at.get(product_id)
    if previous is not None and now - previous < UPLOAD_INTERVAL_S:
        return
    last_saved_at[product_id] = now
    save_and_upload(data)


def on_open(ws):
    """Subscribe to the ticker channel for PRODUCT_IDS once the connection is established."""
    subscribe_message = {
        "type": "subscribe",
        "channels": [{"name": "ticker", "product_ids": PRODUCT_IDS}],
    }
    ws.send(json.dumps(subscribe_message))


def on_error(ws, error):
    """Report connection-level errors instead of letting them pass unnoticed."""
    print(f"WebSocket error: {error}")


def on_close(ws, close_status_code, close_msg):
    """Report why the feed stopped."""
    print(f"WebSocket closed: {close_status_code} {close_msg}")


ws = websocket.WebSocketApp(
    "wss://ws-feed.exchange.coinbase.com",
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)

# Blocks until the connection closes or the cell is interrupted.
ws.run_forever()

In [None]:
from pyspark.sql import SparkSession

# Inspect the files the producer wrote locally.
# If this cell is the first to start Spark, it launches the JVM. Jars cannot be added to a
# running JVM, and a later getOrCreate() silently reuses this session. So the GCS connector is
# configured here too, keeping the gs:// reads further down working.
GCS_JAR = "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.9/gcs-connector-hadoop3-2.2.9-shaded.jar"

spark = (
    SparkSession.builder
    .appName("GCSStreamingDemo")
    .config("spark.jars", GCS_JAR)
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .getOrCreate()
)
spark.read.json("/content/btc_stream/landing/*").show(1000, False)

In [2]:
!apt-get install openjdk-11-jdk -y
!pip install pyspark gcsfs

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.27+6~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [4]:
from pyspark.sql import SparkSession

# Shaded Google Cloud Storage connector jar; the settings below point the gs:// scheme at its classes.
GCS_JAR = "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.9/gcs-connector-hadoop3-2.2.9-shaded.jar"

# Register the connector for both Hadoop filesystem APIs (FileSystem and AbstractFileSystem).
gcs_settings = {
    "spark.jars": GCS_JAR,
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
}

session_builder = SparkSession.builder.appName("GCSStreamingDemo")
for conf_key, conf_value in gcs_settings.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

In [5]:
# Batch read of every JSON file currently in the GCS landing folder (schema inferred from the files).
# Named distinctly because `df` is later rebound to a streaming frame and to the memory-sink table.
landing_batch_df = spark.read.json("gs://edit-data-eng-dev/datalake/landing/btc")
landing_batch_df.show()

+---------+-------------+---------+-------------+--------+----------+--------+---------+---------+----------+------------+----+--------------------+---------+------+--------------+----------------+
| best_ask|best_ask_size| best_bid|best_bid_size|high_24h| last_size| low_24h| open_24h|    price|product_id|    sequence|side|                time| trade_id|  type|    volume_24h|      volume_30d|
+---------+-------------+---------+-------------+--------+----------+--------+---------+---------+----------+------------+----+--------------------+---------+------+--------------+----------------+
|107511.51|   0.03519702|107511.50|   0.04657998|  110040|0.00017045|107378.1|109834.12|107511.51|   BTC-USD|108077026285| buy|2025-07-04T19:07:...|845094015|ticker| 3243.85984948| 160661.52509002|
|  2487.23|   0.10279113|  2487.03|   0.20104301|  2604.5| 0.0178256|    2475|   2587.2|  2487.23|   ETH-USD| 83103319763| buy|2025-07-04T19:07:...|662988458|ticker|77056.33623492|3811262.58320407|
+---------

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

# getOrCreate() returns the already-running session (with the GCS connector), if one exists.
spark = SparkSession.builder.appName("BTCPriceStream").getOrCreate()

QUERY_NAME = "btc_price_stream4"
schema = "type STRING, sequence LONG, product_id STRING, price STRING, time STRING"

# Query names must be unique among active queries: stop a previous run so this cell can be re-executed.
for active_query in spark.streams.active:
    if active_query.name == QUERY_NAME:
        active_query.stop()

df = spark.readStream.schema(schema).json("gs://edit-data-eng-dev/datalake/landing/btc/")

# Parse price and time once at ingest so downstream aggregations/windows work on typed columns.
stream = (
    df.select(
        to_timestamp(col("time")).alias("time"),
        "product_id",
        col("price").cast("double"),
    )
    .writeStream
    .outputMode("append")
    .queryName(QUERY_NAME)
    .format("memory")
    .start()
)

In [10]:
# Query the in-memory table that the streaming query above writes to (memory sink named btc_price_stream4).
df = spark.table("btc_price_stream4")
df.show()

+--------------------+----------+---------+
|                time|product_id|    price|
+--------------------+----------+---------+
|2025-07-04T19:07:...|   BTC-USD|107511.51|
|2025-07-04T19:07:...|   ETH-USD|  2487.23|
+--------------------+----------+---------+



In [None]:

# Analysis — planned questions on the streamed prices
# TODO:
# - Latest Bitcoin price
# - Average BTC price per minute
# - Standard deviation of price over time (volatility)
# - Number of price ticks (ticker messages) per minute
# - Anomalies: null prices or implausible values

In [None]:
# Explicit imports instead of `import *` (which shadows builtins such as max/sum/round).
# window and stddev are also used by the per-product cell below.
from pyspark.sql.functions import col, stddev, to_timestamp, window

# BTC volatility: standard deviation of the BTC-USD price per 30-second window.
# The feed mixes BTC, ETH and DOGE, whose prices differ by orders of magnitude, so an unfiltered
# stddev would measure the gap between assets rather than BTC volatility.
# to_timestamp makes sure window() always receives a timestamp, whatever the type of "time".
(
    df.filter(col("product_id") == "BTC-USD")
    .groupBy(window(to_timestamp("time"), "30 seconds"))
    .agg(stddev("price").alias("volatility"))
    .orderBy("window")
    .show(truncate=False)
)

In [None]:
from pyspark.sql.functions import stddev, to_timestamp, window

# Volatility (standard deviation of price) per 30-second window and product.
# Sorted so consecutive windows of one product sit together; truncate=False keeps the window
# start/end readable instead of cutting the struct at 20 characters.
(
    df.groupBy(window(to_timestamp("time"), "30 seconds"), "product_id")
    .agg(stddev("price").alias("volatility"))
    .orderBy("product_id", "window")
    .show(truncate=False)
)