In [None]:
import logging

from ezib_async import ezIBAsync, util

In [None]:
util.logToConsole("DEBUG")
logging.getLogger("ib_async").setLevel("CRITICAL")

In [None]:
# initialize ezIBpy
ezib = ezIBAsync()
await ezib.connectAsync(ibclient=103, ibport=4002)

In [None]:
ezib.contracts

In [None]:
# create a contract
contract = await ezib.createStockContract("AAPL")
contract

In [None]:
await ezib.requestMarketData()

In [None]:
ezib.ib.ticker(contract)

In [None]:
# create a contract
contract = await ezib.createFuturesContract("ES", exchange="CME", expiry="202506")

In [None]:

# submit a bracket order (entry=0 = MKT order)
order = ezib.createBracketOrder(contract, quantity=1, entry=0, target=6000., stop=5500.)


In [None]:
ezib.ib.openOrders()

In [None]:
# see the positions
util.df(ezib.ib.positions())

In [None]:
# disconnect
ezib.cancelMarketData()
ezib.disconnect()

In [None]:
ezib.isConnected

In [2]:
from dataclasses import dataclass
from typing import Dict

@dataclass(slots=True)
class CalculatedTicker:
    symbol: str
    last: float
    pct_change: float
    sma_5: float  # 示例：计算后的5周期均线

In [6]:
from timeit import timeit
from ib_async import Ticker

# 测试 dataclass vs dict 的写入和读取
ticker = Ticker(last=150.0, prevLast=145.0)
ticker.symbol = 'AAPL'

def test_dataclass():
    calc = CalculatedTicker(symbol=ticker.symbol, last=ticker.last, pct_change=3.45, sma_5=148.0)
    return calc.pct_change

def test_dict():
    calc = {'symbol': ticker.symbol, 'last': ticker.last, 'pct_change': 3.45, 'sma_5': 148.0}
    return calc['pct_change']

print("dataclass 访问速度:", timeit(test_dataclass, number=1_000_000))  # 约 0.05s
print("dict 访问速度:", timeit(test_dict, number=1_000_000))          # 约 0.07s

dataclass 访问速度: 0.6268051250008284
dict 访问速度: 0.351843040996755


In [15]:
import numpy as np
class CircularBuffer:
    def __init__(self, size):
        self.size = size
        self.buffer = np.zeros(size, dtype=[
            ('datetime', 'datetime64[ns]'),
            ('ask', 'float32'),
            ('bid', 'float32'),
            ('askSize', 'uint16')
        ])
        self.index = 0
        
    def append(self, data):
        if self.index < self.size:
            self.buffer[self.index] = data
            self.index +=1
        else:
            
            self.buffer[:-1] = self.buffer[1:]
            self.buffer[-1] = data

# 测试性能
cb = CircularBuffer(100_000_000)
%time cb.append((np.datetime64('now'), 150.5, 150.4, 500))  # ≈200ns/次

CPU times: user 23 μs, sys: 29 μs, total: 52 μs
Wall time: 90.1 μs


In [34]:
ez_data = ezIBAsync().marketData
ez_data

{0:           bid  bidsize  ask  asksize  last  lastsize
 datetime                                            
 0           0        0    0        0     0         0}

In [38]:
ez_data[0].values

array([[0, 0, 0, 0, 0, 0]])

In [40]:
from datetime import datetime
import time
import pandas as pd

def test_insert_performance():
    # Test ezIBpy marketData dictionary
    ez_data = ezIBAsync().marketData
    start = time.time()
    for i in range(10000):
        tickerId = i % 100  # Simulate 100 different instruments
        if tickerId not in ez_data:
            ez_data[tickerId] = ez_data[0].copy()  # Initialize like original
        # Simulate tick update
        new_row = pd.Series(ez_data[0].iloc[0].values, index=ez_data[0].columns)
        ez_data[tickerId].loc[datetime.now()] = new_row
    print(f"Dictionary insert time: {time.time()-start}")

test_insert_performance()
# Test CircularBuffer
cb = CircularBuffer(10000)
start = time.time()
for i in range(10000):
    data = (np.datetime64('now'), 1.23, 1.24, 100)  # sample data
    cb.append(data)
print(f"CircularBuffer insert time: {time.time()-start}")

Dictionary insert time: 9.242734909057617
CircularBuffer insert time: 0.006803750991821289


In [41]:
from ezib_async import ezIBAsync
def test_access_performance():
    # Setup
    ez_data = ezIBAsync().marketData
    cb = CircularBuffer(1000)
    # [Fill both with test data...]
    
    # Test latest value access
    start = time.time()
    latest = ez_data[0].iloc[-1]  # Dictionary+DataFrame access
    print(f"Dictionary access time: {time.time()-start}")

    start = time.time()
    latest = cb.buffer[-1]  # Direct array access
    print(f"CircularBuffer access time: {time.time()-start}")

