### Code section

##### Initialize SC server and MIDI input

In [1]:
import sc3nb as scn
import numpy as np
sc = scn.startup()
%sc MIDIClient.init;
%sc MIDIIn.connectAll;

<IPython.core.display.Javascript object>

Starting sclang process... Done.
Registering OSC /return callback in sclang... Done.
Loading default sc3nb SynthDefs... Done.
Booting SuperCollider Server... Done.
MIDI: device 0 3 -1541700688  (Microsoft GS Wavetable Synth)
MIDI: device 1 4 -1541700680  (V61)
MIDI: device 2 5 -1541700672  (MIDIOUT2 (V61))
MIDI Sources:
	MIDIEndPoint("V61", "V61")
	MIDIEndPoint("MIDIIN2 (V61)", "MIDIIN2 (V61)")
MIDI Destinations:
	MIDIEndPoint("Microsoft GS Wavetable Synth", "Microsoft GS Wavetable Synth")
	MIDIEndPoint("V61", "V61")
	MIDIEndPoint("MIDIOUT2 (V61)", "MIDIOUT2 (V61)")
-> MIDIClient
Requested notification messages from server 'sc3nb_remote'
sc3nb_remote: server process has maxLogins 8 - adjusting my options accordingly.
sc3nb_remote: keeping clientID (0) as confirmed by server process.
-> MIDIIn


##### Setup SC synths and global parameters

In [13]:
%%sc 
// Setting up general parameters used by the training tasks

~metronome_rates = [1, 0.75, 0.75, 0.75];
~metronome_amp = 0.2;
~metronome_amps = Pseq([~metronome_amp], inf);
~number_all_notes = 0;
~dur_per_beat = 1.0;

// Setting up SynthDefs used by the training tasks

~drumstick_buf = Buffer.read(s, "./sounds/drumstick.wav";);
SynthDef(\play_buf, {
    |out=0, buf, amp=1, rate=1|
    var sig = PlayBuf.ar(1, buf, BufRateScale.kr(buf) * rate, doneAction: 2) * amp;
    Out.ar(out, sig!2)
}).add;

SynthDef(\train_note, { 
    |freq=440, amp=0.2, pan=0.5, gate=1, distortion_length=0, 
    freq_reduction_level=0, freq_distort_level=0, amp_reduction_level=0|
    var sig, env, note, noise, env_distortion;
    
    env = EnvGen.kr(Env.adsr, gate, doneAction: Done.freeSelf);
    env_distortion = EnvGen.kr(Env.linen(0, distortion_length * ~dur_per_beat, 0, 1));
    
    note = LFTri.ar((1 - (freq_reduction_level * env_distortion)) * freq + (BrownNoise.ar(30) * freq_distort_level * env_distortion));
    
    sig = (freq_reduction_level * env_distortion) * BrownNoise.ar() + note;
    sig = sig * (amp - (amp_reduction_level * amp * env_distortion));
    sig = sig * env;
    Out.ar(0, 1-pan * sig);
    Out.ar(1, pan * sig);
}).add;

SynthDef(\default_note, { |freq=440, amp=0.2, sustain=0.5, pan=0.5|
    var sig;
    sig = LFTri.ar(freq, mul: amp) * EnvGen.kr(Env.perc(0.001, sustain), doneAction: Done.freeSelf);
    Out.ar(0, 1-pan * sig);
    Out.ar(1, pan * sig);
}).add;

SynthDef(\blip, { |freq=440, amp=0.2, sustain=0.5, pan=0.5|
    var sig;
    sig = Blip.ar(freq, mul: amp) * EnvGen.kr(Env.perc(0.001, sustain), doneAction: Done.freeSelf);
    Out.ar(0, 1-pan * sig);
    Out.ar(1, pan * sig);
}).add;

SynthDef(\tick, { |rate=2, amp=0.3, sustain=0.5, pan=0.5|
    var sig;
    sig = PlayBuf.ar(1, ~drumstick_buf, BufRateScale.kr(~drumstick_buf) * rate, doneAction: Done.freeSelf) * amp;
    Out.ar(0, 1-pan * sig);
    Out.ar(1, pan * sig);
}).add;

RuntimeError: You need to start a SuperCollider SC instance first or provide an sclang/server directly.

In [3]:
%%sc
~notes = Array.newClear(128);    // array has one slot per possible MIDI note

MIDIdef.noteOn(\note_on, { 
    |veloc, num, chan, src, default_synth=\train_note|
    ~notes[num] = Synth(default_synth, [\freq, num.midicps, \amp, veloc * 0.00315]);
    }
);

MIDIdef.noteOff(\note_off, {
    |veloc, num, chan, src|
    ~notes[num].release;
    }
);

-> MIDIdef(note_off, noteOff, nil, nil, nil)


##### Setup SC functions

In [4]:
%%sc
~play_control_note = { |method, midi, pan|
                          switch(method,
                                1, { Synth(\default_note, [\freq, midi.midicps, \pan, pan]) },
                                2, { Synth(\blip, [\freq, midi.midicps, \pan, pan]) },
                                3, { Synth(\tick, [\freq, midi.midicps, \pan, pan]) })
                     };

~setup_control_task = { |train_notes, train_rhythms, method, pan = 0.5|
                        Task({
                            var current_note, task_rhythm = Pseq(train_rhythms, inf).asStream;
                            current_note = 0;
                            loop {
                                train_notes.do({ |midi|
                                    if ((~train_inputs[current_note].size == 0), {
                                        ~play_control_note.value(method, midi, pan);
                                    });
                                    task_rhythm.next.wait;
                                    current_note = current_note + 1;
                                });
                            }
                        })
                      };

~setup_control_task_left = { |train_notes, train_rhythms, method, pan = 0.5|
                        Task({
                            var current_note, task_rhythm = Pseq(train_rhythms, inf).asStream;
                            current_note = 0;
                            loop {
                                train_notes.do({ |midi|
                                    if ((~train_inputs_left[current_note].size == 0), {
                                        ~play_control_note.value(method, midi, pan);
                                    });
                                    task_rhythm.next.wait;
                                    current_note = current_note + 1;
                                });
                            }
                        })
                      };

