In [None]:
import os
import re
import json
import math
import datetime
from datetime import timezone, timedelta
import asyncio
import logging
import random
from typing import List, Dict, Optional, Tuple
from collections import defaultdict
import asyncpg
import aiohttp
import concurrent.futures
from dotenv import load_dotenv
import requests
import psycopg2
from decimal import Decimal
from typing import Optional, Dict, Any
#from binance.client import Client
import pandas as pd


# Load variables from a local .env file into os.environ before any of them are read below.
load_dotenv()

# Configure root logging at INFO; this module logs through its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global lock; per the original comment it is meant to stop concurrent tasks from
# inserting duplicate cache entries (its users are outside this chunk).
cache_lock = asyncio.Lock()

# Database connection settings; each is None when the variable is unset.
DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASS = (
    os.getenv(env_name)
    for env_name in ("DB_HOST", "DB_PORT", "DB_NAME", "DB_USER", "DB_PASS")
)
SCHEMA_NAME = os.getenv("DB_SCHEMA")

# Binance credentials (read from lower-case environment variable names).
binance_api_key, binance_api_secret = (
    os.getenv(env_name) for env_name in ("binance_api_key", "binance_api_secret")
)
binance_api_secret = os.getenv("binance_api_secret")

In [None]:
class GPTClient:
    """Minimal async client for OpenAI-compatible ``/chat/completions`` endpoints.

    The provider is selected by ``company`` and mapped to a base URL in
    ``self.base_urls``; all listed providers are addressed with the same
    request/response format.
    """

    # Matches a reply wrapped in a Markdown code fence (``` or ```json ... ```),
    # which some providers emit even when JSON output is requested.
    _CODE_FENCE_RE = re.compile(r"```(?:[A-Za-z]+)?\s*(.*?)\s*```", re.DOTALL)

    def __init__(self, api_key: str, model: str = "deepseek-chat", company: str = "deepseek",
                 timeout: Optional[float] = None):
        """Store credentials and provider selection.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            model: Model name forwarded in the request body.
            company: Provider key; must be one of the keys of ``self.base_urls``.
            timeout: Optional total request timeout in seconds. ``None`` keeps
                aiohttp's default session timeout.
        """
        self.api_key = api_key
        self.model = model
        self.company = company
        self.timeout = timeout
        self.base_urls = {
            'openai': 'https://api.openai.com/v1',
            'deepseek': "https://api.deepseek.com/v1",
            'kimi': "https://api.moonshot.cn/v1",
            'ali': "https://dashscope.aliyuncs.com/compatible-mode/v1"
        }

    def get_url_and_headers(self):
        """Return ``(url, headers)`` for the chat-completions call.

        Raises:
            ValueError: If ``self.company`` is not a supported provider.
        """
        if self.company not in self.base_urls:
            raise ValueError(f"Unsupported LLM provider: {self.company}")
        url = f"{self.base_urls[self.company]}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        return url, headers

    async def send_request(self, data: dict) -> dict:
        """POST ``data`` to the provider and return the model reply decoded as a JSON object.

        Returns ``{}`` (after logging) on any request failure: transport error
        or timeout, non-200 status, undecodable response body, malformed
        envelope, or content that is not a JSON object.

        Raises:
            ValueError: If the configured provider is unsupported (from
                ``get_url_and_headers``).
        """
        url, headers = self.get_url_and_headers()
        session_kwargs = {}
        if self.timeout is not None:
            session_kwargs["timeout"] = aiohttp.ClientTimeout(total=self.timeout)
        try:
            async with aiohttp.ClientSession(**session_kwargs) as session:
                async with session.post(url, headers=headers, json=data) as resp:
                    if resp.status != 200:
                        error_text = await resp.text()
                        logger.error(f"[LLM] 请求失败: {resp.status}: {error_text}")
                        return {}
                    try:
                        resp_json = await resp.json()
                    except (aiohttp.ContentTypeError, ValueError) as e:
                        # Wrong Content-Type, or a body that is not valid JSON.
                        logger.error(f"[LLM] JSON解析错误: {e}")
                        return {}
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Previously these escaped while every other failure returned {};
            # keep the "empty dict on failure" contract consistent.
            logger.error(f"[LLM] 网络请求异常: {e!r}")
            return {}
        return self._parse_completion(resp_json)

    @classmethod
    def _parse_completion(cls, resp_json) -> dict:
        """Decode the first choice's message content of a completion response.

        Returns the decoded JSON object, or ``{}`` (after logging) when the
        envelope has no usable choice/content or the content is not a JSON object.
        """
        choices = resp_json.get("choices") if isinstance(resp_json, dict) else None
        if not isinstance(choices, list) or not choices:
            logger.error(f"[LLM] 返回结果中没有 choices 字段: {resp_json}")
            return {}
        first_choice = choices[0]
        message = first_choice.get("message") if isinstance(first_choice, dict) else None
        content = message.get("content") if isinstance(message, dict) else None
        if not content or not isinstance(content, str):
            logger.error(f"[LLM] 返回结果中没有 content 字段: {resp_json}")
            return {}
        text = content.strip()
        fenced = cls._CODE_FENCE_RE.fullmatch(text)
        if fenced:
            text = fenced.group(1)
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError as e:
            logger.error(f"[LLM] JSON解析失败: {e}, 内容: {text}")
            return {}
        if not isinstance(parsed, dict):
            # Callers index the result as a dict (e.g. result["attitude"]).
            logger.error(f"[LLM] JSON解析失败: 结果不是 JSON 对象, 内容: {text}")
            return {}
        return parsed

    async def classify_attitude(self, text: str, token_symbol: str) -> dict:
        """Ask the model whether ``text`` is BULL, BEAR or NEUTRAL on ``token_symbol``.

        Returns the decoded JSON reply (the prompt requests ``attitude`` and
        ``reason`` keys), or ``{}`` on failure (see ``send_request``).
        """
        system_prompt = (
            f"你是一个加密货币分析师。请判断用户推文对于代币${token_symbol}是看涨(BULL)、看跌(BEAR)还是中立(NEUTRAL)。你要记住这人是个KOL，发任何东西都会具有很大的影响力，所以中性的推文一般都是在看涨或者在炫耀自己有多厉害以此涨粉。\n"
            "只输出 JSON 对象，例如:\n"
            "{\n"
            '  "attitude": "BULL",\n'
            '  "reason": "这条推文为何看涨"\n'
            "}\n"
            "不要额外字符。"
        ).strip()
        user_prompt = f"推文内容:\n{text}\n请返回上述 JSON 格式（不加其他字符）。"
        data = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            # Ask the provider for JSON-object output (OpenAI-compatible option).
            "response_format": {"type": "json_object"}
        }
        result = await self.send_request(data)
        logger.info(f"[GPT] 返回结果: {result}")
        return result

