# SYMBOLIC MUSIC TOKENIZER

Setup

In [None]:
import os
import sys
import music21 as m21
from typing import List, Optional, Union, Dict, Any

def isColab() -> bool:
    """Return True when the ``google.colab`` module is loaded in this interpreter."""
    loadedModules = sys.modules
    return "google.colab" in loadedModules

# Directory the MusicXML files are resolved against: "/content" on Colab,
# the current working directory everywhere else.
basePath = "." if not isColab() else "/content"

def fileExists(path: str) -> bool:
    """Return whether *path* exists on disk, as reported by ``os.path.exists``."""
    found = os.path.exists(path)
    return found

if isColab():
    try:
        import pandas as pd
    except:
        !pip -q install pandas
    try:
        import music21  
    except:
        !pip -q install music21
    from google.colab import files as colabFiles

Helpers

In [None]:
def normalizePitchName(nameWithOctave: str) -> str:
    """Replace every "-" in a pitch name with "b" (e.g. "A-4" becomes "Ab4")."""
    pieces = nameWithOctave.split("-")
    return "b".join(pieces)

def getInstrumentName(part: m21.stream.Part, partIndex: int) -> str:
    """Build the name used in the ``PART_<instrument>`` token for *part*.

    The instrument's ``instrumentName`` is used when it is set; otherwise the
    result of ``bestName()`` (when the instrument offers it); otherwise the
    positional fallback ``Part<partIndex + 1>``. The chosen name is stripped
    and its spaces and slashes are turned into underscores.
    """
    instrument = part.getInstrument(returnDefault=False)

    label = None
    if instrument is not None:
        label = getattr(instrument, "instrumentName", None)
        if not label and hasattr(instrument, "bestName"):
            label = instrument.bestName()

    # No instrument, or nothing usable on it: fall back to a 1-based index.
    if not label:
        label = f"Part{partIndex + 1}"

    separators = str.maketrans({" ": "_", "/": "_"})
    return str(label).strip().translate(separators)

def clefToken(clefObj: m21.clef.Clef) -> str:
    """Format a clef as ``CLEF_<class name>_<line>``.

    ``<line>`` is the clef's ``line`` attribute, or ``NA`` when the attribute
    is missing or None.
    """
    staffLine = getattr(clefObj, "line", None)
    if staffLine is None:
        lineLabel = "NA"
    else:
        lineLabel = str(staffLine)
    return "_".join(["CLEF", clefObj.__class__.__name__, lineLabel])

def timeSignatureToken(tsObj: m21.meter.TimeSignature) -> str:
    """Format a time signature as ``TIME_SIG_<numerator>/<denominator>``."""
    beats = tsObj.numerator
    unit = tsObj.denominator
    return "TIME_SIG_{}/{}".format(beats, unit)

def keyToken(keyObj: Union[m21.key.KeySignature, m21.key.Key]) -> str:
    """Format a key as ``KEY_<tonic>_<mode>``.

    A ``Key`` is used as given; any other object is first converted with its
    ``asKey()`` method. The tonic name goes through ``normalizePitchName``.
    """
    resolved = keyObj if isinstance(keyObj, m21.key.Key) else keyObj.asKey()
    tonicName = normalizePitchName(resolved.tonic.name)
    return f"KEY_{tonicName}_{resolved.mode}"

Measure helpers

In [None]:
def getMeasureTimeSignature(measure: m21.stream.Measure) -> Optional[m21.meter.TimeSignature]:
    """Return the time signature found in *measure*, or None.

    ``measure.timeSignature`` wins when it is set; otherwise the first
    TimeSignature found by recursing through the measure is returned.
    """
    direct = measure.timeSignature
    if direct is None:
        nested = list(measure.recurse().getElementsByClass(m21.meter.TimeSignature))
        if not nested:
            return None
        return nested[0]
    return direct

def getMeasureKeyObject(measure: m21.stream.Measure) -> Optional[Union[m21.key.KeySignature, m21.key.Key]]:
    """Return the key information found in *measure*, or None.

    ``measure.keySignature`` wins when it is set. The fallback exists because
    music21 sometimes stores a Key instead of a KeySignature: the first Key
    found by recursing through the measure is returned.
    """
    signature = measure.keySignature
    if signature is not None:
        return signature

    nestedKeys = list(measure.recurse().getElementsByClass(m21.key.Key))
    if nestedKeys:
        return nestedKeys[0]
    return None

def getMeasureClef(measure: m21.stream.Measure) -> Optional[m21.clef.Clef]:
    """Return the first Clef found by recursing through *measure*, or None."""
    found = list(measure.recurse().getElementsByClass(m21.clef.Clef))
    if not found:
        return None
    return found[0]

Tokenizer

