In [None]:
import os
import cv2
import itertools
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from glob import glob
from PIL import Image
import warnings
warnings.filterwarnings('ignore')

# Pytorch
import torch
from torch import nn, optim
from torch.autograd import Variable
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms

# Scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

# Pacotes para o relatório de hardware
import gc
import types
import pkg_resources
# import pytorch_lightning as pl

# Seed para reproduzir os mesmos resultados
np.random.seed(10)
torch.manual_seed(10)
torch.cuda.manual_seed(10)

In [None]:
processing_device = "cuda" if torch.cuda.is_available() else "cpu"

# Verificando se GPU pode ser usada (isso depende da plataforma CUDA estar instalada)
torch_aval = torch.cuda.is_available()

# Labels para o relatório de verificação
lable_1 = 'Visão Geral do Ambiente'
lable_2 = 'Se NVIDIA-SMI não for encontrado, então CUDA não está disponível'
lable_3 = 'Fim da Checagem'

# Função para verificar o que está importado nesta sessão
def get_imports():

    for name, val in globals().items():
        if isinstance(val, types.ModuleType):
            name = val.__name__.split(".")[0]

        elif isinstance(val, type):
            name = val.__module__.split(".")[0]

        poorly_named_packages = {"PIL": "Pillow", "sklearn": "scikit-learn"}

        if name in poorly_named_packages.keys():
            name = poorly_named_packages[name]

        yield name

# Imports nesta sessão
imports = list(set(get_imports()))

# Loop para verificar os requerimentos
requirements = []
for m in pkg_resources.working_set:
    if m.project_name in imports and m.project_name!="pip":
        requirements.append((m.project_name, m.version))

# Pasta com os dados (quando necessário)
pasta_dados = r'dados'

print(f'{lable_1:-^100}')
print()
print(f"Device:", processing_device)
print(f"Pasta de Dados: ", pasta_dados)
print(f"Versões dos Pacotes Requeridos: ", requirements)
print(f"Dispositivo Que Será Usado Para Treinar o Modelo: ", processing_device)
print(f"CUDA Está Disponível? ", torch_aval)
print("Versão do PyTorch: ", torch.__version__)
print()
print(f'{lable_2:-^100}\n')
!nvidia-smi
gc.collect()
print()
print(f"Limpando a Memória da GPU (se disponível): ", torch.cuda.empty_cache())
print("\nModelo da GPU:")
# Modelo da GPU usada
!nvidia-smi --query-gpu=name --format=csv,noheader
print(f'\n{lable_3:-^100}')

# Import dos Dados

In [None]:
!apt-get install unrar

In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
 print('User uploaded file "{name}" with length {length} bytes'.format(
     name=fn, length=len(uploaded[fn])))

In [None]:
from google.cloud import storage
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f'/content/projetos-aleatorios-379913-61df4a1c249e.json'

In [None]:
storage_client = storage.Client()

In [None]:
bucket_name = 'projeto_musical'
csv_file_name = 'labels.csv'
local_file_path = '/content/' + csv_file_name

# Cria um objeto bucket
bucket = storage_client.get_bucket(bucket_name)

# Cria um objeto blob
blob = bucket.blob(csv_file_name)

# Baixa o arquivo para o ambiente local
blob.download_to_filename(local_file_path)

# Agora o arquivo CSV está salvo localmente e você pode lê-lo usando pandas
df = pd.read_csv(local_file_path)
print(df.head())

In [None]:
bucket_name = 'projeto_musical'
rar_file_name = 'imagens_audio.rar'
local_rar_path = '/content/' + rar_file_name

# Define o bucket e o blob
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(rar_file_name)

#Baixa o arquivo RAR para o ambiente local do Colab
blob.download_to_filename(local_rar_path)

# escompacta o arquivo RAR
!mkdir /content/unpacked
!unrar x {local_rar_path} /content/unpacked/

# Modelo

In [None]:
image_folder = 'unpacked/imagens_audio/melspectrogram_224/'

In [None]:
def list_files_in_directory(directory_path):
    try:
        files = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
        return files
    except Exception as e:
        return f"An error occurred: {e}"

