In [1]:
#import psycopg2
import os
import json
import csv
import pandas as pd
from datetime import datetime as dt
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

Plan:
1) Data Collection
    - parse the JSON files
    - collect frames where there is only 1 person and the dance style is ballet, tap, or latin
2) Feature Extraction
    - we have 17 keypoint coordinates we are looking at:
        - nose, R/L eye, R/L ear, R/L shoulder, R/L elbow, R/L wrist, R/L hip, R/L knee, R/L ankle
        - these coordinates will serve as our input data
3) Split the data into train/validation/test sets
4) Model
    - classification model

In [2]:
# Root folder that holds one sub-folder of frame files per dance style.
# The DANCE_DATASET_DIR environment variable overrides the original local
# default, so the notebook can run on another machine without editing this cell.
directory = os.environ.get(
    'DANCE_DATASET_DIR',
    '/Users/Shruti/Downloads/Dance_Dataset/densepose/txt',
)

# Keep only sub-directories (stray top-level files are ignored), alphabetically
directories = sorted(
    d for d in os.listdir(directory)
    if os.path.isdir(os.path.join(directory, d))
)

# Map each dance-style folder name to its alphabetically sorted regular files
all_file_names = {}
for d in directories:
    style_dir = os.path.join(directory, d)
    files = sorted(
        f for f in os.listdir(style_dir)
        if os.path.isfile(os.path.join(style_dir, f))
    )
    all_file_names[d] = files

In [3]:
# Confirm the 'latin' folder was discovered: one line is printed per matching entry
for _ in range(directories.count('latin')):
    print('latin')

latin


In [4]:
# Count the videos of each dance style.
# A file whose last '_'-separated token (before the first '.') is '0001'
# is treated as the first frame of a video, so counting those counts videos.
num_videos = {}
first_img_indexes = {}

for d in directories:
    num_videos[d] = sum(
        1
        for name in all_file_names[d]
        if name.split('.')[0].split('_')[-1] == '0001'
    )

In [5]:
# Build the whole summary table first, then emit it with a single print
summary_lines = [
    f"{'dance':<9} | {'Num Images':<10} | {'Num Videos':<9}",
    "-"*35,
]
summary_lines.extend(
    f'{k:<9} | {len(v):<10} | {num_videos[k]}'
    for k, v in all_file_names.items()
)
print("\n".join(summary_lines))

dance     | Num Images | Num Videos
-----------------------------------
ballet    | 22410      | 89
break     | 25622      | 95
cha       | 28098      | 98
flamenco  | 24755      | 88
foxtrot   | 23738      | 79
jive      | 29100      | 106
latin     | 24460      | 90
pasodoble | 26607      | 98
quickstep | 24036      | 82
rumba     | 27262      | 94
samba     | 25807      | 96
square    | 27453      | 97
swing     | 26337      | 95
tango     | 24020      | 80
tap       | 28541      | 95
waltz     | 24380      | 80


Function: Number of people in a frame

In [6]:
def person_count(file_name):
    '''Return the number of top-level entries in a JSON file.

    The notebook treats every top-level entry of a frame file as one person,
    so the result is used as the number of people in that frame.

    Parameters:
        file_name: path of the JSON file to read.
    Returns:
        int, the length of the decoded JSON document.
    '''
    # The context manager guarantees the file is closed, even if json.load raises
    with open(file_name, "r") as f:
        j = json.load(f)

    return len(j)

In [7]:
# Sanity check on a known ballet frame, should return 5.
# The path is built from `directory` so the dataset location is defined in one place.
peeps = person_count(os.path.join(directory, 'ballet', '-5Yp-vToI2E_016_0001.json'))
peeps

5

Extract frames where the dance type is 'ballet', 'tap', or 'latin' and there is just 1 person in the frame.

In [8]:
# NOTE: this re-defines person_count, which is already defined earlier in the
# notebook; the later definition is the one in effect from here on.
def person_count(file_name):
    '''Return the number of top-level entries in a JSON file.

    The notebook treats every top-level entry of a frame file as one person,
    so the result is used as the number of people in that frame.

    Parameters:
        file_name: path of the JSON file to read.
    Returns:
        int, the length of the decoded JSON document.
    '''
    # The context manager guarantees the file is closed, even if json.load raises
    with open(file_name, "r") as f:
        j = json.load(f)

    return len(j)

In [None]:
# Setup lists to collect file paths, one per dance style of interest
selected_files_ballet = []
selected_files_tap = []
selected_files_latin = []

# Dispatch table: style folder name -> list that receives its single-person frames.
# Replaces three copy-pasted branches that differed only in the target list.
selected_files_by_style = {
    'ballet': selected_files_ballet,
    'tap': selected_files_tap,
    'latin': selected_files_latin,
}

for d in directories:
    if d not in selected_files_by_style:
        continue  # every other dance style is ignored
    # Reuse the sorted listing collected earlier instead of re-reading the folder
    files = all_file_names[d]
    for file_name in files:
        file_path = os.path.join(directory, d, file_name)
        # Count people in each frame and keep the frame only if there is exactly one
        num_people = person_count(file_path)
        if num_people == 1:
            selected_files_by_style[d].append(file_path)

# Print the sizes of the selected files lists
print("Number of selected files for ballet:", len(selected_files_ballet))
print("Number of selected files for tap:", len(selected_files_tap))
print("Number of selected files for latin:", len(selected_files_latin))

In [None]:
# Show the first selected frame path of each style, in ballet / tap / latin order
for selected_paths in (selected_files_ballet, selected_files_tap, selected_files_latin):
    print(selected_paths[0])

Machine Learning Classification Model - Random Forest

In [None]:
def get_body_position_info(file_name):
    '''Input a JSON file and get the body position of each person.

    Returns a list with one inner list per top-level entry (person) in the
    file.  Each inner list holds element [1] of every item of that person,
    except the person's first item, which is skipped.

    NOTE(review): element [1] is presumably the x,y position of a body part
    and the skipped first item presumably per-person metadata - confirm
    against the dataset format.
    '''
    # The context manager guarantees the file is closed, even if json.load raises
    with open(file_name, "r") as f:
        j = json.load(f)

    # person[1:] drops the first item of every person; the rest contribute item[1]
    return [[body_part[1] for body_part in person[1:]] for person in j]

In [None]:
# Display the extracted body positions for the first selected latin frame
get_body_position_info(selected_files_latin[0])

In [None]:
def get_body_part_labels(file_name):
    '''Input a JSON file and get the body part labels of the first person.

    Returns a list holding element [0] of every item of the first top-level
    entry (person) in the file, except that person's first item, which is
    skipped in the same way get_body_position_info skips it.  Returns an
    empty list when the file contains no people.
    '''
    # The context manager guarantees the file is closed, even if json.load raises
    with open(file_name, "r") as f:
        j = json.load(f)

    # No person in the frame means there are no labels to report
    if not j:
        return []

    # Extract body part labels from the first person entry in the JSON file
    return [body_part[0] for body_part in j[0][1:]]

In [None]:
# Display the body part labels read from the first selected latin frame
get_body_part_labels(selected_files_latin[0])

**Creating Feature Map**

In [None]:
# Concatenate the lists of selected files for the ballet, tap and latin dance styles
all_selected_files = selected_files_ballet + selected_files_tap + selected_files_latin

# Path -> label lookup built once, so labelling a frame is a dict access instead
# of a scan through the selection lists.  Later updates overwrite earlier ones,
# so ballet is inserted last to keep the ballet > tap > latin precedence.
label_by_path = {}
for style, paths in (('latin', selected_files_latin),
                     ('tap', selected_files_tap),
                     ('ballet', selected_files_ballet)):
    label_by_path.update(dict.fromkeys(paths, style))

# Split data into training and testing sets
# NOTE(review): the split is made per frame, so frames of the same video can
# end up in both sets, which may inflate test accuracy - consider splitting
# by video instead.
train_files, test_files = train_test_split(all_selected_files, test_size=0.2, random_state=42)

# Initialize lists to store feature data and corresponding labels
train_feature_data = []
train_feature_data_flattened = []

train_labels = []

# Iterate through each JSON file representing a frame in the training set
for file_path in train_files:
    # Get body position info for the single person in the frame
    body_positions = get_body_position_info(file_path)

    train_feature_data.append(body_positions)

    # Flatten every nesting level (person -> body part -> coordinate) into one
    # flat list of numbers, so each frame becomes a single feature row
    flattened_positions = np.ravel(body_positions).tolist()

    # Append the flattened positions to the feature data list
    train_feature_data_flattened.append(flattened_positions)

    # Determine the label from the selection list the path came from
    label = label_by_path.get(file_path, 'unknown')
    train_labels.append(label)

# Convert the training feature data and labels into numpy arrays
X_train = np.array(train_feature_data)
X_train_flattened = np.array(train_feature_data_flattened)
y_train = np.array(train_labels)

print("Number of frames (rows) in X_train:", len(X_train_flattened))
print("Number of features (columns) in X_train:", len(X_train_flattened[0]))
print("Number of labels in y_train:", len(y_train))

In [None]:
# Preview only the first few feature rows; printing every frame floods the output
X_train_flattened[:5]

In [None]:
# Preview only the first few labels; printing every frame's label floods the output
y_train[:5]

In [None]:
# Initialize lists to store feature data and corresponding labels for the testing set
test_feature_data = []
test_labels = []

# Path -> label lookup built once, so labelling a frame is a dict access instead
# of a scan through the selection lists.  Later updates overwrite earlier ones,
# so ballet is inserted last to keep the ballet > tap > latin precedence.
label_by_path = {}
for style, paths in (('latin', selected_files_latin),
                     ('tap', selected_files_tap),
                     ('ballet', selected_files_ballet)):
    label_by_path.update(dict.fromkeys(paths, style))

# Iterate through each JSON file representing a frame in the testing set
for file_path in test_files:
    # Get body position info for the single person in the frame
    body_positions = get_body_position_info(file_path)

    # Flatten every nesting level (person -> body part -> coordinate) into one
    # flat list of numbers, so each frame becomes a single feature row
    flattened_positions = np.ravel(body_positions).tolist()

    # Append the flattened positions to the feature data list for the testing set
    test_feature_data.append(flattened_positions)

    # Determine the label from the selection list the path came from
    label = label_by_path.get(file_path, 'unknown')
    test_labels.append(label)

# Convert the testing feature data and labels into numpy arrays
X_test_flattened = np.array(test_feature_data)
y_test = np.array(test_labels)

print("Number of frames (rows) in X_test:", len(X_test_flattened))
print("Number of features (columns) in X_test:", len(X_test_flattened[0]))
print("Number of labels in y_test:", len(y_test))


In [None]:
# Define the body part labels
body_part_labels = [
    "nose_x", "nose_y", 
    "left_eye_x", "left_eye_y", 
    "right_eye_x", "right_eye_y", 
    "left_ear_x", "left_ear_y", 
    "right_ear_x", "right_ear_y", 
    "left_shoulder_x", "left_shoulder_y", 
    "right_shoulder_x", "right_shoulder_y", 
    "left_elbow_x", "left_elbow_y", 
    "right_elbow_x", "right_elbow_y", 
    "left_wrist_x", "left_wrist_y", 
    "right_wrist_x", "right_wrist_y", 
    "left_hip_x", "left_hip_y", 
    "right_hip_x", "right_hip_y", 
    "left_knee_x", "left_knee_y", 
    "right_knee_x", "right_knee_y", 
    "left_ankle_x", "left_ankle_y", 
    "right_ankle_x", "right_ankle_y"
]

# Create a dictionary to map body part labels to column indices
body_part_mapping = {label: i for i, label in enumerate(body_part_labels)}

# The mapping addresses single-coordinate columns, so index a view with exactly
# one row per frame and one column per coordinate.  Any per-body-part nesting
# still present in X_train_flattened is collapsed here.
X_train_2d = X_train_flattened.reshape(X_train_flattened.shape[0], -1)
assert X_train_2d.shape[1] == len(body_part_labels), (
    "feature matrix width does not match the number of body part labels"
)

# Access the x-coordinate of the left eye for all frames
left_eye_x_coordinates = X_train_2d[:, body_part_mapping["left_eye_x"]]
print(left_eye_x_coordinates)
print(len(left_eye_x_coordinates))

# Access the y-coordinate of the right shoulder for all frames
right_shoulder_y_coordinates = X_train_2d[:, body_part_mapping["right_shoulder_y"]]
# print(right_shoulder_y_coordinates)
print(len(right_shoulder_y_coordinates))

# Access both x and y coordinates of the nose for all frames
nose_coordinates = X_train_2d[:, [body_part_mapping["nose_x"], body_part_mapping["nose_y"]]]
# print(nose_coordinates)
print(len(nose_coordinates))

In [None]:
# Shapes of the flattened training matrix, then of the nested training array
for arr in (X_train_flattened, X_train):
    print(arr.shape)

In [None]:
# One row per training frame, one named column per coordinate, plus the style label
n_train_frames = X_train.shape[0]
df_train = pd.DataFrame(
    X_train.reshape(n_train_frames, -1),
    columns=body_part_labels,
).assign(label=y_train)
df_train

**Model**

X (feature matrix): 
- the x,y positions of the different body parts for each frame in our dataset
- each row in the matrix corresponds to one frame
- each column represents either the x or y position of a specific body part
- there are a total of 17 attributes - nose, R/L eye, R/L ear, R/L shoulder, R/L elbow, R/L wrist, R/L hip, R/L knee, R/L ankle 
- our matrix has a total of 34 columns (17x2 for each x,y coordinate)
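- our matrix has a total of 15429 rows (8351 ballet frames; 7078 tap frames); these counts cover ballet and tap only, so the latin frames that the code above also selects are not included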

Y (target variable)
- represents the output label, in our case the dance style associated with each frame (ballet, tap, or latin)

In [None]:
# Collapse every frame to a single feature row: (frames, everything else)
n_train_rows = X_train_flattened.shape[0]
n_test_rows = X_test_flattened.shape[0]
X_train_flattened_reshaped = X_train_flattened.reshape(n_train_rows, -1)
X_test_flattened_reshaped = X_test_flattened.reshape(n_test_rows, -1)

# Report the shapes of both feature matrices, then of both label vectors
for arr in (X_train_flattened_reshaped, X_test_flattened_reshaped, y_train, y_test):
    print(arr.shape)

In [None]:
# Collapse every frame to a single feature row (repeated so this cell runs on its own)
n_train_rows = X_train_flattened.shape[0]
n_test_rows = X_test_flattened.shape[0]
X_train_flattened_reshaped = X_train_flattened.reshape(n_train_rows, -1)
X_test_flattened_reshaped = X_test_flattened.reshape(n_test_rows, -1)

# Build a 100-tree random forest with a fixed seed and fit it in one step
# (fit returns the estimator itself)
model = RandomForestClassifier(n_estimators=100, random_state=42).fit(
    X_train_flattened_reshaped, y_train
)

# Predict a dance style for every test frame
y_pred = model.predict(X_test_flattened_reshaped)

# Fraction of test frames whose predicted style matches the true label
test_accuracy = accuracy_score(y_test, y_pred)
print("Test Accuracy:", test_accuracy)

In [None]:
# Shape of the label array returned by model.predict for the test frames
print(y_pred.shape)