In [1]:
import chess
import chess.pgn
import pandas as pd
import random
import sqlite3
from collections import defaultdict
import math

In [None]:
def calculate_piece_lifespans0(game, color):
    """Legacy entry point for piece lifespans; delegates to ``calculate_piece_lifespans``.

    The first version of this function identified pieces by ``id(piece)``.
    python-chess builds a new ``Piece`` object on each ``piece_map()`` /
    ``piece_at()`` call, so those ids never identified a piece on the board:
    captures were missed, and a recycled id could match the wrong piece.
    It also counted a captured piece's lifespan inclusively (``+1``) while
    survivors were counted exclusively, which allowed ratios above 1.

    The square-based tracker in ``calculate_piece_lifespans`` has neither
    problem, so this name is kept only for backward compatibility.

    Args:
        game: a parsed game exposing ``mainline_moves()`` and ``board()``.
        color: ``'white'`` / ``'black'`` or ``chess.WHITE`` / ``chess.BLACK``.

    Returns:
        list of ``(name, (birth, death), ratio)`` tuples, exactly as returned
        by ``calculate_piece_lifespans``; an empty list for a game without moves.
    """
    return calculate_piece_lifespans(game, color)

def calculate_piece_lifespans(game, color):
    """Compute how long each piece of one side stayed on the board.

    The game's mainline is replayed move by move while a ``square -> piece id``
    table is kept for the pieces of ``color`` only. Piece ids are strings such
    as ``"pawn_2"`` (numbered per piece type in ``piece_map()`` iteration
    order) or ``"queen_promo_1"`` for promoted pieces.

    Args:
        game: a parsed game exposing ``mainline_moves()`` and ``board()``.
        color: ``'white'`` / ``'black'`` or ``chess.WHITE`` / ``chess.BLACK``.

    Returns:
        list of ``(name, (birth, death), ratio)`` tuples. ``birth`` is 0 for
        pieces on the starting board, or the half-move index of the promotion.
        ``death`` is the 1-based half-move index of the capture, or the total
        number of half-moves for survivors. ``ratio`` is
        ``(death - birth) / total_half_moves`` and therefore never exceeds 1.
        Captured pieces come first (in capture order), then the survivors.
        A game with no moves yields ``[]``.
    """
    moves = list(game.mainline_moves())
    total_steps = len(moves)
    if total_steps == 0:
        return []

    side = to_side(color)
    replay_board = game.board()

    square_to_pid = {}   # current square -> id of the tracked piece standing on it
    pid_birth = {}       # id -> birth step, only for pieces not yet captured
    lifespans = []
    promo_count = defaultdict(int)
    piece_counter = defaultdict(int)

    # Register the pieces present on the starting board (birth step 0).
    for sq, piece in replay_board.piece_map().items():
        if piece.color == side:
            piece_counter[piece.piece_type] += 1
            pid = f"{chess.piece_name(piece.piece_type)}_{piece_counter[piece.piece_type]}"
            square_to_pid[sq] = pid
            pid_birth[pid] = 0

    for step, move in enumerate(moves, start=1):
        if not move:
            # Null move ("--"): from/to squares are both a1 and nothing moves,
            # so it must not be read as a capture on a1.
            replay_board.push(move)
            continue

        from_sq = move.from_square
        to_sq = move.to_square

        if replay_board.is_castling(move):
            # Castling relocates king AND rook and never captures anything.
            # Only our own castling affects the tracking table.
            if replay_board.turn == side:
                kingside = replay_board.is_kingside_castling(move)
                rank = chess.square_rank(from_sq)
                # The move may be encoded as "king to its target square" or as
                # "king takes own rook"; find the rook's origin accordingly.
                target = replay_board.piece_at(to_sq)
                if (target is not None and target.piece_type == chess.ROOK
                        and target.color == side):
                    rook_from = to_sq
                else:
                    rook_from = chess.square(7 if kingside else 0, rank)
                # Pop both before re-inserting: origin/destination squares can overlap.
                king_pid = square_to_pid.pop(from_sq, None)
                rook_pid = square_to_pid.pop(rook_from, None)
                if king_pid is not None:
                    square_to_pid[chess.square(6 if kingside else 2, rank)] = king_pid
                if rook_pid is not None:
                    square_to_pid[chess.square(5 if kingside else 3, rank)] = rook_pid
        else:
            # Locate the square whose occupant disappears. For en passant the
            # captured pawn sits one rank behind the destination square
            # (replay_board.turn is still the side making the move here).
            if replay_board.is_en_passant(move):
                captured_square = to_sq - 8 if replay_board.turn == chess.WHITE else to_sq + 8
            else:
                captured_square = to_sq

            # Only pieces of `side` are registered, so a hit means one of ours was taken.
            if captured_square in square_to_pid:
                pid = square_to_pid.pop(captured_square)
                birth = pid_birth.pop(pid)
                lifespans.append((pid, (birth, step), (step - birth) / total_steps))

            # Carry the moving piece's id along (no-op when the opponent moves).
            if from_sq in square_to_pid:
                square_to_pid[to_sq] = square_to_pid.pop(from_sq)

        replay_board.push(move)

        # A promotion by our side creates a new tracked piece born at this step.
        if move.promotion:
            promoted_piece = replay_board.piece_at(to_sq)
            if promoted_piece and promoted_piece.color == side:
                ptype_name = chess.piece_name(promoted_piece.piece_type)
                promo_count[ptype_name] += 1
                pid = f"{ptype_name}_promo_{promo_count[ptype_name]}"
                pid_birth[pid] = step
                # NOTE(review): the promoting pawn's own record is left open, so the
                # pawn is reported below as surviving to the end of the game.
                # Confirm whether it should instead end at the promotion step.
                square_to_pid[to_sq] = pid

    # Everything still registered survived until the final position.
    for pid, birth in pid_birth.items():
        lifespans.append((pid, (birth, total_steps), (total_steps - birth) / total_steps))

    return lifespans


