In [None]:
# Download the test and train CSV files with the gdown command-line tool.
# NOTE(review): both commands pass the same literal `dataset_id`; presumably a
# placeholder for two distinct file IDs — confirm before running.
!gdown --id dataset_id --output test.csv
!gdown --id dataset_id --output train.csv

### Libraries

In [None]:
import pandas as pd
import numpy as np
import functools

import matplotlib.pyplot as plt
from multiprocessing.dummy import Pool

from sklearn.cluster import DBSCAN
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

### Config

In [None]:
# Notebook-wide settings, gathered in one place.
config = dict(
    # Flag; not referenced by the cells visible in this notebook section.
    all_feature=False,
    # Locations of the training and test CSV files.
    train_data="./train.csv",
    test_data="./test.csv",
    # Cut-off compared against the KMeans distance ratios.
    threshold=0.6,
    # Values 0, 2-4, 6, 8, 9 and 11 through 31.
    # NOTE(review): presumably column positions to keep — confirm; not
    # referenced by the cells visible in this notebook section.
    feature_selected=[0, 2, 3, 4, 6, 8, 9, *range(11, 32)],
)

In [None]:
def pitch_processing(data: pd.DataFrame)->pd.DataFrame:
  """Encode the pitch names in 'Feature 13' as the sine of their angle.

  Each pitch name (C, C#, ..., B) is mapped to its index 0-11, multiplied by
  pi/6 so the twelve names span one full turn, and replaced by the sine of
  that angle.

  The input frame is left untouched and a modified copy is returned, which
  matches the copy-on-entry behaviour of `normalize` and `standardize`.

  Args:
    data: Frame containing a 'Feature 13' column of pitch-name strings.

  Returns:
    A copy of `data` whose 'Feature 13' column holds the sine values.
    Entries that are not keys of the pitch mapping become NaN.
  """
  pitch_mapping = {'C': 0, 'C#': 1, 'D': 2, 'D#': 3, 'E': 4, 'F': 5, 'F#': 6, 'G': 7, 'G#': 8, 'A': 9, 'A#': 10, 'B': 11}

  # Work on a copy so the caller's frame keeps its original pitch strings.
  encoded = data.copy()

  # Pitch index -> angle in radians (12 steps of pi/6 cover 2*pi).
  angle = encoded['Feature 13'].map(pitch_mapping) * (np.pi / 6)

  # NOTE(review): the sine alone cannot separate some pitches (e.g. C and F#
  # both give ~0, D and E give the same value); a cosine companion column
  # would disambiguate them but would change the output schema.
  encoded['Feature 13'] = np.sin(angle)

  return encoded

def feature_interaction(data: pd.DataFrame, interaction_features, anchor: str = 'Feature 13') -> pd.DataFrame:
  """Add product (interaction) columns right after an anchor column.

  For every `(feature1, feature2, new_feature_name)` triple the element-wise
  product `data[feature1] * data[feature2]` is computed from the ORIGINAL
  columns, and the resulting columns are inserted, in the order given,
  directly after `anchor`.

  The input frame is left untouched and a modified copy is returned, which
  matches the copy-on-entry behaviour of `normalize` and `standardize`.

  Args:
    data: Source frame; must contain `anchor` and every referenced feature.
    interaction_features: Iterable of `(feature1, feature2, new_feature_name)`.
    anchor: Name of the column after which the new columns are inserted.
      Defaults to 'Feature 13'.

  Returns:
    A copy of `data` with the interaction columns inserted.

  Raises:
    KeyError: If `anchor` or a referenced feature column is missing.
    ValueError: If a new column name already exists in `data`
      (raised by `DataFrame.insert`).
  """
  # Compute every product first so a later pair never sees a freshly added column.
  products = {}
  for feature1, feature2, new_feature_name in interaction_features:
    products[new_feature_name] = data[feature1] * data[feature2]

  # Work on a copy so the caller's frame is not modified.
  augmented = data.copy()
  insert_at = augmented.columns.get_loc(anchor)

  for new_feature_name, values in products.items():
    insert_at += 1
    augmented.insert(loc=insert_at, column=new_feature_name, value=values)

  return augmented

def normalize(data: pd.DataFrame, target: list):
  """Return a copy of `data` with the `target` columns min-max scaled.

  Each column named in `target` is fitted and transformed on its own with
  scikit-learn's MinMaxScaler (default settings). The input frame is not
  modified.
  """
  scaled = data.copy()

  for column in target:
    # A fresh scaler per column: every column is fitted on its own values only.
    scaled[column] = MinMaxScaler().fit_transform(scaled[[column]])

  return scaled

