In [1]:
# Make the repository root the working directory so relative paths resolve from it
import os
import subprocess

cmd = 'git rev-parse --show-toplevel'
# check=True raises CalledProcessError when git exits with a non-zero status
root = subprocess.run(
    cmd,
    shell=True,
    text=True,
    stdout=subprocess.PIPE,
    check=True,
).stdout
# git terminates its answer with a newline; strip it before using it as a path
os.chdir(root.strip())
print('CWD:', os.getcwd())

CWD: C:\Users\AndBondStyle\Projects\lab-cartpole


In [2]:
from sessions.collector import SessionData
from misc.simple_server import session_widget
from pathlib import Path
import pandas
import struct
import json
import math

# Session directory; session.json and the CSV files read by later cells are looked up here
PATH = Path('data/sessions/pb_const_v5')
# Session object loaded from the JSON file stored in that directory
DATA = SessionData.load(PATH / 'session.json')

In [3]:
# Display the mapping of value ids to the values stored in the loaded session
DATA.values

{'state.position': SessionData.Value(id='state.position', name='state.position', unit='?'),
 'state.velocity': SessionData.Value(id='state.velocity', name='state.velocity', unit='?'),
 'state.acceleration': SessionData.Value(id='state.acceleration', name='state.acceleration', unit='?'),
 'state.pole_angle': SessionData.Value(id='state.pole_angle', name='state.pole_angle', unit='?'),
 'state.pole_angular_velocity': SessionData.Value(id='state.pole_angular_velocity', name='state.pole_angular_velocity', unit='?'),
 'state.error_code': SessionData.Value(id='state.error_code', name='state.error_code', unit='?'),
 'state.accelerometer_value': SessionData.Value(id='state.accelerometer_value', name='state.accelerometer_value', unit='?'),
 'state.motor_angle': SessionData.Value(id='state.motor_angle', name='state.motor_angle', unit='?'),
 'state.motor_velocity': SessionData.Value(id='state.motor_velocity', name='state.motor_velocity', unit='?'),
 'target.acceleration': SessionData.Value(id='tar

In [4]:
# Load the serial capture; header=0 together with names= replaces the file's own header row
serial_columns = ['time', 'value', 'parity_err', 'framing_err']
serial_tx_data = pandas.read_csv(
    PATH / '2_async_serial.csv',
    header=0,
    names=serial_columns,
)
# Scale the time column by 1e6 (presumably seconds -> microseconds, matching the 'us' printed below)
serial_tx_data['time'] = serial_tx_data['time'] * 1_000_000
# Pick the offset that places the first captured row at exactly 2_000_000 on the new time axis
first_row = serial_tx_data.iloc[0]
analyzer_offset = first_row['time'] - 2_000_000
serial_tx_data['time'] = serial_tx_data['time'] - analyzer_offset
print('Analyzer time offset:', analyzer_offset, 'us')
serial_tx_data

FileNotFoundError: [Errno 2] No such file or directory: 'data\\sessions\\pb_const_v5\\2_async_serial.csv'

In [None]:
# Load the I2C capture and put its time column on the same axis as the serial capture:
# scale by 1e6, then subtract the offset computed from the serial data.
i2c_columns = ['time', 'packet_id', 'address', 'data', 'read_write', 'ack_nak']
i2c_data = pandas.read_csv(
    PATH / '3_i2c.csv',
    header=0,
    names=i2c_columns,
).assign(time=lambda frame: frame['time'] * 1_000_000 - analyzer_offset)
i2c_data

In [None]:
enc_address = 0x36  # I2C encoder address
msb_address = 0x0C  # Raw angle MSB register address
lsb_address = 0x0D  # Raw angle LSB register address
zero_angle = 2.9406 # Const from encoder constructor
reverse = True      # ... from firmware


def convert_raw_angle(raw_angle):
    """Convert a raw encoder reading into an angle in radians.

    The reading is scaled so that 4096 counts equal one full turn, shifted by
    ``zero_angle``, wrapped upwards once if the shifted value is negative and,
    when ``reverse`` is set, mirrored as ``2*pi - angle``.

    :param raw_angle: raw angle value as read from the encoder registers
    :return: converted angle in radians
    """
    full_turn = 2 * math.pi
    angle = raw_angle / 4096 * full_turn - zero_angle
    if angle < 0:
        angle = angle + full_turn
    return full_turn - angle if reverse else angle

# Decode the captured I2C traffic into encoder angle samples.
# The loop is a small state machine that expects, repeatedly:
#   write(MSB register address) -> read(MSB) -> write(LSB register address) -> read(LSB)
# and emits one angle sample after every LSB read.
curr_mode = 'write'           # direction ('write' / 'read') expected for the next transaction
curr_address = msb_address    # register currently being fetched (MSB first, then LSB)
raw_angle = 0                 # raw reading assembled from the two register bytes
value_id ='analyzer.encoder_angle'
# Register a new, initially empty 'rad' series in the session and keep a handle to it
hw_encoder_angle = DATA.values[value_id] = SessionData.Value(value_id, value_id, 'rad')
for _, row in i2c_data.iterrows():
    # Ignore rows whose time is negative
    if row.time < 0: continue
    # address and data are parsed as base-16 strings
    address = int(row.address, 16)
    data = int(row.data, 16)
    mode = row.read_write.lower()
    # print(hex(address), hex(data), mode)
    # Drop the lowest bit before comparing with the encoder address
    # (presumably the captured byte carries the R/W flag in bit 0 — TODO confirm)
    if address >> 1 != enc_address:
        print('!')
        continue
    # Transaction direction differs from what the state machine expects: report and skip it
    if mode != curr_mode:
        print('!')
        continue
        
#     if abs(row.time - 7.212 * 1_000_000) < 0.05 * 1_000_000:
#         print((row.time + analyzer_offset) / 1_000_000)
#         print(curr_mode, hex(curr_address), bin(raw_angle), data == curr_address)
    
    # print(curr_mode, hex(curr_address), bin(raw_angle), data == curr_address)
    if mode == 'write' and data == curr_address:
        # The expected register was selected; its content should be read next
        curr_mode = 'read'
    elif mode == 'read' and curr_address == msb_address:
        # High byte received; continue with the LSB register
        raw_angle = data << 8
        curr_address = lsb_address
        curr_mode = 'write'
    elif mode == 'read' and curr_address == lsb_address:
        # Low byte completes the reading: convert it and store the sample
        raw_angle |= data
        angle = convert_raw_angle(raw_angle)
        hw_encoder_angle.x.append(int(row.time))  # timestamp truncated to an integer
        hw_encoder_angle.y.append(angle)
        # print('*', bin(raw_angle), raw_angle, angle)
        # Start over with the MSB register for the next sample
        curr_address = msb_address
        curr_mode = 'write'
    else:
        # Reached for a write whose data byte is not the expected register address;
        # the state is left unchanged
        print('!')
        
    #if len(raw_angle_list) > 10: break

print('Number of parsed values:', len(hw_encoder_angle.x))

In [None]:
def convert_and_print(hex_value):
    """Convert a raw encoder reading and print it in several representations.

    Prints the converted angle followed by the raw value as 4-digit hex,
    16-digit binary and decimal.

    :param hex_value: raw reading as an integer
    :return: the angle returned by ``convert_raw_angle``
    """
    angle = convert_raw_angle(hex_value)
    print(f'{angle:.5f} {hex_value:04x} {hex_value:016b} {hex_value}')
    return angle


# Each case is (reading before, anomalous reading, reading after)
ANOMALY_CASES = (
    (0x0ff3, 0x0f02, 0x0023),
    (0x00DE, 0x0002, 0x0111),
)

# Flip every single bit of the anomalous reading in turn and report whether the
# flipped value falls between its neighbours (a >= x >= b).
for i in range(16):
    mask = 1 << i
    print('MASK:', bin(mask))
    for before, anomaly, after in ANOMALY_CASES:
        a = convert_and_print(before)
        convert_and_print(anomaly)  # <-- anomaly
        x = convert_and_print(anomaly ^ mask)  # <-- anomaly fix
        b = convert_and_print(after)
        print(a >= x >= b)
        print()

In [None]:
# Scratch calculation: scale the difference 5.32 - 5.68 by 360 / (2*pi),
# i.e. radians -> degrees (assuming both numbers are angles in radians — TODO confirm)
(5.32 - 5.68) / 2 / math.pi * 360

In [6]:
# Unwrap 'state.pole_angle' into a continuous angle by counting full turns.
# A jump of more than max_delta between consecutive samples is treated as a
# wrap-around: a large positive jump removes one turn, a large negative jump adds one.
abs_pole_angles = []   # unwrapped angle for every sample
rotations_arr = []     # turn offset (2*pi*rotations) applied to every sample
rotations = 0
# NOTE(review): the first sample is compared against 0, so a first angle above pi is
# counted as a wrap-around — confirm this is intended.
prev = 0
max_delta = math.pi
for curr in DATA.values['state.pole_angle'].y:
    delta = curr - prev
    if delta > max_delta:
        rotations -= 1
    elif delta < -max_delta:
        rotations += 1
    abs_angle = 2 * math.pi * rotations + curr
    abs_pole_angles.append(abs_angle)
    rotations_arr.append(2 * math.pi * rotations)
    prev = curr

# Register both derived series on the x axis of the source value.
# `value_id` is used rather than `id` so the builtin id() is not shadowed in the kernel.
pole_angle_x = DATA.values['state.pole_angle'].x
for value_id, series in (
    ('abs_pole_angle', abs_pole_angles),
    ('pole_rotations', rotations_arr),
):
    DATA.values[value_id] = SessionData.Value(value_id, value_id, 'rad', pole_angle_x, series)

In [8]:
# (group title, value ids) pairs to show in the session widget; disabled groups stay commented out
group_specs = [
#     ('Position', ['state.position']),
#     ('Velocity', ['state.velocity']),
#     ('Acceleration', ['state.acceleration', 'target.acceleration']),
    ('Pole angle', ['state.pole_angle']),
    ('Absolute pole angle', ['abs_pole_angle']),
    ('Pole velocity', ['state.pole_angular_velocity']),
]
DATA.groups = [
    SessionData.Group(name=title, values=value_ids)
    for title, value_ids in group_specs
]
session_widget(DATA)

IFrame URL: http://localhost:62678/?id=_


In [None]:
# Scratch check of the angular velocity between two hand-picked (time, angle) samples
t1, a1 = 6.721909, 5.221629
t2, a2 = 6.728088, 4.908697

full_turn = 2 * math.pi

da = a2 - a1
print('delta angle:', da)
dt = t2 - t1
print('delta time:', dt)
# A difference of almost a full turn is treated as a wrap-around and pulled back by one turn
if abs(da) > 1.8 * math.pi:
    print('fixing rotation')
    if da > 0:
        da -= full_turn
    else:
        da += full_turn
print('delta angle out:', da)
vel = da / dt
print('velocity:', vel)
# Report velocities whose magnitude exceeds 10 full turns per unit of time
if abs(vel) > 10 * full_turn:
    print('spike!')
print(5 * 2 * math.pi)