~setup_control_task_right = { |train_notes, train_rhythms, method, pan = 0.5|
                        Task({
                            var current_note, task_rhythm = Pseq(train_rhythms, inf).asStream;
                            current_note = 0;
                            loop {
                                train_notes.do({ |midi|
                                    if ((~train_inputs_right[current_note].size == 0), {
                                        ~play_control_note.value(method, midi, pan);
                                    });
                                    task_rhythm.next.wait;
                                    current_note = current_note + 1;
                                });
                            }
                        })
                      };

~setup_train_note_on = { |method|
                         MIDIdef(\note_on).free;
                         MIDIdef(\note_on_left).free;
                         MIDIdef(\note_on_right).free; 
                         MIDIdef.noteOn(\note_on, {
                             |veloc, num, chan, src|
                             var error_beat, error_beats, current_train_note;
                             ~notes[num] = Synth(\train_note, [\freq, num.midicps, \amp, veloc * 0.00315]);
                             error_beats =  ~train_clock.beats - ~solutions_time;
                             current_train_note = error_beats.minIndex({|item, i| item.abs});
                             error_beat = error_beats[current_train_note];
                             ~notes[num].set(\distortion_length, error_beat * (-1));
                             switch(method, 
                                    0, {},
                                    1, { ~notes[num].set(\amp_reduction_level, 0.8) },
                                    2, { ~notes[num].set(\freq_distort_level, 1) },
                                    3, { ~notes[num].set(\freq_reduction_level, 1) });
                             ~train_inputs[current_train_note] = ~train_inputs[current_train_note].add([num, ~train_clock.beats]);
                         });
                       };




~setup_train_note_on_2h = { |method|
                         MIDIdef(\note_on).free;
                         MIDIdef(\note_on_left).free;
                         MIDIdef(\note_on_right).free;    
                         MIDIdef.noteOn(\note_on_left, { // Define noteOn function for left note
                             |veloc, num, chan, src|
                             if( num == ~train_note_left, {
                                 var error_beat, error_beats, current_train_note;
                                 ~notes[num] = Synth(\train_note, [\freq, num.midicps, \amp, veloc * 0.00315, \pan, 0]);
                                 error_beats =  ~train_clock.beats - ~solutions_time_left;
                                 current_train_note = error_beats.minIndex({|item, i| item.abs});
                                 error_beat = error_beats[current_train_note];
                                 ~notes[num].set(\distortion_length, error_beat * (-1));
                                 switch(method, 
                                        0, {},
                                        1, { ~notes[num].set(\amp_reduction_level, 0.8) },
                                        2, { ~notes[num].set(\freq_distort_level, 1) },
                                        3, { ~notes[num].set(\freq_reduction_level, 1) });
                                 ~train_inputs_left[current_train_note] = ~train_inputs_left[current_train_note].add([num, ~train_clock.beats]);
                             });
                         });
                         MIDIdef.noteOn(\note_on_right, { // Define noteOn function for right note
                             |veloc, num, chan, src|
                             if( num == ~train_note_right, {
                                 var error_beat, error_beats, current_train_note;
                                 ~notes[num] = Synth(\train_note, [\freq, num.midicps, \amp, veloc * 0.00315, \pan, 1]);
                                 error_beats =  ~train_clock.beats - ~solutions_time_right;
                                 current_train_note = error_beats.minIndex({|item, i| item.abs});
                                 error_beat = error_beats[current_train_note];
                                 ~notes[num].set(\distortion_length, error_beat * (-1));
                                 switch(method, 
                                        0, {},
                                        1, { ~notes[num].set(\amp_reduction_level, 0.8) },
                                        2, { ~notes[num].set(\freq_distort_level, 1) },
                                        3, { ~notes[num].set(\freq_reduction_level, 1) });
                                 ~train_inputs_right[current_train_note] = ~train_inputs_right[current_train_note].add([num, ~train_clock.beats]);
                             });
                         });    
                       };

~setup_default_note_on = {
                            MIDIdef(\note_on).free;
                            MIDIdef(\note_on_left).free;
                            MIDIdef(\note_on_right).free;
                            MIDIdef.noteOn(\note_on, { 
                                |veloc, num, chan, src|
                                ~notes[num] = Synth(\train_note, [\freq, num.midicps, \amp, veloc * 0.00315]);
                                }
                            );
                         }

-> a Function


##### Python classes and functions

In [5]:
class Task:
    def __init__(self, beat, hands, notes, rhythms, offbeats):
        self.beat = beat
        self.hands = hands
        self.notes = notes
        self.rhythms = rhythms
        self.offbeats = offbeats
        self.beats_per_bar = int(beat.split('/')[0])
        self.dur_per_beat = 4 / int(beat.split('/')[1])

class TrainingData:
    def __init__(self, task_name, task, method, bpm, run_number, solutions_note, solutions_time, train_inputs):
        self.task_name = task_name
        self.task = task
        self.method = method
        self.bpm = bpm
        self.run_number = run_number
        self.solutions_note = np.array(solutions_note)
        self.solutions_time = np.array(solutions_time)
        self.train_inputs = []
        for i in range(len(train_inputs)):
            for note in train_inputs[i]:
                note.append(i)
                self.train_inputs.append(note)
        self.train_inputs = np.array(self.train_inputs)

In [6]:
import pickle
import time

def setup_task_sc(task, settings, method):
    global bpm, wait_bars, train_bars, allow_error, task_mode, beats_per_bar, dur_per_beat
    bpm = settings['bpm']
    wait_bars = settings['wait_bars']
    train_bars = settings['train_bars']
    allow_error = settings['allow_error']
    task_mode = settings['task_mode']
    beats_per_bar = task.beats_per_bar
    dur_per_beat = task.dur_per_beat
    %scs ~dur_per_beat = ^dur_per_beat
    setup_metronome_sc(task_mode)
    if task.hands == 1:
        setup_task_1h_sc(task, method)
    elif task.hands == 2:
        setup_task_2h_sc(task, method)
        
