In [1]:
import time, random, subprocess, os, re
import numpy as np
import numbers
import simpleaudio as sa
import pyaudio
from IPython.core import magic_arguments
from IPython.core.magic import line_magic, cell_magic, line_cell_magic
from IPython.core.magic import Magics, magics_class
from pythonosc import osc_message_builder, udp_client, osc_bundle_builder
if os.name != 'nt':  # sys.platform == 'win32':
    import fcntl, liblo    

In [None]:
def non_block_read(output):
    """Read whatever is currently available from `output` without blocking.

    The file descriptor behind `output` is switched to non-blocking mode
    (POSIX only, requires `fcntl`) and a single `read()` is attempted.

    Parameters
    ----------
    output : file object
        A stream exposing `fileno()` and `read()`, e.g. a subprocess pipe.

    Returns
    -------
    The value returned by `output.read()` (for a buffered binary pipe this is
    the available bytes, or None when nothing is pending), or "" if the read
    itself raised an exception.
    """
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    # only touch the descriptor flags when non-blocking mode is not yet active
    if not fl & os.O_NONBLOCK:
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except Exception:
        # best-effort read: report "nothing readable" instead of failing, but
        # let KeyboardInterrupt / SystemExit propagate (a bare except hid them)
        return ""

def remove_comments(string):
    """Return `string` with // line comments and /* */ block comments removed.

    Text inside single- or double-quoted strings is left untouched, so
    comment markers that appear inside a quoted string survive.
    """
    # Regex by Onur Yıldırım.
    #   group 1: a quoted string (double or single quotes) -> kept verbatim
    #   group 2: a comment (/* multi-line */ or //single-line) -> dropped
    # A variant that also copes with escaped quotes would be:
    #   r"(\".*?(?<!\\)\"|\'.*?(?<!\\)\')|(/\*.*?\*/|//[^\r\n]*$)"
    comment_or_quote = re.compile(
        r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)",
        re.MULTILINE | re.DOTALL,
    )
    # a match with group 2 set is a real (non-quoted) comment: replace it by
    # nothing; otherwise put the captured quoted string back unchanged
    return comment_or_quote.sub(
        lambda found: "" if found.group(2) is not None else found.group(1),
        string,
    )

