# 集成以往写出来的一些小函数并在一个小数据集上测试
# write some functions and test them on a small dataset

环境设置 set the environments

In [2]:
import math
import chess
import os
import sys
import chess.pgn
import chess.engine
import csv
import math
from tqdm import tqdm
import numpy as np
import pandas as pd
import asyncio
from tqdm import tqdm
# 在代码头部添加（重要！）
from stockfish import Stockfish  # 确保此语句在调用Stockfish之前
# 🔧 关键：强制更换 event loop（解决部分 Windows 异步问题）
if sys.platform.startswith('win'):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


一些文件路径、分析深度等初始设置

In [37]:
csv_file="C:\\Users\\Administrator\\Desktop\\找到初始代码\\output\\chess_com_games_2025-04-21.pgn_processed.csv"
STOCKFISH_PATH ="C:/Users/Administrator/Desktop/stockfish-windows-x86-64-avx2/stockfish/stockfish-windows-x86-64-avx2.exe"
STOCKFISH_DEPTH=20
OUTPUT_CSV = "test_check.csv"  # 输出路径

In [4]:
# 计算胜率差（和原公式一致）
def _calculate_delta(cp_before, cp_after):
    def cp_to_win(cp):
        return 50 + 50 * (2 / (1 + math.exp(-0.00368208 * cp)) - 1)
    return abs(cp_to_win(cp_before) - cp_to_win(cp_after))
# 辅助函数：从 Stockfish 的评估字典中提取 cp 分值
def get_cp(evaluation):
    # evaluation 形如 {'type': 'cp', 'value': 34} 或 {'type': 'mate', 'value': 2}
    if evaluation["type"] == "cp":
        return evaluation["value"]
    elif evaluation["type"] == "mate":
        # 如果是 mate，按正负10000 处理
        return 10000 if evaluation["value"] > 0 else -10000
    return 0

测试上面的能不能用

In [5]:
# 逐步分析棋局准确性（完全按照你原来的评估逻辑）
def generate_mediate_df(csv_file, STOCKFISH_PATH, depth=20):
    raw_df = pd.read_csv(csv_file, encoding='latin1')
    mediate_df = raw_df.copy()
    mediate_df['white_accuracy'] = None
    mediate_df['black_accuracy'] = None

    # 使用 stockfish 包来替代 chess.engine.SimpleEngine
    stockfish = Stockfish(path=STOCKFISH_PATH, depth=depth)
    # 若需要，可设置其他参数，例如线程数、hash 大小等

    with tqdm(total=len(mediate_df),
              desc="🔄 棋局分析进度",
              unit="局",
              bar_format="{l_bar}{bar:30}| {n_fmt}/{total_fmt} [剩余:{remaining}]",
              dynamic_ncols=True) as pbar:

        for idx, row in mediate_df.iterrows():
            pbar.set_postfix_str(f"当前对局ID: {idx}")
            moves = str(row['Moves']).split(',')
            board = chess.Board()
            white_acc = []
            black_acc = []

            for move_uci in moves:
                move_uci = move_uci.strip()
                if not move_uci:
                    continue

                try:
                    move = chess.Move.from_uci(move_uci)
                    if move not in board.legal_moves:
                        # 如果遇到非法走法则跳出本局分析
                        break

                    fen_before=board.fen()
                    stockfish.set_fen_position(fen_before)
                    eval_before=stockfish.get_evaluation()
                    cp_before=get_cp(eval_before)

                    board.push(move)

                    fen_after=board.fen()
                    stockfish.set_fen_position(fen_after)
                    eval_after=stockfish.get_evaluation()
                    cp_after=get_cp(eval_after)
                


                    # 判断本步走法是哪方走的：
                    # 注意：执行 push() 后，board.turn 表示下一步走棋的颜色，
                    # 所以走完走法前的局面时，当前走棋方为与 board.turn 相反的一方
                    moved_color = 'white' if board.turn == chess.BLACK else 'black'

                    # 根据原公式计算走法准确率
                    delta = _calculate_delta(cp_before, cp_after)
                    acc = 103.1668 * math.exp(-0.04354 * delta) - 3.1669
                    clamped_acc = max(0.0, min(100.0, acc))

                    if moved_color == 'white':
                        white_acc.append(clamped_acc)
                    else:
                        black_acc.append(clamped_acc)

                except Exception as e:
                    print(f"\n❌ 对局 {idx} 发生错误：{str(e)}")
                    break

            # 记录本局的平均准确率
            mediate_df.at[idx, 'white_accuracy'] = sum(white_acc) / len(white_acc) if white_acc else None
            mediate_df.at[idx, 'black_accuracy'] = sum(black_acc) / len(black_acc) if black_acc else None
            pbar.update(1)

    return mediate_df

