<a href="https://www.kaggle.com/code/sharabhojha/chord-generation-lstm-example?scriptVersionId=222920225" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
"""for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))"""

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

"for dirname, _, filenames in os.walk('/kaggle/input'):\n    for filename in filenames:\n        print(os.path.join(dirname, filename))"

In [None]:
# Preprocessing steps:
# 1: condense onset files to just onset times and onset notes

import os
import re
import pandas as pd

# Directory path to your annotations folder
annotations_dir = "/kaggle/input/aam-annotations/AAM-annotations/"

# Patterns are compiled once because they are applied to every line of every file.
# Raw strings keep the regex escapes (\s, \d, \w) from being read as string escapes.
attribute_pattern = re.compile(r"@attribute\s+['\"]?([\w\s]+)['\"]?\s+.*", re.IGNORECASE)
value_pattern = re.compile(r'\".*?\"|\'.*?\'|[^,]+')  # quoted field or run of non-commas
digits_pattern = re.compile(r"(\d+)")

# Iterate through all files in the directory
for filename in os.listdir(annotations_dir):
    # Only ARFF files with 'onsets' in their name are onset annotations
    if "onsets" not in filename or not filename.endswith(".arff"):
        continue

    # The numeric id in the file name becomes the prefix of the output file
    file_id = digits_pattern.search(filename)
    if file_id is None:
        print(f"Skipping {filename}: no numeric id found in the file name")
        continue

    file_path = os.path.join(annotations_dir, filename)

    attributes = []
    data_rows = []
    data_start = False

    # Stream the ARFF file line by line instead of loading it all at once
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()

            # Ignore comments and empty lines
            if not line or line.startswith("%"):
                continue

            lowered = line.lower()
            if lowered.startswith("@attribute"):
                # Extract the (optionally quoted) attribute name
                match = attribute_pattern.match(line)
                if match:
                    attributes.append(match.group(1).strip())
            elif lowered.startswith("@data"):
                data_start = True  # Everything after this marker is a data row
            elif data_start:
                # Split on commas while keeping quoted strings intact,
                # then strip the surrounding quotes/spaces from each value
                values = value_pattern.findall(line)
                data_rows.append([v.strip("\"' ") if v.strip() else None for v in values])

    # Convert to DataFrame
    df = pd.DataFrame(data_rows, columns=attributes)

    # Convert numeric columns where possible
    for col in df.columns:
        try:
            df[col] = pd.to_numeric(df[col])
        except (ValueError, TypeError):
            pass  # Keep the column as-is if it is not purely numeric

    # Collect all onset events per timestamp: every column after the first is
    # scanned for integers. Non-string cells (missing or numeric values) are
    # skipped so that joining the cell texts cannot raise a TypeError.
    all_onsets = []
    for cells in df.iloc[:, 1:].itertuples(index=False, name=None):
        row_text = "".join(cell for cell in cells if isinstance(cell, str))
        all_onsets.append([int(token) for token in digits_pattern.findall(row_text)])

    # Delete the per-instrument onset columns (those holding "[...]" lists)
    # and add a single column with all events of the row
    allCols = df.columns[df.apply(lambda col: col.astype(str).str.contains(r"\[", regex=True)).any()].tolist()
    df.drop(allCols, axis=1, inplace=True)
    df["Onset events"] = all_onsets

    # Save the processed DataFrame to a new CSV file in the working directory
    output_file = file_id.group(0) + "_onset_condensed.csv"
    df.to_csv(output_file, index=False)

    print(f"Processed {filename} and saved to {output_file}")

Processed 0159_onsets.arff and saved to 0159_onset_condensed.csv
Processed 2502_onsets.arff and saved to 2502_onset_condensed.csv
Processed 0697_onsets.arff and saved to 0697_onset_condensed.csv
Processed 1245_onsets.arff and saved to 1245_onset_condensed.csv
Processed 1808_onsets.arff and saved to 1808_onset_condensed.csv
Processed 1281_onsets.arff and saved to 1281_onset_condensed.csv
Processed 0011_onsets.arff and saved to 0011_onset_condensed.csv
Processed 1785_onsets.arff and saved to 1785_onset_condensed.csv
Processed 2971_onsets.arff and saved to 2971_onset_condensed.csv
Processed 1381_onsets.arff and saved to 1381_onset_condensed.csv
Processed 1554_onsets.arff and saved to 1554_onset_condensed.csv
Processed 2869_onsets.arff and saved to 2869_onset_condensed.csv
Processed 0111_onsets.arff and saved to 0111_onset_condensed.csv
Processed 2144_onsets.arff and saved to 2144_onset_condensed.csv
Processed 2040_onsets.arff and saved to 2040_onset_condensed.csv
Processed 1331_onsets.arf

In [None]:
# 2: encode chord names and replace said chord names with encodings in beatinfo files