In [None]:
class SC:
    """SC is a class to start SuperCollider language as subprocess
    and control it via a pipe. So far all output goes to the stdout
    (c) 2016-18 thermann@techfak.uni-bielefeld.de

    Code is sent to sclang through the subprocess' stdin (`cmd`, `cmdv`),
    while OSC messages and bundles are sent to the server address given by
    `ip`/`port` via liblo or python-osc (`msg`, `bundle`).
    """
    # most recently created instance (set in __init__); read by the SC3 magics
    sc = None

    def __init__(self, sclangpath="/Applications/SuperCollider/SuperCollider.app/Contents/MacOS/sclang",
                 ip="127.0.0.1", port=57110, liblo_flag=True):
        """Start sclang and prepare the OSC client.

        Parameters
        ----------
        sclangpath : str
            Path to the sclang executable.
        ip : str
            IP address OSC messages are sent to.
        port : int
            UDP port OSC messages are sent to.
        liblo_flag : bool
            Use liblo for OSC if True, python-osc otherwise.
            Always forced to False on Windows (os.name == 'nt').
        """
        self.scp = subprocess.Popen([sclangpath], shell=False,
                                    stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.server_ip = ip
        self.server_port = port
        self.liblo_flag = liblo_flag
        if os.name == 'nt':
            self.liblo_flag = False
        if self.liblo_flag:
            self.target = liblo.Address(self.server_ip, self.server_port)
        else:
            self.target = udp_client.SimpleUDPClient(self.server_ip, self.server_port)
        SC.sc = self
        self.rec_node_id = -1  # i.e. not valid
        # offset added to Unix timestamps for liblo bundles
        # (liblo.time()-time.time(), i.e. seconds between 1900 and 1970)
        self.liblo_time_diff = 2208988800.0

    def cmd(self, cmdstr):
        """Send SuperCollider code to sclang without reading its output.

        Comments, newlines and tabs are removed so that the code arrives as
        a single line, which is then terminated by a newline.
        """
        cmdstr = remove_comments(cmdstr).replace('\n', '').replace('\t', '') + '\n'
        self.scp.stdin.write(bytearray(cmdstr, 'utf-8'))
        self.scp.stdin.flush()

    def scpout_read(self):
        """Wait for sclang output and return everything that was received.

        Polls the (non-blocking) stdout pipe until the first data shows up,
        then keeps collecting chunks until a read reports that nothing is
        pending any more.

        Returns
        -------
        bytes
            All chunks received, concatenated in order.
        """
        chunks = []
        started = False
        while True:
            chunk = non_block_read(self.scp.stdout)
            if chunk is None:
                if started:
                    break  # output started and has now dried up
                time.sleep(0.005)  # nothing yet: poll again without busy-spinning
                continue
            started = True
            if not chunk:
                # empty read (end of stream or the error sentinel of
                # non_block_read): no further data will arrive
                break
            # keep every chunk; output may arrive split over several reads
            chunks.append(chunk)
        return b"".join(chunks)

    def scpout_empty(self):
        """Discard whatever sclang output is currently pending."""
        non_block_read(self.scp.stdout)

    def cmdv(self, cmdstr, verbose=True, discard_output=True):
        """Send SuperCollider code to sclang and collect its output.

        Parameters
        ----------
        cmdstr : str
            SuperCollider code to execute.
        verbose : bool
            Print the collected output if True.
        discard_output : bool
            Drop output that is pending before the command is sent.

        Returns
        -------
        list of str or None
            Output lines between the first and the last line of the collected
            output; None on Windows, where the output cannot be read.
        """
        if os.name == 'nt':
            self.cmd(cmdstr)
            return
        # here the code for Linux / OSX (where fcntl is available)
        if discard_output:
            self.scpout_empty()  # clean all past outputs
        self.cmd(cmdstr)
        time.sleep(0.1)   # this is not ideal, better solution welcome
        raw = self.scpout_read()  # get output after current command
        # errors='replace' keeps non-ASCII output from raising while decoding.
        # Note that strip('sc3>') removes any of the characters s, c, 3, >
        # from both ends (not the literal prompt); kept as is.
        text = raw.decode('utf-8', errors='replace').strip().strip('sc3>')
        text = text[text.find('\n') + 1:]  # drop the first line
        if verbose:
            print(text)
        return text.split('\n')[1:-1]

    def boot(self):
        """Boot the SuperCollider server (sends "s.boot")."""
        self.cmd("s.boot")

    def freeAll(self):
        """Free all nodes on the server (sends "s.freeAll")."""
        self.cmd("s.freeAll")

    def exit(self):
        """Close the pipe to sclang's stdin."""
        self.scp.stdin.close()

    def __del__(self):
        print("delete SC instance")
        # __init__ may have failed before the subprocess was created
        if getattr(self, "scp", None) is not None:
            self.exit()

    def boot_with_blip(self):
        """Boot the server, add the synth definitions "s1", "s2" and "record",
        and play two short test sounds."""
        # make sure SC is booted and knows this synths:
        self.cmd(r"""
            Server.default = s = Server.local;
            s.boot.doWhenBooted(
            { Routine({
            /* synth definitions *********************************/
            "load synth definitions".postln;
            SynthDef("s1", { |freq=400,dur=0.4,att=0.01,amp=0.3,num=4,pan=0|
                Out.ar(0, Pan2.ar(Blip.ar(freq,  num)*
                    EnvGen.kr(Env.perc(att, dur, 1, -2), doneAction: 2),
                    pan, amp))
            }).add();
            SynthDef("s2", { | freq=400, amp=0.3, num=4, pan=0, lg=0.1 |
                Out.ar(0, Pan2.ar(Blip.ar(freq.lag(lg),  num),
                                  pan.lag(lg), amp.lag(lg)))
            }).add();
            SynthDef("record", { | bufnum |
                DiskOut.ar(bufnum, In.ar(0, 2));
            }).add();
            s.sync;
            /* test signals ****************************************/
            "create test signals".postln;
            Synth.new(\s1, [\freq, 500, \dur, 0.2, \num, 1]);
            0.2.wait;
            x = Synth.new(\s2, [\freq, 1000, \amp, 0.1, \num, 2]);
            0.1.wait; x.free;
            }).play} , 1000);
        """)

    def msg(self, msgAdr, msgArgs):
        """
        Sends an OSC message to the SuperCollider server.

        Parameters
        ----------
        msgAdr : str
            The SuperCollider server command. E.g. '/s_new'.
        msgArgs : list
            The arguments of the message.
        """
        if self.liblo_flag:
            message = liblo.Message(msgAdr)
            for arg in msgArgs:
                message.add(arg)
            liblo.send(self.target, message)
        else:
            self.target.send_message(msgAdr, msgArgs)

    def bundle(self, timetag, msgAdr, msgArgs):
        """
        Sends an OSC bundle to the SuperCollider server.

        Parameters
        ----------
        timetag : float
            An absolute timetag (use time.time() for now). Values below 1e6
            are taken as an offset in seconds relative to time.time().
        msgAdr : str
            The SuperCollider server command. E.g. '/s_new'.
        msgArgs : list
            The arguments of the message added to the bundle.
        """
        if timetag < 1e6:
            timetag = time.time() + timetag
        if self.liblo_flag:
            message = liblo.Message(msgAdr)
            for arg in msgArgs:
                message.add(arg)
            bundle = liblo.Bundle(timetag + self.liblo_time_diff, message)
            liblo.send(self.target, bundle)
        else:
            bundle = osc_bundle_builder.OscBundleBuilder(timetag)
            msg = osc_message_builder.OscMessageBuilder(address=msgAdr)
            for el in msgArgs:
                msg.add_arg(el)
            bundle.add_content(msg.build())
            bundle = bundle.build()
            self.target.send(bundle)

    def prepare_for_record(self, onset=0, wavpath="record.wav", bufnum=99, nr_channels=2, rec_header="wav", rec_format="int16"):
        """Allocate buffer `bufnum` on the server and open `wavpath` for writing.

        Sends "/b_alloc" and "/b_write" bundles scheduled at `onset` and
        remembers `bufnum` and `nr_channels` for `record`/`stop_recording`.
        """
        self.rec_bufnum = bufnum
        self.rec_nr_channels = nr_channels
        self.bundle(onset, "/b_alloc", [self.rec_bufnum, 65536, self.rec_nr_channels])
        self.bundle(onset, "/b_write", [self.rec_bufnum, wavpath, rec_header, rec_format, 0, 0, 1])

    def record(self, onset=0, node_id=2001):
        """Start the "record" synth as node `node_id` at `onset`.

        Uses the buffer number stored by `prepare_for_record`.
        """
        self.rec_node_id = node_id
        # action = 1 = addtotail
        self.bundle(onset, "/s_new", ["record", self.rec_node_id, 1, 0, "bufnum", self.rec_bufnum])

    def stop_recording(self, onset=0):
        """Free the record node, then close and free the record buffer at `onset`."""
        self.bundle(onset, "/n_free", [self.rec_node_id])
        self.bundle(onset, "/b_close", [self.rec_bufnum])
        self.bundle(onset, "/b_free", [self.rec_bufnum])

    def midi_ctrl_synth(self, synthname=r'\syn'):
        """Create a synth of type `synthname` for every incoming MIDI note-on.

        `synthname` is inserted verbatim into the SuperCollider code, so it
        has to be a SuperCollider symbol or string literal.
        """
        code = r"""
            MIDIIn.connectAll;
            n.free;
            n = MIDIFunc.noteOn({ | level, pitch |
                var amp = ((level-128)/8).dbamp;
                Synth.new(%s, [\freq, pitch.midicps, \amp, amp]);
            [pitch, amp].postln
            });""" % (synthname,)
        self.cmd(code)

    def midi_ctrl_free(self):
        """Free the note-on responder installed by `midi_ctrl_synth`."""
        self.cmd("n.free")

    def midi_gate_synth(self, synthname=r'\syn'):
        """Start a `synthname` synth on MIDI note-on and release it on note-off.

        `synthname` is inserted verbatim into the SuperCollider code, so it
        has to be a SuperCollider symbol or string literal.
        """
        code = r"""
            MIDIIn.connectAll;
            q = q ? ();
            // q.on.free;
            // q.off.free;
            q.notes = Array.newClear(128);   // array has one slot per possible MIDI note
            q.on = MIDIFunc.noteOn({ |veloc, num, chan, src|
                q.notes[num] = Synth.new(%s, [\freq, num.midicps, \amp, veloc * 0.00315]);
            });
            q.off = MIDIFunc.noteOff({ |veloc, num, chan, src|
                q.notes[num].release;
            });
            q.freeMIDI = { q.on.free; q.off.free; };
            """ % (synthname,)
        self.cmd(code)

    def midi_gate_free(self):
        """Free the note-on/note-off responders installed by `midi_gate_synth`."""
        self.cmd("q.on.free; q.off.free")

In [None]:
# service functions (linlin etc.)
def linlin(x, smi, sma, dmi, dma):
    """Map `x` linearly from the source range [smi, sma] to [dmi, dma].

    No clipping is applied: values outside the source range are extrapolated.
    """
    relative_pos = (x - smi) / (sma - smi)
    return relative_pos * (dma - dmi) + dmi

def midicps(m):
    """Convert a MIDI note number to a frequency in Hz (note 69 = 440 Hz)."""
    octaves_from_a4 = (m - 69) / 12.0
    return 440.0 * 2 ** octaves_from_a4

def cpsmidi(c):
    """Convert a frequency in Hz to a (fractional) MIDI note number (440 Hz = 69)."""
    octaves_from_a4 = np.log2(c / 440.0)
    return 69 + 12 * octaves_from_a4

def clip(v, min=-float("inf"), max=float("inf")):
    """Limit `v` to the range [min, max]; either bound may be omitted.

    The lower bound is checked first, so it wins when v is below `min`.
    """
    if v < min:
        limited = min
    elif v > max:
        limited = max
    else:
        limited = v
    return limited

def dbamp(db):
    """Convert a level in decibels to a linear amplitude factor (0 dB = 1.0)."""
    exponent = db / 20.0
    return 10 ** exponent

def ampdb(amp):
    """Convert a linear amplitude factor to a level in decibels (1.0 = 0 dB)."""
    decades = np.log10(amp)
    return 20 * decades

In [None]:
# service functions to play and record data arrays from within python

def play(sig, num_channels=1, sr=44100, norm=True, block=False):
    """Play a signal array as 16-bit audio via simpleaudio.

    Parameters
    ----------
    sig : array-like
        Samples to play; scaled by 32767 * factor and converted to int16.
    num_channels : int
        Number of channels passed to the audio backend.
    sr : int
        Sampling rate in Hz.
    norm : bool, number or None
        True: scale so that the largest absolute sample becomes full scale.
        A number: use it as the gain factor.
        False / None / anything else: play unscaled (factor 1).
    block : bool
        Wait until playback has finished before returning.

    Returns
    -------
    The play object returned by `sa.play_buffer`.
    """
    sig = np.asarray(sig)
    factor = 1
    if isinstance(norm, bool):
        # bool is checked on its own: it is also a numbers.Number, so
        # norm=False must not fall through to the "gain factor" branch
        # (which would multiply the signal by 0)
        if norm:
            peak = np.max(np.abs(sig)) if sig.size else 0
            if peak > 0:  # leave silent (all-zero) signals unscaled
                factor = 1 / peak
    elif isinstance(norm, numbers.Number):
        factor = norm
    # limit to the int16 range so that too-loud samples saturate
    # instead of wrapping around in the integer conversion
    scaled = np.clip(32767 * factor * sig, -32768, 32767)
    asig = scaled.astype(np.int16)
    play_obj = sa.play_buffer(asig, num_channels, 2, sr)  # 2 bytes per sample
    if block:
        play_obj.wait_done()      # wait for playback to finish before returning
    return play_obj

## test code
# play(np.sin(np.linspace(0, 2*np.pi*44, 2205)), sr=22050)

def record(dur=2, channels=1, rate=44100, chunk=256):
    """Record audio via PyAudio and return it as an int16 array.

    Parameters
    ----------
    dur : number
        Duration to record in seconds.
    channels : int
        Number of channels to open the stream with.
    rate : int
        Sampling rate in Hz.
    chunk : int
        Frames read per call (also used as frames_per_buffer).

    Returns
    -------
    numpy.ndarray
        One-dimensional int16 array built from the raw recorded bytes;
        int(rate/chunk*dur) chunks are read.
    """
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=channels, rate=rate, input=True,
                        output=True, frames_per_buffer=chunk)
        try:
            buflist = [stream.read(chunk) for _ in range(int(rate / chunk * dur))]
        finally:
            # release the stream even if a read failed
            try:
                stream.stop_stream()
            finally:
                stream.close()
    finally:
        p.terminate()
    return np.frombuffer(b''.join(buflist), dtype=np.int16)

