In [276]:
import serial, struct, time, datetime, os
import numpy as np

In [277]:
# Open the serial link to the gauge: 9600 baud, no parity, one stop bit.
# No timeout is given, so reads block until the requested bytes arrive.
ser = serial.Serial(
    port="/dev/ttyUSB2",
    baudrate=9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
)

In [278]:
class UnexpectedDigitException(Exception):
    """Raised when a received frame contains a value the parsers do not expect."""

In [279]:
def mkdir_p(path):
    """Create ``path`` and any missing parent directories (like ``mkdir -p``).

    An already existing directory is not an error. If ``path`` exists but is
    not a directory, or creation fails for any other reason, the ``OSError``
    raised by ``os.makedirs`` propagates to the caller.

    The previous implementation inspected ``errno.EEXIST`` without importing
    ``errno``, so an existing directory produced a ``NameError`` instead of
    being silently accepted.
    """
    os.makedirs(path, exist_ok=True)


def read_one_measurement(n=16):
    """Read ``n`` bytes from the serial port and return their low nibbles.

    The input buffer of the module-level ``ser`` port is discarded first, then
    exactly ``n`` signed bytes are read and each one is masked down to its
    four least significant bits.

    Returns:
        list of ``n`` ints, each in the range 0..15.
    """
    ser.flushInput()
    layout = struct.Struct("b" * n)
    payload = ser.read(layout.size)
    nibbles = []
    for value in layout.unpack(payload):
        nibbles.append(value & 0x0F)
    return nibbles

def find_measurement_end():
    """Consume single bytes until the end-of-frame marker (nibble 13) is seen.

    At most 100 one-byte reads are attempted.

    Raises:
        UnexpectedDigitException: if no byte with low nibble 13 shows up
            within those 100 reads.
    """
    attempts_left = 100
    while attempts_left > 0:
        nibble = read_one_measurement(n=1)[0]
        if nibble == 13:
            return
        attempts_left -= 1
    raise UnexpectedDigitException()


# Unit codes (frame index 4) accepted for each sensor type.
_TORQUE_UNITS = {
    1: "Kg·cm",
    2: "LB·inch",
    3: "N·cm",
}
_FORCE_UNITS = {
    5: "Kg",
    6: "LB",
    7: "g",
    8: "oz",
    9: "N",
}


def _parse_measurement_frame(raw_measurement, sensor_code, units_dict, sensor_name):
    """Decode one 16-value frame; shared by the force and torque parsers.

    Frame layout as checked by this code (index 0 is not inspected):
        [1]     must be 4
        [2]     must be 1
        [3]     sensor code (``sensor_code``)
        [4]     unit code, a key of ``units_dict``
        [5]     polarity: 0 -> positive, 1 -> negative
        [6]     decimal point position, 0..3 digits after the point
        [7:15]  eight decimal digits, most significant first
        [15]    must be 13 (end-of-frame marker)

    Returns:
        (measurement, units): the signed float value and its unit string.

    Raises:
        UnexpectedDigitException: if the frame is too short or any field
            holds an unexpected value.
    """
    frame = raw_measurement

    # A short frame used to surface as IndexError, which callers do not catch.
    if len(frame) < 16:
        raise UnexpectedDigitException("message got corrupted")

    if frame[1] != 4:
        raise UnexpectedDigitException()
    if frame[2] != 1:
        raise UnexpectedDigitException()

    not_connected = "check {} sensor is connected".format(sensor_name)
    if frame[3] != sensor_code:
        raise UnexpectedDigitException(not_connected)
    units_code = frame[4]
    if units_code not in units_dict:
        raise UnexpectedDigitException(not_connected)
    units = units_dict[units_code]

    polarity = frame[5]
    if polarity not in (0, 1):
        raise UnexpectedDigitException()
    decimal_point_position = frame[6]
    if decimal_point_position not in range(4):
        raise UnexpectedDigitException()

    digits = frame[7:15]
    # Values come from 4-bit masks, so a corrupted byte can yield 10..15.
    # Concatenating such a value would silently produce a wrong number.
    if any(not 0 <= digit <= 9 for digit in digits):
        raise UnexpectedDigitException("message got corrupted")

    measurement = float("".join(str(digit) for digit in digits))
    measurement *= (-1) ** polarity
    measurement /= 10 ** decimal_point_position

    if frame[15] != 13:
        raise UnexpectedDigitException("message got corrupted")
    return measurement, units


def process_torque_measurement(raw_measurement):
    """Decode a torque frame (sensor code 8, unit codes 1..3).

    Args:
        raw_measurement: sequence of at least 16 ints, as produced by
            ``read_one_measurement``.

    Returns:
        (measurement, units) with units one of "Kg·cm", "LB·inch", "N·cm".

    Raises:
        UnexpectedDigitException: if the frame is not a valid torque frame.
    """
    return _parse_measurement_frame(raw_measurement, 8, _TORQUE_UNITS, "torque")


def process_force_measurement(raw_measurement):
    """Decode a force frame (sensor code 5, unit codes 5..9).

    Args:
        raw_measurement: sequence of at least 16 ints, as produced by
            ``read_one_measurement``.

    Returns:
        (measurement, units) with units one of "Kg", "LB", "g", "oz", "N".

    Raises:
        UnexpectedDigitException: if the frame is not a valid force frame.
    """
    return _parse_measurement_frame(raw_measurement, 5, _FORCE_UNITS, "force")
    


    

In [280]:
def aquire_data(name):
    """Record force and torque readings until interrupted, then save them.

    Frames are read in an endless loop. Each frame is first decoded as a force
    reading; if that fails it is decoded as a torque reading; if that fails
    too, a notice is printed and the stream is re-synchronised with
    ``find_measurement_end``. Every stored row is
    ``(value, units, seconds since start)``.

    The loop stops on ``KeyboardInterrupt`` (swallowed, as before). The
    collected rows are written in a ``finally`` clause, so they are also saved
    when the loop dies from any other exception; that exception still
    propagates after the files are written.

    Args:
        name: suffix of the output directory ``data_<name>``. Two files,
            ``<timestamp>_force`` and ``<timestamp>_torque``, are written
            there.
    """
    force_measurements = []
    torque_measurements = []
    t = time.time()
    try:
        while True:
            raw_measurement = read_one_measurement()
            t_now = time.time() - t
            try:
                force_measurements.append((*process_force_measurement(raw_measurement), t_now))
            except UnexpectedDigitException:
                try:
                    torque_measurements.append((*process_torque_measurement(raw_measurement), t_now))
                except UnexpectedDigitException:
                    print("One measurement failed")
                    find_measurement_end()
    except KeyboardInterrupt:
        # Interrupting the kernel is the normal way to stop the recording.
        pass
    finally:
        # Always persist what was collected, whatever ended the loop.
        out_dir = "data_" + name
        mkdir_p(out_dir)
        now = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
        np.savetxt(out_dir + "/" + now + "_force", np.array(force_measurements), fmt="%s")
        np.savetxt(out_dir + "/" + now + "_torque", np.array(torque_measurements), fmt="%s")
        print("Stopped, saved")

In [281]:
# Start recording; interrupt the kernel to stop. Results go to data_Dima/.
aquire_data("Dima")

Stopped, saved


UnexpectedDigitException: check force sensor is connected