In [None]:

# Download the training dataset archive. It is hosted on the author's Alibaba Cloud OSS bucket;
# because download traffic is billed, the author notes the link is not kept open to the public.
# NOTE: --no-check-certificate turns off TLS certificate verification for this download.
!wget -O GTZAN.zip 'https://file.fishei.cn/GTZAN.zip' --no-check-certificate


In [None]:
# Unpack the training archive downloaded above (saved as GTZAN.zip by the wget cell).
!unzip GTZAN


In [None]:
# Download the test-split archive.
# NOTE: --no-check-certificate turns off TLS certificate verification for this download.
!wget -O GTZAN_TEST.zip 'https://file.fishei.cn/GTZAN_TEST.zip' --no-check-certificate

In [None]:
# Unpack the test-split archive.
!unzip GTZAN_TEST.zip

In [None]:
# Install torchsummary (used further down to print a layer-by-layer summary of the network).
# NOTE(review): the version is not pinned, and `%pip` would target the kernel's environment — consider both.
!pip install torchsummary

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset
import torchaudio
from torch.utils.data import DataLoader
import pandas as pd
import os
from itertools import product
from collections import namedtuple
from collections import OrderedDict
from IPython.display import display,clear_output
import time
import json
from torchsummary import summary
import matplotlib.pyplot as plt

torch.set_printoptions(linewidth=120)

In [None]:
# Add an integer label column next to the textual genre label in the annotation table.
# This cell handles the test split.
ANNOTATIONS_FILE = "./GTZAN_TEST/features_30_sec_test.csv"
dataframe = pd.read_csv(ANNOTATIONS_FILE)

# Distinct values found in the last CSV column (read by position), then ordered alphabetically.
labels = set(dataframe.iloc[:, -1])
labels_list = list(labels)
sorted_labels = sorted(labels_list)

# Each label's numeric id is its rank in the sorted list.
mapping = {label: index for index, label in enumerate(sorted_labels)}

# Copy the textual column first, then swap every name in the copy for its numeric id.
dataframe["num_label"] = dataframe["label"]
new_dataframe = dataframe.replace({"num_label": mapping})
new_dataframe

In [None]:
# Persist the augmented test-split table. With pandas defaults, to_csv also writes the row index
# as an extra leading column.
# NOTE(review): GTZANDataset reads this file by column position (iloc[..., 1], -2 and -1), so that
# extra leading column looks load-bearing — confirm before switching to index=False.
new_dataframe.to_csv("features_30_sec_test_final.csv")

In [None]:
# Training split: add an integer label column next to the textual genre label.
import pandas as pd
ANNOTATIONS_FILE = "./GTZAN/features_30_sec.csv"
dataframe = pd.read_csv(ANNOTATIONS_FILE)

# Distinct values found in the last CSV column (read by position), then ordered alphabetically.
labels = set(dataframe.iloc[:, -1])
labels_list = list(labels)
sorted_labels = sorted(labels_list)

# Each label's numeric id is its rank in the sorted list.
mapping = {label: index for index, label in enumerate(sorted_labels)}

# Copy the textual column first, then swap every name in the copy for its numeric id.
dataframe["num_label"] = dataframe["label"]
new_dataframe = dataframe.replace({"num_label": mapping})
new_dataframe

In [None]:
# Persist the augmented training table. With pandas defaults, to_csv also writes the row index
# as an extra leading column.
# NOTE(review): GTZANDataset reads this file by column position (iloc[..., 1], -2 and -1), so that
# extra leading column looks load-bearing — confirm before switching to index=False.
new_dataframe.to_csv("features_30_sec_final.csv")

In [None]:
# RunBuilder: hyper-parameter management. Preset candidate values are expanded into every
# combination so the training loop can walk through them automatically.
class RunBuilder():
    """Expands a hyper-parameter grid into a list of concrete run configurations."""

    @staticmethod
    def get_runs(params):
        """Return one ``Run`` namedtuple per combination of the candidate values.

        Args:
            params: Mapping of hyper-parameter name -> iterable of candidate values.
                The keys become the namedtuple field names.

        Returns:
            list: ``Run`` tuples in ``itertools.product`` order (last key varies fastest).
        """
        Run = namedtuple('Run', params.keys())
        return [Run(*combination) for combination in product(*params.values())]

