In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from scipy.interpolate import interp1d
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from tqdm import tqdm
import pandas as pd
import pandas_market_calendars as mcal

In [None]:
# Preferred GPU for this notebook. The index is only used when that GPU actually
# exists; otherwise fall back to GPU 0, or to the CPU when CUDA is unavailable.
_PREFERRED_GPU_INDEX = 3

if torch.cuda.is_available():
    # torch.device('cuda:3') can be constructed on any machine, but moving a model
    # to it fails later if fewer than four GPUs are visible, so check here.
    _gpu_index = _PREFERRED_GPU_INDEX if torch.cuda.device_count() > _PREFERRED_GPU_INDEX else 0
    device = torch.device(f'cuda:{_gpu_index}')
else:
    device = torch.device('cpu')
print(f"使用设备：{device}")

# Labeling

In [None]:
import os
import pandas as pd

# 1. Load the main data set and the anchor-day table; parse both date columns
df = pd.read_feather('test_data.feather')
anchors = pd.read_feather('trading_days_anchor_2001_2019.feather')
for frame in (df, anchors):
    frame['date'] = pd.to_datetime(frame['date'])

# 2. Attach the anchor flags by date, then order the rows by id and date
anchor_view = anchors[['date', 'anchor_5', 'anchor_20', 'anchor_60']]
df = df.merge(anchor_view, on=['date'], how='left')
df = df.sort_values(['id', 'date']).reset_index(drop=True)

# 3. One entry per window: (window length, anchor column, chart directory, label file)
configs = [
    (5,  'anchor_5',  './charts_test/5d_charts',  './labels_test/image_labels_i5.feather'),
    (20, 'anchor_20', './charts_test/20d_charts', './labels_test/image_labels_i20.feather'),
    (60, 'anchor_60', './charts_test/60d_charts', './labels_test/image_labels_i60.feather'),
]

for window, anchor_col, img_dir, out_feather in configs:
    # Make sure both the chart directory and the label directory exist
    for folder in (img_dir, os.path.dirname(out_feather)):
        os.makedirs(folder, exist_ok=True)

    image_labels = []

    for id_val, grp in df.groupby('id'):
        grp = grp.reset_index(drop=True)
        # Positional indices of the rows flagged as anchor days for this window
        idxs = grp.index[grp[anchor_col] == 1.0].tolist()

        for idx in idxs:
            # The window needs window-1 rows before the anchor row
            if idx < window - 1:
                continue
            win = grp.iloc[idx - (window - 1): idx + 1]

            # Binary label per horizon: 1 when ret_Xd on the window's last row is > 0.
            # NOTE(review): a NaN return compares False and is therefore labelled 0 --
            # confirm that this is intended for rows without a realised return.
            horizon_labels = {
                f'label_{h}': int(win[f'ret_{h}d'].iloc[-1] > 0)
                for h in (5, 20, 60)
            }

            # The anchor date is the last day of the window; it is part of the file name
            last_date = win['date'].iloc[-1]
            anchor_date = pd.to_datetime(last_date).strftime('%Y%m%d')
            image_path = os.path.join(img_dir, f'id_{id_val}_{anchor_date}.png')

            # Only anchors whose chart image exists on disk get a label row
            if not os.path.exists(image_path):
                continue
            image_labels.append({
                'image_path': image_path,
                'id':         id_val,
                'date':       last_date,
                **horizon_labels,
            })

    # Persist the labels collected for this window
    labels_df = pd.DataFrame(image_labels)
    labels_df.to_feather(out_feather)
    print(f"{window}d 图像标签已生成并保存到 {out_feather}")


# Testing

