# ReactiveX in Python - asyncio
* https://github.com/ReactiveX/RxPY
* https://rxpy.readthedocs.io/en/latest/get_started.html

# *Status :-)*
* We can start and stop tasks in interaction with widgets, the `asyncio` way.
* The push-button callback does not work here yet; the cause is still unknown.

# Try out with widgets

In [None]:
import ipywidgets as widgets
from IPython.display import display

In [None]:
out = widgets.Output(layout={
    'border': '1px solid black',
    'height': '200px',
    'overflow_y': 'auto',
})

---

# Cooperative multitasking with `asyncio`

## Tell the notebook to enable asyncio event loop support
* https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
* https://ipython.readthedocs.io/en/stable/config/eventloops.html
* https://ipython.readthedocs.io/en/stable/config/eventloops.html#integrating-with-a-new-event-loop-in-the-kernel

In [None]:
#%autoawait asyncio
#%autoawait trio
%gui asyncio

In [None]:
#import trio
import asyncio

## Future from widget

To get a future value:

In [None]:
def wait_for_change(widget, value):
    """Return a future that resolves with the next new value of a widget trait.

    A one-shot observer is attached to ``widget`` for the trait name(s) given
    by ``value``.  On the first change notification it stores ``change.new``
    in the future (unless the future is already done) and detaches itself.
    """
    pending = asyncio.Future()

    def on_change(change):
        latest = change.new
        # The awaiting task may have been cancelled in the meantime; a future
        # that is already done must not be given a result again.
        if not pending.done():
            pending.set_result(latest)
        widget.unobserve(on_change, value)

    widget.observe(on_change, value)
    return pending

To get a button press:

In [None]:
def wait_for_press(button):
    """Return a future that resolves to ``True`` on the next click of ``button``.

    A one-shot click handler is registered on ``button``.  When the button is
    clicked the handler resolves the future and unregisters itself.  If the
    future can no longer accept a result (for example because the awaiting
    task was cancelled) the click is ignored, but the handler is still
    unregistered so that it does not stay attached and raise on later clicks.
    """
    future = asyncio.Future()

    def clicked(b):
        try:
            future.set_result(True)
        except asyncio.InvalidStateError:
            # The future is already done (e.g. cancelled); nothing to deliver.
            pass
        # Always detach: the handler is strictly one-shot.
        b.on_click(clicked, remove=True)

    button.on_click(clicked)
    return future

In [None]:
tmp_b_w = widgets.Button(description="test")
display(out, tmp_b_w)

In [None]:
async def t():
    """Log to ``out``, await one press of ``tmp_b_w``, then log the result."""
    out.append_stdout("waiting\n")
    pressed = await wait_for_press(tmp_b_w)
    out.append_stdout(f"did press {pressed}\n")

asyncio.create_task(t())

In [None]:
tmp_b_w.click()

## Demonstrate `async` loop `await`ing on the future

In [None]:
slider = widgets.IntSlider()

In [None]:
%%script echo this works, moving on ...

async def f():
    """Five times: log progress to ``out``, then await the next ``slider`` value."""
    for step in range(5):
        out.append_stdout(f"did work {step}\n")
        new_value = await wait_for_change(slider, 'value')
        out.append_stdout(f"async function continued with value {new_value}\n")
#asyncio.ensure_future(f())
asyncio.create_task(f())

slider

In [None]:
out

## Can we connect to ReactiveX asynchronously?

### Use its `asyncio` scheduler

In [None]:
import rx
from rx import operators as op
from rx.scheduler.eventloop import AsyncIOThreadSafeScheduler
loop = asyncio.get_event_loop()
shed = AsyncIOThreadSafeScheduler(loop)

### Can we connect up a widget as an observable?

#### A means to make a widget call back into an `rx` observable

In [None]:
def make_apush_widget(w):
    """Build a coroutine function that pushes ``w``'s value changes to an observer.

    The returned ``push_widget(observer, scheduler)`` awaits each change of
    the widget's ``'value'`` trait and forwards it with ``observer.on_next``.
    After forwarding the value 100 it calls ``observer.on_completed`` and
    returns.  ``scheduler`` is accepted but not used.
    """
    async def push_widget(observer, scheduler):
        latest = None
        # 100 is the sentinel value that ends the stream.
        while latest != 100:
            latest = await wait_for_change(w, 'value')
            observer.on_next(latest)
        observer.on_completed()

    return push_widget

In [None]:
asl = widgets.IntSlider()
display(asl)
out.clear_output()
display(out)

In [None]:
s = rx.subject.Subject()
s.subscribe(lambda i: out.append_stdout(f" at {i}"))
t = make_apush_widget(asl)

