In [None]:
# Load the MuMu multi-label dataset. A forward slash is used as the separator
# so the relative path resolves on Windows as well as on Linux/macOS.
MuMu = pd.read_csv("data/MuMu_dataset_multi-label.csv")

# Clean the dataset without mutating a slice in place:
#   1. drop rows missing the id or the genres,
#   2. keep only the two columns used downstream,
#   3. drop exact duplicate (amazon_id, genres) rows.
MuMu = (
    MuMu
    .dropna(subset=['amazon_id', 'genres'])
    .loc[:, ['amazon_id', 'genres']]
    .drop_duplicates()
)

In [None]:
# Number of comma-separated genres listed on each record.
MuMu['num_genres'] = MuMu['genres'].str.split(',').str.len()

# Records that list more than 5 genres (displayed for inspection only).
MuMu_mas_de_5_generos = MuMu.loc[MuMu['num_genres'] > 5]

MuMu_mas_de_5_generos

In [None]:
# Truncate each record's comma-separated genre string to its first
# MAX_GENRES_PER_RECORD entries. The value stays a comma-joined string here;
# it is not converted to a list in this cell.
MAX_GENRES_PER_RECORD = 3
MuMu['genres'] = MuMu['genres'].apply(
    lambda x: ','.join(x.split(',')[:MAX_GENRES_PER_RECORD])
)

In [None]:
# Directory holding the cover images, one "<amazon_id>.jpg" per record.
# Built with os.path.join so the path is valid on every OS, not only Windows.
directory = os.path.join("data", "MUMU")

# For every record, check whether its cover image exists on disk.
# (.astype(bool) keeps the mask boolean even when the frame is empty.)
has_cover = MuMu['amazon_id'].map(
    lambda amazon_id: os.path.isfile(os.path.join(directory, f"{amazon_id}.jpg"))
).astype(bool)

# Report each record that is about to be dropped because its image is missing.
for amazon_id in MuMu.loc[~has_cover, 'amazon_id']:
    filename = f"{amazon_id}.jpg"
    print(f"Registro eliminado: {amazon_id} (archivo {filename} no encontrado)")

# Drop all image-less records in one vectorised step instead of calling
# DataFrame.drop once per row while iterating over the same frame.
# Note: the file name is always built with a ".jpg" suffix, so a separate
# "exists but is not .jpg" case can never occur and no file is ever deleted.
n_removed = int((~has_cover).sum())
MuMu = MuMu[has_cover].copy()

# Names (without extension) of the images that DO exist, i.e. the kept records.
file_names_without_extension = [str(amazon_id) for amazon_id in MuMu['amazon_id']]

# NOTE: despite its name, this frame lists the KEPT ids (the ones whose image
# was found); the variable name is preserved for any later cell relying on it.
df_deleted_files = pd.DataFrame(file_names_without_extension, columns=['amazon_id'])

# Report accurate counts: removed records vs. records kept with an image.
print("Registros eliminados (imagen no encontrada):", n_removed)
print("Registros conservados (imagen .jpg encontrada):", len(file_names_without_extension))

In [None]:
# Frequency of every individual genre label across the dataset, most common
# first. Each 'genres' value is split on commas and exploded to one label per row.
label_freq = (
    MuMu['genres']
    .apply(lambda s: str(s).split(','))
    .explode()
    .value_counts()
    .sort_values(ascending=False)
)
top_20_labels = label_freq.head(20)

# Horizontal bar chart of the 20 most frequent labels.
style.use("fivethirtyeight")
plt.figure(figsize=(12,10))
sns.barplot(
    x=top_20_labels,
    y=top_20_labels.index.values,
    order=top_20_labels.index,
)
plt.title("Label Frequency - Top 20", fontsize=14)
plt.xlabel("")
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.show()

In [None]:
# Step 1: the 10 most frequent genre labels.
top_10_labels = label_freq.head(10).index

# Upper bound on the number of records kept per genre.
MAX_SAMPLES_PER_GENRE = 1200

