# Combining

In [3]:
import os
from pathlib import Path

def concatenate_python_files(repo_path):
    # Initialize a string to store all Python code
    all_python_code = ""
    
    # Walk through the repository directory
    for root, dirs, files in os.walk(repo_path):
        for file in files:
            if file.endswith(".py"):  # Check if the file is a Python file
                file_path = os.path.join(root, file)
                with open(file_path, 'r', encoding='utf-8') as f:
                    all_python_code += f.read() + "\n\n"
    
    # Create a combined_script.txt file in the root of the repository
    combined_script_path = os.path.join(repo_path, "combined_script.txt")
    with open(combined_script_path, 'w', encoding='utf-8') as f:
        f.write(all_python_code)
    print(f"Combined script created at: {combined_script_path}")

main_folder_path = Path('.')  # '.' refers to the current directory, adjust if necessary
repos = [d for d in main_folder_path.iterdir() if d.is_dir()]  # List all directories/repositories

for repo in repos:
    concatenate_python_files(repo)

Combined script created at: OtreeZTS/combined_script.txt
Combined script created at: london_fastapi/combined_script.txt
Combined script created at: london_trader_front/combined_script.txt
Combined script created at: order-book-simulation/combined_script.txt
Combined script created at: trader_london/combined_script.txt
Combined script created at: tmp/combined_script.txt


# learning REST

In [18]:
import requests

# The URL where the server is listening
url = 'http://127.0.0.1:11112/catch'

# Some data to send
data = {'key': 'value'}

# Send a POST request with JSON data
response = requests.post(url, json=data)

# Print the response from the server
print("Server response:", response.json())

Server response: {'message': 'Data received successfully!'}


# Commands that start repos

- frontend
    - yarn dev
- test backend 
    - uvicorn main:app
- backend
    - python -m traderabbit.main_process

In [1]:
import subprocess
import threading

def start_service(command, cwd):
    """Starts a service in a new thread."""
    subprocess.run(command, cwd=cwd)

# Commands and directories for each service
services = [
    (["yarn", "dev"], "london_trader_front"),  # Frontend
    (["uvicorn", "main:app"], "london_fastapi"),  # Test Backend
    (["python", "-m", "traderabbit.main_process"], "trader_london"),  # Backend
]

# Start each service in its own thread
threads = []
for command, directory in services:
    thread = threading.Thread(target=start_service, args=(command, directory))
    threads.append(thread)
    thread.start()

print("All services have been started.")

All services have been started.


