# Imports

In [None]:
import torchaudio.transforms as T
import torch.nn as nn
import torch.nn.functional as F

import torchaudio

from sklearn.metrics import classification_report
import numpy as np
import os
import shutil
from tqdm import tqdm
import pyaudio
from dataclasses import dataclass
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import torch


from djitellopy import Tello
import wave

from sklearn.neighbors import NearestNeighbors


# Data

In [None]:


# # Define the root directory and the target classes
# root_dir = '/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_augmented'
# classes = ['DESCER', 'DIREITA', 'ESQUERDA', 'FRENTE', 'SUBIR', 'TRAS']

# # Create the class directories if they don't exist
# for class_name in classes:
#     class_path = os.path.join(root_dir, class_name)
#     if not os.path.exists(class_path):
#         os.makedirs(class_path)

# # Iterate over each subdirectory in the root directory
# for subdir in os.listdir(root_dir):
#     subdir_path = os.path.join(root_dir, subdir)
#     if os.path.isdir(subdir_path):
#         # Iterate over each class directory in the subdirectory
#         for class_name in classes:
#             class_subdir_path = os.path.join(subdir_path, class_name)
#             if os.path.isdir(class_subdir_path):
#                 # Move all files from the class subdirectory to the corresponding target class directory
#                 for file_name in os.listdir(class_subdir_path):
#                     source_path = os.path.join(class_subdir_path, file_name)
#                     target_path = os.path.join(root_dir, class_name, file_name)
#                     shutil.move(source_path, target_path)


# Collect Audio

In [None]:
# Print the index and name of every audio device PyAudio reports, so that a
# suitable `device_index` can be picked for RecordVoice.
p = pyaudio.PyAudio()
try:
    for i in range(p.get_device_count()):
        info = p.get_device_info_by_index(i)
        print(f"Device {i}: {info['name']}")
finally:
    # Always release the PyAudio session, even if a device query raises.
    p.terminate()

In [None]:
class RecordVoice:
    """Record a fixed-length clip from an input device and save it as a WAV file.

    Args:
        chunk: Number of frames read from the stream per call.
        format: PyAudio sample format constant.
        channels: Number of input channels.
        rate: Sampling rate requested from the device, in Hz.
        record_seconds: Length of the recording, in seconds.
        device_index: Index of the input device to open.
    """

    def __init__(self,
                 chunk=1024,
                 format=pyaudio.paInt16,
                 channels=1,
                 rate=44100,
                 record_seconds=2.5,
                 device_index=4):
        self.chunk = chunk
        self.format = format
        self.channels = channels
        self.rate = rate
        self.record_seconds = record_seconds
        self.device_index = device_index
        self.frames = []  # raw byte chunks of the most recent recording
        self.p = pyaudio.PyAudio()

    def record(self, output_filename):
        """Capture `record_seconds` of audio and write it to `output_filename`.

        The PyAudio session is terminated when the capture ends (successfully
        or not) and is recreated on the next call, so one instance can record
        several clips.

        Args:
            output_filename: Path of the WAV file to create.
        """
        if self.p is None:
            # A previous call terminated the session; open a new one.
            self.p = pyaudio.PyAudio()

        # Start from an empty buffer so earlier recordings are not prepended.
        self.frames = []

        # Look the sample width up while the session is still open.
        sample_width = self.p.get_sample_size(self.format)
        n_chunks = int(self.rate / self.chunk * self.record_seconds)

        stream = None
        try:
            stream = self.p.open(format=self.format,
                                 channels=self.channels,
                                 rate=self.rate,
                                 input=True,
                                 input_device_index=self.device_index,
                                 frames_per_buffer=self.chunk)

            print("* recording")

            for _ in range(n_chunks):
                self.frames.append(stream.read(self.chunk))

            print("* done recording")
        finally:
            # Release the device even if opening or reading failed.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            self.p.terminate()
            self.p = None

        # The context manager closes the file even if a write fails.
        with wave.open(output_filename, 'wb') as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(self.rate)
            wf.writeframes(b''.join(self.frames))

# Processing