In [None]:
class CNN5DModel(nn.Module):
    """Two-block CNN that maps a single-channel image to two class logits.

    Each block is conv -> batch norm -> leaky ReLU -> max pool over the height
    axis. The linear layer expects 15360 flattened features (128 x 8 x 15),
    which matches the original author's note that block 1 outputs (64, 16, 15).

    The layers are registered both as ``block1``/``block2``/``fc`` and again
    inside ``model``, so the state dict carries both sets of keys.
    """

    def __init__(self):
        super().__init__()

        # Block 1 -- per the original note its output is (64, 16, 15)
        stage1 = [
            nn.Conv2d(1, 64, kernel_size=(5, 3), stride=(1, 1), dilation=(1, 1), padding=(2, 1)),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1)),
        ]
        self.block1 = nn.Sequential(*stage1)

        # Block 2 -- ends with Flatten and Dropout so it feeds the linear layer directly
        stage2 = [
            nn.Conv2d(64, 128, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1)),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1)),
            nn.Flatten(),
            nn.Dropout(0.5),
        ]
        self.block2 = nn.Sequential(*stage2)

        # Classifier head
        self.fc = nn.Linear(15360, 2)

        # Initialise before wrapping; the wrapped modules are the same objects
        self._initialize_weights()

        self.model = nn.Sequential(self.block1, self.block2, self.fc)

    def _initialize_weights(self):
        """Xavier-uniform init for conv/linear weights; zero their biases."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv2d, nn.Linear)):
                continue
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Run ``x`` through block1, block2 and the linear head; return the logits."""
        logits = self.model(x)
        return logits


class CNN20DModel(nn.Module):
    """Three-block CNN that maps a single-channel image to two class logits.

    Block 1 uses a strided, dilated convolution along the height axis; blocks 2
    and 3 are plain convolutions. The linear layer expects 46080 flattened
    features.
    NOTE(review): 46080 = 256 x 3 x 60, which the layer arithmetic yields for a
    1 x 64 x 60 input -- confirm the intended image size.

    The layers are registered both as ``block1``..``block3``/``fc`` and again
    inside ``model``, so the state dict carries both sets of keys.
    """

    def __init__(self):
        super().__init__()

        # Block 1
        stage1 = [
            nn.Conv2d(1, 64, kernel_size=(5, 3), stride=(3, 1), dilation=(2, 1), padding=(3, 1)),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d((2, 1)),
        ]
        self.block1 = nn.Sequential(*stage1)

        # Block 2
        stage2 = [
            nn.Conv2d(64, 128, kernel_size=(5, 3), padding=(2, 1)),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d((2, 1)),
        ]
        self.block2 = nn.Sequential(*stage2)

        # Block 3 -- note the height padding of 3; ends with Flatten and Dropout
        stage3 = [
            nn.Conv2d(128, 256, (5, 3), padding=(3, 1)),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d((2, 1)),
            nn.Flatten(),
            nn.Dropout(p=0.5),
        ]
        self.block3 = nn.Sequential(*stage3)

        # Classifier head
        self.fc = nn.Linear(46080, 2)

        # Initialise before wrapping; the wrapped modules are the same objects
        self._initialize_weights()

        self.model = nn.Sequential(self.block1, self.block2, self.block3, self.fc)

    def _initialize_weights(self):
        """Xavier-uniform init for conv/linear weights; zero their biases."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv2d, nn.Linear)):
                continue
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Run ``x`` through the three blocks and the linear head; return the logits."""
        logits = self.model(x)
        return logits


