In [34]:
import sys
import os
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from collections import Counter

sys.path.append('..')
audio_root_folder = '../archive/Data'
labels_csv = os.path.join(audio_root_folder, 'data.csv')
unslicedData = pd.read_csv(labels_csv, header=0)
slicedCSV = os.path.join(audio_root_folder, 'features.csv')
slicedDataSet = pd.read_csv(slicedCSV, header=0)
label = unslicedData['label']
unslicedData = unslicedData.sample(frac=1).reset_index(drop=True)
pca = PCA(n_components=0.90)

In [35]:
def splitTabularPredicting(model, index, dataSet, scaler, pca):
    """
    first find the sliced 10 data for each data in the testing data, then do the prediction to all 10 data.
    Pick the mode of the prediction to be the final prediction, then compute the accuracy of this prediction.
    Note that the first slice of filename "blues.00000.wav" is named as "blues.00000.0.wav"
    Args:
        pca: principal component analysis object
        scaler: scaler used to scale data
        index: the index columns of testing data
        model: the model we trained
        dataSet: The whole data set include the testing data
    Returns:
        result: the predicted label
    """
    prediction = []
    for i in range(len(index)):
        songIndex = index.iloc[i]
        sliceRow = dataSet[dataSet['index'] == songIndex].drop(['label', 'index'], axis=1)
        sliceRow = pd.DataFrame(scaler.transform(sliceRow), columns=sliceRow.columns)
        #sliceRow = pca.transform(sliceRow)
        slice_prediction = model.predict(sliceRow)
        prediction.append(Counter(slice_prediction).most_common(1)[0][0])
    return np.array(prediction)

In [36]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

def naiveWeightTraining(model1, dataSet, scaler, model2):
    """
    This method trains a weight function from the fitted model using the training data
    Args:
        model: the fitted model
        dataSet: whole dataset
        scaler: the scaler to scale the data. I prefer standardscaler
    Returns:
        newModel: the fitted weight function
    """
    songSelected = dataSet.copy()
    labels = songSelected['label']
    songSelected.drop(['label', 'index'], axis=1, inplace=True)
    scaledData = pd.DataFrame(scaler.transform(songSelected), columns=songSelected.columns)
    rawPrediction = model1.predict(scaledData)
    matchVector = np.where(labels == rawPrediction, 1, 0)
    model2.fit(scaledData, matchVector)
    return model2

In [37]:
def predictionUsingTwoModel(model1, model2, data, threshold):
    """
    Use model1 to produce the sliced raw prediction for each slice.
    Then for the song in the same index, use model2 to get the probability
    Args:
        threshould: 
        model1: 
        model2: 
        data: 

    Returns:

    """
    copy = data.copy()
    index = copy['index']
    copy.drop(['label', 'index'], axis=1, inplace=True)
    rawPrediction = model1.predict(copy)
    weight = model2.predict_proba(copy)[:, 1]
    prediction_with_index = pd.DataFrame({
        'index': index,
        'probability': weight,
        'prediction': rawPrediction
    })
    # Calculate the total probability for each index
    total_probability = prediction_with_index.groupby('index')['probability'].transform('sum')
    
    # Normalize the probabilities
    prediction_with_index['normalized_probability'] = prediction_with_index['probability'] / total_probability
    
    # Filter out the noise based on the threshold
    filtered_predictions = prediction_with_index[prediction_with_index['normalized_probability'] >= threshold]
    
    # Continue with aggregation and selection of the highest probability prediction for each index
    if not filtered_predictions.empty:
        aggregated = filtered_predictions.groupby(['index', 'prediction'], as_index=False)['normalized_probability'].sum()
        aggregated_sorted = aggregated.sort_values(by=['index', 'normalized_probability'], ascending=[True, False])
        final_predictions = aggregated_sorted.drop_duplicates(subset=['index'], keep='first').sort_index()
        final_predictions.drop(['normalized_probability'], axis=1, inplace=True)
    else:
        # Handle the case where filtering leaves some indices without predictions
        final_predictions = pd.DataFrame(columns=['index', 'prediction'])
    
    return final_predictions
    

In [38]:
def getAccuracy(predictions, real):
    """
    Return the accuracy of the prediction
    Args:
        real: the real label with index
        predictions: the prediction with index
    Returns:
        the accuracy score
    """
    merged = pd.merge(predictions, real, on='index')
    correct_predictions = (merged['prediction'] == merged['label']).sum()
    accuracy_score = correct_predictions / len(merged)
    return accuracy_score

