# Model Used: *RNN with LSTM*

# 1. Environment Setup

In [6]:
import pandas as pd
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Masking
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import OneHotEncoder
from tensorflow.keras.utils import to_categorical

import ast
import gc

# Path of the CSV file that is handed to pd.read_csv in the data-extraction step.
file_path = 'normalized_output.csv'

# 2. Data extraction

In [10]:
def _literal_if_str(cell):
    """Parse *cell* with ast.literal_eval when it is a string; return it untouched otherwise."""
    return ast.literal_eval(cell) if isinstance(cell, str) else cell


def _literal_if_present(cell):
    """Parse *cell* with ast.literal_eval unless pandas considers it null (NaN/None)."""
    return ast.literal_eval(cell) if pd.notnull(cell) else cell


# Read the dataset from disk.
data = pd.read_csv(file_path)

# The sequence columns hold string representations of lists; turn them back
# into real Python lists.
data['ph_seq_encoded'] = data['ph_seq_encoded'].apply(_literal_if_str)
data['ph_dur'] = data['ph_dur'].apply(_literal_if_str)
# f0_seq may be missing for a row: null cells are passed through unchanged.
data['f0_seq'] = data['f0_seq'].apply(_literal_if_present)
data['note_seq_encoded'] = data['note_seq_encoded'].apply(_literal_if_str)

def _is_missing(seq):
    """Return True when *seq* is a scalar NaN placeholder instead of a sequence.

    Uses a value test rather than ``seq is np.nan`` so that NaN objects other
    than the ``np.nan`` singleton are recognised as well.
    """
    return isinstance(seq, float) and np.isnan(seq)


def _fit_global_scaler(sequences):
    """Fit one MinMaxScaler(0, 1) on the values of *all* given sequences.

    Fitting once over the whole column (instead of calling ``fit_transform``
    per sequence) keeps a single, consistent min/max in the scaler, so that
    ``inverse_transform`` on model outputs maps back to the original units.
    The scaler is returned unfitted when there are no sequences at all.
    """
    scaler = MinMaxScaler(feature_range=(0, 1))
    columns = [np.asarray(seq, dtype=float).reshape(-1, 1) for seq in sequences]
    if columns:
        scaler.fit(np.concatenate(columns, axis=0))
    return scaler


def _scale_sequence(scaler, seq):
    """Transform one sequence with an already-fitted scaler; returns a 1D array."""
    return scaler.transform(np.asarray(seq, dtype=float).reshape(-1, 1)).flatten()


# Normalize the float arrays.
# ph_dur: every row is expected to carry a sequence.
scaler_ph_dur = _fit_global_scaler(data['ph_dur'])
data['ph_dur'] = [_scale_sequence(scaler_ph_dur, seq) for seq in data['ph_dur']]

# f0_seq: rows without f0 data hold NaN; they are skipped when fitting and
# stay as the np.nan placeholder (the same object the rest of the notebook
# tests for with `is not np.nan`).
scaler_f0_seq = _fit_global_scaler(seq for seq in data['f0_seq'] if not _is_missing(seq))
data['f0_seq'] = [
    np.nan if _is_missing(seq) else _scale_sequence(scaler_f0_seq, seq)
    for seq in data['f0_seq']
]

# Longest sequence over all four sequence columns; rows whose f0_seq is the
# NaN placeholder are left out of the f0 length scan.
max_sequence_length = max(
    max(len(seq) for seq in data['ph_seq_encoded']),
    max(len(seq) for seq in data['ph_dur']),
    max(len(seq) for seq in data['f0_seq'] if seq is not np.nan),
    max(len(seq) for seq in data['note_seq_encoded']),
)


def _pad_f0(seq):
    """Post-pad one f0 sequence to max_sequence_length; NaN rows become an all-NaN vector."""
    if seq is np.nan:
        return np.full(max_sequence_length, np.nan)
    padded = pad_sequences([seq], maxlen=max_sequence_length, padding='post', dtype='float')
    return padded.flatten()


