In [27]:
import numpy as np
from collections import defaultdict, Counter
import random
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt
import requests
from mido import Message, MidiFile, MidiTrack

In [5]:
# Download the JSB Chorales JSON (Jsb16thSeparated) and split it into numpy sequences.
url = 'https://raw.githubusercontent.com/czhuang/JSB-Chorales-dataset/master/Jsb16thSeparated.json'
# Bound the wait, and fail loudly on an HTTP error instead of trying to parse
# an error page as JSON further down.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# Each split is a list of sequences; every sequence becomes one int32 array.
train_data = [np.array(seq, dtype=np.int32) for seq in data['train']]  # 229 sequences
valid_data = [np.array(seq, dtype=np.int32) for seq in data['valid']]  # 76 sequences
test_data = [np.array(seq, dtype=np.int32) for seq in data['test']]    # 77 sequences
print(f"Loaded {len(train_data)} train, {len(valid_data)} valid, "
      f"{len(test_data)} test sequences")

Loaded 229 train, 76 valid, 77 test sequences


In [18]:
class NGramMusicModel:
    """N-gram (Markov) model over chords.

    A chord is any iterable of hashable values; internally it is stored as a
    tuple. The next chord is conditioned on the previous ``n - 1`` chords.

    Attributes:
        n: order of the model (``n=1`` is a context-free unigram model).
        transitions: maps a context (tuple of ``n - 1`` chord tuples) to a
            ``Counter`` of next chord -> conditional probability (after ``fit``).
        context_counts: number of training n-grams that start with each context.
        chord_counts: raw frequency of every chord seen during training.
    """

    # Probability assigned to a chord that never occurred in the training data.
    UNSEEN_PROB = 1e-10

    def __init__(self, n=3):
        if n < 1:
            raise ValueError(f"n must be >= 1, got {n}")
        self.n = n
        self.transitions = defaultdict(Counter)
        self.context_counts = Counter()
        self.chord_counts = Counter()

    def _get_ngrams(self, sequence):
        """Extract ``(context, next_chord)`` pairs from a sequence of chords.

        Sequences shorter than ``n`` yield an empty list.
        """
        width = self.n - 1
        return [
            (
                # Convert each chord to a tuple so the context is hashable
                tuple(tuple(chord) for chord in sequence[i:i + width]),
                tuple(sequence[i + width]),
            )
            for i in range(len(sequence) - width)
        ]

    def _context_from(self, history):
        """Return the last ``n - 1`` chords of ``history`` as a context tuple."""
        width = self.n - 1
        # history[-0:] would be the WHOLE list, so the unigram case needs an
        # explicit empty context to match the key stored by fit().
        return tuple(history[-width:]) if width else ()

    def _check_fitted(self):
        """Raise ValueError when the model holds no training data."""
        if not self.chord_counts:
            raise ValueError("model has no training data; call fit() first")

    def _sample_next(self, context):
        """Draw one chord for ``context``, backing off to the unigram distribution."""
        # Membership test first: indexing the defaultdict would insert an
        # empty entry for every unseen context.
        if context in self.transitions:
            successors = self.transitions[context]
            chords = list(successors.keys())
            probs = list(successors.values())
        else:
            self._check_fitted()
            chords = list(self.chord_counts.keys())
            counts = np.array(list(self.chord_counts.values()), dtype=float)
            probs = counts / counts.sum()
        # Sample an index and look the chord up, so the chord tuple itself
        # (not its position) is what callers receive.
        idx = np.random.choice(len(chords), p=probs)
        return chords[idx]

    def fit(self, sequences):
        """Train the n-gram model on sequences of chords.

        Any previously learned state is discarded, so calling ``fit`` again
        refits from scratch instead of adding raw counts to probabilities.

        Args:
            sequences: sized collection of chord sequences.

        Returns:
            ``self``, to allow chaining.
        """
        print(f"Training {self.n}-gram model on {len(sequences)} sequences...")

        self.transitions = defaultdict(Counter)
        self.context_counts = Counter()
        self.chord_counts = Counter()

        for sequence in sequences:
            # Overall chord distribution (used as the back-off)
            self.chord_counts.update(tuple(chord) for chord in sequence)
            # Transition counts; too-short sequences contribute no n-grams
            for context, next_chord in self._get_ngrams(sequence):
                self.transitions[context][next_chord] += 1

        # Convert counts to probabilities, remembering how often each context
        # occurred so generate_sequence() can still find the most common one.
        for context, successors in self.transitions.items():
            total = sum(successors.values())
            self.context_counts[context] = total
            for chord in successors:
                successors[chord] /= total

        total_chords = sum(self.chord_counts.values())
        print(f"Processed {total_chords} total chords")
        print(f"Learned {len(self.transitions)} unique contexts")
        print(f"Vocabulary size: {len(self.chord_counts)} unique chords")
        return self

    def predict_next(self, context):
        """Sample the next chord given ``context``.

        Args:
            context: iterable of the preceding ``n - 1`` chords.

        Returns:
            The sampled chord as a tuple. Unknown contexts fall back to the
            overall chord distribution.
        """
        context = tuple(tuple(chord) for chord in context)
        return self._sample_next(context)

    def generate_sequence(self, length, seed=None):
        """Generate a sequence of chords.

        Args:
            length: target number of chords, including the seed. If the seed is
                already at least this long it is returned unchanged.
            seed: optional iterable of chords to start from. When omitted, the
                context seen most often in training is used.

        Returns:
            ``np.ndarray`` with one row per chord.
        """
        if seed is None:
            if self.context_counts:
                # Start with the most common context (by raw training count)
                seed_context = max(self.context_counts, key=self.context_counts.get)
            else:
                # No transitions learned: fall back to random chords
                self._check_fitted()
                chords = list(self.chord_counts.keys())
                seed_context = tuple(random.choices(chords, k=self.n - 1))
        else:
            seed_context = tuple(tuple(chord) for chord in seed)

        sequence = list(seed_context)
        for _ in range(length - len(seed_context)):
            sequence.append(self._sample_next(self._context_from(sequence)))

        # Convert back to numpy array format
        return np.array([list(chord) for chord in sequence])

    def calculate_perplexity(self, sequences):
        """Calculate perplexity on test sequences.

        An n-gram unseen in training backs off to the chord's overall training
        frequency, or to ``UNSEEN_PROB`` if the chord itself is unknown. This is
        a heuristic back-off, not a normalised smoothing scheme.

        Returns:
            The perplexity, or ``inf`` when ``sequences`` contain no n-grams.
        """
        # Hoisted: the vocabulary total does not change while scoring.
        total_chords = sum(self.chord_counts.values())
        total_log_prob = 0.0
        total_tokens = 0

        for sequence in sequences:
            for context, next_chord in self._get_ngrams(sequence):
                # .get() does not trigger the defaultdict factory
                successors = self.transitions.get(context)
                if successors is not None and next_chord in successors:
                    prob = successors[next_chord]
                elif next_chord in self.chord_counts:
                    prob = self.chord_counts[next_chord] / total_chords
                else:
                    prob = self.UNSEEN_PROB

                if prob <= 0:
                    prob = self.UNSEEN_PROB  # Avoid log(0)
                total_log_prob += np.log(prob)
                total_tokens += 1

        if total_tokens == 0:
            return float('inf')

        return np.exp(-total_log_prob / total_tokens)