In [None]:
# ===== 主函数入口 =====
if __name__ == "__main__":
    mediate_df = generate_mediate_df(
        csv_file=csv_file,
        STOCKFISH_PATH=STOCKFISH_PATH,
        depth=STOCKFISH_DEPTH
    )
    mediate_df.to_csv(OUTPUT_CSV, index=False)
    print(f"\n✅ 分析完成！结果已保存至: {OUTPUT_CSV}")

🔄 棋局分析进度:   0%|                              | 0/6 [剩余:?]

求对局数量

In [7]:
#求moves

# 计算每行的移动步数并添加新列
mediate_df['Move Numbers'] = mediate_df['Moves'].apply(
    lambda x: x.count(',') + 1 if isinstance(x, str) and ',' in x and x.strip() else None
)
print(mediate_df.head())
print(mediate_df.columns.tolist())

                                    Event       Site        Date  Round  \
0  Early-Titled-Tuesday-Blitz-May-07-2024  Chess.com  2024.05.07      1   
1  Early-Titled-Tuesday-Blitz-May-07-2024  Chess.com  2024.05.07      1   
2  Early-Titled-Tuesday-Blitz-May-07-2024  Chess.com  2024.05.07      1   
3  Early-Titled-Tuesday-Blitz-May-07-2024  Chess.com  2024.05.07      1   
4  Early-Titled-Tuesday-Blitz-May-07-2024  Chess.com  2024.05.07      1   

         White             Black Result  WhiteElo  BlackElo TimeControl  \
0       Honiex            Senn71    1-0      2581      2201       180+1   
1    Aisen1011         GMSrinath    0-1      2475      2847       180+1   
2     Airquake       Road2GM3000    1-0      2824      2466       180+1   
3  miragha_yev         Eliwood44    1-0      2815      2455       180+1   
4      klimkoj  Happy1712Drummer    1-0      2921      2507       180+1   

       EndTime                     Termination  \
0  8:02:34 PDT       Honiex won by resignation  

In [8]:
print(mediate_df['white_accuracy'].head())
print(mediate_df['black_accuracy'].head())
print(mediate_df['Move Numbers'].head())


0    96.507881
1    91.925971
2    95.902978
3    93.777876
4    95.362976
Name: white_accuracy, dtype: object
0    83.715154
1    96.932798
2    87.022344
3    89.251709
4    88.042223
Name: black_accuracy, dtype: object
0    21.0
1    36.0
2    33.0
3    47.0
4    41.0
Name: Move Numbers, dtype: float64


求获胜者

求一个指定日期的获胜者

In [None]:
def get_event_score_table(event):
    
    # 胜局统计
    white_win = event[event.Result == '1-0'].groupby('White').count()['Round'].reset_index()
    black_win = event[event.Result == '0-1'].groupby('Black').count()['Round'].reset_index()

    # 和局统计
    white_draw = event[event.Result == '1/2-1/2'].groupby('White').count()['Round'].reset_index()
    black_draw = event[event.Result == '1/2-1/2'].groupby('Black').count()['Round'].reset_index()

    # 合并胜局
    result_wins = pd.merge(white_win, black_win, left_on='White', right_on='Black', how='outer')
    result_wins['Player'] = result_wins['White'].combine_first(result_wins['Black'])
    result_wins = result_wins[['Player', 'Round_x', 'Round_y']].fillna(0)
    result_wins.columns = ['Player', 'Won White', 'Won Black']

    # 合并和局
    result_draws = pd.merge(white_draw, black_draw, left_on='White', right_on='Black', how='outer')
    result_draws['Player'] = result_draws['White'].combine_first(result_draws['Black'])
    result_draws = result_draws[['Player', 'Round_x', 'Round_y']].fillna(0)
    result_draws.columns = ['Player', 'Draw White', 'Draw Black']

    # 总结分数
    df_win = pd.merge(result_wins, result_draws, on='Player', how='outer').fillna(0)
    df_win['points'] = df_win['Won White'] + df_win['Won Black'] + 0.5 * df_win['Draw White'] + 0.5 * df_win['Draw Black']

    return df_win.sort_values(by='points', ascending=False)


