# Une démo de la transformation de la partition en midi à la génération d'une improvisation

In [1]:
# imports

from mido import MidiFile, MidiTrack, Message
import mido
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import json
import random


## Tokenization simple d'un fichier midi monophonique

In [2]:
def extract_features(midi_file, output='nested_list'):
    """
    Extrait les paramètres de note d'un fichier MIDI et renvoie une matrice.
    
    Args:
        midi_file: Chemin vers le fichier MIDI.
        output: Format de sortie, 'nested_list' ou 'polars'.
        
    Returns:
        Une matrice contenant les notes avec colonnes [pitch, onset, duration].
    """
    try:
        # Charger le fichier MIDI
        mid = mido.MidiFile(midi_file)
        notes = []
        active_notes = {}  # clé: (track, channel, note) -> valeur: {'start_time': ..., 'velocity': ...}

        # Parcourir toutes les pistes du fichier MIDI
        for track_index, track in enumerate(mid.tracks):
            current_time = 0
            for msg in track:
                current_time += msg.time

                # Ignorer les messages liés au tempo ou à la signature temporelle
                if msg.type in ['set_tempo', 'time_signature']:
                    continue

                # Note-on (activation)
                if msg.type == 'note_on' and msg.velocity > 0:
                    key = (track_index, msg.channel, msg.note)
                    active_notes[key] = {
                        'start_time': current_time,
                        'velocity': msg.velocity
                    }
                # Note-off (ou note_on avec vélocité nulle)
                elif msg.type == 'note_off' or (msg.type == 'note_on' and msg.velocity == 0):
                    key = (track_index, msg.channel, msg.note)
                    if key in active_notes:
                        start_info = active_notes.pop(key)
                        start_time = start_info['start_time']
                        duration_ticks = current_time - start_time
                        # On ne conserve que pitch, onset et duration
                        notes.append({
                            'pitch': msg.note,
                            'onset': start_time,
                            'duration': duration_ticks,
                            'velocity': start_info['velocity']
                        })

        # Trier les notes par onset (temps de début)
        notes = sorted(notes, key=lambda x: x['onset'])
        # Construire la matrice sous forme de liste de listes
        mnotes = [[n['pitch'], n['onset'], n['duration'], n['velocity']] for n in notes]

        if output == 'nested_list':
            return mnotes
        elif output == 'polars':
            return pl.DataFrame(mnotes, schema=['pitch', 'onset', 'duration', 'velocity'])
        else:
            raise ValueError("Le paramètre output doit être 'nested_list' ou 'polars'")
    except Exception as e:
        print("Erreur lors de l'extraction:", e)
        return None

## Implémentation de l'algorithme de l'Oracle

In [3]:
def symbols_are_similar(sym1, sym2):
    """
    Compare deux symboles  en vérifiant l'égalité de chacun de leurs éléments.
    Renvoie True si tous les éléments sont égaux.
    """
    return sym1[0] == sym2[0] and sym1[1] == sym2[1] and sym1[2] == sym2[2]


def find_similar(d, sigma):
    """
    Parcourt le dictionnaire d transitions (d) pour vérifier si une clé déjà présente est similaire à sigma.
    Renvoie la clé existante si trouvée, sinon retourne None.
    """
    for key in d:
        if symbols_are_similar(key, sigma):
            return key
    return None


def oracle(sequence):
    """
    Construit un oracle des facteurs à partir d'une séquence de symboles.
    """
    transitions = {0: {}}
    supply = {0: -1}
    currentState = 0  # dernier état créé (initialement 0)

    def addSymbol(sigma, m, transitions, supply):
        # Convertir le symbole en tuple s'il est passé sous forme de liste
        if isinstance(sigma, list):
            sigma = tuple(sigma)

        newState = m + 1
        transitions[newState] = {}  # Nouveau dictionnaire pour l'état newState

        # Utilisation de find_similar pour vérifier s'il existe déjà une clé similaire dans transitions[m]
        if find_similar(transitions[m], sigma) is None:
            transitions[m][sigma] = newState
        else:
            # Même si un symbole similaire existe déjà, on écrase la transition pour forcer le lien avec newState
            transitions[m][sigma] = newState

        k = supply[m]
        while k > -1 and find_similar(transitions[k], sigma) is None:
            transitions[k][sigma] = newState
            k = supply[k]

        if k == -1:
            s = 0
        else:
            key_match = find_similar(transitions[k], sigma)
            s = transitions[k][key_match]
        supply[newState] = s

        return newState

    # Lecture de la séquence symbole par symbole
    for symbol in sequence:
        currentState = addSymbol(symbol, currentState, transitions, supply)

    return transitions, supply

