设计思路：
Student类：负责加载和标准化单个文件的数据，返回一个dataframe
main函数：遍历所有文件，将ID和NAME添加到dataframe，纵向合并所有frame到一个大的frame

In [10]:
import pandas as pd
import os
import re

In [11]:
stock_book = pd.read_excel("股票.xlsx", dtype=str)
stock_book

Unnamed: 0,stock,code
0,中公教育,2607
1,天宜新材,688033
2,润和软件,300339
3,正丹股份,300641
4,科力装备,301552
5,软通动力,301236
6,通化东宝,600867
7,长城军工,601606
8,浪潮信息,977
9,京东方A,725


In [12]:
rollbook = pd.read_excel('点名册.xlsx')
all_files = set([f for f in os.listdir('预测汇总/') if f.endswith('.xlsx')])
rollbook_files = set([f"期中考试_{row['name']}_{row['id']}.xlsx" for _, row in rollbook.iterrows()])
print(f'未提交人员：{rollbook_files - all_files}')
print(f'提交了但不在点名册上的人员：{all_files - rollbook_files}')

未提交人员：{'期中考试_余晓晨_36020251155199.xlsx', '期中考试_陈颖_15420251152758.xlsx'}
提交了但不在点名册上的人员：set()


In [13]:
col_map = {
    'date': ['交易日期', 'date','日期'], 
    # 'stock': ['股票名称', '公司名称', '股票简称'],
    'code': ['股票代码', 'code', 'stock_code'], 
    'prediction': ['rise', '涨跌', 'Unnamed: 4', '预测涨跌', '预测结果', '预测涨跌结果', 'RISE', '涨跌（1/0）', '涨跌情况', '涨跌（1涨0跌）', '涨(1)跌(0)', '涨跌（涨=1/跌=0）', 'Rise', '涨1/跌0', 'pred_up', '数值预测', '涨=1/跌=0', '是否上涨', 'rise1', '涨跌预测'], 
    }

In [14]:
class Student:
    def __init__(self, name, id):
        self.name = name
        self.id = str(id)

    def _process_stock_name(self, df):
        code_series = df['code'].astype(str)
        # 删除所有英文字母和点号
        code_series = code_series.str.replace(r'[a-zA-Z\.]', '', regex=True)
        # 删除空白字符
        code_series = code_series.str.strip()
        # 在左侧补0至6位
        df['code'] = code_series.str.zfill(6)
        return df

    def _process_trading_date(self, df):
        """
        处理交易日期，统一转换为YYYY-MM-DD格式
        支持多种输入格式：
        - Excel序列号（如45978）
        - 时间戳字符串（如2025-11-17 00:00:00）
        - 其他常见日期格式
        """
        # 确保date列是字符串类型，便于统一处理
        df['date'] = df['date'].astype(str)
        
        def convert_date(date_str):
            try:
                # 首先尝试判断是否为Excel序列号（纯数字，且数值较大）
                if date_str.isdigit() and len(date_str) >= 4:
                    # Excel日期序列号（1900日期系统）
                    excel_serial = int(date_str)
                    # Excel的基准日期是1899-12-30
                    base_date = pd.Timestamp('1899-12-30')
                    result_date = base_date + pd.Timedelta(days=excel_serial)
                    return result_date.strftime('%Y-%m-%d')
                date_str_clean = date_str.split()[0].strip()
                for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d', '%Y.%m.%d',
                        '%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S']:
                    try:
                        parsed_date = pd.to_datetime(date_str_clean, format=fmt, errors='raise')
                        return parsed_date.strftime('%Y-%m-%d')
                    except:
                        continue
                parsed_date = pd.to_datetime(date_str_clean, errors='coerce')
                if pd.notna(parsed_date):
                    return parsed_date.strftime('%Y-%m-%d')
                return pd.NaT
            except Exception:
                return pd.NaT
        
        df['date'] = df['date'].apply(convert_date)
        df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.date
        return df

    def load_prediction(self):
        file_path = f'预测汇总/期中考试_{self.name}_{self.id}.xlsx'
        try:
            df = pd.read_excel(file_path, dtype={col: str for col in col_map['code']})
        except Exception as e:
            print('文件读取失败')
            df = pd.read_excel('blank.xlsx', dtype={col: str for col in col_map['code']})

        rename_dict = {
            old: new 
            for new, olds in col_map.items()
            for old in olds
        }
        df = df.rename(columns=rename_dict)
        df['id'] = self.id
        df['name'] = self.name

        # 标准化code列
        df = self._process_stock_name(df)
        df = self._process_trading_date(df)

        # 3. 检查必需列是否都存在
        required_cols = ['date', 'code', 'prediction']
        missing = [col for col in required_cols if col not in df.columns]

        if missing:
            print(f"[警告] {self.id} {self.name} 中缺少或未成功映射的列：{missing}")
            # 给缺失列补上一列空值，防止后面选列时报错
            for col in missing:
                df[col] = pd.NA

        return df[['id', 'name', 'date', 'code', 'prediction']]


In [15]:
all_data_frames = []
for index, row in rollbook.iterrows():
    student = Student(row['name'], row['id'])
    df = student.load_prediction()
    all_data_frames.append(df)
    
final_df = pd.concat(all_data_frames, ignore_index=True)
final_df = pd.merge(
    final_df,
    stock_book[['stock', 'code']],
    on='code',
    how='left'
)

文件读取失败
[警告] 36020251155199 余晓晨 中缺少或未成功映射的列：['prediction']
文件读取失败
[警告] 15420251152758 陈颖 中缺少或未成功映射的列：['prediction']


  warn("Workbook contains no default style, apply openpyxl's default")


[警告] 15220222201909 段靖淳 中缺少或未成功映射的列：['prediction']


  final_df = pd.concat(all_data_frames, ignore_index=True)


In [16]:
def calculate_score_clean(df):
    mask_true = df['name'] == '真实涨跌'
    df_true = df.loc[mask_true, ['date', 'code', 'prediction']].rename(columns={'prediction': 'true_val'})
    df_students = df.loc[~mask_true].copy()
    merged = pd.merge(
        df_students, 
        df_true, 
        on=['date', 'code'], 
        how='left'
    )
    condition = (merged['true_val'] == -1) | (merged['prediction'] == merged['true_val'])    
    merged['score'] = condition.astype(int) # True变1，False变0
    merged.loc[merged['true_val'] == -999, 'score'] = 0
    result = merged.groupby(['id', 'name'])['score'].sum().reset_index()
    result = result.rename(columns={'score': 'correct_count'})
    return result

In [17]:
pivot_df = final_df.pivot(index=['id', 'name'], columns=[ 'code', 'stock', 'date'], values='prediction').reset_index()
pivot_df.to_excel("预测汇总.xlsx")

In [18]:
score_df = calculate_score_clean(final_df)
score_df.to_excel("得分统计.xlsx")