# Step 2: for each top-10 genre keep only the records whose whole 'genres'
# string is exactly that genre (i.e. single-genre records), capped per genre.
stripped_genres = MuMu['genres'].str.strip()  # loop-invariant, computed once
filtered_dfs = []
for label in top_10_labels:
    # Records whose (stripped) genre string equals this label exactly.
    filtered_df = MuMu[stripped_genres == label]

    # Randomly down-sample to MAX_SAMPLES_PER_GENRE when there are more;
    # otherwise keep every record. The fixed seed keeps the sample reproducible.
    if len(filtered_df) > MAX_SAMPLES_PER_GENRE:
        filtered_df = filtered_df.sample(n=MAX_SAMPLES_PER_GENRE, random_state=42)

    filtered_dfs.append(filtered_df)

# Step 3: concatenate the per-genre frames into the new working dataset.
MuMu = pd.concat(filtered_dfs).reset_index(drop=True)

# Check the final result.
print("Distribución de géneros en el dataset filtrado:")
print(MuMu['genres'].value_counts())
print(MuMu.shape)

In [None]:
# Labels whose count in label_freq is below 1000 are treated as rare.
rare = [label for label, count in label_freq.items() if count < 1000]
print("We will be ignoring these rare labels:", rare)

In [None]:
# Turn each 'genres' value into a list of labels with the rare ones removed.
# A set gives constant-time membership tests. Values that are already lists
# (the cell was run before) are filtered as-is instead of being stringified
# and re-split, so re-running this cell is harmless.
rare_lookup = set(rare)
MuMu['genres'] = MuMu['genres'].apply(
    lambda s: [
        l
        for l in (s if isinstance(s, list) else str(s).split(','))
        if l not in rare_lookup
    ]
)
# Drop records left with no label after the filtering.
MuMu = MuMu[MuMu['genres'].apply(len) > 0]
print(MuMu.sort_values(by='genres'))

In [None]:
# Hold out 20% of the records for validation; the fixed seed makes the split
# reproducible. X_* carry the amazon ids, y_* the genre values.
X_train, X_val, y_train, y_val = train_test_split(
    MuMu['amazon_id'],
    MuMu['genres'],
    test_size=0.2,
    random_state=44,
)
print("Number of albums for training: ", len(X_train))
print("Number of albums for validation: ", len(X_val))

In [None]:
# Replace each amazon id with the path of its cover image.
# The image directory is assembled with os.path.join so the separator is
# correct on every OS (a literal backslash only works on Windows).
# NOTE: this cell rewrites X_train / X_val in place, so run it only once per
# split; a second run would prepend the directory and ".jpg" again.
image_dir = os.path.join('data', 'MUMU')
X_train = [os.path.join(image_dir, str(f)+'.jpg') for f in X_train]
X_val = [os.path.join(image_dir, str(f)+'.jpg') for f in X_val]
X_train[:3]

In [None]:
# Materialise both label collections as plain Python lists.
y_train, y_val = list(y_train), list(y_val)
y_train[:3]

In [None]:
# Preview grid: show the first training covers together with their labels.
nobs = 8
ncols = 4
nrows = nobs//ncols

style.use("default")
plt.figure(figsize=(12,4*nrows))
# Cap the loop at len(X_train) so a short training list cannot raise IndexError.
for i in range(min(nrows*ncols, len(X_train))):
    ax = plt.subplot(nrows, ncols, i+1)
    # Open the image in a context manager so its file handle is released as
    # soon as the pixels have been handed to matplotlib.
    with Image.open(X_train[i]) as img:
        plt.imshow(img)
    plt.title(y_train[i], size=10)
    plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Build one entry per individual training label: every label of every record
# is split on commas, giving a flat list of small label lists to fit on.
y_train_individual = [
    genre.split(',')
    for genres in y_train
    for genre in genres
]

