In [1]:
import os
# Relative location of the project's data directory and of the raw archive in it.
PATH_DATA = '../data'
FILENAME = "datasets.zip"
PATH_DATA_RAW = os.path.join(PATH_DATA, 'raw')

# Show what is available in the raw-data directory (displayed as the cell output).
os.listdir(PATH_DATA_RAW)

['datasets.zip', '.DS_Store', '.gitkeep', 'datasets', 'JKUPDD-Aug2013.zip']

In [2]:
import music21 as mc
import json

# Maps the mode identifier found in the song metadata ("1".."7") to the number
# of semitones used by translate_to_c when transposing a sample.
MODE_TO_OFFSET = {
    str(degree): semitones
    for degree, semitones in enumerate((0, 2, 4, 5, 7, 9, 11), start=1)
}

def read_midi_from_archive(archive, path):
    """Load one MIDI file stored inside an archive and convert it to a music21 stream.

    Args:
        archive: object exposing ``read(path)`` that returns the file's raw bytes
            (the notebook passes a ``zipfile.ZipFile``).
        path: name of the MIDI member inside the archive.

    Returns:
        Whatever ``mc.midi.translate.midiFileToStream`` produces for the parsed file.
    """
    midi_file = mc.midi.MidiFile()
    raw_bytes = archive.read(path)
    midi_file.readstr(raw_bytes)
    stream = mc.midi.translate.midiFileToStream(midi_file)
    return stream

def read_song_metadata(archive, midi_path):
    """Return the ``metadata`` object of the JSON file that accompanies a MIDI file.

    The JSON path is derived from the MIDI path by swapping the ``/pianoroll/``
    directory for ``/event/`` and the ``nokey.mid`` suffix for
    ``symbol_nokey.json``.

    Args:
        archive: object exposing ``read(path)`` that returns the member's content.
        midi_path: archive path of the MIDI file.

    Returns:
        The value stored under ``"metadata"``, or an empty dict when that key is absent.
    """
    json_path = midi_path.replace('/pianoroll/', '/event/')
    json_path = json_path.replace('nokey.mid', 'symbol_nokey.json')
    document = json.loads(archive.read(json_path))
    return document.get("metadata", {})

def read_mode(archive, midi_path):
    """Return the ``mode`` entry of a song's metadata, or ``"?"`` when it is missing."""
    return read_song_metadata(archive, midi_path).get("mode", "?")

def read_beats_in_measure(archive, midi_path):
    """Return the ``beats_in_measure`` entry of a song's metadata, or ``"?"`` when missing."""
    return read_song_metadata(archive, midi_path).get("beats_in_measure", "?")

def translate_to_c(sample, mode, debug=False):
    """Transpose every part of ``sample`` in place by the offset registered for ``mode``.

    Args:
        sample: object whose ``parts`` each expose ``transpose(offset, inPlace=True)``.
        mode: key looked up in ``MODE_TO_OFFSET``; unknown modes are left untouched.
        debug: when true, print the mode before transposing.

    Returns:
        The same ``sample`` object (modified in place when the offset is positive).
    """
    semitones = MODE_TO_OFFSET.get(mode, -1)
    # Offset 0 (mode "1") and unknown modes (-1) need no transposition.
    if semitones <= 0:
        return sample

    if debug:
        print("Mode: ", mode)
    for part in sample.parts:
        part.transpose(semitones, inPlace=True)
    return sample

In [3]:
from collections import Counter

def _estimate_single_part_type(part):
    """Classify a part as ``"notes"`` or ``"chords"`` by majority of element type.

    Counts the ``mc.note.Note`` and ``mc.chord.Chord`` elements of ``part.flat``
    and returns the label of the more frequent one, or ``"?"`` on a tie
    (including when neither occurs).
    """
    tally = Counter(type(elem) for elem in part.flat)
    n_notes = tally.get(mc.note.Note, 0)
    n_chords = tally.get(mc.chord.Chord, 0)
    if n_notes == n_chords:
        return "?"
    return "notes" if n_notes > n_chords else "chords"
    
    
def extract_parts_types(sample):
    """Estimate the type ("notes" / "chords" / "?") of each part of a sample.

    Returns:
        ``(None, None)`` when the sample has no parts, more than two parts, or
        two parts estimated to be of the same type (a message is printed in
        that last case); a 1-tuple for a single part; otherwise the pair of
        estimated types in part order.
    """
    if not sample.parts:
        return (None, None)

    n_parts = len(sample.parts)
    if n_parts > 2:
        return (None, None)
    if n_parts == 1:
        return (_estimate_single_part_type(sample.parts[0]),)

    first_type = _estimate_single_part_type(sample.parts[0])
    second_type = _estimate_single_part_type(sample.parts[1])
    if first_type != second_type:
        return (first_type, second_type)

    # Two parts of the same kind cannot be split into melody and harmony.
    print(f"Same types: {first_type} and {second_type}")
    return (None, None)

