# Import Packages

In [1]:
import os
import pickle
from multiprocessing import Pool, Lock
from xml.etree import ElementTree as et

import matplotlib
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
from dm_control import mujoco

# Access to enums and MuJoCo library functions.
from dm_control.mujoco.wrapper.mjbindings import mjlib
from dm_control.utils import inverse_kinematics as ik
from IPython.display import clear_output, display, HTML
from PIL import Image
from scipy.special import gamma
# from sklearn.decomposition import PCA


%env MUJOCO_GL = egl
%matplotlib widget


env: MUJOCO_GL=egl


# Define Utility Methods and Constants

In [2]:
# Rendering parameters
dpi = 100
framerate = 30  # (Hz)
width, height = 1280, 720

# IK solver parameters
_MAX_STEPS = 100
_TOL = 1e-12
_TIME_STEP = 2e-3  # Defined in XML. Default to 2e-3

# Define 3D arrays of balls connected with sliders
_M = (20, 20)
radius = 3e-4
mass = 1e-12 / np.prod(_M)
dx = (3e-2 - 2 * radius) / (_M[0] - 1)
offset = dx * (_M[0] - 1) / 2.0
geom_type = "sphere"

# Scene XML
robot_xml = "../models/panda_nohand.xml"
scene_xml = "../models/scene.xml"
# Multiprocessing lock
lock = Lock()


def display_video(frames, framerate=framerate, figsize=None):
    try:
        height, width, _ = frames[0].shape
    except ValueError as e:
        height, width = frames[0].shape
    orig_backend = matplotlib.get_backend()
    # Switch to headless 'Agg' to inhibit figure rendering.
    matplotlib.use("Agg")
    if figsize is None:
        figsize = (width / dpi, height / dpi)
    fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
    ax.set_axis_off()
    ax.set_aspect("equal")
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])
    matplotlib.use(orig_backend)
    def update(frame):
        im.set_data(frame)
        return [im]

    interval = 1000 / framerate
    anim = animation.FuncAnimation(
        fig=fig, func=update, frames=frames, interval=interval, blit=True, repeat=False
    )
    return anim.to_html5_video()


def J_zero(physics, site_name):
    phys = mujoco.Physics.from_model(physics.model)
    jac_pos = np.zeros((3, phys.model.nv))
    jac_rot = np.zeros((3, phys.model.nv))
    mjlib.mj_jacSite(
        phys.model.ptr,
        phys.data.ptr,
        jac_pos,
        jac_rot,
        phys.model.name2id(site_name, "site"),
    )

    return np.vstack((jac_pos, jac_rot))


def mat2quat(mat):
    quat = np.empty(4)
    mjlib.mju_mat2Quat(quat, mat)
    return quat


def quatLocal(A, C):
    conjugateA = np.empty(4)
    B = np.empty(4)
    mjlib.mju_negQuat(conjugateA, mat2quat(A))
    mjlib.mju_mulQuat(B, conjugateA, mat2quat(C))
    return B


def print_clean(text):
    clear_output(wait = True)
    print(text)


def reach_test(physics, site_name, target_name, joint_names, duration=2.0, rendered=True):
    physics = mujoco.Physics.from_xml_path(scene_xml)
    physics.reset(0)
    
    w = 2*np.pi/duration  # Rotator angular speed
    omega = w*np.array([1, 1, 1])
    ctrl = np.empty(10)
    video = []
    stream = []
    orientations = []
    control_site = physics.data.site(name=site_name)
    target_site = physics.data.site(name=target_name)
    target_quat = mat2quat(control_site.xmat)
    target_xpos = target_site.xpos.copy()
    target_xpos[2] -= 3e-4
    smooth_factor = 0.5
    aid = int(target_name[-1]) # actuator id
    
    # Simulate, saving video frames
    while physics.data.time <= duration:
        move_vec = (target_xpos - control_site.xpos)*min(smooth_factor, physics.data.time)/smooth_factor
        # Compute the inverse kinematics
        if np.linalg.norm(control_site.xpos - target_xpos) > _TOL:
            result = ik.qpos_from_site_pose(
                physics,
                site_name,
                target_pos=control_site.xpos + move_vec,
                target_quat=target_quat,
                joint_names=joint_names,
                tol=_TOL,
                max_steps=_MAX_STEPS,
                inplace=False,
            )
            ctrl[:7] = result.qpos[:7]
        ctrl[-3:] = omega*physics.data.time
        physics.set_control(ctrl)
        physics.step()
        
        # Save contact after 2s
        if len(physics.data.contact) > 0:
            pressure = physics.data.qpos[7:-3].copy().reshape(_M)
            stream.append(pressure)
            oris = physics.data.qpos[aid - 11]
            orientations.append(oris)
            
        # Save video frames
        lock.acquire()
        print_clean("PID {} Progress: {:.1f}%".format(os.getpid(), physics.data.time/duration*100))
        if rendered and len(video) < physics.data.time * framerate:
            pixels = physics.render(
                camera_id="prospective", width=width, height=height
            )
            video.append(pixels.copy())
        lock.release()
    return target_name, video, np.asarray(stream), np.asarray(orientations)


