# Python library for the Apogee μCache AT-100 datalogger

Python script to connect to the [Apogee μCache AT-100 datalogger](https://www.apogeeinstruments.com/microcache-bluetooth-micro-logger/), which makes precision environmental measurements using Apogee’s analog sensors. This script is meant for periodic data collection, while the μCache itself does the internal datalogging. It also checks the battery charge and alerts the user if the charge gets too low.

### Dependencies
- Bleak (v. 0.16, available in conda-forge. Note: Current version is 0.19)

In [None]:
import asyncio
import logging
from bleak import BleakClient, BleakScanner
import datetime

In [None]:
async def scan_apogee_devices(duration=5):
    """Scan for BLE advertisements and return the Apogee devices that were seen.

    Args:
        duration: Time in seconds to keep the scanner running before the
            list of discovered devices is collected.

    Returns:
        A list with one dict per matching device, holding the keys
        'address', 'name', 'rssi', 'company_id' and 'advertised_data'
        (the manufacturer payload decoded as UTF-8 text). The list is empty
        when no matching device advertised during the scan.
    """
    # Manufacturer-data key (Bluetooth company identifier) this scan filters on.
    apogee_company_id = 0x0644

    async with BleakScanner() as scanner:
        await asyncio.sleep(duration)
        devices = scanner.discovered_devices

    apogee_devices = []
    for device in devices:
        # Devices without manufacturer data are simply skipped. Looking the
        # company id up directly (instead of taking the first entry) also
        # handles advertisements that carry several manufacturer records.
        manufacturer_data = device.metadata.get('manufacturer_data') or {}
        payload = manufacturer_data.get(apogee_company_id)
        if payload is None:
            continue

        apogee_devices.append({
            'address': device.address,
            'name': device.name,
            'rssi': device.rssi,
            'company_id': apogee_company_id,
            # errors='replace' keeps one undecodable payload from aborting
            # the whole scan.
            'advertised_data': payload.decode('utf-8', errors='replace'),
        })
    return apogee_devices

class apogee_device:
    """Small asyncio wrapper around a Bleak client for one Apogee μCache.

    The instance keeps its own ``disconnected`` flag, updated by
    :meth:`connect` and :meth:`disconnect`; the read/write helpers refuse to
    run while that flag is set.

    Attributes:
        address: BLE address the client was created for.
        client: The underlying ``BleakClient``.
        disconnected: True until :meth:`connect` succeeds, and again after
            :meth:`disconnect`.
        device_info: Cache of the strings read by :meth:`read_info`.
        base_apogee_service_uuid: 128-bit UUID template; ``xxxx`` is replaced
            by a 16-bit identifier to address a specific characteristic.
        uuid_time: 16-bit identifier of the time characteristic.
    """

    def __init__(self, address):
        """Create the client for ``address`` without connecting yet."""
        self.address = address
        self.client = BleakClient(address, timeout=5.0)
        self.disconnected = True
        self.device_info = {}
        self.base_apogee_service_uuid = "b3e0xxxx-2594-42a1-a5fe-4e660ff2868f"
        self.uuid_time = "000a"

    def _apogee_uuid(self, short_id):
        """Return the full UUID for a 16-bit Apogee identifier."""
        return self.base_apogee_service_uuid.replace('xxxx', short_id)

    def _require_connection(self):
        """Raise if :meth:`connect` has not been called successfully."""
        if self.disconnected:
            raise Exception("Not connected to device")

    @staticmethod
    def _encode_unix_time(unix_time):
        """Encode a Unix timestamp as 4 little-endian bytes.

        NOTE(review): assumes the device stores its time as an unsigned
        32-bit integer; little-endian matches how ``read_time`` decodes it.
        Confirm the width against the Apogee API documentation.
        """
        return int(unix_time).to_bytes(4, byteorder='little')

    async def connect(self):
        """Connect to the device and clear the ``disconnected`` flag.

        Raises:
            Exception: Whatever the Bleak client raised; it is logged first.
        """
        try:
            await self.client.connect()
            self.disconnected = False
        except Exception as e:
            logging.error(f"Failed to connect to {self.address}: {e}")
            raise

    async def read_info(self):
        """Read the device-information strings and cache them.

        Returns:
            ``self.device_info``, a dict with the keys 'manufacturer_name',
            'model_number', 'serial_number', 'firmware_revision' and
            'hardware_revision'.

        Raises:
            Exception: If not connected, or if a read fails (logged first).
        """
        # Characteristics of the Device Information service (0x180A).
        dis_characteristics = {
            "manufacturer_name": "00002A29-0000-1000-8000-00805f9b34fb",
            "model_number": "00002A24-0000-1000-8000-00805f9b34fb",
            "serial_number": "00002A25-0000-1000-8000-00805f9b34fb",
            "firmware_revision": "00002A26-0000-1000-8000-00805f9b34fb",
            "hardware_revision": "00002A27-0000-1000-8000-00805f9b34fb",
        }
        self._require_connection()
        try:
            for characteristic, uuid in dis_characteristics.items():
                value = await self.client.read_gatt_char(uuid)
                value_str = value.decode("utf-8").strip() if value else ""
                self.device_info[characteristic] = value_str
            # All the hardware information remains stored in the instance.
            return self.device_info
        except Exception as e:
            logging.error(f"Failed to read information from {self.address}: {e}")
            raise

    async def read_battery_level(self):
        """Read the battery-level characteristic (0x2A19) as an integer.

        Raises:
            Exception: If not connected, or if the read fails (logged first).
        """
        battery_level = "00002A19-0000-1000-8000-00805f9b34fb"
        self._require_connection()
        try:
            battery_value = await self.client.read_gatt_char(battery_level)
            return int.from_bytes(battery_value, byteorder='little')
        except Exception as e:
            logging.error(f"Failed to read battery level from {self.address}: {e}")
            raise

    async def read_time(self):
        """Read the device clock and return it as an integer Unix timestamp.

        Raises:
            Exception: If not connected, or if the read fails (logged first).
        """
        time_service = self._apogee_uuid(self.uuid_time)
        self._require_connection()
        try:
            raw_unix_time_value = await self.client.read_gatt_char(time_service)
            return int.from_bytes(raw_unix_time_value, byteorder='little')
        except Exception as e:
            logging.error(f"Failed to read time from {self.address}: {e}")
            raise

    async def check_and_update_time(self, tolerance=2):
        """Compare the device clock with the computer clock and fix drift.

        The computer time is assumed to be accurate. The device time is only
        rewritten when the difference exceeds ``tolerance`` seconds but is
        below one year, so a grossly wrong computer clock cannot wreck the
        device time.

        Args:
            tolerance: Maximum accepted difference in seconds.

        Returns:
            The absolute difference in seconds measured before any update.

        Raises:
            Exception: If not connected, or if a read/write fails (logged).
        """
        time_service = self._apogee_uuid(self.uuid_time)

        # A timezone-aware "now" is required here: calling .timestamp() on a
        # naive utcnow() value would interpret it as local time and skew the
        # epoch by the local UTC offset.
        dt_computer = datetime.datetime.now(datetime.timezone.utc)
        dt_computer_unix = int(dt_computer.timestamp())
        dt_device = await self.read_time()
        dt_difference = abs(dt_computer_unix - dt_device)

        if tolerance < dt_difference < 31536000:
            # Convert the computer time to the byte layout of the device.
            dt_computer_unix_bytes = self._encode_unix_time(dt_computer_unix)
            try:
                await self.client.write_gatt_char(time_service, dt_computer_unix_bytes)
            except Exception as e:
                logging.error(f"Failed to set time at {self.address}: {e}")
                raise
        return dt_difference

    async def disconnect(self):
        """Disconnect if connected; the flag is reset even when that fails.

        Raises:
            Exception: Whatever the Bleak client raised; it is logged first.
        """
        if not self.disconnected:
            try:
                await self.client.disconnect()
            except Exception as e:
                logging.error(f"Failed to disconnect from {self.address}: {e}")
                raise
            finally:
                self.disconnected = True

In [None]:
async def run():
    """Scan for a device, connect, print its details and disconnect.

    Takes the first device returned by the scan; multiple devices are not
    supported. Prints the device information, battery level and device time.
    The device is disconnected even when one of the reads raises.
    """
    print('Searching for devices:')
    device_list = await scan_apogee_devices(2)
    if not device_list:
        print('  - Device not found, make sure it is broadcasting through a long press')
        print('Done...')
        return
    # Note: No support for multiple Apogee devices. This takes the first one found!
    device_address = device_list[0]['address']
    print('  - Found device at address', device_address)

    print('  - Connecting to device')
    ucache = apogee_device(device_address)  # Create an instance and connect
    await ucache.connect()

    try:
        print('  - Reading device information:')
        device_info = await ucache.read_info()
        print('      Type:  ', device_info['manufacturer_name'], device_info['model_number'])
        print('      Serial:', device_info['serial_number'])
        print('      Firmware rev.:', device_info['firmware_revision'])
        print('      Hardware rev.:', device_info['hardware_revision'])

        print('  - Reading battery level:')
        battery_level = await ucache.read_battery_level()
        print('      Level: ' + str(battery_level) + '%')

        print('  - Reading time:')
        device_time = await ucache.read_time()
        print('      Time: ' + str(device_time))
    finally:
        # Always release the BLE connection, also when a read failed.
        print('  - Disconnecting')
        await ucache.disconnect()
    print('Done...')

# Top-level await: valid in a notebook/IPython cell, but a SyntaxError in a plain .py script.
await run()

In [None]:
# Stand-alone scan: run the scanner for 3 seconds and show every matching device found.
device_list = await scan_apogee_devices(3)
print(device_list)

In [None]:
# Convert Unix timestamp to datetime object.
# Both datetimes are timezone-aware UTC values: mixing a naive local time
# with a naive UTC time would skew the difference by the local UTC offset.
timestamp = 1676899394  # Example timestamp
dt_device = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
dt_computer = datetime.datetime.now(datetime.timezone.utc)

# Print datetime object in a readable format
print(dt_device.strftime("%Y-%m-%d %H:%M:%S"))
print(dt_computer.strftime("%Y-%m-%d %H:%M:%S"))

print(abs(dt_device - dt_computer))

print(timestamp)
ts_computer = dt_computer.timestamp()
print(ts_computer)
print(datetime.datetime.fromtimestamp(ts_computer, tz=datetime.timezone.utc))

# Convert datetime object to Unix epoch time
unix_timestamp = int(dt_computer.timestamp())
print(unix_timestamp)

In [None]:
# To run as a standalone program

async def main(address="00:11:22:33:44:55"):
    """Connect to one device, print its information and disconnect.

    Args:
        address: BLE address of the device. The default is only a
            placeholder and must be replaced by a real address.

    The device is disconnected even when connecting or reading fails.
    """
    device = apogee_device(address)
    try:
        await device.connect()
        info = await device.read_info()
        print(f"Device info: {info}")
    finally:
        await device.disconnect()

# Entry point for stand-alone use. asyncio.run() creates its own event loop and
# cannot be called while another loop is already running in the same thread
# (as is the case inside a notebook kernel).
asyncio.run(main())

## OLD

In [None]:
# Further tools

def set_averaging_time_apogee(sock, value_sec):
    """Write the averaging time to the Live Data Control characteristic.

    The Live Data Control Characteristic controls averaging time of the Live
    Data Characteristic. Valid values are 0-127 in units of 0.25 seconds. A
    value of 0 is interpreted as no averaging with data calculated from a
    single ADC sample.

    Args:
        sock: Connection object handed through to the ``ble`` helpers.
        value_sec: Requested averaging time in seconds; it is converted to
            0.25 s units and clamped to the range 0-127.

    NOTE(review): ``ble`` is not defined or imported anywhere in the visible
    file, so this function cannot run as-is; confirm which module it meant.
    """
    # Seconds -> quarter-second units, truncated towards zero.
    quarter_seconds = int(value_sec / 0.25)

    # Keep the value inside the range the characteristic accepts.
    if quarter_seconds < 0:
        quarter_seconds = 0
    elif quarter_seconds > 127:
        quarter_seconds = 127

    # Resolve the characteristic and send the value as a single byte.
    apogee_base = ble.UUID("b3e00000-2594-42a1-a5fe-4e660ff2868f")
    control_uuid = ble.UUID("0005", apogee_base)
    char_handle = ble.get_characteristic_handle(sock, control_uuid)
    ble.write_command(sock, char_handle, quarter_seconds.to_bytes(1, 'little'))
    
'''
Check and set time
'''
import time
import struct
from datetime import datetime
from ntplib import NTPClient
from bluepy.btle import Peripheral, UUID

def set_current_time(ntp_server='pool.ntp.org', timeout=5):
    """Compare the device clock with an NTP server and update it if needed.

    Connects to the hard-coded peripheral address, reads the device time,
    fetches the current time from ``ntp_server`` and writes the NTP time to
    the device when the two differ by more than 3 seconds. The peripheral is
    disconnected even when a step fails.

    Args:
        ntp_server: Host name of the NTP server to query.
        timeout: Timeout in seconds for the NTP request.

    Returns:
        The device time as read *before* any update, formatted as
        "YYYY-MM-DD HH:MM:SS" in local time.
    """
    # Fill the 16-bit id into the Apogee base UUID *before* building the UUID
    # object: the 'xxxx' placeholder itself is not valid hex. The template is
    # the same one used elsewhere in this file.
    # NOTE(review): elsewhere in this file the 000a UUID is read directly as a
    # characteristic; confirm that it can also be looked up as a service.
    base_uuid = "b3e0xxxx-2594-42a1-a5fe-4e660ff2868f"
    service_uuid = UUID(base_uuid.replace('xxxx', '000a'))

    # Create a connection to the device
    device = Peripheral('AA:BB:CC:DD:EE:FF')
    try:
        service = device.getServiceByUUID(service_uuid)

        # Read the current Unix Epoch time from the device
        # NOTE(review): the characteristic ids "0002" (read) and "0001"
        # (write) are kept from the original; verify them against the
        # device API documentation.
        characteristic = service.getCharacteristics(UUID("0002"))[0]
        device_time = struct.unpack("<I", characteristic.read())[0]

        # Read the current time from the NTP server
        ntp_client = NTPClient()
        ntp_response = ntp_client.request(ntp_server, version=3, timeout=timeout)
        ntp_time = int(ntp_response.tx_time)

        # Calculate the difference between the device time and the NTP time
        time_diff = abs(device_time - ntp_time)

        # If the difference is greater than 3 seconds, update the device time
        if time_diff > 3:
            # Convert the NTP time to an unsigned 32-bit integer
            ntp_time_bytes = struct.pack("<I", ntp_time)

            # Write the NTP time to the device
            characteristic = service.getCharacteristics(UUID("0001"))[0]
            characteristic.write(ntp_time_bytes, withResponse=True)
    finally:
        # Disconnect from the device, also when a step above failed
        device.disconnect()

    # Return the device time that was read before the (optional) update
    return datetime.fromtimestamp(device_time).strftime("%Y-%m-%d %H:%M:%S")