def setup_metronome_sc(mode):
    if mode == 'test':
        if beats_per_bar == 4:
            sc.lang.cmds(r"""
                ~metronome_amps = Pseq([~metronome_amp], ^wait_bars*^beats_per_bar) ++
                                  Pseq([~metronome_amp], ^beats_per_bar) ++ Pseq([~metronome_amp, 0], 2) ++
                                  Pseq([~metronome_amp], 1) ++ Pseq([0], ^beats_per_bar-1) ++
                                  Pseq([0], inf);
            """)
        else:
            sc.lang.cmds(r"""
                ~metronome_amps = Pseq([~metronome_amp], ^wait_bars*^beats_per_bar) ++
                                  Pseq([~metronome_amp], ^beats_per_bar) ++ 
                                  Pseq([~metronome_amp], 1) ++ Pseq([0], ^beats_per_bar-1) ++
                                  Pseq([0], inf);
            """) 
    else:
        %scs ~metronome_amps = Pseq([~metronome_amp], inf);
    sc.lang.cmds(r"""
        ~metronome = Task({
            var rate_pattern = Pseq(~metronome_rates.reshape(^beats_per_bar), ^wait_bars + ^train_bars).asStream;
            var amp_pattern = ~metronome_amps.asStream;
            loop {
                Synth(\play_buf, [buf: ~drumstick_buf, amp: amp_pattern.next, rate: rate_pattern.next]);
                1.wait;
            }
        });
    """)
        
def setup_task_1h_sc(task, method):
    global train_notes, train_rhythms, train_offbeat, task_mode, use_method
    train_notes = task.notes
    train_rhythms = task.rhythms
    train_offbeat = task.offbeats
    use_method = method
    sc.lang.cmds(r"""
        var task_duration = ^train_offbeat, number_all_notes = 0, all_note_durs = Pseq(^train_rhythms, inf).asStream, timing_all_notes = List[];
        while({task_duration < (^train_bars * ^beats_per_bar)}, {
            number_all_notes = number_all_notes + 1;
            timing_all_notes.add(task_duration);
            task_duration = task_duration + all_note_durs.next;
            });
        ~number_all_notes = number_all_notes;
        ~timing_all_notes = timing_all_notes.array;
    """) 
    if task_mode == 'demo':
        sc.lang.cmds(r"""
            ~control_task = Task({
                var task_rhythm = Pseq(^train_rhythms, inf).asStream;
                loop {
                    ^train_notes.do({ |midi|
                    Synth(\default_note, [\freq, midi.midicps]);
                    task_rhythm.next.wait;
                    });
                }
            })
        """)
    elif task_mode == 'train':
        if method == 0:
            %scs ~control_task = Task({});
        else:
            %scs ~control_task = ~setup_control_task.value(^train_notes, ^train_rhythms, ^use_method);
        %scs ~setup_train_note_on.value(^use_method)
    elif task_mode == 'test':
        %scs ~setup_train_note_on.value(0)

def setup_task_2h_sc(task, method):
    global train_notes_left, train_notes_right, train_rhythms_left, train_rhythms_right, train_offbeat_left, train_offbeat_right, task_mode, use_method
    train_notes_left = task.notes[0]
    train_notes_right = task.notes[1]
    train_rhythms_left = task.rhythms[0]
    train_rhythms_right = task.rhythms[1]
    train_offbeat_left = task.offbeats[0]
    train_offbeat_right = task.offbeats[1]
    use_method = method
    %scs ~train_note_left = ^train_notes_left[0]
    %scs ~train_note_right = ^train_notes_right[0]
    sc.lang.cmds(r"""
        var task_duration = ^train_offbeat_left, 
        number_all_notes_left = 0, all_note_durs_left = Pseq(^train_rhythms_left, inf).asStream, timing_all_notes_left = List[],
        number_all_notes_right = 0, all_note_durs_right = Pseq(^train_rhythms_right, inf).asStream, timing_all_notes_right = List[];
        while({task_duration < (^train_bars * ^beats_per_bar)}, {
            number_all_notes_left = number_all_notes_left + 1;
            timing_all_notes_left.add(task_duration);
            task_duration = task_duration + all_note_durs_left.next;
            });
        task_duration = ^train_offbeat_right;
        while({task_duration < (^train_bars * ^beats_per_bar)}, {
            number_all_notes_right = number_all_notes_right + 1;
            timing_all_notes_right.add(task_duration);
            task_duration = task_duration + all_note_durs_right.next;
            });
        ~number_all_notes_left = number_all_notes_left;
        ~timing_all_notes_left = timing_all_notes_left.array;
        ~number_all_notes_right = number_all_notes_right;
        ~timing_all_notes_right = timing_all_notes_right.array;
    """)
    if task_mode == 'demo':
        sc.lang.cmds(r"""
            ~control_task_left = Task({
                var task_rhythm = Pseq(^train_rhythms_left, inf).asStream;
                loop {
                    ^train_notes_left.do({ |midi|
                    Synth(\default_note, [\freq, midi.midicps, \pan, 0]);
                    task_rhythm.next.wait;
                    });
                }
            });
            ~control_task_right = Task({
                var task_rhythm = Pseq(^train_rhythms_right, inf).asStream;
                loop {
                    ^train_notes_right.do({ |midi|
                    Synth(\default_note, [\freq, midi.midicps, \pan, 1]);
                    task_rhythm.next.wait;
                    });
                }
            });
        """)
    elif task_mode == 'train':
        if method == 0:
            %scs ~control_task_left = Task({});
            %scs ~control_task_right = Task({}); 
        else:
            %scs ~control_task_left = ~setup_control_task_left.value(^train_notes_left, ^train_rhythms_left, ^use_method, pan: 0);
            %scs ~control_task_right = ~setup_control_task_right.value(^train_notes_right, ^train_rhythms_right, ^use_method, pan: 1);
        %scs ~setup_train_note_on_2h.value(^use_method)
    elif task_mode == 'test':
        %scs ~setup_train_note_on_2h.value(0)

