In [13]:
import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf
import os

In [None]:
# MediaPipe helpers shared by the cells below.
mp_drawing = mp.solutions.drawing_utils
# The solution module is `holistic` (singular); it provides the Holistic class,
# POSE_CONNECTIONS, HAND_CONNECTIONS and PoseLandmark used later in this notebook.
mp_holistic = mp.solutions.holistic

In [None]:
# Pose Extractor
def mediapipe_detection(image, model):
    """Run `model` on a single BGR frame.

    Args:
        image: Frame in BGR channel order; it is converted to RGB before
            being handed to the model.
        model: Object exposing a `process(rgb_image)` method.

    Returns:
        A tuple `(image, results)`: the frame converted back to BGR and the
        value returned by `model.process`.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Read-only while the model runs (presumably lets MediaPipe skip a copy — TODO confirm)
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

def draw_styled_landmarks(image, results):
    """Draw pose, arm-joint and hand landmarks onto `image` in place.

    Args:
        image: Frame to draw on (modified in place).
        results: Object exposing `pose_landmarks`, `left_hand_landmarks`
            and `right_hand_landmarks`.

    Face landmarks are not drawn.
    """

    def _spec(color, radius):
        # All styles in this function share thickness=2.
        return mp_drawing.DrawingSpec(color=color, thickness=2, circle_radius=radius)

    # Pose skeleton
    mp_drawing.draw_landmarks(
        image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
        _spec((80, 22, 10), 4),
        _spec((80, 44, 121), 2),
    )

    # Highlight shoulder / elbow / wrist of each arm with filled circles
    if results.pose_landmarks:
        joints = mp_holistic.PoseLandmark
        arm_groups = (
            # left arm: (255, 0, 0), i.e. blue
            ((joints.LEFT_SHOULDER, joints.LEFT_ELBOW, joints.LEFT_WRIST), (255, 0, 0)),
            # right arm: (0, 0, 255), i.e. red
            ((joints.RIGHT_SHOULDER, joints.RIGHT_ELBOW, joints.RIGHT_WRIST), (0, 0, 255)),
        )
        for arm_joints, circle_color in arm_groups:
            for joint in arm_joints:
                point = results.pose_landmarks.landmark[joint]
                # Landmark coordinates are scaled by the image width / height
                center = (int(point.x * image.shape[1]), int(point.y * image.shape[0]))
                cv2.circle(image, center, 5, circle_color, -1)

    # Left hand first, then right hand
    hand_styles = (
        (results.left_hand_landmarks, (121, 22, 76), (121, 44, 250)),
        (results.right_hand_landmarks, (245, 117, 66), (245, 66, 230)),
    )
    for hand_landmarks, landmark_color, connection_color in hand_styles:
        mp_drawing.draw_landmarks(
            image, hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            _spec(landmark_color, 4),
            _spec(connection_color, 2),
        )

In [None]:
# NOTE(review): `video_path` is not defined in any visible cell; it must be set
# before this cell runs on a fresh kernel.
cap = cv2.VideoCapture(video_path)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream or read failure: `frame` is not a valid image,
                # so stop instead of passing it to cv2.cvtColor.
                break

            image, results = mediapipe_detection(frame, holistic)
            draw_styled_landmarks(image, results)
            # cv2.imshow('OpenCV Feed', image)

            # Stop early when 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the capture device / file handle and any OpenCV windows,
    # even if detection or drawing raised.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Collect one [x, y, z, visibility] array per pose landmark found in `results`.
pose = []
# `pose_landmarks` is falsy when no pose was detected (extract_keypoints guards
# the same way); in that case `pose` stays empty instead of raising AttributeError.
if results.pose_landmarks:
    for res in results.pose_landmarks.landmark:
        test = np.array([res.x, res.y, res.z, res.visibility])
        pose.append(test)

In [None]:
def extract_keypoints(results):
    """Flatten pose, hand and arm-joint landmarks into one 1-D array.

    The returned vector is the concatenation, in this order, of:
      * pose: (x, y, z, visibility) per pose landmark, or zeros(33*4) if absent
      * left hand: (x, y, z) per landmark, or zeros(21*3) if absent
      * right hand: (x, y, z) per landmark, or zeros(21*3) if absent
      * left arm: (x, y, z, visibility) for shoulder, elbow, wrist, or zeros(12)
      * right arm: same three joints on the right side, or zeros(12)

    The arm segments repeat values that are already part of the pose segment.
    Face landmarks are not extracted.

    Args:
        results: Object exposing `pose_landmarks`, `left_hand_landmarks` and
            `right_hand_landmarks`; each is either falsy or has a `.landmark`
            sequence.

    Returns:
        1-D numpy array with all segments concatenated.
    """

    def _hand_vector(hand):
        # (x, y, z) per hand landmark; zero placeholder when the hand is missing
        if not hand:
            return np.zeros(21*3)
        return np.array([[p.x, p.y, p.z] for p in hand.landmark]).flatten()

    pose_landmarks = results.pose_landmarks
    if pose_landmarks:
        points = pose_landmarks.landmark

        def _joint_vector(joint_ids):
            # (x, y, z, visibility) for each requested pose joint
            return np.array(
                [[points[j].x, points[j].y, points[j].z, points[j].visibility] for j in joint_ids]
            ).flatten()

        joints = mp_holistic.PoseLandmark
        pose = np.array([[p.x, p.y, p.z, p.visibility] for p in points]).flatten()
        left_arm = _joint_vector([joints.LEFT_SHOULDER, joints.LEFT_ELBOW, joints.LEFT_WRIST])
        right_arm = _joint_vector([joints.RIGHT_SHOULDER, joints.RIGHT_ELBOW, joints.RIGHT_WRIST])
    else:
        # No pose detected: zero placeholders keep the vector layout stable
        pose = np.zeros(33*4)
        left_arm = np.zeros(12)
        right_arm = np.zeros(12)

    lh = _hand_vector(results.left_hand_landmarks)
    rh = _hand_vector(results.right_hand_landmarks)

    return np.concatenate([pose, lh, rh, left_arm, right_arm])

In [None]:
# Smoke test: build the feature vector from `results`, which must already exist
# in the kernel (the capture-loop cell assigns it).
result_test = extract_keypoints(results)

# TFRecord

In [1]:
import os

# Check the current working directory
current_directory = os.getcwd()
print("현재 작업 디렉토리:", current_directory)

현재 작업 디렉토리: /home/seongmin/documents/A2I_Project/model


In [2]:
import os

# Path setup (use one of the approaches above).
# NOTE(review): hard-coded absolute, machine-specific path.
directory = r'/home/seongmin/documents/A2I_Project/data/volleyball signals'

# Names of every sub-folder directly inside `directory`
# (order is whatever os.listdir returns).
dir_name = [
    entry
    for entry in os.listdir(directory)
    if os.path.isdir(os.path.join(directory, entry))
]

# Echo the folder names, one per line
for folder_name in dir_name:
    print(folder_name)

IllegalBlock
IllegalStart
PointRight
BeginServeLeft
PointLeft
BeginServeRight
IllegalBackRowAttack
TimeOutRight
IllegalHit
BallTouched
FourHits
LineViolation
Replay
OverNet
DoubleHit
TimeOutLeft
BallOut
BallIn
CourtChange
DelayofService
Substitution
NetFoul
EndofSet


In [14]:
# Folder with the exported data (relative to the notebook's working directory)
DATA_PATH = '../data/volleyball signals'

# Action classes to detect, in class-index order
actions = np.array([
    "IllegalBlock", "IllegalStart", "PointRight", "BeginServeLeft",
    "PointLeft", "BeginServeRight", "IllegalBackRowAttack", "TimeOutRight",
    "IllegalHit", "BallTouched", "FourHits", "LineViolation",
    "Replay", "OverNet", "DoubleHit", "TimeOutLeft",
    "BallOut", "BallIn", "CourtChange", "DelayofService",
    "Substitution", "NetFoul", "EndofSet",
])

# Number of videos per action
no_sequences = 30

# Number of frames per video
sequence_length = 30

# Output location of the TFRecord file
TFRECORD_PATH = '../data/tfrecord/features.tfrecord'

In [3]:
def _bytes_feature(value):
    """JPEG-encode `value` and wrap the encoded bytes in a bytes_list Feature.

    Despite the generic name, the input is passed through
    `tf.io.encode_jpeg`, so it has to be image data rather than an arbitrary
    string / bytes value.

    Args:
        value: Image data; an eager tensor is first converted with `.numpy()`.
            Presumably an HxWxC uint8 image — TODO confirm against
            tf.io.encode_jpeg's requirements.

    Returns:
        tf.train.Feature holding a single JPEG byte string.
    """
    eager_tensor_type = type(tf.constant(0))
    if isinstance(value, eager_tensor_type):
        value = value.numpy()

    jpeg_bytes = tf.io.encode_jpeg(value).numpy()
    encoded = tf.train.BytesList(value=[jpeg_bytes])
    return tf.train.Feature(bytes_list=encoded)

def _float_feature(value):
    """Wrap a single float / double in a float_list Feature."""
    wrapped = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=wrapped)

def _int64_feature(value):
    """Wrap a single bool / enum / int / uint in an int64_list Feature."""
    wrapped = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=wrapped)

In [4]:
def create_tf_example(image, keypoints, label):
    """Bundle one frame, its keypoints and its label into a tf.train.Example.

    Args:
        image: Frame passed to `_bytes_feature` (stored under 'image').
        keypoints: Iterable of floats stored as a float list under 'keypoints'.
        label: Integer class id stored under 'label'.

    Returns:
        tf.train.Example with the three features above.
    """
    keypoint_values = tf.train.FloatList(value=keypoints)
    features = tf.train.Features(feature={
        'image': _bytes_feature(image),
        'keypoints': tf.train.Feature(float_list=keypoint_values),
        'label': _int64_feature(label),
    })
    return tf.train.Example(features=features)

In [5]:
def write_tfrecord(video_data, labels, output_path, label_map=None, holistic_model=None):
    """Write one TFRecord example per frame of every video.

    Args:
        video_data: Sequence of videos, each a sequence of frames.
        labels: One label per video, aligned with `video_data`. A label may be
            an integer class id or an action name (as produced by
            `load_videos_from_directory`, which uses folder names).
        output_path: Destination TFRecord file.
        label_map: Optional mapping from action name to integer class id.
            Defaults to the position of each name in the module-level
            `actions` array.
        holistic_model: Optional model passed to `mediapipe_detection`.
            Defaults to the module-level `holistic`, which the calling cell
            binds via `with mp_holistic.Holistic(...) as holistic`.

    Raises:
        KeyError: If a string label is not present in `label_map`.
    """
    if label_map is None:
        label_map = {name: index for index, name in enumerate(actions)}
    model = holistic if holistic_model is None else holistic_model

    with tf.io.TFRecordWriter(output_path) as writer:
        for idx, frames in enumerate(video_data):
            action = labels[idx]
            # The 'label' feature is an Int64List, which rejects strings:
            # translate action names into their integer class id first.
            if isinstance(action, str):
                action = label_map[action]
            class_id = int(action)

            for frame in frames:
                # Only the detection results are needed; the converted image is unused.
                _, results = mediapipe_detection(frame, model)
                keypoints = extract_keypoints(results)

                # NOTE(review): the raw frame is what gets JPEG-encoded. Frames read
                # via cv2 are presumably BGR — confirm the reader of this TFRecord
                # expects that channel order.
                tf_example = create_tf_example(frame, keypoints, class_id)
                writer.write(tf_example.SerializeToString())

In [6]:
def process_video_with_optical_flow(video_path, target_frame_count):
    """Read a video and resample it to exactly `target_frame_count` frames.

    Short videos are densified by inserting an optical-flow-interpolated frame
    between every pair of neighbours until there are at least
    `target_frame_count` frames. Any surplus is then removed by uniform
    sampling, so the result always spans the whole clip from first to last
    frame. Long videos are uniformly subsampled the same way.

    Args:
        video_path: Path handed to `cv2.VideoCapture`.
        target_frame_count: Number of frames to return.

    Returns:
        List of `target_frame_count` frames.

    Raises:
        ValueError: If no frame could be read from `video_path`.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture even if reading raised
        cap.release()

    if not frames:
        raise ValueError(f"no frames could be read from {video_path!r}")

    if len(frames) == 1 and target_frame_count > 1:
        # Nothing to interpolate between: pad by repeating the only frame.
        # (The pairwise densification below cannot grow a 1-frame list.)
        return [frames[0].copy() for _ in range(target_frame_count)]

    # Each pass turns n frames into 2n - 1, so this terminates for n >= 2.
    while len(frames) < target_frame_count:
        denser = []
        for current, following in zip(frames, frames[1:]):
            denser.append(current)
            denser.append(interpolate_frames(current, following, 0.5))
        denser.append(frames[-1])
        frames = denser

    if len(frames) > target_frame_count:
        # Uniform sampling keeps first and last frame; plain truncation would
        # drop the end of the clip.
        indices = np.linspace(0, len(frames) - 1, target_frame_count).astype(int)
        frames = [frames[i] for i in indices]

    return frames

