In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# Load the WTI sheet and expose its timestamp column under the name 'Date'.
# The set_index/reset_index round trip is kept (as a chain, without in-place
# mutation) because it moves 'Date' to the first column of the frame.
sheet = (
    pd.read_excel('data_input/WTI_季度.xlsx', sheet_name='Sheet1')
    .rename(columns={'DataTime': 'Date'})
    .set_index('Date')
    .reset_index()
)

# read_excel opens and closes the workbook itself, so no ExcelFile handle is
# left open after the data has been read.
df1 = pd.read_excel('eta/Wti_月度_映射残差_去库幅度_合并数据.xlsx', sheet_name='Sheet1')
df1['Date'] = pd.to_datetime(df1['Date'], errors='coerce')
df1 = df1.sort_values('Date', ascending=True)
df1 = df1.drop(columns=['实际值'])
# Drop every row that still has a missing value (including unparseable dates,
# which the coercion above turned into NaT).
df1 = df1.dropna()

# Keep only the rows of `sheet` whose Date also appears in df1.
df2 = sheet[sheet['Date'].isin(df1['Date'])]
df2.head(1)

Unnamed: 0,Date,WTI原油期货价格/月度/3MMA,EIA能源月报统计全球石油去库幅度/3MMA超季节性/3年
405,2025-05-31,63.6596,-0.1355


In [3]:
import pandas as pd
from typing import Tuple, Union, Optional, Literal

# ========== 工具：把“两列 DataFrame”转成 Series ==========
def _to_series(
    df_or_ser: Union[pd.DataFrame, pd.Series],
    *,
    date_col: str = "date",
    value_col: str = "value",
    name: str = "value",
) -> pd.Series:
    """
    把形如 [date, value] 的 DataFrame ⇒ Series；Series 保持原样
    """
    if isinstance(df_or_ser, pd.Series):
        return df_or_ser.rename(name)

    if not {date_col, value_col}.issubset(df_or_ser.columns):
        raise KeyError(f"DataFrame 必须包含列: {date_col}, {value_col}")

    ser = df_or_ser[[date_col, value_col]].copy()
    ser[date_col] = pd.to_datetime(ser[date_col])
    ser = ser.set_index(date_col)[value_col].astype(float).rename(name)
    return ser