In [34]:
import numpy as np
from collections import defaultdict, Counter
import random
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt

class NGramMusicModel:
    """N-gram (Markov) model over chords.

    A chord is any iterable of hashable values; internally it is stored as a
    tuple. The next chord is conditioned on the previous ``n - 1`` chords.

    Attributes:
        n: order of the model (``n=1`` is a context-free unigram model).
        transitions: maps a context (tuple of ``n - 1`` chord tuples) to a
            ``Counter`` of next chord -> conditional probability (after ``fit``).
        context_counts: number of training n-grams that start with each context.
        chord_counts: raw frequency of every chord seen during training.
    """

    # Probability assigned to a chord that never occurred in the training data.
    UNSEEN_PROB = 1e-10

    def __init__(self, n=3):
        if n < 1:
            raise ValueError(f"n must be >= 1, got {n}")
        self.n = n
        self.transitions = defaultdict(Counter)
        self.context_counts = Counter()
        self.chord_counts = Counter()

    def _get_ngrams(self, sequence):
        """Extract ``(context, next_chord)`` pairs from a sequence of chords.

        Sequences shorter than ``n`` yield an empty list.
        """
        width = self.n - 1
        return [
            (
                # Convert each chord to a tuple so the context is hashable
                tuple(tuple(chord) for chord in sequence[i:i + width]),
                tuple(sequence[i + width]),
            )
            for i in range(len(sequence) - width)
        ]

    def _context_from(self, history):
        """Return the last ``n - 1`` chords of ``history`` as a context tuple."""
        width = self.n - 1
        # history[-0:] would be the WHOLE list, so the unigram case needs an
        # explicit empty context to match the key stored by fit().
        return tuple(history[-width:]) if width else ()

    def _check_fitted(self):
        """Raise ValueError when the model holds no training data."""
        if not self.chord_counts:
            raise ValueError("model has no training data; call fit() first")

    def _sample_next(self, context):
        """Draw one chord for ``context``, backing off to the unigram distribution."""
        # Membership test first: indexing the defaultdict would insert an
        # empty entry for every unseen context.
        if context in self.transitions:
            successors = self.transitions[context]
            chords = list(successors.keys())
            probs = list(successors.values())
        else:
            self._check_fitted()
            chords = list(self.chord_counts.keys())
            counts = np.array(list(self.chord_counts.values()), dtype=float)
            probs = counts / counts.sum()
        # Sample an index and look the chord up, so the chord tuple itself
        # (not its position) is what callers receive.
        idx = np.random.choice(len(chords), p=probs)
        return chords[idx]

    def fit(self, sequences):
        """Train the n-gram model on sequences of chords.

        Any previously learned state is discarded, so calling ``fit`` again
        refits from scratch instead of adding raw counts to probabilities.

        Args:
            sequences: sized collection of chord sequences.

        Returns:
            ``self``, to allow chaining.
        """
        print(f"Training {self.n}-gram model on {len(sequences)} sequences...")

        self.transitions = defaultdict(Counter)
        self.context_counts = Counter()
        self.chord_counts = Counter()

        for sequence in sequences:
            # Overall chord distribution (used as the back-off)
            self.chord_counts.update(tuple(chord) for chord in sequence)
            # Transition counts; too-short sequences contribute no n-grams
            for context, next_chord in self._get_ngrams(sequence):
                self.transitions[context][next_chord] += 1

        # Convert counts to probabilities, remembering how often each context
        # occurred so generate_sequence() can still find the most common one.
        for context, successors in self.transitions.items():
            total = sum(successors.values())
            self.context_counts[context] = total
            for chord in successors:
                successors[chord] /= total

        total_chords = sum(self.chord_counts.values())
        print(f"Processed {total_chords} total chords")
        print(f"Learned {len(self.transitions)} unique contexts")
        print(f"Vocabulary size: {len(self.chord_counts)} unique chords")
        return self

    def predict_next(self, context):
        """Sample the next chord given ``context``.

        Args:
            context: iterable of the preceding ``n - 1`` chords.

        Returns:
            The sampled chord as a tuple. Unknown contexts fall back to the
            overall chord distribution.
        """
        context = tuple(tuple(chord) for chord in context)
        return self._sample_next(context)

    def generate_sequence(self, length, seed=None):
        """Generate a sequence of chords.

        Args:
            length: target number of chords, including the seed. If the seed is
                already at least this long it is returned unchanged.
            seed: optional iterable of chords to start from. When omitted, the
                context seen most often in training is used.

        Returns:
            ``np.ndarray`` with one row per chord.
        """
        if seed is None:
            if self.context_counts:
                # Start with the most common context (by raw training count)
                seed_context = max(self.context_counts, key=self.context_counts.get)
            else:
                # No transitions learned: fall back to random chords
                self._check_fitted()
                chords = list(self.chord_counts.keys())
                seed_context = tuple(random.choices(chords, k=self.n - 1))
        else:
            seed_context = tuple(tuple(chord) for chord in seed)

        sequence = list(seed_context)
        for _ in range(length - len(seed_context)):
            sequence.append(self._sample_next(self._context_from(sequence)))

        # Convert back to numpy array format
        return np.array([list(chord) for chord in sequence])

    def calculate_perplexity(self, sequences):
        """Calculate perplexity on test sequences.

        An n-gram unseen in training backs off to the chord's overall training
        frequency, or to ``UNSEEN_PROB`` if the chord itself is unknown. This is
        a heuristic back-off, not a normalised smoothing scheme.

        Returns:
            The perplexity, or ``inf`` when ``sequences`` contain no n-grams.
        """
        # Hoisted: the vocabulary total does not change while scoring.
        total_chords = sum(self.chord_counts.values())
        total_log_prob = 0.0
        total_tokens = 0

        for sequence in sequences:
            for context, next_chord in self._get_ngrams(sequence):
                # .get() does not trigger the defaultdict factory
                successors = self.transitions.get(context)
                if successors is not None and next_chord in successors:
                    prob = successors[next_chord]
                elif next_chord in self.chord_counts:
                    prob = self.chord_counts[next_chord] / total_chords
                else:
                    prob = self.UNSEEN_PROB

                if prob <= 0:
                    prob = self.UNSEEN_PROB  # Avoid log(0)
                total_log_prob += np.log(prob)
                total_tokens += 1

        if total_tokens == 0:
            return float('inf')

        return np.exp(-total_log_prob / total_tokens)

