In [10]:
import chess
import chess.pgn
import pandas as pd
import random
import sqlite3

In [None]:
def to_side(color):
    """Normalize color input: accepts 'white'/'black' or chess.WHITE/chess.BLACK."""
    if color == 'white' or color == chess.WHITE:
        return chess.WHITE
    if color == 'black' or color == chess.BLACK:
        return chess.BLACK
    raise ValueError("color must be 'white' or 'black' or chess.WHITE/chess.BLACK")


In [None]:
def extract_features(game, game_id, player_id_white, player_id_black):
    board = game.board()

    features = {
        'white': initialize_features(),
        'black': initialize_features(),
        'game_id': game_id,
        'player_id_white': player_id_white,
        'player_id_black': player_id_black
    }

    for move in game.mainline_moves():
        board.push(move)
        
        if board.turn != chess.WHITE:
            features['white'] = update_features(features['white'], board, "white",move)
        else:
            features['black'] = update_features(features['black'], board, 'black',move)

    features['white']['game_id'] = game_id
    features['white']['player_id'] = player_id_white
    features['black']['game_id'] = game_id
    features['black']['player_id'] = player_id_black

    #增加的：calculate_center_control normalized:
    features['white']['center_control_score_mean']=compute_mean_score(features['white'], 'center_control_scores_list')
    features['black']['center_control_score_mean']=compute_mean_score(features['black'], 'center_control_scores_list')
    #calculate_piece_activity normalized:
    features['white']['piece_activity_score_mean']=compute_mean_score(features['white'],'piece_activity_scores_list')
    features['black']['piece_activity_score_mean']=compute_mean_score(features['black'],'piece_activity_scores_list')
    features['white']['king_safety_score_mean']=compute_mean_score(features['white'],'king_safety_scores_list')
    features['black']['king_safety_score_mean']=compute_mean_score(features['black'],'king_safety_scores_list')


    return features['white'], features['black']

# Additional function implementations...



def initialize_features():
    # Initialize all feature scores to zero
    return {
        'center_control_scores_list': [],
        'piece_activity_scores_list': [],
        'king_safety_scores_list': [],
        #havent write functions yet
        'attacking_moves_scores_list': [],
        'captures_scores_list': [],
        'pawn_structure_scores_list': [],

        #new
        'castled': False,
        'lost_castling_rights': False
    }

def update_features(feature_dict, board,color,move):
    # Update each feature score based on the current board state
    feature_dict['center_control_scores_list'].append(calculate_center_control(board,color))
    feature_dict['piece_activity_scores_list'].append(calculate_piece_activity(board,color))
    feature_dict['king_safety_scores_list'].append(calculate_king_safety(board,color,feature_dict,move))
    #need to be added after
    #feature_dict['attacking_moves_score'] += calculate_attacking_moves(board,color)
    #feature_dict['captures_score'] += calculate_captures(board,color)
    #feature_dict['pawn_structure_score'] += calculate_pawn_structure(board,color)
    return feature_dict

def calculate_center_control(board,color):
    side=to_side(color)
    enemy=chess.BLACK if side==chess.WHITE else chess.WHITE
    center_control=0
    center_squares = [
    chess.C3, chess.D3, chess.E3, chess.F3,
    chess.C4, chess.D4, chess.E4, chess.F4,
    chess.C5, chess.D5, chess.E5, chess.F5,
    chess.C6, chess.D6, chess.E6, chess.F6
    ]

    for square in center_squares:
        piece=board.piece_at(square)
        rank = chess.square_rank(square)
        if color == 'white':
            weight=1 + 0.2 * (rank - 3)  # rank 4 是中线
        else:
            weight=1 + 0.2 * (4 - rank)  # 黑方从 rank 4 向下推进
        
        control_score=0
        threat_score=0

        if piece and piece.color==color:
            control_score+=weight
        attackers=board.attackers(chess.WHITE if color=='white' else chess.BLACK, square)
        for attacker_sq in attackers:
            if piece and piece.color!=color:
                threat_score+=get_piece_value(piece.piece_type)
            else:
                control_score+=1
        center_control+=(control_score+threat_score)
    return center_control

#增加的：
def compute_mean_score(feature_dict, key):
    scores = feature_dict.get(key, [])
    return sum(scores) / len(scores) if scores else 0
#增加的：
def get_piece_value(piece_type:chess.PieceType) -> float:
    values={
        chess.PAWN:1.0,
        chess.KNIGHT:3.0,
        chess.BISHOP:3.0,
        chess.ROOK:5.0,
        chess.QUEEN:9.0,
        chess.KING:0.0
    }
    return values.get(piece_type,0)