# ========== 1. 计算映射 & 残差 ==========
def mapping_residual(
    y_df: Union[pd.DataFrame, pd.Series],
    x_df: Union[pd.DataFrame, pd.Series],
    y_axis: Tuple[float, float],
    x_axis: Tuple[float, float],
    lead: Union[int, pd.DateOffset] = 0,
    *,
    freq: Optional[str] = None,
    fill_method: Optional[Literal["ffill", "bfill"]] = None,
    param_method: Literal["axis", "ols"] = "axis",
    date_col: str = "date",
    value_col: str = "value",
) -> pd.DataFrame:
    """
    Map X onto Y's scale with a linear transform and return the residual.

    The mapped value is ``Y_map = a * X_shift + b`` and the residual is
    ``Y - Y_map``.

    Parameters
    ----------
    y_df, x_df : DataFrame or Series
        Target (Y) and mapping indicator (X). DataFrames must contain
        ``date_col`` and ``value_col``; Series are used as given.
    y_axis : (L1, L2)
        Axis end points of Y. Used only when ``param_method == "axis"``.
    x_axis : (R1, R2)
        Axis end points of X. Used only when ``param_method == "axis"``.
    lead : int or pd.DateOffset, default 0
        An int shifts X by that many rows (``Series.shift``); a DateOffset is
        added to X's index instead.
    freq : str, optional
        If given, both series are conformed to this frequency with ``asfreq``.
    fill_method : {"ffill", "bfill"}, optional
        Gap filling applied after ``asfreq``; ignored when ``freq`` is not set.
        The legacy aliases "pad" and "backfill" are accepted as well.
    param_method : {"axis", "ols"}, default "axis"
        "axis" derives slope and intercept from the two axis ranges so that
        R1 maps to L1 and R2 maps to L2. Any other value fits them by ordinary
        least squares on the rows where both Y and shifted X are present.
    date_col, value_col : str
        Column names used when the inputs are DataFrames.

    Returns
    -------
    pd.DataFrame
        Columns ``Y``, ``X_shift``, ``Y_map`` and ``residual`` on the index
        common to Y and the shifted X. ``attrs`` carries ``slope``,
        ``intercept``, ``lead``, ``freq``, ``param_method``, ``start``, ``end``.

    Raises
    ------
    ValueError
        If ``fill_method`` is not a recognised fill method, or if the OLS fit
        has fewer than two rows where both Y and shifted X are present.
    """
    y = _to_series(y_df, date_col=date_col, value_col=value_col, name="Y")
    x = _to_series(x_df, date_col=date_col, value_col=value_col, name="X")

    if freq:
        y = y.asfreq(freq)
        x = x.asfreq(freq)
        if fill_method:
            # Series.fillna(method=...) is deprecated in pandas; dispatch to the
            # dedicated ffill()/bfill() methods instead.
            fillers = {"ffill": "ffill", "pad": "ffill", "bfill": "bfill", "backfill": "bfill"}
            if fill_method not in fillers:
                raise ValueError(
                    f"Invalid fill method. Expecting 'ffill' or 'bfill'. Got {fill_method}"
                )
            filler = fillers[fill_method]
            y = getattr(y, filler)()
            x = getattr(x, filler)()

    # ---- apply the lead ----
    if isinstance(lead, pd.DateOffset):
        # Calendar lead: move X's timestamps forward instead of moving rows.
        x_shift = x.copy()
        x_shift.index = x_shift.index + lead
    else:
        x_shift = x.shift(lead)

    common_idx = y.index.intersection(x_shift.index)
    y, x_shift = y.loc[common_idx], x_shift.loc[common_idx]

    # ---- slope a and intercept b ----
    if param_method == "axis":
        # Line through (R1, L1) and (R2, L2).
        (L1, L2), (R1, R2) = y_axis, x_axis
        a = (L2 - L1) / (R2 - R1)
        b = L2 - a * R2
    else:  # OLS
        import numpy as np

        mask = y.notna() & x_shift.notna()
        n_valid = int(mask.sum())
        if n_valid < 2:
            # A line cannot be fitted from fewer than two points; lstsq would
            # still return numbers, which would be silently meaningless.
            raise ValueError(
                f"OLS fit needs at least 2 overlapping non-NaN observations, got {n_valid}"
            )
        X_ = np.vstack([x_shift[mask].values, np.ones(n_valid)]).T
        a, b = np.linalg.lstsq(X_, y[mask].values, rcond=None)[0]

    # ---- mapped value and residual ----
    y_map = a * x_shift + b
    residual = y - y_map

    out = pd.DataFrame({"Y": y, "X_shift": x_shift, "Y_map": y_map, "residual": residual})
    out.attrs.update(
        {
            "slope": a,
            "intercept": b,
            "lead": lead,
            # Only datetime-like indexes expose inferred_freq; a Series passed
            # in with any other index type reports None instead of raising.
            "freq": freq or getattr(y.index, "inferred_freq", None),
            "param_method": param_method,
            "start": y.index.min(),
            "end": y.index.max(),
        }
    )
    return out


# ========== 2. 还原未来 Ŷ ==========
def restore_y(
    x_future: Union[pd.DataFrame, pd.Series, float],
    residual_pred: Union[pd.DataFrame, pd.Series, float],
    y_axis: Tuple[float, float],
    x_axis: Tuple[float, float],
    *,
    align: Literal["union", "left", "right"] = "union",
    date_col: str = "date",
    value_col: str = "value",
) -> Union[pd.Series, float]:
    """
    Restore Y-hat from X and a predicted residual: ``a * X + b + residual``.

    The slope ``a`` and intercept ``b`` come from the two axis ranges, chosen
    so that R1 maps to L1 and R2 maps to L2.

    Parameters
    ----------
    x_future, residual_pred : DataFrame, Series or real number
        DataFrames must contain ``date_col`` and ``value_col``; Series are
        used as given. A real number (including NumPy scalars) paired with a
        Series/DataFrame is broadcast over the other input's index.
    y_axis : (L1, L2)
        Axis end points of Y.
    x_axis : (R1, R2)
        Axis end points of X.
    align : {"union", "left", "right"}, default "union"
        Index of the result: the union of both indexes, X's index ("left"),
        or the residual's index (any other value).
    date_col, value_col : str
        Column names used when the inputs are DataFrames.

    Returns
    -------
    float or pd.Series
        A float when both inputs are scalars, otherwise a Series on the
        aligned index (NaN where either input has no value).
    """
    # Local import: the abstract Real type also matches NumPy integer and
    # float scalars, which a plain (int, float) check misses for integers.
    from numbers import Real

    # ---- linear coefficients (shared by the scalar and Series paths) ----
    (L1, L2), (R1, R2) = y_axis, x_axis
    a = (L2 - L1) / (R2 - R1)
    b = L2 - a * R2

    x_is_scalar = isinstance(x_future, Real)
    r_is_scalar = isinstance(residual_pred, Real)

    # ---- both scalars: plain arithmetic ----
    if x_is_scalar and r_is_scalar:
        return a * float(x_future) + b + float(residual_pred)

    # ---- convert to Series, broadcasting a lone scalar over the other index ----
    if x_is_scalar:
        r_series = _to_series(residual_pred, date_col=date_col, value_col=value_col, name="residual_pred")
        x_series = pd.Series(float(x_future), index=r_series.index, name="X_future")
    elif r_is_scalar:
        x_series = _to_series(x_future, date_col=date_col, value_col=value_col, name="X_future")
        r_series = pd.Series(float(residual_pred), index=x_series.index, name="residual_pred")
    else:
        x_series = _to_series(x_future, date_col=date_col, value_col=value_col, name="X_future")
        r_series = _to_series(residual_pred, date_col=date_col, value_col=value_col, name="residual_pred")

    # ---- align the two indexes ----
    if align == "union":
        idx = x_series.index.union(r_series.index)
    elif align == "left":
        idx = x_series.index
    else:  # 'right'
        idx = r_series.index

    X_aligned = x_series.reindex(idx)
    R_aligned = r_series.reindex(idx)

    return a * X_aligned + b + R_aligned


