In [37]:
# Count, per player, the games won inside a window of points centred on `index`.
def calculate_streak(df, index, window_size=4):
    """Return how many games each player won in the window around ``index``.

    The window spans rows ``index - window_size`` .. ``index + window_size``
    (inclusive), clipped to the bounds of ``df``, so it also looks at points
    that come *after* ``index``.

    :param df: DataFrame with a ``game_victor`` column.
    :param index: positional row index the window is centred on.
    :param window_size: number of rows taken on each side of ``index``.
    :return: ``(p1_streak, p2_streak)`` - the number of window rows whose
        ``game_victor`` equals 1 and 2 respectively.

    NOTE(review): despite the name, this is a total count over the window,
    not the length of a run of consecutive wins - confirm that is intended.
    """
    lo = max(index - window_size, 0)
    hi = min(index + window_size + 1, len(df))
    game_victors = df.iloc[lo:hi]['game_victor']

    # The running total of a 0/1 indicator peaks at its final value, i.e. the
    # number of matching rows in the window.
    p1_streak = (game_victors == 1).astype(int).cumsum().max()
    p2_streak = (game_victors == 2).astype(int).cumsum().max()

    return p1_streak, p2_streak


# 对数据集中的每一行应用计算势头的函数
# streak_values = [calculate_streak(match_data, index) for index in range(len(match_data))]

# streak_values

In [38]:
def calculate_physical_strength(df, index, window_size=3):
    """Return a per-player exertion score for the window around ``index``.

    The window spans rows ``index - window_size`` .. ``index + window_size``
    (inclusive), clipped to the bounds of ``df``. Each player's score is the
    sum of three terms:

    * ``speed_mph`` of the window's last row, credited only to the player who
      is the server on that row (the other player gets 0);
    * that player's ``pX_distance_run`` on the window's last row;
    * the total ``rally_count`` over the whole window (same for both players).

    :param df: DataFrame with ``rally_count``, ``server``, ``speed_mph``,
        ``p1_distance_run`` and ``p2_distance_run`` columns.
    :param index: positional row index the window is centred on.
    :param window_size: number of rows taken on each side of ``index``.
    :return: ``(p1_physical_strength, p2_physical_strength)``.
    """
    lo = max(index - window_size, 0)
    hi = min(index + window_size + 1, len(df))
    window = df.iloc[lo:hi]

    def last(column):
        # Value of `column` on the final row of the window.
        return window[column].iloc[-1]

    rally_total = window['rally_count'].sum()

    # Only the server of the window's last point is credited with its speed.
    if last('server') == 1:
        p1_speed, p2_speed = last('speed_mph'), 0
    else:
        p1_speed, p2_speed = 0, last('speed_mph')

    p1_physical_strength = p1_speed + last('p1_distance_run') + rally_total
    p2_physical_strength = p2_speed + last('p2_distance_run') + rally_total

    return p1_physical_strength, p2_physical_strength


# physical_strength = [calculate_physical_strength(match_data, index) for index in range(len(match_data))]
# physical_strength

In [39]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

