In [30]:
import sqlite3
import adata
import pandas as pd

def init_db():
    """
    初始化SQLite数据库，支持多品种交易。
    创建资产表、价格数据表、交易记录表和账户表。
    返回数据库连接对象。
    """
    # 连接到数据库（如果不存在则创建）
    conn = sqlite3.connect('trading_sim.db')
    c = conn.cursor()
    
    # 创建资产表，用于存储交易品种信息
    c.execute('''
        CREATE TABLE IF NOT EXISTS assets (
            asset_id TEXT PRIMARY KEY,
            asset_name TEXT NOT NULL UNIQUE
        )
    ''')
    
    # 创建价格数据表，支持多品种
    c.execute('''
        CREATE TABLE IF NOT EXISTS price_data (
            date TEXT,
            asset_id TEXT,
            price REAL NOT NULL,
            PRIMARY KEY (date, asset_id),
            FOREIGN KEY (asset_id) REFERENCES assets (asset_id)
        )
    ''')
    
    # 创建交易记录表，添加资产标识
    c.execute('''
        CREATE TABLE IF NOT EXISTS trades (
            trade_id TEXT PRIMARY KEY,
            date TEXT NOT NULL,
            asset_id TEXT NOT NULL,
            action TEXT NOT NULL,
            price REAL NOT NULL,
            quantity REAL NOT NULL,
            fee REAL NOT NULL,
            strategy_id TEXT NOT NULL,
            FOREIGN KEY (asset_id) REFERENCES assets (asset_id)
        )
    ''')
    
    # 创建账户表，添加资产标识
    c.execute('''
        CREATE TABLE IF NOT EXISTS account (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL,
            asset_id TEXT NOT NULL,
            cash REAL NOT NULL,
            position REAL NOT NULL,
            net_value REAL NOT NULL,
            strategy_id TEXT NOT NULL,
            FOREIGN KEY (asset_id) REFERENCES assets (asset_id)
        )
    ''')
    
    # 提交更改
    conn.commit()
    return conn



def update_asset(conn, assets):
    """
    更新或添加资产数据到assets表，支持批量操作。
    
    参数：
        conn: SQLite数据库连接对象
        assets: 资产数据列表，每个元素为元组 (asset_id, asset_name)
    
    返回：
        dict: 操作结果，包含成功和失败的资产信息
              {'success': [(asset_id, asset_name), ...], 'failed': [(asset_id, asset_name, error), ...]}
    """
    c = conn.cursor()
    result = {'success': [], 'failed': []}
    
    for asset_id, asset_name in assets:
        try:
            # 检查资产是否存在
            c.execute('SELECT asset_name FROM assets WHERE asset_id = ?', (asset_id,))
            existing_asset = c.fetchone()
            
            if existing_asset:
                # 更新现有资产名称
                c.execute('UPDATE assets SET asset_name = ? WHERE asset_id = ?', (asset_name, asset_id))
                result['success'].append((asset_id, asset_name))
                print(f"Updated asset {asset_id} with new name: {asset_name}")
            else:
                # 插入新资产
                c.execute('INSERT INTO assets (asset_id, asset_name) VALUES (?, ?)', (asset_id, asset_name))
                result['success'].append((asset_id, asset_name))
                print(f"Added new asset {asset_id}: {asset_name}")
        
        except sqlite3.IntegrityError as e:
            # 处理唯一约束冲突（如asset_name重复）
            result['failed'].append((asset_id, asset_name, f"IntegrityError: {e}"))
            print(f"Failed to process {asset_id}: {e} - Asset name '{asset_name}' may already exist.")
        except Exception as e:
            # 捕获其他异常
            result['failed'].append((asset_id, asset_name, f"Error: {e}"))
            print(f"Failed to process {asset_id}: {e}")
    
    try:
        # 提交所有成功操作
        conn.commit()
    except Exception as e:
        # 如果提交失败，回滚所有操作
        conn.rollback()
        print(f"Commit failed: {e}")
        # 将成功列表移动到失败列表
        for asset in result['success']:
            result['failed'].append((asset[0], asset[1], f"Commit failed: {e}"))
        result['success'] = []
    
    return result