def compute_rsl(supply, sequence):
    """
    Calcule la longueur du contexte répété pour chaque état.
    
    Args:
        supply: dictionnaire des suffix links
        sequence: séquence originale
    
    Returns:
        Un tableau des longueurs de contexte
    """
    n = len(sequence)
    rsl = [0] * n
    
    for i in range(1, n):
        if supply[i] != -1:
            j = supply[i]
            if j > 0:
                # La longueur du contexte est la longueur du plus long suffixe répété
                rsl[i] = rsl[j] + 1
            else:
                # Si le suffixe pointe vers l'état initial, la longueur est 1
                rsl[i] = 1
    
    return rsl

## Création d'un symbol

In [None]:
def create_symbole(matrix):
    """
    On prend le dico en entrée puis on en ressort un tuple par notes
    """
    if isinstance(matrix,pl.DataFrame):
        return matrix.select(["pitch", "duration", "velocity"]).rows()
    else:
        return [(note[0], note[2], note[3]) for note in matrix]



## Implémentation de l'algorithme de génération

In [None]:
def generate_sequence_simple(transitions, supply, p=0.8, steps=100):
    """
    Génère une séquence aléatoire en suivant les transitions du Factor Oracle.
    - transitions: dict des transitions principales
    - supply: dict des suffix links
    - p: probabilité de suivre un factor link
    - steps: nombre d'étapes à générer
    """
    sequence = []
    state = 0
    max_state = max(transitions.keys())  # Or: len(midSymbols) - 1

    for _ in range(steps):
        next_state = None

        if state in transitions and transitions[state]:
            if random.random() < p:
                next_state = random.choice(list(transitions[state].values()))
            elif state in supply and supply[state] != -1:
                next_state = supply[state]

        # If no valid transition or suffix, move forward *only* if within range
        if next_state is None:
            if state + 1 <= max_state:
                next_state = state + 1
            else:
                # Restart from 0 or pick a random valid state
                next_state = 0  # or: random.randint(0, max_state)

        sequence.append(next_state)
        state = next_state

    return sequence

def generate_sequence_improved(transitions, supply, p=0.8, steps=100):
    """
    Génère une séquence aléatoire en suivant les transitions du Factor Oracle.
    - transitions: dict des transitions principales
    - supply: dict des suffix links
    - p: probabilité de suivre un factor link
    - steps: nombre d'étapes à générer
    """
    sequence = []
    state = 0
    max_state = max(transitions.keys())  # Or: len(midSymbols) - 1

    for _ in range(steps):
        next_state = None

        # Decide whether to follow a factor link or a suffix link
        if state in transitions and transitions[state]:
            if random.random() < p:
                # Follow a factor link
                next_state = random.choice(list(transitions[state].values()))
            elif state in supply and supply[state] != -1:
                # Follow a suffix link
                next_state = supply[state]
                # Immediately move to the next state after a suffix jump
                if next_state + 1 <= max_state:
                    sequence.append(next_state)
                    next_state += 1
                else:
                    # Restart from 0 if out of bounds
                    next_state = 0

        # If no valid transition or suffix, move forward *only* if within range
        if next_state is None:
            if state + 1 <= max_state:
                next_state = state + 1
            else:
                # Restart from 0 or pick a random valid state
                next_state = 0  # or: random.randint(0, max_state)

        sequence.append(next_state)
        state = next_state

    return sequence

### SLTSearch