In [39]:
def predictionFinalStep(testData, model1, model2, scaler, threshold):
    copy = testData.copy()
    index = copy['index']
    copy.drop(['label', 'index'], axis=1, inplace=True)
    data_scaled = pd.DataFrame(scalar.fit_transform(copy), columns=copy.columns)
    rawPrediction = model1.predict(data_scaled)
    weight = model2.predict_proba(copy)[:, 1]
    prediction_with_index = pd.DataFrame({
        'index': index,
        'probability': weight,
        'prediction': rawPrediction
    })
    # Calculate the total probability for each index
    total_probability = prediction_with_index.groupby('index')['probability'].transform('sum')
    
    # Normalize the probabilities
    prediction_with_index['normalized_probability'] = prediction_with_index['probability'] / total_probability
    
    # Filter out the noise based on the threshold
    filtered_predictions = prediction_with_index[prediction_with_index['normalized_probability'] >= threshold]
    
    # Continue with aggregation and selection of the highest probability prediction for each index
    if not filtered_predictions.empty:
        aggregated = filtered_predictions.groupby(['index', 'prediction'], as_index=False)['normalized_probability'].sum()
        aggregated_sorted = aggregated.sort_values(by=['index', 'normalized_probability'], ascending=[True, False])
        final_predictions = aggregated_sorted.drop_duplicates(subset=['index'], keep='first').sort_index()
        final_predictions.drop(['normalized_probability'], axis=1, inplace=True)
    else:
        # Handle the case where filtering leaves some indices without predictions
        final_predictions = pd.DataFrame(columns=['index', 'prediction'])
    
    return final_predictions

In [40]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler

#scalar = StandardScaler()
scalar = MinMaxScaler()

In [41]:
#generate the testing data folds
K = 5
testDataFolds = []
testDataSliced = [] #the sliced testing data
length = len(unslicedData)
totalIndex = unslicedData['index']
slice_size = length // K
start_index = 0
for i in range(K):
    end_index = min(start_index + slice_size, length)
    fold = unslicedData.iloc[start_index: end_index]
    start_index = end_index
    index = fold['index']
    slicedTestData = slicedDataSet[slicedDataSet['index'].isin(index)]
    testDataSliced.append(slicedTestData)
    testDataFolds.append(fold)
    

In [42]:
#generate the training data folds
trainData = []
trainingLabel = []
for i in range(K):
    index = testDataFolds[i]['index']
    label = unslicedData[~unslicedData['index'].isin(index)][['label', 'index']]
    trainingLabel.append(label)
    trainData.append(slicedDataSet[~slicedDataSet['index'].isin(index)])
    

In [43]:
#model training
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from tqdm import tqdm

#model = RandomForestClassifier(max_depth = 15, max_features = 10)
model = LogisticRegression(max_iter=10000, C=1e-3)
#model = SVC(kernel='rbf', C=1)

predictions = []
testLabels = []
threshold = 0.07
for i in range(K):
    X_train = trainData[i].copy()
    index = X_train['index']
    label = X_train['label']
    X_train.drop(['label','index'], axis=1, inplace=True)
    X_train_scaled = pd.DataFrame(scalar.fit_transform(X_train), columns=X_train.columns)
    #X_train_pca = pca.fit_transform(X_train_scaled)
    #X_train = pd.DataFrame(npArray, columns=X_train.columns)
    #trainingData, _, trainingLabel, _ = train_test_split(X_train_pca, label, train_size=0.5)
    #model.fit(trainingData, trainingLabel)
    #model.fit(X_train_pca, label)
    model.fit(X_train_scaled, label)
    weightModel = RandomForestClassifier(max_depth=20, max_features=20)
    weightModel = naiveWeightTraining(model, trainData[i], scalar, weightModel)
    X_train_scaled['index'] = index.values
    X_train_scaled['label'] =label.values
    #model.fit(X_train, label)
    #trainAccuracy = accuracy_score(model.predict(X_train), label)
    trainPrediction = predictionUsingTwoModel(model, weightModel, X_train_scaled, threshold)
    trainAccuracy = getAccuracy(trainPrediction, trainingLabel[i])
    #trainAccuracy = accuracy_score(model.predict(X_train_pca), label)
    print(f"training accuracy: {trainAccuracy}")
    testLabel = testDataFolds[i][['label', 'index']] #index and label of 30 second songs together
    testLabels.append(testLabel)
    prediction = predictionFinalStep(testDataSliced[i], model, weightModel, scalar, threshold)
    predictions.append(prediction)
    

training accuracy: 0.53875
training accuracy: 0.4775
training accuracy: 0.51125
training accuracy: 0.51375
training accuracy: 0.5925


In [44]:
#average accuracy
accuracies = []
for i in range(K):
    prediction = predictions[i]
    accuracyOfPrediction = getAccuracy(prediction, testLabels[i])
    accuracies.append(accuracyOfPrediction)

mean_accuracy = np.mean(accuracies)
print(f"Mean Accuracy: {mean_accuracy}")

Mean Accuracy: 0.38994974874371857