# Post-pad every column to the common length.
data['ph_seq_encoded'] = pad_sequences(
    data['ph_seq_encoded'], maxlen=max_sequence_length, padding='post'
).tolist()
data['ph_dur'] = pad_sequences(
    data['ph_dur'], maxlen=max_sequence_length, padding='post', dtype='float'
).tolist()
data['f0_seq'] = [_pad_f0(seq) for seq in data['f0_seq']]
data['note_seq_encoded'] = pad_sequences(
    data['note_seq_encoded'], maxlen=max_sequence_length, padding='post'
).tolist()

# Flatten all (already padded) phoneme sequences into a single list so the
# encoder sees every phoneme id that occurs anywhere in the dataset.
all_ph_seq = [item for sublist in data['ph_seq_encoded'] for item in sublist]

# OneHotEncoder expects a 2D (n_samples, n_features) array: one id per row.
all_ph_seq_array = np.array(all_ph_seq).reshape(-1, 1)
all_ph_seq.clear()  # the Python list is no longer needed once the array exists

# Initialize and fit the OneHotEncoder on all sequences at once.
# `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and the old name
# was removed in 1.4, so try the new keyword first and fall back for older
# installations.
try:
    encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    encoder = OneHotEncoder(sparse=False)
encoder.fit(all_ph_seq_array)

# Transform each sequence individually; every row of the column becomes a
# (sequence_length, n_categories) dense one-hot matrix.
# NOTE(review): the fit data includes the padding positions, so the pad value
# presumably gets (or shares) a category of its own — confirm this is intended.
data['ph_seq_encoded'] = [encoder.transform(np.array(seq).reshape(-1, 1)) for seq in data['ph_seq_encoded']]

# Number of note classes = largest note id found in any sequence, plus one
# (assumes the note ids are zero-indexed).
num_note_categories = 1 + max(max(seq) for seq in data['note_seq_encoded'])

# Replace every note-id sequence by its one-hot matrix.
data['note_seq_encoded'] = [
    to_categorical(seq, num_classes=num_note_categories)
    for seq in data['note_seq_encoded']
]

# Run each one-hot matrix through pad_sequences (post-padding with 0) and
# stack the results into a single array.
padded_note_seqs = np.array([
    pad_sequences([one_hot], maxlen=max_sequence_length, padding='post', value=0)[0]
    for one_hot in data['note_seq_encoded']
])
data['note_seq_encoded'] = list(padded_note_seqs)

# Stack the per-row sequences of each column into dense numpy arrays.
X = np.array(data['ph_seq_encoded'].tolist())
y_note_seq_encoded = np.array(data['note_seq_encoded'].tolist())

# ph_dur holds one scalar per time step; add a trailing feature axis so it
# becomes (num_sequences, sequence_length, 1).
y_ph_dur = np.expand_dims(np.array(data['ph_dur'].tolist()), axis=-1)

# Rows without f0 data (all-NaN vectors) are replaced by a constant mask value.
mask_value = -1  # Define a mask value that does not appear in the data
y_f0_seq = np.array([
    np.full(max_sequence_length, mask_value) if np.isnan(seq).all() else seq
    for seq in data['f0_seq']
])

# Same trailing feature axis for f0 when it came out 2D.
if y_f0_seq.ndim == 2:
    y_f0_seq = np.expand_dims(y_f0_seq, axis=-1)

# Report the shapes: all four arrays must be 3D to be concatenated below.
print(f"X shape: {X.shape}")
print(f"y_ph_dur shape: {y_ph_dur.shape}")
print(f"y_note_seq_encoded shape: {y_note_seq_encoded.shape}")
print(f"y_f0_seq shape: {y_f0_seq.shape}")

# One feature vector per time step: phoneme one-hot, duration, note one-hot, f0.
X_combined = np.concatenate([X, y_ph_dur, y_note_seq_encoded, y_f0_seq], axis=-1)

# Next-step prediction pairs: the input drops the last time step, the target
# drops the first one.
X_input = X_combined[:, :-1]
X_target = X_combined[:, 1:]




X shape: (4022, 4450, 65)
y_ph_dur shape: (4022, 4450, 1)
y_note_seq_encoded shape: (4022, 4450, 56)
y_f0_seq shape: (4022, 4450, 1)