print("Labels:")
# Fit the binarizer so that mlb.classes_ holds the set of known labels.
mlb = MultiLabelBinarizer()
mlb.fit(y_train_individual)

# Number of output classes, then an indexed listing of the class names.
N_LABELS = len(mlb.classes_)
for (i, label) in enumerate(mlb.classes_):
    print("{}. {}".format(i, label))

In [None]:
# Encode both label lists as binary indicator rows using the fitted binarizer.
y_train_bin, y_val_bin = mlb.transform(y_train), mlb.transform(y_val)

In [None]:
# Sanity check: print the first three image paths next to their encoded labels.
for i in (0, 1, 2):
    print(X_train[i], y_train_bin[i])

In [None]:
# Side length in pixels: parse_function resizes every image to IMG_SIZE x IMG_SIZE.
IMG_SIZE = 224
# Number of colour channels requested when the JPEG files are decoded.
CHANNELS = 3

In [None]:
def parse_function(filename, label):
    """Load one JPEG from disk and prepare it as a model input.

    Args:
        filename: path of the JPEG file to read.
        label: label associated with the image; passed through untouched.

    Returns:
        A tuple ``(image, label)`` where ``image`` has been decoded with
        ``CHANNELS`` channels, resized to ``IMG_SIZE`` x ``IMG_SIZE`` and
        divided by 255.0.
    """
    raw_bytes = tf.io.read_file(filename)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=CHANNELS)
    pixels = tf.image.resize(pixels, [IMG_SIZE, IMG_SIZE])
    # Scale the pixel values down by 255 before handing them to the pipeline.
    return pixels / 255.0, label

In [None]:
# Number of examples per batch produced by the input pipeline.
BATCH_SIZE = 256
# Let tf.data tune parallelism / prefetch depth at runtime. The stable
# tf.data.AUTOTUNE name replaces the deprecated tf.data.experimental alias.
AUTOTUNE = tf.data.AUTOTUNE
# Size of the buffer used when shuffling the training examples.
SHUFFLE_BUFFER_SIZE = 1024

In [None]:
def create_dataset(filenames, labels, is_training=True):
    """Build a batched, prefetched tf.data pipeline of (image, label) pairs.

    Args:
        filenames: sequence of image file paths.
        labels: sequence of labels aligned with ``filenames``.
        is_training: when truthy, the parsed examples are cached and shuffled
            (buffer of ``SHUFFLE_BUFFER_SIZE``). Pass ``False`` for validation
            or test data so that every pass yields the examples in the same
            order.

    Returns:
        A ``tf.data.Dataset`` yielding batches of ``BATCH_SIZE`` examples,
        each example produced by ``parse_function``.
    """
    dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
    dataset = dataset.map(parse_function, num_parallel_calls=AUTOTUNE)

    # Truthiness test instead of `== True`.
    if is_training:
        # Cache the parsed examples, then shuffle them.
        dataset = dataset.cache()
        dataset = dataset.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)

    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.prefetch(buffer_size=AUTOTUNE)

    return dataset

In [None]:
# Training pipeline: cached and shuffled (create_dataset's default).
train_ds = create_dataset(X_train, y_train_bin)
# Validation pipeline: must NOT be shuffled. A shuffled dataset is reordered on
# every pass, so predictions from one pass and labels collected in another
# would no longer line up when computing ROC curves or a confusion matrix.
val_ds = create_dataset(X_val, y_val_bin, is_training=False)

In [None]:
# Pull a single batch from the training pipeline and report its array shapes.
for features_batch, labels_batch in train_ds.take(1):
    features_shape = features_batch.numpy().shape
    labels_shape = labels_batch.numpy().shape
    print("Shape of features array:", features_shape)
    print("Shape of labels array:", labels_shape)