In [27]:
#增加了统计所有选手的
def all_event_score_table(event):
    # 获取所有选手（白方 + 黑方）
    all_players = pd.Series(pd.concat([event['White'], event['Black']])).dropna().unique()
    all_players_df = pd.DataFrame({'Player': all_players})

    # 胜局统计
    white_win = event[event.Result == '1-0'].groupby('White').count()['Round'].reset_index()
    white_win.columns = ['Player', 'Won White']

    black_win = event[event.Result == '0-1'].groupby('Black').count()['Round'].reset_index()
    black_win.columns = ['Player', 'Won Black']

    # 和局统计
    white_draw = event[event.Result == '1/2-1/2'].groupby('White').count()['Round'].reset_index()
    white_draw.columns = ['Player', 'Draw White']

    black_draw = event[event.Result == '1/2-1/2'].groupby('Black').count()['Round'].reset_index()
    black_draw.columns = ['Player', 'Draw Black']

    # 将胜局、和局依次合并到所有选手表中（保证每位选手都在）
    df = all_players_df.copy()
    df = df.merge(white_win, on='Player', how='left')
    df = df.merge(black_win, on='Player', how='left')
    df = df.merge(white_draw, on='Player', how='left')
    df = df.merge(black_draw, on='Player', how='left')

    # 缺失值填 0
    df.fillna(0, inplace=True)

    # 计算得分
    df['points'] = df['Won White'] + df['Won Black'] + 0.5 * (df['Draw White'] + df['Draw Black'])

    return df.sort_values(by='points', ascending=False).reset_index(drop=True)

In [10]:
def specific_event_top(csv_file):
    raw_df = pd.read_csv(csv_file, encoding='latin1')
    event_name = input("Please enter the name of the event to query (from the 'Event' column): ")

    event = raw_df[raw_df.Event == event_name]
    if event.empty:
        print("No matching event found. Please check your input.")
        return

    result_table = get_event_score_table(event).head(15)
    print("\nTop 15 players in the selected event:")
    print(result_table)


In [11]:
#运行后先输入要查询的比赛字段，然后就会跳出来
#first enter the event name to query, then the result will be displayed.
specific_event_top(csv_file)


Top 15 players in the selected event:
                Player  Won White  Won Black  Draw White  Draw Black  points
0             Airquake        1.0        0.0         0.0         0.0     1.0
1         Aslanov_Umid        1.0        0.0         0.0         0.0     1.0
2    ChainedDowntohell        0.0        1.0         0.0         0.0     1.0
3   Darko-Dimitrijevic        1.0        0.0         0.0         0.0     1.0
4        Elegance_Riks        0.0        1.0         0.0         0.0     1.0
5            GMSrinath        0.0        1.0         0.0         0.0     1.0
6               Honiex        1.0        0.0         0.0         0.0     1.0
7           KnightDuta        0.0        1.0         0.0         0.0     1.0
8           MITerryble        1.0        0.0         0.0         0.0     1.0
9           Onischuk_V        0.0        1.0         0.0         0.0     1.0
10        SHIVACalypso        1.0        0.0         0.0         0.0     1.0
11             klimkoj        1.0    

求一个数据集中所有小赛季的获胜者

In [12]:
raw_df = pd.read_csv(csv_file, encoding='latin1')
def get_event_winner(raw_df, event_name):
    event = raw_df[raw_df.Event == event_name]
    df_out = get_event_score_table(event)
    
    top_players = df_out[df_out.points == df_out.points.max()]
    if len(top_players) > 1:
        return top_players.Player.values.tolist(), top_players.points.values[0]
    else:
        return top_players.Player.values[0], top_players.points.values[0]

然后是求Avarage Performance Score那些