# Update The Scene XML File
* [Mujoco XML Reference](https://mujoco.readthedocs.io/en/latest/XMLreference.html?highlight=site#body-site)
* [The ElementTree XML API](https://docs.python.org/3/library/xml.etree.elementtree.html)

## Tactile Sensor

1. Sensor Pad 
* for visual only

2. Collision Balls (single or dual layer)
* _M: (x-count, y-count)
* geom_type: [sphere, capsule, ellipsoid, cylinder, box], “sphere”
* dx: the distance between each site
* offset: uniform distance to shift all sites

3. Slide Joints
* connect the balls to the pad
* joint positions serve as sensory values
* small time constant

4. Tendons to Connect Balls
* planar spring connection
* creates contact-force distribution

## Setup Key Frames
* Initial poses

In [20]:
tree = et.parse(robot_xml)
root = tree.getroot()
for body in root.findall(".//body"):
    if body.get("name") == "sensor_pad":
        sensor_pad = body

# If balls and tendons are already defined, remove them first
if sensor_pad.find("body") is not None:
    for body in sensor_pad.findall("body"):
        sensor_pad.remove(body)
tendon = root.find("tendon")
if tendon is not None:
    for spatial in tendon.findall("spatial"):
        tendon.remove(spatial)


def create_linked_ball(sid, pos, euler, springdamper=(1.2, 0.1)):
    # Create base ball
    elem = et.Element("body")
    elem.set("name", "taxel" + str(sid))
    elem.set("pos", "{:.6f} {:.6f} {:.6f}".format(pos[0], pos[1], pos[2]))
    elem.set("euler", euler)
    # Inertia
    inertia = et.Element("inertial")
    inertia.set("mass", str(mass))
    inertia.set("pos", "0 0 0")
    inertia.set("fullinertia", "1e-6 1e-6 1e-6 0 0 0")
    # Joint
    joint = et.Element("joint")
    joint.set("type", "slide")
    joint.set("axis", "0 0 1")
    joint.set("springdamper", "{} {}".format(springdamper[0], springdamper[1]))
    joint.set("frictionloss", "1e-2")
    # Geom
    geom = et.Element("geom")
    geom.set("name", "ball" + str(sid))
    geom.set("type", geom_type)
    geom.set("size", str(radius))
    geom.set("class", "collision")
    # Site
    site = et.Element("site")
    site.set("name", "tendon_site" + str(sid))
    site.set("type", "sphere")
    site.set("size", "1e-4")
    site.set("group", "1")
    # Add to dynamic tree
    elem.append(inertia)
    elem.append(joint)
    elem.append(geom)
    elem.append(site)
    return elem


def create_tendon(tid, site_names):
    spatial = et.Element("spatial")
    spatial.set("name", "tendom" + str(tid))
    spatial.set("width", "2e-4")
    spatial.set("limited", "true")
    spatial.set("range", "0 {:.6f}".format(4 * dx))
    spatial.set("frictionloss", "1e-2")
    spatial.set("stiffness", "1e-6")
    spatial.set("damping", "0.5")
    for sn in site_names:
        site = et.Element("site")
        site.set("site", sn)
        spatial.append(site)
    return spatial


# Define balls with sliders and touch sensors
sid = 0
for i in range(_M[0]):
    for j in range(_M[1]):
        sid += 1
        parent = create_linked_ball(
            sid, (dx * i - offset, dx * j - offset, 1e-3), "0 0 0", (2e-3, 0.9)
        )
        # child = create_linked_ball(sid + _M[0]*_M[1], (0, 0, 2e-3), '0 0 0', (1e-3, 0.9))
        # parent.append(child)
        sensor_pad.append(parent)

# Define tendons
tid = 0
for m in range(_M[0]):
    for n in range(_M[1]):
        tid += 1
        if n != _M[0] - 1:
            site_ids = [n + 1 + k + m * _M[0] for k in range(2)]
            spatial = create_tendon(
                tid, ["tendon_site" + str(i) for i in site_ids])
            tendon.append(spatial)
        if m != _M[0] - 1:
            tid += 1
            site_ids = [n + 1 + (m + k) * _M[0] for k in range(2)]
            spatial = create_tendon(
                tid, ["tendon_site" + str(i) for i in site_ids])
            tendon.append(spatial)

tree.write(robot_xml)  # Save robot

# Modify the keyframe home key qpos
njoints = np.prod(_M) + 10
tree = et.parse(scene_xml)
root = tree.getroot()
key = root.find(".//keyframe/key")
qpos_value = "0 0 0 -1.570796 0 1.570796 0.785398 0 0 0"
key.attrib["ctrl"] = qpos_value
if njoints > 10:
    qpos_value += " " + " ".join(["0" for i in range(njoints - 10)])
key.attrib["qpos"] = qpos_value
tree.write(scene_xml)

# Solo for XML testing
# pixels = physics.render(camera_id="prospective", width=width, height=height)
# Image.fromarray(pixels)


# Robotic Simulation for Tactile Encoding Test

* Contact the Objects and Gain Tactile Feedback on Multiple Edge Types
1. curvy
2. round
3. plateau

* Save Edge Data Streams

In [None]:
# Load scene and define simulation variables
scene_xml = "models/scene.xml"
site_name = "attachment_site"
reach_sites = ["reach_site" + str(8 + i) for i in range(3)]
joint_names = ["joint{}".format(i + 1) for i in range(7)]

def sim_process(target_name):
    return reach_test(scene_xml, site_name, target_name, joint_names, duration=7.0, rendered=False)

with Pool(processes=3) as pool:
    ret = pool.map(sim_process, reach_sites)

dataset = dict()
for ds in ret:
    dataset[ds[0]] = {'video': ds[1], 'stream': ds[2], 'orientation': ds[3]}
with open('touch3edge.pkl', 'wb') as f: 
    pickle.dump(dataset, f)

In [8]:
ret = reach_test(scene_xml, site_name, 'reach_site10', joint_names, duration=3.0, rendered=True)

PID 180300 Progress: 100.1%


In [12]:
ret[2].min()

-0.00013755758338596647

In [9]:
HTML(display_video(ret[2], 30, (9, 9)))

# Edge Orientation and Width Detection
## Generalized Gaussian Encoding for Tactile Signals
* F. Pascal, L. Bombrun, J. -Y. Tourneret and Y. Berthoumieu, "Parameter Estimation For Multivariate Generalized Gaussian Distributions," in IEEE Transactions on Signal Processing, vol. 61, no. 23, pp. 5960-5971, Dec.1, 2013, doi: 10.1109/TSP.2013.2282909.

In [None]:
# Load dataset
streams = np.load("touch_stream.npz")
[sCurvy, sRound, sPlateau, oCurvy, oRound, oPlateau] = [streams[file] for file in streams.files]
frames = np.min([len(sCurvy), len(sRound), len(sPlateau)])


def compute_vec(data):
    pca = PCA(n_components=2)
    mat = -np.squeeze(data)
    mat = (mat - np.min(mat)) / np.std(mat)
    x, y = np.where(mat > np.mean(mat))
    X = np.squeeze(np.dstack((x, y)))
    x0, y0 = np.ceil(np.mean(X, axis=0))
    pca.fit(X)
    V = pca.components_
    a = np.sqrt(pca.singular_values_)
    V[0, :] *= a[1]  # Edge width
    V[1, :] *= 1.7 * a[0]  # Edge direction
    origin = np.array([[y0, y0], [x0, x0]])
    W = np.vstack((V[0, :], -V[0, :]))
    V = np.vstack((V[1, :], -V[1, :]))
    m = 1
    beta = a[1]
    mu = np.array([x0, y0])
    M = pca.get_covariance()
    mggd = M
    return origin, V, W


# Visualize data stream
def visualize(dataset, title):
    orig_backend = matplotlib.get_backend()
    # Switch to headless 'Agg' to inhibit figure rendering.
    matplotlib.use("Agg")
    fig, ax = plt.subplots(1, 1, figsize=(5, 5), dpi=100)
    matplotlib.use(orig_backend)  # Switch back to the original backend.
    ax.set_title(title)
    im = ax.imshow(dataset[0])
    origin, V, W = compute_vec(dataset[0])

    quiverV = ax.quiver(*origin, V[:, 0], V[:, 1], color=["r"], scale=20)
    quiverW = ax.quiver(
        *origin,
        W[:, 0],
        W[:, 1],
        color=["b"],
        scale=20,
        headaxislength=0,
        headwidth=0,
        headlength=0
    )

    def update(frame):
        im.set_data(dataset[frame])
        origin, V, W = compute_vec(dataset[frame])
        quiverV.set_offsets(origin.T)
        quiverV.set_UVC(V[:, 0], V[:, 1])
        quiverW.set_offsets(origin.T)
        quiverW.set_UVC(W[:, 0], W[:, 1])
        return [im, quiverV, quiverW]

    anim = animation.FuncAnimation(
        fig=fig, func=update, frames=frames, interval=30, blit=True, repeat=False
    )
    return HTML(anim.to_html5_video())


#visualize(sCurvy, 'Curvy Edge')
#visualize(sRound, 'Round Edge')
#visualize(sPlateau, "Plateu Edge")


# Test TacNet - tactile encoding spiking network
1. A. Parvizi-Fard, M. Amiri, D. Kumar, M. M. Iskarous, and N. V. Thakor, “A functional spiking neuronal network for tactile sensing pathway to process edge orientation,” Sci Rep, vol. 11, no. 1, p. 1320, Dec. 2021, doi: 10.1038/s41598-020-80132-4.
2. J. A. Pruszynski and R. S. Johansson, “Edge-orientation processing in first-order tactile neurons,” Nat Neurosci, vol. 17, no. 10, pp. 1404–1409, Oct. 2014, doi: 10.1038/nn.3804.
3. J. M. Yau, S. S. Kim, P. H. Thakur, and S. J. Bensmaia, “Feeling form: the neural basis of haptic shape perception,” Journal of Neurophysiology, vol. 115, no. 2, pp. 631–642, Feb. 2016, doi: 10.1152/jn.00598.2015.
4. G. Sutanto, Z. Su, S. Schaal, and F. Meier, “Learning Sensor Feedback Models from Demonstrations via Phase-Modulated Neural Networks,” in 2018 IEEE International Conference on Robotics and Automation (ICRA), Brisbane, QLD, May 2018, pp. 1142–1149. doi: 10.1109/ICRA.2018.8460986.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import shutil

from brian2 import *
from brian2tools import brian_plot, plot_state

from TacNet import TacNet


set_device('cpp_standalone')

if 'initiated' in locals():
    device.reinit()
    device.activate()
else:
    initiated = True

# Load
streams = np.load("touch_stream.npz")
[sCurvy, sRound, sPlateau, oCurvy, oRound, oPlateau] = [streams[file] for file in streams.files]
frames = np.min([len(sCurvy), len(sRound), len(sPlateau)])
data = sPlateau

dt = 1*ms
inputs = data.reshape(data.shape[0], -1)
inputs -= inputs.min()
I = TimedArray(inputs/inputs.max()*200*pA, dt=dt)
duration = I.values.shape[0]*dt

num_neurons = [400, 10, 36]
model = TacNet(num_neurons)
mons = model.run(I, duration)


In [None]:
display_video(sRound, 30, (10, 10))

In [None]:
fig, axs = plt.subplots(7, 1, figsize=(20, 21))
for i in range(3):
    axs[i].set_title('Spike L'+str(i+1))
    brian_plot(mons['SpikeMonitor_L'+str(i+1)], axes=axs[i])

axs[3].set_title('L3 Membrane Potential')
plot_state(mons['StateMonitor_L3'].t, mons['StateMonitor_L3'].v.T, axes=axs[3])
axs[4].set_title('Syn23 X')
plot_state(mons['StateMonitor_Syn23'].t,
           mons['StateMonitor_Syn23'].X.T, axes=axs[4])
axs[5].set_title('Syn12 Connectivity')
brian_plot(model.synapses['Syn12'], axes=axs[5])
axs[6].set_title('Syn23 Final X')
axs[6].set_xlabel('target neuron index')
axs[6].set_ylabel('source neuron index')
axs[6].imshow(model.synapses['Syn23'].X_[:].reshape(
    (num_neurons[1], num_neurons[2])))
plt.tight_layout()
plt.show()