In [4]:
from typing import Sequence, Iterable, Union
import numpy as np

def restore_y_batch(
    x_values:     Union[Sequence[float], np.ndarray],
    residuals:    Union[Sequence[float], np.ndarray],
    y_axis:       tuple[float, float],
    x_axis:       tuple[float, float],
    *,
    return_type:  str = "list",          # "list" | "numpy"
):
    """
    Restore a batch of Y-hat values: ``Y-hat = a * X + b + residual``.

    Parameters
    ----------
    x_values : Sequence[float] or np.ndarray
        Values of the mapping indicator X, length n.
    residuals : Sequence[float] or np.ndarray
        Mapping residuals for the same points; must have the same length as
        ``x_values``. Values are paired by position.
    y_axis : (L1, L2)
        Axis end points of the target indicator Y on the chart.
    x_axis : (R1, R2)
        Axis end points of the mapping indicator X on the chart.
    return_type : "list" | "numpy", default "list"
        "numpy" returns an ``np.ndarray``; any other value returns a Python
        list. This is honoured for every input type.

    Returns
    -------
    list[float] or np.ndarray
        The restored Y-hat values, one per input point.

    Raises
    ------
    ValueError
        If ``x_values`` and ``residuals`` differ in length.
    """
    # ---- slope and intercept: the line mapping R1 -> L1 and R2 -> L2 ----
    L1, L2 = y_axis
    R1, R2 = x_axis
    a = (L2 - L1) / (R2 - R1)
    b = L2 - a * R2

    # ---- input validation ----
    if len(x_values) != len(residuals):
        raise ValueError("x_values 与 residuals 长度不一致。")

    # ---- compute ----
    # Always go through NumPy so that return_type is respected whether the
    # inputs are lists, pandas Series or arrays.
    x_arr = np.asarray(x_values, dtype=float)
    res_arr = np.asarray(residuals, dtype=float)
    y_hat = a * x_arr + b + res_arr
    return y_hat if return_type == "numpy" else y_hat.tolist()


In [5]:
# Restore Y-hat for the dates shared by df1 (residuals) and df2 (X values).
x_col = 'EIA能源月报统计全球石油去库幅度/3MMA超季节性/3年'
res_col = 'WTI原油期货价格/月度/3MMA映射残差/EIA能源月报统计全球石油去库幅度/3MMA超季节性/3年（领先2月）'

# Both inputs are pandas Series, and restore_y_batch pairs them by position,
# not by Date. Sort both frames by Date so that row i of one refers to the
# same date as row i of the other, whatever order the source sheets are in.
x_vals = df2.sort_values('Date')[x_col]
res_vals = df1.sort_values('Date')[res_col]

y_preds = restore_y_batch(
    x_values   = x_vals,
    residuals  = res_vals,
    y_axis     = (60, 110),
    x_axis     = (-2, 3),
    return_type= "list",      # the default; pass "numpy" to request an np.ndarray
)
print(y_preds)

[58.705, 55.134, 52.64500000000001]
