# Simulate real-time data by replaying historical data from a file

This notebook reads trading events from the `./tradesMarch.csv` file and simulates real-time data by inserting the events directly into QuestDB from multiple processes running in parallel. This is the same dataset that powers the demo at `https://demo.questdb.io`; the only difference is that the demo machine tracks live data.

The CSV file contains about one million rows of trades observed over an hour of historical cryptocurrency data. To simulate real-time behaviour, the script replaces the original timestamp with the current time and waits 50 ms after each event before sending the next one to QuestDB. You can change these settings by editing the constants in the script (`TIMESTAMP_FROM_FILE`, `DELAY_MS`, `NUM_SENDERS`).
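As a rough guide to how long a run takes with these settings, the sketch below estimates the ingestion rate and duration from the script's default constants. The helper names are hypothetical, and the figures ignore network and processing time, so treat the duration as a lower bound and the rate as an upper bound.

```python
def estimated_rate(delay_ms: float, num_senders: int) -> float:
    """Upper bound on events per second: each sender sleeps delay_ms after every event."""
    return num_senders * 1000.0 / delay_ms


def estimated_duration_seconds(total_events: int, delay_ms: float, num_senders: int) -> float:
    """Lower bound on run time in seconds, driven by the busiest sender."""
    events_per_sender = -(-total_events // num_senders)  # ceiling division
    return events_per_sender * delay_ms / 1000.0


# Defaults from the script: TOTAL_EVENTS=1000000, DELAY_MS=50, NUM_SENDERS=2
rate = estimated_rate(50, 2)
duration = estimated_duration_seconds(1_000_000, 50, 2)
print(f"about {rate:.0f} events/s, at least {duration / 3600:.1f} hours")
```

With the defaults this works out to roughly 40 events per second and close to seven hours for the full run, so lower `DELAY_MS` or `TOTAL_EVENTS` if you want a shorter session.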

This script keeps sending data until you click stop or exit the notebook, or until the `TOTAL_EVENTS` number is reached. If the CSV file contains fewer events than the configured total, the script simply loops over the file again.
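The looping behaviour comes down to indexing the in-memory rows with a modulo. A minimal sketch, using a hypothetical helper name and a three-element list standing in for the CSV rows:

```python
def row_for_event(rows: list, event_index: int):
    """Return the row for a given event number, wrapping around at the end of the list."""
    return rows[event_index % len(rows)]


rows = ["a", "b", "c"]  # stand-in for the rows loaded from the CSV file
replayed = [row_for_event(rows, i) for i in range(7)]
print(replayed)
```

Asking for seven events from three rows replays the list twice and then starts a third pass.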

The data is stored in a table named `trades`, with the schema below. If the table does not exist, it will be created automatically on the first write.

```sql
CREATE TABLE 'trades' ( 
    symbol SYMBOL CAPACITY 256 CACHE,
    side SYMBOL CAPACITY 256 CACHE,
    price DOUBLE,
    amount DOUBLE,
    timestamp TIMESTAMP
) timestamp(timestamp) PARTITION BY DAY WAL;
```

To see the live data in your database, open a new browser tab and navigate to `http://localhost:9000`. You can then execute a simple query like `SELECT * FROM trades LIMIT -10;` to see the latest 10 trades. Or you could execute a slightly more sophisticated query like `select timestamp, symbol, side, sum(price * amount) from trades sample by 1m;` to get the traded totals for each symbol and side at 1-minute intervals.
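If you prefer to run such queries from Python rather than the web console, QuestDB also exposes a REST `/exec` endpoint on the same port. The sketch below is one way to call it with the standard library only; the helper names are hypothetical, and it assumes an unauthenticated instance reachable at `http://localhost:9000`.

```python
import json
import urllib.parse
import urllib.request


def build_exec_url(base_url: str, sql: str) -> str:
    """Build a URL for the REST /exec endpoint with the SQL text URL-encoded."""
    return f"{base_url}/exec?{urllib.parse.urlencode({'query': sql})}"


def run_query(base_url: str, sql: str) -> list:
    """Run the query and return the rows from the 'dataset' field of the JSON response."""
    with urllib.request.urlopen(build_exec_url(base_url, sql)) as response:
        return json.load(response)["dataset"]


url = build_exec_url("http://localhost:9000", "SELECT * FROM trades LIMIT -10;")
print(url)
# rows = run_query("http://localhost:9000", "SELECT * FROM trades LIMIT -10;")  # needs a running instance
```

The `run_query` call is left commented out because it only works while the database is up and the `trades` table exists.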

For more realistic queries, please open in a new tab the [Examples-of-market-data-queries notebook](/notebooks/Examples-of-market-data-queries.ipynb), where you will find some queries adapted from the demo machine that should return results for your dataset.

If you want to see your live data on a real-time dashboard, please navigate in a new tab to [the demo dashboard](http://localhost:3000/d/trades-crypto-currency/trades-crypto-currency?orgId=1&refresh=250ms) powered by Grafana. The user is `admin` and the password is `quest`.



```python
from questdb.ingress import Sender, IngressError, TimestampNanos
import os
import sys
import csv
import time
from multiprocessing import Pool
from datetime import datetime, timezone

HTTP_ENDPOINT = os.getenv('QUESTDB_HTTP_ENDPOINT', 'questdb:9000')
REST_TOKEN = os.getenv('QUESTDB_REST_TOKEN')

TOTAL_EVENTS = 1000000  # Total events across all senders
DELAY_MS = 50  # Delay between events in milliseconds
NUM_SENDERS = 2  # Number of senders to execute in parallel
CSV_FILE = './tradesMarch.csv'  # Path to the CSV file
TIMESTAMP_FROM_FILE = False  # Whether to use the timestamp from the CSV file

def send(sender_id, total_events, delay_ms=DELAY_MS, csv_file=CSV_FILE, http_endpoint=HTTP_ENDPOINT, auth=REST_TOKEN):
    sys.stdout.write(f"Sender {sender_id} will send {total_events} events\n")

    try:
        # Use HTTPS with a bearer token when one is configured, plain HTTP otherwise
        if auth is not None:
            conf = f'https::addr={http_endpoint};tls_verify=unsafe_off;token={auth};'
        else:
            conf = f'http::addr={http_endpoint};'

        with Sender.from_conf(conf) as sender, open(csv_file, mode='r') as file:
            csv_reader = csv.DictReader(file)
            events_sent = 0
            csv_rows = list(csv_reader)  # Load the CSV data once into memory for looping
            sys.stdout.write(f"Sender {sender_id} started sending events\n")
            while events_sent < total_events:
                row = csv_rows[events_sent % len(csv_rows)]  # Loop over the CSV rows

                if TIMESTAMP_FROM_FILE:
                    # The trailing 'Z' means UTC, so mark the parsed value as UTC
                    # instead of letting it be interpreted as local time
                    timestamp_dt = datetime.strptime(
                        row['timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ"
                    ).replace(tzinfo=timezone.utc)
                    timestamp_nanos = TimestampNanos(int(timestamp_dt.timestamp() * 1e9))  # Convert to nanoseconds
                else:
                    timestamp_nanos = TimestampNanos.now()  # Get current time in nanoseconds

                # Ingest the row with the chosen timestamp
                sender.row(
                    'trades',
                    symbols={'symbol': row['symbol'], 'side': row['side']},
                    columns={
                        'price': float(row['price']),
                        'amount': float(row['amount']),
                    },
                    at=timestamp_nanos  # Send timestamp in nanoseconds
                )

                events_sent += 1

                # Delay after each event
                if delay_ms > 0:
                    time.sleep(delay_ms / 1000.0)  # Convert milliseconds to seconds

            sys.stdout.write(f"Sender {sender_id} finished sending {events_sent} events\n")

    except IngressError as e:
        sys.stderr.write(f'Sender {sender_id} got error: {e}\n')

def parallel_send(total_events, num_senders: int):
    events_per_sender = total_events // num_senders
    remaining_events = total_events % num_senders

    sender_events = [events_per_sender] * num_senders
    for i in range(remaining_events):  # Distribute the remaining events
        sender_events[i] += 1

    # One worker process per sender; starmap blocks until all senders finish
    with Pool(processes=num_senders) as pool:
        sender_ids = range(num_senders)
        pool.starmap(send, [(sender_id, sender_events[sender_id]) for sender_id in sender_ids])

if __name__ == '__main__':
    sys.stdout.write(f'Ingestion started. Connecting to {HTTP_ENDPOINT}\n')
    parallel_send(TOTAL_EVENTS, NUM_SENDERS)
```

```
Ingestion started. Connecting to host.docker.internal:9000
Sender 0 will send 500000 events
Sender 1 will send 500000 events
Sender 1 started sending events
Sender 0 started sending events

KeyboardInterrupt: 
```
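The 500000/500000 split in the output above follows from how `parallel_send` divides the total: every sender gets the integer quotient, and the first few senders take one extra event each until the remainder is used up. The sketch below restates that logic as a standalone function (the name `split_events` is hypothetical) so it can be checked without a database.

```python
def split_events(total_events: int, num_senders: int) -> list[int]:
    """Divide total_events across senders, giving the remainder to the first senders."""
    base, remainder = divmod(total_events, num_senders)
    return [base + 1 if i < remainder else base for i in range(num_senders)]


print(split_events(1_000_000, 2))
print(split_events(10, 3))
```

The counts always add up to the requested total, so changing `NUM_SENDERS` changes the per-sender load but not the number of rows written.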