In [1]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy import interpolate
import h5py
import time
import pandas as pd


In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
from bender_functions import Bender

In [4]:
bender = Bender()

# Set up movement and activation

## Details about the fish and prep

In [5]:
fishcode = "scup09"

Basic measurements for the fish

In [6]:
segment = 2
fishlen = 261       # mm
fishmass = 324      # g

Location of the bending point along the body. Distance from the head to posterior edge of the anterior clamp

In [7]:
dbend = 165          # mm

Distance between the two clamps:

In [8]:
dclamp = 21         # mm

Vertical and horizontal distance from the transducer to the center of pressure on the fish

In [9]:
dvert = 130          # mm
dhoriz = 0          # mm

Cross sectional size of the fish

In [10]:
xsec_width = 21          # mm
xsec_height = 61.0         # mm

## Output file

Do not need to use double backslashes in the file name anymore, as long as the string starts with `r`

In [11]:
outputfile = r'F:\Data\FishBender\scup\rawdata\test2022_001.h5'
outputfile = bender.increment_file_name(outputfile)

print('Actual output file: {}'.format(outputfile))

Actual output file: F:\Data\FishBender\scup\rawdata\test2022_002.h5


# Movement parameters

Set up a set of different frequencies, curvatures, and activation parameters. Will run through them all in order.

In [75]:
frequencies = np.array([5, 5, 8, 8])
curves = np.array([3, 3, 5, 5])
cycles_per_step = np.array([3, 5, 5, 3])
is_activation = [False, True, True, False]
activation_phase = np.array([0, -0.13, -0.2, 0])

In [76]:
amplitudes = np.rad2deg(curves * (dclamp/1000))
strains = xsec_width/2/1000 * curves
strainrates = 2*np.pi * strains * frequencies

Activation duration is set on the front panel of the S88, so we can't adjust it in the middle of a trial. But if we set it to a particular duty cycle for a low frequency cycle, the same duration will represent a larger fraction of the cycle for a higher frequency cycle

In [95]:
allfreqs = np.concatenate([np.full((cps1,), val) \
    for cps1, val in zip(cycles_per_step, frequencies)])
allcurves = np.concatenate([np.full((cps1,), val) \
    for cps1, val in zip(cycles_per_step, curves)])
is_act_cycle = np.concatenate([np.full((cps1,), val) \
    for cps1, val in zip(cycles_per_step, is_activation)])
allactivation_phase = np.concatenate([np.full((cps1,), val) \
    for cps1, val in zip(cycles_per_step, activation_phase)])


In [174]:
start_activation_duty = 0.3
activation_dur = start_activation_duty / frequencies[0]

activation_duty = activation_dur * frequencies
allactivation_duty = activation_dur * allfreqs
allactivation_duty[np.logical_not(is_act_cycle)] = 0.0

In [175]:
period_by_cycle = 1 / allfreqs

In [176]:
ncycles = len(allfreqs)

allamps = np.rad2deg(allcurves * (dclamp/1000))
allstrains = xsec_width/2/1000 * allcurves
allstrainrates = 2*np.pi * allstrains * allfreqs

print("Amplitudes for {}-{} 1/m curvature are {:.1f}-{:.1f} deg".format(min(allcurves), max(allcurves), min(allamps), max(allamps)))
print("Strain are +-{:.1f}-{:.1f}%".format(min(allstrains)*100, max(allstrains)*100))
print("Strain rates are +-{:.1f}-{:.1f}%/s".format(min(allstrainrates)*100, max(allstrainrates)*100))

amp_step_vel = 2*np.pi*max(allfreqs * allamps)

scale = 6       # output teeth divided by input teeth

waitbefore = 3.0
waitafter = 3.0

Amplitudes for 3-5 1/m curvature are 3.6-6.0 deg
Strain are +-3.1-5.3%
Strain rates are +-99.0-263.9%/s


In [177]:
pd.DataFrame({"freq (Hz)": frequencies, "ncycles": cycles_per_step,
        "curve (1/m)": curves, "amp (deg)": amplitudes,
        "is_active": is_activation, "activation duty": activation_duty,
        "activation phase": activation_phase})

Unnamed: 0,freq (Hz),ncycles,curve (1/m),amp (deg),is_active,activation duty,activation phase
0,5,3,3,3.609634,False,0.3,0.0
1,5,5,3,3.609634,True,0.3,-0.13
2,8,5,5,6.016057,True,0.48,-0.2
3,8,3,5,6.016057,False,0.48,0.0