In [None]:
parsed_files = list_files_in_directory('unpacked/imagens_audio/melspectrogram_224/')
parsed_files_base = [os.path.splitext(f)[0] for f in parsed_files]

In [None]:
def resize_and_save_images(input_folder, output_folder, new_size=(224, 224)):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder, exist_ok=True)

    # Ajuste aqui para verificar os arquivos corretamente
    parsed_files = list_files_in_directory(output_folder)
    parsed_files_base = [os.path.splitext(f)[0] for f in parsed_files]

    image_paths = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith(('.png', '.jpg', '.jpeg')) and os.path.splitext(f)[0] not in parsed_files_base]

    for path in tqdm(image_paths):
        try:
            img = cv2.imread(path)
            img_resized = cv2.resize(img, new_size)
            base_name = os.path.basename(path)
            save_path = os.path.join(output_folder, base_name)
            cv2.imwrite(save_path, img_resized)
        except Exception as e:
            print(f"An error occurred while processing {path}: {e}")

In [None]:
resize_and_save_images('unpacked/imagens_audio/melspectrogram/', 'unpacked/imagens_audio/melspectrogram_224/', new_size=(224, 224))

In [None]:
labels = pd.read_csv('labels.csv')
labels['file_name'] = labels['file_name'].apply(lambda x: f"{x.split('.')[0]}.png")
labels['file_path'] = labels['file_name'].apply(lambda x: os.path.join(image_folder, x))
labels['chord_idx'] = pd.Categorical(labels['chord']).codes
def filter_rows(row):
    clean_prefix = row['clean'].split('_')[0] if '_' in row['clean'] else row['clean']
    chord_prefix = row['chord'].split('\\')[0]
    return clean_prefix == chord_prefix or chord_prefix in row['clean']

# Aplicar o filtro
labels = labels[labels.apply(filter_rows, axis=1)].reset_index()

In [None]:
def calcula_img_mean_std(image_paths):

    # Define altura e largura que usaremos nas imagens
    img_h, img_w = 224, 224

    # Listas de controle
    imgs = []
    means, stdevs = [], []

    # Loop de leitura e resize das imagens
    for i in tqdm(range(len(image_paths))):
        img = cv2.imread(image_paths[i])
        img = cv2.resize(img, (img_h, img_w))
        imgs.append(img)

    # Stack de imagens
    imgs = np.stack(imgs, axis=3)
    print(imgs.shape)

    # Normalização
    imgs = imgs.astype(np.float32) / 255.

    # Loop de cálculo da média e desvio
    for i in range(3):
        pixels = imgs[:, :, i, :].ravel()
        means.append(np.mean(pixels))
        stdevs.append(np.std(pixels))

    # BGR --> RGB
    means.reverse()
    stdevs.reverse()

    print("normMean = {}".format(means))
    print("normStd = {}".format(stdevs))

    return means, stdevs

In [None]:
normMean, normStd = calcula_img_mean_std(labels['file_path'])

In [None]:
y = labels['chord_idx']
_, df_validacao = train_test_split(labels, test_size = 0.2, random_state = 101, stratify = y)

In [None]:
df_validacao.shape

In [None]:
df_validacao['chord_idx'].value_counts()

In [None]:
# Esta função identifica se uma imagem faz parte do conjunto train ou val
def get_val_rows(x):
    val_list = list(df_validacao['clean'])
    if str(x) in val_list:
        return 'val'
    else:
        return 'train'

In [None]:
# Identifica treino ou validação
labels['train_or_val'] = labels['clean']
labels['train_or_val'] = labels['train_or_val'].apply(get_val_rows)

In [None]:
# Filtra as linhas de treino
df_treino = labels[labels['train_or_val'] == 'train']

In [None]:
print(len(df_treino))
print(len(df_validacao))

In [None]:
df_treino['chord_idx'].value_counts()

In [None]:
df_validacao['chord_idx'].value_counts()

In [None]:
class_counts = df_treino['chord_idx'].value_counts()
max_instances = class_counts.max()
data_aug_rate = max_instances // class_counts - 1
data_aug_rate = data_aug_rate.apply(lambda x: max(x, 0))