class CNN60DModel(nn.Module):
    """Four-block CNN that maps a single-channel image to two class logits.

    Per the original author's notes the first three pools halve the height to
    48, 24 and 12 while the width stays at 180. The fourth pool is (2, 3), and
    the linear layer expects 184320 flattened features (512 x 6 x 60).

    The layers are registered both as ``block1``..``block4``/``fc`` and again
    inside ``model``, so the state dict carries both sets of keys.
    """

    def __init__(self):
        super().__init__()

        # Pool settings shared by blocks 1-3: halve the height, keep the width
        halve_height = dict(kernel_size=(2, 1), stride=(2, 1))

        # Block 1 -- height 48 after pooling, width stays 180
        self.block1 = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1), dilation=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d(**halve_height),
        )

        # Block 2 -- height 24 after pooling
        self.block2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1)),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d(**halve_height),
        )

        # Block 3 -- height 12 after pooling
        self.block3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1)),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d(**halve_height),
        )

        # Block 4 -- pools both axes with a (2, 3) window, then Flatten and Dropout
        self.block4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1)),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(negative_slope=0.01, inplace=True),
            nn.MaxPool2d(kernel_size=(2, 3), stride=(2, 3)),
            nn.Flatten(),
            nn.Dropout(p=0.5),
        )

        # Classifier head
        self.fc = nn.Linear(184320, 2)

        # Initialise before wrapping; the wrapped modules are the same objects
        self._initialize_weights()

        stages = (self.block1, self.block2, self.block3, self.block4, self.fc)
        self.model = nn.Sequential(*stages)

    def _initialize_weights(self):
        """Xavier-uniform init for conv/linear weights; zero their biases."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv2d, nn.Linear)):
                continue
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Run ``x`` through the four blocks and the linear head; return the logits."""
        logits = self.model(x)
        return logits

In [None]:
# Build a CNN5DModel with freshly initialised (untrained) weights, place it on
# the selected device and switch it to evaluation mode.
model = CNN5DModel().to(device)
model.eval()

In [None]:
# Load the trained weights from best_model_i5r5.pth into a CNN5DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN5DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i5r5.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i5 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i5.feather',
    img_dir='./charts_test/5d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i5.feather')
path_col = new_df['image_path']
new_df['pre_label_5'] = path_col.map(predictions)
new_df['5_prob_0'] = path_col.map(probabilities_0)
new_df['5_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_5
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_5'] == new_df['label_5']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i5.feather')

In [None]:
# Reload the i5 label table from disk for inspection; the inference cells in this
# notebook write their prediction columns into this same file.
df_finished = pd.read_feather('./labels_test/image_labels_i5.feather')
df_finished  # bare last expression, so the notebook renders the frame

In [None]:
# Load the trained weights from best_model_i5r20.pth into a CNN5DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN5DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i5r20.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i5 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i5.feather',
    img_dir='./charts_test/5d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i5.feather')
path_col = new_df['image_path']
new_df['pre_label_20'] = path_col.map(predictions)
new_df['20_prob_0'] = path_col.map(probabilities_0)
new_df['20_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_20
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_20'] == new_df['label_20']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i5.feather')

In [None]:
# Load the trained weights from best_model_i5r60.pth into a CNN5DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN5DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i5r60.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i5 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i5.feather',
    img_dir='./charts_test/5d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i5.feather')
path_col = new_df['image_path']
new_df['pre_label_60'] = path_col.map(predictions)
new_df['60_prob_0'] = path_col.map(probabilities_0)
new_df['60_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_60
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_60'] == new_df['label_60']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i5.feather')

In [None]:
# Load the trained weights from best_model_i20r5.pth into a CNN20DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN20DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i20r5.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i20 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i20.feather',
    img_dir='./charts_test/20d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i20.feather')
path_col = new_df['image_path']
new_df['pre_label_5'] = path_col.map(predictions)
new_df['5_prob_0'] = path_col.map(probabilities_0)
new_df['5_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_5
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_5'] == new_df['label_5']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i20.feather')

In [None]:
# Load the trained weights from best_model_i20r20.pth into a CNN20DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN20DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i20r20.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i20 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i20.feather',
    img_dir='./charts_test/20d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i20.feather')
path_col = new_df['image_path']
new_df['pre_label_20'] = path_col.map(predictions)
new_df['20_prob_0'] = path_col.map(probabilities_0)
new_df['20_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_20
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_20'] == new_df['label_20']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i20.feather')

In [None]:
# Load the trained weights from best_model_i20r60.pth into a CNN20DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN20DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i20r60.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i20 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i20.feather',
    img_dir='./charts_test/20d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i20.feather')
path_col = new_df['image_path']
new_df['pre_label_60'] = path_col.map(predictions)
new_df['60_prob_0'] = path_col.map(probabilities_0)
new_df['60_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_60
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_60'] == new_df['label_60']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i20.feather')