In [None]:
def slt_search(i, transitions, supply, rsl, min_context_length=2, max_candidates=6):
    """
    Recherche les candidats de saut en utilisant la structure SLT (Suffix Link Tree).
    
    Args:
        i: position actuelle dans la séquence
        transitions: dictionnaire des factor links
        supply: dictionnaire des suffix links
        rsl: tableau des longueurs de contexte pour chaque état (repeated suffix length)
        min_context_length: longueur minimale du contexte pour considérer un saut
        max_candidates: nombre maximum de candidats à conserver
    
    Returns:
        Une liste triée de tuples (position_source, position_cible, longueur_contexte)
    """
    candidates = []
    
    # Fonction pour stocker une solution si elle est valide
    def store_solution(source, target, context_length):
        if context_length < min_context_length:
            return
        
        # Insérer dans la liste triée par longueur de contexte
        for idx, (_, _, length) in enumerate(candidates):
            if context_length > length:
                candidates.insert(idx, (source, target, context_length))
                if len(candidates) > max_candidates:
                    candidates.pop()  # Supprimer le dernier (moins bon) candidat
                return
        
        # Si on arrive ici, c'est que le candidat est moins bon que tous les autres
        if len(candidates) < max_candidates:
            candidates.append((source, target, context_length))
    
    # Fonction pour explorer les sous-arbres
    def slt_search_subtree(source, node, context_length):
        store_solution(source, node, context_length)
        
        # Explorer les enfants du nœud
        for child in get_children(node, supply, rsl):
            child_context = min(context_length, rsl[child])
            if child_context >= min_context_length:
                slt_search_subtree(source, child, child_context)
    
    # Fonction pour obtenir les enfants d'un nœud (les états qui ont ce nœud comme suffix link)
    def get_children(node, supply, rsl):
        children = []
        for state, suffix_link in supply.items():
            if suffix_link == node:
                children.append(state)
        
        # Trier les enfants par ordre décroissant de longueur de contexte
        children.sort(key=lambda x: rsl[x], reverse=True)
        return children
    
    # Recherche en remontant dans l'arbre des suffix links
    current = i
    while current != 0:  # 0 est l'état initial
        parent = supply.get(current, -1)
        if parent == -1:
            break
            
        context_length = rsl[current]
        if context_length >= min_context_length:
            # Considérer le saut direct
            store_solution(i, parent, context_length)
            
            # Explorer les frères du nœud courant (autres enfants du parent)
            for sibling in get_children(parent, supply, rsl):
                if sibling != current:
                    sibling_context = min(context_length, rsl[sibling])
                    if sibling_context >= min_context_length:
                        slt_search_subtree(i, sibling, sibling_context)
        
        current = parent
    
    return candidates