def calculate_momentum_topsis(match_data, speed_weight=0.01):
    """Compute per-point momentum series for both players using a TOPSIS score.

    A composite TOPSIS closeness score is derived per row from a set of
    point-level indicator columns (min-max scaled and weighted by an
    entropy-based weight). The score/game/set differences are multiplied by
    that composite score and combined with the raw indicator columns into one
    momentum value per player and row.

    :param match_data: DataFrame holding the columns ``p1_score``,
        ``p2_score``, ``p1_games``, ``p2_games``, ``p1_sets``, ``p2_sets``,
        ``speed_mph`` and, for both players, ``*_net_pt_won``, ``*_ace``,
        ``*_winner``, ``*_double_fault``, ``*_unf_err``, ``*_break_pt_won``.
        The caller's frame is not modified.
    :param speed_weight: factor applied to ``speed_mph`` when it is added to
        each player's momentum.
    :return: ``(p1_momentum_topsis, p2_momentum_topsis)`` as two pandas Series
        aligned with ``match_data``'s index.
    """
    # Work on a copy so the caller's frame is never modified.
    match_data = match_data.copy()

    score_mapping = {'0': 0, '15': 1, '30': 2, '40': 3, 'AD': 4}

    def map_score_to_number(score):
        """Map a point-score label to an ordinal 0-4; unknown labels map to 0."""
        # If the score column was parsed as numeric (e.g. no 'AD' present),
        # cells arrive as 15 / 15.0 instead of '15'. Normalise them to the
        # string keys so they are not all silently mapped to 0.
        if isinstance(score, float) and score.is_integer():
            score = int(score)
        return score_mapping.get(str(score).strip(), 0)

    # Ordinal encoding of the in-game score for both players.
    match_data['p1_score_number'] = match_data['p1_score'].apply(map_score_to_number)
    match_data['p2_score_number'] = match_data['p2_score'].apply(map_score_to_number)

    # Player-1-minus-player-2 differences at point, game and set level.
    match_data['score_difference'] = match_data['p1_score_number'] - match_data['p2_score_number']
    match_data['game_difference'] = match_data['p1_games'] - match_data['p2_games']
    match_data['set_difference'] = match_data['p1_sets'] - match_data['p2_sets']

    # Indicator columns that enter the TOPSIS analysis.
    selected_features = ['p1_net_pt_won', 'p1_ace', 'score_difference', 'game_difference', 'set_difference', 'p1_winner', 'p1_double_fault', 'p1_unf_err', 'p1_break_pt_won', 'speed_mph']
    selected_features += ['p2_net_pt_won','p2_ace', 'p2_winner', 'p2_double_fault', 'p2_unf_err', 'p2_break_pt_won']

    # Fill missing serve speeds with the column mean. Assign the result back
    # instead of calling fillna(inplace=True) on the column selection: that
    # chained in-place form is deprecated and has no effect under pandas
    # Copy-on-Write, which would leave NaN in the feature matrix.
    mean_speed = match_data['speed_mph'].mean()
    match_data['speed_mph'] = match_data['speed_mph'].fillna(mean_speed)

    features_data = match_data[selected_features]

    # Step 1: min-max scale every indicator to [0, 1].
    scaler = MinMaxScaler()
    normalized_data = scaler.fit_transform(features_data)

    # Step 2: entropy-style term per cell. log2(0) is -inf and 0 * -inf is
    # NaN; those cells are defined as contributing 0, so the floating-point
    # warnings are silenced and the NaNs replaced.
    with np.errstate(divide='ignore', invalid='ignore'):
        entropy_values = -normalized_data * np.log2(normalized_data)
    entropy_values[np.isnan(entropy_values)] = 0
    entropy_per_indicator = np.sum(entropy_values, axis=0)

    # Step 3: indicator weights from each indicator's share of total entropy.
    weights = 1 - (entropy_per_indicator / np.sum(entropy_per_indicator))

    # Step 4: weighted, normalised decision matrix.
    weighted_normalized_data = normalized_data * weights

    # Step 5: every indicator is treated as "larger is better" here; the
    # direction of each indicator is applied later through the signs used
    # when the momentum values are assembled.

    # Step 6: positive / negative ideal solutions (column-wise max / min).
    positive_ideal_solution = weighted_normalized_data.max(axis=0)
    negative_ideal_solution = weighted_normalized_data.min(axis=0)

    # Step 7: Euclidean distance of every row to both ideal solutions.
    distance_to_positive_ideal = np.linalg.norm(weighted_normalized_data - positive_ideal_solution, axis=1)
    distance_to_negative_ideal = np.linalg.norm(weighted_normalized_data - negative_ideal_solution, axis=1)

    # Step 8: TOPSIS closeness coefficient per row.
    composite_score = distance_to_negative_ideal / (distance_to_positive_ideal + distance_to_negative_ideal)

    # Scale the score/game/set differences by the composite score.
    match_data['score_momentum'] = match_data['score_difference'] * composite_score
    match_data['game_momentum'] = match_data['game_difference'] * composite_score
    match_data['set_momentum'] = match_data['set_difference'] * composite_score

    # Combine everything into one momentum value per player.
    # NOTE(review): the differences are computed as p1 - p2, yet they are
    # *subtracted* from player 1's momentum and *added* to player 2's, i.e. the
    # player who is ahead is penalised. Confirm this sign convention is intended.
    match_data['p1_momentum_topsis'] = -match_data['score_momentum'] + match_data['p1_net_pt_won'] + match_data['p1_ace'] - match_data['game_momentum'] - match_data['set_momentum']
    match_data['p1_momentum_topsis'] += match_data['p1_winner'] - match_data['p1_double_fault'] - match_data['p1_unf_err'] + match_data['p1_break_pt_won'] + speed_weight * match_data['speed_mph']
    match_data['p1_momentum_topsis'] += -match_data['p2_winner'] - match_data['p2_ace'] - match_data['p2_break_pt_won']
    match_data['p2_momentum_topsis'] = match_data['score_momentum'] + match_data['p2_net_pt_won'] + match_data['p2_ace'] + match_data['game_momentum'] + match_data['set_momentum']
    match_data['p2_momentum_topsis'] += match_data['p2_winner'] - match_data['p2_double_fault'] - match_data['p2_unf_err'] + match_data['p2_break_pt_won'] + speed_weight * match_data['speed_mph']
    match_data['p2_momentum_topsis'] += -match_data['p1_winner'] - match_data['p1_ace'] - match_data['p1_break_pt_won']

    return match_data['p1_momentum_topsis'], match_data['p2_momentum_topsis']