In [None]:
augmented_data = []

for i in range(373):

    if data_aug_rate.to_list()[i] > 0:
        class_subset = df_treino[df_treino['chord_idx'] == i]
        augmented_subset = pd.DataFrame(np.repeat(class_subset.values, data_aug_rate.to_list()[i], axis=0))
        augmented_subset.columns = class_subset.columns
        augmented_data.append(augmented_subset)

df_treino = pd.concat([df_treino] + augmented_data, ignore_index=True)

In [None]:
df_treino['chord_idx'].value_counts()

In [None]:
# Podemos dividir o conjunto de validação em um conjunto de validação e um conjunto de teste
df_validacao, df_teste = train_test_split(df_validacao, test_size = 0.5)

In [None]:
# Reset do índice
df_validacao = df_validacao.reset_index()
df_teste = df_teste.reset_index()

In [None]:
df_validacao.shape

In [None]:
df_teste.shape

### Funções do modelo

In [None]:
# feature_extracting é um booleano que define se estamos fazendo um ajuste fino ou extração de recursos.
# Se feature_extracting = False, o modelo é ajustado e todos os parâmetros do modelo são atualizados.
# Se feature_extracting = True, apenas os parâmetros da última camada são atualizados, os outros permanecem fixos.
def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

In [None]:
# Função para inicializar diferentes arquiteturas de Deep Learning
def inicializa_modelo(model_name, num_classes, feature_extract, use_pretrained = True):

    model_ft = None
    input_size = 0

    # Usaremos o modelo resnet50
    if model_name == "resnet":

        # Tamanho (pixels) das imagens de entrada
        input_size = 224

        # Carregamos o modelo pré-treinado com todos os pesos
        model_ft = models.resnet50(pretrained = use_pretrained)

        # Treinamos o modelo e atualizamos os pesos durante o treinamento
        set_parameter_requires_grad(model_ft, feature_extract)

        # Define o número de atributos de entrada
        num_ftrs = model_ft.fc.in_features

        # Camada linear final para prever a probabilidade das 7 classes com as quais estamos trabalhando
        model_ft.fc = nn.Linear(num_ftrs, num_classes)

    # Usaremos o modelo Densenet121
    elif model_name == "densenet":

        # Tamanho (pixels) das imagens de entrada
        input_size = 224

        # Carregamos o modelo pré-treinado com todos os pesos
        model_ft = models.densenet121(pretrained = use_pretrained)

        # Treinamos o modelo e atualizamos os pesos durante o treinamento
        set_parameter_requires_grad(model_ft, feature_extract)

        # Define o número de atributos de entrada
        num_ftrs = model_ft.classifier.in_features

        # Camada linear final para prever a probabilidade das 7 classes com as quais estamos trabalhando
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)

    # Usaremos o Inception V3
    elif model_name == "inception":

        # Tamanho (pixels) das imagens de entrada
        # Tenha cuidado, pois espera-se (299 x 299) para o tamanho das imagens e ainda tem saída auxiliar
        input_size = 299

        # Carregamos o modelo pré-treinado com todos os pesos
        model_ft = models.inception_v3(pretrained = use_pretrained)

        # Treinamos o modelo e atualizamos os pesos durante o treinamento
        set_parameter_requires_grad(model_ft, feature_extract)

        # Tratando a auxilary net da arquitetura Inceptio
        model_ft.aux_logits = False

        # Tratando a primary net
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)

    else:
        print("Modelo inválido...")
        exit()

    return model_ft, input_size