def standardize(data: pd.DataFrame, target: list):
  """Return a copy of `data` with the `target` columns standardized.

  Each column named in `target` is fitted and transformed on its own with
  scikit-learn's StandardScaler (default settings). The input frame is not
  modified.
  """
  scaled = data.copy()

  for column in target:
    # A fresh scaler per column: every column is fitted on its own values only.
    scaled[column] = StandardScaler().fit_transform(scaled[[column]])

  return scaled

### Load Data & Preprocessing

In [None]:
# Column names grouped by preprocessing step.
# NOTE(review): normalize_target and standardize_target are presumably meant
# for normalize() / standardize(); neither list is referenced by the cells
# visible in this notebook section — confirm.
normalize_target = ['Feature 4', 'Feature 5', 'Feature 8']

standardize_target = ['Feature 9', 'Feature 13']

# (first column, second column, name of the product column) triples that are
# passed to feature_interaction().
interaction = [('Feature 10', 'Feature 12', 'vocal_instrument')]

In [None]:
# Read the raw CSV files named in the config.
train_data = pd.read_csv(config['train_data'])
test_data = pd.read_csv(config['test_data'])

# Keep the identifier aside so it is not scaled together with the features.
song_id = train_data['song_id']
train_data = train_data.drop(columns=['song_id'])

# Feature engineering: pitch encoding first, then the product columns.
train_data = feature_interaction(pitch_processing(train_data), interaction)

feature_names = train_data.columns.to_list()

# Min-max scale every feature column at once.
# From here on `train_data` is the NumPy array returned by the scaler.
scaler = MinMaxScaler()
train_data = scaler.fit_transform(train_data)

# Save the scaled features with the identifier restored as the first column.
processed_data = pd.DataFrame(train_data, columns=feature_names)
processed_data.insert(loc=0, column='song_id', value=song_id)
processed_data.to_csv("processed.csv", index=False)

### Training

### KMeans

In [None]:
# Elbow method: fit KMeans for k = 1..29 and record each model's inertia.
elbow_k_values = range(1, 30)
costs = []
for k in elbow_k_values:
    kmeans = KMeans(n_clusters=k, random_state=0).fit(train_data)
    costs.append(kmeans.inertia_)  # inertia_ is the cost-function value of the KMeans model

fig, ax = plt.subplots()
ax.plot(elbow_k_values, costs, marker='o')
ax.set_title('Elbow Method for Optimal k')
ax.set_xlabel('Number of clusters (k)')
ax.set_ylabel('Cost (Inertia)')
plt.show()

In [None]:
# Silhouette analysis: mean silhouette score of KMeans for k = 5..14.
k_values = range(5, 15)
silhouette_scores = []

for k in k_values:
    kmeans = KMeans(n_clusters=k, random_state=0).fit(train_data)
    cluster_labels = kmeans.labels_
    silhouette_avg = silhouette_score(train_data, cluster_labels)
    silhouette_scores.append(silhouette_avg)

fig, ax = plt.subplots()
ax.plot(k_values, silhouette_scores, marker='o')
ax.set_title('Silhouette Analysis for Optimal k')
ax.set_xlabel('Number of clusters (k)')
ax.set_ylabel('Silhouette Score')
plt.show()

In [None]:
# Final KMeans model with 9 clusters.
kmeans = KMeans(n_clusters=9, random_state=0)
cluster_labels = kmeans.fit_predict(train_data)

# Distance from every sample (row) to every cluster centre (column).
distances = kmeans.transform(train_data)

# Distance to the nearest centre, kept 2-D so it broadcasts row-wise below.
min_values = np.min(distances, axis=1, keepdims=True)

# Mask exactly ONE entry per row — the nearest centre — with +inf.
# Comparing `distances == min_values` would also mask any other centre that is
# exactly as close, so a perfectly ambiguous sample would get ratio 0 and
# never be flagged.
is_nearest = np.zeros(distances.shape, dtype=bool)
is_nearest[np.arange(distances.shape[0]), np.argmin(distances, axis=1)] = True
distances = np.where(is_nearest, np.inf, distances)

