In [12]:
import lxml.html as l
import json

def extract_data(path):
    source = open(path).read()
    root = l.fromstring(source)
    beatmapset = root.xpath(".//script[@id='json-beatmapset']/text()")
    if len(beatmapset) == 0: return None
    beatmapset = beatmapset[0]
    return json.loads(beatmapset)

download_dir = '../parser-vmedv/beatmapsets'

In [None]:
import os
import requests
from pathlib import Path
import cv2

genres = {}

for root, directories, files in os.walk(download_dir):
    for file in files:
        beatmapset = extract_data(download_dir + os.sep + file)
        if beatmapset is None:
            continue
        img_covers = beatmapset['covers']
        if img_covers is None: 
            continue
        img_url = img_covers['cover']
        if img_url is None:
            continue
            
        # print(json.dumps(beatmapset, indent=4))
        # break
        if genres.__contains__(beatmapset['genre']['name']):
            genres[beatmapset['genre']['name']] += 1
        else:
            genres[beatmapset['genre']['name']] = 1
 
        id = img_url[img_url.find("beatmaps") + 9 : img_url.find("covers") - 1]
        pic = Path(f'pictures/{id}.jpg')
        pref = ""
        if beatmapset['genre']['name'] == 'Electronic':
            pref = 'electronic'
        else:
            pref = 'non-electronic'
        if pic.is_file():
            os.replace(pic, f"pictures/{pref}/{id}.jpg")
            continue
        pic = Path(f'pictures/{pref}/{id}.jpg')
        if pic.is_file():
            continue
        img_data = requests.get(img_url).content
        with open(f'pictures/{pref}/{img_url[img_url.find("beatmaps") + 9 : img_url.find("covers") - 1]}.jpg', 'wb') as handler:
            handler.write(img_data)
        try:
            img = cv2.imread(f'pictures/{pref}/{img_url[img_url.find("beatmaps") + 9 : img_url.find("covers") - 1]}.jpg')
        except:
            print("error message")
            pass 
print(genres)


In [12]:
import torch
import torchvision

In [48]:
from torch.utils.data import DataLoader
from PIL import Image

def check_Image(path):
    try:
        _ = Image.open(path)
        return True
    except:
        return False

data = torchvision.datasets.ImageFolder(root='./pictures/', transform=transforms.ToTensor(), is_valid_file=check_Image)
train, test = torch.utils.data.random_split(data, [0.8, 0.2])
dataloaders = {
    'train': DataLoader(
        dataset=train,
        batch_size=64,
        shuffle=True
    ),
    'val': DataLoader(
        dataset=test,
        batch_size=64,
        shuffle=False
    )
}

In [54]:
from torchvision.models import resnet50
from torch import nn


class OsuGenreClassifier(nn.Module):
    def __init__(self, ouput_dim):
        super(OsuGenreClassifier, self).__init__()
        self.model = resnet50(weights=torchvision.models.ResNet50_Weights.DEFAULT)

        for param in self.model.parameters():
            param.requires_grad = False

        self.fc1 = nn.Sequential(
            nn.Linear(1000, 64),
            nn.ReLU()
        )
        self.fc2 = nn.Sequential(
            nn.Dropout(0.25),
            nn.Linear(64, ouput_dim)
        )

    def embed(self, x):
        return self.fc1(self.model(x))

    def forward(self, x):
        resnet_out = self.embed(x)
        return self.fc2(resnet_out)


In [55]:
from torchvision import transforms
from tqdm.notebook import tqdm
from sklearn.metrics import f1_score
model = OsuGenreClassifier(2)
loss_function = torch.nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(model.parameters(), amsgrad=True, lr=1e-4)

for epoch in range(4):

    for phase in ['train', 'val']:
        dataloader = dataloaders[phase]
        if phase == 'train':
            model.train()
        elif phase == 'val':
            model.eval()

        running_loss = 0.
        running_acc = 0.

        pred = []
        target = []
        for (X_batch, y_batch) in tqdm(dataloader, desc=f'Epoch: {epoch + 1}. Phase: {phase}'):
            X_batch = X_batch / 255
            X_batch = transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))(X_batch)

            optimizer.zero_grad()
            with torch.set_grad_enabled(phase == 'train'):
                y_pred = model(X_batch)

                loss_value = loss_function(y_pred, y_batch)
                y_pred_class = y_pred.argmax(dim=1)

                pred.append(y_pred_class)
                target.append(y_batch)
                if phase == 'train':
                    loss_value.backward()
                    optimizer.step()

            running_loss += loss_value.item()
            running_acc += (y_pred_class == y_batch.data).float().mean().data.cpu().numpy()

        epoch_loss = running_loss / len(dataloader)
        epoch_acc = running_acc / len(dataloader)

        t1 = torch.stack(pred[0:-1])
        t2 = torch.stack(target[0:-1])
        
        print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} F1_score: {f1_score(t1.data, t2.data, average="micro")}', end='')

Epoch: 1. Phase: train:   0%|          | 0/125 [00:00<?, ?it/s]

train Loss: 0.5854 Acc: 0.7098 F1_score: 0.8248956356736243

Epoch: 1. Phase: val:   0%|          | 0/32 [00:00<?, ?it/s]

val Loss: 0.5536 Acc: 0.7302 F1_score: 0.8379988088147707

Epoch: 2. Phase: train:   0%|          | 0/125 [00:00<?, ?it/s]

train Loss: 0.5608 Acc: 0.7188 F1_score: 0.831798278725653

Epoch: 2. Phase: val:   0%|          | 0/32 [00:00<?, ?it/s]

val Loss: 0.5479 Acc: 0.7267 F1_score: 0.8345932869670396

Epoch: 3. Phase: train:   0%|          | 0/125 [00:00<?, ?it/s]

train Loss: 0.5515 Acc: 0.7258 F1_score: 0.8342499236174763

Epoch: 3. Phase: val:   0%|          | 0/32 [00:00<?, ?it/s]

val Loss: 0.5434 Acc: 0.7297 F1_score: 0.8350983358547656

Epoch: 4. Phase: train:   0%|          | 0/125 [00:00<?, ?it/s]

train Loss: 0.5466 Acc: 0.7299 F1_score: 0.8354643595436797

Epoch: 4. Phase: val:   0%|          | 0/32 [00:00<?, ?it/s]

val Loss: 0.5416 Acc: 0.7340 F1_score: 0.8344741754822651