In [None]:
# Run-time bookkeeping helper for the training loop.
class RunManager():
    """Accumulates per-epoch statistics for a run and mirrors them to TensorBoard.

    Expected call order: ``begin_run`` -> for every epoch (``begin_epoch`` -> ``track_*`` /
    ``test_*`` once per batch -> ``end_epoch``) -> ``end_run`` -> optionally ``save``.
    One result row per epoch is appended to ``run_data``.
    """

    def __init__(self):
        # --- training split ---
        # Number of epochs started in the current run.
        self.epoch_count = 0
        # Sum over batches of (batch loss * batch size) for the current epoch.
        self.epoch_loss = 0
        # Number of correct predictions in the current epoch.
        self.epoch_correct_num = 0
        # Wall-clock time at which the current epoch started.
        self.epoch_start_time = None

        # --- test split (same meaning as the training counters above) ---
        self.test_epoch_count = 0
        self.test_epoch_loss = 0
        self.test_epoch_correct_num = 0

        # --- run-level state: hyper-parameters, run counter, collected result rows ---
        self.run_params = None
        self.run_count = 0
        self.run_data = []
        self.run_start_time = None

        self.network = None
        self.loader = None
        # Declared here so every attribute exists before begin_run / get_num_workers is called.
        self.test_loader = None
        self.epoch_num_workers = None
        # TensorBoard writer; created in begin_run.
        self.tb = None

    def begin_run(self, run, network, loader, test_loader):
        """Start a run: remember its objects, open a TensorBoard writer and log the model graph.

        Args:
            run: Hyper-parameter tuple; must expose ``device`` and ``_asdict()``.
            network: Model being trained.
            loader: Training data loader (its ``dataset`` length is used as a divisor later).
            test_loader: Test data loader (same requirement).
        """
        self.run_start_time = time.time()
        self.run_params = run
        self.run_count += 1

        self.network = network
        self.loader = loader
        self.test_loader = test_loader
        # The run tuple is passed as the writer comment so each run gets its own log directory.
        self.tb = SummaryWriter(comment=f'-{run}')

        # One batch is fetched only to give add_graph an example input. Batches are unpacked as
        # three items; only the first (the input tensor) is used here.
        signal, _label, _path = next(iter(self.loader))

        # TODO: convert the signal to a mel-spectrogram image and log it as well (not done yet).

        # Log the network structure.
        self.tb.add_graph(
            self.network,
            signal.to(run.device)
        )

    def end_run(self):
        """Close the TensorBoard writer and reset the epoch counters for the next run."""
        self.tb.close()
        self.epoch_count = 0
        self.test_epoch_count = 0

    def begin_epoch(self):
        """Start an epoch: stamp the start time, bump the counters and zero the accumulators."""
        self.epoch_start_time = time.time()
        self.epoch_count += 1
        self.epoch_loss = 0
        self.epoch_correct_num = 0

        self.test_epoch_count += 1
        self.test_epoch_loss = 0
        self.test_epoch_correct_num = 0

    def end_epoch(self):
        """Finish an epoch: compute averages, log them to TensorBoard and show the results table."""
        epoch_duration = time.time() - self.epoch_start_time
        run_duration = time.time() - self.run_start_time

        # Training split: accumulated values are averaged over the dataset size.
        train_size = len(self.loader.dataset)
        loss = self.epoch_loss / train_size
        accuracy = self.epoch_correct_num / train_size
        print(f'正确率：{self.epoch_correct_num} / {len(self.loader.dataset)}')

        # Test split.
        test_size = len(self.test_loader.dataset)
        test_loss = self.test_epoch_loss / test_size
        test_accuracy = self.test_epoch_correct_num / test_size

        # Train and test curves share one chart per metric.
        self.tb.add_scalars('Loss', {"train_loss": loss,
                                    "test_loss": test_loss}, self.epoch_count)
        self.tb.add_scalars('Accuracy', {"train_accuracy": accuracy,
                                        "test_accuracy": test_accuracy}, self.epoch_count)

        for name, param in self.network.named_parameters():
            # Current values of each parameter tensor.
            self.tb.add_histogram(name, param, self.epoch_count)
            # Matching gradients. A parameter that has not taken part in a backward pass
            # has grad None, which cannot be turned into a histogram.
            if param.grad is not None:
                self.tb.add_histogram(f'{name}.grad', param.grad, self.epoch_count)

        results = OrderedDict()

        results['run'] = self.run_count
        results['epoch'] = self.epoch_count
        results['loss'] = loss
        results['accuracy'] = accuracy
        # Test metrics are stored too, so they reach the CSV/JSON written by save().
        results['test loss'] = test_loss
        results['test accuracy'] = test_accuracy
        results['epoch duration'] = epoch_duration
        results['run duration'] = run_duration

        for k, v in self.run_params._asdict().items():
            results[k] = v

        self.run_data.append(results)

        df = pd.DataFrame(self.run_data)

        # Replace the previous table in the cell output instead of stacking a new one below it.
        clear_output(wait = True)
        display(df)

    def get_num_workers(self, num_workers):
        """Record the DataLoader worker count (despite the name, this is a setter)."""
        self.epoch_num_workers = num_workers

    def track_loss(self, loss, batch):
        """Accumulate training loss; the batch loss is weighted by the batch size ``batch[0].shape[0]``."""
        self.epoch_loss += loss.item()*batch[0].shape[0]

    def test_loss(self, test_loss, test_batch):
        """Accumulate test loss; the batch loss is weighted by the batch size ``test_batch[0].shape[0]``."""
        self.test_epoch_loss += test_loss.item()*test_batch[0].shape[0]

    def test_num_correct(self, test_preds, test_labels):
        """Add this batch's number of correct test predictions to the epoch total."""
        self.test_epoch_correct_num += self.get_correct_num(test_preds, test_labels)

    def track_num_correct(self, preds, labels):
        """Add this batch's number of correct training predictions to the epoch total."""
        self.epoch_correct_num += self.get_correct_num(preds, labels)

    def get_correct_num(self, preds, labels):
        """Return how many rows of ``preds`` have their arg-max (over dim 1) equal to ``labels``."""
        return preds.argmax(dim=1).eq(labels).sum().item()

    def save(self, fileName):
        """Write the collected rows to ``<fileName>.csv`` and ``<fileName>.json``."""
        pd.DataFrame(self.run_data).to_csv(f'{fileName}.csv')

        with open(f'{fileName}.json', 'w', encoding='utf-8') as f:
            # default=str keeps the dump from failing on hyper-parameter values that are not
            # JSON types; values that already serialise are unaffected.
            json.dump(self.run_data, f, ensure_ascii=False, indent=4, default=str)