def flatten_lifespans_into_features(lifespans_list, features_dict, prefix=''):
    """Write lifespan tuples into a flat feature dict, in place.

    Args:
        lifespans_list: iterable of ``(name, (birth, death), ratio)`` tuples.
        features_dict: dict that receives the entries (mutated in place).
        prefix: optional string prepended to every generated key. The default
            ``''`` yields keys such as ``'queen_1LifeRatio'`` and
            ``'queen_promo_1Span'``.

    For each tuple two entries are written:
        ``f"{prefix}{name}LifeRatio"`` -> ``ratio``
        ``f"{prefix}{name}Span"``      -> ``(birth, death)`` as a tuple

    Returns:
        None. Existing keys with the same name are overwritten.
    """
    for name, (birth, death), ratio in lifespans_list:
        base = f"{prefix}{name}"
        features_dict[f"{base}LifeRatio"] = ratio
        features_dict[f"{base}Span"] = (birth, death)

# Base column names for the 16 pieces a side starts with, generated as
# "<piece type>_<1-based index>" from the per-type piece counts.
PIECE_COLUMNS = [
    f"{kind}_{index}"
    for kind, count in (
        ("king", 1), ("queen", 1), ("rook", 2),
        ("bishop", 2), ("knight", 2), ("pawn", 8),
    )
    for index in range(1, count + 1)
]

def to_side(color):
    """Map a colour given as 'white'/'black' or chess.WHITE/chess.BLACK to the chess constant.

    Raises ValueError for anything else.
    """
    # White is tested before black, each by string label first and constant second.
    for label, side in (('white', chess.WHITE), ('black', chess.BLACK)):
        if color == label or color == side:
            return side
    raise ValueError("color must be 'white' or 'black' or chess.WHITE/chess.BLACK")
def initialize_features():
    """Return a fresh feature dict holding placeholders for every name in PIECE_COLUMNS.

    Each piece contributes two entries, in this order: '<name>LifeRatio'
    (NaN) followed by '<name>Span' (None).
    """
    defaults = {}
    for column in PIECE_COLUMNS:
        defaults.update({
            f"{column}LifeRatio": math.nan,
            f"{column}Span": None,
        })
    return defaults

