# Framework tests

## Sonification class interactive tests

In [None]:
import pandas as pd
import panson as ps
import os
import time

FILE_DIR = '../media/files'
OUT_DIR = os.path.join(FILE_DIR, 'processed')

In [None]:
# Load the processed phone recording. The separator is a regular expression
# (a comma followed by optional whitespace), parsed with the python engine.
df = pd.read_csv(
    os.path.join(OUT_DIR, "phone.csv"),
    engine='python',
    sep=r',\s*',
)
# show the first rows as the cell output
df.head()

In [None]:
import sc3nb as scn

In [None]:
# start the SuperCollider processes through sc3nb
# (variant passing start_sclang=False, kept for reference)
# sc = scn.startup(start_sclang=False)
sc = scn.startup()
# connect both SuperCollider output ports to the system playback ports (JACK)
!jack_connect "SuperCollider:out_1" "system:playback_1"
!jack_connect "SuperCollider:out_2" "system:playback_2"

In [None]:
# close the sc3nb session held in `sc` (run only when done with the server)
sc.exit()

In [None]:
# message style (explicit ID allocation)
class AU04ContinuousSonification(ps.Sonification):
    """Continuous sonification of the ``AU04_r`` column on one "s2" synth.

    The synth node ID is allocated explicitly from the server and every
    update is expressed as a raw OSC message.
    """

    def initialize(self, server):
        """Return the setup messages; this sonification needs none."""
        return []

    def start(self, server):
        """Allocate a node ID and return the message creating the synth.

        The synth is created with ``amp`` set to 0.
        """
        allocated_ids = server.node_ids.allocate(1)
        self.au4_node_id = allocated_ids[0]

        create_synth = scn.OSCMessage(
            "/s_new", ["s2", self.au4_node_id, 0, 0, "amp", 0]
        )
        return [create_synth]

    def stop(self, server):
        """Release the node ID and return a ``/g_freeAll`` message for group 0."""
        # original author's note: actually this does nothing
        server.node_ids.free([self.au4_node_id])

        free_group = scn.OSCMessage("/g_freeAll", [0])
        return [free_group]

    def process(self, row, server):
        """Map ``row["AU04_r"]`` to amplitude and frequency of the synth.

        Returns a list holding a single ``/n_set`` message.
        """
        intensity = row["AU04_r"]

        # only "max" should be enough (to clip the top part to 0.3)
        amp = scn.linlin(intensity, 0, 1, 0, 0.3, "minmax")
        # map the intensity of the AU in one octave range (MIDI notes 69..81)
        midi_note = scn.linlin(intensity, 0, 5, 69, 81)
        freq = scn.midicps(midi_note)

        return [
            scn.OSCMessage("/n_set", [self.au4_node_id, "amp", amp, "freq", freq])
        ]

In [None]:
# alternative style (implicit ID allocation) -- NOTE: the class below still allocates the node ID explicitly
class AU04ContinuousSonification(ps.Sonification):
    """Sonify the ``AU04_r`` intensity continuously with a single "s2" synth.

    The node ID is obtained from ``server.node_ids`` in ``start`` and released
    in ``stop``; all hooks return lists of OSC messages.
    """

    def initialize(self, server):
        """No server setup is required: return an empty message list."""
        return []

    def start(self, server):
        """Reserve one node ID and build the ``/s_new`` message (amp = 0)."""
        node_ids = server.node_ids.allocate(1)
        self.au4_node_id = node_ids[0]

        s_new_args = ["s2", self.au4_node_id, 0, 0, "amp", 0]
        return [scn.OSCMessage("/s_new", s_new_args)]

    def stop(self, server):
        """Give the node ID back and build the ``/g_freeAll`` message."""
        # original author's note: actually this does nothing
        server.node_ids.free([self.au4_node_id])

        return [scn.OSCMessage("/g_freeAll", [0])]

    def process(self, row, server):
        """Translate one data row into a ``/n_set`` message for the synth."""
        au_value = row["AU04_r"]

        # only "max" should be enough (to clip the top part to 0.3)
        amp = scn.linlin(au_value, 0, 1, 0, 0.3, "minmax")
        # map the intensity of the AU in one octave range
        note = scn.linlin(au_value, 0, 5, 69, 81)
        freq = scn.midicps(note)

        n_set_args = [self.au4_node_id, "amp", amp, "freq", freq]
        return [scn.OSCMessage("/n_set", n_set_args)]