In [None]:
# Location of the saved Wav2Vec2 processor (absolute, machine-specific path).
model_name = "/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/models/asr_processor_augmented"

# Processor that process_audio uses to turn a waveform into model input values.
w2v_processor = Wav2Vec2Processor.from_pretrained(model_name)
#w2v_processor.save_pretrained("./wav2vec2-large-xlsr-53-portuguese")


In [None]:
# Number of samples every clip is cut or padded to.
MAX_LENGTH = 30000


def pad_or_truncate(waveform, max_length=None):
    """Force the last axis of `waveform` to a fixed number of samples.

    Longer inputs keep only their first `max_length` samples; shorter inputs
    are right-padded with zeros. Works along the last dimension, so 1-D,
    (channels, time) and batched tensors are all accepted.

    Args:
        waveform: Tensor whose last dimension is time.
        max_length: Target length in samples. Defaults to the module-level
            MAX_LENGTH, read at call time.

    Returns:
        A tensor with the same leading dimensions and a last dimension of
        exactly `max_length`.
    """
    if max_length is None:
        max_length = MAX_LENGTH

    n_samples = waveform.shape[-1]
    if n_samples > max_length:
        return waveform[..., :max_length]
    return torch.nn.functional.pad(waveform, (0, max_length - n_samples))
    
def process_audio(audio_path):
    """Load an audio file and prepare it for the embedding model.

    The file is resampled to 16 kHz when needed, cut or zero-padded with
    `pad_or_truncate`, and passed through `w2v_processor`.

    Args:
        audio_path: Path of the audio file to load.

    Returns:
        A tuple `(input_values, waveform)`: the processor's `input_values`
        tensor and the fixed-length waveform it was computed from.
    """
    target_rate = 16000

    waveform, sample_rate = torchaudio.load(audio_path)
    if sample_rate != target_rate:
        # Resample from the rate the file actually has. A fixed 48 kHz source
        # rate would mis-scale any other input (e.g. 44.1 kHz recordings).
        # NOTE(review): embeddings previously encoded from non-48 kHz files
        # were produced with the old ratio and should be regenerated.
        resampler = T.Resample(orig_freq=sample_rate, new_freq=target_rate)
        waveform = resampler(waveform)
    waveform = pad_or_truncate(waveform)
    input_values = w2v_processor(
        waveform.squeeze().numpy(),
        sampling_rate=target_rate,
        return_tensors='pt',
    ).input_values
    return input_values, waveform


# Model

## Import model

- mini

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SiameseNetwork(nn.Module):
    """"Mini" Siamese encoder: three 1-D conv stages and a three-layer MLP head.

    Both branches share the same weights. Inputs are expected as
    (batch, 1, length); each input is mapped to a 16-dimensional embedding.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor (single input channel).
        self.conv1 = nn.Conv1d(1, 32, kernel_size=5)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=5)
        self.conv3 = nn.Conv1d(64, 128, kernel_size=5)

        # 39808 = 128 channels x 311 steps, which is what a 30000-sample
        # input yields after the conv/pool stack; adjust if the input size changes.
        self.fc1 = nn.Linear(39808, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 16)

    def forward_once(self, x):
        """Embed one batch: conv -> ReLU -> max-pool three times, then the MLP."""
        conv_stages = (
            (self.conv1, 4),
            (self.conv2, 4),
            (self.conv3, 6),
        )
        for conv, pool_width in conv_stages:
            x = F.max_pool1d(F.relu(conv(x)), kernel_size=pool_width)

        # Flatten everything but the batch dimension.
        x = x.view(x.size(0), -1)

        hidden = F.relu(self.fc2(F.relu(self.fc1(x))))
        return self.fc3(hidden)

    def forward(self, input1, input2):
        """Return the embeddings of both inputs as a tuple."""
        first = self.forward_once(input1)
        second = self.forward_once(input2)
        return first, second


- small

In [None]:
class SiameseNetwork(nn.Module):
    """"Small" Siamese encoder: three wider 1-D conv stages and a three-layer MLP.

    Both branches share the same weights. Inputs are expected as
    (batch, 1, length); each input is mapped to a 32-dimensional embedding.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor (single input channel).
        self.conv1 = nn.Conv1d(1, 64, kernel_size=10)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=10)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=10)

        # 52736 = 256 channels x 206 steps, which is what a 30000-sample
        # input yields after the conv/pool stack; adjust if the input size changes.
        self.fc1 = nn.Linear(52736, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 32)

    def forward_once(self, x):
        """Embed one batch: conv -> ReLU -> max-pool three times, then the MLP."""
        conv_stages = (
            (self.conv1, 4),
            (self.conv2, 6),
            (self.conv3, 6),
        )
        for conv, pool_width in conv_stages:
            x = F.max_pool1d(F.relu(conv(x)), kernel_size=pool_width)

        # Flatten everything but the batch dimension.
        x = x.view(x.size(0), -1)

        hidden = F.relu(self.fc2(F.relu(self.fc1(x))))
        return self.fc3(hidden)

    def forward(self, input1, input2):
        """Return the embeddings of both inputs as a tuple."""
        first = self.forward_once(input1)
        second = self.forward_once(input2)
        return first, second



