# Host Comm

In [None]:
import nest_asyncio
nest_asyncio.apply()

import sys, asyncio
import numpy as np
sys.path.append('/home/pi/iot49/projects/robot/pi_code')

from repl import reset_mcu, rsync_mcu
from robot_comm import RobotComm


def const(x): return x
_FSTATE_VEL1    = const(0)
_FSTATE_VEL2    = const(1)
_FSTATE_VEL1_SP = const(2)
_FSTATE_VEL2_SP = const(3)
_FSTATE_DUTY1   = const(4)
_FSTATE_DUTY2   = const(5)

    
def exception_handler(loop, context):
    print("***** asyncio:", context)
                                         
async def test_echo(rc):
    import os
    for data in [ b'abcdefg', b'\x00\x01\x02\x04\x05', os.urandom(64), os.urandom(256) ]:
        resp = await rc.cmd_echo(data)
        if data != resp:
            print(f"echo: sent {data}\n      got  {resp}")
            return
    print("test_echo SUCCESS!")
    
async def test_set_get(rc):
    v = 5.3
    await rc.cmd_set(_FSTATE_VEL1, v)
    print("get", (await rc.cmd_get(_FSTATE_VEL1))[0], "=?=", v)

update_counter = 0

def state_update_callback(i_state, f_state):
    global update_counter
    update_counter += 1
    if update_counter % 100 == 0:       
        print("cb", i_state, f_state)
        
async def main():
    fs = 1
    async with RobotComm(
            fs=fs,
            state_history=101*fs,
            state_update_callback=state_update_callback) as rc:
        asyncio.get_event_loop().set_exception_handler(exception_handler)
        cmd_task = asyncio.create_task(rc.cmd_processor())
        try:
            await rc.cmd_start_controller(fs, 'duty_control')
            for duty in np.linspace(0, 100, 2):
                await rc.cmd_set(_FSTATE_DUTY1, duty)
                await rc.cmd_set(_FSTATE_DUTY2, 100-duty)
                await asyncio.sleep(0.5)
            await rc.cmd_stop_controller()
            return (rc.i_state, rc.f_state)
        finally:
            cmd_task.cancel()

print("reset mcu & rsync")
reset_mcu()
rsync_mcu()
reset_mcu()

print("run main")
i_state, f_state = asyncio.run(main())

print("Done")

In [None]:
import sys
sys.path.append('/home/pi/iot49/projects/robot/pi_code')


import numpy as np

from array_builder import ShiftMatrix


sm = ShiftMatrix(6)
for i in range(10):
    sm.update(list(range(10*i, 10*i+4)))

print(sm.matrix)
# 3rd column ...
print(sm.matrix[:,2])

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import time
from IPython.display import clear_output

plt.plot()

def live_plot(x, y):
    clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    plt.plot(x, y, 'o-')
    plt.xlim(min(x), max(x))
    plt.ylim(-1.1, 1.1)
    plt.show()
    # time.sleep(0.001)

import sys
sys.path.append('/home/pi/iot49/projects/robot/pi_code')


import numpy as np

from array_builder import ShiftMatrix
import math

N = 25
sm = ShiftMatrix(N)
for t in np.linspace(0, 50, 300):
    sm.update([math.sin(t)])
    x = range(N)
    y = sm.matrix[:,0]
    live_plot(x, y)


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import time
from IPython.display import clear_output

rows = 25
line = plt.plot(0, 0)
line = line[0]
print(line)
plt.xlim(0, rows)
plt.ylim(-1.1, 1.1)

sm = ShiftMatrix(rows)
for t in np.linspace(0, 50, 300):
    sm.update([math.sin(t)])
    y = sm.matrix[:,0]
    line.set_ydata(y)
    plt.draw()