In [40]:
# Window-based momentum that also accounts for games/sets gained and for
# points won on the player's own serve.
def calculate_momentum_improved(df, index, window_size=3):
    """Return ``(p1_momentum, p2_momentum)`` for the window around ``index``.

    The window spans rows ``index - window_size`` .. ``index + window_size``
    (inclusive), clipped to the bounds of ``df``. For each player the momentum
    is the sum of:

    * point advantage in the window (points won minus points lost);
    * 0.1 for every point won while serving;
    * break points won, winners, and minus the unforced errors in the window;
    * the change in the player's set and game counters between the first and
      last row of the window.

    :param df: DataFrame with ``p*_sets``, ``p*_games``, ``server``,
        ``point_victor``, ``p*_break_pt_won``, ``p*_unf_err`` and
        ``p*_winner`` columns.
    :param index: positional row index the window is centred on.
    :param window_size: number of rows taken on each side of ``index``.
    """
    lo = max(index - window_size, 0)
    hi = min(index + window_size + 1, len(df))
    window = df.iloc[lo:hi]

    def gained(column):
        # Change of a running counter between the window's first and last row.
        values = window[column]
        return values.iloc[-1] - values.iloc[0]

    def total(column):
        return window[column].sum()

    server = window['server']
    p1_took_point = window['point_victor'] == 1
    p2_took_point = window['point_victor'] == 2

    # Each point won on the player's own serve carries an extra weight of 0.1.
    serve_advantage_weight = 0.1
    p1_serve_advantage = (p1_took_point[server == 1]).sum() * serve_advantage_weight
    p2_serve_advantage = (p2_took_point[server == 2]).sum() * serve_advantage_weight

    # Net points in the window; player 2's advantage is the mirror image.
    p1_points_advantage = p1_took_point.sum() - p2_took_point.sum()
    p2_points_advantage = -p1_points_advantage

    # Unforced errors count against the player, hence the negation.
    p1_error_penalty = -total('p1_unf_err')
    p2_error_penalty = -total('p2_unf_err')

    p1_momentum = (
        p1_points_advantage + p1_serve_advantage + total('p1_break_pt_won')
        + p1_error_penalty + total('p1_winner') + gained('p1_sets') + gained('p1_games')
    )
    p2_momentum = (
        p2_points_advantage + p2_serve_advantage + total('p2_break_pt_won')
        + p2_error_penalty + total('p2_winner') + gained('p2_sets') + gained('p2_games')
    )

    return p1_momentum, p2_momentum

def cumsum_detection(series):
    """
    CUSUM-style detection of turning points in a sequence.

    The series is differenced (the leading NaN is replaced by 0) and the
    differences are accumulated again; for a series without missing values
    this equals the series shifted so that its first element is 0. A turning
    point is reported whenever that accumulated value changes sign.

    Values that are exactly 0 are treated as "no sign": a move such as
    ``1, 0, -1`` is reported once, at the position where the opposite sign
    first appears, instead of being missed because the product with 0 is
    never negative.

    :param series: one-dimensional data sequence (pandas Series); any index
        is accepted because values are accessed by position.
    :return: list of positional indices (ints) of the turning points
    """
    # Difference then re-accumulate; fillna(0) neutralises the leading NaN.
    diff_series = series.diff().fillna(0)
    cumsum_values = diff_series.cumsum().to_numpy()

    turning_points = []
    last_sign = 0  # sign of the most recent non-zero accumulated value
    for i, value in enumerate(cumsum_values):
        sign = int(value > 0) - int(value < 0)
        if sign == 0:
            # Sitting on zero is not a crossing by itself; keep the old sign.
            continue
        if last_sign != 0 and sign != last_sign:
            turning_points.append(i)
        last_sign = sign

    return turning_points



from scipy.stats import norm