# Ratio nearest-distance / other-distance: 0 in the masked column, and close
# to 1 when another centre is almost as near as the assigned one.
sc_val = min_values / distances

print(sc_val)

# A sample is flagged when at least one other centre has a ratio above the
# configured threshold.
has_value_above_threshold = np.any(sc_val > config['threshold'], axis=1)
indices_with_value_above_threshold = np.where(has_value_above_threshold)[0]

# Share of flagged samples among all samples.
wrong_ratio = len(indices_with_value_above_threshold) / len(sc_val)
print("Ratio of samples above threshold:", wrong_ratio)

### DBSCAN

In [None]:
# Grid search over DBSCAN's eps / min_samples, scored by mean silhouette.
best_score = -1
best_params = {'eps': None, 'min_samples': None}
eps_val = [1.0, 1.5, 2.0]
min_samples_val = [5, 10, 15, 20]

for eps in eps_val:
    for min_samples in min_samples_val:
        dbscan = DBSCAN(eps=eps, min_samples=min_samples)
        labels = dbscan.fit_predict(train_data)

        # silhouette_score is only defined for 2 <= n_labels <= n_samples - 1
        # and raises ValueError otherwise (e.g. when DBSCAN puts everything in
        # one cluster or marks everything as noise). Skip such settings
        # instead of aborting the whole search.
        n_labels = len(np.unique(labels))
        if n_labels < 2 or n_labels >= len(labels):
            print(f"Skipping eps={eps}, min_samples={min_samples}: {n_labels} distinct label(s)")
            continue

        # NOTE(review): DBSCAN's noise label (-1) is passed through unchanged,
        # so noise points are scored as if they formed a cluster — confirm
        # this is intended.
        silhouette = silhouette_score(train_data, labels)

        if silhouette > best_score:
            best_score = silhouette
            best_params['eps'] = eps
            best_params['min_samples'] = min_samples

if best_params['eps'] is None:
    print("No parameter combination produced a scorable clustering.")

print("Best Parameters:", best_params)

In [None]:
# Final DBSCAN model.
# NOTE(review): eps / min_samples are fixed literals here rather than read
# from `best_params` — confirm this is intended.
dbscan = DBSCAN(min_samples=10, eps=2)
dbscan.fit(train_data)

# Replaces the KMeans assignment previously stored in `cluster_labels`.
cluster_labels = dbscan.labels_

### Result

In [None]:
# Pair every song id with its cluster label (one row per song) and save it.
result_columns = ['song_id', 'Cluster_Label']
data_with_labels = np.column_stack([song_id, cluster_labels])
result = pd.DataFrame(data_with_labels, columns=result_columns)
result.to_csv('cluster_labels.csv', index=False)

In [None]:
# Number of songs per cluster label, printed and drawn as a bar chart.
result_counts = result['Cluster_Label'].value_counts()
print(result_counts)

fig, ax = plt.subplots()
ax.bar(result_counts.index, result_counts.values)
ax.set_xlabel('Clustering')
ax.set_ylabel('Number')
ax.set_title('Clustering Result')
plt.show()

In [None]:
# The two columns of the test file hold the song ids of each pair to compare.
song1 = test_data['col_1']
song2 = test_data['col_2']

test_len = len(song1)

# Look labels up by song id instead of using the id as an array position, so
# the result stays correct even if song ids are not exactly 0..n-1 in row
# order. When they are, this gives the same labels as positional indexing.
label_by_song = dict(zip(song_id, cluster_labels))
song1_labels = song1.map(label_by_song)
song2_labels = song2.map(label_by_song)

# Ids absent from the training data map to NaN; fail loudly rather than
# silently answering 0 for them.
unknown_pairs = song1_labels.isna() | song2_labels.isna()
if unknown_pairs.any():
    raise KeyError(
        f"{int(unknown_pairs.sum())} test pair(s) reference a song_id that is not in the training data"
    )

# 1 when both songs carry the same cluster label, 0 otherwise.
# NOTE(review): with DBSCAN labels, two noise points (label -1) compare equal
# and are therefore answered as "same cluster" — confirm this is intended.
res = (song1_labels.to_numpy() == song2_labels.to_numpy()).astype(int).tolist()

# String row ids "0", "1", ... (named so the builtin `id` is not shadowed).
row_ids = [str(i) for i in range(test_len)]

result = pd.DataFrame(res, index=row_ids, columns=['ans'])
result.index.name = 'id'

result.to_csv('result.csv')