- base

In [None]:
class SiameseNetwork(nn.Module):
    """"Base" Siamese encoder: four 1-D conv stages and a four-layer MLP head.

    Both branches share the same weights. Inputs are expected as
    (batch, 1, length); each input is mapped to a 32-dimensional embedding.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor; wider than the smaller variants
        # and with one extra stage (conv4).
        self.conv1 = nn.Conv1d(1, 128, kernel_size=10)
        self.conv2 = nn.Conv1d(128, 256, kernel_size=10)
        self.conv3 = nn.Conv1d(256, 512, kernel_size=10)
        self.conv4 = nn.Conv1d(512, 1024, kernel_size=10)

        # 32768 = 1024 channels x 32 steps, which is what a 30000-sample
        # input yields after the conv/pool stack; adjust if the input size changes.
        self.fc1 = nn.Linear(32768, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 32)  # extra fully connected layer

    def forward_once(self, x):
        """Embed one batch: conv -> ReLU -> max-pool four times, then the MLP."""
        conv_stages = (
            (self.conv1, 4),
            (self.conv2, 6),
            (self.conv3, 6),
            (self.conv4, 6),
        )
        for conv, pool_width in conv_stages:
            x = F.max_pool1d(F.relu(conv(x)), kernel_size=pool_width)

        # Flatten everything but the batch dimension.
        x = x.view(x.size(0), -1)

        # ReLU after every hidden layer; the output layer stays linear.
        for hidden_layer in (self.fc1, self.fc2, self.fc3):
            x = F.relu(hidden_layer(x))
        return self.fc4(x)

    def forward(self, input1, input2):
        """Return the embeddings of both inputs as a tuple."""
        first = self.forward_once(input1)
        second = self.forward_once(input2)
        return first, second


In [None]:
# Load the pre-trained model
# Use the Metal (MPS) backend when it is available and fall back to the CPU
# otherwise, so the notebook also runs on machines without MPS.
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
model = SiameseNetwork().to(device)
model_path = '/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/models/pipeline3_m2_pipe1asr_mini.pth'
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
# The checkpoint must match whichever SiameseNetwork variant is currently defined.
model.load_state_dict(torch.load(model_path, map_location=device))

## Vector Database

In [None]:
import os
import numpy as np
import torch

class VectorDatabase:
    """Encode audio files with a model and keep the embeddings with their labels.

    Attributes are filled by `load_database`:
        data: numpy array holding one embedding per loaded file.
        labels: list with the label (parent folder name) of each embedding.
    """

    def __init__(self, model):
        self.model = model
        self.data = np.array([])  # Encoded data
        self.labels = []  # Corresponding labels

    def encode_database(self, path, new_base_path):
        '''
        Encode every .wav file found under `path` and save each embedding as a
        .npy file under `new_base_path`, mirroring the source folder layout.
        '''
        os.makedirs(new_base_path, exist_ok=True)

        for root, dirs, files in os.walk(path):
            dirs.sort()  # walk sub-folders in a reproducible order
            wav_files = sorted(f for f in files if f.endswith('.wav'))
            if not wav_files:
                continue

            # Mirror the source folder inside the output tree (once per folder).
            relative_path = os.path.relpath(root, path)
            new_dir = os.path.join(new_base_path, relative_path)
            os.makedirs(new_dir, exist_ok=True)

            for file in tqdm(wav_files):
                encoded_data = self.encode(os.path.join(root, file))

                # Swap only the extension; str.replace would also rewrite any
                # '.wav' that appears earlier in the file name.
                stem = os.path.splitext(file)[0]
                np.save(os.path.join(new_dir, stem + '.npy'), encoded_data)

    def _model_device(self):
        '''Return the device of the model's parameters (CPU if it has none).'''
        try:
            return next(self.model.parameters()).device
        except StopIteration:
            return torch.device('cpu')

    def encode(self, path):
        '''
        Encode a single audio file and return its embedding as a numpy array.
        '''
        input_values, _ = process_audio(path)
        input_tensor = input_values.unsqueeze(0)  # Add batch dimension
        self.model.eval()
        # Run on whatever device the model lives on instead of assuming 'mps'.
        input_tensor = input_tensor.to(self._model_device())
        with torch.no_grad():
            embed = getattr(self.model, 'forward_once', None)
            if callable(embed):
                # One pass through a single branch is enough to get the embedding.
                output = embed(input_tensor)
            else:
                # Model only exposes the two-input forward: feed the input twice.
                output, _ = self.model(input_tensor, input_tensor)
        return output.cpu().numpy()[0]

    def load_database(self, path):
        '''
        Load every .npy embedding found under `path` into `self.data` and
        store the name of each file's parent folder in `self.labels`.
        '''
        data = []
        labels = []

        for root, dirs, files in os.walk(path):
            dirs.sort()  # reproducible order; it feeds the seeded train/test split
            for file in sorted(files):
                if file.endswith('.npy'):
                    data.append(np.load(os.path.join(root, file)))
                    # The label is the name of the folder holding the file.
                    labels.append(os.path.basename(root))

        self.data = np.array(data)
        self.labels = labels

In [None]:
# Encode every recording of the augmented dataset with the loaded model and
# write the embeddings to a parallel folder tree.
test = VectorDatabase(model)
data_to_augment = '/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_augmented'
new_base_path = '/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_augmented_encoded_mini_model'
test.encode_database(data_to_augment,new_base_path)

In [None]:
# Quick check: encode a single recording and display its embedding.
test = VectorDatabase(model)
test.encode('/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/voice.wav')

In [None]:
class KNNModel:
    """Nearest-neighbour label lookup over the embeddings of a VectorDatabase.

    Args:
        vector_database: Object exposing `data`, `labels` and `encode(path)`.
        n: Number of neighbours retrieved per query.
    """

    def __init__(self, vector_database, n=35):
        self.vector_database = vector_database
        self.data = vector_database.data
        self.labels = vector_database.labels

        self.knn = NearestNeighbors(n_neighbors=n, metric='euclidean')
        self.knn.fit(self.data)

    def search(self, query_path):
        '''
        Return the labels of the stored vectors closest to the query.

        `query_path` is either the path of an audio file (str), which is
        encoded first, or an already-encoded numpy vector.

        Raises:
            TypeError: if `query_path` is neither a str nor a numpy array.
        '''
        if isinstance(query_path, str):
            query_vector = self.vector_database.encode(query_path).reshape(1, -1)
        elif isinstance(query_path, np.ndarray):
            query_vector = query_path.reshape(1, -1)
        else:
            # Fail clearly instead of hitting an unbound `query_vector` below.
            raise TypeError(
                "query_path must be a str path or a numpy.ndarray, "
                f"got {type(query_path).__name__}"
            )

        # Only the neighbour indices are needed; distances are discarded.
        _, indices = self.knn.kneighbors(query_vector)

        return [self.labels[i] for i in indices[0]]

    def make_inference(self, query_path):
        '''
        Return the most frequent label among the nearest neighbours.

        On a tie, the label that appears first in the search results wins.
        '''
        from collections import Counter

        votes = Counter(self.search(query_path))
        return max(votes, key=votes.get)


In [None]:
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

class KNNModel:
    """k-NN classifier over a VectorDatabase with a held-out test split.

    The embeddings are split into training and test sets; a distance-weighted
    KNeighborsClassifier is fitted on the training part only.

    Args:
        vector_database: Object exposing `data`, `labels` and `encode(path)`.
        n: Number of neighbours.
        test_size: Fraction of the data held out for `test_model`.
        random_state: Seed passed to `train_test_split`.
    """

    def __init__(self, vector_database, n=1, test_size=0.2, random_state=42):
        self.vector_database = vector_database
        self.data = vector_database.data
        self.labels = vector_database.labels

        # Split data into training and testing sets
        self.train_data, self.test_data, self.train_labels, self.test_labels = train_test_split(
            self.data, self.labels, test_size=test_size, random_state=random_state
        )

        self.knn = KNeighborsClassifier(n_neighbors=n, weights='distance', metric='euclidean')
        self.knn.fit(self.train_data, self.train_labels)

    def search(self, query_path):
        '''
        Return the labels of the training vectors closest to the query.

        `query_path` is either the path of an audio file (str), which is
        encoded first, or an already-encoded numpy vector.

        Raises:
            TypeError: if `query_path` is neither a str nor a numpy array.
        '''
        if isinstance(query_path, str):
            query_vector = self.vector_database.encode(query_path).reshape(1, -1)
        elif isinstance(query_path, np.ndarray):
            query_vector = query_path.reshape(1, -1)
        else:
            # Fail clearly instead of hitting an unbound `query_vector` below.
            raise TypeError(
                "query_path must be a str path or a numpy.ndarray, "
                f"got {type(query_path).__name__}"
            )

        # Only the neighbour indices are needed; distances are discarded.
        _, indices = self.knn.kneighbors(query_vector, return_distance=True)

        # Indices refer to the training split, not to the full database.
        return [self.train_labels[i] for i in indices[0]]

    def make_inference(self, query_path, print_count=False):
        '''
        Return the most frequent label among the nearest neighbours.

        When `print_count` is truthy the per-label vote counts are printed.
        On a tie, the label that appears first in the search results wins.

        NOTE: this is an unweighted majority vote, whereas `test_model` uses
        the classifier's distance-weighted `predict`; the two can disagree
        when more than one neighbour is used.
        '''
        from collections import Counter

        votes = Counter(self.search(query_path))

        if print_count:
            # Printed as a plain dict so the output format stays unchanged.
            print(dict(votes))
        return max(votes, key=votes.get)

    def test_model(self):
        '''
        Predict the held-out test split and return scikit-learn's
        classification report for it.
        '''
        predictions = self.knn.predict(self.test_data)
        return classification_report(self.test_labels, predictions)


In [None]:
# Load the embeddings previously written for the augmented dataset.
vector_database = VectorDatabase(model)
vector_database.load_database('/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_augmented_encoded_mini_model')

In [None]:
# Build a 15-neighbour model and classify one recording, printing the
# per-label vote counts.
knn = KNNModel(vector_database,n=15)
knn.make_inference('/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/voice.wav',print_count = True)

In [None]:
# Sweep odd neighbour counts from 1 to 29 and print the classification
# report on the held-out test split for each of them.
for n in range(1,30,2):
    print('N: ',n)
    knn = KNNModel(vector_database, n)
    print(knn.test_model())
    print('\n')
    

# Drone 

In [None]:
class DroneController:
    """Translate recognised voice commands into calls on a drone object.

    Args:
        drone: Object exposing `takeoff`, `land`, `move_up`, `move_down`,
            `move_left`, `move_right`, `move_forward`, `move_back` and `end`.
        distance: Value passed to every `move_*` call (presumably
            centimetres for a Tello; confirm against the djitellopy docs).
    """

    # command -> ordered (method name, takes the distance argument) steps
    _COMMANDS = {
        "SUBIR": (("takeoff", False), ("move_up", True)),
        "DESCER": (("move_down", True), ("land", False)),
        "ESQUERDA": (("move_left", True),),
        "DIREITA": (("move_right", True),),
        "FRENTE": (("move_forward", True),),
        "TRAS": (("move_back", True),),
    }

    def __init__(self, drone, distance=70):
        self.drone = drone
        self.distance = distance

    def control_drone(self, command, end_session=True):
        """Execute one command on the drone.

        Args:
            command: One of "SUBIR", "DESCER", "ESQUERDA", "DIREITA",
                "FRENTE" or "TRAS"; anything else prints "Unknown command".
            end_session: When true (the default, matching the previous
                behaviour) `drone.end()` is called afterwards. Pass False to
                keep the session open so further commands can follow.
        """
        steps = self._COMMANDS.get(command)
        if steps is None:
            print("Unknown command")
        else:
            for method_name, takes_distance in steps:
                method = getattr(self.drone, method_name)
                if takes_distance:
                    method(self.distance)
                else:
                    method()

        if end_session:
            self.drone.end()

# Pipeline

In [None]:
# Create the Tello client object; the connect call is left commented out.
drone = Tello()
#drone.connect()

In [None]:
# Assemble the pipeline pieces: recorder, embedding database, k-NN model and
# drone controller.
recorder = RecordVoice()
vector_database = VectorDatabase(model)
# NOTE(review): this loads 'Dados_wav_encoded', not the
# '..._augmented_encoded_mini_model' folder used in the other cells - confirm
# that these embeddings were produced by the model currently loaded.
vector_database.load_database('/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_encoded')
knn = KNNModel(vector_database)
controller = DroneController(drone)



In [None]:
# Record a clip, classify it and show the recognised command.
path = '/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/voice.wav'
# A fresh recorder is created for every capture.
recorder = RecordVoice()
recorder.record(path)
# The second parameter of make_inference is `print_count`, not a neighbour
# count (that is fixed when KNNModel is built), so it is passed by name.
knn_text = knn.make_inference(path, print_count=True)
print('Commmand: ',knn_text)
#controller.control_drone(knn_text)


In [None]:
# Display the most recently predicted command.
knn_text

# Metrics

## Accuracy

In [None]:
# Rebuild the database and a 1-nearest-neighbour model for the accuracy run.
vector_database = VectorDatabase(model)
vector_database.load_database('/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_augmented_encoded_mini_model')
knn = KNNModel(vector_database,n=1)

In [None]:
import glob
import time

# Classify every .wav file of every label folder and record the prediction,
# the true label (folder name) and the time each inference took.
base_directory = '/Users/luccaemmanuel/Desktop/BIA/FunctionCalling/Dados/Dados_wav_augmented'
predictions = []
labels = []
inference_times = []
# NOTE(review): these are the same recordings whose embeddings were written to
# the '..._encoded_mini_model' folder, so most queries are already in the
# k-NN training split; the resulting scores are optimistic.
for label in sorted(os.listdir(base_directory)):  # sorted for a reproducible order
    folder_path = os.path.join(base_directory, label)
    if not os.path.isdir(folder_path):
        continue
    for audio_file in tqdm(glob.glob(os.path.join(folder_path, '*.wav'))):
        # perf_counter is monotonic and high-resolution, unlike time.time().
        start = time.perf_counter()
        prediction = knn.make_inference(audio_file)
        elapsed = time.perf_counter() - start

        predictions.append(prediction)
        inference_times.append(elapsed)
        labels.append(label)


In [None]:
# Compute report
# Per-class precision/recall/F1 of the predictions against the folder labels.
report = classification_report(labels, predictions)
print("Classification Report:")
print(report)

# Inference time statistics, in seconds per file.
print('Mean inference time: ', np.mean(inference_times))
print('Std inference time: ', np.std(inference_times))

In [None]:
n = 3:
    accuracy: 0.92

n = 5:
    accuracy: 0.84

n = 35:
    accuracy: 0.48