In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
%cd gdrive/My Drive/CPS Final Proj

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
[Errno 2] No such file or directory: 'gdrive/My Drive/CPS Final Proj'
/content/gdrive/My Drive/CPS Final Proj


In [None]:
import os
import pandas as pd
import numpy as np


In [None]:

# Move into the "data/" folder (relative to the current working directory) and list it.
# NOTE(review): the recorded output of this cell is a FileNotFoundError, and because the
# path is relative, re-running it after a successful chdir would look for "data/data/".
os.chdir("data/")
os.listdir()

FileNotFoundError: ignored

In [None]:
# Load shields.csv with pandas' default header handling and expose it as a NumPy array.
# NOTE(review): the read_csv() helper defined further down passes header=None, whereas
# this call lets pandas consume the first row as column names — confirm which is intended.
df = pd.read_csv('shields.csv')
shields = np.asarray(df)

In [None]:
# Sanity check: dimensions of the raw shields array loaded above.
print(shields.shape)

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import plotly.express as px
import plotly.graph_objects as go
from plotly.offline import plot



def read_csv(file_path):
    """Load a header-less point-cloud CSV into a DataFrame.

    Args:
        file_path: path (or anything ``pd.read_csv`` accepts) of the CSV file.

    Returns:
        DataFrame whose four columns are named 'sequence_num', 'x', 'y', 'z'.
    """
    column_names = ['sequence_num', 'x', 'y', 'z']
    # header=None: the first line of the file is data, not column names.
    return pd.read_csv(file_path, header=None, names=column_names)

def plot_point_cloud_matplotlib(data, sequence_num):
    """Show a static matplotlib 3-D scatter plot of one sequence.

    Args:
        data: DataFrame with 'sequence_num', 'x', 'y' and 'z' columns.
        sequence_num: value of 'sequence_num' selecting the rows to draw.
    """
    selected = data[data['sequence_num'] == sequence_num]
    xs, ys, zs = selected['x'], selected['y'], selected['z']

    axes = plt.figure().add_subplot(111, projection='3d')
    axes.scatter(xs, ys, zs, s=50, alpha=0.6, edgecolors='w', depthshade=True)
    axes.set_xlabel('X')
    axes.set_ylabel('Y')
    axes.set_zlabel('Z')
    plt.show()

def plot_point_cloud_plotly(data, sequence_num):
    """Show an interactive Plotly 3-D scatter plot of one sequence.

    Args:
        data: DataFrame with 'sequence_num', 'x', 'y' and 'z' columns.
        sequence_num: value of 'sequence_num' selecting the rows to draw.
    """
    row_mask = data['sequence_num'] == sequence_num
    axis_titles = dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z')

    fig = px.scatter_3d(data[row_mask], x='x', y='y', z='z', opacity=0.6)
    fig.update_layout(scene=axis_titles)
    fig.show()