## test code for record:
# sig = record(3)
# plot(sig)
# play(sig/32768.0, norm=None)
# freqs, times, Sxx = signal.spectrogram(sig, 44100, nfft=1024)
# pcolormesh(times, freqs, log(Sxx+0.00001)); colorbar(); ylabel('Freq. [Hz]'); xlabel('Time [sec]');
# axis([0.2,2.6,0,5000])

In [None]:
# boot sc and register blip and play sound
def startup(sclangpath=None, wait_time=3, boot=True, magic=True, verbose=True,
            discard_output=False, liblo_flag=True):
    """Start sclang, optionally boot the server and register the SC3 magics.

    Parameters
    ----------
    sclangpath : str or None
        Path to the SuperCollider3 sclang executable; None uses the default
        path of the SC class.
    wait_time : number
        Seconds to wait after starting sclang, to be sure that it has started.
    boot : bool
        Boot the server and load the test synths (SC.boot_with_blip).
    magic : bool
        Register SC3Magics with the running IPython shell.
    verbose, discard_output
        Passed on to the initial SC.cmdv call.
    liblo_flag : bool
        Passed on to the SC constructor.

    Returns
    -------
    SC
        The newly created SC instance.
    """
    # only pass the path on when one was given, so SC's own default applies
    path_args = () if sclangpath is None else (sclangpath,)
    sc = SC(*path_args, liblo_flag=liblo_flag)
    time.sleep(wait_time)
    sc.cmdv('"sc3nb started".postln;', verbose, discard_output=discard_output)
    if boot:
        sc.boot_with_blip()
    if magic:
        get_ipython().register_magics(SC3Magics)
    return sc