test_access_performance()

Dictionary access time: 0.001390218734741211
CircularBuffer access time: 8.106231689453125e-06


In [43]:
class OptimizedCircularBuffer:
    def __init__(self, size):
        self.size = size
        # 使用单独的数组存储不同类型的数据
        self.datetime = np.zeros(size, dtype='datetime64[ns]')
        self.ask = np.zeros(size, dtype='float32')
        self.bid = np.zeros(size, dtype='float32')
        self.askSize = np.zeros(size, dtype='uint16')
        self.index = 0
        self.is_full = False
        
    def append(self, data):
        datetime, ask, bid, askSize = data
        if not self.is_full:
            self.datetime[self.index] = datetime
            self.ask[self.index] = ask
            self.bid[self.index] = bid
            self.askSize[self.index] = askSize
            self.index += 1
            if self.index >= self.size:
                self.is_full = True
                self.index = 0
        else:
            # 使用循环缓冲区模式，避免数据移动
            self.datetime[self.index] = datetime
            self.ask[self.index] = ask
            self.bid[self.index] = bid
            self.askSize[self.index] = askSize
            self.index = (self.index + 1) % self.size
            
    def get_latest(self):
        # 返回最新的数据点
        if self.is_full:
            idx = (self.index - 1) % self.size
        else:
            idx = max(0, self.index - 1)
        return (self.datetime[idx], self.ask[idx], self.bid[idx], self.askSize[idx])

In [44]:
from dataclasses import dataclass
import numpy as np

@dataclass
class MarketDataPoint:
    datetime: np.datetime64
    ask: float
    bid: float
    askSize: int

class DataClassCircularBuffer:
    def __init__(self, size):
        self.size = size
        # 使用NumPy数组存储dataclass实例
        self.buffer = np.array([MarketDataPoint(
            np.datetime64('1970-01-01'), 0.0, 0.0, 0) for _ in range(size)])
        self.index = 0
        self.is_full = False
        
    def append(self, data):
        point = MarketDataPoint(*data)
        self.buffer[self.index] = point
        self.index = (self.index + 1) % self.size
        if not self.is_full and self.index == 0:
            self.is_full = True
            
    def get_latest(self):
        if self.is_full:
            idx = (self.index - 1) % self.size
        else:
            idx = max(0, self.index - 1)
        return self.buffer[idx]

In [46]:
import numba as nb

@nb.jitclass([
    ('size', nb.int32),
    ('datetime', nb.types.Array(nb.int64, 1, 'C')),  # 使用int64存储datetime64
    ('ask', nb.types.Array(nb.float32, 1, 'C')),
    ('bid', nb.types.Array(nb.float32, 1, 'C')),
    ('askSize', nb.types.Array(nb.uint16, 1, 'C')),
    ('index', nb.int32),
    ('is_full', nb.boolean)
])
class NumbaCircularBuffer:
    def __init__(self, size):
        self.size = size
        self.datetime = np.zeros(size, dtype=np.int64)
        self.ask = np.zeros(size, dtype=np.float32)
        self.bid = np.zeros(size, dtype=np.float32)
        self.askSize = np.zeros(size, dtype=np.uint16)
        self.index = 0
        self.is_full = False
        
    def append(self, datetime, ask, bid, askSize):
        self.datetime[self.index] = datetime
        self.ask[self.index] = ask
        self.bid[self.index] = bid
        self.askSize[self.index] = askSize
        self.index = (self.index + 1) % self.size
        if not self.is_full and self.index == 0:
            self.is_full = True

AttributeError: module 'numba' has no attribute 'jitclass'

In [47]:
import time
import numpy as np
import pandas as pd
from datetime import datetime