#github fixed:
def calculate_piece_activity(board, color):
    activity_score = 0
    side=to_side(color)
    # Using items() to retrieve both the square and the piece
    for square, piece in board.piece_map().items():
        if piece.color == side:
            # Calculate the number of attacked squares from this square,but also need to make sure it's legal
            legal_moves=[m for m in board.legal_moves if m.from_square==square]
            activity_score += get_piece_value(piece.piece_type)*len(board.attacks(square))
    return activity_score


#fixed:   7   dimensions of KingSafety
#calculate_castling(board, color)
#calculate_king_tropism(board, color)
#calculate_defenders(board, color)
#calculate_pawn_shield(board, color)
#calculate_pawn_storm(board, color)
#calculate_zone_control(board, color)
#calculate_diagonal_exposure(board,color)
#calculate_escape_squares(board,color) 
def calculate_king_safety(board,color,feature_dict,move):
    king_safety_score=0

    king_safety_score+=calculate_castling(board,color,feature_dict,move)
    king_safety_score+=calculate_king_tropism(board, color)
    king_safety_score+=calculate_king_defenders(board, color)
    king_safety_score+=calculate_pawn_shield(board, color)
    #delete
    #king_safety_score+=calculate_pawn_storm(board, color)
    king_safety_score+=calculate_zone_control(board, color)
    king_safety_score+=calculate_diagonal_exposure(board,color)  #斜着的线
    king_safety_score+=calculate_escape_squares(board,color)   #国王是否有足够的逃生格子

    return king_safety_score


#castling part
def calculate_castling(board, color, feature_dict, move):
    side=to_side(color)

    #already castled, plus score since then
    if feature_dict['castled']:
        return 30

    #current move is castling, pluse score
    if board.is_castling(move):
        feature_dict['castled'] = True
        return 30

    #havent catstled and cant castle
    if not feature_dict['castled'] and not board.has_castling_rights(side):
        feature_dict['lost_castling_rights'] = True
        return -20

    #havent castled but can castle
    return 0

#king tropsim part
def calculate_king_tropism(board,color):
    king_color=to_side(color)
    king_square=board.king(king_color)
    king_tropism_score=0
    if king_square is None:
        return 0

    #iterate all the pieces on the board
    for square,piece in board.piece_map().items():
        #skip the same color pieces
        if piece.color==king_color:
            continue
        #for enemy's pieces,calculate the distance to our king
        dist=chess.square_distance(square,king_square)
        if dist==0:  #king itself, skip
            continue

        piece_value=get_piece_value(piece.piece_type)
        #enemy's QUEEN ROOK BISHOP KNIGHT PAWN
        if piece.piece_type in [chess.QUEEN, chess.ROOK, chess.BISHOP, chess.KNIGHT, chess.PAWN]:
            #use "(dist**2)" to better describe: more close, more dangerous
            king_tropism_score-= 10*piece_value/(dist**2)
    
    #limit range:
    king_tropism_score = max(-300, min(0, king_tropism_score))
    return king_tropism_score

#king's defender pieces part:   1--4 score
def calculate_king_defenders(board,color):
    king_color=to_side(color)
    king_zone=get_king_zone(board,color,radius=1)
    king_defenders_score=0
    for sq in king_zone:
        attackers=board.attackers(king_color,sq)
        king_defenders_score+=len(attackers)  #no matter what kind of defender is, only plus 1 score
    king_defenders_score=min(king_defenders_score,4)
    return king_defenders_score

def get_king_zone(board,color,radius=1):
    king_color=to_side(color)
    king_square=board.king(king_color)
    if king_square is None:
        return []
    zone=[sq for sq in chess.SQUARES if chess.square_distance(sq,king_square)<=radius]
    return zone

#pawn sheild part
def is_open_file(board,file_index,color):
    for row in range(8):
        sq=chess.square(file_index,row)
        piece=board.piece_at(sq)
        if piece and piece.color==color: #have self-pieces in this line then is not open-file
            return False
    return True
   
def calculate_pawn_shield(board,color):
    #find where is the king
    side=to_side(color)
    king_square=board.king(side)
    if king_square is None:
        return 0   #actually not possible
    #the king's column & row
    king_file=chess.square_file(king_square)
    king_rank=chess.square_rank(king_square)

    #only calculate when king are in initial 2 rows
    #chess.square_rank() 返回 0..7 indicates to row 1..8 on board：
    if side==chess.WHITE:
        if king_rank>1:
            return 0
        target_rank=king_rank+1
    if side==chess.BLACK:
        if king_rank<6:
            return 0
        target_rank=king_rank-1

    #check the pawns which their column are next to king's:
    candidate_files=[king_file]   #0--7，indicates to a--h
    if king_file-1>=0:#king not in a
        candidate_files.append(king_file-1) #king's left column
    if king_file+1<=7:   #king not in h
        candidate_files.append(king_file+1)  #king's right column
    
    pawn_shield_score=0
    for file in candidate_files:
        sq=chess.square(file,target_rank)
        piece=board.piece_at(sq)
        if piece and piece.piece_type==chess.PAWN and piece.color==side:
            pawn_shield_score+=5
        else:
            pawn_shield_score-=5
            if is_open_file(board,file,color):
                pawn_shield_score-=5   #open file punishment
    
    pawn_shield_score=max(-30,min(15,pawn_shield_score))    #this score's range:-30 to +15
    return pawn_shield_score