def play_task_sc(task):
    %scs ~train_clock = TempoClock.new
    %scs ~train_clock.tempo = ^bpm/60
    if task.hands == 1:
        play_task_1h_sc()
    elif task.hands == 2:
        play_task_2h_sc()            
            
def play_task_1h_sc():
    sc.lang.cmds(r"""
        ~train_clock.schedAbs(~train_clock.beats.ceil, {
            ~train_inputs = ([]!~number_all_notes);
            ~solutions_time = ~timing_all_notes + ((~train_clock.beats + (^wait_bars * ^beats_per_bar)) ! ~number_all_notes);
            ~solutions_note = Pseq(^train_notes, inf).asStream.nextN(~number_all_notes);
            ~metronome.play(~train_clock);
            if((^task_mode != "test"), {
                ~train_clock.schedAbs(~train_clock.beats + (^wait_bars * ^beats_per_bar) + ^train_offbeat + ^allow_error, {
                    ~control_task.play(~train_clock);
                })
            });
            ~train_clock.sched((^wait_bars + ^train_bars) * ^beats_per_bar - 0.01, {
                ~metronome.stop;
            });      
            ~train_clock.sched((^wait_bars + ^train_bars) * ^beats_per_bar + ^allow_error - 0.01, {
                ~control_task.stop;  
                ~setup_default_note_on.value;
                ~train_clock.stop;
            })
        });
    """)    
    
def play_task_2h_sc():
    sc.lang.cmds(r"""
        ~train_clock.schedAbs(~train_clock.beats.ceil, {
            ~train_inputs_left = ([]!~number_all_notes_left); ~train_inputs_right = ([]!~number_all_notes_right);
            ~solutions_time_left = ~timing_all_notes_left + ((~train_clock.beats + (^wait_bars * ^beats_per_bar)) ! ~number_all_notes_left);
            ~solutions_time_right = ~timing_all_notes_right + ((~train_clock.beats + (^wait_bars * ^beats_per_bar)) ! ~number_all_notes_right);
            ~solutions_note_left = Pseq(^train_notes_left, inf).asStream.nextN(~number_all_notes_left);
            ~solutions_note_right = Pseq(^train_notes_right, inf).asStream.nextN(~number_all_notes_right);
            ~metronome.play(~train_clock);
            if((^task_mode != "test"), {
                ~train_clock.schedAbs(~train_clock.beats + (^wait_bars * ^beats_per_bar) + ^train_offbeat_left + ^allow_error, {
                    ~control_task_left.play(~train_clock);
                });
                ~train_clock.schedAbs(~train_clock.beats + (^wait_bars * ^beats_per_bar) + ^train_offbeat_right + ^allow_error, {
                    ~control_task_right.play(~train_clock);
                })
            });
            ~train_clock.sched((^wait_bars + ^train_bars) * ^beats_per_bar - 0.01, {
                ~metronome.stop;
            }); 
            ~train_clock.sched((^wait_bars + ^train_bars) * ^beats_per_bar + ^allow_error - 0.01, {
                ~control_task_left.stop; ~control_task_right.stop;
                ~setup_default_note_on.value;
                ~train_clock.stop;
            })
        });
    """)

def collect_training_data(training_data_list, task_name, task, method, bpm, run_number):
    if task.hands == 1:
        solutions_note_all = %scgs ~solutions_note
        solutions_time_all = %scgs ~solutions_time
        train_inputs_all = %scgs ~train_inputs
    elif task.hands == 2:
        solutions_note_left = %scgs ~solutions_note_left
        solutions_note_right = %scgs ~solutions_note_right
        solutions_time_left = %scgs ~solutions_time_left
        solutions_time_right = %scgs ~solutions_time_right
        train_inputs_left = %scgs ~train_inputs_left
        train_inputs_right = %scgs ~train_inputs_right
        solutions_note_all = [solutions_note_left, solutions_note_right]
        solutions_time_all = [solutions_time_left, solutions_time_right]
        train_inputs_all = [train_inputs_left, train_inputs_right]
        
    training_data_list.append(
        TrainingData(task_name, task, method, bpm, run_number, solutions_note_all, solutions_time_all, train_inputs_all)
    )

def save_data_to_file(data, name):
    f = open('./experiment_data/' + name + '_' + time.strftime('%b%d%Y_%H%M%S', time.gmtime()), 'wb')
    pickle.dump(data, f)
    f.close()

##### GUI elements

In [25]:
import ipywidgets as widgets
from fractions import Fraction
import time

task_list = {}
training_data = []
method_list = [('No method', 0), ('Method 1', 1), ('Method 2', 2), ('Method 3', 3)]
experiment_tasks = {
                    'Task 0': Task('4/4', 1, [60], [1], 0),
                    'Task 1': Task('4/4', 1, [60], [1/2, 1/2, 1], 0),
                    'Task 2': Task('4/4', 1, [60], [1/2, 1/2, 1], 0.5),
                    'Task 3': Task('4/4', 1, [60], [1, 1/2, 1/2], 0)}
experiment_bpm = 60
experiment_wait_bars = 1
experiment_train_bars = 1
experiment_allow_error = 0.05
runs_per_routine = 2
current_run_number = 0
experiment_mode = False

############################
### FUNCTION DEFINITIONS ###
############################

# General functions used by the GUI
def get_task_duration():
    return 60/bpm_setting.value * (wait_bars_setting.value+train_bars_setting.value) * int(beat_setting.value.split('/')[0])

def play_task(mode):
    settings = {'task_mode': mode,
                'bpm': bpm_setting.value,
                'wait_bars': wait_bars_setting.value,
                'train_bars': train_bars_setting.value,
                'allow_error': allow_error_setting.value
               }
    if mode == 'demo':
        settings.update({'allow_error': 0})
    setup_task_sc(task_list[select_task.value], settings, select_method.value)
    play_task_sc(task_list[select_task.value])

def disable_settings():
    add_save_task_button.disabled = True
    delete_task_button.disabled = True
    select_task.disabled = True
    select_method.disabled = True
    bpm_setting.disabled = True
    wait_bars_setting.disabled = True
    train_bars_setting.disabled = True
    allow_error_setting.disabled = True
    
