In [8]:
%matplotlib inline
import copy
import pandas as pd
import numpy as np
import librosa
import seaborn as sb
import matplotlib.pyplot as plt
import itertools
import re
import random
from os import listdir
from os.path import isfile, join
from __future__ import print_function
from numpy import median, diff

# SongFile:
### Fields
- beat_frames:  
- beat_times: 
- bpm: 
- bpm_string: 
- beat_length: 
- indices: 
- data: 

- pack
- name
- extension
- music_file
- stepfile
### Output
- data/{0}_beat_features.csv
- data/{0}_misc.csv

In [350]:
# Global analysis settings read by SongFile and by the beat / step-writing helpers.
# NOTE(review): under Python 3 `/` is true division, so sr, hop_length and
# samples_per_beat end up as floats — confirm librosa accepts float sr / hop_length.
sample_rate_down = 1  # divisor applied to the base sample rate
hop_length_down = 8  # divisor applied to the base hop length; get_beats also uses it as its frame tolerance
sr = 11025 * 16 / sample_rate_down  # sample rate passed to librosa.load and the beat tracker
hop_length = 512 / (sample_rate_down * hop_length_down)  # hop length passed to the beat tracker
samples_per_beat = 24 / 4  # sample positions taken per beat in SongFile.calculate_features
steps_per_bar = 24  # rows per bar used by write_notes
class SongFile:
    """A single StepMania song: its audio file, its stepfile and derived timing data.

    Attributes set by __init__:
        pack, name: the identifiers the song was created with.
        folder: 'StepMania/Songs/<pack>/<name>/'.
        music_file: first .ogg/.mp3 found in the folder.
        extension: file extension of music_file (without the dot).
        stepfile: first .ssc/.sm found in the folder.
        key: '<pack>~<name>', used in the data/ file names.
        beat_features: None until features are loaded from the store.

    Attributes set by the loaders:
        y: audio samples (load_misc_from_music).
        beat_frames, beat_times: beat tracker output (load_misc_from_music).
        offset, bpm: timing of the song (either load_misc_* method).
        beat_length, indices: set by calculate_features.
    """

    # misc includes
    # - offset
    # - bpm
    def load_misc_from_music(self):
        """Load the audio and estimate offset and BPM from librosa's beat tracker."""
        self.y, _ = librosa.load(self.music_file, sr=sr)

        _, beat_frames = librosa.beat.beat_track(y=self.y, sr=sr, hop_length=hop_length)
        beat_times = librosa.frames_to_time(beat_frames, sr=sr, hop_length=hop_length)
        # Kept on the instance because test_get_beats reads song.beat_times / song.beat_frames.
        self.beat_frames = beat_frames
        self.beat_times = beat_times
        self.offset = beat_times[0]
        self.bpm = get_beats(beat_times, beat_frames)

    def load_misc_from_stepfile(self):
        """Read offset and BPM from the #OFFSET and #BPMS tags of the stepfile.

        bpm is set to 0 when the stepfile lists more than one BPM segment.
        offset is stored with the opposite sign of the #OFFSET tag.

        Raises:
            ValueError: if either tag cannot be found in the expected form.
        """
        with open(self.stepfile, "r") as txt:
            step_file = txt.read()

        bpm_search = re.search('#BPMS:([0-9.=,]*);', step_file)
        if bpm_search is None:
            raise ValueError('No usable #BPMS tag in {0}'.format(self.stepfile))
        bpm_string = bpm_search.group(1)
        # A comma separates BPM segments; only constant-tempo songs get a BPM here.
        self.bpm = float(bpm_string.split('=')[1]) if len(bpm_string.split(',')) == 1 else 0

        offset_search = re.search('#OFFSET:([0-9.-]*);', step_file)
        if offset_search is None:
            raise ValueError('No usable #OFFSET tag in {0}'.format(self.stepfile))
        self.offset = -float(offset_search.group(1))

    def calculate_features(self):
        """Compute the audio sample index of every sub-beat position.

        Takes samples_per_beat positions per beat, starting at self.offset, and
        stores them in self.indices. Also stores self.beat_length (seconds per
        beat). Requires self.y, self.bpm and self.offset.
        """
        # take samples_per_beat samples for each beat (need 3rds, 8ths)
        seconds = len(self.y) / sr
        num_samples = int(seconds * samples_per_beat * self.bpm / 60)
        self.beat_length = 60. / self.bpm
        sample_length = self.beat_length / samples_per_beat

        sample_times = [self.offset + (sample_length * i) for i in range(num_samples)]
        # only take samples where music still playing
        sample_positions = (round(time * sr) for time in sample_times)
        self.indices = [position for position in sample_positions if position < len(self.y)]

    def __init__(self, pack, name, load_type):
        """Locate the song's files and load its timing data.

        Args:
            pack: name of the pack folder under StepMania/Songs.
            name: name of the song folder inside the pack.
            load_type: 'from_music' (analyse the audio and save the result),
                'from_stepfile' (read offset/BPM from the stepfile) or
                'from_store' (read previously saved CSVs from data/).
        """
        self.pack = pack
        self.name = name
        self.folder = 'StepMania/Songs/{0}/{1}/'.format(pack, name)
        self.music_file = self.folder + next(file for file in listdir(self.folder) if file.endswith('.ogg') or file.endswith('.mp3'))
        self.stepfile = self.folder + next(file for file in listdir(self.folder) if file.endswith('.ssc') or file.endswith('.sm'))
        self.extension = self.music_file.rsplit('.', 1)[-1]
        self.key = '{0}~{1}'.format(pack, name)
        self.beat_features = None

        misc_path = 'data/{0}_misc.csv'.format(self.key)
        features_path = 'data/{0}_beat_features.csv'.format(self.key)

        if load_type == 'from_music':
            self.load_misc_from_music()
            self.calculate_features()
            pd.DataFrame([self.offset, self.bpm]).to_csv(misc_path, index=False)
            # NOTE(review): nothing in this class computes beat_features yet, so the
            # feature file is only written once something has populated it.
            if self.beat_features is not None:
                pd.DataFrame(self.beat_features).to_csv(features_path, index=False)

        if load_type == 'from_stepfile':
            self.load_misc_from_stepfile()
            #self.calculate_features()
            #pd.DataFrame([self.offset, self.bpm]).to_csv('data/{0}_misc.csv'.format(key), index=False)
            #pd.DataFrame(self.beat_features).to_csv('data/{0}_beat_features.csv'.format(key), index=False)

        if load_type == 'from_store':
            if not '{0}_beat_features.csv'.format(self.key) in listdir('data'):
                print ('Song hasnt been loaded yet')
            else:
                # NOTE(review): the stored offset/bpm land in self.test, not in
                # self.offset / self.bpm — confirm whether that is intended.
                self.test = pd.read_csv(misc_path).values
                self.beat_features = pd.read_csv(features_path).values