In [None]:
def custom_loss(y_true, y_pred):
    """Binary cross-entropy plus a penalty for all-zero predictions.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted probabilities (the cross-entropy is computed with
            ``from_logits=False``).

    Returns:
        The binary cross-entropy of the batch, with 10.0 added wherever every
        entry along the last axis of ``y_pred`` is exactly 0.
    """
    bce = keras.losses.BinaryCrossentropy(from_logits=False)
    base_loss = bce(y_true, y_pred)

    # True where the whole prediction along the last axis equals zero.
    all_zero = tf.reduce_all(tf.equal(y_pred, 0), axis=-1)
    penalty = tf.where(
        all_zero,
        tf.ones_like(base_loss) * 10.0,
        tf.zeros_like(base_loss),
    )
    return base_loss + penalty

In [None]:
# Frozen MobileNetV2 backbone (ImageNet weights, no classification head).
# The input shape reuses the constants that the input pipeline resizes to.
base_model = keras.applications.MobileNetV2(
    weights="imagenet",
    input_shape=(IMG_SIZE, IMG_SIZE, CHANNELS),
    include_top=False,
)

base_model.trainable = False
inputs = keras.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS))
# parse_function already divides the pixels by 255, so the images reach the
# model in [0, 1]. MobileNetV2 expects [-1, 1], hence x * 2 - 1. (Scaling by
# 1/127.5 is only right for raw [0, 255] pixels and would squash these inputs
# into roughly [-1, -0.99].)
scale_layer = keras.layers.Rescaling(scale=2.0, offset=-1.0)
x = scale_layer(inputs)

# training=False keeps the backbone (incl. batch-norm) in inference mode.
x = base_model(x, training=False)
x = keras.layers.GlobalAveragePooling2D()(x)
x = keras.layers.Dropout(0.2)(x)
# One sigmoid unit per label: the model outputs probabilities, not logits.
outputs = keras.layers.Dense(N_LABELS, activation='sigmoid', name='output')(x)
model = keras.Model(inputs, outputs)

model.summary(show_trainable=True)

model.compile(
    optimizer=keras.optimizers.Adam(),
    # The output layer already applies a sigmoid, so the loss must be told it
    # receives probabilities; from_logits=True would apply a second sigmoid.
    loss=keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=[keras.metrics.BinaryAccuracy()],
)

epochs = 30
print("Fitting the top layer of the model")
history = model.fit(train_ds, epochs=epochs, validation_data=val_ds)

In [None]:
# Collect the model's predictions on the validation set.
# Labels and predictions are gathered in the SAME pass over val_ds so that
# row i of y_true always corresponds to row i of y_pred.
y_true = []
y_pred = []

for x_batch, y_batch in val_ds:
    y_true.append(y_batch.numpy())
    # predict_on_batch runs a single batch directly; calling model.predict
    # inside a loop is slower and prints one progress bar per batch.
    y_pred.append(model.predict_on_batch(x_batch))

# Stack the per-batch arrays into single arrays.
y_true = np.concatenate(y_true, axis=0)
y_pred = np.concatenate(y_pred, axis=0)


In [None]:
# Gather validation predictions and true labels in ONE pass over val_ds.
# Two separate passes (predict, then iterate again for the labels) only line up
# if the dataset yields the same order every time, which a shuffled dataset
# does not.
val_label_batches = []
val_prediction_batches = []
for x_batch, y_batch in val_ds:
    val_label_batches.append(y_batch.numpy())
    val_prediction_batches.append(model.predict_on_batch(x_batch))
val_predictions = np.concatenate(val_prediction_batches, axis=0)  # model outputs
val_labels = np.concatenate(val_label_batches, axis=0)  # ground truth

# ROC curve and AUC for every label (one-vs-rest, column by column).
fpr = dict()
tpr = dict()
roc_auc = dict()