def get_random_gpt_client(max_providers: int = 10) -> GPTClient:
    """Build a GPTClient from a randomly chosen ``API_PROVIDER_<i>`` environment entry.

    Each ``API_PROVIDER_<i>`` (i = 1..max_providers) should hold
    ``"<api_key>,<model>,<company>"``; any extra comma-separated fields are
    ignored. Entries with fewer than three fields, or with an empty key,
    model or company after stripping whitespace, are skipped with a warning.

    Args:
        max_providers: Highest index ``i`` looked up (default 10, as before).

    Returns:
        A GPTClient for one valid entry chosen uniformly at random.

    Raises:
        RuntimeError: If no valid provider entry is configured (a subclass of
            ``Exception``, so existing ``except Exception`` handlers still apply).
    """
    providers = []
    for i in range(1, max_providers + 1):
        env_name = f"API_PROVIDER_{i}"
        val = os.getenv(env_name)
        if not val:
            continue
        parts = [part.strip() for part in val.split(',')]
        # Reject blank fields too: e.g. "key,," would otherwise produce a client
        # with company "" that only fails later, at request time.
        if len(parts) < 3 or not all(parts[:3]):
            logger.warning(f"{env_name} 格式不正确")
            continue
        api_key, model, company = parts[:3]
        providers.append({
            'api_key': api_key,
            'model': model,
            'company': company
        })
    if not providers:
        logger.error("未配置任何 API_PROVIDER")
        raise RuntimeError("未配置任何 API_PROVIDER")
    chosen = random.choice(providers)
    logger.info(f"[GPT] 选用 API_PROVIDER: {chosen['company']} - {chosen['model']}")
    return GPTClient(api_key=chosen['api_key'], model=chosen['model'], company=chosen['company'])