def extract_features(game, game_id, player_id_white, player_id_black):
    """Build one lifespan feature row per side for a single game.

    Args:
        game: a parsed game accepted by ``calculate_piece_lifespans``.
        game_id: identifier copied into both rows under ``'game_id'``.
        player_id_white: stored under ``'player'`` in the white row.
        player_id_black: stored under ``'player'`` in the black row.

    Returns:
        ``(white_row, black_row)``. Each row is a dict that starts from
        ``initialize_features()``, is filled by
        ``flatten_lifespans_into_features`` for that side, and then gets
        ``'game_id'`` and ``'player'`` added.
    """
    # The lifespan calculation replays the game itself, so no separate board
    # replay is needed here.
    rows = {}
    for side_name, player_id in (('white', player_id_white), ('black', player_id_black)):
        row = initialize_features()
        flatten_lifespans_into_features(calculate_piece_lifespans(game, side_name), row)
        row['game_id'] = game_id
        row['player'] = player_id
        rows[side_name] = row

    return rows['white'], rows['black']

In [None]:
# Smoke test on a short hand-made game (18 half-moves) containing several captures:
#   step 7  dxe5   - white pawn takes a black pawn
#   step 12 Bb4+   - black bishop gives check
#   step 14 dxc3   - black pawn takes a white pawn
#   steps 15/16    - the queens are exchanged on d8
#   step 17 bxc3   - white pawn takes the black pawn
#   step 18 Bxc3#  - black bishop takes the white pawn (annotated as mate)
board = chess.Board()
moves = (
    "d4 d5 Bf4 Nc6 Nc3 e5 "
    "dxe5 d4 Ne4 Bf5 Ng3 Bb4+ "
    "c3 dxc3 Qxd8+ Rxd8 bxc3 Bxc3#"
).split()

for san in moves:
    board.push_san(san)

# Turn the played-out board into a game object so its mainline can be replayed.
game = chess.pgn.Game.from_board(board)

white_lifespans = calculate_piece_lifespans(game, "white")
black_lifespans = calculate_piece_lifespans(game, "black")

# Print one table per side; the second heading carries the blank separator line.
for heading, lifespans in (
    ("=== White lifespans ===", white_lifespans),
    ("\n=== Black lifespans ===", black_lifespans),
):
    print(heading)
    for name, (birth, death), ratio in lifespans:
        print(f"{name:10} lifespan=({birth}, {death})  ratio={ratio:.2f}")


=== White lifespans ===
pawn_6     lifespan=(0, 14)  ratio=0.78
queen_1    lifespan=(0, 16)  ratio=0.89
pawn_7     lifespan=(0, 18)  ratio=1.00
pawn_1     lifespan=(0, 18)  ratio=1.00
pawn_2     lifespan=(0, 18)  ratio=1.00
pawn_3     lifespan=(0, 18)  ratio=1.00
pawn_4     lifespan=(0, 18)  ratio=1.00
pawn_5     lifespan=(0, 18)  ratio=1.00
pawn_8     lifespan=(0, 18)  ratio=1.00
rook_1     lifespan=(0, 18)  ratio=1.00
knight_1   lifespan=(0, 18)  ratio=1.00
bishop_1   lifespan=(0, 18)  ratio=1.00
king_1     lifespan=(0, 18)  ratio=1.00
bishop_2   lifespan=(0, 18)  ratio=1.00
knight_2   lifespan=(0, 18)  ratio=1.00
rook_2     lifespan=(0, 18)  ratio=1.00

=== Black lifespans ===
pawn_4     lifespan=(0, 7)  ratio=0.39
queen_1    lifespan=(0, 15)  ratio=0.83
pawn_5     lifespan=(0, 17)  ratio=0.94
rook_1     lifespan=(0, 18)  ratio=1.00
knight_1   lifespan=(0, 18)  ratio=1.00
bishop_1   lifespan=(0, 18)  ratio=1.00
king_1     lifespan=(0, 18)  ratio=1.00
bishop_2   lifespan=(0, 18)  rat