# Activation parameters

In [178]:
activation_pulse_rate = 75  # Hz

prepoststim_dur = 0.3 / 5       # duty of 0.3 at 5 Hz
prepoststim_sep = 1             # time between left and right bursts

prestim_time = -2           # time prestim left burst starts
poststim_time = 2           # time *after* end of bending

Calculate the activation duration and duty cycle, accounting for the fact that we're stimulating with pulses at a particular frequency.

In [184]:
actburstdur = np.floor(activation_dur * activation_pulse_rate * 2) / (activation_pulse_rate * 2)

actburstduty = actburstdur * allfreqs
actburstduty[np.logical_not(is_act_cycle)] = 0.0

print("Activation burst duration: {:.3} msec".format(actburstdur*1000))

Activation burst duration: 60.0 msec


Remember to set the S88 Train Duration dial to the number above!

The parameters below are set on the S88 front panel. Make sure you record the correct values!

In [180]:
S1volts = 9
S2volts = 11  
S1pulsedur = 2          # ms
S2pulsedur = 2          # ms
S1side = 'left'
S2side = 'right'

# Sampling parameters and channels

Sampling parameters

In [140]:
samplefreq = 1000.0
outputfreq = 100000.0

In [141]:
device_name = '/Dev1'

## Analog output channel

Sends the pulses to the S88 for muscle activation

In [142]:
bender.set_activation_channels('ao0', 'ao1')

## Digital output channel

Controls the motor

In [143]:
bender.set_motor_channel('port0')

## Analog input channels

Six channels from the force transducer, plus the monitor channel from the S88 stimulator.

In [144]:
SG0_chan = 'ai0'
SG1_chan = 'ai1'
SG2_chan = 'ai2'
SG3_chan = 'ai3'
SG4_chan = 'ai4'
SG5_chan = 'ai5'

In [145]:
activation_monitor_chan = 'ai6'

In [146]:
inchannels = [SG0_chan, SG1_chan, SG2_chan, SG3_chan, SG4_chan, SG5_chan,
                activation_monitor_chan]
inchannel_names = ['SG0', 'SG1', 'SG2', 'SG3', 'SG4', 'SG5',
                    'activation_monitor']

bender.set_input_channels(inchannels, inchannel_names)

Force transducer calibration file

In [147]:
bender.loadCalibration('FT25009.cal')
bender.calibration

array([[-3.81000e-03, -9.08000e-03,  5.18180e+00,  2.50000e-04,
        -8.44500e-02, -7.00000e-04],
       [ 3.92300e-02, -3.73298e+00, -5.56700e-02, -3.94300e-02,
         1.16000e-03, -4.61800e-02],
       [-1.84900e-02, -1.48300e-02,  5.30460e+00,  7.45400e-02,
         4.46500e-02,  8.90000e-04],
       [ 3.11448e+00,  1.82995e+00,  3.32100e-02,  1.94500e-02,
        -3.28900e-02, -4.39400e-02],
       [-2.23900e-02,  2.84000e-03,  5.22002e+00, -7.40300e-02,
         4.17800e-02, -1.00000e-05],
       [-3.11242e+00,  1.74678e+00,  7.10500e-02,  1.79200e-02,
         3.29200e-02, -4.39600e-02]])

## Encoder angle input

In [148]:
encoder_counts_per_rev = 10000
bender.set_encoder_channel('ctr0', counts_per_rev=encoder_counts_per_rev)

Start setting up the output

In [149]:
movedur = np.sum(period_by_cycle)
totaldur = waitbefore + (-prestim_time) + movedur + poststim_time + waitafter

t = np.arange(0, totaldur, 1.0/samplefreq)
t -= waitbefore + (-prestim_time)

Generate the angle and angular velocity signals

In [150]:
rampdur = 0.25

In [151]:
freq = np.zeros_like(t)
amp = np.zeros_like(t)
tnorm = np.zeros_like(t)

cyclestart = np.cumsum(period_by_cycle)
cyclestart = np.insert(cyclestart, 0, 0)

for c, (cycstart1, f1, a1) in enumerate(zip(cyclestart, allfreqs, allamps)):
    cycend1 = cycstart1 + 1/f1

    iscyc = (t >= cycstart1) & (t < cycend1)
    freq[iscyc] = f1
    amp[iscyc] = a1

    np.place(tnorm, iscyc, (t[iscyc] - cycstart1) * f1 + c)