def plot_point_clouds_by_class_plotly(data_array, labels, target_class, aligned):
    """Overlay every point cloud of one class in a Plotly figure and save it as HTML.

    Args:
        data_array: indexable collection of point clouds; columns 0, 1, 2 of each
            cloud are plotted as x, y, z.
        labels: array of class labels, compared element-wise with ``target_class``.
        target_class: label whose samples are drawn.
        aligned: truthy to save under the "aligned" file name, falsy otherwise.

    Returns:
        The Plotly figure (also written to an HTML file in the working directory).
    """
    traces = [
        go.Scatter3d(
            x=data_array[idx][:, 0],
            y=data_array[idx][:, 1],
            z=data_array[idx][:, 2],
            mode='markers',
            marker=dict(size=2, opacity=0.6),
        )
        for idx in np.where(labels == target_class)[0]
    ]

    fig = go.Figure(
        data=traces,
        layout=go.Layout(scene=dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z')),
    )

    # The output file name encodes the class and whether the data was aligned.
    prefix = "3d_plot_class_aligned_" if aligned else "3d_plot_class_"
    plot(fig, filename=prefix + str(target_class) + ".html", auto_open=False)

    return fig

def plot_point_cloud_plotly_numpy(data, sequence_num):
    """Show an interactive Plotly 3-D scatter of ``data[sequence_num]``.

    Args:
        data: indexable collection of point clouds; columns 0, 1, 2 are x, y, z.
        sequence_num: index of the cloud to draw.
    """
    points = data[sequence_num]
    trace = go.Scatter3d(
        x=points[:, 0],
        y=points[:, 1],
        z=points[:, 2],
        mode='markers',
        marker=dict(opacity=0.6),
    )
    fig = go.Figure(data=[trace])
    fig.update_layout(scene=dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z'))
    fig.show()

def convert_to_numpy_array(data):
    """Split a point DataFrame into one (frames, 3) matrix per sequence.

    Args:
        data: DataFrame with 'sequence_num', 'x', 'y' and 'z' columns.

    Returns:
        1-D object array with one entry per distinct 'sequence_num' (in order of
        first appearance); each entry is that sequence's x/y/z values as a
        2-D NumPy array.
    """
    matrices = []
    for seq_num in data['sequence_num'].unique():
        sequence_data = data[data['sequence_num'] == seq_num]
        matrices.append(sequence_data[['x', 'y', 'z']].to_numpy())

    # Fill a pre-allocated object array element by element. np.array(matrices,
    # dtype=object) would merge the matrices into a single 3-D array whenever they
    # all share one shape (including the single-sequence case), so the result would
    # no longer be "an array of sequences".
    sequences = np.empty(len(matrices), dtype=object)
    for position, matrix in enumerate(matrices):
        sequences[position] = matrix
    return sequences

def average_sequence_length(arrays):
    """Return the mean number of rows across ``arrays``, truncated to an int.

    Args:
        arrays: iterable of arrays; ``shape[0]`` of each one is its length.
    """
    row_counts = [sequence.shape[0] for sequence in arrays]
    mean_rows = np.mean(row_counts)
    # int() truncates towards zero, e.g. a mean of 4.5 becomes 4.
    return int(mean_rows)

def std_frames_per_sequence(arr):
    """Return the sample standard deviation (ddof=1) of the sequence lengths in ``arr``."""
    lengths = np.array([len(entry) for entry in arr])
    return lengths.std(ddof=1)

def plot_frame_sizes(arr):
    """Draw a bar chart of the length of every sequence in ``arr``.

    Bar positions are the sequence indices 0..len(arr)-1 and bar heights are
    ``len(sequence)``.
    """
    heights = [len(entry) for entry in arr]
    positions = range(len(arr))

    plt.bar(positions, heights)
    plt.title('Frame Count per Sequence')
    plt.xlabel('Sequence Number')
    plt.ylabel('Number of Frames')

    plt.show()

In [None]:
# Load the shields recordings, split them into one x/y/z matrix per sequence, then
# chart and print how many frames each sequence contains.
shields_data = read_csv("shields.csv")
#plot_point_cloud_plotly(shields_data, sequence_num=4)
shields_arr = convert_to_numpy_array(shields_data)
plot_frame_sizes(shields_arr)
# NOTE: avg_frames is a scratch name that the stars/swords cells reassign.
avg_frames = average_sequence_length(shields_arr)
print("Average number of frames per sequence:", avg_frames)

In [None]:
# Load the stars recordings, split them into one x/y/z matrix per sequence, then
# chart and print how many frames each sequence contains.
stars_data = read_csv("stars.csv")
#plot_point_cloud_plotly(stars_data, sequence_num=19)
stars_arr = convert_to_numpy_array(stars_data)
plot_frame_sizes(stars_arr)
# NOTE: avg_frames is a scratch name shared with the shields/swords cells.
avg_frames = average_sequence_length(stars_arr)
print("Average number of frames per sequence:", avg_frames)

In [None]:
# Load the swords recordings, split them into one x/y/z matrix per sequence, then
# chart and print how many frames each sequence contains.
swords_data = read_csv("swords.csv")
#plot_point_cloud_plotly(swords_data, sequence_num=19)
swords_arr = convert_to_numpy_array(swords_data)
plot_frame_sizes(swords_arr)
# NOTE: avg_frames is a scratch name shared with the shields/stars cells.
avg_frames = average_sequence_length(swords_arr)
print("Average number of frames per sequence:", avg_frames)

In [None]:
# Spread of sequence lengths per class. The shields line also prints NumPy's default
# (population, ddof=0) standard deviation next to the sample (ddof=1) value for comparison.
print("STD shields: ", std_frames_per_sequence(shields_arr), " NP ", np.std(np.array([len(sequence) for sequence in shields_arr])))
print("STD swords: ", std_frames_per_sequence(swords_arr))
print("STD stars: ", std_frames_per_sequence(stars_arr))

In [None]:
def remove_short_sequences(arr):
    """Drop sequences that are unusually short compared with the rest of ``arr``.

    A sequence is kept when its length is at least ``mean - 2 * std`` of all
    sequence lengths, using the sample standard deviation (ddof=1).

    Args:
        arr: iterable of sequences (anything supporting ``len``).

    Returns:
        1-D object array holding the retained sequences in their original order.
    """
    frame_counts = np.array([len(sequence) for sequence in arr])

    if frame_counts.size < 2:
        # The sample standard deviation is undefined (NaN) for fewer than two
        # sequences; comparing against a NaN threshold would discard everything,
        # so there is nothing to filter in that case.
        kept = list(arr)
    else:
        threshold = frame_counts.mean() - 2 * frame_counts.std(ddof=1)
        kept = [sequence for sequence, count in zip(arr, frame_counts) if count >= threshold]

    # Fill element by element: np.array(kept, dtype=object) would merge equally
    # shaped sequences into one N-D array instead of a 1-D array of sequences.
    filtered_arr = np.empty(len(kept), dtype=object)
    for position, sequence in enumerate(kept):
        filtered_arr[position] = sequence

    return filtered_arr

# Apply the short-sequence filter to each gesture class independently, so each class
# uses its own mean and standard deviation for the threshold.
swords_arr_filtered = remove_short_sequences(swords_arr)
shields_arr_filtered = remove_short_sequences(shields_arr)
stars_arr_filtered = remove_short_sequences(stars_arr)



In [None]:
#print("STD swords: ", std_frames_per_sequence(swords_arr_filtered), " NP ", np.std(np.array([len(sequence) for sequence in swords_arr_filtered])))
# Report how many entries each class array holds before and after filtering.
# NOTE(review): .size counts array elements; it equals the number of sequences only
# while these are 1-D object arrays — confirm that always holds.
print("Original swords sequence: ", swords_arr.size, " |  New swords sequence: ", swords_arr_filtered.size)
print("Original shields sequence: ", shields_arr.size, " |  New shields sequence: ", shields_arr_filtered.size)
print("Original stars sequence: ", stars_arr.size, " |  New stars sequence: ", stars_arr_filtered.size)

# Smallest class size; every class is cut down to this many sequences.
min_size = min(len(swords_arr_filtered), len(shields_arr_filtered), len(stars_arr_filtered))

# resize each array to the size of the smallest array
# (min_size never exceeds a class's length, so np.resize only truncates here; it would
# repeat entries if asked for a larger size.)
swords_arr_resized = np.resize(swords_arr_filtered, min_size)
shields_arr_resized = np.resize(shields_arr_filtered, min_size)
stars_arr_resized = np.resize(stars_arr_filtered, min_size)

print("New size swords: ", swords_arr_resized.size)
print("New size shields: ", shields_arr_resized.size)
print("New size stars: ", stars_arr_resized.size)

In [None]:
# NOTE(review): `ran` is not defined anywhere in the visible notebook, so this cell
# appears to raise NameError on a fresh kernel — looks like a stray fragment; confirm and delete.
ran

In [None]:
from scipy.spatial.distance import cdist
def get_average_sequence_length(swords_arr, shields_arr, stars_arr):
  """Return the mean sequence length over all three gesture collections, truncated to int.

  Every sequence counts equally, regardless of which collection it comes from.
  """
  lengths = [
      sequence.shape[0]
      for gesture_arr in (swords_arr, shields_arr, stars_arr)
      for sequence in gesture_arr
  ]
  return int(sum(lengths) / len(lengths))

def nearest_neighbor_resampling(sequence, target_length):
  """Resample ``sequence`` to ``target_length`` rows by picking evenly spaced rows.

  Evenly spaced positions between the first and last row are rounded to the
  nearest integer index, so rows are repeated when ``target_length`` exceeds the
  current length.
  """
  last_index = sequence.shape[0] - 1
  positions = np.linspace(0, last_index, target_length)
  picks = np.rint(positions).astype(int)
  return sequence[picks]

def furthest_point_sampling(sequence, target_length):
  """Pick ``target_length`` well-spread rows of ``sequence`` by farthest-point sampling.

  Starting from row 0, the row whose distance to the already chosen rows is
  largest (measured to its nearest chosen row) is added repeatedly. The chosen
  rows are returned in their original order.

  The previous implementation could not run: it called ``set.pop`` with an
  argument (TypeError), treated an argmax position as a row index, and ended
  with an unreachable ``return`` that referenced undefined names.

  Args:
    sequence: 2-D NumPy array with one point per row.
    target_length: number of rows to keep.

  Returns:
    Array of the selected rows. All rows are returned when ``target_length`` is
    at least the number of rows; an empty selection when it is not positive.
  """
  current_length = sequence.shape[0]
  if target_length <= 0 or current_length == 0:
    return sequence[:0]
  if target_length >= current_length:
    return sequence[np.arange(current_length)]

  points = np.asarray(sequence, dtype=float)
  sampled_indices = [0]
  # Distance from every row to its nearest already-sampled row.
  nearest_sampled = np.linalg.norm(points - points[0], axis=1)
  # -inf marks rows that are already taken so duplicates can never be re-picked.
  nearest_sampled[0] = -np.inf

  while len(sampled_indices) < target_length:
    furthest_point = int(np.argmax(nearest_sampled))
    sampled_indices.append(furthest_point)
    to_new_point = np.linalg.norm(points - points[furthest_point], axis=1)
    nearest_sampled = np.minimum(nearest_sampled, to_new_point)
    nearest_sampled[furthest_point] = -np.inf

  sampled_indices.sort()
  return sequence[sampled_indices]

def random_sampling(sequence, target_length):
  """Keep ``target_length`` randomly chosen rows of ``sequence`` in their original order.

  Rows are drawn without replacement with ``np.random.choice``, so asking for more
  rows than exist raises ValueError.
  """
  n_rows = sequence.shape[0]
  chosen = np.sort(np.random.choice(n_rows, target_length, replace=False))
  return sequence[chosen]

def prepare_data(swords_arr, shields_arr, stars_arr, average_len):
  """Resample every sequence to ``average_len`` rows and stack them with class labels.

  Shorter sequences are stretched with nearest_neighbor_resampling, longer ones
  are thinned with random_sampling, and exact matches are used unchanged.

  Args:
    swords_arr, shields_arr, stars_arr: collections of (frames, 3) sequences;
      they are labelled 0, 1 and 2 respectively.
    average_len: number of rows every sequence is resampled to.

  Returns:
    (input_data, labels): a float array of shape (total_samples, average_len, 3)
    and a float array of shape (total_samples,) holding the class labels.
  """
  labelled_groups = ((swords_arr, 0), (shields_arr, 1), (stars_arr, 2))
  total_samples = sum(len(group) for group, _ in labelled_groups)

  input_data = np.zeros((total_samples, average_len, 3))
  labels = np.zeros((total_samples))

  row = 0
  for gesture_arr, label in labelled_groups:
    for sequence in gesture_arr:
      n_frames = sequence.shape[0]
      if n_frames == average_len:
        resampled = sequence
      elif n_frames < average_len:
        resampled = nearest_neighbor_resampling(sequence, average_len)
      else:
        resampled = random_sampling(sequence, average_len)

      input_data[row] = resampled
      labels[row] = label
      row += 1

  return input_data, np.array(labels)


In [None]:
# Calculate the average sequence length
# (mean of the three per-class averages, each already truncated to int, then truncated again)
avg_length = (average_sequence_length(swords_arr_resized) + average_sequence_length(stars_arr_resized) + average_sequence_length(shields_arr_resized))/3
avg_length = int(avg_length)
print("Average length for sampling: ", avg_length)
print("Total number of samples: ", len(stars_arr_resized)+len(swords_arr_resized)+len(shields_arr_resized))
# Prepare the data for input into an SVM model
# (every sequence is resampled to avg_length rows; labels are swords=0, shields=1, stars=2)
input_data, labels = prepare_data(swords_arr_resized, shields_arr_resized, stars_arr_resized, avg_length)
print("Compiled data: ", np.shape(input_data))
print("Compiled labels: ", np.shape(labels))

In [None]:
# Visual spot-check of one resampled sample (index 45 of input_data).
plot_point_cloud_plotly_numpy(input_data, 45)

In [None]:
pip install open3d


In [None]:
import open3d as o3d

# ICP Alignment
def preprocess_point_cloud(point_cloud):
    """Centre and rescale a point cloud, returning it as an Open3D PointCloud.

    The points are shifted so their centroid sits at the origin and then scaled
    so the point furthest from the origin lies at distance ``max_size`` (1.0).

    Args:
        point_cloud: array of points handed to ``o3d.utility.Vector3dVector``
            (presumably shape (n, 3) — TODO confirm against callers).

    Returns:
        The ``o3d.geometry.PointCloud`` holding the normalised points.
    """
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(point_cloud)

    # Translate the centroid of the point cloud to the origin
    centroid = np.mean(point_cloud, axis=0)
    pcd.points = o3d.utility.Vector3dVector(np.asarray(pcd.points) - centroid)

    # Scale the point cloud to a fixed maximum size (e.g., 1 unit)
    max_size = 1.0
    distances = np.linalg.norm(np.asarray(pcd.points), axis=1)
    max_distance = np.max(distances)
    # NOTE(review): max_distance is 0 when all points coincide, which makes this a
    # division by zero.
    pcd.points = o3d.utility.Vector3dVector(np.asarray(pcd.points) * (max_size / max_distance))

    return pcd
def preprocess_point_cloud_np(point_cloud):
    """Centre a point cloud on the origin and scale it to unit radius.

    Args:
        point_cloud: NumPy array with one point per row.

    Returns:
        New array of the same shape whose centroid is the origin and whose
        furthest point lies at distance 1.0. A degenerate cloud in which every
        point coincides is returned centred (all zeros) without scaling.
    """
    # Translate the centroid of the point cloud to the origin
    centroid = np.mean(point_cloud, axis=0)
    point_cloud = point_cloud - centroid

    # Scale the point cloud to a fixed maximum size (e.g., 1 unit)
    max_size = 1.0
    distances = np.linalg.norm(point_cloud, axis=1)
    max_distance = np.max(distances)
    # Skip scaling when every point sits on the centroid: dividing by a zero radius
    # would turn the whole cloud into NaN values.
    if max_distance > 0:
        point_cloud = point_cloud * (max_size / max_distance)

    return point_cloud
def execute_global_registration(source, target, distance_threshold):
    """Register ``source`` onto ``target`` with Open3D point-to-point ICP.

    Despite the name, this calls ``registration_icp`` starting from the identity
    transform, with at most 50 iterations.

    Args:
        source: point cloud passed to ``registration_icp`` as the source.
        target: point cloud passed to ``registration_icp`` as the target.
        distance_threshold: passed as ICP's third argument (presumably the maximum
            correspondence distance — confirm against the Open3D docs).

    Returns:
        The result object returned by ``registration_icp``.
    """
    result = o3d.pipelines.registration.registration_icp(source, target, distance_threshold,
                                                          np.identity(4),
                                                          o3d.pipelines.registration.TransformationEstimationPointToPoint(),
                                                          o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=50))
    return result

def align_point_clouds(data_array, labels, distance_threshold=1.5):
    """Normalise every sample and return the samples regrouped by class label.

    Each sample is centred and scaled with preprocess_point_cloud_np; no ICP
    registration is performed. The first sample of every class used to be
    returned un-normalised (its normalised copy was computed but discarded); it
    is now treated like every other sample.

    Args:
        data_array: array of point clouds, indexed by sample.
        labels: array of class labels aligned with ``data_array``.
        distance_threshold: currently unused; kept so existing calls stay valid.

    Returns:
        (aligned_data, aligned_labels) as NumPy arrays. Samples are ordered by
        class (in ``np.unique(labels)`` order), keeping their original order
        within each class.
    """
    aligned_data_array = []
    aligned_labels = []

    for label in np.unique(labels):
        class_indices = np.where(labels == label)[0]
        aligned_class_samples = [
            np.asarray(preprocess_point_cloud_np(data_array[idx]))
            for idx in class_indices
        ]

        aligned_data_array.extend(aligned_class_samples)
        aligned_labels.extend([label] * len(aligned_class_samples))

    return np.array(aligned_data_array), np.array(aligned_labels)


# Normalise the compiled samples. The returned arrays are regrouped by class, so
# aligned_labels (not labels) must be used together with aligned_data_array.
aligned_data_array, aligned_labels = align_point_clouds(input_data, labels)
print(aligned_data_array.shape)

In [None]:
# Test alignment a couple random points
# NOTE(review): the whole check below is disabled by wrapping it in a string literal,
# so this cell does nothing. The upper bound 88 is hard-coded (presumably the sample
# count at the time — confirm before re-enabling), and index `num` refers to different
# samples in input_data and aligned_data_array because the latter is regrouped by class.
"""
random_numbers = np.random.randint(0, 88, 3)
for num in random_numbers:
  print("Original sample: ", num)
  plot_point_cloud_plotly_numpy(input_data, num)
  print("Label: ", labels[num], " (swords_arr, 0), (shields_arr, 1), (stars_arr, 2)")
  print("Aligned sample: ", num)
  plot_point_cloud_plotly_numpy(aligned_data_array, num)
  """

In [None]:
# Switch to the plots folder so the HTML file written by the plotting helper lands there.
# NOTE(review): this chdir persists for all later cells, so the later
# open('svm_model.pkl', 'wb') also writes into this folder — confirm that is intended.
os.chdir("/content/gdrive/MyDrive/CPS Final Proj/plots")
# Overlay all class-1 (shields) samples before normalisation.
plot_point_clouds_by_class_plotly(input_data,labels,1, False)

In [None]:

# Same overlay for class 1 using the normalised samples; saved under the "aligned" file name.
plot_point_clouds_by_class_plotly(aligned_data_array,aligned_labels,1, True)


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import cross_val_score


# Split the data into training and testing sets
# (80/20 split with a fixed random_state; the split is not stratified by class)
X_train, X_test, y_train, y_test = train_test_split(aligned_data_array, aligned_labels, test_size=0.2, random_state=42)

print("X_train size pre reshape: ", X_train.shape)
# NOTE(review): `sample` is reassigned by the augmentation loop in a later cell
# (`for sample in X_train`), so it no longer refers to X_test[0] after that cell runs.
sample = X_test[0]

In [None]:
# Data augmentation functions
def translation(points, max_translation):
    """Shift all points by one random offset drawn uniformly per axis.

    Args:
        points: array whose last axis holds the x, y, z coordinates.
        max_translation: each axis offset is drawn from
            [-max_translation, max_translation).
    """
    offset = np.random.uniform(-max_translation, max_translation, 3)
    return points + np.array([offset[0], offset[1], offset[2]])

def rotation(points, max_angle):
    """Rotate the points by random angles about the x, y and z axes.

    Args:
        points: array of shape (n, 3).
        max_angle: each axis angle (radians) is drawn uniformly from
            [-max_angle, max_angle).

    Returns:
        ``points @ (Rx @ Ry @ Rz)`` — the combined matrix is applied on the right
        of the row vectors.
    """
    theta_x, theta_y, theta_z = np.random.uniform(-max_angle, max_angle, 3)

    cos_x, sin_x = np.cos(theta_x), np.sin(theta_x)
    cos_y, sin_y = np.cos(theta_y), np.sin(theta_y)
    cos_z, sin_z = np.cos(theta_z), np.sin(theta_z)

    about_x = np.array([[1, 0, 0],
                        [0, cos_x, -sin_x],
                        [0, sin_x, cos_x]])
    about_y = np.array([[cos_y, 0, sin_y],
                        [0, 1, 0],
                        [-sin_y, 0, cos_y]])
    about_z = np.array([[cos_z, -sin_z, 0],
                        [sin_z, cos_z, 0],
                        [0, 0, 1]])

    combined = about_x @ about_y @ about_z
    return points @ combined

def scaling(points, min_scale, max_scale):
    """Multiply all coordinates by one random factor drawn uniformly from [min_scale, max_scale)."""
    return points * np.random.uniform(min_scale, max_scale)

def add_noise(points, noise_std_dev):
    """Add independent zero-mean Gaussian noise with the given standard deviation to every coordinate."""
    jitter = np.random.normal(0, noise_std_dev, points.shape)
    noisy = points + jitter
    return noisy

def apply_augmentations(points, num_augmentations, **kwargs):
    """Create ``num_augmentations`` randomly perturbed copies of ``points``.

    Each copy goes through translation, rotation, scaling and noise, in that order.

    Keyword Args:
        max_translation: default 0.1.
        max_angle: radians, default ``np.radians(10)``.
        min_scale / max_scale: defaults 0.9 / 1.1.
        noise_std_dev: default 0.01.

    Returns:
        Array stacking the augmented copies along a new first axis.
    """
    def _augment_once():
        # Work on a copy so the caller's array is never modified.
        shifted = translation(points.copy(), kwargs.get("max_translation", 0.1))
        rotated = rotation(shifted, kwargs.get("max_angle", np.radians(10)))
        rescaled = scaling(rotated, kwargs.get("min_scale", 0.9), kwargs.get("max_scale", 1.1))
        return add_noise(rescaled, kwargs.get("noise_std_dev", 0.01))

    return np.array([_augment_once() for _ in range(num_augmentations)])

# Augment data
# NOTE: max_translation is not passed below, so apply_augmentations uses its default (0.1).
max_angle = np.radians(15)
min_scale = 0.8
max_scale = 1.2
noise_std_dev = 0.01
# Apply augmentations
num_augmentations = 10
augmented_data = []
for sample in X_train:
    sample_3d = sample.reshape(-1, 3)
    new_samples = apply_augmentations(sample_3d, num_augmentations=num_augmentations, max_angle=max_angle, min_scale=min_scale, max_scale=max_scale, noise_std_dev=noise_std_dev)
    # Flatten every augmented cloud into one feature row. The row width comes from the
    # sample itself instead of the previously hard-coded 459, which only matched a
    # resampled length of 153 frames.
    augmented_data.extend(new_samples.reshape(len(new_samples), sample_3d.size))
augmented_data = np.array(augmented_data)

# Create y_train_augmented and concatenate with y_train
# (np.repeat keeps the labels in the same order as the augmented rows: each original
# label is repeated num_augmentations times in a row)
y_train_augmented = np.repeat(y_train, num_augmentations)
y_train_combined = np.hstack((y_train, y_train_augmented))

# Combine X_train and augmented_data
X_train_augmented = np.vstack((X_train.reshape(X_train.shape[0], -1), augmented_data))
print("Augmented data size: ", X_train_augmented.shape)
print("Augmented label size: ", y_train_combined.shape)

In [None]:

# Flatten the 3D array into a 2D array of shape (num_samples, num_frames * 3)
#X_train = augmented_data.reshape((X_train.shape[0], -1))
# Augment data?
X_test = X_test.reshape(X_test.shape[0], -1)

# Train an SVM model
svm_model = SVC(kernel='linear', C=1.0)
# Perform 5-fold cross-validation on the training data
# NOTE(review): the folds are drawn from the augmented set, so augmented copies of one
# original sample can land in both the training and validation folds — the score is
# likely optimistic; the held-out test metrics below are the more reliable figure.
k = 5
scores = cross_val_score(svm_model, X_train_augmented, y_train_combined, cv=k)

# Calculate the average score
average_score = np.mean(scores)

print(f"Average cross-validation score: {average_score:.2f}")

# Train the model using the entire training set
svm_model.fit(X_train_augmented, y_train_combined)

# Evaluate the model on the test set (the duplicated predict call was removed)
y_pred = svm_model.predict(X_test)

# Compute the confusion matrix and the classification report
cm = confusion_matrix(y_test, y_pred)
cr = classification_report(y_test, y_pred)

# Compute the accuracy of the model
accuracy = svm_model.score(X_test, y_test)

# Print the confusion matrix, the classification report, and the accuracy
print("Confusion matrix:\n", cm)
print("Classification report:\n", cr)
print("Accuracy: {:.2f}%".format(accuracy * 100))

In [None]:
# Save model
import pickle

# Serialise the trained classifier. The original code pickled `svm`, a name that is
# not defined in this notebook; the trained model is `svm_model`.
with open('svm_model.pkl', 'wb') as file:
    pickle.dump(svm_model, file)

from google.colab import files

# Offer the pickle file as a browser download from the Colab runtime.
files.download('svm_model.pkl')

In [None]:
# Inspect the first held-out sample: its feature-vector shape and its true label.
print(X_test[0].shape)
print(y_test[0])

In [None]:
def srandom_sampling(sequence, target_length):
    """Keep ``target_length`` randomly chosen rows of ``sequence`` in their original order.

    Rows are drawn without replacement, so requesting more rows than exist raises
    ValueError. (A stray ``I`` after ``indices.sort()`` previously made this
    definition a syntax error.)

    Args:
        sequence: NumPy array with one point per row.
        target_length: number of rows to keep.
    """
    current_length = sequence.shape[0]
    indices = np.random.choice(current_length, target_length, replace=False)
    # Sort so the retained rows stay in their original order.
    indices.sort()
    return sequence[indices]
def snearest_neighbor_resampling(sequence, target_length):
    """Resample ``sequence`` to ``target_length`` rows by picking evenly spaced rows.

    Evenly spaced positions between the first and last row are rounded to the
    nearest integer index; rows repeat when ``target_length`` exceeds the length.
    """
    last_index = sequence.shape[0] - 1
    positions = np.linspace(0, last_index, target_length)
    picks = np.rint(positions).astype(int)
    return sequence[picks]
def sprepare_data(arr, average_len):
    """Resample a single sequence to exactly ``average_len`` rows.

    Shorter inputs are stretched with snearest_neighbor_resampling, longer ones
    are thinned with srandom_sampling, and exact matches are returned unchanged.
    """
    n_rows = arr.shape[0]
    if n_rows == average_len:
        return arr
    if n_rows < average_len:
        return snearest_neighbor_resampling(arr, average_len)
    return srandom_sampling(arr, average_len)

def spreprocess_point_cloud_np(point_cloud):
    """Centre a point cloud on the origin and scale it to unit radius.

    Args:
        point_cloud: NumPy array with one point per row.

    Returns:
        New array of the same shape whose centroid is the origin and whose
        furthest point lies at distance 1.0. A degenerate cloud in which every
        point coincides is returned centred (all zeros) without scaling.
    """
    # Translate the centroid of the point cloud to the origin
    centroid = np.mean(point_cloud, axis=0)
    point_cloud = point_cloud - centroid

    # Scale the point cloud to a fixed maximum size (e.g., 1 unit)
    max_size = 1.0
    distances = np.linalg.norm(point_cloud, axis=1)
    max_distance = np.max(distances)
    # Skip scaling when every point sits on the centroid: dividing by a zero radius
    # would turn the whole cloud into NaN values and poison the prediction.
    if max_distance > 0:
        point_cloud = point_cloud * (max_size / max_distance)

    return point_cloud
# Smoke-test the single-sample inference path with random data: 534 values = 178 points.
my_list = np.random.rand(534)

inp = np.array(my_list)
inp = inp.reshape(-1, 3)
# add axis for preprocessing
# from pre processing of training data
target_len = 153
# pre process test data
print("Fixing length to 153, original length: ", str(inp.shape[0]))
inp = sprepare_data(inp, target_len)
print("New length: ", str(inp.shape[0]))
# align
print("Normalizing")
inp = spreprocess_point_cloud_np(inp)
# flatten
inp = inp.reshape((-1))
print("Flattened to: ", str(inp.shape))
# predict
print("Predicting")
# Keep and show the result: the prediction used to be computed and silently discarded.
prediction = svm_model.predict(inp.reshape((1, -1)))
print("Prediction: ", prediction)
print(str(np.shape(inp.reshape((1,-1)))))



In [None]:
# Test heroku app
import requests
import json
import random

# Set the random seed to ensure reproducibility
random.seed(42)

# Generate a random list of 579 numbers between 0 and 1.3 (inclusive)
my_list = [random.uniform(0, 1.3) for _ in range(579)]


# Replace this with your app's URL
url = "https://ar-recognition.herokuapp.com/predict"

# Replace this with your test dataset
"""
test_data = X_test[0].tolist()
print(len(test_data))
test_test = np.array(test_data)
test_test = test_test.reshape(-1, 3)
print(test_test.shape)
"""
# Send a POST request with the test dataset as a list.
# A timeout is set so an unresponsive app raises an exception instead of blocking
# the notebook indefinitely.
response = requests.post(url, json=my_list, timeout=30)

if response.status_code == 200:
    predictions = json.loads(response.text)
    print("Predictions:")
    # NOTE(review): this pairs individual coordinates of my_list with the returned
    # predictions; if the service returns one prediction per request, only the first
    # coordinate is printed next to it — confirm the response schema.
    for data_point, prediction in zip(my_list, predictions):
        print(f"Input data: {data_point}, Prediction: {prediction}")
else:
    print(f"Request failed with status code {response.status_code}: {response.text}")


In [None]:
# Predict the class of the first held-out sample with the trained model.
# The original cell referenced `svm` and `test_data`, neither of which is defined by
# executed code in this notebook (`test_data` only appears inside a disabled string
# block, where it is X_test[0].tolist()).
test_data = X_test[0].tolist()
svm_model.predict(np.array(test_data).reshape(1, -1))
print(sample.shape)