akshare文件的读取，RSI指标的计算，GET接口处理

In [None]:
from flask import Flask, request, jsonify
import akshare as ak
import pandas as pd
import numpy as np
import json

app = Flask(__name__)

def get_im_minute_rsi(code: str, start_time: str, end_time: str) -> dict:
    """
    获取IM期货分钟级RSI信号
    :param code: 合约代码，如"IM0"
    :param start_time: 开始时间，如"2025-12-11 09:30:00"
    :param end_time: 结束时间，如"2025-12-11 10:00:00"
    """
    try:
        # 1. 使用分时行情数据接口
        # 获取1分钟K线数据
        df = ak.futures_zh_minute_sina(symbol=code, period="1")
        
        if df.empty:
            return {"code": 400, "msg": "获取数据失败", "data": []}
        
        # 2. 计算RSI
        close_prices = df['close']
        delta = close_prices.diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / (loss + 1e-10)
        df['rsi'] = 100 - (100 / (1 + rs))
        
        # 3. 时间筛选
        df['datetime'] = pd.to_datetime(df['datetime'])
        start_dt = pd.to_datetime(start_time)
        end_dt = pd.to_datetime(end_time)
        
        mask = (df['datetime'] >= start_dt) & (df['datetime'] <= end_dt)
        df_filtered = df[mask]
        
        if df_filtered.empty:
            return {"code": 400, "msg": "无数据", "data": []}
        
        # 4. 生成信号
        data = []
        for _, row in df_filtered.iterrows():
            rsi = row.get('rsi', np.nan)
            data.append({
                "eob": row['datetime'].strftime("%Y-%m-%d %H:%M:%S"),
                "B": False if pd.isna(rsi) else rsi < 30,
                "S": False if pd.isna(rsi) else rsi > 70
            })
        
        return {"code": 200, "msg": "请求成功", "data": data}
    
    except Exception as e:
        return {"code": 500, "msg": f"错误: {str(e)}", "data": []}

@app.route('/get_rsi', methods=['GET'])
def get_rsi():
    """
    GET接口：获取RSI信号
    参数：
    - code: 合约代码（如IM0）
    - start_time: 开始时间
    - end_time: 结束时间
    """
    # 从GET参数中获取数据
    code = request.args.get('code', 'IM0')
    start_time = request.args.get('start_time')
    end_time = request.args.get('end_time')
    
    # 参数校验
    if not start_time or not end_time:
        return jsonify({
            "code": 400,
            "msg": "参数错误：start_time和end_time为必填参数",
            "data": []
        })
    
    # 调用函数获取RSI数据
    result = get_im_minute_rsi(code, start_time, end_time)
    return jsonify(result)

# 测试用例
if __name__ == '__main__':
    app.run(debug=True, port=5000)

#示例URL: http://127.0.0.1:5000/get_rsi?code=IM0&start_time=2025-12-11 13:30:00&end_time=2025-12-11 14:00:00

In [None]:
from flask import Flask, request, jsonify
# 导入Flask框架的核心组件：
# Flask - 创建Web应用的主类
# request - 处理HTTP请求的对象，可以获取参数、请求头等
# jsonify - 将Python字典转换为JSON格式的HTTP响应

import akshare as ak
# 导入akshare库，这是一个获取金融数据的Python库
# 别名为ak，方便使用，比如 ak.futures_zh_minute_sina()

import pandas as pd
# 导入pandas库，别名为pd
# 用于数据处理和分析，特别是DataFrame表格数据

import numpy as np
# 导入numpy库，别名为np
# 用于数值计算，提供数组操作和数学函数

import json
# 导入Python标准库的json模块
# 用于JSON数据的编码和解码

生成实例部分

In [None]:
app = Flask(__name__)
# 创建Flask应用实例
# __name__ 是Python的特殊变量，表示当前模块的名字
# 如果是直接运行这个文件，__name__ 就是 "__main__"
# 如果是被导入，__name__ 就是模块名

In [None]:
def get_im_minute_rsi(code: str, start_time: str, end_time: str) -> dict:
    """
    获取IM期货分钟级RSI信号
    :param code: 合约代码，如"IM0"
    :param start_time: 开始时间，如"2025-12-11 09:30:00"
    :param end_time: 结束时间，如"2025-12-11 10:00:00"
    """

In [None]:
def get_im_minute_rsi(code: str, start_time: str, end_time: str) -> dict:
    """
    获取IM期货分钟级RSI信号
    :param code: 合约代码，如"IM0"
    :param start_time: 开始时间，如"2025-12-11 09:30:00"
    :param end_time: 结束时间，如"2025-12-11 10:00:00"
    """

In [None]:
try:
    # 尝试执行下面的代码
    # 如果发生异常，会被except捕获
    # 1. 使用分时行情数据接口
    # 获取1分钟K线数据
    df = ak.futures_zh_minute_sina(symbol="IM0", period="1")
    # 调用akshare库的期货分钟数据接口
    # symbol=code：传入合约代码
    # period="1"：获取1分钟级别的数据
    if df.empty:
        print({"code": 400, "msg": "获取数据失败", "data": []})
    # 检查DataFrame是否为空
    # df.empty：pandas属性，如果DataFrame没有数据返回True
    # 如果数据为空，返回错误响应
    # 2. 计算RSI
        close_prices = df['close']
        # 从DataFrame中提取'close'列（收盘价）
        
        delta = close_prices.diff()
        # diff()：计算价格差，如 close[1] - close[0]
        # 得到价格变化值
        
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        # 计算涨幅
        # delta.where(delta > 0, 0)：如果delta>0保留原值，否则设为0
        # .rolling(14)：创建14期的滑动窗口
        # .mean()：计算移动平均值
        
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        # 计算跌幅
        # delta.where(delta < 0, 0)：如果delta<0保留原值，否则设为0
        # 前面加负号转为正数
        
        rs = gain / (loss + 1e-10)
        # 计算相对强度RS
        # 1e-10：一个很小的数，防止除零错误
        
        df['rsi'] = 100 - (100 / (1 + rs))
        # 计算RSI值
        # 公式：RSI = 100 - 100 / (1 + RS)
        # 将计算结果添加到DataFrame的新列'rsi'中
except:
    pass