Smooth the amplitude transitions

In [152]:
for c, (cycstart1, a1, a2) in enumerate(zip(cyclestart[1:], allamps[:-1], allamps[1:])):
    amp_step_dur2 = (a2 - a1) / amp_step_vel / 2

    isstep = (t >= cycstart1-amp_step_dur2) & (t < cycstart1+amp_step_dur2)
    amp_ramp = np.linspace(a1, a2, np.sum(isstep))
    np.place(amp, isstep, amp_ramp)


In [153]:
angle = amp * np.sin(2*np.pi * tnorm)

angle[t < 0] = 0
angle[t > movedur] = 0

Ramp to the starting and ending amplitudes

In [154]:
rampvel1 = allamps[0] / rampdur
tendramp1 = 0.25 / allfreqs[0]
tstartramp1 = tendramp1 - rampdur

rampvel2 = allamps[-1] / rampdur
tstartramp2 = movedur - 0.25 / allfreqs[-1]
tendramp2 = tstartramp2 + rampdur

if tstartramp1 > 0:
    # actual movement is slower than the ramp, so we won't bother adding the ramp
    pass
else:
    rampangle1 = (t[(t >= tstartramp1) & (t < tendramp1)] - tstartramp1) * rampvel1
    rampangle2 = (t[(t >= tstartramp2) & (t < tendramp2)] - tstartramp2 - rampdur) * rampvel2

    np.place(angle, (t >= tstartramp1) & (t < tendramp1), rampangle1)
    np.place(angle, (t >= tstartramp2) & (t < tendramp2), rampangle2)


Calculate the angular velocity.

In [155]:
anglevel = np.zeros_like(angle)
anglevel[1:-1] = (angle[2:] - angle[:-2]) * (samplefreq/2)

In [156]:
bender.set_bending_signal(t, angle, anglevel)

## Generate the activation signal

In [185]:
S1actcmd = np.zeros_like(t)
S2actcmd = np.zeros_like(t)
Lonoff = []
Ronoff = []

pulsedur = 0.01         # 10ms long pulse to start the S88

# pulse to start the S1 channel burst
tS1trig = t[(t >= 0) & (t < 2*pulsedur)]
S1trig = np.zeros_like(tS1trig)
S1trig[tS1trig <= pulsedur] = 5.0

# generate normal burst
actpulsephase = t[(t >= 0) & (t < np.max(actburstdur))] * activation_pulse_rate
burst = (np.mod(actpulsephase, 1) <= 0.5).astype(float)
burst *= 5.0

# generate the pre/post burst
actpulsephase = t[(t >= 0) & (t < prepoststim_dur)] * activation_pulse_rate
prepostburst = (np.mod(actpulsephase, 1) <= 0.5).astype(float)
prepostburst *= 5.0

bendphase = tnorm - 0.25

# run through each cycle and add the activation burst if needed    
for c, (duty1, ph1, f1) in \
        enumerate(zip(actburstduty, allactivation_phase, allfreqs)):
    if duty1 == 0:
        continue

    k = np.argmax(bendphase >= c + ph1)
    tstart = t[k]
    tend = tstart + actburstdur

    if any(bendphase >= c + ph1):
        Lonoff.append([tstart, tend])
    if any(bendphase >= c + ph1 + 0.5):
        Ronoff.append(np.array([tstart, tend]) + 0.5 / f1)

    np.place(S1actcmd, (t >= tstart) &
                        (t < tstart + pulsedur),
                        S1trig)
    np.place(S2actcmd, (bendphase >= c + 0.5 + ph1) &
                        (bendphase < c + 0.5 + ph1 + duty1),
                        burst)

# left side prestim burst
tstart = prestim_time
tend = tstart + prepoststim_dur

Lonoff.append([tstart, tend])

np.place(S1actcmd, (t >= tstart) & (t < tstart + pulsedur), S1trig)

# left side poststim burst
tstart = movedur + poststim_time
tend = tstart + prepoststim_dur

Lonoff.append([tstart, tend])

np.place(S1actcmd, (t >= tstart) & (t < tstart + pulsedur), S1trig)

# right side prestim burst
tstart = prestim_time + prepoststim_sep
tend = tstart + prepoststim_dur

Ronoff.append([tstart, tend])

np.place(S2actcmd, (t >= tstart) & (t < tend), prepostburst)

