In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

Get hit type

In [33]:
import json

with open('./hit_types.json', 'r') as file:
    hit_types = json.load(file)

hit_types

['clear',
 'defensive shot',
 'drive',
 'drop',
 'lob',
 'long service',
 'net shot',
 'push/rush',
 'short service',
 'smash']

In [1]:

import torch
from torch.utils.data import Dataset
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
import pandas as pd
import torch
from torch.utils.data import Dataset
import numpy as np
from torch.utils.data import DataLoader
import os
import random

'''
human keypoints: 17*2
court keypoints: 6*2
net keypoints:   4*2
ball keypoints:  1*2
total keypoints: 28*2

'''




class HitTypeDataset(Dataset):
    def __init__(self, dataset_folder,num_consecutive_frames,normalization=True):
        self.dataset=[]
        self.positive=0
        self.negative=0
        # 遍历文件夹及其子文件夹，找到所有的CSV文件路径
        cnt=0
        for root, dirs, files in os.walk(dataset_folder):
            for file in files:
                if file.endswith(".csv"):
                    # 定义处理函数
                    data_path=os.path.join(root, file)
                    print(data_path)

                    try:
                      df= pd.read_csv(data_path, converters={"ball": eval,"top":eval,"bottom":eval,"court":eval,"net":eval})
                    except:
                      print('Error! cannot process: ', data_path)
                      continue

                    rows = len(df)
                    remainder = rows % 30
                    if remainder > 0:
                        num_to_pad = 30 - remainder
                    else:
                        num_to_pad = 0

                    if num_to_pad > 0:
                        last_row = df.iloc[-1]
                        padding_data = np.tile(last_row.values, (num_to_pad, 1))
                        padded_df = pd.DataFrame(padding_data, columns=df.columns)
                        df = pd.concat([df, padded_df], axis=0)
                        df = df.reset_index(drop=True)

                    small_dataset =df

                    for i in range(len(small_dataset)):
                        if i%num_consecutive_frames!=0:
                            continue
                        if i>=len(small_dataset)-num_consecutive_frames:
                            break
                        oridata=small_dataset.loc[i:i+num_consecutive_frames-1,:].copy()
                        oridata=oridata.reset_index(drop=True)
                        data=[]

                        target1=None

                        for index,row in oridata.iterrows():
                            pos=np.array(row['type'])
                            if str(pos) in ["defensive shot", "drive"]:
                            # if str(pos) in hit_types:
                                if target1 is None:
                                    target1=1
                                    self.positive+=1

                            top=np.array(row['top']).reshape(-1,2)
                            bottom=np.array(row['bottom']).reshape(-1,2)
                            court=np.array(row['court']).reshape(-1,2)
                            net=np.array(row['net']).reshape(-1,2)
                            ball=np.array(row['ball']).reshape(-1,2)

                            frame_data = np.concatenate((top, bottom, court, net, ball), axis=0)

                            if normalization:
                                frame_data[:,0]/=1920
                                frame_data[:,1]/=1080
                            data.append(frame_data.reshape(1,-1))
                        data=np.array(data)
                        if target1 is None:
                            if self.negative>self.positive:
                                continue
                            target1=0
                            self.negative+=1
                        self.dataset.append((data.reshape(-1),target1))


    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        # 假设每个样本是一个元组 (input, target)
        sample = self.dataset[index]
        input_data = sample[0]
        target1 = sample[1]

        # 转换为Tensor对象
        input_tensor = torch.tensor(input_data)
        target1_tensor = torch.tensor(target1)

        return input_tensor, target1_tensor


num_consecutive_frames=30
batch_size=30
shuffle=True
normalization=True

TrainDataset=HitTypeDataset("./train",num_consecutive_frames,normalization)
ValidDataset=HitTypeDataset("./valid",num_consecutive_frames,normalization)

