# Python USB library for the Consumer Physics SCIO

__*IMPORTANT NOTE*__: This file contains the entire library (for testing and development purposes)

### Dependencies
- pySerial

In [None]:
import json
import struct
import logging
import datetime
import os
import csv
import base64
import serial
import serial.tools.list_ports as pyserial
import numpy as np

# Logging/output format setup
log = logging.getLogger('root')
log.setLevel(logging.DEBUG)
logging.basicConfig(format='[%(asctime)s] %(levelname)8s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

In [None]:
#### Search for device
def search_scio_usb(silent=False):
    if not silent: print('Searching for devices:')
    scio_ID = '0451:16AA'
    device_list = []
    # Find the scio port. Returns an empty list if not found
    scio_port = list(pyserial.grep(scio_ID))
    if(len(scio_port) == 0):
        print('  - Device not found. Try a long press (device should respond with blue blinking)')
        return(device_list)
    device_list.append({'address': scio_port[0].device, 'id': scio_ID})

    return(device_list)

class scio_usb:
    def __init__(self, address):
        self.address = address
        self.client = None
        self.device_info = {}
        self.scan_rawdata = {}
        self.calibration_data = {}
        self.key = b''
        self.disconnected = True
        self.cmd_base_protocol =  -70 # 0xba
        self.cmd_dev_id        =    1 # 0x01
        self.cmd_scan          =    2 # 0x02
        self.cmd_temp          =    4 # 0x04
        self.cmd_bat           =    5 # 0x05
        self.cmd_led           =   11 # 0x0b
        self.cmd_file_list     = -108 # 0x94
        self.cmd_file_head     = -121 # 0x87
        self.cmd_ble_id        = -124 # 0x84
        # TODO: Test
        self.ready_for_wr      =   14 # TODO: Test. Tells the device to prepare for calibration reading?
        self.cmd_get_ble       = -101 # 0x9b # Is this the BLE status?
        self.cmd_set_name      = -111 # 0x91
        self.cmd_ble_status    = -123 # 0x85
        self.cmd_reset_dev     = -125 # 0x83
        
    async def connect(self):
        try:
            self.client = serial.Serial(self.address)
            self.disconnected = False
        except Exception as e:
            logging.error(f'Failed to connect to {self.address}: {e}')
            raise
        pass
    
    async def disconnect(self):
        if not self.disconnected:
            try:
                self.client.close()
            except Exception as e:
                logging.error(f'Failed to disconnect from {self.address}: {e}')
                raise
            finally:
                self.disconnected = True
        pass
        
    def create_command(self, cmd):
        if(cmd == self.cmd_led):
            # Setting the LED colour is special (longer command)
            byte_cmd = struct.pack('<bbbbbbbbbbbbbbb',1,self.cmd_base_protocol,cmd,9,0,0,0,0,0,0,0,0,0,0)
        else:
            byte_cmd = struct.pack('<bbbbb',1,self.cmd_base_protocol,cmd,0,0)
        return(byte_cmd)
    
    async def send_command(self, byte_cmd):
        try:
            self.client.write(byte_cmd)
        except Exception as e:
            logging.error(f'Failed to send command to {self.address}: {e}')
            raise
        pass
    
    async def read_response(self):
        try:
            # Start reading the response
            s = self.client.read(1)
        except Exception as e:
            logging.error(f'Failed to read command from {self.address}: {e}')
            raise
        response_type = struct.unpack('<b',s)[0]
        # Error check
        assert (response_type == self.cmd_base_protocol), 'Wrong response type: {}'.format(response_type)
        # Read data
        s = self.client.read(1)
        response_content = struct.unpack('<b',s)[0]
        s = self.client.read(2)
        response_length = struct.unpack('<H',s)[0]
        # Read the number of values specified by the response length
        response_data = self.client.read(response_length)
        return([response_content, response_length, response_data])
    
    async def read_temperature(self):
        byte_cmd = self.create_command(self.cmd_temp)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        # Parse response
        num_vars = response_length / 4 # divide by 4 because we are dealing with longs
        data_struct = '<' + str(int(num_vars)) + 'L' # This is usually '<3l' or '<lll'
        # Convert bytes
        response_data = struct.unpack(data_struct, response_data_raw)
        # Convert to actual temperatures
        cmosTemperature   = (response_data[0] - 375.22) / 1.4092 # From the disassembled Android app
        chipTemperature   = response_data[1] / 100
        objectTemperature = response_data[2] / 100
        temperatures = {'cmos_t': cmosTemperature, 'chip_t': chipTemperature, 'obj_t': objectTemperature}
        return(temperatures)
    
    # This is what the app does, but its meaning is unclear
    async def read_file_list(self):
        byte_cmd = self.create_command(self.cmd_file_list)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        
        data_struct = '<16I' # File list is a list of integers
        resp = list(struct.unpack(data_struct, response_data_raw))
        file_list = {}
        for file_nb in range(0, int(len(resp)/2)):
            if((resp[file_nb*2] >= 87) & (resp[file_nb*2] <= 95)):
                file_list.update({file_nb:{'file_type':resp[file_nb*2], 'file_version':resp[file_nb*2+1]}})
                key_start_byte = file_nb*8
                key_end_byte   = file_nb*8+4
                val_start_byte = file_nb*8+4
                val_end_byte   = file_nb*8+8
                attr = {'key':response_data_raw[key_start_byte:key_end_byte].hex(),
                        'value':response_data_raw[val_start_byte:val_end_byte].hex()}
                file_list.update({file_nb:attr})
            else:
                attr = {'file_type':resp[file_nb*2], 'file_version':resp[file_nb*2+1]}
                file_list.update({file_nb:attr})
        return(file_list)
    
    # This is what the app does, but its meaning is unclear
    async def read_file_header(self):
        byte_cmd = self.create_command(self.cmd_file_head)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        
        data_struct = '<4I'
        resp = list(struct.unpack(data_struct, response_data_raw))
        return(resp)
    
    async def read_battery(self):
        # Firmware-version dependent handling (unclear if this is needed)
        # Check if firmware version was already read, to figure out if firmware version is available
        #if(len(self.device_info) == 0):
        #    await self.read_device_id()
        #if(self.device_info['firmwareVersion'] >= 149):
        #    #get_percent
        #    pass
        #else:
        #    #get_volt
        #    pass
            
        # Now read the information
        byte_cmd = self.create_command(self.cmd_bat)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        
        # Parse
        data_struct = '<HBBHH8x' # Battery logic
        resp = list(struct.unpack(data_struct, response_data_raw))
        # Battery voltage
        resp[4] = resp[4]/1000
        # Charge status
        resp[3] = resp[3] & 3
        if(resp[3] == 0):
            charge_status = 'not charging'
        elif(resp[3] == 4):
            charge_status = 'full'
        elif(resp[3] != 6):
            charge_status = 'charging'
        else:
            charge_status = 'battery error'
        # Store information
        battery_status = {'bat_charge_percent': resp[0],
                          'bat_health_percent': resp[1],
                          'bat_health_status':  resp[2],
                          'bat_charge_status':  charge_status,
                          'bat_voltage':        resp[4]}
        return(battery_status)
    
    async def read_ble_id(self):
        byte_cmd = self.create_command(self.cmd_ble_id)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        # Parse response
        data_struct = '<8xH40s16s64s'
        ble_data = list(struct.unpack(data_struct, response_data_raw))
        ble_data[1] = ble_data[1].decode('utf-8').strip()       # Serial number
        ble_data[2] = ble_data[2].decode('utf-8').strip('\x00') # Device name
        ble_data[3] = ble_data[3].decode('utf-8').strip()       # Production information
        # Store information
        # Note: The remaining (first) 8 bytes are a device BLE ID in Hex
        self.device_info.update({'deviceBleId':    response_data_raw[0:8].hex(),
                                 'bleFWVersion':   ble_data[0],
                                 'serial_number':  ble_data[1],
                                 'device_name':    ble_data[2],
                                 'i2s_tag_config': ble_data[3]})
        return(self.device_info)
    
    async def read_device_id(self):
        byte_cmd = self.create_command(self.cmd_dev_id)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        # Parse response
        deviceDspId = response_data_raw[0:8].hex()
        aptinaHexId = response_data_raw[8:24].hex()
        aptinaId = ''
        # Reverse byte order of every 2 bytes
        for i in range(int(len(aptinaHexId)/4)):
            aptinaId += aptinaHexId[i*4+2] + aptinaHexId[i*4+3]
            aptinaId += aptinaHexId[i*4+0] + aptinaHexId[i*4+1]
        fw_data = list(struct.unpack('<24xH2x', response_data_raw))
        # Store information
        self.device_info.update({'deviceDspId':     deviceDspId,
                                 'aptinaId':        aptinaId,
                                 'firmwareVersion': fw_data[0]})
        # 256-bit AES key
        # No idea whether this is right, but there is an indication in the Android app for it
        self.key = bytes.fromhex(aptinaId)
        return(self.device_info)
    
    # Scan data is returned in three batches # TODO
    async def scan(self):
        byte_cmd = self.create_command(self.cmd_scan)
        await self.send_command(byte_cmd)
        # Read scan
        rawdata = []
        for i in range(3):
            [response_content, response_length, response_data_raw] = await self.read_response()
            rawdata.append([response_content, response_length, response_data_raw])
        
        # Parse response (Store as base64 strings)
        self.scan_rawdata.update({'sample':          self.encode_b64(rawdata[0][2]),
                                  'sample_dark':     self.encode_b64(rawdata[1][2]),
                                  'sample_gradient': self.encode_b64(rawdata[2][2])})
        return(self.scan_rawdata)
    
    async def calibrate(self, calibration_fn = 'calibration.json'):
        temp_before      = await self.read_temperature()
        calibration_data = await self.scan()
        temp_after       = await self.read_temperature()
        self.calibration_data.update({'t_cmos_before':   temp_before['cmos_t'],
                                      't_chip_before':   temp_before['chip_t'],
                                      't_cmos_after':    temp_after['cmos_t'],
                                      't_chip_after':    temp_after['chip_t'],
                                      'sample':          calibration_data['sample'],
                                      'sample_dark':     calibration_data['sample_dark'],
                                      'sample_gradient': calibration_data['sample_gradient']})
        self.write_data_file(calibration_fn, calibration_data, temp_before, temp_after)
        return(self.calibration_data)
    
    def parse_scan(self):
        # Convert to bytes
        sample_bytes          = base64.urlsafe_b64decode(self.scan_rawdata['sample']) 
        sample_dark_bytes     = base64.urlsafe_b64decode(self.scan_rawdata['sample_dark'])
        sample_gradient_bytes = base64.urlsafe_b64decode(self.scan_rawdata['sample_gradient'])
        
        # Same for calibration
        sample_cal_bytes          = base64.urlsafe_b64decode(self.calibration_data['sample']) 
        sample_dark_cal_bytes     = base64.urlsafe_b64decode(self.calibration_data['sample_dark'])
        sample_gradient_cal_bytes = base64.urlsafe_b64decode(self.calibration_data['sample_gradient'])
        
        # Decode
        sample_raw          = np.array(struct.unpack('>4x400I196x', sample_bytes))
        sample_dark_raw     = np.array(struct.unpack('>4x400I196x', sample_dark_bytes))
        sample_gradient_raw = np.array(struct.unpack('>4x400I52x',  sample_gradient_bytes))
        sample_cal          = np.array(struct.unpack('>4x400I196x', sample_cal_bytes))
        sample_dark_cal     = np.array(struct.unpack('>4x400I196x', sample_dark_cal_bytes))
        sample_gradient_cal = np.array(struct.unpack('>4x400I52x',  sample_gradient_cal_bytes))
        
        # Calibration
        sample          = sample_raw - sample_cal
        sample_dark     = sample_dark_raw - sample_dark_cal
        sample_gradient = sample_gradient_raw - sample_gradient_cal
        
        # Calculate reflectance
        corrected_sample   = sample - sample_dark
        corrected_gradient = sample_gradient - sample_dark
        reflectance        = corrected_sample / corrected_gradient
        
        return(reflectance)
    
    def load_calibration_file(self, calibration_fn = 'calibration.json'):
        # Load the JSON file
        with open(calibration_fn, 'r') as file:
            json_data = json.load(file)

        # Load the "scan" object
        self.calibration_data = json_data['scan']
        
        return (self.calibration_data)
    
    def write_logfile(self, file_path, data, header):
        # Check if file already exists to determine whether to write header
        write_header = not os.path.isfile(file_path)
        
        # Open the file in write mode and create a CSV writer
        with open(file_path, mode='a', newline='') as f:
            writer = csv.writer(f)
            # Write header if file is new
            if write_header:
                writer.writerow(header)
            # Write data
            writer.writerow(data)
        return
    
    # Saves a single scan as raw data to JSON
    def write_data_file(self, output_fn, scan_rawdata, temp_before, temp_after):
        timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        
        # Rename temperature reading dicts
        temp_before['t_cmos_before'] = temp_before.pop('cmos_t')
        temp_before['t_chip_before'] = temp_before.pop('chip_t')
        temp_before['t_obj_before']  = temp_before.pop('obj_t')
        temp_after['t_cmos_after']   = temp_after.pop('cmos_t')
        temp_after['t_chip_after']   = temp_after.pop('chip_t')
        temp_after['t_obj_after']    = temp_after.pop('obj_t')
        
        # Create the json string
        jsondata = {}
        jsondata['device'] = self.device_info
        jsondata['scan'] = {'timestamp': timestamp}
        jsondata['scan'].update(temp_before)
        jsondata['scan'].update(temp_after)
        jsondata['scan'].update(scan_rawdata)
        # Write data to JSON file
        with open(output_fn, 'w') as outfile:
            json.dump(jsondata, outfile, indent=4)
        return
    
    def encode_b64(self, bytestring):
        urlSafeEncodedBytes = base64.urlsafe_b64encode(bytestring)
        urlSafeEncodedStr = str(urlSafeEncodedBytes, 'utf-8')
        return(urlSafeEncodedStr)
    
    async def run_scan(self, logfile, datafile, silent=True):
        if not silent: print('  - Connecting to device')
        await self.connect()
        
        if not silent: print('  - Reading BLE & device information')
        await self.read_ble_id()
        await self.read_device_id()
        for i in self.device_info:
            if(i == 'aptinaId'):
                if not silent: print('      ' + i + ':\t\t', self.device_info[i])
            else:
                if not silent: print('      ' + i + ':\t', self.device_info[i])
                pass
            pass
        
        if not silent: print('  - Reading battery status')
        battery_status = await self.read_battery()
        if not silent: print('      Battery charge:\t {:.0f}%'.format(battery_status['bat_charge_percent']))
        if not silent: print('      Battery is ' + battery_status['bat_charge_status'])
        
        # cmosTemperature, chipTemperature, objectTemperature
        if not silent: print('  - Reading temperature (before scan):')
        temp_before = await self.read_temperature()
        if not silent: print('      CMOS T:\t\t {:.3f}°C'.format(temp_before['cmos_t']))
        if not silent: print('      Chip T:\t\t {:.3f}°C'.format(temp_before['chip_t']))
        if not silent: print('      Obj. T:\t\t {:.3f}°C'.format(temp_before['obj_t']))
        
        if not silent: print('  - Scanning')
        scan_rawdata = await self.scan()
        print(scan_rawdata)
        
        if not silent: print('  - Reading temperature (after scan):')
        temp_after = await self.read_temperature()
        if not silent: print('      CMOS T:\t\t {:.3f}°C'.format(temp_after['cmos_t']))
        if not silent: print('      Chip T:\t\t {:.3f}°C'.format(temp_after['chip_t']))
        if not silent: print('      Obj. T:\t\t {:.3f}°C'.format(temp_after['obj_t']))
        
        if not silent: print('  - Testing other commands:')
        raw_resp = await self.test_cmd(self.cmd_get_ble)
        print(raw_resp)
        #out = await self.test_parse_response(raw_resp)
        #print(out)
        
        if not silent: print('  - Disconnecting')
        await self.disconnect()
        
        # TODO: save calibration data in the data file as well
        if not silent: print('  - Writing data to ' + datafile)
        self.write_data_file(datafile, scan_rawdata, temp_before, temp_after)
        
        if not silent: print('  - Loading calibration file')
        self.load_calibration_file()
        
        if not silent: print('  - Parsing scan data')
        reflectance = self.parse_scan()
        print(reflectance)
        
        if not silent: print('  - Writing log to ' + logfile)
        timestamp_utc = datetime.datetime.utcnow()
        timestamp = datetime.datetime.strftime(timestamp_utc, '%Y-%m-%d %H:%M:%S')
        data = [str(timestamp)] +\
                list(self.device_info.values()) +\
               [battery_status['bat_charge_percent']] +\
                list(temp_before.values()) +\
                list(temp_after.values())
        header = ['timestamp'] +\
                  list(self.device_info.keys()) +\
                 ['battery_level',
                  't_cmos_before',
                  't_chip_before',
                  't_obj_before',
                  't_cmos_after',
                  't_chip_after',
                  't_obj_after']
        # TODO: Works, but I don't want output every time
        #self.write_logfile(logfile, data, header) # TODO: Works, but I don't want output every time
        
        return
    
    async def run_calibration(self, silent=True):
        if not silent: print('  - Connecting to device')
        await self.connect()
        
        if not silent: print('  - Reading BLE & device information')
        await self.read_ble_id()
        await self.read_device_id()
        for i in self.device_info:
            if(i == 'aptinaId'):
                if not silent: print('      ' + i + ':\t\t', self.device_info[i])
            else:
                if not silent: print('      ' + i + ':\t', self.device_info[i])
                pass
            pass
        
        if not silent: print('  - Reading battery status')
        battery_status = await self.read_battery()
        if not silent: print('      Battery charge:\t {:.0f}%'.format(battery_status['bat_charge_percent']))
        if not silent: print('      Battery is ' + battery_status['bat_charge_status'])
        
        if not silent: print('  - Running calibration scan')
        await self.calibrate()
        
        if not silent: print('  - Disconnecting')
        await self.disconnect()
        
        return
    
    # Generic commands, can be used for testing
    async def test_cmd(self, cmd):
        byte_cmd = self.create_command(cmd)
        await self.send_command(byte_cmd)
        [response_content, response_length, response_data_raw] = await self.read_response()
        return([response_content, response_length, response_data_raw])
    
    # Generic, for testing
    async def test_parse_response(self, response_data_raw):
        # Parse response
        print('As hex:', response_data_raw[2].hex())
        print(len(response_data_raw[2]))
        
        try:
            data_struct = '<4I' # File list
            resp = list(struct.unpack(data_struct, response_data_raw[2]))
            print(resp)
            file_list = {}
            for file_nb in range(0, int(len(resp)/2)):
                if((resp[file_nb*2] >= 87) & (resp[file_nb*2] <= 95)):
                    file_list.update({file_nb:{'file_type':resp[file_nb*2], 'file_version':resp[file_nb*2+1]}})
                    key_start_byte = file_nb*8
                    key_end_byte   = file_nb*8+4
                    val_start_byte = file_nb*8+4
                    val_end_byte   = file_nb*8+8
                    attr = {'key':response_data_raw[2][key_start_byte:key_end_byte].hex(),
                            'value':response_data_raw[2][val_start_byte:val_end_byte].hex()}
                    file_list.update({file_nb:attr})
                else:
                    attr = {'file_type':resp[file_nb*2], 'file_version':resp[file_nb*2+1]}
                    file_list.update({file_nb:attr})
                pass
            #print(file_list)
            
            #file_list.update({'file':1, 'file_type':resp[2], 'file_version':resp[3]})
            
            
        except:
            await self.disconnect()
            raise
        #dev_id = struct.unpack(data_struct, response_data_raw)
        return(resp)

In [None]:
# Search for devices
device_list = search_scio_usb()
if(len(device_list) != 0):
    # Note: No support for multiple Apogee devices. This takes the first one found!
    device_address = device_list[0]['address']
    print('  - Found device at', device_address)
    # Now set up device
    scio = scio_usb(device_address) # Create an instance for this device address
    await scio.run_calibration(silent=False)
print('Done...')

In [None]:
logfile = './logfile.csv'
datafile = './scan.json'

# Search for devices
device_list = search_scio_usb()
if(len(device_list) != 0):
    # Note: No support for multiple Apogee devices. This takes the first one found!
    device_address = device_list[0]['address']
    print('  - Found device at', device_address)
    # Now set up device
    scio = scio_usb(device_address) # Create an instance for this device address
    await scio.run_scan(logfile, datafile, silent=False)
print('Done...')

In [None]:
logfile = './logfile.csv'
datafile = './scan.json'

# Search for devices
device_list = search_scio_usb()
if(len(device_list) != 0):
    # Note: No support for multiple Apogee devices. This takes the first one found!
    device_address = device_list[0]['address']
    print('  - Found device at', device_address)
    # Now set up device
    scio = scio_usb(device_address) # Create an instance for this device address
    await scio.run(logfile, datafile, silent=False)
print('Done...')

In [None]:
device_list = await search_scio_ble(3)
print(device_list)

### Additional information

Firmware file codes:
- no_file(0),
- ble(89),
- dsp_boot(90),
- dsp_dec(91),
- dsp_op(92),
- deadPixelsIndices(100),
- centers(101),
- bins(102),
- nPixelsPerBin(103);

AptinaId is a 32-byte key, i.e. 256-bit. This could work for AES-256

In [None]:
sample =         614582552
sampleDark =     4269192318
sampleGradient = 405697630

correctedSample = sample - sampleDark
correctedGradient = sampleGradient - sampleDark
reflectance = correctedSample / correctedGradient

print(reflectance)

In [None]:
sample =         int('18c9a124', 16)
sampleDark =     int('7eb476fe', 16)
sampleGradient = int('5e742e18', 16)

print(sample)

correctedSample = sample - sampleDark
correctedGradient = sampleGradient - sampleDark
reflectance = correctedSample / correctedGradient

print(reflectance)

In [None]:
sample = list(struct.unpack('<bbH4x2I', bytearray.fromhex('ba0208070000000018c9a124fc3f2e57')))

print(sample)

test = struct.unpack('>I', bytearray.fromhex('18c9a124'))
print(test)