# Directory path to your annotations folder
annotations_dir = "/kaggle/input/aam-annotations/AAM-annotations/"
headers = ['Start time in seconds', 'Bar count', 'Quarter count', 'Chord name']

chords = set()

dataframes = []
filenames = []

# Iterate through all files in the directory
for filename in os.listdir(annotations_dir):
    # Only ARFF files with 'beatinfo' in their name are beat annotations
    if "beatinfo" not in filename or not filename.endswith(".arff"):
        continue

    file_path = os.path.join(annotations_dir, filename)
    # comment='@' makes pandas skip the ARFF header lines (they start with '@')
    df = pd.read_csv(file_path, comment='@', header=None)
    df.columns = headers

    # Strip the quotes around chord names for the whole column at once
    df['Chord name'] = df['Chord name'].str.replace("'", "", regex=False)
    chords.update(df['Chord name'])

    filenames.append(filename)
    dataframes.append(df)

# Integer id -> chord name, with ids assigned in sorted chord order
sorted_chords = sorted(chords)
chord_encodings = dict(enumerate(sorted_chords))
print(chord_encodings)

# Reverse lookup (chord name -> id) so each row is encoded with a dict lookup
# instead of a linear search through sorted_chords
chord_to_index = {chord: index for index, chord in chord_encodings.items()}

# Modify existing dataframes to match encodings
for filename, frame in zip(filenames, dataframes):
    frame.drop(columns=['Bar count', 'Quarter count'], inplace=True)
    frame['Chord name'] = frame['Chord name'].map(chord_to_index)

    # Swap only the extension, so an "arff" elsewhere in the name is untouched
    frame.to_csv(os.path.splitext(filename)[0] + '.csv', index=False)

In [None]:
# Preview the first rows of one processed onset file and its beatinfo file
working_dir = "/kaggle/working/"

onsets = pd.read_csv(os.path.join(working_dir, "0001_onset_condensed.csv"))
print(onsets.head())

beatinfo = pd.read_csv(os.path.join(working_dir, "0001_beatinfo.csv"))
print(beatinfo.head())

def align_onsets_with_chords(onsets, beatinfo):
    """Pair every onset event list with the chord active at its onset time.

    Args:
        onsets: DataFrame with the columns 'Onset time in seconds' and
            'Onset events'. An event cell is either a list or the string
            form of a list (as read back from CSV).
        beatinfo: DataFrame with the columns 'Start time in seconds' and
            'Chord name'.

    Returns:
        A list of ``(onset_events, chord)`` tuples. The chord is taken from
        the last beatinfo row whose start time is <= the onset time. Onsets
        that occur before every chord start time have no chord to align with
        and are left out.

    Raises:
        ValueError or SyntaxError: if an event string is not a Python literal.
    """
    import ast  # local import: literal_eval parses literals only, unlike eval

    # Plain arrays avoid building a filtered DataFrame for every onset row and
    # keep the chord value in its own dtype (no float upcast from row access).
    beat_times = beatinfo['Start time in seconds'].to_numpy()
    chord_labels = beatinfo['Chord name'].to_numpy()

    aligned_data = []
    for onset_time, raw_events in zip(onsets['Onset time in seconds'], onsets['Onset events']):
        started = np.flatnonzero(beat_times <= onset_time)
        if started.size == 0:
            continue  # no chord has started yet at this onset

        if isinstance(raw_events, str):
            onset_list = ast.literal_eval(raw_events)
        else:
            onset_list = list(raw_events)
        aligned_data.append((onset_list, chord_labels[started[-1]]))
    return aligned_data

In [None]:
# create aligned data for every onset and beatinfo file

all_data = []
working_dir = "/kaggle/working/"

# os.listdir returns entries in arbitrary order; sorting keeps the order of
# the collected samples the same from run to run.
for filename in sorted(os.listdir(working_dir)):
    # Only the condensed onset CSV files are inputs here
    if "onset" not in filename or not filename.endswith(".csv"):
        continue

    # The numeric song id links an onset file to its beatinfo file
    song_id = re.search(r"(\d+)", filename)
    if song_id is None:
        print(f"Skipping {filename}: no numeric id found in the file name")
        continue

    onset_path = os.path.join(working_dir, filename)
    beatinfo_path = os.path.join(working_dir, song_id.group(0) + "_beatinfo.csv")
    if not os.path.exists(beatinfo_path):
        print(f"Skipping {filename}: {beatinfo_path} does not exist")
        continue

    onsets = pd.read_csv(onset_path)
    beatinfo = pd.read_csv(beatinfo_path)
    all_data.extend(align_onsets_with_chords(onsets, beatinfo))

print(all_data[0:20])