def runs_test(sequence):
    """
    Wald-Wolfowitz runs test for the randomness of a sequence.

    The sequence is dichotomised around its median (1 if strictly above the
    median, else 0), the number of runs is counted and compared with its
    expectation under randomness using a normal approximation.

    :param sequence: input sequence (list, np.array or pandas Series)
    :return: ``(Z, p_value)`` - the Z statistic and the two-sided p-value.
        ``(nan, nan)`` is returned when the test is undefined: fewer than two
        observations, or a zero variance (e.g. every value falls on the same
        side of the median).
    """
    values = list(sequence)
    n_total = len(values)
    nan = float('nan')

    # With fewer than two observations the variance denominator (N - 1) is 0.
    if n_total < 2:
        return nan, nan

    # Split the sequence into two classes around the median.
    median_value = np.median(values)
    binary_sequence = [1 if x > median_value else 0 for x in values]

    # A new run starts at every position whose class differs from the previous one.
    runs = 1 + sum(
        1 for previous, current in zip(binary_sequence, binary_sequence[1:])
        if previous != current
    )

    # Expected number of runs and its variance under the null hypothesis.
    n1 = sum(binary_sequence)
    n2 = n_total - n1
    expected_runs = 2 * n1 * n2 / n_total + 1
    variance = (expected_runs - 1) * (expected_runs - 2) / (n_total - 1)

    # Zero variance would give 0/0 below; report the test as undefined instead.
    if variance <= 0:
        return nan, nan

    Z = (runs - expected_runs) / np.sqrt(variance)

    # Two-sided p-value; the survival function avoids the precision loss of
    # 1 - cdf for large |Z|.
    p_value = 2 * norm.sf(abs(Z))

    return float(Z), float(p_value)


def mark_indices_in_list(length, indices):
    """
    Build a 0/1 list of the given length with the requested positions set to 1.

    :param length: length of the resulting list
    :param indices: iterable of positions to mark with 1
    :return: list of ``length`` ints, 1 at every position from ``indices``
        that lies in ``0 .. length - 1`` and 0 elsewhere; out-of-range
        positions (including negative ones) are ignored
    """
    marked_list = [0] * length

    for index in indices:
        # Check both bounds: a negative index would otherwise wrap around to
        # the end of the list, or raise IndexError when below -length.
        if 0 <= index < length:
            marked_list[index] = 1

    return marked_list




def getva(x):
    """Return 1 when ``x`` is strictly below 0.05, otherwise 0."""
    return 1 if x < 0.05 else 0

In [41]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings 
warnings.filterwarnings('ignore')
# Load the cleaned point-by-point data set.
df = pd.read_csv('american_clean_data.csv')

# Restrict the analysis to a single match. Earlier notes here referred to the
# 2023 Wimbledon final (match_id "2023-wimbledon-1701"); the match actually
# analysed is the one selected below.
match_id = "2023-usopen-1107"
# Rows are renumbered from 0 so positional windows line up with the index.
match_data = df[df['match_id'] == match_id].copy().reset_index(drop=True)

# Window-based momentum for every point of the match.
momentum_values = [calculate_momentum_improved(match_data, index) for index in range(len(match_data))]
p1_momentum_sub = [pair[0] for pair in momentum_values]
p2_momentum_sub = [pair[1] for pair in momentum_values]

# Store the window-based momentum and the TOPSIS-based momentum as columns.
match_data['p1_momentum_sub'] = p1_momentum_sub
match_data['p2_momentum_sub'] = p2_momentum_sub
match_data['p1_momentum_topsis'], match_data['p2_momentum_topsis'] = calculate_momentum_topsis(match_data)

# Share of the TOPSIS-based momentum in the blended momentum value.
topsis_weight = 0.2
# Blend: (1 - topsis_weight) * window-based + topsis_weight * TOPSIS-based.
p1_momentum = (1 - topsis_weight) * match_data['p1_momentum_sub'] + topsis_weight * match_data['p1_momentum_topsis']
p2_momentum = (1 - topsis_weight) * match_data['p2_momentum_sub'] + topsis_weight * match_data['p2_momentum_topsis']

match_data['p1_momentum'] = p1_momentum
match_data['p2_momentum'] = p2_momentum

# Additional per-point components: games won in the window and physical strength.
streak_values = [calculate_streak(match_data, index) for index in range(len(match_data))]
physical_strength = [calculate_physical_strength(match_data, index) for index in range(len(match_data))]

p1_streak = [pair[0] for pair in streak_values]
p2_streak = [pair[1] for pair in streak_values]

p1_physical_strength = [pair[0] for pair in physical_strength]
p2_physical_strength = [pair[1] for pair in physical_strength]

# Only the streak components are written to the frame; the physical-strength
# columns are computed above but their assignment is disabled.
match_data['p1_streak'] = p1_streak
match_data['p2_streak'] = p2_streak

# match_data['p1_physical_strength']=pd.DataFrame(p1_physical_strength)
# match_data['p2_physical_strength']=pd.DataFrame(p2_physical_strength)

# Path and file name of the output file.
output_file_path = 'american_1107_match_swing.csv'

# Save the enriched match data as CSV (without the index column).
match_data.to_csv(output_file_path, index=False)