In [4]:
def extract_as_notes(part):
    """Flatten a melodic part into parallel lists of pitch names and durations.

    The elements are taken from ``part.notesAndRests`` when it is non-empty;
    otherwise from the longest voice in ``part.voices`` (the first one on a
    tie), which must consist predominantly of single notes.

    Args:
        part: object exposing ``notesAndRests`` and ``voices``.

    Returns:
        ``(names, durations)``: ``names`` holds ``nameWithOctave`` for notes,
        ``"REST"`` for rests and the first pitch's ``nameWithOctave`` for
        chords; ``durations`` holds the matching ``quarterLength`` values.
        Both are empty when the part has neither elements nor voices.

    Raises:
        AssertionError: if the selected voice is empty or not note-dominated.
        Exception: if an element is neither a note, a rest nor a chord.
    """
    names, durations = [], []
    if part.notesAndRests:
        notes_and_rests = part.notesAndRests
    elif part.voices:
        # Select the longest voice with max(); np.argmax() on a generator sees
        # one opaque object and therefore always answered index 0.
        select_voice = max(part.voices, key=len)
        most_common = _get_most_common(select_voice)
        assert most_common is not None, "Selected voice has no notes or rests"
        assert most_common[0][0] == mc.note.Note, "Selected voice is not dominated by notes"
        notes_and_rests = select_voice.notesAndRests
    else:
        return names, durations

    for nt in notes_and_rests:
        if isinstance(nt, mc.note.Note):
            names.append(nt.pitch.nameWithOctave)
        elif isinstance(nt, mc.note.Rest):
            names.append("REST")
        elif isinstance(nt, mc.chord.Chord):
            # Take only first note
            first = nt.pitches[0]
            names.append(first.nameWithOctave)
        else:
            raise Exception(f"Unknown note type: {type(nt)} ???")
        durations.append(nt.quarterLength)
    return names, durations

def extract_as_chords(part, octave_invariant=False):
    """Flatten a harmonic part into parallel lists of chord spellings and durations.

    The elements are taken from ``part.notesAndRests`` when it is non-empty;
    otherwise from the longest voice in ``part.voices`` (the first one on a
    tie), which must consist predominantly of chords.

    Args:
        part: object exposing ``notesAndRests`` and ``voices``.
        octave_invariant: when true, pitches are spelled with ``name`` instead
            of ``nameWithOctave``.

    Returns:
        ``(names, durations)``: each entry of ``names`` is a list of pitch
        spellings (a chord's pitches sorted by ``ps``, a single-item list for
        a lone note, ``["REST"]`` for a rest); ``durations`` holds the matching
        ``quarterLength`` values. Both are empty when the part has neither
        elements nor voices.

    Raises:
        AssertionError: if the selected voice is empty or not chord-dominated.
        Exception: if an element is neither a note, a rest nor a chord.
    """
    names, durations = [], []
    if part.notesAndRests:
        notes_and_rests = part.notesAndRests
    elif part.voices:
        # Select the longest voice with max(); np.argmax() on a generator sees
        # one opaque object and therefore always answered index 0.
        select_voice = max(part.voices, key=len)
        most_common = _get_most_common(select_voice)
        assert most_common is not None, "Selected voice has no notes or rests"
        assert most_common[0][0] == mc.chord.Chord, "Selected voice is not dominated by chords"
        notes_and_rests = select_voice.notesAndRests
    else:
        return names, durations

    for nt in notes_and_rests:
        if isinstance(nt, mc.note.Note):
            names.append([nt.pitch.name if octave_invariant else nt.pitch.nameWithOctave])
        elif isinstance(nt, mc.note.Rest):
            names.append(["REST"])
        elif isinstance(nt, mc.chord.Chord):
            # Order the pitches from lowest to highest so equal chords spell equally.
            ps_sorted = sorted(nt.pitches, key=lambda pitch: pitch.ps)
            names.append([
                pitch.name if octave_invariant else pitch.nameWithOctave
                for pitch in ps_sorted
            ])
        else:
            raise Exception(f"Unknown note type: {type(nt)} ???")
        durations.append(nt.quarterLength)
    return names, durations

def _get_most_common(part):
    if len(part.notesAndRests) == 0:
        return None
    else:
        types = list(map(type, part.notesAndRests))
        return Counter(types).most_common()

