-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
35 changed files
with
1,168 additions
and
0 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
"""Parses AMK data""" | ||
|
||
from parsers.can.utils import hex2dec, bytes2int32, twos_complement | ||
from parsers.can.models import BytesParser | ||
|
||
|
||
class AMKParser(BytesParser): | ||
@staticmethod | ||
def bytes2int32(byte_list): | ||
byte_list = [ | ||
hex2dec(x) | ||
for x in byte_list | ||
] | ||
|
||
least_sign = int(byte_list[0]) | ||
most_sign = int(byte_list[1]) | ||
factor = 2 ** 8 | ||
|
||
return factor * most_sign + least_sign | ||
|
||
def get_as_setpoint(self): | ||
byte_num = 2 | ||
num_values = 4 | ||
scaling = 0.1 # deci-percentage | ||
|
||
raw_values = self.split_by_lengths( | ||
num_values * [byte_num] | ||
) | ||
|
||
raw_control = hex2dec(raw_values[0][1]) # first byte is always 0 | ||
raw_values[0] = self.to_binary(raw_control, 8) # control is in bits | ||
|
||
raw_values[1] = self.bytes2int32(raw_values[1]) # target velocity | ||
|
||
raw_values[2] = self.bytes2int32(raw_values[2]) # add 2 extra bytes to convert to int32 | ||
raw_values[2] = twos_complement(raw_values[2], 16) * scaling | ||
|
||
raw_values[3] = bytes2int32(raw_values[3] + ['0', '0']) # add 2 extra bytes to convert to int32 | ||
raw_values[3] = twos_complement(raw_values[3], 16) * scaling | ||
|
||
return raw_values | ||
|
||
def get_as_actual_values_1(self): | ||
byte_num = 2 | ||
num_values = 4 | ||
scaling = 107.2 / 16384.0 | ||
|
||
raw_values = self.split_by_lengths( | ||
num_values * [byte_num] | ||
) | ||
|
||
raw_control = hex2dec(raw_values[0][1]) # first byte is always 0 | ||
raw_values[0] = self.to_binary(raw_control, 8) # status is in bits | ||
|
||
raw_values[1] = self.bytes2int32(raw_values[1]) # actual velocity | ||
raw_values[1] = twos_complement(raw_values[1], 16) * 0.01 # x100 rpm | ||
|
||
raw_values[2] = twos_complement(self.bytes2int32(raw_values[2]), 16) * scaling # torque current | ||
raw_values[3] = twos_complement(self.bytes2int32(raw_values[3]), 16) * scaling # magnet current | ||
|
||
calc_torque = 0.243 * raw_values[2] + (9 * 10 ** -4) * raw_values[2] * raw_values[3] | ||
raw_values.append(calc_torque) | ||
|
||
return raw_values | ||
|
||
def get_as_actual_values_2(self): | ||
byte_num = 2 | ||
num_values = 4 | ||
|
||
raw_values = self.parse( | ||
num_values * [byte_num], [10 ** -1, 10 ** -1, 1, 10 ** -1], | ||
toggle_endianness=False, two_complement=True | ||
) | ||
raw_values[2] = self.to_binary(raw_values[2], 16) # in bits | ||
return raw_values |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
"""Parses Ellipse 2N data""" | ||
|
||
from parsers.can.models import BytesParser | ||
|
||
|
||
class Ellipse2NParser(BytesParser): | ||
def get_as_status_01(self): | ||
raw_values = self.parse([4, 2, 2], scalings=None) | ||
|
||
time_stamp = self.from_micro(raw_values[0]) # micro-seconds | ||
general_status = self.to_binary(raw_values[1], 16) | ||
clock_status = self.to_binary(raw_values[2], 16) | ||
|
||
return time_stamp, general_status, clock_status | ||
|
||
def get_as_utc_0(self): | ||
return self.parse([4, 4], [10 ** -6, 10 ** -3]) | ||
|
||
def get_as_utc_1(self): | ||
return self.parse( | ||
[1, 1, 1, 1, 1, 1, 2], | ||
[1, 1, 1, 1, 1, 1, 1, 10 ** -4] | ||
) | ||
|
||
def get_as_imu_info(self): | ||
raw_values = self.parse( | ||
[4, 2, 2] | ||
) | ||
|
||
time_stamp = self.from_micro(raw_values[0]) # micro-seconds | ||
imu_status = self.to_binary(raw_values[1], 16) | ||
temperature = self.apply_scaling(raw_values[2], 10 ** -2) | ||
|
||
return time_stamp, imu_status, temperature | ||
|
||
def get_as_acc(self): | ||
scaling = 10 ** -2 | ||
|
||
byte_num = 2 | ||
num_values = 3 | ||
|
||
return self.parse( | ||
num_values * [byte_num], num_values * [scaling], two_complement=True | ||
) | ||
|
||
def get_as_gyro(self): | ||
scaling = 10 ** -3 | ||
byte_num = 2 | ||
num_values = 3 | ||
|
||
return self.parse(num_values * [byte_num], num_values * [scaling]) | ||
|
||
def get_as_delta_vel(self): | ||
return self.get_as_acc() | ||
|
||
def get_as_delta_angle(self): | ||
scaling = 10 ** -3 | ||
byte_num = 2 | ||
num_values = 3 | ||
|
||
return self.parse(num_values * [byte_num], num_values * [scaling]) | ||
|
||
def get_as_ekf_quaternion(self): | ||
scaling = 32768 ** -1 | ||
byte_num = 2 | ||
num_values = 4 | ||
|
||
return self.parse(num_values * [byte_num], num_values * [scaling]) | ||
|
||
def get_as_ekf_euler(self): | ||
scaling = 10 ** -4 | ||
byte_num = 2 | ||
num_values = 3 | ||
|
||
return self.parse(num_values * [byte_num], num_values * [scaling]) | ||
|
||
def get_as_ekf_orientation_accuracy(self): | ||
scaling = 10 ** -4 | ||
byte_num = 2 | ||
num_values = 3 | ||
|
||
return self.parse(num_values * [byte_num], num_values * [scaling]) | ||
|
||
def get_as_gps1_pos_info(self): | ||
raw_values = self.parse( | ||
[4, 4] | ||
) | ||
|
||
time_stamp = self.from_micro(raw_values[0]) # micro-seconds | ||
imu_status = self.to_binary(raw_values[1], 32) | ||
|
||
return time_stamp, imu_status | ||
|
||
def get_as_gps1_pos(self): | ||
scaling = 10 ** -7 | ||
byte_num = 4 | ||
num_values = 2 | ||
|
||
return self.parse(num_values * [byte_num], num_values * [scaling]) | ||
|
||
def get_as_gps1_alt(self): | ||
raw_values = self.parse( | ||
[4, 2, 1, 1] | ||
) | ||
|
||
altitude = self.from_milli(raw_values[0]) | ||
undulation = self.apply_scaling(raw_values[1], 0.005) | ||
vehicles = int(raw_values[2]) | ||
age = self.to_binary(raw_values[3], 8) | ||
|
||
return altitude, undulation, vehicles, age | ||
|
||
def get_as_odo_velocity(self): | ||
return self.parse([4]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
"""Parses CAN data""" | ||
|
||
from parsers.can.utils import apply_functions, twos_complement | ||
|
||
|
||
class BytesParser: | ||
""" Parses raw bytes """ | ||
|
||
NANO_SCALING = 10 ** -9 | ||
MILLI_SCALING = 10 ** -6 | ||
MICRO_SCALING = 10 ** -3 | ||
|
||
def __init__(self, byte_0=None, byte_1=None, byte_2=None, byte_3=None, | ||
byte_4=None, byte_5=None, byte_6=None, byte_7=None): | ||
self.bytes = [ | ||
byte_0, byte_1, byte_2, byte_3, byte_4, byte_5, byte_6, byte_7 | ||
] | ||
|
||
@staticmethod | ||
def apply_scaling(x, scaling): | ||
return x * scaling | ||
|
||
@staticmethod | ||
def from_nano(x): | ||
return BytesParser.apply_scaling(x, BytesParser.NANO_SCALING) | ||
|
||
@staticmethod | ||
def from_micro(x): | ||
return BytesParser.apply_scaling(x, BytesParser.MICRO_SCALING) | ||
|
||
@staticmethod | ||
def from_milli(x): | ||
return BytesParser.apply_scaling(x, BytesParser.MILLI_SCALING) | ||
|
||
@staticmethod | ||
def toggle_endianness(values): | ||
"""Toggle endianness of values | ||
:param values: list of bytes, ints ... | ||
:return: values with other endianness (little -> big, big -> little) | ||
""" | ||
|
||
return list(reversed(values)) | ||
|
||
@staticmethod | ||
def to_binary(val, expected_length=8, append_left=True): | ||
"""Converts decimal value to binary | ||
:param val: decimal | ||
:param expected_length: length of data | ||
:param append_left: append extra 0s to the left? or to the right? | ||
:return: binary value | ||
""" | ||
|
||
val = bin(val)[2:] # to binary | ||
|
||
while len(val) < expected_length: | ||
if append_left: | ||
val = "0" + val | ||
else: | ||
val = val + "0" | ||
|
||
return val | ||
|
||
@staticmethod | ||
def hexs_to_dec(hex_values): | ||
try: | ||
return int(hex_values, 16) | ||
except: # is a list | ||
if isinstance(hex_values, list): | ||
all_together = ''.join(hex_values) | ||
return BytesParser.hexs_to_dec(all_together) | ||
|
||
return [] | ||
|
||
@staticmethod | ||
def parse_bytes(raw_values, lengths, scalings): | ||
while len(raw_values) < 8: | ||
raw_values.append('0') | ||
|
||
return BytesParser(*raw_values).parse(lengths, scalings) | ||
|
||
@staticmethod | ||
def parse_string(string, lengths, scalings): | ||
raw_values = string.split() | ||
return BytesParser.parse_bytes(raw_values, lengths, scalings) | ||
|
||
def split_by_lengths(self, lenghts): | ||
start_index = 0 | ||
current_len = 0 | ||
raw_values = [] | ||
|
||
while start_index < len(self.bytes) and current_len < len(lenghts): | ||
raw_len = lenghts[current_len] | ||
end_index = start_index + raw_len | ||
raw_values.append(self.bytes[start_index: end_index]) | ||
|
||
start_index = end_index | ||
current_len += 1 | ||
|
||
return raw_values | ||
|
||
def parse(self, lengths, scalings=None, two_complement=False, | ||
toggle_endianness=True): | ||
def func(index, raw_byte): | ||
x = raw_byte | ||
|
||
if toggle_endianness: | ||
x = self.toggle_endianness(x) | ||
|
||
x = BytesParser.hexs_to_dec(x) | ||
|
||
if two_complement: | ||
x = twos_complement(x, 16) | ||
|
||
if scalings: | ||
x *= scalings[index] | ||
|
||
return x | ||
|
||
transformation_functions = len(lengths) * [func] | ||
|
||
raw_values = self.split_by_lengths(lengths) | ||
raw_values = apply_functions(raw_values, transformation_functions) | ||
|
||
return raw_values |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
"""Parses TI data""" | ||
|
||
from parsers.can.models import BytesParser | ||
from parsers.can.utils import bytes2int32 | ||
|
||
|
||
class TIParser(BytesParser): | ||
def get_as_potentiometers(self): | ||
scaling = 10 ** -7 | ||
|
||
throttle = bytes2int32(self.bytes[:4]) | ||
throttle *= scaling | ||
|
||
brake = bytes2int32(self.bytes[4:]) | ||
brake *= scaling | ||
|
||
return [ | ||
throttle, | ||
brake | ||
] | ||
|
||
def get_as_steering(self): | ||
scaling = 10 ** -7 | ||
|
||
steering = bytes2int32(self.bytes[:4]) | ||
steering *= scaling | ||
steering = steering * 72 / 43 - 15804 / 43 # empirical value | ||
|
||
ti_core_temp = bytes2int32(self.bytes[4:]) | ||
ti_core_temp *= scaling | ||
ti_core_temp -= 100.0 # empirical value | ||
|
||
return [ | ||
steering, | ||
ti_core_temp | ||
] | ||
|
||
def get_as_brakes(self): | ||
scaling = 10 ** -7 | ||
|
||
front_break = bytes2int32(self.bytes[:4]) | ||
front_break *= scaling | ||
front_break -= 111.25 | ||
|
||
rear_break = bytes2int32(self.bytes[4:]) | ||
rear_break *= scaling | ||
rear_break -= 111.25 | ||
|
||
return [front_break, rear_break] | ||
|
||
def get_as_suspensions_1(self): | ||
byte_num = 4 | ||
num_values = 2 | ||
|
||
susps = self.parse( | ||
num_values * [byte_num], [10 ** -4, 10 ** -4], | ||
toggle_endianness=True, two_complement=True | ||
) | ||
for i, susp in enumerate(susps): | ||
susps[i] = (susp - 434) / 70 | ||
|
||
return susps | ||
|
||
def get_as_suspensions_2(self): | ||
return self.get_as_suspensions_1() | ||
|
||
def get_as_current_sensor(self): | ||
raw_curr = bytes2int32(self.bytes[:4], 'f') | ||
real_curr = 100 * ((raw_curr * 5 / 5.1) - 2.5) | ||
real_curr = - real_curr # sensor was mounted the other way around | ||
return [real_curr] |
Oops, something went wrong.