def evaluate_model(model, test_data, num_generated=10):
    """Score a fitted model on held-out data and print a comparison report.

    The report covers test-set perplexity, the ten most frequent chords in the
    test data versus freshly generated data, and the first chords of up to
    three generated and three test sequences.

    Args:
        model: object exposing ``calculate_perplexity(sequences)`` and
            ``generate_sequence(length)``.
        test_data: list of chord sequences.
        num_generated: how many sequences to sample from the model.

    Returns:
        dict with keys ``'perplexity'``, ``'generated_sequences'``,
        ``'test_chord_dist'`` and ``'gen_chord_dist'``.
    """
    preview_len = 8    # chords shown per example sequence
    max_examples = 3   # example sequences shown per source
    top_k = 10         # chords listed per distribution

    print("\n=== Model Evaluation ===")

    # Held-out perplexity
    perplexity = model.calculate_perplexity(test_data)
    print(f"Perplexity on test data: {perplexity:.2f}")

    # Sample sequences as long as the average test sequence
    print(f"\nGenerating {num_generated} sequences...")
    avg_test_length = int(np.mean([len(seq) for seq in test_data]))
    print(f"Average test sequence length: {avg_test_length}")
    generated_sequences = [
        model.generate_sequence(avg_test_length) for _ in range(num_generated)
    ]

    # Chord frequency tables, flattened across all sequences
    print("\n=== Chord Distribution Comparison ===")
    test_chords = [tuple(chord) for seq in test_data for chord in seq]
    gen_chords = [tuple(chord) for seq in generated_sequences for chord in seq]
    test_chord_dist = Counter(test_chords)
    gen_chord_dist = Counter(gen_chords)

    print("Most common chords in test data:")
    n_test = len(test_chords)
    for chord, count in test_chord_dist.most_common(top_k):
        print(f"  {chord}: {count} ({count/n_test*100:.1f}%)")

    print("\nMost common chords in generated data:")
    n_gen = len(gen_chords)
    for chord, count in gen_chord_dist.most_common(top_k):
        print(f"  {chord}: {count} ({count/n_gen*100:.1f}%)")

    # Side-by-side examples
    print(f"\n=== Example Generated Sequences ===")
    for i, sample in enumerate(generated_sequences[:max_examples]):
        print(f"Generated sequence {i+1}:")
        for j, chord in enumerate(sample[:preview_len]):
            print(f"  Chord {j+1}: {chord}")
        if len(sample) > preview_len:
            print(f"  ... ({len(sample)} chords total)")
        print()

    print(f"Test sequence examples for comparison:")
    for i, sample in enumerate(test_data[:max_examples]):
        print(f"Test sequence {i+1}:")
        for j, chord in enumerate(sample[:preview_len]):
            print(f"  Chord {j+1}: {chord}")
        if len(sample) > preview_len:
            print(f"  ... ({len(sample)} chords total)")
        print()

    return dict(
        perplexity=perplexity,
        generated_sequences=generated_sequences,
        test_chord_dist=test_chord_dist,
        gen_chord_dist=gen_chord_dist,
    )

