<img src="https://hilpisch.com/tpq_logo_bic.png" width="20%" align="right">

# Python for Algorithmic Trading
## Streaming Data with ZeroMQ and Online Statistics

&copy; Dr. Yves J. Hilpisch<br>
AI-Powered by GPT 5.1<br>
The Python Quants GmbH | https://tpq.io<br>
https://hilpisch.com | https://linktr.ee/dyjh


## Notebook Goals

This notebook translates the streaming architecture from Section 8 into runnable examples. You will

- sketch a minimal ZeroMQ PUB/SUB topology for EURUSD ticks,
- implement small scripts for a tick server, a logging client, and a statistics client, and
- use an online mean/variance update to maintain running diagnostics without storing the full history.

The code here is designed for local experiments on a single machine; adapting it to a production environment requires hardening around error handling, reconnection, and security.

> **Practical note:** Running all three components (server, logger, stats client) usually works best from separate terminals or notebook kernels. The cells below show the core logic; in practice you would place each role into its own script, mirroring the `code/tick_server.py`, `code/tick_client.py`, and `code/tick_database.py` files from the article.

In [None]:
# ZeroMQ-based EURUSD tick streaming examples.

from datetime import datetime, timezone
import time

import numpy as np
import zmq


## 1. Tick Server (Publisher)

The server publishes a synthetic EURUSD price stream over a `PUB` socket. Any number of subscribers can consume the messages.

In [None]:
def run_tick_server(endpoint: str = "tcp://127.0.0.1:5555",
                    start_price: float = 1.10,
                    sigma: float = 0.0005) -> None:
    """Publish a synthetic EURUSD random-walk price stream via ZeroMQ PUB."""

    ctx = zmq.Context.instance()
    socket = ctx.socket(zmq.PUB)
    socket.bind(endpoint)

    price = start_price
    rng = np.random.default_rng(seed=42)

    try:
        while True:
            shock = rng.normal(0.0, sigma)  #  small random increment
            price = max(0.1, price + shock)  #  keep price positive
            payload = {
                "time": datetime.now(timezone.utc).isoformat(),
                "symbol": "EURUSD",
                "price": float(price),
            }
            socket.send_json(payload)
            time.sleep(0.1)  #  10 ticks per second approx.
    except KeyboardInterrupt:
        pass
    finally:
        socket.close(0)
        ctx.term()
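
For testing, it can help to separate the price simulation from the socket code. The following is a minimal sketch and not part of the original scripts: `generate_ticks` is a hypothetical helper that yields a fixed number of payloads using the same random-walk logic and default parameters as `run_tick_server`, so the stream can be inspected without opening a socket.

```python
from datetime import datetime, timezone
from typing import Iterator

import numpy as np


def generate_ticks(n: int,
                   start_price: float = 1.10,
                   sigma: float = 0.0005,
                   seed: int = 42) -> Iterator[dict]:
    """Yield n synthetic EURUSD tick payloads (hypothetical helper)."""
    rng = np.random.default_rng(seed=seed)
    price = start_price
    for _ in range(n):
        shock = rng.normal(0.0, sigma)  # small random increment
        price = max(0.1, price + shock)  # keep price positive
        yield {
            "time": datetime.now(timezone.utc).isoformat(),
            "symbol": "EURUSD",
            "price": float(price),
        }


ticks = list(generate_ticks(5))
for tick in ticks:
    print(f"{tick['time']}  {tick['symbol']}  {tick['price']:.5f}")
```

Because the generator is seeded, repeated runs produce the same price path (only the timestamps differ), which makes downstream clients easier to debug.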


## 2. Clients (Subscribers)

We illustrate two simple subscribers: one that prints incoming ticks and one that maintains online statistics for log-returns.

In [None]:
def run_print_client(endpoint: str = "tcp://127.0.0.1:5555") -> None:
    """Subscribe to the tick stream and print messages as they arrive."""

    ctx = zmq.Context.instance()
    socket = ctx.socket(zmq.SUB)
    socket.connect(endpoint)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")  #  receive all symbols

    try:
        while True:
            tick = socket.recv_json()
            print(f"{tick['time']}  {tick['symbol']}  {tick['price']:.5f}")
    except KeyboardInterrupt:
        pass
    finally:
        socket.close(0)
        ctx.term()
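
The empty subscription prefix above matches every message. ZeroMQ subscription filters are byte-prefix matches on the first message frame, so a payload sent with `send_json` (which starts with `{`) cannot be filtered by symbol as-is. One common pattern, shown here as a sketch rather than the article's method, is to send the symbol as a separate topic frame followed by the JSON body. The `inproc://ticks-demo` endpoint, the second symbol `GBPUSD`, the fixed prices, and the single-process setup are assumptions for illustration only.

```python
import json

import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://ticks-demo")  # hypothetical in-process endpoint

sub = ctx.socket(zmq.SUB)
sub.connect("inproc://ticks-demo")
sub.setsockopt(zmq.SUBSCRIBE, b"EURUSD")  # prefix filter on the first frame

poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)

received = []
# Subscriptions propagate asynchronously, so early messages may be dropped;
# keep publishing both symbols until three EURUSD ticks have arrived.
for _ in range(1000):
    for symbol, price in (("GBPUSD", 1.27), ("EURUSD", 1.10)):
        body = json.dumps({"symbol": symbol, "price": price}).encode("utf-8")
        pub.send_multipart([symbol.encode("utf-8"), body])
    while poller.poll(timeout=10):  # wait up to 10 ms per poll
        topic, body = sub.recv_multipart()
        received.append((topic.decode("utf-8"), json.loads(body)))
    if len(received) >= 3:
        break

for topic, tick in received[:3]:
    print(topic, tick["price"])

sub.close(0)
pub.close(0)
ctx.term()
```

Only messages whose topic frame starts with `EURUSD` reach the subscriber; the `GBPUSD` messages are filtered out.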


In [None]:
def online_mean_var(count: int, mean: float, m2: float, x: float) -> tuple[int, float, float]:
    """Welford update of the running count, mean, and sum of squared deviations (m2)."""

    count += 1
    delta = x - mean
    mean += delta / count
    delta2 = x - mean
    m2 += delta * delta2
    return count, mean, m2


def run_stats_client(endpoint: str = "tcp://127.0.0.1:5555") -> None:
    """Subscribe to ticks and maintain online stats for log-returns."""

    ctx = zmq.Context.instance()
    socket = ctx.socket(zmq.SUB)
    socket.connect(endpoint)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")

    last_price: float | None = None
    count = 0
    mean = 0.0
    m2 = 0.0

    try:
        while True:
            tick = socket.recv_json()
            price = float(tick["price"])
            if last_price is not None:
                log_ret = np.log(price) - np.log(last_price)
                count, mean, m2 = online_mean_var(count, mean, m2, log_ret)
                if count % 50 == 0:
                    var = m2 / max(count - 1, 1)
                    print(f"n={count:4d}, mean={mean:.6f}, vol≈{np.sqrt(var):.6f}")
            last_price = price
    except KeyboardInterrupt:
        pass
    finally:
        socket.close(0)
        ctx.term()
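
As a quick sanity check, the online update should reproduce the batch mean and sample variance. The sketch below repeats `online_mean_var` so that it runs on its own; the simulated returns (scale 0.0005, 1,000 observations, seed 0) are arbitrary test inputs, not market data.

```python
import numpy as np


def online_mean_var(count: int, mean: float, m2: float, x: float) -> tuple[int, float, float]:
    """Welford update: running count, mean, and sum of squared deviations."""
    count += 1
    delta = x - mean
    mean += delta / count
    delta2 = x - mean
    m2 += delta * delta2
    return count, mean, m2


rng = np.random.default_rng(seed=0)
returns = rng.normal(0.0, 0.0005, size=1_000)  # arbitrary synthetic inputs

count, mean, m2 = 0, 0.0, 0.0
for r in returns:
    count, mean, m2 = online_mean_var(count, mean, m2, float(r))

online_var = m2 / (count - 1)  # sample variance, as in run_stats_client

# Compare against NumPy's batch results with tight tolerances.
print(np.isclose(mean, returns.mean(), rtol=1e-9, atol=1e-15))
print(np.isclose(online_var, returns.var(ddof=1), rtol=1e-9, atol=0.0))
```

The update keeps only three numbers (`count`, `mean`, `m2`), so memory use stays constant no matter how many ticks arrive.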


## 3. Putting It Together

To experiment interactively:

1. Start the server in one terminal or notebook process by calling `run_tick_server()`.
2. In a second process, run `run_print_client()` to see the raw tick stream.
3. In a third process, run `run_stats_client()` to monitor online statistics for log-returns.

The ZeroMQ sockets decouple producers and consumers: additional analytics, logging, or trading components can subscribe to the same stream without changing the server code.
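
The decoupling can be illustrated in a single process. The sketch below is only a smoke test under stated assumptions: the `inproc://fanout-demo` endpoint and the two subscriber names are hypothetical, and the publisher sends a fixed price rather than the random walk. It attaches two `SUB` sockets to one `PUB` socket and keeps publishing until both have received a tick, since messages sent before a subscription is registered are dropped.

```python
import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://fanout-demo")  # hypothetical in-process endpoint

subscribers = {}
for name in ("logger", "stats"):  # hypothetical consumer roles
    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://fanout-demo")
    sub.setsockopt_string(zmq.SUBSCRIBE, "")  # receive all messages
    subscribers[name] = sub

latest = {}
for seq in range(1000):
    pub.send_json({"symbol": "EURUSD", "price": 1.10, "seq": seq})
    for name, sub in subscribers.items():
        if sub.poll(timeout=10):  # wait up to 10 ms for a message
            latest[name] = sub.recv_json()
    if len(latest) == len(subscribers):
        break

for name, tick in latest.items():
    print(name, tick["symbol"], tick["price"])

for sub in subscribers.values():
    sub.close(0)
pub.close(0)
ctx.term()
```

The publisher code is identical whether one or two subscribers are attached; adding a consumer only requires another `connect` and `SUBSCRIBE` on the consumer side.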