def update_price_data(conn, price_df):
    """
    更新或添加资产价格数据到price_data表，使用DataFrame输入。
    
    参数：
        conn: SQLite数据库连接对象
        price_df: pandas DataFrame，包含列 ['fund_code', 'trade_date', 'close', ...]
                  使用 fund_code 作为 asset_id，trade_date 作为 date，close 作为 price
    
    返回：
        dict: 操作结果，包含成功和失败的价格记录
              {'success': [(date, asset_id, price), ...], 'failed': [(date, asset_id, price, error), ...]}
    """
    c = conn.cursor()
    result = {'success': [], 'failed': []}
    
    # 验证DataFrame
    required_columns = ['fund_code', 'trade_date', 'close']
    if not all(col in price_df.columns for col in required_columns):
        raise ValueError(f"DataFrame must contain columns: {required_columns}")
    
    # 选择并重命名所需列
    price_df = price_df[required_columns].copy()
    price_df = price_df.rename(columns={
        'fund_code': 'asset_id',
        'trade_date': 'date',
        'close': 'price'
    })
    
    # 确保数据类型正确
    price_df['date'] = price_df['date'].astype(str)
    price_df['asset_id'] = price_df['asset_id'].astype(str)
    price_df['price'] = price_df['price'].astype(float)
    
    for _, row in price_df.iterrows():
        date, asset_id, price = row['date'], row['asset_id'], row['price']
        try:
            # 验证价格有效性
            if price <= 0:
                raise ValueError("Price must be positive")
            
            # 检查资产是否存在
            c.execute('SELECT asset_id FROM assets WHERE asset_id = ?', (asset_id,))
            if not c.fetchone():
                raise ValueError(f"Asset {asset_id} does not exist in assets table")
            
            # 尝试插入或更新价格数据
            c.execute('''
                INSERT OR REPLACE INTO price_data (date, asset_id, price)
                VALUES (?, ?, ?)
            ''', (date, asset_id, price))
    
        
        except (sqlite3.IntegrityError, ValueError) as e:
            # 处理约束冲突或无效数据
            result['failed'].append((date, asset_id, price, f"Error: {e}"))
            print(f"Failed to process price for {asset_id} on {date}: {e}")
        except Exception as e:
            # 捕获其他异常
            result['failed'].append((date, asset_id, price, f"Error: {e}"))
            print(f"Failed to process price for {asset_id} on {date}: {e}")
    
    try:
        # 提交所有成功操作
        conn.commit()
    except Exception as e:
        # 如果提交失败，回滚所有操作
        conn.rollback()
        print(f"Commit failed: {e}")

def update_all_assets_prices(conn, start_date='2024-01-01', end_date='2025-04-01', k_type=1):
    """
    从assets表获取所有资产，批量获取市场数据并更新price_data表。
    
    参数：
        conn: SQLite数据库连接对象
        start_date: 数据开始日期（格式：YYYY-MM-DD）
        end_date: 数据结束日期（格式：YYYY-MM-DD）
        k_type: K线类型（默认为1，表示日K线）
    
    返回：
        dict: 操作结果，包含成功和失败的价格记录
              {'success': [(date, asset_id, price), ...], 'failed': [(date, asset_id, price, error), ...]}
    """
    c = conn.cursor()
    
    # 获取所有资产
    c.execute('SELECT asset_id FROM assets')
    assets = [row[0] for row in c.fetchall()]
    if not assets:
        print("No assets found in assets table.")
        return {'success': [], 'failed': [('N/A', 'N/A', 0, 'No assets found')]}
    
    # 收集所有资产的价格数据
    all_price_data = []
    for asset_id in assets:
        try:
            # 替换为实际的 adata.fund.market.get_market_etf 调用
            etf = adata.fund.market.get_market_etf(asset_id, start_date='2024-01-01', end_date='2025-04-01', k_type=1)
            
            if etf.empty:
                print(f"No data returned for asset {asset_id}")
                continue
                
            all_price_data.append(etf)
            print(f"Fetched data for {asset_id}: {len(etf)} records")
        
        except Exception as e:
            print(f"Failed to fetch data for {asset_id}: {e}")
            continue
    
    if not all_price_data:
        print("No price data fetched for any assets.")
        return {'success': [], 'failed': [('N/A', 'N/A', 0, 'No price data fetched')]}
    
    # 合并所有数据
    combined_price_data = pd.concat(all_price_data, ignore_index=True)
    
    # 更新价格数据
    result = update_price_data(conn, combined_price_data)
    return result

In [31]:
connect = init_db()
c = connect.cursor()

assets =  [
    ('561300', '沪深300增强'),
    ('159726', '恒生高股息'),
    ('515100', '红利低波'),
    ('513500', '标普500'),
    ('161119', '易方达新综债LOF'),
    ('518880', '黄金ETF'),
    ('164824', '印度基金LOF'),
    ('159985', '豆粕ETF'),
    ('513330', '恒生互联网')
]

update_asset(connect, assets)

Updated asset 561300 with new name: 沪深300增强
Updated asset 159726 with new name: 恒生高股息
Updated asset 515100 with new name: 红利低波
Updated asset 513500 with new name: 标普500
Updated asset 161119 with new name: 易方达新综债LOF
Updated asset 518880 with new name: 黄金ETF
Updated asset 164824 with new name: 印度基金LOF
Updated asset 159985 with new name: 豆粕ETF
Updated asset 513330 with new name: 恒生互联网