def analyze_voice_leading(sequences, name="sequences"):
    """Print per-voice movement statistics for a collection of chord sequences.

    For each of the first four voices, the step between every pair of
    consecutive chords is collected; the mean absolute step and the five most
    frequent steps are then printed. Voices with no recorded steps are skipped.

    Args:
        sequences: iterable of chord sequences; each chord is indexed 0-3.
        name: label used in the report heading.
    """
    print(f"\n=== Voice Leading Analysis for {name} ===")

    num_voices = 4
    steps_by_voice = [[] for _ in range(num_voices)]

    # Walk consecutive chord pairs and record each voice's step.
    for seq in sequences:
        for before, after in zip(seq, seq[1:]):
            for v, bucket in enumerate(steps_by_voice):
                bucket.append(after[v] - before[v])

    for v, steps in enumerate(steps_by_voice):
        if not steps:
            continue
        mean_abs_step = np.mean(np.abs(steps))
        print(f"Voice {v+1}: avg movement = {mean_abs_step:.2f} semitones")

        # Show most common movements
        print(f"  Most common movements: {Counter(steps).most_common(5)}")

In [30]:
# Seed both RNGs the model draws from (np.random for chord sampling, random
# for the seed-context fallback) so a fresh Restart & Run All reproduces the
# same generated sequences and statistics.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Train on train + validation; the test split is only used for evaluation.
combined_data = train_data + valid_data
model = NGramMusicModel(n=1)  # n=1: unigram model, chords drawn without context
model.fit(combined_data)
results = evaluate_model(model, test_data)
analyze_voice_leading(test_data, 'test data')
analyze_voice_leading(results['generated_sequences'], 'generated')