In [None]:
# Dataset / preprocessing class.
class GTZANDataset(Dataset):
    """Audio clips listed in an annotation CSV, returned as fixed-length transformed tensors.

    Every item is loaded from disk, moved to ``device``, resampled to ``target_sample_rate`` if
    needed, mixed down to one channel, cut or zero-padded to ``num_samples`` samples and finally
    passed through ``transformation``.
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 device):
        """
        Args:
            annotations_file: Path of the annotation CSV.
            audio_dir: Root directory of the audio files.
            transformation: Callable module applied to the waveform; it is moved to ``device``.
            target_sample_rate: Sample rate every clip is brought to.
            num_samples: Exact number of samples every clip is cut or padded to.
            device: Device the waveform and the transforms live on.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        """Number of rows in the annotation table (one per audio file)."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return ``(transformed_signal, label, audio_path)`` for row ``index``."""
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)

        # torchaudio.load returns the waveform together with the file's own sample rate.
        waveform, file_rate = torchaudio.load(audio_sample_path)
        waveform = waveform.to(self.device)

        waveform = self._resample_if_necessary(waveform, file_rate)   # fix the sample rate
        waveform = self._mix_down_if_necessary(waveform)              # multi-channel -> mono
        waveform = self._cut_if_necessary(waveform)                   # too long  -> truncate
        waveform = self._right_pad_if_necessary(waveform)             # too short -> zero-pad

        return self.transformation(waveform), label, audio_sample_path

    def _cut_if_necessary(self, signal):
        """Keep only the first ``num_samples`` samples when the signal is longer than that."""
        if signal.shape[1] <= self.num_samples:
            return signal
        return signal[:, :self.num_samples]

    def _right_pad_if_necessary(self, signal):
        """Append zeros on the right when the signal is shorter than ``num_samples``."""
        shortfall = self.num_samples - signal.shape[1]
        if shortfall <= 0:
            return signal
        # (0, shortfall): no padding on the left, ``shortfall`` zeros on the right of the last dim.
        return torch.nn.functional.pad(signal, (0, shortfall))

    def _resample_if_necessary(self, signal, sr):
        """Resample to ``target_sample_rate`` only when the file's rate ``sr`` differs from it."""
        if sr == self.target_sample_rate:
            return signal
        resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate).to(self.device)
        return resampler(signal)

    def _mix_down_if_necessary(self, signal):
        """Average the channels into one when there is more than one channel."""
        if signal.shape[0] <= 1:
            return signal
        return torch.mean(signal, dim=0, keepdim=True)

    def _get_audio_sample_path(self, index):
        """Build ``audio_dir/<second-to-last column>/<column 1>`` for row ``index``."""
        row_values = self.annotations.iloc
        subdirectory = f"{row_values[index, -2]}"
        return os.path.join(self.audio_dir, subdirectory, row_values[index, 1])

    def _get_audio_sample_label(self, index):
        """Return the value of the last CSV column for row ``index``."""
        return self.annotations.iloc[index, -1]
    