The cell below starts the test. Then move the slider above back and forth. As before, reaching 100 shuts down the pipeline.

In [None]:
out.clear_output()
task = asyncio.create_task(t(s,shed))

We end the test by cancelling the asyncio task:

In [None]:
task.cancel()

### Will this work while another `asyncio` task is running as well?
Get it running again, with a clear button too:

In [None]:
dots_b_w = widgets.Checkbox(
    value=False,
    description='dots',
    disabled=False,
    indent=False
)
clear_b_w = widgets.Button(description="clear")

display(asl)
display(dots_b_w)
out.clear_output()
display(out)

display(clear_b_w)

s = rx.subject.Subject()
s.subscribe(lambda i: out.append_stdout(f" at {i}"))
t = make_apush_widget(asl)

asl_task = asyncio.create_task(t(s,shed))

In [None]:
out.clear_output()

In [None]:
async def clear_button_worker(button, out):
    """Forever: wait for a press of ``button``, then clear ``out``.

    Short markers are appended to ``out`` before waiting, after the press,
    and after clearing, so each step is visible in the output widget.
    """
    while True:
        out.append_stdout(f"<{button}>")
        await wait_for_press(button)
        out.append_stdout("<r>")
        out.clear_output()
        out.append_stdout("<d>")

In [None]:
clear_button_task = asyncio.create_task(clear_button_worker(clear_b_w, out))

In [None]:
async def ticker(period, disp, enafun):
    """Forever: sleep ``period`` seconds, then append ``' .'`` to ``disp`` if enabled.

    ``enafun`` is called after every sleep; the dot is written only when it
    returns a truthy value.
    """
    while True:
        await asyncio.sleep(period)
        if enafun():
            disp.append_stdout(' .')

In [None]:
dot_task = asyncio.create_task(ticker(0.4, out, lambda: dots_b_w.value))

In [None]:
asl_task.cancel()

In [None]:
dot_task.cancel()

# OLD

In [None]:
assert False, "Stop here. Below is old code scraps."

In [None]:
async def sc(w):
    """Print a placeholder shutdown message mentioning ``w`` inside ``out_w``."""
    message = f"would send shutdown from shutdown_child to {w}"
    with out_w:
        print(message)

async def shutdown_child(w):
    """Start ``sc(w)`` as a child task inside a trio nursery."""
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sc, w)

async def aw_sc(w):
    """Await ``sc(w)``.

    This must be a coroutine function: ``await`` is a syntax error inside a
    plain ``def``.  Calling it returns a coroutine that has to be awaited
    (or handed to an event loop) for ``sc`` to actually run.
    """
    await sc(w)

shutdown_b_w.on_click(shutdown_child)

In [None]:
from rx import operators as op
from math import pi

t_src = rx.create(make_push_widget(t_w))