# 测试不同实现的性能
def test_performance():
    # 测试参数
    size = 10000
    iterations = 100000
    
    # 创建测试数据
    test_data = [(np.datetime64('now'), 150.5, 150.4, 500) for _ in range(iterations)]
    
    # 测试原始CircularBuffer
    cb = CircularBuffer(size)
    start = time.time()
    for data in test_data:
        cb.append(data)
    print(f"Original CircularBuffer append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = cb.buffer[-1]
    print(f"Original CircularBuffer access time: {time.time()-start}")
    
    # 测试优化版CircularBuffer
    ocb = OptimizedCircularBuffer(size)
    start = time.time()
    for data in test_data:
        ocb.append(data)
    print(f"Optimized CircularBuffer append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = ocb.get_latest()
    print(f"Optimized CircularBuffer access time: {time.time()-start}")
    
    # 测试DataFrame
    df = pd.DataFrame(columns=['datetime', 'ask', 'bid', 'askSize'])
    df.set_index('datetime', inplace=True)
    start = time.time()
    for data in test_data:
        df.loc[data[0]] = data[1:]
        if len(df) > size:
            df = df.iloc[-size:]
    print(f"DataFrame append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = df.iloc[-1]
    print(f"DataFrame access time: {time.time()-start}")

test_performance()

Original CircularBuffer append time: 71.61098909378052
Original CircularBuffer access time: 0.023614883422851562
Optimized CircularBuffer append time: 0.11003303527832031
Optimized CircularBuffer access time: 0.08566021919250488
DataFrame append time: 4.571037292480469
DataFrame access time: 1.754770040512085


In [52]:
import numpy as np
import pandas as pd
import time
from datetime import datetime
from dataclasses import dataclass

# 原始 CircularBuffer 实现
class CircularBuffer:
    def __init__(self, size):
        self.size = size
        self.buffer = np.zeros(size, dtype=[
            ('datetime', 'datetime64[ns]'),
            ('ask', 'float32'),
            ('bid', 'float32'),
            ('askSize', 'uint16')
        ])
        self.index = 0
        self.is_full = False
        
    def append(self, data):
        if self.index < self.size:
            self.buffer[self.index] = data
            self.index += 1
            if self.index >= self.size:
                self.is_full = True
                self.index = 0
        else:
            # 使用循环缓冲区模式，避免数据移动
            self.buffer[self.index] = data
            self.index = (self.index + 1) % self.size
    
    def get_latest(self):
        if self.is_full:
            idx = (self.index - 1) % self.size
        else:
            idx = max(0, self.index - 1)
        return self.buffer[idx]

# 优化版 CircularBuffer - 使用分离的数组
class OptimizedCircularBuffer:
    def __init__(self, size):
        self.size = size
        # 使用单独的数组存储不同类型的数据
        self.datetime = np.zeros(size, dtype='datetime64[ns]')
        self.ask = np.zeros(size, dtype='float32')
        self.bid = np.zeros(size, dtype='float32')
        self.askSize = np.zeros(size, dtype='uint16')
        self.index = 0
        self.is_full = False
        
    def append(self, data):
        datetime, ask, bid, askSize = data
        if not self.is_full and self.index < self.size:
            self.datetime[self.index] = datetime
            self.ask[self.index] = ask
            self.bid[self.index] = bid
            self.askSize[self.index] = askSize
            self.index += 1
            if self.index >= self.size:
                self.is_full = True
                self.index = 0
        else:
            # 使用循环缓冲区模式
            self.datetime[self.index] = datetime
            self.ask[self.index] = ask
            self.bid[self.index] = bid
            self.askSize[self.index] = askSize
            self.index = (self.index + 1) % self.size
            
    def get_latest(self):
        if self.is_full:
            idx = (self.index - 1) % self.size
        else:
            idx = max(0, self.index - 1)
        return (self.datetime[idx], self.ask[idx], self.bid[idx], self.askSize[idx])

# 使用 dataclass 的实现
@dataclass(slots=True)
class MarketDataPoint():
    datetime: np.datetime64
    ask: float
    bid: float
    askSize: int

class DataClassCircularBuffer:
    def __init__(self, size):
        self.size = size
        self.buffer = [MarketDataPoint(
            np.datetime64('1970-01-01'), 0.0, 0.0, 0) for _ in range(size)]
        self.index = 0
        self.is_full = False
        
    def append(self, data):
        datetime, ask, bid, askSize = data
        self.buffer[self.index] = MarketDataPoint(datetime, ask, bid, askSize)
        self.index = (self.index + 1) % self.size
        if not self.is_full and self.index == 0:
            self.is_full = True
            
    def get_latest(self):
        if self.is_full:
            idx = (self.index - 1) % self.size
        else:
            idx = max(0, self.index - 1)
        return self.buffer[idx]

# 使用 Numba 的实现 (正确导入)
try:
    from numba.experimental import jitclass
    from numba import int32, int64, float32, uint16, boolean, types
    
    spec = [
        ('size', int32),
        ('datetime', types.Array(int64, 1, 'C')),  # 使用int64存储datetime64
        ('ask', types.Array(float32, 1, 'C')),
        ('bid', types.Array(float32, 1, 'C')),
        ('askSize', types.Array(uint16, 1, 'C')),
        ('index', int32),
        ('is_full', boolean)
    ]
    
    @jitclass(spec)
    class NumbaCircularBuffer:
        def __init__(self, size):
            self.size = size
            self.datetime = np.zeros(size, dtype=np.int64)
            self.ask = np.zeros(size, dtype=np.float32)
            self.bid = np.zeros(size, dtype=np.float32)
            self.askSize = np.zeros(size, dtype=np.uint16)
            self.index = 0
            self.is_full = False
            
        def append(self, datetime_val, ask, bid, askSize):
            self.datetime[self.index] = datetime_val
            self.ask[self.index] = ask
            self.bid[self.index] = bid
            self.askSize[self.index] = askSize
            self.index = (self.index + 1) % self.size
            if not self.is_full and self.index == 0:
                self.is_full = True
                
        def get_latest(self):
            if self.is_full:
                idx = (self.index - 1) % self.size
            else:
                idx = max(0, self.index - 1)
            return (self.datetime[idx], self.ask[idx], self.bid[idx], self.askSize[idx])
except ImportError:
    print("Numba jitclass not available, skipping NumbaCircularBuffer implementation")

# 使用字典的实现 (可能比 dataclass 更快)
class DictCircularBuffer:
    def __init__(self, size):
        self.size = size
        self.buffer = [{'datetime': np.datetime64('1970-01-01'), 
                         'ask': 0.0, 'bid': 0.0, 'askSize': 0} 
                       for _ in range(size)]
        self.index = 0
        self.is_full = False
        
    def append(self, data):
        datetime, ask, bid, askSize = data
        self.buffer[self.index] = {
            'datetime': datetime, 'ask': ask, 'bid': bid, 'askSize': askSize
        }
        self.index = (self.index + 1) % self.size
        if not self.is_full and self.index == 0:
            self.is_full = True
            
    def get_latest(self):
        if self.is_full:
            idx = (self.index - 1) % self.size
        else:
            idx = max(0, self.index - 1)
        return self.buffer[idx]

# 测试不同实现的性能
def test_performance():
    # 测试参数
    size = 10000
    iterations = 100000
    
    # 创建测试数据
    test_data = [(np.datetime64('now'), 150.5, 150.4, 500) for _ in range(iterations)]
    
    # 测试原始CircularBuffer
    cb = CircularBuffer(size)
    start = time.time()
    for data in test_data:
        cb.append(data)
    print(f"Original CircularBuffer append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = cb.get_latest()
    print(f"Original CircularBuffer access time: {time.time()-start}")
    
    # 测试优化版CircularBuffer
    ocb = OptimizedCircularBuffer(size)
    start = time.time()
    for data in test_data:
        ocb.append(data)
    print(f"Optimized CircularBuffer append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = ocb.get_latest()
    print(f"Optimized CircularBuffer access time: {time.time()-start}")
    
    # 测试DataClass版本
    dcb = DataClassCircularBuffer(size)
    start = time.time()
    for data in test_data:
        dcb.append(data)
    print(f"DataClass CircularBuffer append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = dcb.get_latest()
    print(f"DataClass CircularBuffer access time: {time.time()-start}")
    
    # 测试Dict版本
    dictcb = DictCircularBuffer(size)
    start = time.time()
    for data in test_data:
        dictcb.append(data)
    print(f"Dict CircularBuffer append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = dictcb.get_latest()
    print(f"Dict CircularBuffer access time: {time.time()-start}")
    
    # 测试Numba版本 (如果可用)
    try:
        ncb = NumbaCircularBuffer(size)
        # Numba需要单独处理datetime64，转换为int64
        numba_test_data = [(dt.astype(np.int64), a, b, s) for dt, a, b, s in test_data]
        
        start = time.time()
        for dt, a, b, s in numba_test_data:
            ncb.append(dt, a, b, s)
        print(f"Numba CircularBuffer append time: {time.time()-start}")
        
        start = time.time()
        for _ in range(iterations):
            latest = ncb.get_latest()
        print(f"Numba CircularBuffer access time: {time.time()-start}")
    except NameError:
        print("Numba implementation not available for testing")
    
    # 测试DataFrame
    df = pd.DataFrame(columns=['ask', 'bid', 'askSize'])
    df.index.name = 'datetime'
    
    start = time.time()
    for dt, a, b, s in test_data:
        df.loc[dt] = [a, b, s]
        if len(df) > size:
            df = df.iloc[-size:]
    print(f"DataFrame append time: {time.time()-start}")
    
    start = time.time()
    for _ in range(iterations):
        latest = df.iloc[-1]
    print(f"DataFrame access time: {time.time()-start}")

# 运行测试
test_performance()

Original CircularBuffer append time: 0.13551592826843262
Original CircularBuffer access time: 0.06046795845031738
Optimized CircularBuffer append time: 0.1038358211517334
Optimized CircularBuffer access time: 0.08225893974304199
DataClass CircularBuffer append time: 0.07165694236755371
DataClass CircularBuffer access time: 0.028245210647583008
Dict CircularBuffer append time: 0.10088610649108887
Dict CircularBuffer access time: 0.03517627716064453
Numba CircularBuffer append time: 0.6526598930358887
Numba CircularBuffer access time: 0.4390900135040283
DataFrame append time: 5.1386120319366455
DataFrame access time: 2.458376884460449


In [59]:
dcb = DataClassCircularBuffer(100)
dcb()

TypeError: 'DataClassCircularBuffer' object is not callable