In [None]:
"""import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Masking
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

# Convert string lists to actual lists
X_raw = [x[0] for x in all_data]  # Convert string lists to actual lists
y_raw = np.array([x[1] for x in all_data])  # Target values

# Normalize target values
scaler = MinMaxScaler()
y_scaled = scaler.fit_transform(y_raw.reshape(-1, 1))

# Pad sequences to make them equal length
X_padded = pad_sequences(X_raw, padding='post', dtype='float32')

# Reshape for LSTM (samples, timesteps, features)
X_final = np.expand_dims(X_padded, axis=-1)

# Split into training and testing sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X_final, y_scaled, test_size=0.2, random_state=42)

# Build LSTM Model
model = Sequential([
    Masking(mask_value=0.0, input_shape=(X_final.shape[1], 1)),  # Ignore padded zeros
    LSTM(50, return_sequences=True),
    LSTM(50),
    Dense(50, activation='relu'),
    Dense(25, activation='softmax')  # 25-class classification
])

model.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Train the model
model.fit(X_train, y_train, epochs=10, batch_size=4, validation_data=(X_test, y_test))

# Predict on test set
predictions = model.predict(X_test)
predictions = scaler.inverse_transform(predictions)  # Convert back to original scale

print("Predicted Values:", predictions.flatten())"""

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, TimeDistributed

def create_chord_classification_model(vocab_size, embedding_dim, lstm_units, num_classes, max_sequence_length):
    """Build the note-sequence -> chord-class network.

    The model takes a sequence of `max_sequence_length` integer note ids,
    embeds each id into `embedding_dim` dimensions, runs two stacked LSTM
    layers of `lstm_units` units, and ends in a softmax over `num_classes`.

    Returns:
        The (uncompiled) Keras `Model`.
    """
    note_input = Input(shape=(max_sequence_length,))

    # Integer note ids -> dense vectors
    hidden = Embedding(vocab_size, embedding_dim)(note_input)

    # Two stacked LSTMs: the first passes the full sequence on,
    # the second keeps only its final state
    for keep_sequence in (True, False):
        hidden = LSTM(lstm_units, return_sequences=keep_sequence)(hidden)

    # Probability distribution over the chord classes
    class_probabilities = Dense(num_classes, activation='softmax')(hidden)

    return Model(inputs=note_input, outputs=class_probabilities)

# Hyperparameters
vocab_size = 128  # Assuming MIDI note range
embedding_dim = 32
lstm_units = 64
# One output class per encoded chord. Derived from the encoding table rather
# than hard-coded, so the softmax width and the one-hot labels always match
# the chord ids that were actually written to the beatinfo CSV files.
num_classes = len(chord_encodings)
max_sequence_length = 4  # Adjust based on your data

# Create the model
model = create_chord_classification_model(vocab_size, embedding_dim, lstm_units, num_classes, max_sequence_length)

# Compile the model (categorical cross-entropy expects one-hot encoded labels)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Function to prepare data
def prepare_data(data, max_sequence_length):
    """Convert (notes, chord) pairs into model-ready arrays.

    Args:
        data: iterable of ``(notes, chord)`` pairs, where ``notes`` is a
            sequence of integer note ids.
        max_sequence_length: number of note ids kept per sample. Shorter
            sequences are zero-padded at the end, longer ones are cut at
            the end.

    Returns:
        ``(X, y)``: ``X`` is the padded note array with one row per sample
        and ``max_sequence_length`` columns; ``y`` is the array of chords in
        the same order.
    """
    note_sequences = [notes for notes, _ in data]
    chords = [chord for _, chord in data]

    # Pad the whole batch in a single call rather than once per sample
    X = tf.keras.preprocessing.sequence.pad_sequences(
        note_sequences,
        maxlen=max_sequence_length,
        padding='post',
        truncating='post',
    )
    return X, np.array(chords)

# Turn the aligned (notes, chord) pairs into padded inputs and chord labels
X, y = prepare_data(data=all_data, max_sequence_length=max_sequence_length)

# Expand the chord labels into one-hot rows, as the loss expects
y_onehot = tf.keras.utils.to_categorical(y, num_classes)

# Fit the model, holding out the last 20% of the samples for validation
history = model.fit(
    x=X,
    y=y_onehot,
    batch_size=32,
    epochs=100,
    validation_split=0.2,
)

# Function for inference
def predict_chord(model, note_sequence):
    """Return the index of the most probable chord class for `note_sequence`.

    The sequence is padded/truncated at the end to the module-level
    `max_sequence_length` before it is passed to `model.predict`.
    """
    batch = tf.keras.preprocessing.sequence.pad_sequences(
        [note_sequence],
        maxlen=max_sequence_length,
        padding='post',
        truncating='post',
    )
    # The batch holds a single sample, so only the first row of scores matters
    class_scores = model.predict(batch)[0]
    return np.argmax(class_scores)

# Example usage: classify a single hand-written note sequence
sample_sequence = [60, 64, 67, 72]  # C major chord
predicted_chord = predict_chord(model, sample_sequence)
print(f"Predicted chord num: {predicted_chord}")
# chord_encodings maps the integer class index back to its chord name
print(f"Predicted chord: {chord_encodings[predicted_chord]}")