t_src.pipe(
    op.observe_on(shed),
    op.map(lambda i: i/100),
    op.map(lambda t: 2*pi * 2*t)
).subscribe(
    on_next = circulate,
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

In [None]:
aw_sc('bleet')

In [None]:
async def snooze(n):
    """Sleep ``n`` times for one second each via trio, printing progress."""
    print("sleepy")
    for nap in range(n):
        print("gonna sleep")
        await trio.sleep(1)
        print(f"{nap} slept")
    print("done sleeping")

In [None]:
async def consolidated(n):
    """Run ``snooze(n)`` as a child task in a trio nursery, printing progress.

    One message is printed inside the nursery block right after the child is
    started, and another after the block has been left.
    """
    async with trio.open_nursery() as nursery:
        nursery.start_soon(snooze, n)
        #nursery.start_soon(child2)

        print("parent: waiting for children to finish...")
        # -- we exit the nursery block here --
    print("parent: all done!")

In [None]:
await consolidated(30)

## Make a widget observable

In [None]:
def make_push_widget(w):
    """Build a subscribe function that feeds widget ``w``'s values to an observer.

    The returned ``push_widget(observer, scheduler)`` attaches a handler to
    the widget's ``'value'`` trait.  Every new value is forwarded with
    ``observer.on_next``; when the value 100 arrives the observer is also
    completed and the handler detaches itself.  ``scheduler`` is unused.
    """
    def push_widget(observer, scheduler):
        def forward(change):
            latest = change['new']
            observer.on_next(latest)
            if latest != 100:
                return
            # 100 is the sentinel value that ends the stream.
            observer.on_completed()
            w.unobserve(forward, names='value')

        w.observe(forward, names='value')

    return push_widget

## Place some widgets 

In [None]:
from math import pi

v_w = widgets.FloatSlider(
    value=0.0,
    min=-1.0,
    max=1.0,
    step=0.01,
    description='Sin:',
    disabled=False,
    continuous_update=False,
    orientation='vertical',
    readout=True,
    readout_format='.2f',
)

h_w = widgets.FloatSlider(
    value=0.0,
    min=-1.0,
    max=1.0,
    step=0.01,
    description='Cos:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.2f',
)

t_w = widgets.IntSlider(
    description='t',
)

shutdown_b_w = widgets.Button(description="Shutdown worker")
clear_b_w = widgets.Button(description="clear")
out_w = widgets.Output(layout={'border': '1px solid black'})
display(v_w, h_w, t_w, shutdown_b_w, clear_b_w, out_w)

## Interaction functions

In [None]:
from math import sin, cos, pi
import rx

def slider_bender(s, v):
    """Assign ``v`` to the ``value`` attribute of ``s``."""
    setattr(s, 'value', v)

def circulate(t):
    """Set ``v_w`` to ``sin(t)`` and ``h_w`` to ``cos(t)``."""
    for slider_w, component in ((v_w, sin(t)), (h_w, cos(t))):
        slider_bender(slider_w, component)

def shutdown_child(w):
    """Click handler: print a placeholder shutdown message inside ``out_w``.

    ``w`` is the argument passed by the caller and is not used.
    """
    with out_w:
        print("would send shutdown from shutdown_child")

def clear_out(w):
    """Click handler: clear the ``out_w`` output widget (``w`` is unused)."""
    out_w.clear_output()

### Plug in button callbacks

In [None]:
shutdown_b_w.on_click(shutdown_child)
clear_b_w.on_click(clear_out)

## Build ReactiveX pipeline

In [None]:
from rx import operators as op
from math import pi

t_src = rx.create(make_push_widget(t_w))

t_src.pipe(
    op.map(lambda i: i/100),
    op.map(lambda t: 2*pi * 2*t)
).subscribe(
    on_next = circulate,
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

### Test
Go up to Widgets and manipulate the **t** slider. The pipeline gets stopped when you slide it all the way to 100.

## UI while working
Can these controls and responses be used while doing other work, perhaps work that is controlled by the widgets?

In [None]:
import time
# Poll the two sliders forever and print a value derived from them.
# The printed quantity is v**2 + h**2 (no square root is taken).
# NOTE(review): this loop never awaits and uses the blocking time.sleep, so
# presumably no widget events are processed while it runs -- confirm.
while True:
    print(f"radius is {v_w.value**2 + h_w.value**2}", end='\r' )
    time.sleep(0.2)

:-(

---

# Scratch

In [None]:
assert False, "stop here if entering from above"

#### Demonstrate auto `await`ing

In [None]:
async def foo(m):
    """Sleep for 0.4 seconds, then print ``yep`` followed by ``m``."""
    delay = 0.4
    await asyncio.sleep(delay)
    print(f"yep {m}")

In [None]:
await foo('horse')

In [None]:
import rx
from rx import operators as op
from rx.scheduler.eventloop import AsyncIOThreadSafeScheduler
loop = asyncio.get_event_loop()
shed = AsyncIOThreadSafeScheduler(loop)

#### Verify that it works with that scheduler
This is from https://rxpy.readthedocs.io/en/latest/get_started.html#custom-operator, modified to use the async scheduler instance.

In [None]:
#%%script echo skipping
def lowercase():
    """Return a custom Rx operator that lower-cases every emitted string.

    The returned function takes a source observable and returns a new
    observable (built with ``rx.create``) that emits ``value.lower()`` for
    each source item; errors and completion are passed through unchanged.
    """
    def _lowercase(source):
        def subscribe(observer, scheduler=None):
            def on_next(value):
                observer.on_next(value.lower())

            # Forward the scheduler by keyword.
            # NOTE(review): in RxPY 3 the fourth positional parameter of
            # Observable.subscribe is ``on_next`` and ``scheduler`` is
            # keyword-only, so a positional scheduler would not reach the
            # source -- confirm against the installed RxPY version.
            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler=scheduler)
        return rx.create(subscribe)
    return _lowercase

rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        op.observe_on(shed),
        lowercase()
     ).subscribe(lambda value: print("Received {0}".format(value)), scheduler=shed)

In [None]:
import time

with out_w:
    print("sleepy")
    for i in range(10):
        print("gonna sleep")
        time.sleep(1)
        print("slept")

In [None]:
import rx

In [None]:
from rx import create

def push_five_strings(observer, scheduler):
    """Emit five fixed strings to ``observer`` in order, then complete.

    ``scheduler`` is accepted but not used.
    """
    for name in ("Alpha", "Beta", "Gamma", "Delta", "Epsilon"):
        observer.on_next(name)
    observer.on_completed()

source = create(push_five_strings)

source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

In [None]:
from rx import operators as ops

source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))