# right side poststim burst
tstart = movedur + poststim_time + prepoststim_sep
tend = tstart + prepoststim_dur

Ronoff.append([tstart, tend])

np.place(S2actcmd, (t >= tstart) & (t < tend), prepostburst)

Lonoff = np.array(Lonoff)
Ronoff = np.array(Ronoff)

In [186]:
bender.set_activation(S1actcmd, S2actcmd)

and plot them:

In [187]:
fig = make_subplots(rows = 3, cols = 1,
                   shared_xaxes=True)
fig.add_trace(
    go.Scatter(x = t, y = angle, mode="lines", name="angle"),
    row=1, col=1)

for onoff in Lonoff:
    fig.add_vrect(x0 = onoff[0], x1=onoff[1], fillcolor="black", opacity=0.25, line_width=0,
                      row=1, col=1)

for onoff in Ronoff:
    fig.add_vrect(x0 = onoff[0], x1=onoff[1], opacity=0.7, line_width=1,
                      row=1, col=1)

fig.update_yaxes(title_text = "angle (deg)", row=1)

fig.add_trace(
    go.Scatter(x = t, y = S1actcmd, mode="lines", name="S1"),
    row=2, col=1)
fig.add_trace(
    go.Scatter(x = t, y = S2actcmd, mode="lines", name="S2"),
    row=2, col=1)

for onoff in Lonoff:
    fig.add_vrect(x0 = onoff[0], x1=onoff[1], fillcolor="black", opacity=0.25, line_width=0,
                      row=2, col=1)

for onoff in Ronoff:
    fig.add_vrect(x0 = onoff[0], x1=onoff[1], opacity=0.7, line_width=1,
                      row=2, col=1)

fig.update_yaxes(title_text = "activation", row=2)
fig.update_xaxes(title_text = "time (s)", row=2)

fig.add_trace(
    go.Scatter(x = t, y = anglevel, mode="lines", name="anglevel"),
    row=3, col=1)

fig.update_yaxes(title_text = "angular velocity (deg/s)", row=3)
fig.update_xaxes(title_text = "time (s)", row=3)

Generate the motor step and direction pulses. 

In [188]:
tout, dig, step, direction = bender.make_motor_stepper_pulses(outputfreq,
                        scale=scale,
                        stepsperrev=1600)

Use the cell below to debug the step and direction pulses, but don't render it every time. It takes a long time to plot the traces, because the output sampling rate is high.

In [189]:
# fig = make_subplots()
# fig.add_trace(go.Scatter(x=tout, y=step, mode="lines", name="step"))
# fig.add_trace(go.Scatter(x=tout, y=direction, mode="lines", name="dir"))
# fig.add_trace(go.Scatter(x=tout, y=dig, mode="lines", name="port"))

# Do data acquisition

## Main code block

Sets up the DAQ, sends the output, records the input, and writes it to the file.

In [190]:
aidata = bender.run(device_name)


In [191]:
forcetorque = bender.applyCalibration(aidata)
forcetorque_names = ['xForce', 'yForce', 'zForce', 'xTorque', 'yTorque', 'zTorque']

In [192]:
angle_measured = bender.angle