In [None]:
#raw_df已经加载好了
raw_df['RatingDiff'] = raw_df['WhiteElo'] - raw_df['BlackElo']
raw_df['ExpectedWinRate_White'] = 1 / (1 + 10 ** (-raw_df['RatingDiff'] / 400))  # 白方预期胜率
raw_df['ExpectedWinRate_Black'] = 1 - raw_df['ExpectedWinRate_White']             # 黑方预期胜率 
# 生成预测结果（白方预期胜率>0.5则预测白方胜,结果就是1-0）
raw_df['PredictedResult'] = np.select(
    [raw_df['ExpectedWinRate_White'] > 0.5, raw_df['ExpectedWinRate_White'] < 0.5],
    ['1-0', '0-1'],
    default='1/2-1/2'  # 平局预测
)

# 定义实际得分计算函数
def get_actual_score(result, role):
    if result == '1-0':               #White win
        return 1.0 if role == 'White' else 0.0    #White player's score is 1,Blackplayer's score is 0
    elif result == '0-1':             #Black win
        return 0.0 if role == 'White' else 1.0
    elif result == '1/2-1/2':         #draw both get 0.5
        return 0.5
    else:
        return np.nan  # 异常结果标记为缺失值

##拆分白方和黑方视角
#白方视角
white_games = raw_df[['White', 'WhiteElo', 'Result', 'PredictedResult','ExpectedWinRate_White']].copy()
white_games.rename(columns={'White': 'Player','WhiteElo': 'Elo','ExpectedWinRate_White':'ExpectedWinRate'}, inplace=True)
white_games['ActualScore'] = white_games['Result'].apply(
    lambda x: get_actual_score(x, role='White')  # 关键修改：明确角色
)
white_games['Role'] = 'White'  # 添加角色标识
#修改后的黑方视角处理（增加ExpectedWinRate_Black）
black_games = raw_df[['Black', 'BlackElo', 'Result', 'PredictedResult', 'ExpectedWinRate_Black']].copy()
black_games.rename(columns={
    'Black': 'Player',
    'BlackElo': 'Elo',
    'ExpectedWinRate_Black': 'ExpectedWinRate'  # 统一预期胜率列名
}, inplace=True)
black_games['Role'] = 'Black'
#合并所有对局
all_games = pd.concat([white_games, black_games])
print(all_games.head(30))

## 计算全局预测准确率
overall_accuracy = (raw_df['PredictedResult'] == raw_df['Result']).mean()
print(f"全局预测准确率: {overall_accuracy:.2%}")

# 分选手预测准确率
player_accuracy = all_games.groupby('Player').apply(
    lambda x: (x['PredictedResult'] == x['Result']).mean()
).sort_values(ascending=False)

###判断一个选手在某个赛季表现到底怎么样   win=1 lose=0 draw=0.5 expected=a score=1-a/0.5-a/0-a
# 实际得分列的计算
def get_actual_score(Result: str, Role: str) -> float:
    if Result=='1/2-1/2':
        return 0.5
    elif (Role=='White' and Result=='1-0') or (Role=='Black' and Result=='0-1'):
        return 1.0
    else:
        return 0.0
all_games['ActualScore'] = all_games.apply(
    lambda row:get_actual_score(row['Result'], row['Role']), 
    axis=1
)

# 计算表现分（实际得分 - 预期胜率）
all_games['Performance Score'] = all_games['ActualScore'] - all_games['ExpectedWinRate']

print(all_games.head())

# 分组计算统计量
player_stats=all_games.groupby('Player')['Performance Score'].agg(
    TotalGames='count',  # 总参赛次数
    TotalScore='sum',    # 表现分总和
    AverageScore='mean'  # 平均表现分
).reset_index()

#生成最终结果
player_stats['Average Performance Score']=player_stats['AverageScore'].round(2)
perform_scores=player_stats[['Player', 'Average Performance Score']]

                Player   Elo Result PredictedResult  ExpectedWinRate  \