In [None]:
from rx import of, operators as op

of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
    op.map(lambda s: len(s)),
    op.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))

In [None]:
import rx

def lowercase():
    """Return a custom Rx operator that lower-cases every emitted string.

    The returned function takes a source observable and returns a new
    observable (built with ``rx.create``) that emits ``value.lower()`` for
    each source item; errors and completion are passed through unchanged.
    """
    def _lowercase(source):
        def subscribe(observer, scheduler=None):
            def on_next(value):
                observer.on_next(value.lower())

            # Forward the scheduler by keyword.
            # NOTE(review): in RxPY 3 the fourth positional parameter of
            # Observable.subscribe is ``on_next`` and ``scheduler`` is
            # keyword-only, so a positional scheduler would not reach the
            # source -- confirm against the installed RxPY version.
            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler=scheduler)
        return rx.create(subscribe)
    return _lowercase

tramp = rx.scheduler.TrampolineScheduler()

rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        lowercase()
     ).subscribe(lambda value: print("Received {0}".format(value)), scheduler=tramp)

In [None]:
import trio

In [None]:
%autoawait trio
#%autoawait asyncio

In [None]:
import asyncio

In [None]:
import rx
from rx import operators as op

In [None]:
from trioscheduler import TrioScheduler
from rx.scheduler.eventloop import AsyncIOThreadSafeScheduler

In [None]:
#sch = rx.scheduler.TrampolineScheduler()
#sch = TrioScheduler()
loop = asyncio.get_event_loop()
sch = AsyncIOThreadSafeScheduler(loop)
loop, sch

In [None]:
def lowercase():
    """Return a custom Rx operator that lower-cases every emitted string.

    The returned function takes a source observable and returns a new
    observable (built with ``rx.create``) that emits ``value.lower()`` for
    each source item; errors and completion are passed through unchanged.
    """
    def _lowercase(source):
        def subscribe(observer, scheduler=None):
            def on_next(value):
                observer.on_next(value.lower())

            # Forward the scheduler by keyword.
            # NOTE(review): in RxPY 3 the fourth positional parameter of
            # Observable.subscribe is ``on_next`` and ``scheduler`` is
            # keyword-only, so a positional scheduler would not reach the
            # source -- confirm against the installed RxPY version.
            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler=scheduler)
        return rx.create(subscribe)
    return _lowercase

rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        op.observe_on(sch),
        lowercase()
     ).subscribe(lambda value: print("Received {0}".format(value)), scheduler=sch)

---

In [None]:
w = widgets.IntSlider()
x = widgets.IntSlider()

In [None]:
w.value = x.value

In [None]:
from rx import create

def push_widget_x(observer, scheduler):
    """Subscribe function that forwards value changes of slider ``x``.

    Each new value is sent with ``observer.on_next``; when the value 100
    arrives, ``observer.on_completed`` is called as well.  The handler stays
    attached to ``x`` afterwards.  ``scheduler`` is accepted but not used.
    """
    def forward(change):
        latest = change['new']
        observer.on_next(latest)
        if latest == 100:
            observer.on_completed()

    x.observe(forward, names='value')

source = create(push_widget_x)

def w_bender(v):
    """Set the ``value`` of the module-level slider ``w`` to ``v``."""
    w.value = v

source.subscribe(
    #on_next = lambda i: print("Received {0}".format(i)),
    on_next = lambda i: w_bender(100-i),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)


In [None]:
display(x)
display(w)
display(x)

In [None]:
import time
time.sleep(5)

In [None]:
source.subscribe(
    #on_next = lambda i: print("Received {0}".format(i)),
    on_next = lambda i: w_bender(100-i),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

In [None]:
source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

In [None]:
int_range = widgets.IntSlider()
output2 = widgets.Output()

display(int_range, output2)

def on_value_change(change):
    """Observer callback: print the new value from ``change`` inside ``output2``."""
    with output2:
        print(change['new'])

int_range.observe(on_value_change, names='value')

In [None]:
display(x)
display(w)
display(x)

In [None]:
from rx import create

source = create(make_push_widget(x))

source.subscribe(
    #on_next = lambda i: print("Received {0}".format(i)),
    on_next = lambda i: slider_bender(w, 100-i),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)

---

In [None]:
from rx.subject import Subject

In [None]:
suzie = Subject()

In [None]:
suzie.subscribe(on_next = lambda i: print("Received {0}".format(i)))

In [None]:
suzie.on_next('bird')

In [None]:
suzie.subscribe(print)