In [193]:
with h5py.File(outputfile, 'w') as f:
    f.attrs['EndTime'] = bender.endTime.strftime('%Y-%m-%d %H:%M:%S %Z')
    f.attrs['FishCode'] = fishcode
    f.attrs['Segment'] = segment
    f.attrs['FishLength_mm'] = fishlen
    f.attrs['FishMass_g'] = fishmass
    f.attrs['FishCrossSectionWidth_mm'] = xsec_width
    f.attrs['FishCrossSectionHeight_mm'] = xsec_height

    f.attrs['BendLocation_mm'] = dbend
    f.attrs['ClampDistance_mm'] = dclamp
    f.attrs['DistanceFromTransducerVert_mm'] = dvert
    f.attrs['DistanceFromTransducerHoriz_mm'] = dhoriz
    
    gin = f.create_group('RawInput')
    gin.attrs['SampleFrequency'] = samplefreq
    gin.create_dataset('forcetransducer', data=aidata[:6,:])
    gin.create_dataset('activation_monitor', data=aidata[6,:])

    gcal = f.create_group('Calibrated')
    for ft1, name1 in zip(forcetorque, forcetorque_names):
        gcal.create_dataset(name1, data=ft1)
    gcal.create_dataset('CalibrationMatrix', data=bender.calibration)

    ds = gcal.create_dataset('Encoder', data=bender.angledata)
    ds.attrs['CountsPerRev'] = encoder_counts_per_rev

    # save the output data
    gout = f.create_group('Output')
    gout.attrs['SampleFrequency'] = outputfreq
    gout.create_dataset('DigitalOut', data=dig)
    gout.create_dataset('SyncInTrainDur', data=S1actcmd)
    gout.create_dataset('SyncInS2Del', data=S2actcmd)
    gout.attrs['S1side'] = S1side
    gout.attrs['S2side'] = S2side
    gout.attrs['S1volts'] = S1volts
    gout.attrs['S2volts'] = S2volts
    gout.attrs['S1pulsedur_ms'] = S1pulsedur    
    gout.attrs['S2pulsedur_ms'] = S2pulsedur    
    
    # save the parameters for generating the stimulus
    gout = f.create_group('NominalStimulus')
    gout.attrs['Type'] = 'Frequency Amplitude Combo'

    gout.create_dataset('t', data=t)
    ds = gout.create_dataset('Position', data=angle)
    ds.attrs['Units'] = 'deg'
    ds = gout.create_dataset('Velocity', data=anglevel)
    ds.attrs['Units'] = 'deg/sec'
    gout.create_dataset('tnorm', data=tnorm)
    gout.create_dataset('Lonoff', data=Lonoff)
    gout.create_dataset('Ronoff', data=Ronoff)

    gout.attrs['Amplitudes'] = allamps
    gout.attrs['Curvatures'] = allcurves
    gout.attrs['Frequencies'] = allfreqs
    gout.attrs['CyclesPerStep'] = cycles_per_step
    gout.attrs['IsActiveByCycle'] = is_act_cycle
    gout.attrs['MovementDuration'] = movedur

    gout.attrs['WaitPre'] = waitbefore
    gout.attrs['WaitPost'] = waitafter
    gout.attrs['PrePostStimDur'] = prepoststim_dur
    gout.attrs['ScaleFactor'] = scale

    gout.attrs['ActivationOn'] = is_activation
    gout.attrs['ActivationDuty'] = allactivation_duty
    gout.attrs['ActivationPhase'] = allactivation_phase
    gout.attrs['ActivationPulseRate'] = activation_pulse_rate
    gout.attrs['ActualActivationDuty'] = actburstduty


In [194]:
for ft1 in forcetorque:
    ft1 -= np.mean(ft1[t < 0])

# Plot results

In [195]:
fig = make_subplots(rows = 4, cols = 1,
                   shared_xaxes=True)
fig.add_trace(
    go.Scatter(x = t, y = angle_measured, mode="lines", name="angle_enc"),
    row=1, col=1)
fig.add_trace(
    go.Scatter(x = t, y = angle, mode="lines", name="angle_cmd"),
    row=1, col=1)
fig.add_trace(
    go.Scatter(x = t, y = aidata[6,:], mode="lines", name="stim"),
    row=4, col=1)
fig.add_trace(
    go.Scatter(x = t, y = forcetorque[3,:], mode="lines", name="Tx"),
    row=2, col=1)
# fig.add_trace(
#     go.Scatter(x = t, y = forcetorque[1,:], mode="lines", name="Fy"),
#     row=4, col=1)
fig.add_trace(
    go.Scatter(x = t, y = forcetorque[5,:], mode="lines", name="Tz"),
    row=3, col=1)

for onoff in Lonoff:
    fig.add_vrect(x0 = onoff[0], x1=onoff[1], fillcolor="black", opacity=0.25, line_width=0,
                        row="all", col="all")

for onoff in Ronoff:
    fig.add_vrect(x0 = onoff[0], x1=onoff[1], opacity=0.7, line_width=1,
                      row="all", col="all")

fig.update_yaxes(title_text = "angle (deg)", row=1)
fig.update_yaxes(title_text = "torque (Nm)", row=2)
fig.update_yaxes(title_text = "torque (Nm)", row=3)
fig.update_yaxes(title_text = "force (N)", row=4)
fig.update_xaxes(title_text = "time (s)", row=3)
fig.update_layout(title_text = bender.filename)


In [820]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=angle, y=forcetorque[3,:]))
fig.update_yaxes(title_text="torque (Nm)")
fig.update_xaxes(title_text="angle (deg)")