def extract_music(midi_sample, chords_octave_invariant=True):
    """Extract the melody and/or harmony of a sample with at most two parts.

    Args:
        midi_sample: object exposing ``parts``.
        chords_octave_invariant: forwarded to ``extract_as_chords`` as
            ``octave_invariant``.

    Returns:
        A dict with up to two keys, ``"notes"`` and ``"chords"``, each mapping
        to ``{"names": [...], "durations": [...]}``. A part is omitted when its
        type could not be determined or when nothing was extracted from it.

    Raises:
        Exception: if the sample has more than two parts.
    """
    result = {}
    if len(midi_sample.parts) > 2:
        raise Exception(f"Midi sample given has more than TWO parts ({len(midi_sample.parts)})")

    # Classify the parts of the sample that was passed in. The previous code
    # read a global named `sample`, which only worked by accident in the
    # notebook's main loop.
    part_types = extract_parts_types(midi_sample)

    for part_type, part in zip(part_types, midi_sample.parts):
        names, durations = None, None
        if part_type == "notes":
            names, durations = extract_as_notes(part)
        elif part_type == "chords":
            names, durations = extract_as_chords(part, octave_invariant=chords_octave_invariant)

        # Skip unknown part types and parts that produced no content.
        if names and durations:
            result[part_type] = {
                "names": names,
                "durations": durations
            }

    return result

import numpy as np

def transform_notes_to_tokens(music):
    """Encode the melody of ``music`` as ``"<name>_<duration>"`` tokens.

    Returns an empty list when ``music`` has no ``"notes"`` entry.
    """
    if "notes" not in music:
        return []

    melody = music["notes"]
    return [
        f"{name}_{str(duration)}"
        for name, duration in zip(melody["names"], melody["durations"])
    ]

def transform_chords_to_tokens(music):
    """Encode the harmony of ``music`` as ``"<p1>.<p2>...._<duration>"`` tokens.

    The pitch spellings of each chord are joined with ``"."``. Returns an
    empty list when ``music`` has no ``"chords"`` entry.
    """
    if "chords" not in music:
        return []

    harmony = music["chords"]
    return [
        f"{'.'.join(pitches)}_{str(duration)}"
        for pitches, duration in zip(harmony["names"], harmony["durations"])
    ]

In [26]:
def pad_by_beats_in_measure(pieces, beats_in_measure):
    """Lengthen the last token so the total duration fills whole measures.

    Args:
        pieces: list of ``"<name>_<duration>"`` tokens.
        beats_in_measure: measure length, in the same unit as the durations.

    Returns:
        ``pieces`` itself when the total duration is already a multiple of
        ``beats_in_measure`` (this includes an empty list); otherwise a new
        list whose last token's duration is increased by the missing amount.
    """
    duration = sum(float(piece.split('_')[1]) for piece in pieces)
    overshoot = duration % beats_in_measure
    if overshoot == 0.0:
        return pieces

    # Use the parameter here: the previous code read a global named `bim`,
    # which only existed because the notebook's main loop happened to define it.
    rem = beats_in_measure - overshoot
    last_piece_pitch, last_piece_duration = pieces[-1].split('_')
    new_last_piece = '_'.join([last_piece_pitch, str(float(last_piece_duration) + rem)])
    return [*pieces[:-1], new_last_piece]

In [27]:
from zipfile import ZipFile

# Open the raw dataset archive; it stays open because every helper reads from it.
archive = ZipFile(os.path.join(PATH_DATA_RAW, FILENAME))

# Upper bound on the number of MIDI files to process (inf = all of them).
MAX_SONGS = float('inf')
midi_paths = []
for path in archive.namelist():
    if path.endswith('.mid') and 'nokey' in path and path.startswith('datasets'):
        midi_paths.append(path)
        if len(midi_paths) >= MAX_SONGS:
            break

# Parallel lists: entry i of each describes the i-th successfully parsed song.
notes = []
chords = []
bims = []

parse_success = []
parse_failures = []

modes = []
artists = []
songparts = []