if __name__ == "__main__":
    # Smoke test of the dataset class: build it, load one clip and plot its transformed features.
    ANNOTATIONS_FILE = "./features_30_sec_final.csv"
    AUDIO_DIR = "./GTZAN/genres_original"
    SAMPLE_RATE = 22050
    NUM_SAMPLES = 22050 * 5  # 5 seconds of audio at 22050 Hz
    plot = True

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device")

    # MFCC front end; this is the transform handed to the dataset below.
    mfcc = torchaudio.transforms.MFCC(sample_rate=SAMPLE_RATE, n_mfcc=40, log_mels=True)

    # Mel-spectrogram front end; defined for reference, not passed to the dataset in this cell.
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=1024,      # FFT size
        hop_length=512,  # hop between successive frames
        n_mels=64,       # number of mel bands
    )

    gtzan = GTZANDataset(
        annotations_file=ANNOTATIONS_FILE,
        audio_dir=AUDIO_DIR,
        transformation=mfcc,
        target_sample_rate=SAMPLE_RATE,
        num_samples=NUM_SAMPLES,
        device=device,
    )

    print(f"There are {len(gtzan)} samples in the dataset")

    if plot:
        signal, label, path = gtzan[666]
        print(f'path:{path}')
        # matplotlib needs the data on the host.
        signal = signal.cpu()
        print(signal.shape)

        plt.figure(figsize=(16, 8), facecolor="white")
        # Show the first channel as an image with the origin at the bottom-left.
        plt.imshow(signal[0, :, :], origin='lower')
        plt.autoscale(False)
        plt.xlabel("Time")
        plt.ylabel("Frequency")
        plt.colorbar()
        plt.axis('auto')
        plt.show()


In [None]:
# Shared configuration: training annotation CSV, audio root, sample rate and clip length.
ANNOTATIONS_FILE = "./features_30_sec_final.csv"
AUDIO_DIR = "./GTZAN/genres_original"
SAMPLE_RATE = 22050
# 5 seconds of audio at SAMPLE_RATE.
NUM_SAMPLES = 22050  * 5

# The three helper functions below are not actually used later in the notebook and could be removed.
# Build the training data loader.
def create_data_loader(train_data, batch_size, num_workers=0, pin_memory=True):
    """Wrap ``train_data`` in a shuffling ``DataLoader``.

    Args:
        train_data: Dataset to iterate over.
        batch_size: Number of samples per batch.
        num_workers: Loader worker processes (default 0, the value previously hard-coded).
        pin_memory: Whether batches are copied into pinned host memory (default True, the value
            previously hard-coded). NOTE(review): pinning applies to CPU tensors; pass False
            when the dataset already returns tensors on the GPU — confirm for your dataset.

    Returns:
        DataLoader: loader with ``shuffle=True``.
    """
    return DataLoader(
        train_data,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )


# Train for one epoch.
def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    """Run one optimisation pass over ``data_loader`` and print the last batch's loss.

    Args:
        model: Network to train.
        data_loader: Iterable of batches; item 0 is the input tensor and item 1 the target
            tensor. Further items are ignored.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimiser: Optimiser holding ``model``'s parameters.
        device: Device the tensors are moved to before the forward pass.
    """
    loss = None
    for batch in data_loader:
        # Index instead of unpacking: a batch may carry more than two fields
        # (GTZANDataset items are (signal, label, path)), which a 2-way unpack rejects.
        inputs, target = batch[0].to(device), batch[1].to(device)

        # calculate loss
        prediction = model(inputs)
        loss = loss_fn(prediction, target)

        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

    # An empty loader leaves no loss to report.
    if loss is None:
        print("loss: n/a (data loader produced no batches)")
    else:
        print(f"loss: {loss.item()}")

# Full training loop.
def train(model, data_loader, loss_fn, optimiser, device, epochs):
    """Call ``train_single_epoch`` ``epochs`` times, printing a banner around every epoch.

    All arguments except ``epochs`` are forwarded unchanged to ``train_single_epoch``.
    """
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_single_epoch(model, data_loader, loss_fn, optimiser, device)
        print("---------------------------")
    print("Finished training")