In [21]:
# Drop the intermediate objects that are not needed for training, then ask
# the garbage collector to run.
# NOTE(review): X_input / X_target were created by slicing X_combined, so they
# presumably keep its buffer alive even after the name is deleted — confirm
# how much memory this actually releases.
del all_ph_seq_array, padded_note_seqs, all_ph_seq, data
del X, y_f0_seq, y_ph_dur, y_note_seq_encoded, X_combined
gc.collect()

[0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         1.         0.         0.         0.
 0.         0.         0.         0.         0.         0.07759133
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         1.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0. 

# 3. Model Training

In [22]:
# Model architecture: one LSTM layer that returns an output for every time
# step, followed by a Dense layer as wide as the input feature vector.
sequence_steps, feature_count = X_input.shape[1], X_input.shape[2]

input_layer = Input(shape=(sequence_steps, feature_count))
lstm_layer = LSTM(128, return_sequences=True)(input_layer)
output_layer = Dense(feature_count, activation='softmax')(lstm_layer)

model = Model(inputs=input_layer, outputs=output_layer)
# NOTE(review): each target row concatenates two one-hot blocks with the
# scaled ph_dur / f0 values (and the -1 mask for missing f0), so it is not a
# single probability distribution; softmax + categorical_crossentropy may not
# be the right head/loss for it — confirm.
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Train the model to predict time step t+1 from the steps up to t.
model.fit(X_input, X_target, epochs=10, batch_size=64)

# Persist the trained model in HDF5 format.
model.save('model_self_supervised.h5')

import joblib

# # Save the scalers and encoders
# joblib.dump(scaler_ph_dur, 'scaler_ph_dur.pkl')
# joblib.dump(scaler_f0_seq, 'scaler_f0_seq.pkl')
# joblib.dump(encoder, 'ph_seq_encoder.pkl')

Epoch 1/10


 1/63 [..............................] - ETA: 35:50 - loss: 5.3748 - accuracy: 6.6728e-05

# 4. Validation
Provide a user input sequence and obtain the model's output.

### 4.1 Input

In [None]:
import json

# Load the phoneme -> integer dictionary from the ph_token_to_int.json file.
# The encoding is given explicitly (as load_mapping does for the note mapping)
# so the result does not depend on the platform's default encoding.
with open('ph_token_to_int.json', 'r', encoding='utf-8') as file:
    ph_token_to_int = json.load(file)

# Your input sequence: phoneme tokens separated by whitespace
input_sequence = "AP n ei f a g e n a j i f u y u a p u AP n ei f a g e n a l e y u d ao en AP"

# Split the input sequence into individual phonemes
input_phonemes = input_sequence.split()

# Fail early, naming every offending token, instead of surfacing a bare
# KeyError for only the first unknown phoneme.
unknown_phonemes = sorted({phoneme for phoneme in input_phonemes if phoneme not in ph_token_to_int})
if unknown_phonemes:
    raise KeyError(f"Phonemes missing from ph_token_to_int.json: {unknown_phonemes}")

# Convert the phonemes to their corresponding integers using the encoding dictionary
encoded_sequence = [ph_token_to_int[phoneme] for phoneme in input_phonemes]

# Wrap in a list to create a batch dimension of 1, then post-pad to the
# sequence length used for the training data.
# NOTE(review): pad_sequences also truncates inputs longer than maxlen (from
# the front by default, per the Keras docs) — confirm inputs never exceed
# max_sequence_length.
new_ph_seq_encoded = np.array([encoded_sequence])
new_ph_seq_encoded_padded = pad_sequences(new_ph_seq_encoded, maxlen=max_sequence_length, padding='post')

print(encoded_sequence)


### 4.2 Inference

In [None]:
# Run the model on the padded user input produced by the input cell
# (`new_ph_seq_encoded_padded`); the previously referenced name
# `user_input_padded` is not assigned anywhere in the notebook.
# NOTE(review): the model was built with Input(shape=(X_input.shape[1],
# X_input.shape[2])), i.e. one feature vector per time step, whereas this
# array holds one integer id per time step — confirm the input is brought
# into the model's expected shape before predicting.
predictions = model.predict(new_ph_seq_encoded_padded)


### 4.3 Decoding

In [None]:
# NOTE(review): this unpacking expects `predictions` to consist of exactly
# three parts (ph_dur, f0_seq, note_seq_encoded) — confirm that matches the
# outputs of the model being used.
predicted_ph_dur, predicted_f0_seq, predicted_note_seq_encoded = predictions

# The scalers operate on 2D (n_values, 1) arrays, so flatten to one column,
# undo the scaling, then restore one row per sequence.
predicted_ph_dur_2d = predicted_ph_dur.reshape(-1, 1)
decoded_ph_dur_2d = scaler_ph_dur.inverse_transform(predicted_ph_dur_2d)
decoded_ph_dur = decoded_ph_dur_2d.reshape(-1, max_sequence_length)

predicted_f0_seq_2d = predicted_f0_seq.reshape(-1, 1)

# Reverse normalization for 'f0_seq', if it's not all NaNs.
# NOTE(review): `y_f0_seq_train` is not defined in the visible notebook —
# confirm where it comes from.
if not np.isnan(y_f0_seq_train).all():
    decoded_f0_seq_2d = scaler_f0_seq.inverse_transform(predicted_f0_seq_2d)
    decoded_f0_seq = decoded_f0_seq_2d.reshape(-1, max_sequence_length)
else:
    # No f0 to decode: keep both names defined as None so the
    # `decoded_f0_seq is not None` guards below work (previously the reshape
    # ran unconditionally and failed on None).
    decoded_f0_seq_2d = None
    decoded_f0_seq = None

# Convert predicted probabilities for 'note_seq_encoded' back to category
# indices (the last axis holds the per-category scores).
decoded_note_seq_encoded = np.argmax(predicted_note_seq_encoded, axis=-1)


def trim_padding(seq, mask):
    """Return *seq* cut just before the first element equal to *mask*.

    When *mask* does not occur in *seq*, the whole sequence is returned.
    """
    hits = np.where(seq == mask)[0]
    return seq[:hits[0] if hits.size > 0 else None]


# Trim the padding off the decoded sequences.
# NOTE(review): ph_dur and f0 have been inverse-transformed at this point, so
# padded positions presumably no longer equal mask_value — confirm that these
# comparisons still find the padding.
decoded_ph_dur_trimmed = [seq[seq != mask_value] for seq in decoded_ph_dur]
if decoded_f0_seq is not None:
    decoded_f0_seq_trimmed = [trim_padding(seq, mask_value) for seq in decoded_f0_seq]
decoded_note_seq_encoded_trimmed = [trim_padding(seq, mask_value) for seq in decoded_note_seq_encoded]

# Now you can print or return the decoded predictions
print("Decoded ph_dur:", decoded_ph_dur_trimmed)
print(len(decoded_ph_dur_trimmed[0]))
if decoded_f0_seq is not None:
    print("Decoded f0_seq:", decoded_f0_seq_trimmed)
    print(len(decoded_f0_seq_trimmed[0]))
print("Decoded note_seq_encoded:", decoded_note_seq_encoded_trimmed)
print(len(decoded_note_seq_encoded_trimmed[0]))

### 4.4 Removing zeros

In [None]:
# Divisor that turns the summed phoneme durations into the target number of
# f0 values (f0_size = int(total_time / f0_timestep)).
# NOTE(review): presumably the f0 frame length in seconds — confirm the unit.
f0_timestep = 0.005
# Value substituted for non-positive f0 predictions and used to pad the f0
# sequence up to f0_size.
# NOTE(review): presumably a default pitch in Hz — confirm.
f0_constant = 205.1
def remove_trailing_zeros(sequence):
    """Return *sequence* without its run of trailing zeros.

    The sequence is scanned from the end; everything after the last element
    that differs from 0 is sliced off. A sequence with no non-zero element
    (including an empty one) is returned unchanged.
    """
    for position in reversed(range(len(sequence))):
        if sequence[position] != 0:
            # Keep everything up to and including the last non-zero element.
            return sequence[:position + 1]
    return sequence

# Strip the zero tail from the first decoded sequence of each output.
decoded_ph_dur_no_zeros = remove_trailing_zeros(decoded_ph_dur_trimmed[0])
decoded_f0_seq_no_zeros = remove_trailing_zeros(decoded_f0_seq_trimmed[0])
decoded_note_seq_encoded_no_zeros = remove_trailing_zeros(decoded_note_seq_encoded_trimmed[0])

# Force exactly one duration per input phoneme: cut off any surplus, or fill
# the gap with zeros.
duration_shortfall = len(input_phonemes) - len(decoded_ph_dur_no_zeros)
if duration_shortfall < 0:
    decoded_ph_dur_no_zeros = decoded_ph_dur_no_zeros[:len(input_phonemes)]
elif duration_shortfall > 0:
    decoded_ph_dur_no_zeros = np.pad(decoded_ph_dur_no_zeros, (0, duration_shortfall), 'constant')

total_time = sum(decoded_ph_dur_no_zeros)

print(total_time)
# Number of f0 values needed to cover the total duration.
# NOTE(review): int() truncates, so a quotient that lands just below a whole
# number because of float rounding yields one value fewer — confirm that is
# acceptable.
f0_size = int(total_time / f0_timestep)
print(f0_size)

# Any f0 value that is not strictly positive is replaced by the constant.
decoded_f0_seq_no_zeros = [value if value > 0 else f0_constant for value in decoded_f0_seq_no_zeros]

# Bring the f0 sequence to exactly f0_size values: pad with the constant when
# it is too short, truncate when it is too long.
f0_shortfall = f0_size - len(decoded_f0_seq_no_zeros)
if f0_shortfall > 0:
    decoded_f0_seq_no_zeros = np.pad(decoded_f0_seq_no_zeros, (0, f0_shortfall), 'constant', constant_values=(f0_constant,))
elif f0_shortfall < 0:
    decoded_f0_seq_no_zeros = decoded_f0_seq_no_zeros[:f0_size]

# Report the final sequences and their lengths.
print("Decoded ph_dur without trailing zeros:", decoded_ph_dur_no_zeros)
print(len(decoded_ph_dur_no_zeros))
if decoded_f0_seq_no_zeros is not None:
    print("Decoded f0_seq without trailing zeros:", decoded_f0_seq_no_zeros)
    print(len(decoded_f0_seq_no_zeros))
print("Decoded note_seq_encoded without trailing zeros:", decoded_note_seq_encoded_no_zeros)
print(len(decoded_note_seq_encoded_no_zeros))
print(len(input_phonemes))

### 4.5 Make it .ds file

In [None]:
# use input_phonemes

# Load the token-to-int mappings from the JSON files
def load_mapping(file_path):
    """Read a token -> int JSON dictionary and return it inverted (int -> token).

    Args:
        file_path: Path of a UTF-8 encoded JSON file containing one object
            whose keys are tokens and whose values are their integer codes.

    Returns:
        A dict mapping each integer code to its token. If several tokens share
        a code, the one that appears last in the file wins.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        forward = json.load(handle)
    # Swap keys and values to obtain the reverse lookup.
    return dict(zip(forward.values(), forward.keys()))

# Decoding function using the mappings
def decode_predictions(prediction_integers, mapping):
    """Translate integer codes into tokens using *mapping*.

    Args:
        prediction_integers: Iterable of integer codes.
        mapping: Dict from integer code to token.

    Returns:
        A list with one token per code, in order; codes that are absent from
        *mapping* are rendered as the string 'Unknown'.
    """
    lookup = mapping.get
    return [lookup(code, 'Unknown') for code in prediction_integers]

# Reverse lookup (int -> note token) built from the note mapping file.
note_int_to_token = load_mapping('note_token_to_int.json')

# Turn the predicted note ids back into note tokens.
predicted_note_seq_integers = decoded_note_seq_encoded_no_zeros
decoded_note_seq = decode_predictions(predicted_note_seq_integers, note_int_to_token)

# Print or return the decoded sequences
print("Decoded Phonetic Sequence:", input_phonemes)
print("Decoded Note Sequence:", decoded_note_seq)

# Collect the four sequences in one dictionary.
# NOTE(review): nothing is written to disk here even though this section is
# about producing a .ds file — confirm whether a serialisation step is missing.
file = dict(
    ph_seq=input_phonemes,
    ph_dur=decoded_ph_dur_no_zeros,
    note_seq=decoded_note_seq,
    f0_seq=decoded_f0_seq_no_zeros,
)