In [None]:
# sonification instance shared by the realtime and NRT cells
son = AU04ContinuousSonification()

### Realtime usage

#### Offline

In [None]:
# send synthdefs
# (the sonification hooks take the target server as an argument)
for msg in son.initialize(sc.server):
    sc.server.send(msg)

**sc.server.bundler vs Bundler**
* sc.server.bundler: adds server.latency to the timestamp (even when the timestamp is absolute)
* Bundler: does not add anything to the timestamp

In [None]:
# the latency is 0 by default
# (sc.server.bundler adds it to the timestamp of every bundle)
sc.server.latency = 0.2

In [None]:
# reference time: every row timestamp is an offset from t0
t0 = time.time()

# instantiate synths asap
# (the sonification hooks take the target server as an argument)
for msg in son.start(sc.server):
    sc.server.send(msg)

# wait one control cycle to be sure that synths are instantiated
# time.sleep(1 / 44100 * 64)


# iterate over dataframe rows
for _, row in df.iterrows():

    msgs = son.process(row, sc.server)

    # bundle and send
    with sc.server.bundler(t0 + row.timestamp) as bundler:
        # the bundle will play at t0 + timestamp

        # bundle
        for msg in msgs:
            bundler.add(msg)

    # sleep for the missing time
    waiting_time = t0 + row.timestamp - time.time()

    if waiting_time > 0:
        time.sleep(waiting_time)


# free the synths once every row has been sent
for msg in son.stop(sc.server):
    sc.server.send(msg)

In [None]:
# manual cleanup: ask the server to free all nodes
sc.server.free_all()

In [None]:
# issue free_all() inside a bundler context to see which messages it produces
# (reuses t0 and the last `row` left over from the realtime loop)
with sc.server.bundler(t0 + row.timestamp) as bundler:
    sc.server.free_all()
    
# inspect the messages collected by the bundler
bundler.messages()

#### Online

### NRT usage

In [None]:
from sc3nb import SynthDef

In [None]:
# "s2" synth definition: a Blip oscillator panned with Pan2. freq, pan and amp
# are smoothed with lag(lg); the gate-driven ASR envelope frees the synth when
# it finishes (doneAction: Done.freeSelf).
synthdef = SynthDef(
    "s2",
    r"""{ | freq=400, amp=0.3, num=4, pan=0, lg=0.1, gate=1 |
            Out.ar(0, Pan2.ar(
                Blip.ar(freq.lag(lg),  num) * EnvGen.kr(Env.asr(0.0, 1.0, 1.0), gate, doneAction: Done.freeSelf),
                    pan.lag(lg),
                    amp.lag(lg)
                )
            )
        }"""
)

In [None]:
# capture (without sending) what synthdef.add() puts into a bundle
with sc.server.bundler(send_on_exit=False) as bundler:
    synthdef.add()
    
# inspect the captured messages
bundler.messages()

In [None]:
# build the NRT score: nothing is sent when the context exits
with sc.server.bundler(send_on_exit=False) as bundler:
    # setup at the beginning of the score
    synthdef.add()

    # instantiate synths at time 0.0 of the score
    # (the sonification hooks take the target server as an argument)
    for msg in son.start(sc.server):
        bundler.add(0.0, msg)

    # iterate over dataframe rows
    for _, row in df.iterrows():

        msgs = son.process(row, sc.server)

        # bundle: every message of the row is scheduled at the row timestamp
        for msg in msgs:
            bundler.add(row.timestamp, msg)

    # TODO: this way the last line will not count? do we want to add an offset?
    # /c_set [0, 0] will close the audio file
    # (`row` is the last row of the loop above)
    bundler.add(row.timestamp, "/c_set", [0, 0])

In [None]:
# inspect the messages captured for the NRT score
bundler.messages()

In [None]:
from sc3nb import Score
# render the captured messages in non-realtime mode
# NOTE(review): "/tmp/score.osc" looks like the intermediate score file and
# "score.wav" the rendered audio -- confirm against the sc3nb Score API
Score.record_nrt(bundler.messages(), "/tmp/score.osc", "score.wav", header_format="WAV")