def enable_settings():
    add_save_task_button.disabled = False
    delete_task_button.disabled = False
    select_task.disabled = False
    select_method.disabled = False
    bpm_setting.disabled = False
    wait_bars_setting.disabled = False
    train_bars_setting.disabled = False
    allow_error_setting.disabled = False
    
def disable_plays():
    play_demo_button.disabled = True
    start_test_button.disabled = True
    start_training_button.disabled = True
        
def enable_plays():
    play_demo_button.disabled = False
    start_test_button.disabled = False
    start_training_button.disabled = False
    
    
# Experiment functions
def start_new_experiment():
    global task_list, training_data, experiment_mode
    task_list = experiment_tasks
    training_data = []
    disable_settings()
    experiment_mode = True
    
def start_new_routine():
    global current_run_number
    disable_plays()
    current_run_number = 0
    run_counter.value = '1. test'
    play_demo_button.disabled = False
    start_test_button.disabled = False
    next_task_button.disabled = True

def proceed_to_next_phase(last_action):
    global current_run_number
    if last_action == 'Play Demo':
        play_demo_button.disabled = False
        if current_run_number == 0 or current_run_number == runs_per_routine + 1:
            start_test_button.disabled = False
        else:
            start_training_button.disabled = False
    elif last_action == 'Start Test':
        if current_run_number == 0:
            play_demo_button.disabled = False
            start_training_button.disabled = False
            current_run_number += 1
            run_counter.value = str(current_run_number)
        elif select_task.index < len(select_task.options) - 1:     # End of routine reached
            next_task_button.disabled = False
            run_counter.value = 'End of this routine. Please proceed to the next task.'
        else:                                                      # End of whole experiment reached
            run_counter.value = 'End of this experiment. You can see results or quit.'
            view_results_button.disabled = False
    elif last_action == 'Start Training':
        play_demo_button.disabled = False
        if current_run_number < runs_per_routine:
            start_training_button.disabled = False
            current_run_number += 1
            run_counter.value = str(current_run_number)
        else:
            start_test_button.disabled = False
            current_run_number += 1
            run_counter.value = '2. test'
    
def quit_experiment():
    global task_list, training_data, experiment_mode
    task_list = {}
    training_data = []
    enable_settings()
    enable_plays()
    experiment_mode = False   
    
    
# Event functions
def add_save_task_from_gui(b):
    if hand_settings.selected_index == 0:
        task_list.update({task_name.value : Task(
            beat_setting.value, 1, 
            [int(x) for x in notes_setting.value.split(',')], 
            [float(Fraction(x)) for x in rhythms_setting.value.split(',')],
            float(Fraction(offbeat_setting.value))
        )})
    else:
        task_list.update({task_name.value : Task(
            beat_setting.value, 2,
            [[int(x) for x in note_left_setting.value.split(',')], [int(x) for x in note_right_setting.value.split(',')]],
            [[float(Fraction(x)) for x in rhythm_left_setting.value.split(',')], [float(Fraction(x)) for x in rhythm_right_setting.value.split(',')]],
            [float(Fraction(offbeat_left_setting.value)), float(Fraction(offbeat_right_setting.value))]
        )})
    select_task.options = task_list.keys()

def delete_task_from_gui(b):
    task_list.pop(task_name.value)
    select_task.options = task_list.keys()

def play_task_from_gui(b): 
    if select_task.value != None: 
        if b.description == 'Play Demo':
            play_task('demo')
        elif b.description == 'Start Test':
            play_task('test')
        elif b.description == 'Start Training':
            if select_method.value == None: return
            else: play_task('train')
        task_duration = get_task_duration() 
        disable_plays()
        time.sleep(task_duration + 1)
        if b.description != 'Play Demo':
            collect_training_data(training_data, select_task.value, task_list[select_task.value], 
                                  select_method.value, bpm_setting.value, current_run_number)
        if experiment_mode == False:
            enable_plays()
        else:
            proceed_to_next_phase(b.description)

def start_experiment_from_gui(b):
    start_new_experiment()
    len(task_list)
    bpm_setting.value = experiment_bpm
    wait_bars_setting.value = experiment_wait_bars
    train_bars_setting.value = experiment_train_bars
    allow_error_setting.value = experiment_allow_error
    select_task.options = task_list.keys()
    select_task.index = 0
    select_method.index = 0
    start_new_routine()
    start_experiment_button.disabled = True
    quit_experiment_button.disabled = False

def proceed_to_next_task(b):
    if select_task.index < len(select_task.options):
        select_task.index += 1
        select_method.index += 1
        start_new_routine()

def quit_experiment_from_gui(b):
    if len(training_data) > 0:
        save_data_to_file(training_data, 'Experiment Data')
    quit_experiment()
    start_experiment_button.disabled = False
    quit_experiment_button.disabled = True
    view_results_button.disabled = True
    select_task.options = task_list.keys()

def view_results(b):
    plot_all_training_data(training_data)

#################
### GUI SETUP ###
#################

# Task Settings
beat_setting = widgets.Combobox(options=[('2/4'), ('3/4'), ('4/4')],value='4/4')


# Settings for 1-hand tasks
notes_setting = widgets.Text(value='60')
rhythms_setting = widgets.Text(value='1')
offbeat_setting = widgets.Text(value='0', layout=widgets.Layout(width='50px'))

one_hand_settings = widgets.GridBox([
    widgets.Label('Notes:'), notes_setting,
    widgets.Label('Rhythm:'), rhythms_setting,
    widgets.Label('Offbeat:'), offbeat_setting
    ], layout=widgets.Layout(grid_template_columns='repeat(6, auto)'))


# Settings for 2-hands tasks
note_left_setting = widgets.Text(value='48')
note_right_setting = widgets.Text(value='60')

rhythm_left_setting = widgets.Text(value='1')
rhythm_right_setting = widgets.Text(value='1')