[2K[1G[1myarn run v1.22.21[22m


Traceback (most recent call last):
  File "/Users/venvoo/opt/anaconda3/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/venvoo/opt/anaconda3/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/venvoo/Library/CloudStorage/GoogleDrive-venvoooo@gmail.com/My Drive/trader/trader_london/traderabbit/main_process.py", line 4, in <module>
    from traderabbit.trader import Trader
  File "/Users/venvoo/Library/CloudStorage/GoogleDrive-venvoooo@gmail.com/My Drive/trader/trader_london/traderabbit/trader.py", line 2, in <module>
    import aio_pika
ModuleNotFoundError: No module named 'aio_pika'


Port 3000 is in use, trying another one...

  [32m[1mVITE[22m v5.0.11[39m  [2mready in [0m[1m609[22m[2m[0m ms[22m

  [32m➜[39m  [1mLocal[22m:   [36mhttp://localhost:[1m3001[22m/[39m
[2m  [32m➜[39m  [1mNetwork[22m[2m: use [22m[1m--host[22m[2m to expose[22m


INFO:     Started server process [29743]
INFO:uvicorn.error:Started server process [29743]
INFO:     Waiting for application startup.
INFO:uvicorn.error:Waiting for application startup.
INFO:     Application startup complete.
INFO:uvicorn.error:Application startup complete.
ERROR:    [Errno 48] error while attempting to bind on address ('127.0.0.1', 8000): address already in use
ERROR:uvicorn.error:[Errno 48] error while attempting to bind on address ('127.0.0.1', 8000): address already in use
INFO:     Waiting for application shutdown.
INFO:uvicorn.error:Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:uvicorn.error:Application shutdown complete.


# AppMap

In [3]:
import os
import sys

import appmap

r = appmap.Recording()
with r:
    import trader_london.traderabbit
    # print(sample.C().hello_world(), file=sys.stderr)

with os.fdopen(sys.stdout.fileno(), "w", closefd=False) as stdout:
    stdout.write(appmap.generation.dump(r))
    stdout.flush()




# RabbitMQ Logging

In [4]:
import requests
import json

# Configuration
RABBITMQ_API_URL = "http://localhost:15672/api"
USERNAME = "guest"
PASSWORD = "guest"
OUTPUT_FILE = "combined_rabbitmq_config.json"

def fetch_rabbitmq_data(endpoint):
    """
    Fetch data from a specific RabbitMQ Management HTTP API endpoint.
    """
    response = requests.get(f"{RABBITMQ_API_URL}/{endpoint}", auth=(USERNAME, PASSWORD))
    response.raise_for_status()  # Raise an exception for HTTP errors
    return response.json()

def combine_rabbitmq_data():
    """
    Fetch exchanges, queues, and bindings data and combine them into a single JSON structure.
    """
    exchanges = fetch_rabbitmq_data("exchanges")
    queues = fetch_rabbitmq_data("queues")
    bindings = fetch_rabbitmq_data("bindings")

    combined_data = {
        "exchanges": exchanges,
        "queues": queues,
        "bindings": bindings
    }
    return combined_data

def save_data_to_file(data, filename):
    """
    Save the combined RabbitMQ configuration data to a file in JSON format.
    """
    with open(filename, 'w') as file:
        json.dump(data, file, indent=4)  # Pretty print for readability

def main():
    try:
        combined_data = combine_rabbitmq_data()
        save_data_to_file(combined_data, OUTPUT_FILE)
        print(f"Combined RabbitMQ configuration data saved to {OUTPUT_FILE}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from RabbitMQ Management API: {e}")

if __name__ == "__main__":
    main()

Combined RabbitMQ configuration data saved to combined_rabbitmq_config.json


# RabbitMQ HandsOn

In [17]:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

In [18]:
channel = connection.channel()

In [19]:
channel.queue_declare(queue='hello')

<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=1', 'queue=hello'])>"])>

In [8]:
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

 [x] Sent 'Hello World!'


In [20]:
def callback(ch, method, properties, body):
    print(f" [x] Received {body}")

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

'ctag1.ed6b9ffd3153418885dc5b314cb1c87c'

# PyLOB

In [13]:
import sqlite3

# Connect to the SQLite database file
conn = sqlite3.connect('/Users/venvoo/Library/CloudStorage/GoogleDrive-venvoooo@gmail.com/My Drive/trader/PyLOB/src/lob.db')

# Create a cursor object using the connection
cursor = conn.cursor()

In [14]:
from PyLOB.src.PyLOB.orderbook import OrderBook

# create a LOB object
tx = OrderBook(conn)

In [16]:
help(tx)

Help on OrderBook in module PyLOB.src.PyLOB.orderbook object:

class OrderBook(builtins.object)
 |  OrderBook(db, tick_size=0.0001)
 |  
 |  Methods defined here:
 |  
 |  __init__(self, db, tick_size=0.0001)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  betterPrice(self, side, price, comparedPrice)
 |      # return whether comparedPrice has better matching chance than price
 |  
 |  cancelOrder(self, side, idNum, time=None)
 |  
 |  clipPrice(self, price)
 |      Clips the price according to the ticksize
 |  
 |  getBestAsk(self, instrument)
 |  
 |  getBestBid(self, instrument)
 |  
 |  getPrice(self, instrument, side, direction='asc')
 |  
 |  getVolumeAtPrice(self, instrument, side, price)
 |  
 |  getWorstAsk(self, instrument)
 |  
 |  getWorstBid(self, instrument)
 |  
 |  modifyOrder(self, idNum, orderUpdate, time=None, verbose=False)
 |  
 |  orderGetSide(self, idNum)
 |  
 |  print(self, instrument)
 |  
 |  processMatchesDB(self, quote, crsr,

In [22]:
tx.updateTime()

# Pyziabm

In [26]:
import pyziabm.pyziabm as pzi

ModuleNotFoundError: No module named 'pyziabm.runner2017mpi_r4'

In [24]:
pzi.Runner()

AttributeError: module 'pyziabm' has no attribute 'Runner'

# gym-trading

In [8]:
import sys
sys.path.append('/gym_trading')

# numba

In [1]:
import numpy as np
import time
from numba import jit

# Define the function without Numba
def sum_of_squares(arr):
    result = 0
    for x in arr:
        result += x * x
    return result

# Define the same function with Numba optimization
@jit(nopython=True)
def sum_of_squares_numba(arr):
    result = 0
    for x in arr:
        result += x * x
    return result

# Create a large array for testing
large_array = np.random.rand(10000000)  # 10 million random numbers

# Measure performance without Numba
start_time = time.time()
sum_of_squares(large_array)
without_numba_time = time.time() - start_time
print("Without Numba:", without_numba_time, "seconds")

# Measure performance with Numba (first run, includes compilation)
start_time = time.time()
sum_of_squares_numba(large_array)
with_numba_first_run_time = time.time() - start_time
print("With Numba (first run, includes compilation):", with_numba_first_run_time, "seconds")

# Measure performance with Numba (subsequent run, compilation already done)
start_time = time.time()
sum_of_squares_numba(large_array)
with_numba_subsequent_run_time = time.time() - start_time
print("With Numba (subsequent run):", with_numba_subsequent_run_time, "seconds")

Without Numba: 0.9154167175292969 seconds
With Numba (first run, includes compilation): 0.8144698143005371 seconds
With Numba (subsequent run): 0.010838031768798828 seconds


# noise trader

In [3]:
from trader_london.external_traders.noise_trader import get_signal_noise, settings_noise, settings, get_noise_rule_unif

In [6]:
signals = get_signal_noise(None, settings_noise)

In [None]:
orders = get_noise_rule_unif(book_format, signals, noise_state, settings_noise, settings)

# MongoDB to MotherDuck

In [1]:
from pymongo import MongoClient
import polars as pl

# Connect to MongoDB
client = MongoClient('localhost', 27017)
db = client['trader']
cursor = db.message.find()

# Convert cursor to list of dictionaries
data = list(cursor)

In [41]:
from pymongo import MongoClient
import polars as pl
import json
import uuid
import datetime
import duckdb
import os
import tempfile

MD_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzZXNzaW9uIjoidmVudm9vb28uZ21haWwuY29tIiwiZW1haWwiOiJ2ZW52b29vb0BnbWFpbC5jb20iLCJ1c2VySWQiOiI4NjIyNzk3YS05ZDkzLTQ0MTYtYjhlNy0wNTdiMDFkNjc1NjEiLCJpYXQiOjE3MDYwNTk3MjgsImV4cCI6MTczNzYxNzMyOH0.QhvzD9nk5C3L-28u5gfZXqJosOvIpV-S6ElKwp5HeU4'
DATA_FILE = tempfile.NamedTemporaryFile(suffix='.parquet', delete=False).name

def handle_datetime(value):
    return value.isoformat() if isinstance(value, datetime.datetime) else value

def handle_bytes(value):
    return str(uuid.UUID(bytes=value)) if len(value) == 16 else value

def handle_json(value):
    return json.dumps(value, default=str) if isinstance(value, (dict, list)) else value

def flatten_item(item):
    flat_dict = {}
    for key, value in item.items():
        if key == "_id":
            flat_dict["id"] = str(value)
        elif key == "content":
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    if sub_key in ["order_book", "active_orders", "history"]:
                        flat_dict[sub_key] = handle_json(sub_value)
                    elif isinstance(sub_value, datetime.datetime):
                        flat_dict[sub_key] = handle_datetime(sub_value)
                    else:
                        flat_dict[sub_key] = sub_value
        else:
            if isinstance(value, datetime.datetime):
                flat_dict[key] = handle_datetime(value)
            elif isinstance(value, bytes):
                flat_dict[key] = handle_bytes(value)
            elif isinstance(value, (dict, list)):
                flat_dict[key] = handle_json(value)
            else:
                flat_dict[key] = value
    return flat_dict

# Connect to DuckDB
client = duckdb.connect(f"md:?motherduck_token={MD_TOKEN}")

# Check if the table exists and get the latest timestamp
table_exists = client.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'trading_data'").fetchone()[0] > 0
latest_timestamp = None
if table_exists:
    latest_timestamp = client.execute("SELECT max(timestamp) FROM trading_data").fetchone()[0]

# Connect to MongoDB
mongo_client = MongoClient("localhost", 27017)

# Fetch data from MongoDB based on the latest timestamp
query = {}
if latest_timestamp:
    query = {"timestamp": {"$gt": latest_timestamp}}
data = list(mongo_client["trader"].message.find(query))

flattened_data = [flatten_item(item) for item in data]
df = pl.DataFrame(flattened_data)

# Write DataFrame to Parquet
df.write_parquet(DATA_FILE)

# Write DataFrame to Parquet only if it is not empty
if not df.is_empty():
    df.write_parquet(DATA_FILE)

    # Create or append to the table
    if table_exists:
        client.execute(f"INSERT INTO trading_data SELECT * FROM read_parquet('{DATA_FILE}')")
    else:
        client.execute(f"CREATE TABLE trading_data AS SELECT * FROM read_parquet('{DATA_FILE}')")
else:
    print("No new data to process.")

No new data to process.


In [33]:
table_exists = client.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'trading_data'").fetchone()[0] > 0
latest_timestamp = None
if table_exists:
    latest_timestamp = client.execute("SELECT max(timestamp) FROM trading_data").fetchone()[0]

# Filter DataFrame based on the latest timestamp
if latest_timestamp:
    df = df.filter(df['timestamp'] > latest_timestamp)

# Write DataFrame to Parquet
df.write_parquet(DATA_FILE)

# Create or append to the table
if table_exists:
    client.execute(f"INSERT INTO trading_data SELECT * FROM read_parquet('{DATA_FILE}')")
else:
    client.execute(f"CREATE TABLE trading_data AS SELECT * FROM read_parquet('{DATA_FILE}')")

# Orchestration

In [2]:
from pymongo import MongoClient
from analysis.ingest import flatten_item

mongo_client = MongoClient("localhost", 27017)
data = list(mongo_client["trader"].message.find())

In [6]:
import polars as pl
flattended_data = [flatten_item(item) for item in data]
df = pl.DataFrame(flattended_data)

In [7]:
import asyncio
from prefect import flow, task
from analysis.parameterize import generate_and_store_parameters
from analysis.run import create_trading_session

# Define bounds for each parameter
bounds = {
    'num_informed_traders': (1, 10),
    'activity_frequency': (0.5, 5.5),
    'trade_intensity_informed': (0.1, 1.0),
    'trade_direction_informed': (0, 1)  # Example for binary decision, will need post-processing
}

# Resolution for Sobol sequence
resolution = 3

@task
async def create_and_wait_session(trader_data):
    session_info = create_trading_session(trader_data)
    session_id = session_info.get('data', {}).get('trading_session_uuid')
    print(f"Trading Session {session_id} Initiated")

    # Calculate the wait time based on the trading day duration plus a buffer
    simulation_length = trader_data.get('trading_day_duration', 1)  # Default to 10 minutes if not specified
    buffer_time = 0.1  # Additional buffer time in minutes
    total_wait_time = (simulation_length + buffer_time) * 60  # Convert minutes to seconds

    # Wait for the duration of the session plus buffer
    await asyncio.sleep(total_wait_time)
    print(f"Trading Session {session_id} Completed")

@flow
def run_trading_sessions(ts):
    loop = asyncio.get_event_loop()
    tasks = []
    for trader_data in ts:
        task = loop.create_task(create_and_wait_session(trader_data))
        tasks.append(task)
        # Wait for the current session to complete before starting the next
        loop.run_until_complete(task)

def main():
    ts = generate_and_store_parameters(bounds=bounds, resolution=resolution)
    run_trading_sessions(ts)

if __name__ == "__main__":
    main()

id,trading_session_id,text,type,order_book,active_orders,history,spread,midpoint,transaction_price,timestamp,content
str,str,str,str,str,str,str,f64,f64,f64,str,str
"""663761d7e5f21c…","""7c416aaf-a4a6-…","""book is update…","""update""","""{""bids"": [{""x""…","""[{""id"": ""b'\\x…","""[]""",,,,"""2024-05-05T11:…",
"""663761d7e5f21c…","""7c416aaf-a4a6-…","""book is update…","""update""","""{""bids"": [{""x""…","""[{""id"": ""b'\\x…","""[]""",,,,"""2024-05-05T11:…",
"""663761d7e5f21c…","""7c416aaf-a4a6-…","""book is update…","""update""","""{""bids"": [], ""…","""[{""id"": ""b'\\x…","""[{""_id"": ""768c…",,,2000.0,"""2024-05-05T11:…",
"""663761d7e5f21c…","""7c416aaf-a4a6-…","""book is update…","""update""","""{""bids"": [], ""…","""[{""id"": ""b'\\x…","""[{""_id"": ""768c…",,,2000.0,"""2024-05-05T11:…",
"""663761d7e5f21c…","""7c416aaf-a4a6-…","""book is update…","""update""","""{""bids"": [], ""…","""[{""id"": ""b'\\x…","""[{""_id"": ""768c…",,,2000.0,"""2024-05-05T11:…",
…,…,…,…,…,…,…,…,…,…,…,…
"""663aace0cedf1a…","""03a31f38-c842-…","""book is update…","""update""","""{""bids"": [{""x""…","""[{""id"": ""b'9u\…","""[{""_id"": ""c466…",2.0,1998.0,1999.0,"""2024-05-07T23:…",
"""663aace6cedf1a…","""03a31f38-c842-…","""book is update…","""update""","""{""bids"": [{""x""…","""[{""id"": ""b'9u\…","""[{""_id"": ""c466…",2.0,1998.0,1999.5,"""2024-05-07T23:…",
"""663aace6cedf1a…","""03a31f38-c842-…","""book is update…","""update""","""{""bids"": [{""x""…","""[{""id"": ""b'9u\…","""[{""_id"": ""c466…",2.0,1998.0,1999.5,"""2024-05-07T23:…",
"""663aaceecedf1a…","""03a31f38-c842-…","""book is update…","""update""","""{""bids"": [{""x""…","""[{""id"": ""b'9u\…","""[{""_id"": ""c466…",2.0,1998.0,1999.5,"""2024-05-07T23:…",


In [1]:
import pika

def test_rabbitmq():
    try:
        # Connect to RabbitMQ on localhost
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()

        # Declare a queue
        channel.queue_declare(queue='test_queue')

        # Send a message
        channel.basic_publish(exchange='',
                              routing_key='test_queue',
                              body='Hello RabbitMQ!')

        print("Message sent to RabbitMQ")

        # Set up a consumer and receive a message
        def callback(ch, method, properties, body):
            print(f"Received: {body}")

        channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)

        # Start consuming (non-blocking)
        channel.start_consuming()

        # Close the connection
        connection.close()

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    test_rabbitmq()

An error occurred: Timeout during AMQP handshake'localhost'/(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672)); ssl=False