In [None]:
def tokenizeMusicXml(musicXmlPath: str) -> List[str]:
    """Convert a MusicXML file into a flat list of symbolic music tokens.

    The file is parsed with music21 and each part is written as
    ``<BOS> PART_<name> ... <EOS>``. Inside a part every measure starts with
    ``BAR_<number>`` (0 when the measure has no number), followed by the
    time-signature, key and clef tokens, each emitted only when it differs
    from the last one emitted for that part. Every element of
    ``measure.notesAndRests`` adds ``POS_BAR_``, ``POS_ABS_`` and ``DUR_``
    tokens, then ``REST_<type>`` for a rest, one ``PITCH_<name>`` per chord
    pitch (ascending MIDI number), or a single ``PITCH_<name>`` for a note.
    Elements of any other kind get only the position and duration tokens.

    NOTE(review): only ``measure.notesAndRests`` is iterated; whether notes
    nested in sub-streams (e.g. voices) are reached should be confirmed
    against the music21 documentation.
    """
    parsed = m21.converter.parse(musicXmlPath)
    out: List[str] = []

    # (lookup, formatter) pairs in emission order: time signature, key, clef.
    attributeSources = (
        (getMeasureTimeSignature, timeSignatureToken),
        (getMeasureKeyObject, keyToken),
        (getMeasureClef, clefToken),
    )

    for partIndex, part in enumerate(parsed.parts):
        out.append("<BOS>")
        out.append(f"PART_{getInstrumentName(part, partIndex)}")

        # Last token emitted per attribute, so repeats are suppressed.
        lastEmitted: List[Optional[str]] = [None] * len(attributeSources)

        for bar in list(part.getElementsByClass(m21.stream.Measure)):
            barNumber = 0 if bar.number is None else bar.number
            out.append(f"BAR_{barNumber}")

            for slot, (lookup, render) in enumerate(attributeSources):
                found = lookup(bar)
                if found is None:
                    continue
                attributeTok = render(found)
                if attributeTok != lastEmitted[slot]:
                    out.append(attributeTok)
                    lastEmitted[slot] = attributeTok

            for event in bar.notesAndRests:
                inBar = float(event.offset)
                absolute = float(bar.offset + event.offset)
                length = float(event.duration.quarterLength)
                out += [f"POS_BAR_{inBar}", f"POS_ABS_{absolute}", f"DUR_{length}"]

                if isinstance(event, m21.note.Rest):
                    restKind = event.duration.type or "unknown"
                    out.append(f"REST_{restKind}")
                elif isinstance(event, m21.chord.Chord):
                    out.extend(
                        f"PITCH_{normalizePitchName(pitch.nameWithOctave)}"
                        for pitch in sorted(event.pitches, key=lambda candidate: candidate.midi)
                    )
                elif isinstance(event, m21.note.Note):
                    out.append(f"PITCH_{normalizePitchName(event.pitch.nameWithOctave)}")

        out.append("<EOS>")

    return out

Test with local files

In [None]:
import os

# MusicXML files expected under basePath.
fileNames = [
    "armandosRhumba.musicxml",
    "armandosRhumbaPiano.musicxml",
]

musicXmlFiles = [os.path.join(basePath, name) for name in fileNames]

# On Colab, when any expected file is absent, ask for an upload and tokenize
# the uploaded files instead of the expected list.
if isColab():
    missing = [name for name in fileNames if not fileExists(os.path.join(basePath, name))]
    if missing:
        uploaded = colabFiles.upload()
        musicXmlFiles = [os.path.join(basePath, name) for name in uploaded.keys()]

# Token list per file, keyed by the file's base name.
results: Dict[str, List[str]] = {}
for path in musicXmlFiles:
    results[os.path.basename(path)] = tokenizeMusicXml(path)

# Cell output: number of tokens produced for each file.
{name: len(fileTokens) for name, fileTokens in results.items()}

Results 

In [None]:
def tokenStats(tokens: List[str]) -> Dict[str, Any]:
    """Count tokens per known prefix.

    Each token is credited to the first prefix (in the fixed order below) it
    starts with; tokens matching none are tallied under ``"OTHER"``.
    """
    knownPrefixes = (
        "<BOS>", "<EOS>", "PART_", "CLEF_", "PITCH_", "POS_BAR_", "POS_ABS_",
        "DUR_", "REST_", "BAR_", "TIME_SIG_", "KEY_",
    )
    tally = dict.fromkeys(knownPrefixes, 0)
    unmatched = 0
    for token in tokens:
        hit = next((prefix for prefix in knownPrefixes if token.startswith(prefix)), None)
        if hit is None:
            unmatched += 1
        else:
            tally[hit] += 1
    tally["OTHER"] = unmatched
    return tally