for idx, path in enumerate(midi_paths):
    # Validate beats-in-measure BEFORE converting it: a missing value comes back
    # as "?" (or may be null), and float() on it used to raise outside the
    # try block below. Checking first also avoids parsing the MIDI file of a
    # song that is going to be skipped anyway.
    raw_bim = read_beats_in_measure(archive, path)
    try:
        bim = float(raw_bim)
    except (TypeError, ValueError):
        continue
    if int(bim) <= 1:
        continue

    sample = read_midi_from_archive(archive, path)
    mode = read_mode(archive, path)
    sample = translate_to_c(sample, mode)
    try:
        music = extract_music(sample, chords_octave_invariant=True)
        ns = transform_notes_to_tokens(music)
        chs = transform_chords_to_tokens(music)

        ns = pad_by_beats_in_measure(ns, bim)
        chs = pad_by_beats_in_measure(chs, bim)

        # File name without the "_nokey..." suffix.
        songpart = path.split('/')[-1].split('_nokey')[0]

        notes.append(ns)
        chords.append(chs)
        bims.append(bim)
        modes.append(mode)
        parse_success.append(path)
        # Third path component from the end is recorded as the artist.
        artists.append(path.split('/')[-3])
        songparts.append(songpart)

    except Exception as e:
        # Best effort: record the failing file and carry on with the next one.
        parse_failures.append(path)
        print(path.replace('datasets/pianoroll/', ''), e)
        continue

    if idx % 100 == 0:
        print(idx)

len(notes), len(chords), len(bims), len(artists)

0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
1400
1500
Same types: chords and chords
1600
1700
1800
Same types: chords and chords
1900
n/naoki-kodaka/nes-batman---stage-4---laboratory-ruins/intro_nokey.mid 
2000
Same types: chords and chords
2100
Same types: chords and chords
2200
Same types: chords and chords
2300
Same types: chords and chords
Same types: chords and chords
2400
2500
g/game-freak/vs-frontier-brain---sinnoh/verse_nokey.mid 
Same types: chords and chords
2600
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
2700
2800
Same types: chords and chords
2900
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
3000
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
Same types: chords and chords
3100
Sam

(19866, 19866, 19866, 19866)

In [28]:
def _sum_chords_duration(chords):
    return sum(float(ch.split('_')[1]) for ch in chords)

def _compare_chords(old, new):
    old_filtered = [ch for ch in old if "R" not in ch]
    for o, n in zip(old_filtered, new):
        assert o.split("_")[0] == n.split("_")[0]

def fix_chords(chords):
    """Remove REST tokens from chord sequences by folding them into chords.

    Every rest's duration is added to the chord that precedes it; rests that
    come before the first chord are added to that first chord.

    Args:
        chords: list of sequences, each a list of ``"<name>_<duration>"`` tokens.

    Returns:
        A list with one rest-free token list per input sequence. A sequence
        that contains no chord at all (empty, or rests only) maps to an empty
        list instead of the meaningless ``"None_<duration>"`` token the
        previous implementation produced.

    Raises:
        AssertionError: if the rewritten sequence does not preserve the total
            duration or the order of chord names.
    """
    import math  # local import keeps this block self-contained

    fixed_chords = []

    for chs in chords:
        new_chs = []
        prev_chord, prev_duration = None, 0.0

        for ch in chs:
            current_chord, current_duration = ch.split('_')
            current_duration = float(current_duration)
            if current_chord == "REST":
                # Extend whatever chord is currently pending.
                prev_duration += current_duration
            elif prev_chord:
                # A new chord starts: emit the pending one and begin anew.
                new_chs.append(f"{prev_chord}_{str(prev_duration)}")
                prev_chord, prev_duration = current_chord, current_duration
            else:
                # First chord: it also absorbs any leading rests.
                prev_chord = current_chord
                prev_duration += current_duration

        if prev_chord is None:
            # No chord to carry the duration: there is nothing to emit.
            fixed_chords.append(new_chs)
            continue

        new_chs.append(f"{prev_chord}_{str(prev_duration)}")
        s1, s2 = _sum_chords_duration(chs), _sum_chords_duration(new_chs)
        # Durations are regrouped, so compare with a tolerance rather than
        # exact float equality.
        assert math.isclose(s1, s2, rel_tol=1e-9, abs_tol=1e-9)
        _compare_chords(chs, new_chs)
        fixed_chords.append(new_chs)

    return fixed_chords

In [29]:
import json

# Gather everything produced by the parsing loop; chords are stored with their
# rests folded into the neighbouring chords (see fix_chords).
parsed_dict = dict(
    notes=notes,
    chords=fix_chords(chords),
    parse_success=parse_success,
    parse_failures=parse_failures,
    mode=modes,
    beats_in_measure=bims,
    artist=artists,
    songpart=songparts,
    info="New parsed data. Chords are OCTAVE-INVARIANT. There are NO RESTS in CHORDS!",
)

PATH_DATA_INTERIM = os.path.join(PATH_DATA, 'interim')

# Persist the parsed dataset as JSON in the interim data directory.
with open(os.path.join(PATH_DATA_INTERIM, "parsed.json"), "w") as handle:
    json.dump(parsed_dict, handle)