offbeat_left_setting = widgets.Text(value='0', layout=widgets.Layout(width='50px'))
offbeat_right_setting = widgets.Text(value='0', layout=widgets.Layout(width='50px'))

info_two_hands = widgets.HTML(value='<p><b>Training with two hands only supports tapping a single note per hand.</b><br>Please assign two different notes for the right and the left hand. (Default settings are C for the right hand and CC for the left hand.)</p>')

two_hands_settings = widgets.GridBox([
    widgets.Label('Left hand note:'), note_left_setting,
    widgets.Label('Right hand note:'), note_right_setting,
    widgets.Label('Left hand rhythm:'), rhythm_left_setting,
    widgets.Label('Right hand rhythm:'), rhythm_right_setting,
    widgets.Label('Left hand offbeat:'), offbeat_left_setting,
    widgets.Label('Right hand offbeat:'), offbeat_right_setting,
    ], layout=widgets.Layout(grid_template_columns='repeat(4, auto)'))


# Combining settings into one tab
hand_settings = widgets.Tab(children=[one_hand_settings, widgets.VBox([info_two_hands, two_hands_settings])], selected_index=0)
hand_settings.set_title(0, '1-hand')
hand_settings.set_title(1, '2-hands')


# Add, save and delete task
task_name = widgets.Text()

add_save_task_button = widgets.Button(
    description='Add / Save Task',
    button_style='info',
    tooltip='Create a new task or update the task with the given name',
    icon='plus'
)
add_save_task_button.on_click(add_save_task_from_gui)

delete_task_button = widgets.Button(
    description='Delete Task',
    button_style='danger',
    tooltip='Delete the task with the given name',
    icon='times'
)
delete_task_button.on_click(delete_task_from_gui)

task_settings = widgets.VBox([widgets.HBox([widgets.Label('Beat:'), beat_setting,]),
                              hand_settings, 
                              widgets.HBox([widgets.Label('Name:'), task_name, add_save_task_button, delete_task_button])
                             ])


# General Settings
bpm_setting = widgets.IntSlider(value=60, min=30, max=200, description='BPM:')
wait_bars_setting = widgets.IntSlider(value=1, min=1, max=4, description='Wait:')
train_bars_setting = widgets.IntSlider(value=1, min=1, max=10, description='Train:')
allow_error_setting = widgets.FloatSlider(value=0.05, step=0.01, min=0, max=0.5, description='Allow error:')

general_settings = widgets.VBox([bpm_setting, 
                                 widgets.HBox([wait_bars_setting, widgets.Label('bars')]), 
                                 widgets.HBox([train_bars_setting, widgets.Label('bars')]),
                                 widgets.HBox([allow_error_setting, widgets.Label('beat')])
                                ])


# Combining settings into one accordion
settings = widgets.Accordion(children=[task_settings, general_settings], selected_index=None)
settings.set_title(0, 'Task Settings')
settings.set_title(1, 'General Settings')


# Main layout for testing and training
play_demo_button = widgets.Button(
    description='Play Demo',
    button_style='',
    tooltip='Hear the full task',
    icon='music'
)
play_demo_button.on_click(play_task_from_gui)

start_test_button = widgets.Button(
    description='Start Test',
    button_style='info',
    tooltip='Start the test',
    icon='play'
)
start_test_button.on_click(play_task_from_gui)

start_training_button = widgets.Button(
    description='Start Training',
    button_style='warning',
    tooltip='Start training session for the task',
    icon='dumbbell'
)
start_training_button.on_click(play_task_from_gui)

next_task_button = widgets.Button(
    description='Next Task',
    button_style='success',
    tooltip='Proceed to the next task',
    icon='forward',
    disabled=True
)
next_task_button.on_click(proceed_to_next_task)

start_experiment_button = widgets.Button(
    description='Start New Experiment',
    button_style='success',
    tooltip='Play through a serie of experiment tasks',
    icon='plus',
    layout=widgets.Layout(width='auto')
)
start_experiment_button.on_click(start_experiment_from_gui)

quit_experiment_button = widgets.Button(
    description='Quit Experiment',
    button_style='danger',
    tooltip='Quit experiment',
    icon='door-open',
    disabled=True
)
quit_experiment_button.on_click(quit_experiment_from_gui)

view_results_button = widgets.Button(
    description='View Results',
    button_style='info',
    tooltip='Quit experiment',
    icon='chart-line',
    disabled=True
)
view_results_button.on_click(view_results)

select_task = widgets.Dropdown(options=task_list.keys(), description="Task:")
select_method = widgets.Dropdown(options=method_list, description="Method:")
run_counter = widgets.Label(str(current_run_number))

api_gui = widgets.VBox([settings, 
                        widgets.HBox([start_experiment_button, quit_experiment_button, view_results_button]), 
                        widgets.HBox([select_task, play_demo_button, start_test_button, next_task_button]), 
                        widgets.HBox([select_method, start_training_button, widgets.Label('Current run:'), run_counter])])

In [8]:
%matplotlib widget
import ipywidgets as widgets
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

sorted_training_data = {}
round_decimal = 6
input_selected = []
input_selected_ax = None

############################
### FUNCTION DEFINITIONS ###
############################

# General functions used by the GUI
def compute_absolute_training_errors(solutions, train_inputs, missed_input_error):
    absolutes = [[] for _ in range(len(solutions))]
    for train_input in train_inputs:
        index = int(train_input[2])
        absolute_error = abs(train_input[1] - solutions[index])
        absolutes[index].append(absolute_error)
    for result in absolutes:
        if len(result) == 0:     # There is no input for this note
            result.append(missed_input_error)
        elif len(result) > 1:    # There are multiple inputs registered for this note
            result_mean = np.mean(result)
            result.clear()
            result.append(result_mean)
    absolutes = np.array(absolutes).flatten()
    return absolutes
    