def showPreview(tokens: List[str], n: int = 80) -> None:
    """Print the first *n* tokens on one line, then "..." if more remain."""
    print("Preview:")
    shown = tokens[:n]
    print(" ".join(shown))
    truncated = n < len(tokens)
    if truncated:
        print("...")

def splitByPart(tokens: List[str]) -> Dict[str, List[str]]:
    """Group a flat token stream into one token list per part.

    A part is the run of tokens from ``<BOS>`` to the next ``<EOS>``, both
    included. It is keyed by its ``PART_`` token, or ``"PART_UNKNOWN"`` when
    it has none. When several parts share the same key (e.g. two staves with
    the same instrument name), later ones are stored as ``<key>#2``,
    ``<key>#3``, ... instead of overwriting the earlier part.

    Args:
        tokens: Token stream containing zero or more parts.

    Returns:
        Mapping from part key to that part's tokens, in order of appearance.
        Tokens not closed by an ``<EOS>`` are not returned.
    """
    parts: Dict[str, List[str]] = {}
    currentPart: Optional[str] = None
    buffer: List[str] = []
    for t in tokens:
        if t == "<BOS>":
            # A new part starts: anything collected outside a part is dropped.
            buffer = ["<BOS>"]
            currentPart = None
        elif t == "<EOS>":
            buffer.append("<EOS>")
            baseKey = currentPart if currentPart else "PART_UNKNOWN"
            # Keep every part: pick the first free key for repeated names.
            key = baseKey
            occurrence = 2
            while key in parts:
                key = f"{baseKey}#{occurrence}"
                occurrence += 1
            parts[key] = buffer
            buffer = []
            currentPart = None
        else:
            if t.startswith("PART_"):
                currentPart = t
            buffer.append(t)
    return parts

def splitPartByBars(partTokens: List[str]) -> Dict[str, List[str]]:
    """Group one part's tokens by bar.

    Each bar starts at a ``BAR_`` token and runs until the next one. It is
    keyed by that ``BAR_`` token. When a label repeats (e.g. several measures
    sharing the same number), later bars are stored as ``<label>#2``,
    ``<label>#3``, ... instead of overwriting the earlier bar.

    Args:
        partTokens: Tokens of a single part.

    Returns:
        Mapping from bar key to the bar's tokens (the ``BAR_`` token first),
        in order of appearance. Tokens before the first ``BAR_`` are ignored.
    """
    bars: Dict[str, List[str]] = {}

    def storeBar(label: str, barTokens: List[str]) -> None:
        # Keep every bar: pick the first free key for repeated labels.
        key = label
        occurrence = 2
        while key in bars:
            key = f"{label}#{occurrence}"
            occurrence += 1
        bars[key] = barTokens

    currentBar: Optional[str] = None
    buf: List[str] = []
    for t in partTokens:
        if t.startswith("BAR_"):
            if currentBar is not None:
                storeBar(currentBar, buf)
            currentBar = t
            buf = [t]
        elif currentBar is not None:
            buf.append(t)
    if currentBar is not None:
        storeBar(currentBar, buf)
    return bars

# Per-file report: totals, a token preview, prefix statistics, and the first
# three bars of every part.
for f, toks in results.items():
    print("\n" + "=" * 80)
    print(f"File: {f}")
    print(f"Total tokens: {len(toks)}")
    showPreview(toks, n=100)

    print("\nStats:")
    stats = tokenStats(toks)
    for k in sorted(stats):
        print(f"{k:12s}: {stats[k]}")

    parts = splitByPart(toks)
    print("\nParts found:", list(parts))
    for partName, ptoks in parts.items():
        print("\n" + "-" * 80)
        print(partName, "| tokens:", len(ptoks))
        bars = splitPartByBars(ptoks)
        barKeys = list(bars)[:3]
        print("First bars:", barKeys)
        for bk in barKeys:
            barTokens = bars[bk]
            # Show at most 60 tokens per bar and flag when more were cut off.
            tail = " ..." if len(barTokens) > 60 else ""
            print(f"{bk}: {' '.join(barTokens[:60])}{tail}")

Global tokenization per file:

In [None]:
import pandas as pd
from typing import List, Dict, Any, Optional