#king zone control part
#forward_extension can choose 
def calculate_zone_control(board, color,forward_extension=2):
    king_color=to_side(color)
    enemy_color=chess.BLACK if king_color == chess.WHITE else chess.WHITE
    king_square=board.king(king_color)
    if king_square is None:
        return 0
    
    king_zone=get_king_zone(board,color,radius=1)
    #additional squares
    king_file=chess.square_file(king_square)
    king_rank=chess.square_rank(king_square)
    forward_ranks=[]
    if king_color==chess.WHITE:
        for r in range(king_rank+1,min(8, king_rank + 1 + forward_extension)):
            forward_ranks.append(r)
    else:
        for r in range(max(0, king_rank - forward_extension), king_rank):
            forward_ranks.append(r)
    for f in range(max(0,king_file-1),min(8,king_file+2)):
        for r in forward_ranks:
            sq=chess.square(f,r)
            if sq not in king_zone:
                king_zone.append(sq)

    #iterate every square in king_zone:
    zone_control_score=0
    for sq in king_zone:
        attackers=board.attackers(enemy_color,sq)
        for attacker in attackers:
            piece=board.piece_at(attacker)
            if piece:
                zone_control_score-=5*get_piece_value(piece.piece_type)

    #limit range:
    zone_control_score = max(-100, min(0, zone_control_score))
    return zone_control_score