In [351]:
# Load one song, taking offset/BPM from its stepfile, and display the resulting dict.
# NOTE(review): the variable is called song_from_music although load_type is 'from_stepfile'.
song_from_music = load_songs([('In The Groove', 'Anubis')], 'from_stepfile')
song_from_music

{'In The Groove~Anubis': <__main__.SongFile at 0x11da1e978>}

# Some useful functions to load individual songs or lists of songs
- load_song(pack: String, name: String, force_new: Bool)
- load_songs(songs: Array(Pair(String~pack, String~title)), load_type: String)
- load_all_songs(load_type: String)

`load_type` is one of 'from_music', 'from_stepfile' or 'from_store'.

In [347]:
def load_songs(songs, load_type):
    """Build a SongFile for every (pack, name) pair.

    Args:
        songs: iterable of sequences whose first two items are pack and name.
        load_type: passed through to SongFile.

    Returns:
        dict mapping '<pack>~<name>' to the corresponding SongFile.
    """
    loaded = {}
    for entry in songs:
        pack, title = entry[0], entry[1]
        loaded['{0}~{1}'.format(pack, title)] = SongFile(pack, title, load_type)
    return loaded

def load_all_songs(load_type):
    """Load every song of the 'In The Groove' pack plus the a_test songs A, B and C.

    Args:
        load_type: passed through to load_songs.

    Returns:
        The dict produced by load_songs.
    """
    songs = []
    for entry in listdir('StepMania/Songs/In The Groove'):
        # skip the macOS folder metadata file
        if entry != '.DS_Store':
            songs.append(('In The Groove', entry))
    for test_song in ['A', 'B', 'C']:
        songs.append(('a_test', test_song))
    return load_songs(songs, load_type)