def tokensToEventsDf(tokens: List[str]) -> pd.DataFrame:
    """Rebuild a per-event table from a flat token stream.

    Context tokens (``PART_``, ``BAR_``, ``TIME_SIG_``, ``KEY_``, ``CLEF_``)
    update the running state. ``POS_BAR_``, ``POS_ABS_`` and ``DUR_`` tokens
    describe the next event, and every ``REST_`` or ``PITCH_`` token then
    yields one row carrying that state. Several ``PITCH_`` tokens after one
    position (a chord) share its position and duration. A position that is
    never followed by a ``REST_``/``PITCH_`` token yields a single row with
    ``eventType == "UNKNOWN"``; positions that already produced a row do not.

    Args:
        tokens: Token stream made of ``<BOS> ... <EOS>`` framed parts.

    Returns:
        DataFrame with columns part, bar, timeSig, key, clef, posBar, posAbs,
        durQl, eventType and value, stably sorted by (part, posAbs, bar).
        Rows without a part are labelled ``"PART_UNKNOWN"``. The frame is
        empty when the stream contains no event.
    """
    rows: List[Dict[str, Any]] = []

    # Running context, reset at every <BOS>/<EOS>.
    currentPart: Optional[str] = None
    currentBar: Optional[int] = None
    currentTimeSig: Optional[str] = None
    currentKey: Optional[str] = None
    currentClef: Optional[str] = None

    # Timing of the event currently being described.
    pendingPosBar: Optional[float] = None
    pendingPosAbs: Optional[float] = None
    pendingDur: Optional[float] = None
    # True once the pending timing has produced a row, so that flushing it
    # later does not report an already-emitted note or chord as UNKNOWN.
    pendingConsumed = False

    def parseFloat(text: str) -> Optional[float]:
        # Malformed numbers become None instead of aborting the conversion.
        try:
            return float(text)
        except ValueError:
            return None

    def emitRow(eventType: str, value: Optional[str]) -> None:
        nonlocal pendingConsumed
        rows.append({
            "part": currentPart,
            "bar": currentBar,
            "timeSig": currentTimeSig,
            "key": currentKey,
            "clef": currentClef,
            "posBar": pendingPosBar,
            "posAbs": pendingPosAbs,
            "durQl": pendingDur,
            "eventType": eventType,
            "value": value,
        })
        pendingConsumed = True

    def clearPending() -> None:
        nonlocal pendingPosBar, pendingPosAbs, pendingDur, pendingConsumed
        pendingPosBar = None
        pendingPosAbs = None
        pendingDur = None
        pendingConsumed = False

    def flushPendingAsUnknown() -> None:
        hasPending = (
            pendingPosBar is not None
            or pendingPosAbs is not None
            or pendingDur is not None
        )
        if hasPending and not pendingConsumed:
            emitRow("UNKNOWN", None)
        clearPending()

    def resetContext() -> None:
        nonlocal currentPart, currentBar, currentTimeSig, currentKey, currentClef
        currentPart = None
        currentBar = None
        currentTimeSig = None
        currentKey = None
        currentClef = None
        clearPending()

    for t in tokens:
        if t == "<BOS>":
            resetContext()
        elif t == "<EOS>":
            flushPendingAsUnknown()
            resetContext()
        elif t.startswith("PART_"):
            currentPart = t
        elif t.startswith("BAR_"):
            flushPendingAsUnknown()
            try:
                currentBar = int(t[len("BAR_"):])
            except ValueError:
                currentBar = None
        elif t.startswith("TIME_SIG_"):
            currentTimeSig = t
        elif t.startswith("KEY_"):
            currentKey = t
        elif t.startswith("CLEF_"):
            currentClef = t
        elif t.startswith("POS_BAR_"):
            # POS_BAR_ opens a new event, so close the previous one first.
            flushPendingAsUnknown()
            pendingPosBar = parseFloat(t[len("POS_BAR_"):])
        elif t.startswith("POS_ABS_"):
            pendingPosAbs = parseFloat(t[len("POS_ABS_"):])
            # Fresh timing has not produced a row yet.
            pendingConsumed = False
        elif t.startswith("DUR_"):
            pendingDur = parseFloat(t[len("DUR_"):])
            pendingConsumed = False
        elif t.startswith("REST_"):
            emitRow("REST", t)
            clearPending()
        elif t.startswith("PITCH_"):
            # Timing is kept so the remaining pitches of a chord can reuse it.
            emitRow("PITCH", t)

    df = pd.DataFrame(rows)
    if not df.empty:
        df["part"] = df["part"].fillna("PART_UNKNOWN")
        df = df.sort_values(["part", "posAbs", "bar"], kind="mergesort").reset_index(drop=True)
    return df

# One events DataFrame per tokenized file, keyed by file name.
eventsDfByFile: Dict[str, pd.DataFrame] = {
    name: tokensToEventsDf(fileTokens) for name, fileTokens in results.items()
}

# Show the first 60 events of each file and, when there are rows, the number
# of events per type.
for f, df in eventsDfByFile.items():
    print("\n" + "=" * 80)
    print("File:", f)
    print("Rows:", len(df))
    display(df.head(60))
    if len(df):
        display(df["eventType"].value_counts())