In [6]:
import tensorflow as tf
import numpy as np
import os
import zipfile
from tensorflow.keras import layers, models

In [7]:
def load_chorales(data_dir="jsb_chorales 2"):

    dataset = {}
    for subset in ['train', 'valid', 'test']:
        subset_path = os.path.join(data_dir, subset)
        chorales = []
        for file in sorted(os.listdir(subset_path)):
            if file.endswith(".csv"):
                filepath = os.path.join(subset_path, file)
                chorale = np.loadtxt(filepath, delimiter=",", skiprows=1, dtype=int)
                chorales.append(chorale)
        dataset[subset] = chorales
        print(f"Loaded {len(chorales)} chorales from {subset} directory.")
    return dataset

# Example usage
chorale_data = load_chorales()
train_chorales = chorale_data['train']
valid_chorales = chorale_data['valid']
test_chorales = chorale_data['test']


Loaded 229 chorales from train directory.
Loaded 76 chorales from valid directory.
Loaded 77 chorales from test directory.


In [8]:
# Find the max value for normalizing
max_value = max([chorale.max() for chorale in train_chorales])
max_value

81

In [9]:
# Prepare the data
def create_dataset(chorales, seq_length):
    X, y = [], []
    for chorale in chorales:
        for i in range(len(chorale) - seq_length):
            X.append(chorale[i:i + seq_length])
            y.append(chorale[i + seq_length])
    return np.array(X), np.array(y)

In [10]:
def build_model(seq_length, input_dim):
    model = models.Sequential([
        layers.Conv1D(64, kernel_size=3, activation="relu", input_shape=(seq_length, input_dim)),
        layers.Conv1D(128, kernel_size=3, activation="relu"),
        layers.GlobalMaxPooling1D(),
        layers.Dense(128, activation="relu"),
        layers.Dense(input_dim, activation="linear")  # Predict the next time step (4 notes)
    ])
    model.compile(optimizer="adam", loss="mse", metrics=["mae"])
    return model 

In [11]:
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import KFold

SEQ_LENGTH = 32  # Length of input sequences

X_train, y_train = create_dataset(train_chorales, SEQ_LENGTH)
X_valid, y_valid = create_dataset(valid_chorales, SEQ_LENGTH)
X_test, y_test = create_dataset(test_chorales, SEQ_LENGTH)

# Normalize and reshape data
X_train, X_valid, X_test = X_train / max_value, X_valid / max_value, X_test / max_value
y_train, y_valid, y_test = y_train / max_value, y_valid / max_value, y_test / max_value

early_stopping = EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)
kf = KFold(n_splits=5, shuffle=True, random_state=42)  # 5-fold cross-validation

fold_results = []

for fold, (train_idx, val_idx) in enumerate(kf.split(X_train)):
    print(f"Training fold {fold + 1}/{kf.n_splits}...")
    
    # Split the data for this fold
    X_fold_train, X_fold_val = X_train[train_idx], X_train[val_idx]
    y_fold_train, y_fold_val = y_train[train_idx], y_train[val_idx]
    
    # Build the model
    model = build_model(SEQ_LENGTH, X_train.shape[-1])
    
    # Train the model
    history = model.fit(
        X_fold_train, y_fold_train,
        epochs=20,
        batch_size=32,
        validation_data=(X_fold_val, y_fold_val),
        callbacks=[early_stopping],
        verbose=1
    )
    
    # Save fold results
    final_val_loss = history.history['val_loss'][-1]
    fold_results.append(final_val_loss)
    print(f"Fold {fold + 1} validation loss: {final_val_loss:.4f}")


Training fold 1/5...
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d (Conv1D)             (None, 30, 64)            832       
                                                                 
 conv1d_1 (Conv1D)           (None, 28, 128)           24704     
                                                                 
 global_max_pooling1d (Globa  (None, 128)              0         
 lMaxPooling1D)                                                  
                                                                 
 dense (Dense)               (None, 128)               16512     
                                                                 
 dense_1 (Dense)             (None, 4)                 516       
                                                                 
Total params: 42,564
Trainable params: 42,564
Non-trainable params: 0
_______________________________

2024-11-17 17:48:04.560555: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M3
2024-11-17 17:48:04.560779: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 8.00 GB
2024-11-17 17:48:04.560799: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 2.67 GB
2024-11-17 17:48:04.561553: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-11-17 17:48:04.561762: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Epoch 1/20


2024-11-17 17:48:04.830161: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2024-11-17 17:48:05.016535: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.




2024-11-17 17:48:10.474774: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
 132/1198 [==>...........................] - ETA: 4s - loss: 0.0032 - mae: 0.0331

KeyboardInterrupt: 

In [None]:
# Cross-validation results
print(f"Cross-validation results: {fold_results}")
print(f"Mean validation loss: {np.mean(fold_results):.4f}")

# Step 4: Evaluate on test set
test_loss, test_mae = model.evaluate(X_test, y_test, verbose=1)
print(f"Test loss: {test_loss:.4f}, Test MAE: {test_mae:.4f}")

# Step 5: Generate Bach-like music
def generate_music(model, seed_sequence, num_steps=100):
    generated = seed_sequence.copy()
    for _ in range(num_steps):
        input_seq = generated[-SEQ_LENGTH:].reshape(1, SEQ_LENGTH, -1)
        next_step = model.predict(input_seq)
        generated = np.vstack((generated, next_step))
    return generated

# Example: Use the first test sequence as the seed
seed_sequence = X_test[0]
generated_music = generate_music(model, seed_sequence)

# Denormalize and convert to integers
generated_music = (generated_music * max_value).astype(int)

print("Generated music shape:", generated_music.shape)

Cross-validation results: [0.0017597409896552563, 0.0021010302007198334, 0.0019861215259879827, 0.0014612997183576226, 0.0026875846087932587]
Mean validation loss: 0.0020
Test loss: 0.0032, Test MAE: 0.0350


2024-11-16 21:12:05.044403: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


Generated music shape: (132, 4)