# Functions to get bpm from song
- get_beats(beat_times: Array(Float), beat_frames: Array(Int))

In [78]:
def get_beats(beat_times, beat_frames, tolerance=None):
    """Estimate a song's BPM from beat-tracker output.

    The most common frame gap between consecutive beats is taken as the typical
    beat. Gaps within `tolerance` frames of it are treated as proper beats and
    averaged. That average is refined by measuring long spans between proper
    beats near the start and near the end, and dividing each span by its
    rounded beat count. The median refined beat length gives the BPM, which is
    finally halved or doubled until it lies in [100, 200).

    Args:
        beat_times: beat positions in seconds.
        beat_frames: the same beats as frame numbers (same length as beat_times).
        tolerance: largest frame deviation from the typical gap that still
            counts as a proper beat. Defaults to the global hop_length_down.

    Returns:
        The estimated BPM.

    Raises:
        ValueError: if fewer than two beats are supplied.
    """
    if tolerance is None:
        tolerance = hop_length_down

    num_beats = len(beat_times)
    num_changes = len(beat_frames) - 1
    if num_changes < 1:
        raise ValueError('get_beats needs at least two beats')

    changes = [beat_frames[i + 1] - beat_frames[i] for i in range(num_changes)]
    changes_time = [beat_times[i + 1] - beat_times[i] for i in range(num_changes)]

    # Most frequent gap (the mode), used as the reference beat length in frames.
    sorted_changes = sorted(changes)
    typical_change = max(set(sorted_changes), key=sorted_changes.count)

    changes_counted = [False] * num_changes
    time_changes_sum = 0
    time_changes_count = 0
    for i in range(num_changes):
        # can use other factors (eg if song has a slow part take double beats into account)
        if abs(changes[i] - typical_change) <= tolerance:
            changes_counted[i] = True
            time_changes_sum += changes_time[i]
            time_changes_count += 1

    # The mode itself always qualifies, so the count is never zero.
    average = time_changes_sum / time_changes_count

    # First and last beat that sit between two proper gaps.
    earliest_proper_beat = 1
    for i in range(1, num_beats - 1):
        if changes_counted[i] and changes_counted[i - 1]:
            earliest_proper_beat = i
            break

    last_proper_beat = num_beats - 2
    for i in range(num_beats - 2, 0, -1):
        if changes_counted[i] and changes_counted[i - 1]:
            last_proper_beat = i
            break

    time_differences = []
    buffer = 5
    for i in range(20):
        start_beat = earliest_proper_beat + buffer * i
        # Short tracks: stop instead of indexing past the last gap.
        if start_beat >= num_changes:
            break
        if not (changes_counted[start_beat] and changes_counted[start_beat - 1]):
            continue
        for j in range(20):
            end_beat = last_proper_beat - buffer * j
            # A span must run forward in time; this also keeps end_beat from
            # going negative and wrapping around to the end of the list.
            if end_beat <= start_beat:
                break
            if changes_counted[end_beat] and changes_counted[end_beat - 1]:
                time_differences.append(beat_times[end_beat] - beat_times[start_beat])

    # get num beats, round, and make new average
    new_averages = []
    for time_difference in time_differences:
        beats_spanned = round(time_difference / average)
        if beats_spanned >= 1:
            new_averages.append(time_difference / beats_spanned)
    if not new_averages:
        # No usable long span (very short track): fall back to the plain average.
        new_averages = [average]
    new_averages.sort()
    new_average = new_averages[len(new_averages) // 2]

    bpm = 60. / new_average
    while bpm >= 200:
        bpm /= 2
    while bpm < 100:
        bpm *= 2
    return bpm

# Helpers to test get_beats() above
- get_bpm(file: String)
- get_song_bpms()
- test_get_beats(song_data: Map(SongData))

In [12]:
def get_bpm(file):
    """Read the BPM of a constant-tempo song from a stepfile.

    Args:
        file: path of the stepfile.

    Returns:
        The BPM as a float, or 0 when the file has no #BPMS tag, the tag has
        no 'beat=bpm' entry, or it lists more than one BPM segment.
    """
    with open(file, "r") as ins:
        contents = ins.read()

    result = re.search('#BPMS:([^;]*);', contents)
    if result is None:
        return 0
    bpm_string = result.group(1)
    # A comma separates BPM segments; songs with tempo changes are reported as 0.
    if len(bpm_string.split(',')) != 1 or '=' not in bpm_string:
        return 0
    return float(bpm_string.split('=')[1])

def get_song_bpms(song_data):
    """Look up the stepfile BPM of every song.

    Args:
        song_data: dict mapping a key to an object with `pack` and `name`.

    Returns:
        dict mapping the same keys to the value get_bpm returns for
        'StepMania/Songs/<pack>/<name>/<name>.sm'.
    """
    return {
        key: get_bpm('StepMania/Songs/{0}/{1}/{1}.sm'.format(entry.pack, entry.name))
        for key, entry in song_data.items()
    }

def test_get_beats(song_data):
    """Print predicted vs. stepfile BPM for every song with a known constant BPM.

    For each song the prediction of get_beats is compared with the BPM read
    from the stepfile. The prediction is scaled by 2/1, 3/2 and 4/3 in turn
    whenever that brings it closer to the real value. One line is printed per
    song: predicted - real = difference (name).

    Args:
        song_data: dict mapping a key to a song object exposing `pack`, `name`,
            `beat_times` and `beat_frames`.
    """
    song_bpms = get_song_bpms(song_data)
    for key in song_data:
        song = song_data[key]
        real_beat = song_bpms[key]
        # 0 means the stepfile has no single BPM to compare against.
        if real_beat == 0:
            continue
        # get_beats returns the BPM itself, so the result is used directly.
        predicted_beat = get_beats(song.beat_times, song.beat_frames)
        for i in range(1, 4):
            if abs((predicted_beat * (i + 1) / (i)) - real_beat) < abs(predicted_beat - real_beat):
                predicted_beat *= (i + 1) / (i)
        print ('{0:.3f} - {1:.3f} = {2:.3f} ({3})'.format(predicted_beat, real_beat, predicted_beat - real_beat, song.name))

#songs = [('In The Groove', song) for song in listdir('StepMania/Songs/In The Groove') if song != '.DS_Store'][:15]
#song_data_temp = load_songs(songs, True)
#test_get_beats(song_data_temp)

# Helpers to read .sm and return notes and meta data
- get_notes_from_note_string(note_string)
- get_notes_and_metadata(file)
- get_song_steps()

In [13]:
# Both patterns are applied to stepfile text in which every newline has been replaced by
# the letter 'n' (see get_notes_and_metadata), so ':n     ' stands for a colon, a line
# break and five spaces.
# One '#NOTES:' section of type dance-single, up to a ';', never running across a '//-' marker.
regex_notes_with_metadata = '#NOTES:n     dance-single((?:(?!//-).)*);'
# Splits such a section into six captured fields; get_notes_and_metadata reads groups 4, 5 and 6.
regex_metadata_split = ':n     (.*):n     (.*):n     (.*):n     (.*):n     (.*):(.*);'
def get_notes_from_note_string(note_string):
    """Split a flattened note section into its rows.

    The text is split on the letter 'n' (which stands in for the original line
    breaks), and the first and last pieces are dropped.

    NOTE(review): an earlier version also grouped the 4-character rows into
    bars but never returned that grouping; callers have always received the
    flat list of rows, so that unused work was removed.

    Args:
        note_string: note section text with line breaks replaced by 'n'.

    Returns:
        list of row strings (note rows, separators and anything else between
        two 'n' markers).
    """
    return re.split(r'n', note_string)[1:-1]

def get_notes_and_metadata(file):
    """Extract every dance-single chart of a stepfile.

    Line breaks are replaced by the letter 'n' before matching, which is what
    regex_notes_with_metadata and regex_metadata_split expect.

    Args:
        file: path of the stepfile.

    Returns:
        dict keyed by the fourth captured field of each chart. Every value is a
        dict with 'DIFFICULTY' (that same field), 'METADATA' (fifth field) and
        'NOTES' (sixth field, run through get_notes_from_note_string).
    """
    with open(file) as txt:
        flattened = txt.read().replace('\n', 'n')

    charts = {}
    for chart_match in re.finditer(regex_notes_with_metadata, flattened):
        fields = re.search(regex_metadata_split, chart_match.group(0))
        chart_difficulty = fields.group(4)
        chart_metadata = fields.group(5)
        chart_notes = get_notes_from_note_string(fields.group(6))
        charts[chart_difficulty] = {
            'DIFFICULTY': chart_difficulty,
            'METADATA': chart_metadata,
            'NOTES': chart_notes,
        }
    return charts

def get_song_steps():
    """Read the charts of every song in the 'In The Groove' pack.

    The macOS '.DS_Store' entry is skipped when present; its absence is not
    an error.

    Returns:
        dict mapping 'In The Groove~<song>' to the result of
        get_notes_and_metadata for '<song>/<song>.sm'.
    """
    song_steps = {}
    for song in listdir('StepMania/Songs/In The Groove'):
        if song == '.DS_Store':
            continue
        song_steps['In The Groove~{0}'.format(song)] = get_notes_and_metadata('StepMania/Songs/In The Groove/{0}/{0}.sm'.format(song))
    return song_steps

In [76]:
def write_song_header(output_stepfile, song):
    """Write the song-level tags of a stepfile, one '#TAG:value;' line each.

    Args:
        output_stepfile: writable text file object.
        song: object exposing `name`, `extension`, `offset` and `beat_length`.
    """
    # Tags in the order they are written.
    header_fields = [
        ('VERSION', 0.82),
        ('TITLE', song.name),
        ('MUSIC', '{0}.{1}'.format(song.name, song.extension)),
        ('OFFSET', -song.offset),
        ('SAMPLESTART', song.offset + 32 * song.beat_length),
        ('SAMPLELENGTH', 32 * song.beat_length),
    ]

    for tag, value in header_fields:
        line = "#{0}:{1};".format(tag, str(value))
        print(line, file=output_stepfile)
        
def write_step_header(output_stepfile, song):
    """Write the chart-level tags of a dance-single chart.

    A comment line introducing the chart is written first, followed by one
    '#TAG:value;' line per tag. Only BPMS depends on the song (song.bpm,
    three decimals); every other value is fixed.

    Args:
        output_stepfile: writable text file object.
        song: object exposing `bpm`.
    """
    print("//---------------dance-single - ----------------", file=output_stepfile)

    # Tags in the order they are written.
    step_fields = [
        ('NOTEDATA', ''),
        ('CHARTNAME', 'Kommisar'),
        ('STEPSTYPE', 'dance-single'),
        ('DIFFICULTY', 'Beginner'),
        ('METER', 1),
        ('RADARVALUES', '0.234,0.292,0.008,0,0,211,212,1,0,0,0,0,0,0,0.234,0.292,0.008,0,0,211,212,1,0,0,0,0,0,0'),
        ('BPMS', '0={:.3f}'.format(song.bpm)),
    ]
    for tag, value in step_fields:
        line = "#{0}:{1};".format(tag, str(value))
        print(line, file=output_stepfile)
        
def write_notes_simple(output_stepfile, song):
    """Write a fixed placeholder chart: 40 bars of four '0001' rows each.

    Args:
        output_stepfile: writable text file object.
        song: unused; present so all note writers share one signature.
    """
    # One bar: four identical rows followed by the bar separator.
    bar = "0001\n0001\n0001\n0001\n,"
    chart_lines = ["#NOTES:"] + [bar] * 40 + ["0000;"]
    for chart_line in chart_lines:
        print(chart_line, file=output_stepfile)
    
def write_notes(output_stepfile, song, rows_per_bar=None, samples_in_beat=None):
    """Write a chart that places a note on the strongest rows of song.music_samples.

    The samples are thinned to rows_per_bar / 4 rows per beat. A row gets the
    note '0001' when its sample is strictly greater than the cutoff, which is
    the value found one third of the way down from the top of the sorted
    samples. A ',' separator is written after every rows_per_bar rows, and the
    chart is closed with '0000;'.

    Args:
        output_stepfile: writable text file object.
        song: object exposing `music_samples`, a sequence of comparable values.
        rows_per_bar: rows written per bar. Defaults to the global steps_per_bar.
        samples_in_beat: how many entries of music_samples cover one beat.
            Defaults to the global samples_per_beat.
    """
    if rows_per_bar is None:
        rows_per_bar = steps_per_bar
    if samples_in_beat is None:
        samples_in_beat = samples_per_beat

    print ("#NOTES:", file=output_stepfile)

    samples = song.music_samples
    # take rows_per_bar / 4 samples per beat (rows_per_bar per bar)
    steps_per_beat = rows_per_bar / 4
    filter_amount = samples_in_beat / steps_per_beat

    absolute_samples = [sample for i, sample in enumerate(samples) if i % filter_amount == 0]
    if absolute_samples:
        # show 1 / 3 of all notes
        cutoff_index = int(len(absolute_samples) / 3)
        cutoff = sorted(absolute_samples)[-cutoff_index]
        indices = [sample > cutoff for sample in absolute_samples]
    else:
        # Nothing to place: emit only the terminating row.
        indices = []

    for i, has_note in enumerate(indices):
        print ("0001" if has_note else "0000", file=output_stepfile)
        # Close the bar after its last row, so every bar has exactly rows_per_bar rows.
        if (i + 1) % rows_per_bar == 0:
            print (",", file=output_stepfile)

    print ("0000;", file=output_stepfile)
    
def write_notes_with_onsets(output_stepfile, song):
    """Write a chart that places a note on every row closest to a detected onset.

    Onsets are detected with librosa at hop length 512 and converted to sample
    positions. Every second entry of song.indices is one chart row, and each
    onset marks the row whose sample position is nearest to it. A ','
    separator is written after every 24 rows and the chart is closed with
    '0000;'.

    Args:
        output_stepfile: writable text file object.
        song: object exposing `y` (audio samples) and `indices` (sample
            positions as produced by SongFile.calculate_features).
    """
    print ("#NOTES:", file=output_stepfile)
    onsets = librosa.onset.onset_detect(y=song.y, sr=sr, hop_length=512)
    # onset frames -> sample positions, to compare with song.indices
    onsets_scaled = [onset * 512 for onset in onsets]

    indices = [song.indices[i] for i in range(len(song.indices)) if i % 2 == 0]

    onset_happened_in_frame = [False] * len(indices)
    last_row = len(indices) - 1
    i = 0
    # Without any rows there is nothing to mark.
    for onset in (onsets_scaled if indices else []):
        # Advance while the next row is closer; stop at the last row so onsets
        # after the final sample position cannot index past the end.
        while i < last_row and abs(onset - indices[i]) > abs(onset - indices[i + 1]):
            i += 1
        onset_happened_in_frame[i] = True

    rows_per_measure = 24
    for i, has_onset in enumerate(onset_happened_in_frame):
        print ("0001" if has_onset else "0000", file=output_stepfile)
        if i % rows_per_measure == rows_per_measure - 1:
            print (",", file=output_stepfile)

    print ("0000;", file=output_stepfile)
    
def step_song(song):
    """Generate a stepfile for the song and write it to song.stepfile.

    The file is opened for writing, so existing contents of song.stepfile are
    replaced. It is closed even if one of the writers raises.

    Args:
        song: object exposing `stepfile` plus everything write_song_header,
            write_step_header and write_notes_with_onsets read.
    """
    with open(song.stepfile, 'w') as output_stepfile:
        write_song_header(output_stepfile, song)
        write_step_header(output_stepfile, song)
        write_notes_with_onsets(output_stepfile, song)

In [27]:
# Load the test song and keep a deep copy of it as a backup.
# NOTE(review): load_song is not defined in the cells shown here (only load_songs and
# load_all_songs are) — confirm it still exists before relying on this cell.
song = load_song('a_test', 'A', True)
backup1 = copy.deepcopy(song)
#song2 = load_song('a_test', 'Fire', True)
#song3 = load_song('a_test', 'WeBelongTogether', True)
#song4 = load_song('a_test', 'CallMeBaby', True)

Calculating beats for a_test~A


  return array(a, dtype, copy=False, order=order)


Saving calculated beats for a_test~A


In [77]:
# Generate the chart for the test song; step_song writes to song.stepfile.
step_song(song)
#step_song(song2)
#step_song(song3)

In [143]:
# Split the audio into harmonic and percussive parts; only y_harmonic is used in the cells shown here.
y_harmonic, y_percussive = librosa.effects.hpss(song.y)

In [144]:
# Convert the sample positions in song.indices to frame numbers for the feature cells.
# NOTE(review): no hop_length is passed, so librosa's default applies — presumably 512,
# matching the mfcc cell; confirm.
beat_frames = librosa.samples_to_frames(song.indices)

In [145]:
# Compute MFCC features from the raw signal (13 coefficients per frame, hop length 512)
mfcc = librosa.feature.mfcc(y=song.y, sr=sr, hop_length=512, n_mfcc=13)

In [146]:
# And the first-order differences (delta features) of the MFCCs computed above
mfcc_delta = librosa.feature.delta(mfcc)

In [147]:
# Stack MFCCs and their deltas, then aggregate them between the positions in beat_frames
# This time, we'll use the mean value (default) instead of median
# NOTE(review): newer librosa releases expose this as librosa.util.sync — confirm which
# version this notebook targets.
beat_mfcc_delta = librosa.feature.sync(np.vstack([mfcc, mfcc_delta]), beat_frames)

In [148]:
# Compute chroma features from the harmonic signal
# NOTE(review): no hop_length is passed here — presumably librosa's default matches the
# 512 used for the MFCCs; confirm.
chromagram = librosa.feature.chroma_cqt(y=y_harmonic, sr=sr)

  return array(a, dtype, copy=False, order=order)


In [151]:
# Aggregate chroma features between the positions in beat_frames
# We'll use the median value of each feature between beat frames
# NOTE(review): newer librosa releases expose this as librosa.util.sync — confirm which
# version this notebook targets.
beat_chroma = librosa.feature.sync(chromagram, beat_frames, aggregate=np.median)

In [182]:
# Onset strength per sample position: for every detected onset, find the entry of
# song.indices closest to it and store the peak onset strength around that onset.
custom_hop = 256
onset_env = librosa.onset.onset_strength(y=song.y, sr=sr, hop_length=custom_hop)
onsets = librosa.onset.onset_detect(y=song.y, sr=sr, onset_envelope=onset_env, hop_length=custom_hop)

# One slot per sample position plus one trailing slot.
onset_happened_in_frame = [0] * (len(song.indices) + 1)
last_index = len(song.indices) - 1
i = 0
# Without any sample positions there is nothing to assign onsets to.
for onset in (onsets if last_index >= 0 else []):
    onset_scaled = onset * custom_hop  # onset frame -> sample position
    # Advance while the next position is closer; stop at the last one so onsets
    # after it cannot index past the end of song.indices.
    while i < last_index and abs(onset_scaled - song.indices[i]) > abs(onset_scaled - song.indices[i + 1]):
        i += 1
    # Peak strength over the onset frame and the four after it; slicing keeps
    # this in range for onsets near the end of the envelope.
    onset_happened_in_frame[i] = max(onset_env[onset:onset + 5])

In [252]:
# Strongest onset-envelope value between each pair of consecutive sample positions.
# A leading 0 makes the first interval start at the beginning of the audio.
indices = [0]
indices.extend(song.indices)
# Sample positions -> onset-envelope frame numbers (custom_hop samples per frame).
max_offset_bounds = [(int(indices[i] / custom_hop), int(indices[i + 1] / custom_hop)) for i in range(len(indices) - 1)]
# NOTE(review): max() raises on an empty slice, which happens if both bounds of an
# interval fall in the same frame — confirm that cannot occur for real songs.
max_offset_strengths = [max(onset_env[bounds[0]:bounds[1]]) for bounds in max_offset_bounds]
# Trailing 0 so the list is one longer than song.indices, like onset_happened_in_frame.
max_offset_strengths.append(0)

In [253]:
# Finally, stack all beat-synchronous features together: the chroma rows, the MFCC and
# MFCC-delta rows, and the two onset rows (onset_happened_in_frame, max_offset_strengths)
beat_features = np.vstack([beat_chroma, beat_mfcc_delta, [onset_happened_in_frame, max_offset_strengths]])

In [254]:
# Sanity check: (number of feature rows, number of columns)
beat_features.shape

(40, 9427)