{'success': [('561300', '沪深300增强'),
  ('159726', '恒生高股息'),
  ('515100', '红利低波'),
  ('513500', '标普500'),
  ('161119', '易方达新综债LOF'),
  ('518880', '黄金ETF'),
  ('164824', '印度基金LOF'),
  ('159985', '豆粕ETF'),
  ('513330', '恒生互联网')],
 'failed': []}

In [32]:
update_all_assets_prices(connect)

Fetched data for 159726: 300 records
Fetched data for 159985: 300 records
Fetched data for 161119: 300 records
Fetched data for 164824: 300 records
Fetched data for 513330: 300 records
Fetched data for 513500: 300 records
Fetched data for 515100: 300 records
Fetched data for 518880: 300 records
Fetched data for 561300: 300 records


In [None]:
def run_simulation(conn, strategy_id, strategy_func, initial_cash=10000, start_date='2024-01-01', end_date='2024-12-31', fee_rate=0.002, period='daily'):
    """
    运行交易模拟，根据策略函数的权重进行净值结算和交易记录。
    
    参数：
        conn: SQLite数据库连接对象
        strategy_id: 策略唯一标识
        strategy_func: 策略函数，返回DataFrame {'asset_id': weight}
        initial_cash: 初始现金
        start_date, end_date: 模拟日期范围
        fee_rate: 每笔交易手续费率
        period: 交易周期（'daily' 或 'weekly'）
    
    返回：
        None（更新account和trades表）
    """
    c = conn.cursor()
    
    # 初始化账户
    c.execute('DELETE FROM account WHERE strategy_id = ?', (strategy_id,))
    c.execute('DELETE FROM trades WHERE strategy_id = ?', (strategy_id,))
    
    # 获取所有资产
    c.execute('SELECT asset_id FROM assets')
    asset_ids = [row[0] for row in c.fetchall()]
    
    # 初始化账户状态
    account = {
        'cash': initial_cash,
        'positions': {asset_id: 0 for asset_id in asset_ids},
        'net_value': initial_cash
    }
    
    # 获取交易日期
    c.execute('SELECT DISTINCT date FROM price_data WHERE date >= ? AND date <= ? ORDER BY date', (start_date, end_date))
    trading_dates = [row[0] for row in c.fetchall()]
    
    for i, date in enumerate(trading_dates):
        # 根据周期决定是否执行
        should_execute = False
        if period == 'daily':
            should_execute = True
        elif period == 'weekly' and i % 5 == 0:
            should_execute = True
            
        if not should_execute:
            continue
        
        # 获取当前价格
        c.execute('SELECT asset_id, price FROM price_data WHERE date = ?', (date,))
        current_prices = dict(c.fetchall())
        if not current_prices:
            continue
        
        # 调用策略函数获取新权重
        weights_df = strategy_func(conn, date)
        if weights_df.empty:
            continue
        
        # 计算目标持仓
        total_value = account['cash'] + sum(account['positions'][aid] * current_prices.get(aid, 0) for aid in asset_ids)
        target_positions = {}
        for _, row in weights_df.iterrows():
            asset_id = row['asset_id']
            weight = row['weight']
            if asset_id in current_prices:
                target_value = total_value * weight
                target_positions[asset_id] = target_value / current_prices[asset_id]
        
        # 执行交易以匹配目标持仓
        for asset_id in asset_ids:
            current_qty = account['positions'].get(asset_id, 0)
            target_qty = target_positions.get(asset_id, 0)
            qty_diff = target_qty - current_qty
            
            if abs(qty_diff) < 1e-6:  # 忽略微小差异
                continue
            
            price = current_prices.get(asset_id)
            if not price:
                continue
                
            trade_id = str(uuid.uuid4())
            fee = abs(qty_diff * price * fee_rate)
            
            if qty_diff > 0 and account['cash'] >= qty_diff * price + fee:
                # 买入
                account['cash'] -= qty_diff * price + fee
                account['positions'][asset_id] += qty_diff
                c.execute('''
                    INSERT INTO trades (trade_id, date, asset_id, action, price, quantity, fee, strategy_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (trade_id, date, asset_id, 'buy', price, qty_diff, fee, strategy_id))
            
            elif qty_diff < 0 and current_qty >= abs(qty_diff):
                # 卖出
                account['cash'] += abs(qty_diff) * price - fee
                account['positions'][asset_id] -= abs(qty_diff)
                c.execute('''
                    INSERT INTO trades (trade_id, date, asset_id, action, price, quantity, fee, strategy_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (trade_id, date, asset_id, 'sell', price, abs(qty_diff), fee, strategy_id))
        
        # 更新账户净值
        net_value = account['cash'] + sum(account['positions'][aid] * current_prices.get(aid, 0) for aid in asset_ids)
        for asset_id in asset_ids:
            c.execute('''
                INSERT INTO account (date, asset_id, cash, position, net_value, strategy_id)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (date, asset_id, account['cash'], account['positions'][asset_id], net_value, strategy_id))
        
        account['net_value'] = net_value
    
    conn.commit()