In [None]:
# AlexNet-style network.
class AlexNet(nn.Module):
    """AlexNet-style CNN taking single-channel 2-D inputs.

    The first fully connected layer expects exactly 12288 flattened features, so the input's
    height and width must produce that size after the convolution stack.
    """

    def __init__(self, num_classes=10):
        """
        Args:
            num_classes: Size of the output layer (default 10).
        """
        super().__init__()

        # Convolutional feature extractor.
        conv_stack = [
            # 1 input channel -> 64 maps, 11x11 kernel, stride 4, zero padding 2
            nn.Conv2d(1, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        # Collapses everything after the batch dimension into one vector.
        self.flatten = nn.Flatten()

        # Fully connected classifier with dropout between the layers.
        dense_stack = [
            nn.Linear(12288, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(1024, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3, inplace=False),
            nn.Linear(1024, num_classes),
        ]
        self.classifier = nn.Sequential(*dense_stack)

    def forward(self, x):
        """Forward pass: features -> flatten -> classifier; returns the raw class scores."""
        return self.classifier(self.flatten(self.features(x)))

In [None]:
if __name__ == '__main__':
    from torchsummary import summary
    # Print a layer-by-layer summary of the network for a (1, 128, 555) input.
    # The device is detected instead of being fixed to "cuda", so the cell also runs on a
    # CPU-only host; summary() builds its dummy input on the device it is told about.
    summary_device = "cuda" if torch.cuda.is_available() else "cpu"
    alex = AlexNet().to(summary_device)
    summary(alex, (1, 128, 111 * 5), device=summary_device)

In [None]:
# Seed PyTorch's global random number generator so runs started from this point are repeatable.
torch.manual_seed(128)

In [None]:
# Hyper-parameter grid: every combination of the listed values becomes one training run.
params = OrderedDict(
    lr = [.001, .0001]
    , batch_size = [64]
    , num_workers = [0]
    # Use the GPU when one is available and fall back to the CPU otherwise, so the notebook
    # does not fail on hosts without CUDA.
    , device = ['cuda' if torch.cuda.is_available() else 'cpu']

)

# Training split: annotation CSV
ANNOTATIONS_FILE = "./features_30_sec_final.csv"
# Training split: audio root directory
AUDIO_DIR = "./GTZAN/genres_original"

# Test split: annotation CSV and audio root directory
ANNOTATIONS_FILE_TEST = "./features_30_sec_test_final.csv"
AUDIO_DIR_TEST = "./GTZAN_TEST/genres_original"

In [None]:
# MFCC front end. This is the transform actually handed to both datasets below.
mfcc = torchaudio.transforms.MFCC(
        sample_rate=SAMPLE_RATE,
        n_mfcc=128,
        log_mels=True
)

# Mel-spectrogram front end. Defined as an alternative; it is not used by the loop below.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE,
    n_fft=1024,
    hop_length=512,
    n_mels=64
)

m = RunManager()

# Actual training: one full training run per hyper-parameter combination.
for run in RunBuilder.get_runs(params):
    usd = GTZANDataset(ANNOTATIONS_FILE,
                        AUDIO_DIR,
                        mfcc,
                        SAMPLE_RATE,
                        NUM_SAMPLES,
                        run.device)
    usd_test = GTZANDataset(
        ANNOTATIONS_FILE_TEST,
        AUDIO_DIR_TEST,
        mfcc,
        SAMPLE_RATE,
        NUM_SAMPLES,
        run.device
    )

    print(run)
    device = torch.device(run.device)

    train_data_loader = DataLoader(usd, batch_size=run.batch_size, num_workers=run.num_workers, shuffle=True)
    test_data_loader = DataLoader(usd_test, batch_size=run.batch_size, num_workers=run.num_workers)

    network = AlexNet().to(device)
    print(network)

    # Optimiser
    optimizer = optim.Adam(network.parameters(), lr=run.lr)
    m.begin_run(run, network, train_data_loader, test_data_loader)

    for epoch in range(100):
        # --- training pass ---
        network.train()
        m.begin_epoch()
        for batch in train_data_loader:
            inputs = batch[0].to(device)
            target = batch[1].to(device)
            preds = network(inputs)
            loss = F.cross_entropy(preds, target)
            optimizer.zero_grad()
            # Back-propagation
            loss.backward()
            optimizer.step()
            m.track_loss(loss, batch)
            m.track_num_correct(preds, target)

        # --- evaluation pass ---
        # eval() switches dropout off so the test metrics are deterministic; no_grad() skips
        # gradient bookkeeping because nothing is trained here.
        network.eval()
        with torch.no_grad():
            for test_batch in test_data_loader:
                # Read from test_batch; reading the leftover training `batch` here would
                # score the last training batch instead of the test data.
                test_input = test_batch[0].to(device)
                test_target = test_batch[1].to(device)
                test_preds = network(test_input)
                test_loss = F.cross_entropy(test_preds, test_target)

                m.test_loss(test_loss, test_batch)
                m.test_num_correct(test_preds, test_target)
        m.end_epoch()

    # Save the model.
    # NOTE(review): despite the file name, these are the final-epoch weights, and every run
    # overwrites the same file, so only the last run's model survives — confirm that is intended.
    torch.save(network.state_dict(), 'best_model_okk.pth')
    m.end_run()
    m.save(f'{run.lr}_{run.batch_size}')

In [None]:
# Pack the `runs` directory (recursively) into runs.zip.
!zip -r runs.zip runs


In [None]:
# Clean-up: delete the `runs` directory ...
!rm -rf runs

# ... and the archive made from it. Both deletions are irreversible.
!rm -rf "runs.zip"

In [None]:
# Genre name for each numeric class index (position in the list == class index).
class_mapping = [
    "blues", "classical",
    "country", "disco",
    "hiphop", "jazz",
    "metal", "pop",
    "reggae", "rock",
]

In [None]:
import random

# Ten distinct random integers from [0, 1000); the bare name on the last line displays them.
ran = random.sample(range(1000), 10)
ran

In [None]:
# X: input feature tensor; y: index of the true label; class_mapping: index -> label name.
def predict(model, X, y, class_mapping):
    """Classify the first sample of ``X`` and return ``(predicted_name, expected_name)``.

    The model is put into eval mode (e.g. dropout disabled) and run without gradient tracking.
    Only row 0 of the model output is inspected.
    """
    model.eval()
    with torch.no_grad():
        scores = model(X)
        # Row 0 of the output, e.g. [0.1, 0.04, ..., 0.6]; its arg-max is the predicted class.
        top_class = scores[0].argmax(0)
        return class_mapping[top_class], class_mapping[y]

In [None]:
# Measure the classification accuracy of a saved checkpoint.
def verify_acc(local, annotations_file=None, audio_dir=None, num_items=800):
    """Load a saved ``AlexNet`` checkpoint and return its accuracy on the first clips of a dataset.

    Args:
        local: Path of the ``state_dict`` file.
        annotations_file: Annotation CSV to evaluate on. Defaults to the module-level
            ``ANNOTATIONS_FILE`` (this notebook points it at the training split); pass
            ``ANNOTATIONS_FILE_TEST`` to score the test split instead.
        audio_dir: Audio root matching ``annotations_file``. Defaults to ``AUDIO_DIR``.
        num_items: Upper bound on how many clips, counted from index 0, are scored
            (default 800, the value that used to be hard-coded). Clamped to the dataset size.

    Returns:
        float: correct predictions divided by the number of clips actually scored.

    Raises:
        ValueError: if there is nothing to score.
    """
    if annotations_file is None:
        annotations_file = ANNOTATIONS_FILE
    if audio_dir is None:
        audio_dir = AUDIO_DIR

    # Restore the model. map_location="cpu" lets a checkpoint saved from a GPU load on a host
    # without CUDA; the dataset below is built on the CPU as well.
    cnn = AlexNet()
    state_dict = torch.load(local, map_location="cpu")
    cnn.load_state_dict(state_dict)

    # Same MFCC settings as the training cell.
    mfcc = torchaudio.transforms.MFCC(
        sample_rate=SAMPLE_RATE,
        n_mfcc=128,
        log_mels=True
    )

    gtzan = GTZANDataset(
        annotations_file=annotations_file,
        audio_dir=audio_dir,
        transformation=mfcc,
        target_sample_rate=SAMPLE_RATE,
        num_samples=NUM_SAMPLES,
        device="cpu"
    )

    total = min(num_items, len(gtzan))
    if total <= 0:
        raise ValueError("verify_acc: no samples to evaluate")

    count = 0
    for index in range(total):
        # Fetch the item once: every gtzan[...] access re-reads and re-transforms the audio file.
        X, y, _path = gtzan[index]
        # Add the batch dimension the network expects: [batch, channels, freq, time].
        X = X.unsqueeze(0)

        predicted, expected = predict(cnn, X, y, class_mapping)
        if predicted == expected:
            count += 1

    accuracy = count / total
    print(accuracy)
    return accuracy

acc = verify_acc("best_model_okk.pth")
acc
        

In [None]:
import random

# Restore the trained network on the CPU. map_location="cpu" lets a checkpoint saved from a GPU
# load on a host without CUDA; the dataset below is built on the CPU as well.
cnn = AlexNet()
state_dict = torch.load("best_model_okk.pth", map_location="cpu")
cnn.load_state_dict(state_dict)

# Same MFCC settings as the training cell.
mfcc = torchaudio.transforms.MFCC(
        sample_rate=SAMPLE_RATE,
        n_mfcc=128,
        log_mels=True
)

gtzan = GTZANDataset(
    annotations_file=ANNOTATIONS_FILE,
    audio_dir=AUDIO_DIR,
    transformation=mfcc,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    device="cpu"
)

cnn.eval()

# Pick a random "seed" track; its label index selects which network output is compared below.
initial = random.sample(range(0,800),1)[0]
print(f'initial:{initial}')

# Fetch the item once: every gtzan[...] access re-reads and re-transforms the audio file.
_seed_features, music_init_index, music_init_url = gtzan[initial]

print(f'music_init:{music_init_url}')

# 15 random candidate tracks.
ran = random.sample(range(0,800),15)

max_music_value = - float("inf")
max_music_index = None
max_music_url = None
real_label_index = None

# candidate path -> network output for the seed track's class
content = {}
with torch.no_grad():
    for i, candidate in enumerate(ran):
        candidate_features, candidate_label, candidate_url = gtzan[candidate]
        predictions = cnn(candidate_features.unsqueeze(0))
        # Raw network output for the seed track's class.
        predicted_item = predictions[0][music_init_index].item()
        if max_music_value < predicted_item:
            max_music_value = predicted_item
            # Position within `ran` (0..14), not the dataset row index.
            max_music_index = i
            real_label_index = candidate_label
            max_music_url = candidate_url
        content[candidate_url] = predicted_item


In [None]:
# Genre name of the randomly chosen seed track.
class_mapping[music_init_index]


In [None]:
# Candidate path -> network output for the seed track's class, for every sampled candidate.
content

In [None]:
# Highest network output for the seed track's class among the sampled candidates.
max_music_value

In [None]:
# Position of the best candidate within `ran` (the loop counter), not its dataset row index.
max_music_index

In [None]:
# Label index stored in the dataset for the best candidate.
real_label_index

In [None]:
# Genre name of the best candidate, looked up from its stored label index.
class_mapping[real_label_index]

In [None]:
# File path of the best candidate.
max_music_url

In [None]:
# Draw ten distinct random integers from [0, 1000) and print them one per line.
ran = random.sample(range(1000), 10)
print(*ran, sep="\n")