In [None]:
# Load the trained weights from best_model_i60r5.pth into a CNN60DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN60DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i60r5.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i60 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i60.feather',
    img_dir='./charts_test/60d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i60.feather')
path_col = new_df['image_path']
new_df['pre_label_5'] = path_col.map(predictions)
new_df['5_prob_0'] = path_col.map(probabilities_0)
new_df['5_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_5
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_5'] == new_df['label_5']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i60.feather')

In [None]:
# Load the trained weights from best_model_i60r20.pth into a CNN60DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN60DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i60r20.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i60 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i60.feather',
    img_dir='./charts_test/60d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i60.feather')
path_col = new_df['image_path']
new_df['pre_label_20'] = path_col.map(predictions)
new_df['20_prob_0'] = path_col.map(probabilities_0)
new_df['20_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_20
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_20'] == new_df['label_20']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i60.feather')

In [None]:
# Load the trained weights from best_model_i60r60.pth into a CNN60DModel.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model = CNN60DModel()
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when `device` is the CPU or a different GPU.
model.load_state_dict(torch.load('./labels_train/best_model_i60r60.pth', map_location=device))
model = model.to(device)
model.eval()  # evaluation mode

# Dataset over the chart images listed in a label table
class StockImageDataset(Dataset):
    """Serve the images named in a Feather label table.

    Args:
        csv_file: Path of the label table. Despite the name it is read with
            ``pd.read_feather``; it must contain an ``image_path`` column.
        img_dir: Directory that holds the image files.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_frame = pd.read_feather(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_frame)

    def _resolve_path(self, idx):
        """Return ``img_dir`` joined with the file name stored in row ``idx``."""
        # Read the single cell rather than materialising the whole row.
        stored = self.labels_frame['image_path'].iloc[idx]
        # os.path.basename understands the platform's separators; splitting on
        # '/' alone keeps a directory prefix when the path was written with '\\'.
        return os.path.join(self.img_dir, os.path.basename(stored))

    def __getitem__(self, idx):
        """Return ``(image, img_path)`` for row ``idx``.

        The path is returned next to the image so callers can key results by file.
        """
        img_path = self._resolve_path(idx)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, img_path

# Preprocessing: convert to a tensor, then Normalize with mean 0.5 / std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])]
)

# Dataset and loader over the images listed in the i60 label file
dataset = StockImageDataset(
    csv_file='./labels_test/image_labels_i60.feather',
    img_dir='./charts_test/60d_charts',
    transform=transform,
)
new_loader = DataLoader(dataset, batch_size=128, shuffle=False)

# Per-image results, keyed by the path string the dataset returns
predictions, probabilities_0, probabilities_1 = {}, {}, {}

with torch.no_grad():  # inference only, no gradients
    for data, paths in tqdm(new_loader, desc="Generating Predictions"):
        data = data.to(device)
        output = model(data)                       # logits, one column per class
        probs = torch.softmax(output, dim=1)       # class probabilities
        pred = output.argmax(dim=1).cpu().numpy()  # predicted class per image
        prob_rows = probs.cpu().numpy()

        for i, path in enumerate(paths):
            predictions[path] = int(pred[i])
            probabilities_0[path] = prob_rows[i][0]
            probabilities_1[path] = prob_rows[i][1]

# Attach the predictions and class probabilities to the label table
new_df = pd.read_feather('./labels_test/image_labels_i60.feather')
path_col = new_df['image_path']
new_df['pre_label_60'] = path_col.map(predictions)
new_df['60_prob_0'] = path_col.map(probabilities_0)
new_df['60_prob_1'] = path_col.map(probabilities_1)

# Accuracy: share of rows whose predicted label equals label_60
total_predictions = len(new_df)
correct_predictions = (new_df['pre_label_60'] == new_df['label_60']).sum()
accuracy = correct_predictions / total_predictions

print(f"预测完成，正确率为：{accuracy:.2%}")

# Write the enriched table back over the same label file
new_df.to_feather('./labels_test/image_labels_i60.feather')