def generate_sequence_with_slt(transitions, supply, rsl, length=100, continuity_factor=16, min_context_length=5):
    """
    Génère une séquence en utilisant l'algorithme SLTSearch pour naviguer l'Oracle des Facteurs.
    
    Args:
        transitions: dictionnaire des factor links
        supply: dictionnaire des suffix links
        rsl: tableau des longueurs de contexte pour chaque état
        length: longueur totale de la séquence à générer
        continuity_factor: nombre d'événements à jouer avant de chercher un saut
        min_context_length: longueur minimale du contexte pour considérer un saut
    
    Returns:
        La séquence générée
    """
    sequence = []
    state = 0
    max_state = max(transitions.keys()) if transitions else 0
    
    # Liste tabou pour éviter les boucles
    taboo_list = []
    taboo_size = 8  # Nombre de cibles à garder en mémoire
    
    steps_since_last_edit = 0
    
    while len(sequence) < length:
        sequence.append(state)
        
        # Déterminer s'il est temps de chercher un point d'édition
        if steps_since_last_edit >= continuity_factor:
            # Définir une région d'édition autour du point théorique
            edit_window = max(1, continuity_factor // 5)
            edit_region_start = max(0, steps_since_last_edit - edit_window)
            edit_region_end = steps_since_last_edit + edit_window
            
            best_candidates = []
            for edit_offset in range(edit_region_start, edit_region_end + 1):
                if len(sequence) - edit_offset - 1 >= 0:  # -1 pour obtenir l'index correct
                    edit_state = sequence[len(sequence) - edit_offset - 1]
                    candidates = slt_search(edit_state, transitions, supply, rsl, min_context_length)
                    
                    # Filtrer les candidats qui sont dans la liste tabou
                    candidates = [c for c in candidates if c[1] not in taboo_list]
                    
                    best_candidates.extend(candidates)
            
            # Trier tous les candidats par longueur de contexte
            best_candidates.sort(key=lambda x: x[2], reverse=True)
            
            if best_candidates:
                # Sélectionner un candidat avec une distribution de probabilité favorisant les contextes longs
                if len(best_candidates) == 1 or random.random() < 0.5:
                    selected = best_candidates[0]
                else:
                    # Pondérer la sélection en fonction de la longueur du contexte
                    weights = [c[2] for c in best_candidates]
                    total = sum(weights)
                    probs = [w/total for w in weights]
                    selected_idx = random.choices(range(len(best_candidates)), weights=probs, k=1)[0]
                    selected = best_candidates[selected_idx]
                
                # Extraire l'information du candidat sélectionné
                _, target_state, _ = selected
                
                # Ajouter à la liste tabou
                taboo_list.append(target_state)
                if len(taboo_list) > taboo_size:
                    taboo_list.pop(0)
                
                # Effectuer le saut
                state = target_state + 1  # Immédiatement aller à l'état suivant après un saut
                if state > max_state:
                    state = 0
                
                steps_since_last_edit = 0
            else:
                # Aucun candidat valide, continuer séquentiellement
                if state + 1 <= max_state:
                    state += 1
                else:
                    state = 0
                
                steps_since_last_edit += 1
        else:
            # Continuer séquentiellement
            if state + 1 <= max_state:
                state += 1
            else:
                state = 0
            
            steps_since_last_edit += 1
    
    return sequence



## Reconstruction d'un fichier midi à partir de la séquence générée

## cellule qui execute le process

In [6]:
def sequence_to_midi(sequence, note_list, output_file='output.mid'):
    """
    On créé un fichier midi à partir de la séquence généré
    
    Paramètres
      sequence (list of int): la liste des indices des notes de la séquence généré
      note_list (list of tuple): Tuple représentant les notes de la séquence
      output_file (str): Fichier dans lequel on sauvegarde le midi

    """
    # Create a new MIDI file and add a track.
    mid = mido.MidiFile()
    track = mido.MidiTrack()
    mid.tracks.append(track)
    
    # Loop through the indices in the sequence.
    for i, note_index in enumerate(sequence):
        # Retrieve note parameters from the note_list.
        pitch, duration, velocity = note_list[note_index-1]
        
        # Create a note on event.
        # We set time=0 because notes follow each other without a gap.
        note_on = mido.Message('note_on', note=pitch, velocity=velocity, time=0)
        track.append(note_on)
        
        # Create a note off event.
        # The time for note_off is the note's duration.
        note_off = mido.Message('note_off', note=pitch, velocity=0, time=duration)
        track.append(note_off)
    
    # Save the MIDI file.
    mid.save(output_file)


In [26]:
midFile = '/home/sylogue/Documents/MuseScore4/Scores/Thirty_Caprices_No._3.mid'
midFeatures = extract_features(midFile, "polars")
midSymbols = create_symbole(midFeatures)
transitions, supply = oracle(midSymbols)
rsl = compute_rsl(supply, midSymbols)
print(supply)

{0: -1, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 1, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 11, 19: 0, 20: 13, 21: 0, 22: 15, 23: 0, 24: 16, 25: 17, 26: 0, 27: 0, 28: 0, 29: 0, 30: 0, 31: 0, 32: 0, 33: 0, 34: 0, 35: 0, 36: 0, 37: 0, 38: 6, 39: 30, 40: 32, 41: 33, 42: 34, 43: 35, 44: 36, 45: 37, 46: 38, 47: 39, 48: 40, 49: 0, 50: 35, 51: 0, 52: 37, 53: 0, 54: 30, 55: 31, 56: 32, 57: 49, 58: 50, 59: 51, 60: 52, 61: 53, 62: 54, 63: 55, 64: 56, 65: 33, 66: 34, 67: 35, 68: 36, 69: 37, 70: 38, 71: 39, 72: 40, 73: 41, 74: 42, 75: 43, 76: 44, 77: 45, 78: 46, 79: 47, 80: 48, 81: 0, 82: 0, 83: 0, 84: 0, 85: 0, 86: 6, 87: 31, 88: 0, 89: 0, 90: 0, 91: 31, 92: 53, 93: 85, 94: 0, 95: 83, 96: 0, 97: 0, 98: 35, 99: 0, 100: 53, 101: 0, 102: 0, 103: 0, 104: 32, 105: 97, 106: 98, 107: 99, 108: 100, 109: 101, 110: 102, 111: 103, 112: 97, 113: 96, 114: 83, 115: 94, 116: 53, 117: 0, 118: 102, 119: 90, 120: 0, 121: 96, 122: 114, 123: 115, 124: 116, 125: 117, 126: 118, 127: 119, 

  return pl.DataFrame(mnotes, schema=['pitch', 'onset', 'duration', 'velocity'])


In [23]:
gen =generate_sequence_with_slt(transitions, supply, rsl, 100, 10)
#gen = generate_sequence_improved(transitions, supply,)
sequence_to_midi(gen, midSymbols)