#diagonal exposure part: the extension of open-file punishment
def calculate_diagonal_exposure(board,color):
    side = to_side(color)
    king_square=board.king(side)
    if king_square is None:
        return 0
    king_file=chess.square_file(king_square)
    king_rank=chess.square_rank(king_square)

    diagonal_exposure_score=0

    #directions
    directions=[(1,0),(-1,0),  #king's row
                (1,1),(-1,1), #king's to enemy diagonals
                (1,-1),(-1,-1) #king's to back diagonals
                ]
    for direction_file,direction_rank in directions:
        file,rank=king_file,king_rank
        our_flag=False #if have pieces from us in that direction

        for dist in range(1,8):
            file=king_file+direction_file*dist
            rank=king_rank+direction_rank*dist
            #break when out of board
            if not (0<=file<8 and 0<=rank<8):
                break
            sq=chess.square(file,rank)
            piece=board.piece_at(sq)
            if piece:
                if piece.color==side:
                    our_flag=True  #detect out our piece
                    break
                else:
                    #have enemy in king's row, see if it can attack king(only ROOK QUEEN)
                    if direction_rank == 0 and piece.piece_type in [chess.ROOK, chess.QUEEN]:
                        diagonal_exposure_score -= max(10, 40 // dist)
                    #have enemy in king's diagonal, see if it can attack king(only BISHOP QUEEN)
                    if abs(direction_file) == abs(direction_rank) and piece.piece_type in [chess.BISHOP, chess.QUEEN]:
                        diagonal_exposure_score -= max(10, 40 // dist)
                    break

        if not our_flag:
            diagonal_exposure_score-=3

    #limit range
    diagonal_exposure_score=max(-80,min(0,diagonal_exposure_score))
    return diagonal_exposure_score

#king's escape square part:
#when king will be check
def calculate_escape_squares(board,color):
    king_color=to_side(color)
    enemy_color=chess.BLACK if king_color == chess.WHITE else chess.WHITE
    king_square=board.king(king_color)
    if king_square is None:
        return 0

    escape_square_num=0
    for sq in get_king_zone(board,color,radius=1):
        piece=board.piece_at(sq)
        if piece and piece.color==king_color:
            continue  #cant escape to here because have our own piece
        #when sq is taken by enemy or is free
        #check if that sq is safe:
        if board.is_attacked_by(enemy_color,sq):
            continue
        escape_square_num+=1
    escape_square_score=5*escape_square_num
    return escape_square_score



In [23]:
def extract_features_from_row(row, game_id):
    #game_id: 唯一编号
    board = chess.Board()

    features = {
        'white': initialize_features(),
        'black': initialize_features(),
        'game_id': game_id,
        'player_id_white':row.get('White', None),
        'player_id_black':row.get('Black', None)
    }

    #把Moves 列变成pgn可以读的格式
    moves_str = row['Moves']
    if not moves_str or moves_str.strip() == "":
        return features['white'], features['black']  # 空对局

    moves_list=[m.strip() for m in str(moves_str).split(",") if m.strip()]

    for move_uci in moves_list:
        move=None
        try:
            move = chess.Move.from_uci(move_uci)
        except Exception:
            # try SAN fallback (some datasets use SAN)
            try:
                move = board.parse_san(move_uci)
            except Exception:
                # invalid move string: skip
                continue

        if move not in board.legal_moves:
            # 非法走子（有些数据集可能不完整），跳过
            continue

        board.push(move)

        # 注意：board.turn 表示 **下一步走棋方**
        if board.turn == chess.WHITE:  # 刚刚是黑方走的
            features['black'] = update_features(features['black'], board, "black", move)
        else:  # 刚刚是白方走的
            features['white'] = update_features(features['white'], board, "white", move)

    #添加基本信息
    features['white']['game_id'] = game_id
    features['white']['player_id'] =row.get('White', None)
    features['black']['game_id'] = game_id
    features['black']['player_id'] =row.get('Black', None)

    # 增加统计平均值
    features['white']['center_control_score_mean'] = compute_mean_score(features['white'], 'center_control_scores_list')
    features['black']['center_control_score_mean'] = compute_mean_score(features['black'], 'center_control_scores_list')

    features['white']['piece_activity_score_mean'] = compute_mean_score(features['white'], 'piece_activity_scores_list')
    features['black']['piece_activity_score_mean'] = compute_mean_score(features['black'], 'piece_activity_scores_list')

    features['white']['king_safety_score_mean']=compute_mean_score(features['white'],'king_safety_scores_list')
    features['black']['king_safety_score_mean']=compute_mean_score(features['black'],'king_safety_scores_list')
    
    return features['white'], features['black']

In [24]:
conn = sqlite3.connect(r"c:\sqlite3\cpl.db")
df = pd.read_sql_query("SELECT * FROM raw", conn)

# 遍历 DataFrame 提取特征
results = []
for i, row in df.iterrows():
    white_features, black_features = extract_features_from_row(row, game_id=i)
    results.append(white_features)
    results.append(black_features)

features_df = pd.DataFrame(results)

In [25]:
features_df

Unnamed: 0,center_control_scores_list,piece_activity_scores_list,king_safety_scores_list,attacking_moves_scores_list,captures_scores_list,pawn_structure_scores_list,castled,lost_castling_rights,game_id,player_id,center_control_score_mean,piece_activity_score_mean,king_safety_score_mean
0,[13],[148.0],[-5.5487528344671215],[],[],[],False,False,0.0,VovAn1991,13.000000,148.000000,-5.548753
1,[],[],[],[],[],[],False,False,0.0,RealBlindMuddy,0.000000,0.000000,0.000000
2,[],[],[],[],[],[],False,False,,,,,
3,[],[],[],[],[],[],False,False,,,,,
4,"[13, 18.0, 22.0, 20.0, 20.0, 23.0, 22.0, 28.0,...","[148.0, 150.0, 180.0, 196.0, 208.0, 228.0, 230...","[-5.5487528344671215, -5.895975056689343, -19....",[],[],[],False,True,2.0,ilmago,25.500000,244.875000,-55.234576
...,...,...,...,...,...,...,...,...,...,...,...,...,...
459,[],[],[],[],[],[],False,False,229.0,IMRosen,0.000000,0.000000,0.000000
460,"[14.0, 14.0, 17.0, 19.0, 20.0, 20.0, 20.0, 20....","[139.0, 157.0, 177.0, 201.0, 182.0, 187.0, 182...","[-8.548752834467122, -9.136507936507936, -6.25...",[],[],[],False,True,230.0,brauliocuarta,19.692308,187.538462,-20.457200
461,"[15.0, 18.0, 21.0, 20.0, 20.0, 19.0, 19.0, 23....","[129.0, 150.0, 158.0, 167.0, 161.0, 158.0, 166...","[4.104024943310657, -6.243197278911564, -1.830...",[],[],[],False,False,230.0,kevin9512,19.153846,182.230769,-4.983412
462,"[13, 18.0, 22.0, 20.0, 23.0, 27.0, 25.0, 30.0,...","[148.0, 150.0, 180.0, 196.0, 216.0, 242.0, 227...","[-5.5487528344671215, -5.895975056689343, -19....",[],[],[],False,True,231.0,Danesz22,23.407407,239.851852,-42.691688


In [26]:
features_df.to_csv('9-8.csv')