Training 1-gram model on 305 sequences...
Processed 73636 total chords
Learned 1 unique contexts
Vocabulary size: 5129 unique chords

=== Model Evaluation ===
Perplexity on test data: 5616.53

Generating 10 sequences...
Average test sequence length: 245

=== Chord Distribution Comparison ===
Most common chords in test data:
  (69, 64, 61, 45): 286 (1.5%)
  (67, 62, 59, 43): 265 (1.4%)
  (69, 66, 62, 50): 208 (1.1%)
  (74, 66, 57, 50): 192 (1.0%)
  (72, 64, 55, 48): 180 (1.0%)
  (71, 67, 62, 55): 174 (0.9%)
  (73, 69, 64, 57): 158 (0.8%)
  (71, 68, 64, 52): 155 (0.8%)
  (74, 67, 59, 55): 138 (0.7%)
  (72, 67, 64, 48): 138 (0.7%)

Most common chords in generated data:
  (69, 66, 62, 50): 41 (1.7%)
  (71, 68, 64, 52): 27 (1.1%)
  (67, 62, 59, 43): 26 (1.1%)
  (69, 64, 61, 45): 24 (1.0%)
  (70, 67, 62, 55): 24 (1.0%)
  (74, 66, 57, 50): 24 (1.0%)
  (71, 67, 62, 55): 23 (0.9%)
  (69, 65, 60, 53): 23 (0.9%)
  (70, 65, 62, 46): 21 (0.9%)
  (73, 69, 64, 57): 20 (0.8%)

=== Example Generated Se

In [31]:
def array_to_midi(data, filename="chords_16th.mid", velocity=64, ticks_per_quarter=480):
    """Write a chord sequence to a single-track MIDI file, one row per 16th note.

    A pitch present in consecutive rows is held rather than re-struck: a note
    only receives ``note_off`` in the row where it disappears (or at the end).
    Elapsed time is carried as the delta of the next real event, so no dummy
    messages are emitted and no sounding note is cut short. Silence after the
    last sounding note is not written.

    Args:
        data: iterable of rows, each an iterable of MIDI note numbers. Values
            outside 0-127 are dropped.
        filename: path of the MIDI file to write.
        velocity: velocity used for every ``note_on``.
        ticks_per_quarter: ticks per quarter note; each row lasts a quarter
            of this value.
    """
    mid = MidiFile(ticks_per_beat=ticks_per_quarter)
    track = MidiTrack()
    mid.tracks.append(track)

    step_time = ticks_per_quarter // 4  # 16th note
    previous_notes = set()
    pending = 0  # ticks elapsed since the last message was written

    def emit(kind, note, vel):
        """Append one message, spending the accumulated delta time on it."""
        nonlocal pending
        track.append(Message(kind, note=note, velocity=vel, time=pending))
        pending = 0

    for row in data:
        # Ensure notes are ints in range 0-127
        current_notes = {int(n) for n in row if 0 <= int(n) <= 127}

        # Turn off notes no longer playing (sorted for a deterministic file)
        for note in sorted(previous_notes - current_notes):
            emit('note_off', note, 0)

        # Turn on new notes
        for note in sorted(current_notes - previous_notes):
            emit('note_on', note, velocity)

        # Advance time; the delta is attached to whichever event comes next
        pending += step_time
        previous_notes = current_notes

    # Final cleanup: release everything still sounding after the last step
    for note in sorted(previous_notes):
        emit('note_off', note, 0)

    mid.save(filename)

In [33]:
# Sample a sequence from the fitted model (length=200) and write it to "generated.mid".
generated_seq = model.generate_sequence(length=200)
array_to_midi(generated_seq, "generated.mid")