def interpolate_frames(frame1, frame2, interpolation_ratio):
    """Synthesize a frame between `frame1` and `frame2` using dense optical flow.

    Farneback flow is computed from `frame1` to `frame2`, and `frame1` is
    warped along a fraction of that flow.

    Args:
        frame1: Earlier BGR frame (the one that gets warped).
        frame2: Later BGR frame, same size as `frame1`.
        interpolation_ratio: Fraction of the flow to apply; 0 reproduces
            `frame1`, 1 approximates `frame2`.

    Returns:
        The warped frame, same shape as `frame1`.
    """
    gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
    flow = cv2.calcOpticalFlowFarneback(gray1, gray2, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    h, w = frame1.shape[:2]
    # Per-pixel x / y coordinate grids, each of shape (h, w); cv2.remap needs
    # float32 maps giving, for every output pixel, where to sample the source.
    grid_x, grid_y = np.meshgrid(
        np.arange(w, dtype=np.float32),
        np.arange(h, dtype=np.float32),
    )

    # flow[..., 0] is the x displacement and flow[..., 1] the y displacement.
    # Content seen at p in frame1 sits near p + ratio * flow in the in-between
    # frame, so each output pixel samples frame1 at p - ratio * flow (backward
    # warp, using the flow at the output location as an approximation).
    map_x = (grid_x - flow[..., 0] * interpolation_ratio).astype(np.float32)
    map_y = (grid_y - flow[..., 1] * interpolation_ratio).astype(np.float32)

    return cv2.remap(frame1, map_x, map_y, cv2.INTER_LINEAR)

In [8]:
def mediapipe_detection(image, model):
    """Convert a BGR frame to RGB, run `model.process` on it, and convert back.

    Args:
        image: Frame in BGR channel order.
        model: Object exposing a `process(rgb_image)` method.

    Returns:
        `(image, results)`: the frame back in BGR order and the model output.
    """
    # Colour conversion for the model
    as_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Toggle the writeable flag off around the processing call, then back on
    as_rgb.flags.writeable = False
    results = model.process(as_rgb)
    as_rgb.flags.writeable = True

    return cv2.cvtColor(as_rgb, cv2.COLOR_RGB2BGR), results

def draw_styled_landmarks(image, results):
    """Draw the pose landmarks and their connections onto `image` in place.

    No DrawingSpec is passed, so the drawing utility's own defaults apply.
    """
    draw = mp.solutions.drawing_utils.draw_landmarks
    draw(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

def extract_keypoints(results):
    """Return the pose landmarks of `results` as a flat 1-D array.

    Args:
        results: Object whose `pose_landmarks` is either falsy or exposes a
            `.landmark` sequence of points with x, y, z and visibility.

    Returns:
        (x, y, z, visibility) of every pose landmark concatenated in order, or
        `np.zeros(33 * 4)` when no pose landmarks are present.
    """
    pose_landmarks = results.pose_landmarks
    if not pose_landmarks:
        return np.zeros(33 * 4)

    rows = [[lm.x, lm.y, lm.z, lm.visibility] for lm in pose_landmarks.landmark]
    return np.array(rows).flatten()

In [9]:
def load_videos_from_directory(directory, target_frame_count=30):
    """Load every video under `directory`, one sub-folder per label.

    Expected layout: `directory/<label>/<video file>`. Each video is resampled
    to `target_frame_count` frames by `process_video_with_optical_flow`.

    Hidden entries (names starting with '.') and anything inside a label
    folder that is not a regular file are skipped, so they are never handed to
    the video reader. Entries are visited in sorted order to make the output
    order reproducible.

    Args:
        directory: Root folder containing one sub-folder per label.
        target_frame_count: Number of frames to keep per video.

    Returns:
        `(video_data, labels)`: a list of frame lists and a parallel list of
        label names (the sub-folder names).
    """
    video_data = []
    labels = []

    for label in sorted(os.listdir(directory)):
        label_dir = os.path.join(directory, label)
        if label.startswith('.') or not os.path.isdir(label_dir):
            continue

        for video_file in sorted(os.listdir(label_dir)):
            video_path = os.path.join(label_dir, video_file)
            # Skip hidden files and nested folders: they are not videos
            if video_file.startswith('.') or not os.path.isfile(video_path):
                continue

            frames = process_video_with_optical_flow(video_path, target_frame_count)
            video_data.append(frames)
            labels.append(label)

    return video_data, labels

In [None]:
# Build the TFRecord file. The `as holistic` binding matters: write_tfrecord
# reads the module-level name `holistic` rather than taking the model as an argument.
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    # Loads every video under DATA_PATH, resampled to `sequence_length` frames each
    video_data, labels = load_videos_from_directory(DATA_PATH, target_frame_count=sequence_length)
    write_tfrecord(video_data, labels, TFRECORD_PATH)

# Preprocess Data and Create Labels and Features

In [15]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [16]:
# Map each action name to its integer class id (its position in `actions`)
label_map = dict(zip(actions, range(len(actions))))

In [17]:
label_map

{'IllegalBlock': 0,
 'IllegalStart': 1,
 'PointRight': 2,
 'BeginServeLeft': 3,
 'PointLeft': 4,
 'BeginServeRight': 5,
 'IllegalBackRowAttack': 6,
 'TimeOutRight': 7,
 'IllegalHit': 8,
 'BallTouched': 9,
 'FourHits': 10,
 'LineViolation': 11,
 'Replay': 12,
 'OverNet': 13,
 'DoubleHit': 14,
 'TimeOutLeft': 15,
 'BallOut': 16,
 'BallIn': 17,
 'CourtChange': 18,
 'DelayofService': 19,
 'Substitution': 20,
 'NetFoul': 21,
 'EndofSet': 22}