## Bundling experiments

In [None]:
from sc3nb import Synth

In [None]:
# "random" synth definition: a sine oscillator at a random frequency between
# 400 and 800, faded out by a one-second line envelope that frees the synth
# when done (doneAction: Done.freeSelf)
synth_def = SynthDef('random',
"""{ |out|
    var osc, env, freq;
    freq = Rand(400, 800);
    osc = SinOsc.ar(freq, 0, 0.2);
    env = Line.kr(1, 0, 1, doneAction: Done.freeSelf);
    Out.ar(out, osc * env);
}""")

In [None]:
# add the "random" SynthDef (here outside of any bundler context)
synth_def.add()

The context is able to capture instantiation messages (even inside functions)

In [None]:
# with sc.server.bundler(send_on_exit=False) as bundler:
#     Synth("random")
    
# bundler.messages()

def alloc_synth():
    """Instantiate a "random" synth; the created object is not kept."""
    Synth("random")
    
# the synth is created inside a function called from within the context;
# nothing is sent on exit, so the captured messages can be inspected
with sc.server.bundler(send_on_exit=False) as bundler:
    alloc_synth()
    
# inspect what the context captured
bundler.messages()

## Thread behaviour

The output of the threads is printed in the most recently executed cell.

The cell returns when the main thread finishes.

In [None]:
import threading
import time

def worker():
    """Print a start message, sleep for 10 seconds, then print an exit message.

    Both messages are prefixed with the name of the thread running the call.
    """
    # Thread.name replaces getName(), which is deprecated since Python 3.10
    thread_name = threading.current_thread().name
    print(thread_name, 'Starting')
    time.sleep(10)
    print(thread_name, 'Exiting')


def my_service():
    """Print a start message, sleep for 5 seconds, then print an exit message.

    Both messages are prefixed with the name of the thread running the call.
    """
    # Thread.name replaces getName(), which is deprecated since Python 3.10
    thread_name = threading.current_thread().name
    print(thread_name, 'Starting')
    time.sleep(5)
    print(thread_name, 'Exiting')


# two explicitly named threads, plus a second worker keeping its default name
t = threading.Thread(target=my_service, name='my_service')
w = threading.Thread(target=worker, name='worker')
w2 = threading.Thread(target=worker)  # use default name

# start order: both workers first, then the service
w.start()
w2.start()
t.start()

# # wait until threads terminate
# w.join()
# w2.join()
# t.join()

In [None]:
# run this while the threads are still sleeping to see in which cell their
# remaining output ends up
print('test')

## Misc

Save frame information as csv as expected by the videoviewer

In [None]:
# numpy is not imported by any other cell, so bring it into scope here
import numpy as np

# write the frame timestamps, one value per line
timestamps = df.timestamp.to_numpy()
np.savetxt('phone.avi.csv', timestamps, delimiter=',')

### Video

#### ipywidgets

In [None]:
from ipywidgets import Video
# wrap the video file in an ipywidgets Video widget
video = Video.from_file('NRT_videos/phone-processed-son.mp4')
# display the widget as the cell output
video

In [None]:
from ipywidgets import Image
# image widget created without any content
image = Image()

#### PyQt

The following cell enables PyQt5 event loop integration. This is done by opening a QApplication for your notebook. This means that we have to avoid creating QApplication objects in our code (and, consequently, starting the event loop ourselves).
* https://stackoverflow.com/questions/30606462/closing-a-pyqt-widget-in-ipython-notebook-without-using-sys-exit

We can run the following magic as many times as we want, but we have to avoid creating QApplications in our code.

In [None]:
%gui qt5

from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg

In [None]:
# show the current Qt application object as the cell output
# (presumably the one opened by the %gui qt5 integration -- confirm)
QtCore.QCoreApplication.instance()

The following disables IPython GUI event loop integration.

In [None]:
# %gui

In [None]:
# # GUI initialization
# window = QtWidgets.QWidget()

# imggv = pg.GraphicsView()
# viewbox = pg.ViewBox()
# viewbox.setAspectLocked()
# viewbox.invertY(True)

# imggv.setCentralItem(viewbox)

