# Streaming Data Acquisition - From Raw Stream to Clean CSV

---

## Phase 1: Setup and Architecture

### Install the necessary libraries:
1. **websocket-client**: To establish the persistent connection.
2. **pandas**: To process, clean, and export the collected data.

In [1]:
!pip install websocket-client pandas
print("Dependencies installed successfully.")


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Dependencies installed successfully.


### Import all required Python modules

In [3]:
import websocket
import json
import time
import threading
import pandas as pd
from datetime import datetime

print("Modules imported.")

Modules imported.


### Define Global Configuration and Storage
This cell sets the target URL and initializes the empty list that will collect all the raw messages pushed from the server.

In [4]:
# --- Configuration ---
# Target URL for the Binance BTC/USDT trade stream (fstream is the USD-M Futures endpoint)
STREAM_URL = "wss://fstream.binance.com/ws/btcusdt@trade" 

# --- Global Storage ---
# This list will collect every single message (event) pushed by the server.
raw_data_collector = []

print(f"Target Stream URL set: {STREAM_URL}")
print("Raw data collector initialized.")

Target Stream URL set: wss://fstream.binance.com/ws/btcusdt@trade
Raw data collector initialized.
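
The stream URL above ends in a `<symbol>@trade` stream name. As a minimal sketch, a small helper can build the same URL for another symbol. The name `build_trade_stream_url` is hypothetical and not part of any library, and the helper assumes that stream names use the lowercase symbol, as in the URL above.

```python
BASE_URL = "wss://fstream.binance.com/ws"  # same endpoint as STREAM_URL above

def build_trade_stream_url(symbol: str, base_url: str = BASE_URL) -> str:
    """Hypothetical helper: build a trade-stream URL for one symbol."""
    # The stream name in the URL above is lowercase, so normalise the input
    return f"{base_url}/{symbol.lower()}@trade"

url = build_trade_stream_url("BTCUSDT")
print(url)
```

Keeping the base URL and the symbol separate makes it easier to point the rest of the notebook at a different pair without editing the string by hand.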


### Define the `on_open` Callback
This function runs only once when the persistent connection is successfully established.

In [5]:
def on_open(ws):
    """Callback: Executed when the connection handshake is successful."""
    print("--- Connection Established. Starting Real-Time Data Stream ---")
    
print("on_open callback defined.")

on_open callback defined.


### Define the `on_error` and `on_close` Callbacks
These functions handle connection failures and the graceful shutdown of the stream.

In [6]:
def on_error(ws, error):
    """Callback: Executed if an error occurs during the connection."""
    print(f"--- Connection Error: {error} ---")

def on_close(ws, close_status_code, close_msg):
    """Callback: Executed whenever the connection closes, intentionally or not."""
    print("--- Connection Closed ---")
    
print("on_error and on_close callbacks defined.")

on_error and on_close callbacks defined.


### Define the `on_message` Callback (Raw Capture Logic)
This is the most critical function. It runs for every incoming message. For the first run, we will not filter anything.

In [7]:
def on_message(ws, message):
    """Callback: Executed for EVERY message pushed by the server."""
    global raw_data_collector
    
    try:
        data = json.loads(message)
        
        # 1. Store the RAW, unfiltered message dictionary
        raw_data_collector.append(data)
        
        # 2. Extract and format key fields for immediate screen output
        trade_time = datetime.fromtimestamp(data['E'] / 1000).strftime('%H:%M:%S')
        price = float(data.get('p', 0))
        volume = float(data.get('q', 0))
        
        # Print everything that comes in, including potential 0-value keep-alives
        print(f"[{trade_time}] RAW Trade: Price=${price:,.2f} | Volume={volume}")

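    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
        # Report messages that are malformed or missing expected fields
        # instead of silently discarding every possible error
        print(f"Skipping unexpected message: {e}")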
    
print("on_message callback defined for RAW data capture.")

on_message callback defined for RAW data capture.
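
To see what the callback does without opening a connection, the sketch below runs the same extraction steps on one hand-written message. The sample string is an assumption modelled on the first record captured later in this notebook, with `p` and `q` as JSON strings, and `parse_trade` is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone

# Assumed sample message, modelled on the first record captured in this notebook
sample_message = (
    '{"e": "trade", "E": 1759190782372, "T": 1759190782371, "s": "BTCUSDT", '
    '"t": 6666783623, "p": "114244.00", "q": "0.047", "X": "MARKET", "m": false}'
)

def parse_trade(message: str) -> dict:
    """Hypothetical helper: decode one message and pull out the display fields."""
    data = json.loads(message)
    # 'E' is in milliseconds; UTC is used so the result does not depend on the local time zone
    event_time = datetime.fromtimestamp(data["E"] / 1000, tz=timezone.utc)
    return {
        "time": event_time.strftime("%H:%M:%S"),
        "price": float(data.get("p", 0)),
        "volume": float(data.get("q", 0)),
    }

trade = parse_trade(sample_message)
print(f"[{trade['time']}] Price=${trade['price']:,.2f} | Volume={trade['volume']}")
```

Unlike the callback above, this sketch formats the time in UTC rather than local time, so the printed value is the same on any machine.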


---

## Phase 2: RAW Stream Capture and Observation

### Create the WebSocket Client Object

In [8]:
# Clear the collector list before starting a new run
raw_data_collector = []

# Create the WebSocketApp instance, linking the URL to the callback functions
ws = websocket.WebSocketApp(
    STREAM_URL,
    on_message=on_message,
    on_open=on_open,
    on_error=on_error,
    on_close=on_close
)

print("WebSocket client object created, ready to connect.")

WebSocket client object created, ready to connect.


### Start, Run, and Close the Stream Listener
This cell starts the connection in a separate thread and uses `time.sleep()` on the main thread to control how long the stream is captured.

In [None]:
print("Starting RAW stream listener for 15 seconds. OBSERVE THE OUTPUT CAREFULLY.")

# Run the socket connection in a separate thread
wst = threading.Thread(target=ws.run_forever)
wst.daemon = True
wst.start()

# Hold the main thread execution for 15 seconds
time.sleep(15) 

print("Time's up. Attempting to close connection...")

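# Close the connection gracefully, then wait briefly for the listener thread to exit
# so that the message count below is final.
ws.close()
wst.join(timeout=5)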

print(f"\nFinished RAW capture. Collected {len(raw_data_collector)} total raw messages.")

Starting RAW stream listener for 15 seconds. OBSERVE THE OUTPUT CAREFULLY.
--- Connection Established. Starting Real-Time Data Stream ---
[00:06:22] RAW Trade: Price=$114,244.00 | Volume=0.047
[00:06:22] RAW Trade: Price=$114,244.00 | Volume=0.002
[00:06:22] RAW Trade: Price=$114,244.00 | Volume=0.008
[00:06:22] RAW Trade: Price=$114,244.00 | Volume=0.002
[00:06:22] RAW Trade: Price=$114,244.00 | Volume=0.807
[00:06:22] RAW Trade: Price=$114,244.00 | Volume=0.002
[00:06:23] RAW Trade: Price=$114,243.90 | Volume=0.364
[00:06:23] RAW Trade: Price=$114,244.00 | Volume=0.002
[00:06:24] RAW Trade: Price=$114,244.00 | Volume=0.002
[00:06:24] RAW Trade: Price=$114,244.00 | Volume=0.019
[00:06:24] RAW Trade: Price=$114,243.90 | Volume=0.009
[00:06:24] RAW Trade: Price=$114,244.00 | Volume=0.002
[00:06:24] RAW Trade: Price=$114,243.90 | Volume=0.002
[00:06:25] RAW Trade: Price=$114,243.90 | Volume=0.001
[00:06:25] RAW Trade: Price=$114,244.00 | Volume=0.044
[00:06:25] RAW Trade: Price=$114,244.

--- Connection Closed ---
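
The start, wait, close pattern used in this cell can be tried offline. In this sketch a stand-in loop replaces `ws.run_forever`, and a `threading.Event` plays the role of `ws.close()`. The names `fake_listener`, `stop_event`, and `collected` are hypothetical, and the short durations are chosen only to keep the example fast.

```python
import threading
import time

collected = []                    # stands in for raw_data_collector
stop_event = threading.Event()    # stands in for the effect of ws.close()

def fake_listener():
    """Stand-in for ws.run_forever: append a fake message until told to stop."""
    n = 0
    while not stop_event.is_set():
        collected.append({"seq": n})
        n += 1
        time.sleep(0.01)

worker = threading.Thread(target=fake_listener, daemon=True)
worker.start()

time.sleep(0.2)          # main thread waits while the worker collects
stop_event.set()         # ask the worker to stop
worker.join(timeout=1)   # wait for it to finish before reading the results

print(f"Collected {len(collected)} messages; worker alive: {worker.is_alive()}")
```

Joining the worker before reading `collected` ensures no message is appended after the count is printed.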


### Commentary - Why the $0 Trades?

--- Observation: The Data Quality Problem ---

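You may have observed messages showing Price=$0.00 or Volume=0.

This tutorial treats these as **keep-alive messages**: records that carry no trade information and are NOT valid trades. Strictly speaking, protocol-level WebSocket ping/pong frames are control frames that `websocket-client` handles itself and never passes to `on_message`, so any zero-value records seen here are JSON events sent in the data stream.

If we analyzed this raw data, the 0-values would pull down our average price and volume calculations.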

Our next step is to apply a **Filter** to discard them.
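
A small worked example shows the size of the effect. The prices below are illustrative values in the range seen above, with two zero-price records mixed in; they are not the captured data.

```python
from statistics import mean

# Illustrative prices only: four plausible trades plus two zero-value records
prices = [114244.0, 114243.9, 114244.0, 114243.9, 0.0, 0.0]

raw_mean = mean(prices)
clean_mean = mean([p for p in prices if p > 0])

print(f"Mean with zero-value records:    {raw_mean:,.2f}")
print(f"Mean without zero-value records: {clean_mean:,.2f}")
```

With two of the six records at zero, the unfiltered mean comes out a third lower than the mean of the valid trades.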

---

## Phase 3: Data Transformation and Export

### Load Raw Data into Pandas
This is the start of the Transformation (T) phase from ETL.

In [10]:
# Convert the list of dictionaries collected in Phase 2 into a Pandas DataFrame
df_raw = pd.DataFrame(raw_data_collector)

print(f"DataFrame loaded with {len(df_raw)} raw messages.")
print("Raw DataFrame columns:")
print(df_raw.columns.tolist())
display(df_raw.head())

DataFrame loaded with 56 raw messages.
Raw DataFrame columns:
['e', 'E', 'T', 's', 't', 'p', 'q', 'X', 'm']


| | e | E | T | s | t | p | q | X | m |
|---|---|---|---|---|---|---|---|---|---|
| 0 | trade | 1759190782372 | 1759190782371 | BTCUSDT | 6666783623 | 114244.0 | 0.047 | MARKET | False |
| 1 | trade | 1759190782436 | 1759190782436 | BTCUSDT | 6666783624 | 114244.0 | 0.002 | MARKET | False |
| 2 | trade | 1759190782466 | 1759190782466 | BTCUSDT | 6666783625 | 114244.0 | 0.008 | MARKET | False |
| 3 | trade | 1759190782763 | 1759190782763 | BTCUSDT | 6666783626 | 114244.0 | 0.002 | MARKET | False |
| 4 | trade | 1759190782807 | 1759190782807 | BTCUSDT | 6666783627 | 114244.0 | 0.807 | MARKET | False |


### Data Preparation (Type Conversion)
The stream sends price and volume as JSON strings, so pandas stores those columns with the `object` dtype. We must convert them to numeric types before filtering.

In [11]:
# Convert price ('p') and volume ('q') columns to numeric types.
df_raw['p'] = pd.to_numeric(df_raw['p'], errors='coerce')
df_raw['q'] = pd.to_numeric(df_raw['q'], errors='coerce')

print("Price ('p') and Volume ('q') columns converted to numeric (float).")

Price ('p') and Volume ('q') columns converted to numeric (float).
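
As a rough plain-Python analogue of `errors='coerce'` (a sketch of the behaviour, not of how pandas implements it), values that cannot be parsed become NaN instead of raising an error. The helper name `to_float_or_nan` and the sample values are hypothetical.

```python
import math

def to_float_or_nan(value) -> float:
    """Hypothetical helper: parse a number, returning NaN if parsing fails."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return math.nan

raw_values = ["114244.00", "0.047", "not-a-number", None]
converted = [to_float_or_nan(v) for v in raw_values]
print(converted)

# NaN compares False against any number, so a later "> 0" filter also drops it
positive = [v for v in converted if v > 0]
print(positive)
```

This is why coercing first and filtering on `> 0` afterwards removes unparseable values as well as zeros.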


### Apply the Cleaning Filter
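This cell applies the filter to eliminate the zero-value messages identified above. Rows whose price or volume became NaN during conversion are dropped too, since `NaN > 0` evaluates to False.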

In [12]:
print(f"Messages before filtering: {len(df_raw)}")

# --- APPLY THE DATA QUALITY FILTER ---
# A valid trade must have a positive price AND a positive volume.
df_clean = df_raw[(df_raw['p'] > 0) & (df_raw['q'] > 0)].copy()

print(f"Messages after filtering: {len(df_clean)}")
print(f"Successfully filtered out {len(df_raw) - len(df_clean)} keep-alive messages.")

display(df_clean.head())

Messages before filtering: 56
Messages after filtering: 54
Successfully filtered out 2 keep-alive messages.


| | e | E | T | s | t | p | q | X | m |
|---|---|---|---|---|---|---|---|---|---|
| 0 | trade | 1759190782372 | 1759190782371 | BTCUSDT | 6666783623 | 114244.0 | 0.047 | MARKET | False |
| 1 | trade | 1759190782436 | 1759190782436 | BTCUSDT | 6666783624 | 114244.0 | 0.002 | MARKET | False |
| 2 | trade | 1759190782466 | 1759190782466 | BTCUSDT | 6666783625 | 114244.0 | 0.008 | MARKET | False |
| 3 | trade | 1759190782763 | 1759190782763 | BTCUSDT | 6666783626 | 114244.0 | 0.002 | MARKET | False |
| 4 | trade | 1759190782807 | 1759190782807 | BTCUSDT | 6666783627 | 114244.0 | 0.807 | MARKET | False |


### Add Human-Readable Columns
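The raw data uses short keys like 'E' (Event Time) and 's' (Symbol). We create descriptive columns for the final CSV. Note that `Trade_Time` is derived from the event time 'E'; the trade time itself is in 'T', which differs from 'E' by one millisecond in the first record above.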

In [13]:
# Convert 'E' (milliseconds since epoch) to a readable timestamp (timezone-naive, in UTC)
df_clean['Trade_Time'] = pd.to_datetime(df_clean['E'], unit='ms')

# Create descriptive columns
df_clean['Price_USD'] = df_clean['p']
df_clean['Volume_BTC'] = df_clean['q']

print("Human-readable columns (Trade_Time, Price_USD, Volume_BTC) created.")

Human-readable columns (Trade_Time, Price_USD, Volume_BTC) created.
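
The same conversion can be checked with the standard library. This sketch converts the `E` and `T` values of the first record shown earlier to UTC and prints the gap between event time and trade time for that record; `ms_to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

def ms_to_utc(ms: int) -> datetime:
    """Hypothetical helper: convert epoch milliseconds to an aware UTC datetime."""
    # Integer arithmetic avoids floating-point rounding of the millisecond part
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)

event_time = ms_to_utc(1759190782372)  # 'E' of the first record
trade_time = ms_to_utc(1759190782371)  # 'T' of the first record

print(event_time.isoformat(timespec="milliseconds"))
print(event_time - trade_time)
```

The result is timezone-aware, whereas `pd.to_datetime(..., unit='ms')` returns naive timestamps that represent the same UTC instant.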


### Select Final Columns
Select only the essential, clean, and formatted columns for the final file.

In [14]:
# Select and rename final columns for a clean output structure
df_final = df_clean[['Trade_Time', 'Price_USD', 'Volume_BTC', 's']].rename(
    columns={'s': 'Symbol'}
)

print("Final DataFrame structure prepared.")
display(df_final.head())

Final DataFrame structure prepared.


| | Trade_Time | Price_USD | Volume_BTC | Symbol |
|---|---|---|---|---|
| 0 | 2025-09-30 00:06:22.372 | 114244.0 | 0.047 | BTCUSDT |
| 1 | 2025-09-30 00:06:22.436 | 114244.0 | 0.002 | BTCUSDT |
| 2 | 2025-09-30 00:06:22.466 | 114244.0 | 0.008 | BTCUSDT |
| 3 | 2025-09-30 00:06:22.763 | 114244.0 | 0.002 | BTCUSDT |
| 4 | 2025-09-30 00:06:22.807 | 114244.0 | 0.807 | BTCUSDT |


### Export to CSV
This is the Load (L) phase of ETL.

In [15]:
# Define the output filename
output_filename = 'btc_usdt_stream_clean.csv'

# Export the clean DataFrame to a CSV file without the pandas index
df_final.to_csv(output_filename, index=False)

print("--- Load Phase Complete ---")
print(f"Clean, filtered stream data successfully exported to: {output_filename}")

--- Load Phase Complete ---
Clean, filtered stream data successfully exported to: btc_usdt_stream_clean.csv
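
One way to sanity-check an export is to read it back. The sketch below does this with the standard library's `csv` module on an in-memory buffer, using two rows copied from the final table above, so it does not touch the real output file.

```python
import csv
import io

# Two rows copied from the final table above
rows = [
    {"Trade_Time": "2025-09-30 00:06:22.372", "Price_USD": 114244.0,
     "Volume_BTC": 0.047, "Symbol": "BTCUSDT"},
    {"Trade_Time": "2025-09-30 00:06:22.436", "Price_USD": 114244.0,
     "Volume_BTC": 0.002, "Symbol": "BTCUSDT"},
]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)

# Read the text back and confirm the header and row count survived
buffer.seek(0)
read_back = list(csv.DictReader(buffer))
print(list(read_back[0].keys()))
print(f"{len(read_back)} rows read back")
```

Values come back as strings after a CSV round trip, so numeric columns need converting again before any calculation.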