for i in range(N_LABELS):
    fpr[i], tpr[i], _ = roc_curve(val_labels[:, i], val_predictions[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot the ROC curves. Column i of the binarized labels corresponds to
# mlb.classes_[i], so the class names are taken from the fitted binarizer.
plt.figure(figsize=(12, 8))  # larger figure for readability
for i in range(N_LABELS):
    plt.plot(fpr[i], tpr[i], lw=2, label=f'ROC curve for {mlb.classes_[i]} (area = {roc_auc[i]:.2f})')

# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')
plt.grid(True)  # grid for easier reading
plt.show()


In [None]:
def learning_curves(history):
    """Plot the training/validation loss and the 'macro_soft_f1' metric per epoch.

    Args:
        history: object whose ``.history`` mapping contains the keys
            ``'loss'``, ``'val_loss'``, ``'macro_soft_f1'`` and
            ``'val_macro_soft_f1'``.

    Returns:
        The four series read from ``history.history``, in the order
        ``(loss, val_loss, macro_soft_f1, val_macro_soft_f1)``.
    """
    logs = history.history
    train_loss, valid_loss = logs['loss'], logs['val_loss']
    train_f1, valid_f1 = logs['macro_soft_f1'], logs['val_macro_soft_f1']

    # Epochs are numbered from 1 on the x axis.
    n_epochs = len(train_loss)
    epoch_axis = range(1, n_epochs+1)

    style.use("bmh")
    plt.figure(figsize=(8, 8))

    # Top panel: loss.
    plt.subplot(2, 1, 1)
    plt.plot(epoch_axis, train_loss, label='Training Loss')
    plt.plot(epoch_axis, valid_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')

    # Bottom panel: the values logged under 'macro_soft_f1'.
    plt.subplot(2, 1, 2)
    plt.plot(epoch_axis, train_f1, label='Training Macro F1-score')
    plt.plot(epoch_axis, valid_f1, label='Validation Macro F1-score')
    plt.legend(loc='lower right')
    plt.ylabel('Macro F1-score')
    plt.title('Training and Validation Macro F1-score')
    plt.xlabel('epoch')

    plt.show()

    return train_loss, valid_loss, train_f1, valid_f1

In [None]:
# Dump the per-epoch training and validation accuracy/loss values.
for history_key in ('binary_accuracy', 'loss', 'val_binary_accuracy', 'val_loss'):
    print(history.history[history_key])

In [None]:
# Collect predictions and true labels in a single pass over val_ds so that
# both arrays are guaranteed to be row-aligned.
cm_true_batches = []
cm_pred_batches = []
for x_batch, y_batch in val_ds:
    cm_true_batches.append(y_batch.numpy())
    cm_pred_batches.append(model.predict_on_batch(x_batch))
y_true = np.concatenate(cm_true_batches, axis=0)
y_pred = np.concatenate(cm_pred_batches, axis=0)

# Probabilities thresholded at 0.5 (kept for multi-label style inspection).
y_pred_binary = np.where(y_pred > 0.5, 1, 0)

# Multi-class confusion matrix: true class vs. most probable predicted class.
# The argmax is taken on the raw probabilities; taking it on the thresholded
# array would send every row with no score above 0.5 to class 0.
class_ids = np.arange(len(mlb.classes_))
conf_matrix = confusion_matrix(
    y_true.argmax(axis=1),
    y_pred.argmax(axis=1),
    labels=class_ids,  # keep one row/column per known class, even if unseen
)

# Show the confusion matrix with the class names on both axes.
plt.figure(figsize=(10, 8))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=mlb.classes_,
    yticklabels=mlb.classes_,
)
plt.title("Matriz de Confusión")
plt.xlabel("Predicción")
plt.ylabel("Etiqueta Real")
plt.show()


In [None]:
print(history.history.keys())

# One figure per logged quantity, each with its training and validation series:
# first the binary accuracy, then the loss.
for metric_key, plot_title, y_label in (
    ('binary_accuracy', 'model accuracy', 'accuracy'),
    ('loss', 'model loss', 'loss'),
):
    plt.plot(history.history[metric_key])
    plt.plot(history.history['val_' + metric_key])
    plt.title(plot_title)
    plt.ylabel(y_label)
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [None]:
@tf.function
def macro_soft_f1(y, y_hat):
    """Return the cost ``1 - soft-F1``, averaged over all labels.

    The counts are "soft": they are sums of products of the predicted values
    with the targets rather than thresholded counts.

    Args:
        y: target labels; cast to float32.
        y_hat: predicted values; cast to float32.

    Returns:
        A scalar tensor, the mean over labels of ``1 - soft_f1``.
    """
    targets = tf.cast(y, tf.float32)
    scores = tf.cast(y_hat, tf.float32)

    # Soft true/false positives and false negatives, summed over axis 0.
    true_pos = tf.reduce_sum(scores * targets, axis=0)
    false_pos = tf.reduce_sum(scores * (1 - targets), axis=0)
    false_neg = tf.reduce_sum((1 - scores) * targets, axis=0)

    # The small constant guards against division by zero.
    per_label_f1 = 2*true_pos / (2*true_pos + false_neg + false_pos + 1e-16)
    per_label_cost = 1 - per_label_f1
    return tf.reduce_mean(per_label_cost)

In [None]:
@tf.function
def macro_f1(y, y_hat, thresh=0.5):
    """Return the F1 score averaged over all labels, after thresholding.

    Args:
        y: target labels. Cast to float32 so integer label arrays can be
            multiplied with the float predictions.
        y_hat: predicted values.
        thresh: a prediction counts as positive when it is strictly greater
            than this value.

    Returns:
        A scalar tensor, the mean over labels of the per-label F1 score.
    """
    y = tf.cast(y, tf.float32)
    y_hat = tf.cast(y_hat, tf.float32)

    y_pred = tf.cast(tf.greater(y_hat, thresh), tf.float32)
    # Hard counts per label, taken over axis 0.
    tp = tf.cast(tf.math.count_nonzero(y_pred * y, axis=0), tf.float32)
    fp = tf.cast(tf.math.count_nonzero(y_pred * (1 - y), axis=0), tf.float32)
    fn = tf.cast(tf.math.count_nonzero((1 - y_pred) * y, axis=0), tf.float32)
    # The small constant guards against division by zero.
    f1 = 2*tp / (2*tp + fn + fp + 1e-16)
    # Named differently from the function to avoid shadowing it.
    mean_f1 = tf.reduce_mean(f1)
    return mean_f1

In [None]:
# Second experiment: same frozen-MobileNetV2 architecture, trained directly on
# the macro soft-F1 cost instead of binary cross-entropy.
base_model = keras.applications.MobileNetV2(
    weights="imagenet",
    input_shape=(IMG_SIZE, IMG_SIZE, CHANNELS),
    include_top=False,
)

base_model.trainable = False
inputs = keras.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS))
# parse_function already divides the pixels by 255, so the images reach the
# model in [0, 1]. MobileNetV2 expects [-1, 1], hence x * 2 - 1. (Scaling by
# 1/127.5 is only right for raw [0, 255] pixels.)
scale_layer = keras.layers.Rescaling(scale=2.0, offset=-1.0)
x = scale_layer(inputs)

# training=False keeps the backbone (incl. batch-norm) in inference mode.
x = base_model(x, training=False)
x = keras.layers.GlobalAveragePooling2D()(x)
x = keras.layers.Dropout(0.2)(x)
# One sigmoid unit per label, as macro_soft_f1 works on values in [0, 1].
outputs = keras.layers.Dense(N_LABELS, activation='sigmoid', name='output')(x)
model = keras.Model(inputs, outputs)

model.summary(show_trainable=True)

model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=macro_soft_f1,
    # Logged as 'macro_soft_f1' / 'val_macro_soft_f1' in history.history.
    metrics=[macro_soft_f1],
)
epochs = 30
print("Fitting the top layer of the model")
history = model.fit(train_ds, epochs=epochs, validation_data=val_ds)

In [None]:
# List the logged quantities, then plot them and keep the returned series.
print(history.history.keys())
(
    losses,
    val_losses,
    macro_f1s,
    val_macro_f1s,
) = learning_curves(history)