# img = pg.ImageItem(np.zeros((100, 100, 3)))  # Todo: 3 -> channel variable
# viewbox.addItem(img)

# layout = QtWidgets.QGridLayout()
# layout.addWidget(imggv, 0, 0, 4, 4)
# window.setLayout(layout)

# # window always on top
# window.setWindowFlag(QtCore.Qt.WindowStaysOnTopHint)

# window.show()

In [None]:
from videoviewer import NBVideoViewer

# create the notebook video viewer and start it; frames are pushed to it
# later through vv.update()
vv = NBVideoViewer()
vv.start()

In [None]:
import skvideo
import skvideo.io

# read the video into memory; the cells below index the result with four
# indices as frames[frame, row, column, channel]
frames = skvideo.io.vread('files/processed/phone.avi')
# show the array dimensions as the cell output
frames.shape

In [None]:
# frame = frames[0, ::5, ::5, [0,1,2]].T
# img.setImage(frame)

In [None]:
import ipywidgets as widgets

In [None]:
# fixed node ID used by the widget callbacks to address the synth
au4_node_id = 1234
# instantiate synths
# ("s2" synth created with amp set to 0)
sc.server.msg("/s_new", ["s2", au4_node_id, 0, 0, "amp", 0])

In [None]:
sc.server.free_all()

In [None]:
# upper bound of the Play/slider value, which on_change uses as row/frame index
# NOTE(review): hard-coded; presumably the last valid index of df and frames -- confirm
max_ = 332

# out = widgets.Output()

def on_change(change):
    """Sonify and display the data row selected by the observed widget.

    ``change.new`` is used as the positional index into both ``df`` and
    ``frames``.
    """
    frame_idx = change.new
    intensity = df.iloc[frame_idx]["AU04_r"]

    # only "max" should be enough (to clip the top part to 0.3)
    # TODO: exponential mapping
    amp = scn.linlin(intensity, 0, 1, 0, 0.3, "minmax")
    # map the intensity of the AU in one octave range
    midi_note = scn.linlin(intensity, 0, 5, 69, 81)
    freq = scn.midicps(midi_note)

    # update the synth from within a bundle created with bundler(0.1)
    with sc.server.bundler(0.1):
        sc.server.msg("/n_set", [au4_node_id, "amp", amp, "freq", freq], bundle=True)

    # keep every 5th pixel along both image axes and channels 0, 1, 2
    subsampled = frames[frame_idx, ::5, ::5, [0, 1, 2]]
    vv.update(subsampled)
    
def on_play_stop(change):
    """Silence the synth when the observed playing flag becomes false.

    ``change`` is the notification passed to an ``observe`` handler; its
    ``new`` attribute carries the new value of the trait (``now`` does not
    exist on it).
    """
    # nothing to do while the video is playing
    if change.new:
        return

    # when the video is paused: silence all the synths
    with sc.server.bundler():
        sc.server.msg("/n_set", [au4_node_id, "amp", 0], bundle=True)
    
        
# slider mirroring the current position of the play widget
slider = widgets.IntSlider(max=max_)

# can't play data with irregular sampling rate
# (the play widget advances by `step` at a fixed `interval`)
play = widgets.Play(
    max=max_,
    step=1,
    interval=33,
    disabled=False
)

# sonify/display on every value change, silence the synth on play/pause changes
play.observe(on_change, names='value')
play.observe(on_play_stop, names='_playing')

# keep play and slider values in sync on the front-end and show them side by side
widgets.jslink((play, 'value'), (slider, 'value'))
widgets.HBox([play, slider])

In [None]:
def sonify_row(df_row):
    """Placeholder for the per-row sonification; currently does nothing."""
    return None

In [None]:
def sonification_callback(event):
    """Display the frame whose index is stored in ``event.cls.value``.

    NOTE(review): ``img`` is only created in the commented-out GUI
    initialization cell, so it has to exist before this callback is used.
    """
    frame_idx = event.cls.value

    # the sonification of the row is currently disabled:
    # sonify_row(df.iloc[frame_idx])

    # keep every 5th pixel along both image axes and channels 0, 1, 2
    subsampled = frames[frame_idx, ::5, ::5, [0, 1, 2]]
    img.setImage(subsampled.T)