0               Honiex  2581    1-0             1-0         0.899117   
1            Aisen1011  2475    0-1             0-1         0.105137   
2             Airquake  2824    1-0             1-0         0.887036   
3          miragha_yev  2815    1-0             1-0         0.888184   
4              klimkoj  2921    1-0             1-0         0.915535   
5              MaxLeto  2438    0-1             0-1         0.130150   
6           NovakRatko  2423    0-1             0-1         0.138859   
7           MITerryble  3038    1-0             1-0         0.949034   
8          KINGOF2SOAP  2459    0-1             0-1         0.111816   
9         SHIVACalypso  2544    1-0             0-1         0.068279   
10        Aslanov_Umid  2651    1-0             1-0         0.850490   
11  Darko-Dimitrijevic  2282    1-0             0-1         0.160825   
12           cycygogne  2173    0-1             0-1         0.09

  player_accuracy = all_games.groupby('Player').apply(


In [23]:
print(perform_scores.head(30))

                Player  Average Performance Score
0             Airquake                       0.11
1            Aisen1011                      -0.11
2         Aslanov_Umid                       0.15
3    ChainedDowntohell                       0.09
4          ClauSanLuis                      -0.15
5   Darko-Dimitrijevic                       0.84
6         EgorYakovlev                      -0.84
7        Elegance_Riks                       0.13
8            Eliwood44                      -0.11
9            GMSrinath                       0.11
10         GameOverBro                      -0.05
11    Happy1712Drummer                      -0.08
12              Honiex                       0.10
13         KINGOF2SOAP                      -0.11
14          KnightDuta                       0.14
15          MITerryble                       0.05
16             MaxLeto                      -0.13
17          NovakRatko                      -0.14
18          Onischuk_V                       0.11


In [24]:
perform_scores.to_csv('perform_scores0.csv', index=False)
all_games.to_csv('all_games0.csv', index=False)


In [28]:
score_table = all_event_score_table(raw_df)
print(score_table)

                Player  Won White  Won Black  Draw White  Draw Black  points
0               Honiex        1.0        0.0         0.0         0.0     1.0
1         SHIVACalypso        1.0        0.0         0.0         0.0     1.0
2           Onischuk_V        0.0        1.0         0.0         0.0     1.0
3           KnightDuta        0.0        1.0         0.0         0.0     1.0
4        Elegance_Riks        0.0        1.0         0.0         0.0     1.0
5            GMSrinath        0.0        1.0         0.0         0.0     1.0
6   Darko-Dimitrijevic        1.0        0.0         0.0         0.0     1.0
7         Aslanov_Umid        1.0        0.0         0.0         0.0     1.0
8    ChainedDowntohell        0.0        1.0         0.0         0.0     1.0
9              klimkoj        1.0        0.0         0.0         0.0     1.0
10            Airquake        1.0        0.0         0.0         0.0     1.0
11         miragha_yev        1.0        0.0         0.0         0.0     1.0

In [32]:
player_all = pd.merge(perform_scores, score_table, on='Player', how='inner')
print(player_all.head(30))
player_all.to_csv("player_all0.csv",index=False)

                Player  Average Performance Score  Won White  Won Black  \
0             Airquake                       0.11        1.0        0.0   
1            Aisen1011                      -0.11        0.0        0.0   
2         Aslanov_Umid                       0.15        1.0        0.0   
3    ChainedDowntohell                       0.09        0.0        1.0   
4          ClauSanLuis                      -0.15        0.0        0.0   
5   Darko-Dimitrijevic                       0.84        1.0        0.0   
6         EgorYakovlev                      -0.84        0.0        0.0   
7        Elegance_Riks                       0.13        0.0        1.0   
8            Eliwood44                      -0.11        0.0        0.0   
9            GMSrinath                       0.11        0.0        1.0   
10         GameOverBro                      -0.05        0.0        0.0   
11    Happy1712Drummer                      -0.08        0.0        0.0   
12              Honiex   

In [36]:
mediate_df.to_csv("mediate_df0.csv",index=False)

求acpl的，下面这段有问题

In [None]:
#有问题一直跑不出来结果的

# 配置参数
STOCKFISH_DEPTH = 10
MISSED_WIN_THRESHOLD = 1000
THREADS = 4
HASH_SIZE = 2048  # MB

def get_score_diff(stockfish, fen, move_uci):
    """获取指定走法与最佳走法的评分差异"""
    stockfish.set_fen_position(fen)
    # 获取最佳走法评估
    best_move = stockfish.get_best_move(fen)
    if not best_move:
        return 0
    
    # 获取最佳走法评分
    best_eval = stockfish.get_evaluation()
    best_score = best_eval['value'] if best_eval['type'] == 'cp' else 10000 * (1 if best_eval['value'] > 0 else -1)    # 获取实际走法评分
    #实际走法评分
    stockfish.set_fen_position(fen)
    stockfish.make_moves_from_current_position([move_uci])
    actual_eval = stockfish.get_evaluation()
    actual_score = actual_eval['value'] if actual_eval['type'] == 'cp' else 10000* (1 if actual_eval['value'] > 0 else -1)
    
    # 计算差异（从行棋方角度）
    if stockfish.get_fen_position().split()[1] == 'b':  # 如果是黑方行棋
        return best_score - actual_score
    else:
        return actual_score - best_score
    
def calculate_acpl(moves_str, stockfish):
    """基于stockfish库的ACPL计算"""
    board = chess.Board()
    white_loss = white_moves = 0
    black_loss = black_moves = 0
    
    try:
        # 优化：根据UCI格式处理走法
        if ',' in moves_str:
            moves = [m.strip() for m in moves_str.split(',') if m.strip()]
        else:
            # 假设是空格分隔的UCI格式
            moves = [m.strip() for m in moves_str.split() if m.strip()]
            
        for move_uci in moves:
            if board.is_game_over():
                break
            
            try:
                move = chess.Move.from_uci(move_uci)
                if move not in board.legal_moves:
                    continue
            except:
                continue
            
            # 获取当前局面FEN和行棋方
            current_fen = board.fen()
            current_color = 'white' if board.turn == chess.WHITE else 'black'
            
            # 计算评分差异
            score_diff = get_score_diff(stockfish, current_fen, move_uci)
            score_diff=abs(score_diff)
            if score_diff >= MISSED_WIN_THRESHOLD:
                score_diff = 0
                
            # 累计损失
            if current_color == 'white':
                white_loss += score_diff
                white_moves += 1
            else:
                black_loss += score_diff
                black_moves += 1
                
            # 执行走子
            board.push(move)
            
    except Exception as e:
        print(f"处理错误: {str(e)}")
    
    # 计算平均值
    white_acpl = white_loss / white_moves if white_moves > 0 else 0
    black_acpl = black_loss / black_moves if black_moves > 0 else 0
    return round(white_acpl, 1), round(black_acpl, 1)

if __name__ == "__main__":
    # 初始化Stockfish引擎
    stockfish = Stockfish(
        path=STOCKFISH_PATH,
        depth=STOCKFISH_DEPTH,#thinking time
        parameters={
            "Threads": THREADS,
            "Hash": HASH_SIZE,
            "UCI_Elo": 3200  # 可选：设置引擎强度
        }
    )
    
   
    
    # 处理数据
    white_acpl_list = []
    black_acpl_list = []

    # 添加批处理和进度显示
    batch_size = 10  # 每批处理的棋局数
    total_batches = (len(mediate_df) + batch_size - 1) // batch_size

    with tqdm(total=len(mediate_df), desc="ACPL计算进度", unit="局") as pbar:
        for i in range(0, len(mediate_df), batch_size):
            batch = mediate_df.iloc[i:i+batch_size]
            
            for idx, row in batch.iterrows():
                moves_str = row['Moves'] if isinstance(row['Moves'], str) else ''
                w_acpl, b_acpl = calculate_acpl(moves_str, stockfish)
                
                white_acpl_list.append(w_acpl)
                black_acpl_list.append(b_acpl)
                
                pbar.update(1)
                pbar.set_postfix_str(f"白ACPL: {w_acpl} | 黑ACPL: {b_acpl}")
    
    # 保存结果
    mediate_df["White_ACPL"] = white_acpl_list
    mediate_df["Black_ACPL"] = black_acpl_list
    mediate_df.to_csv("analyzed_games.csv", index=False)
    print(mediate_df.head())
    print(mediate_df.columns.tolist())


ACPL计算进度:   0%|          | 0/13 [00:06<?, ?局/s]


KeyboardInterrupt: 