def compute_relative_training_errors(solutions, train_inputs, missed_input_error):
    solutions_relatives = np.diff(solutions)
    relatives = [[] for _ in range(len(solutions_relatives))]
    time_last_notes = [[] for _ in range(len(solutions))]
    for train_input in train_inputs:
        index = int(train_input[2])
        time_last_notes[index].append(train_input[1])
        if index > 0:
            for time in time_last_notes[index-1]:
                relative_distance = train_input[1] - time
                relative_error = abs(relative_distance - solutions_relatives[index-1])
                relatives[index-1].append(relative_error)
    for result in relatives:
        if len(result) == 0:     # There is no input for this note
            result.append(missed_input_error)
        elif len(result) > 1:    # There are multiple inputs registered for this note
            result_mean = np.mean(result)
            result.clear()
            result.append(result_mean)   
    relatives = np.array(relatives).flatten()
    return relatives
    
def sort_all_training_data(training_data_list):
    result = {}
    for data in training_data:
        if not data.task_name in result:
            task_name = data.task_name
            result.update({task_name: []})
        result[task_name].append(data)
    for task_name in result:
        result[task_name].sort(key=lambda training_data: training_data.run_number)
    return result
            
def plot_all_training_data(training_data_list):
    assert len(training_data_list) > 0, 'No training data provided.'
    global sorted_training_data, fig, ax_test1, ax_train, ax_test2
    sorted_training_data = sort_all_training_data(training_data_list)
    select_task_data.options = sorted_training_data.keys()
    select_task_data.index = 0
    plot_full_task(select_task_data.value) 

def plot_full_task(task_name):
    training_data_list = sorted_training_data[task_name]
    global fig, ax_test1, ax_train, ax_test2
    plot_training_data(fig, ax_test1, training_data_list[0])
    if len(training_data_list) > 2:
        plot_training_data(fig, ax_train, training_data_list[1])
        select_run.value = 1
        select_run.max = len(training_data_list) - 2
        plot_training_data(fig, ax_test2, training_data_list[-1])

def plot_training_data(fig, ax, training_data):
    clear_axes(ax)
    start_beat = np.floor(training_data.solutions_time[0])
    x_solutions = training_data.solutions_time - start_beat
    y_solutions = training_data.solutions_note
    inputs = np.hsplit(training_data.train_inputs, 3)
    x_inputs = inputs[1].flatten() - start_beat
    y_inputs = inputs[0].flatten()
    ax.plot(x_solutions, y_solutions, 'x')
    ax.plot(x_inputs, y_inputs, '*', picker=5)
    fig.canvas.draw()
    absolute_errors = compute_absolute_training_errors(training_data.solutions_time, training_data.train_inputs, 0.5)
    relative_errors = compute_relative_training_errors(training_data.solutions_time, training_data.train_inputs, 0.5)
    update_errors(ax, absolute_errors, relative_errors)
    
def update_errors(ax, absolute_errors, relative_errors):
    print("absolutes", absolute_errors)
    print("relatives", relative_errors)
    if ax == ax_test1:
        absolute_error_test1.value = 'mean: ' + str(np.around(np.mean(absolute_errors), round_decimal)) + '; std: ' + str(np.around(np.std(absolute_errors), round_decimal))
        relative_error_test1.value = 'mean: ' + str(np.around(np.mean(relative_errors), round_decimal)) + '; std: ' + str(np.around(np.std(relative_errors), round_decimal))
    elif ax == ax_train:
        absolute_error_train.value = 'mean: ' + str(np.around(np.mean(absolute_errors), round_decimal)) + '; std: ' + str(np.around(np.std(absolute_errors), round_decimal))
        relative_error_train.value = 'mean: ' + str(np.around(np.mean(relative_errors), round_decimal)) + '; std: ' + str(np.around(np.std(relative_errors), round_decimal))
    elif ax == ax_test2:
        absolute_error_test2.value = 'mean: ' + str(np.around(np.mean(absolute_errors), round_decimal)) + '; std: ' + str(np.around(np.std(absolute_errors), round_decimal))
        relative_error_test2.value = 'mean: ' + str(np.around(np.mean(relative_errors), round_decimal)) + '; std: ' + str(np.around(np.std(relative_errors), round_decimal))

def clear_axes(ax):
    ax_title = ax.get_title()
    ax_xlabel = ax.get_xlabel()
    ax_ylabel = ax.get_ylabel()
    ax.clear()
    ax.grid(axis='x')
    ax.set_title(ax_title)
    ax.set_xlabel(ax_xlabel)
    ax.set_ylabel(ax_ylabel)
    
def clear_all():
    clear_axes(ax_test1)
    clear_axes(ax_train)
    clear_axes(ax_test2)
    fig.canvas.draw()

# Event functions
def update_training_run_plot(change):
    training_data_list = sorted_training_data[select_task_data.value]
    plot_training_data(fig, ax_train, training_data_list[change.new])

def update_task_plot(change):
    plot_full_task(change.new)

def note_index_changed(change):
    print('stuff')
    update_note_index_button.disabled = False

def update_note_index_for_input_selected(b):
    print('update index')
    new_index = int(select_note_index.value)
    training_data = input_selected[0]
    input_index = input_selected[1]
    training_data.train_inputs[input_index][2] = new_index
    plot_training_data(fig, input_selected_ax, training_data)
    update_note_index_button.disabled = True

def on_pick(event):
    global input_selected, input_selected_ax
    if isinstance(event.artist, Line2D):
        thisline = event.artist
        xdata = thisline.get_xdata()
        ydata = thisline.get_ydata()
        ind = event.ind
        if thisline in ax_test1.lines:
            training_data_index = 0
            input_selected_ax = ax_test1
        elif thisline in ax_train.lines:
            training_data_index = select_run.value
            input_selected_ax = ax_train
        elif thisline in ax_test2.lines:
            training_data_index = -1
            input_selected_ax = ax_test2
        training_data = sorted_training_data[select_task_data.value][training_data_index]
        input_selected = [training_data, ind[0]]
        select_note_index.options = list(range(len(training_data.solutions_time)))
        select_note_index.index = int(training_data.train_inputs[ind[0]][2])
        input_selected_description.value = '[' + str(np.around(np.take(xdata, ind)[0], 2)) + ', ' + str(int(np.take(ydata, ind))) + ']'
        

    