./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_1-24.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_1-18.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_1-25.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-38.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-39.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-11.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-9.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-8.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-14.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-24.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-26.csv
./train/Ratchanok_INTANON_WANG_Zhi_Yi_Malaysia_Open_2022_SemiFinals/rally_2-33.csv
./trai

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ShotTypeModel(nn.Module):
    def __init__(self, feature_dim, num_consecutive_frames):
        super(ShotTypeModel, self).__init__()
        self.num_consecutive_frames = num_consecutive_frames
        self.feature_dim = feature_dim

        self.gru1 = nn.GRU(feature_dim // num_consecutive_frames, 64, bidirectional=True, batch_first=True)
        self.gru2 = nn.GRU(128, 64, bidirectional=True, batch_first=True)
        self.global_maxpool = nn.MaxPool1d(num_consecutive_frames)
        self.dense = nn.Linear(128, 1)  # Output layer for binary classification

    def forward(self, x):
        batch_size = x.shape[0]
        x = x.float()
        x = x.view(batch_size, self.num_consecutive_frames, self.feature_dim // self.num_consecutive_frames)
        x, _ = self.gru1(x)
        x, _ = self.gru2(x)
        x = x.transpose(1, 2)
        x = self.global_maxpool(x).squeeze()
        x = self.dense(x)
        return x  # Removed softmax, output is now logits


print(TrainDataset.positive,TrainDataset.negative)
print(ValidDataset.positive,ValidDataset.negative)

feature_dim=90*num_consecutive_frames
train_data_loader = DataLoader(TrainDataset, batch_size=batch_size, shuffle=shuffle, drop_last=True)

valid_data_loader = DataLoader(ValidDataset, batch_size=batch_size, shuffle=shuffle, drop_last=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model=ShotTypeModel(feature_dim,num_consecutive_frames)
criterion = nn.BCEWithLogitsLoss()
model.to(device)
criterion.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 100



train_loss_list = []
valid_loss_list=[]
for epoch in range(num_epochs):
    train_loss_sum=0
    model.train()
    for batch_data in train_data_loader:
        inputs, labels = batch_data
        inputs = inputs.to(device)  # 将输入数据移动到设备上
        labels = labels.to(device).float()  # 将输入数据移动到设备上

        outputs = model(inputs)

        train_loss = criterion(outputs, labels.view(-1, 1))
        train_loss_sum+=train_loss.detach()
        # 反向传播和优化
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()
        # 执行你的训练或测试操作
    train_loss_list.append(train_loss_sum)


    # 打印训练信息
    if (epoch + 1) % 1== 0:
        print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs,
                                                    train_loss.item()))
    with torch.no_grad():
        true_positives = 0
        total_positives = 0  # Actual positives in the dataset
        for inputs, labels in valid_data_loader:
            inputs = inputs.to(device)  # Move input data to the device
            labels = labels.to(device)
            # Forward pass
            outputs = model(inputs).squeeze()  # Squeeze to ensure output is of shape [batch_size]

            # Applying a threshold to determine predictions
            predictions = outputs > 0.5  # Outputs > 0.0 are classified as 1, else 0
            
            # Count true positives and total actual positives
            true_positives += ((predictions == 1) & (labels == 1)).sum().item()
            total_positives += (labels == 1).sum().item()

        if total_positives > 0:
            positive_accuracy = true_positives / total_positives
            print(f'Accuracy on positive class: {positive_accuracy}')
        else:
            print("No positive instances in the dataset.")


2529 2530
173 172
Epoch [1/100], Loss: 0.6056
Accuracy on positive class: 0.5670731707317073
Epoch [2/100], Loss: 0.5741
Accuracy on positive class: 0.19631901840490798
Epoch [3/100], Loss: 0.5412
Accuracy on positive class: 0.44171779141104295
Epoch [4/100], Loss: 0.3392
Accuracy on positive class: 0.6227544910179641
Epoch [5/100], Loss: 0.4215
Accuracy on positive class: 0.4817073170731707
Epoch [6/100], Loss: 0.6545
Accuracy on positive class: 0.5963855421686747
Epoch [7/100], Loss: 0.7144
Accuracy on positive class: 0.5670731707317073
Epoch [8/100], Loss: 0.3010
Accuracy on positive class: 0.6727272727272727
Epoch [9/100], Loss: 0.4993
Accuracy on positive class: 0.7831325301204819
Epoch [10/100], Loss: 0.4239
Accuracy on positive class: 0.41916167664670656
Epoch [11/100], Loss: 0.4100
Accuracy on positive class: 0.7212121212121212
Epoch [12/100], Loss: 0.2762
Accuracy on positive class: 0.6484848484848484
Epoch [13/100], Loss: 0.3389
Accuracy on positive class: 0.7116564417177914


KeyboardInterrupt: 

In [36]:
# 保存整个模型
torch.save(model, './defensive-shot_shot_detect.pth')

# # print(f"best_accuracy={best_accuracy}")
# with torch.no_grad():
#     correct = 0
#     total = 0
#     for inputs, labels in valid_data_loader:
#         inputs = inputs.to(device)  # 将输入数据移动到设备上
#         labels = labels.to(device).float()  # 将标签移动到设备上

#         # 前向传播
#         outputs = model(inputs)

#         y_true=torch.argmax(labels,axis=1)
#         y_pred=torch.argmax(outputs,axis=1)
#         print(y_true)
#         print(y_pred)
#         total+=len(y_true)
#         correct+=(y_true==y_pred).sum().item()


#     print(f'Accuracy on test set: {correct/total}')




In [37]:
# final_list=[]
# true_list=[]
# data_path="/content/drive/MyDrive/Badminton_Player_Analysis_Project/SoloShuttlePose/draft/HitDataset/valid/Viktor_AXELSEN_Jonatan_CHRISTIE_Malaysia_Open_2022_SemiFinals/rally_1-12.csv"
# df= pd.read_csv(data_path, converters={"ball": eval,"top":eval,"bottom":eval,"court":eval,"net":eval})

# rows = len(df)
# remainder = rows % 12
# if remainder > 0:
#     num_to_pad = 12 - remainder
# else:
#     num_to_pad = 0

# if num_to_pad > 0:
#     last_row = df.iloc[-1]
#     padding_data = np.tile(last_row.values, (num_to_pad, 1))
#     padded_df = pd.DataFrame(padding_data, columns=df.columns)
#     df = pd.concat([df, padded_df], axis=0)
#     df = df.reset_index(drop=True)

# small_dataset =df
# probs_list=[]
# for i in range(len(small_dataset)):

#     if i>=len(small_dataset)-num_consecutive_frames:
#         break


#     oridata=small_dataset.loc[i:i+num_consecutive_frames-1,:].copy()
#     oridata=oridata.reset_index(drop=True)
#     data=[]
#     # print(oridata)
#     if str(oridata.loc[0,'pos'])=='nan':
#             true_list.append(0)
#     elif str(oridata.loc[0,'pos'])=='top':
#             true_list.append(1)
#     elif str(oridata.loc[0,'pos'])=='bottom':
#             true_list.append(2)

#     for index,row in oridata.iterrows():

#         top=np.array(row['top']).reshape(-1,2)
#         bottom=np.array(row['bottom']).reshape(-1,2)
#         court=np.array(row['court']).reshape(-1,2)
#         # net=np.array(row['net']).reshape(-1,2)
#         ball=np.array(row['ball']).reshape(-1,2)

#         frame_data = np.concatenate((top, bottom, court, ball), axis=0)

#         if normalization:
#             # 将 x 坐标从 0 到 1920 映射到 1 到 2
#             frame_data[:,0] /=1920

#             # 将 y 坐标从 0 到 1080 映射到 1 到 2
#             frame_data[:,1] /=1080

#         data.append(frame_data.reshape(1,-1))
#     data=np.array(data).reshape(1,-1)
#     # print(data.shape)
#     # if i%num_consecutive_frames!=0:
#     #     final_list.append(0)
#     #     continue
#     outputs=model(torch.FloatTensor(data).to(device))

#     # print(outputs)
#     pred=torch.argmax(outputs).item()
#     probs=outputs[pred].item()
#     final_list.append(pred)

# print(true_list)
# print(final_list)