In [None]:
# Defina um organizador de dados para modelo PyTorch
class OrganizaDados(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        X = Image.open(self.df['file_path'][index]).convert('RGB')
        y = torch.tensor(int(self.df['chord_idx'][index]))

        if self.transform:
          X = np.array(X)
          X = np.ascontiguousarray(X)
          X = Image.fromarray(X)
          X = self.transform(X)

        return X, y

In [None]:
# del loader_treino
# torch.cuda.empty_cache()

In [None]:
# Função para calcular erro em treino e validação durante o treinamento
class CalculaMetricas(object):

    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

In [None]:
total_loss_train, total_acc_train = [],[]
# Função de treino do modelo
def treina_modelo(treino_loader, model, criterion, optimizer, epoch):

    # Coloca o modelo em modo de treino
    model.train()

    # Inicializa objetos de cálculo de métricas
    train_loss = CalculaMetricas()
    train_acc = CalculaMetricas()

    # Iteração
    curr_iter = (epoch - 1) * len(treino_loader)

    # Loop de treino
    for i, data in enumerate(treino_loader):

        # Extra os dados
        images, labels = data

        # Tamanho da imagem
        N = images.size(0)

        # Coloca imagens e labels no device
        images = Variable(images).to(device)
        labels = Variable(labels).to(device)

        # Zera os gradientes
        optimizer.zero_grad()

        # Previsão do modelo
        outputs = model(images)

        # Erro do modelo
        loss = criterion(outputs, labels)

        # Backpropagation
        loss.backward()
        optimizer.step()

        # Obtem a previsão de maior probabilidade
        prediction = outputs.max(1, keepdim = True)[1]

        # Atualiza as métricas
        train_acc.update(prediction.eq(labels.view_as(prediction)).sum().item()/N)
        train_loss.update(loss.item())

        # Iteração
        curr_iter += 1

        # Print e update das métricas
        # A condição *** and curr_iter < 1000 *** pode ser removida se você quiser treinar com o dataset completo
        if (i + 1) % 100 == 0 and curr_iter < 1000:
            print('[epoch %d], [iter %d / %d], [train loss %.5f], [train acc %.5f]' % (epoch,
                                                                                       i + 1,
                                                                                       len(treino_loader),
                                                                                       train_loss.avg,
                                                                                       train_acc.avg))
            total_loss_train.append(train_loss.avg)
            total_acc_train.append(train_acc.avg)

    return train_loss.avg, train_acc.avg

In [None]:
total_loss_val, total_acc_val = [],[]
# Função para validação
def valida_modelo(val_loader, model, criterion, optimizer, epoch):

    # Coloca o modelo em modo de validação
    model.eval()

    # Inicializa objetos de cálculo de métricas
    val_loss = CalculaMetricas()
    val_acc = CalculaMetricas()

    # Validação
    with torch.no_grad():
        for i, data in enumerate(val_loader):

            images, labels = data

            N = images.size(0)

            images = Variable(images).to(device)

            labels = Variable(labels).to(device)

            outputs = model(images)

            prediction = outputs.max(1, keepdim = True)[1]

            val_acc.update(prediction.eq(labels.view_as(prediction)).sum().item()/N)

            val_loss.update(criterion(outputs, labels).item())

    print('------------------------------------------------------------')
    print('[epoch %d], [val loss %.5f], [val acc %.5f]' % (epoch, val_loss.avg, val_acc.avg))
    print('------------------------------------------------------------')

    return val_loss.avg, val_acc.avg

## Inicializando o Modelo

In [None]:
# Modelo que será treinado
#nome_modelo = 'densenet'
nome_modelo = 'resnet'
#nome_modelo = 'inception'

In [None]:
num_classes = 373

In [None]:
# Vamos treinar o modelo e sempre atualizar os pesos
feature_extract = False

In [None]:
# Inicializa o modelo
model_ft, input_size = inicializa_modelo(nome_modelo, num_classes, feature_extract, use_pretrained = False)

In [None]:
device = processing_device

In [None]:
# Coloca o modelo no device
model = model_ft.to(device)

In [None]:
transform_treino = transforms.Compose([#transforms.Resize((input_size,input_size)),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.RandomVerticalFlip(),
                                       transforms.RandomRotation(20),
                                       transforms.ColorJitter(brightness = 0.1, contrast = 0.1, hue = 0.1),
                                       transforms.ToTensor(), transforms.Normalize(normMean, normStd)
                                       ])

In [None]:
normMeanVal, normStdVal = calcula_img_mean_std(df_validacao['file_path'])

In [None]:
transform_val = transforms.Compose([#transforms.Resize((input_size,input_size)),
                                    transforms.ToTensor(),
                                    transforms.Normalize(normMeanVal, normStdVal)])

## Carregando Dataloader

In [None]:
del loader_treino
del loader_val
del loader_teste
del model_ft
torch.cuda.empty_cache()

In [None]:
set_treino = OrganizaDados(df_treino, transform = transform_treino)

In [None]:
loader_treino = DataLoader(set_treino, batch_size = 32, shuffle = True, num_workers = 4, persistent_workers=False)

In [None]:
set_val = OrganizaDados(df_validacao, transform = transform_val)
loader_val = DataLoader(set_val, batch_size = 32, shuffle = False, num_workers = 4)

In [None]:
set_teste = OrganizaDados(df_teste, transform = transform_val)
loader_teste = DataLoader(set_teste, batch_size = 32, shuffle = False, num_workers = 4)

In [None]:
optimizer = optim.Adam(model.parameters(), lr = 1e-3)

In [None]:
criterion = nn.CrossEntropyLoss().to(device)

## Treinamento

In [None]:
# Hiperparâmetros
epoch_num = 10
best_val_acc = 0

In [None]:
%%time
for epoch in range(1, epoch_num + 1):

    # Execute a função de treino
    loss_train, acc_train = treina_modelo(loader_treino, model, criterion, optimizer, epoch)

    # Executa a função de validação
    loss_val, acc_val = valida_modelo(loader_val, model, criterion, optimizer, epoch)

    # Calcula as métricas
    total_loss_val.append(loss_val)
    total_acc_val.append(acc_val)

    # Verifica a acurácia em validação
    if acc_val > best_val_acc:
        best_val_acc = acc_val
        print('*****************************************************')
        print('Melhor Resultado: [epoch %d], [val loss %.5f], [val acc %.5f]' % (epoch, loss_val, acc_val))
        print('*****************************************************')

        torch.save(model.state_dict(), f'{nome_modelo}_model_{epoch}.pth')

In [None]:
# Plot
fig = plt.figure(num = 2)
fig1 = fig.add_subplot(2,1,1)
fig2 = fig.add_subplot(2,1,2)
fig1.plot(total_loss_train, label = 'Erro em Treino')
fig1.plot(total_acc_train, label = 'Acurácia em Treino')
fig2.plot(total_loss_val, label = 'Erro em Validação')
fig2.plot(total_acc_val, label = 'Acurácia em Validação')
plt.legend()
plt.show()

In [None]:
# Função de plot da confusion_matrix
def plot_confusion_matrix(cm,
                          classes,
                          normalize = False,
                          title = 'Confusion matrix',
                          cmap = plt.cm.Blues):

    plt.imshow(cm, interpolation = 'nearest', cmap = cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    thresh = cm.max() / 2.

    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment = "center",
                 color = "white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('Label Real')
    plt.xlabel('Label Previsto')

In [None]:
# Avaliação do modelo com dados de teste
model.eval()
y_label = []
y_predict = []
with torch.no_grad():
    for i, data in enumerate(loader_teste):
        images, labels_2 = data
        N = images.size(0)
        images = Variable(images).to(device)
        outputs = model(images)
        prediction = outputs.max(1, keepdim = True)[1]
        y_label.extend(labels_2.cpu().numpy())
        y_predict.extend(np.squeeze(prediction.cpu().numpy().T))

In [None]:
# Cria a confusion matrix
confusion_mtx = confusion_matrix(y_label, y_predict)

In [None]:
# Plot da confusion matrix
plot_labels = labels['chord'].unique().tolist()
# plot_confusion_matrix(confusion_mtx, plot_labels)

In [None]:
# Gera o relatório de classificação
report = classification_report(y_label, y_predict, target_names = plot_labels)
print(report)

In [None]:
# Plot de erros por classe
label_frac_error = 1 - np.diag(confusion_mtx) / np.sum(confusion_mtx, axis = 1)
plt.bar(np.arange(373),label_frac_error)
plt.xlabel('Label Real')
plt.ylabel('Classificação Incorreta')