#################
### GUI SETUP ###
#################

# Setup GUI for plotting results   
plt.ioff()

select_task_data = widgets.Dropdown(options=sorted_training_data.keys(), description="Task:")
select_task_data.observe(update_task_plot, names='value')

input_selected_description = widgets.Label('none')
select_note_index = widgets.Dropdown(options=[], layout=widgets.Layout(width='50px'))
select_note_index.observe(note_index_changed, names='value')
update_note_index_button = widgets.Button(
    description='Update Index',
    button_style='info',
    tooltip='Change the note to which this input belongs',
    icon='',
    disabled=True
)
update_note_index_button.on_click(update_note_index_for_input_selected)

select_run = widgets.IntSlider(
    description='Train run:',
    value=1,
    min=1,
    max=3
)
select_run.observe(update_training_run_plot, names='value')

absolute_error_test1 = widgets.Label(value='mean: N/A; std: N/a')
relative_error_test1 = widgets.Label(value='mean: N/A; std: N/a')
absolute_error_train = widgets.Label(value='mean: N/A; std: N/a')
relative_error_train = widgets.Label(value='mean: N/A; std: N/a')
absolute_error_test2 = widgets.Label(value='mean: N/A; std: N/a')
relative_error_test2 = widgets.Label(value='mean: N/A; std: N/a')

errors_test1 = widgets.VBox([widgets.HBox([widgets.Label('Absolute error'), absolute_error_test1]),
                             widgets.HBox([widgets.Label('Relative error'), relative_error_test1])])
errors_train = widgets.VBox([widgets.HBox([widgets.Label('Absolute error'), absolute_error_train]),
                             widgets.HBox([widgets.Label('Relative error'), relative_error_train])])
errors_test2 = widgets.VBox([widgets.HBox([widgets.Label('Absolute error'), absolute_error_test2]),
                             widgets.HBox([widgets.Label('Relative error'), relative_error_test2])])
show_errors = widgets.HBox([errors_test1, errors_train, errors_test2], layout=widgets.Layout(justify_content='space-around', width='985px'))

# Plot figure
fig, (ax_test1, ax_train, ax_test2) = plt.subplots(1, 3)
fig.set_size_inches((10, 3))
ax_test1.grid(axis='x')
ax_test1.set_title('1. Test Run')
ax_test1.set_xlabel('Time in Beats')
ax_test1.set_ylabel('Note in MIDI')
ax_train.grid(axis='x')
ax_train.set_title('Training Runs')
ax_train.set_xlabel('Time in Beats')
ax_test2.grid(axis='x')
ax_test2.set_title('2. Test Run')
ax_test2.set_xlabel('Time in Beats')
fig.tight_layout()
fig.canvas.header_visible = False

fig.canvas.mpl_connect('pick_event', on_pick)

select_task_point = widgets.HBox([select_task_data, 
                                  widgets.HBox([widgets.Label('Selected input:'), input_selected_description,
                                                widgets.Label('at note index'), select_note_index, update_note_index_button])], 
                                 layout=widgets.Layout(justify_content='space-between', width='800px'))
results_plot = widgets.VBox([select_run, fig.canvas, show_errors])
results_plot.layout.align_items = 'center'

plot_gui = widgets.VBox([select_task_point, results_plot])
plot_gui.layout.align_items = 'center'

# rhythm_training
This application is dedicated to the accurate performing of a given rhythm. You can setup tasks to play after and hear real-time audio feedback while playing.

### Initialization
It is recommended to use JupyterLab to open this notebook. The following Python packages are required:
* [sc3nb](https://pypi.org/project/sc3nb/)
* [ipywidgets](https://pypi.org/project/ipywidgets/) (version 7.5.6 and later)
* [ipympl](https://matplotlib.org/ipympl/) for interactive matplotlib functions

Please make sure a MIDI keyboard is connected to the device this notebook is running on, then restart the kernel and run all cells.

## Free Play
You can setup and play training tasks manually.

### Task Settings
<b>Beat:</b> The desired beat of the task. You can choose between 2/4, 3/4 and 4/4. Manual inputs should follow the same format.

<b>Notes:</b> A sequence of notes to be repeatedly played during the task. The values should be MIDI values: Middle C is 60, C# is 61, D is 62 etc. Notes are seperated by a single comma. E.g. a sequence C D E will be written as 60,62,64

Please note that this application only deals with rhythm and timing issues, so even if you can specify the notes you play, only temporal errors are processed.

<b>Rhythm:</b> A sequence of note length to be repeatedly played during the task. The values should be relative to quarternote: a quarter note is 1, an eighth note is 1/2 etc. Note lengths are seperated by a single comma. E.g. a sequence if one quarter note and two eighth notes will be written as 1,1/2,1/2

<b>Offbeat:</b> An offset at the beginning of the Task. Should be written as whole numbers or fractions (1/2 for half beat off).

<b>Name:</b> The name of the task. If the name already exist in the task list, the corresponding task in the task list will be updated.

### General Settings
<b>BPM:</b> Tempo of the task.

<b>Wait:</b> How many bars to wait before the task starts. The metronome will play during the waiting bars.

<b>Train:</b> How many bars to be played during the task.

<b>Allow error:</b> Input latency within this range will not be considered missing the note, so no control sound will be played to remind you to play the note.

### Start Test
Play the selected Task without any help. You can use Play Demo to hear the full task.

### Start Training
Play the selected Task with the selected sonification method. Different method will use different sound cues when you play the note at a wrong time.

## Experiment
Play through a set of designed tasks. After you complete the experiment, you can view your training results in the following cell.

In [26]:
display(api_gui)

VBox(children=(Accordion(children=(VBox(children=(HBox(children=(Label(value='Beat:'), Combobox(value='4/4', o…

In [10]:
display(plot_gui)

VBox(children=(HBox(children=(Dropdown(description='Task:', options=(), value=None), HBox(children=(Label(valu…

## Exit
Run the following cell to exit the SC server.

In [11]:
# plt.close(fig)
# sc.exit()

Quitting SCServer... Done.
Exiting sclang... Done.