In [None]:
# SC3 magics
@magics_class
class SC3Magics(Magics):
    """IPython magics that send SuperCollider code to the current SC instance.

    Both magics work as line magic (%sc, %scv) and as cell magic (%%sc, %%scv).
    The instance is looked up through the class variable SC.sc, since a
    reference to sc alone doesn't work here.
    """

    @line_cell_magic
    def scv(self, line='', cell=None):
        """Run the line and/or cell content via SC.cmdv (output is collected)."""
        for code in (line, cell):
            if code:
                SC.sc.cmdv(code)

    @line_cell_magic
    def sc(self, line='', cell=None):
        """Run the line and/or cell content via SC.cmd (output is not read)."""
        for code in (line, cell):
            if code:
                SC.sc.cmd(code)

In [None]:
%%javascript
// Register a command-mode keyboard shortcut (Cmd-.) in the notebook front end.
// When triggered, the handler asks the kernel to execute the Python statement
// "sc.freeAll()".
// NOTE(review): this presumably requires a variable named `sc` holding an SC
// instance in the kernel namespace (e.g. `sc = startup()`) — confirm.
// NOTE(review): the help text mentions sc.cmd("s.freeAll") while the handler
// calls sc.freeAll(); the SC class above implements freeAll via cmd("s.freeAll").
Jupyter.keyboard_manager.command_shortcuts.add_shortcut('cmd-.', {
    help : 'sc.cmd("s.freeAll")',
    help_index : 'zz',
    handler : function (event) {
        IPython.notebook.kernel.execute("sc.freeAll()")
        return true;
    }}
);