From d29cd256f0c17a8e484d62849370a4ed9b9a08d9 Mon Sep 17 00:00:00 2001 From: Nicola <62206011+n-elia@users.noreply.github.com> Date: Thu, 17 Mar 2022 01:21:22 +0100 Subject: [PATCH] Refactoring for PEP compliance. Bumps to v0.3.4. --- README.md | 157 +++---- example/boot.py | 10 +- example/main.py | 63 +-- max30102/__init__.py | 807 ------------------------------------ max30102/circular_buffer.py | 41 ++ max30102/max30102.py | 717 ++++++++++++++++++++++++++++++++ sdist_upip.py | 78 ++-- setup.py | 5 +- 8 files changed, 921 insertions(+), 957 deletions(-) create mode 100644 max30102/circular_buffer.py create mode 100644 max30102/max30102.py diff --git a/README.md b/README.md index ee988b0..61ed0ea 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,9 @@ -[![Upload Python Package](https://github.com/n-elia/MAX30102-MicroPython-driver/actions/workflows/python-publish.yml/badge.svg)](https://github.com/n-elia/MAX30102-MicroPython-driver/actions/workflows/python-publish.yml) [![PyPI version](https://badge.fury.io/py/micropython-max30102.svg)](https://badge.fury.io/py/micropython-max30102) +[![PyPi Build and Upload](https://github.com/n-elia/MAX30102-MicroPython-driver/actions/workflows/python-publish.yml/badge.svg?event=release)](https://github.com/n-elia/MAX30102-MicroPython-driver/actions/workflows/python-publish.yml) [![PyPI version](https://badge.fury.io/py/micropython-max30102.svg)](https://badge.fury.io/py/micropython-max30102) # Maxim MAX30102 MicroPython driver A port of the SparkFun driver for Maxim MAX30102 sensor to MicroPython. -It _should_ work for MAX30105, too. Please check if it works for MAX30105 and report in the Discussions section :) + +It _should_ work for MAX30105, too. If you have the chance to test this library with a MAX30105, please leave your feedback in the Discussions section. ## Aknowledgements @@ -31,14 +32,19 @@ This work is not intended to be used in professional environments, and there are - Driver: `./max30102` - Example: `./example` -## Additional information - -This driver has been tested with Maxim Integrated MAX30102 sensor. -However, it *should* work with MAX30105 sensor, too. +## Changelog +- v0.3.4 + - The package has been refactored to be compliant to PEP standards. +- v0.3.3 + - Made a PyPi package. Now you can install this package with upip. + - Tested with Raspberry Pi Pico and non-genuine sensors. +- v0.3 + - Tested with TinyPico board (based on ESP32-D4) and genuine Maxim MAX30102 sensor. -### How to import the library and run the example +## How to import the library and run the example +Important note: the library will load the default TinyPico ESP32 board I2C configuration (SDA Pin 21, SCL Pin 22, 400kHz speed). If you're using a different board, follow the instructions given below, in *Setup and configuration* section. -#### Including this library into your project (network-enabled MicroPython ports) +### Including this library into your project (**network-enabled MicroPython ports**) To include the library into a network-enabled MicroPython project, it's sufficient to install the package: ```python @@ -50,19 +56,23 @@ Make sure that your firmware runs these lines **after** an Internet connection h To run the example in `./example` folder, please set your WiFi credentials in `boot.py` and then upload `./example` content into your microcontroller. If you prefer, you can perform a manual install as explained below. -#### Including this library into your project (manual way) +### Including this library into your project (**manual way**) -To directly include the library into a MicroPython project, it's sufficient to copy the `max30102` module next to your `main.py`, and then import it as follows: +To directly include the library into a MicroPython project, it's sufficient to copy `max30102/circular_buffer.py` and `max30102/max30102.py` next to your `main.py` file. Then, import the constructor as follows: ```python from max30102 import MAX30102 ``` -For instance, to run the example in `./example` folder, copy the `./max30102` directory and paste it in the `./example` folder. Then, upload `./example` content into your microcontroller and run it. +To run the example in `./example` folder, copy `max30102/circular_buffer.py` and `max30102/max30102.py` into the `./example` directory. Then, upload the `./example` directory content into your microcontroller. + + +### Setup and configuration +#### I2C pins -#### Setup and configuration +When creating a sensor instance, if you leave the arguments empty, the library will load the default TinyPico ESP32 board I2C configuration (SDA Pin 21, SCL Pin 22, 400kHz speed). -At first, create a sensor instance. If you leave the arguments empty, the library will load the default TinyPico ESP32 board I2C configuration (SDA Pin 21, SCL Pin 22, 400kHz speed). +If you have a different board, you can set different I2C pins as shown in the following example: ```python # Default config (ESP32): @@ -70,12 +80,13 @@ sensor = MAX30102() # Alternative: from machine import SoftI2C, Pin -i2cInstance = SoftI2C(sda=Pin(my_SDA_pin), - scl=Pin(my_SCL_pin), - freq=100000) +i2c = SoftI2C(sda=Pin(my_SDA_pin), + scl=Pin(my_SCL_pin), + freq=100000) sensor = MAX30102(i2cHexAddress = 0x57, i2c = i2cInstance) ``` +#### Sensor setup Then, the sensor has to be setup. The library provides a method to setup the sensor at once. Leaving the arguments empty, makes the library load their default values. > Default configuration values: @@ -104,44 +115,44 @@ The library provides the methods to change the configuration parameters one by o ```python # Set the number of samples to be averaged by the chip -SAMPLE_AVG = 8 # Options: 1, 2, 4, 8, 16, 32 -self.setFIFOAverage(SAMPLE_AVG) +SAMPLE_AVG = 8 # Options: 1, 2, 4, 8, 16, 32 +self.set_fifo_average(SAMPLE_AVG) # Set the ADC range -ADC_RANGE = 4096 # Options: 2048, 4096, 8192, 16384 -self.setADCRange(ADC_RANGE) +ADC_RANGE = 4096 # Options: 2048, 4096, 8192, 16384 +self.set_adc_range(ADC_RANGE) # Set the sample rate -SAMPLE_RATE = 400 # Options: 50, 100, 200, 400, 800, 1000, 1600, 3200 -self.setSampleRate(SAMPLE_RATE) +SAMPLE_RATE = 400 # Options: 50, 100, 200, 400, 800, 1000, 1600, 3200 +self.set_sample_rate(SAMPLE_RATE) # Set the Pulse Width -PULSE_WIDTH = 118 # Options: 69, 118, 215, 411 -self.setPulseWidth(PULSE_WIDTH) +PULSE_WIDTH = 118 # Options: 69, 118, 215, 411 +self.set_pulse_width(PULSE_WIDTH) # Set the LED mode -LED_MODE = 2 # Options: 1 (red), 2 (red + IR), 3 (red + IR + g - MAX30105 only) -self.setLEDMode(LED_MODE) +LED_MODE = 2 # Options: 1 (red), 2 (red + IR), 3 (red + IR + g - MAX30105 only) +self.set_led_mode(LED_MODE) # Set the LED brightness of each LED LED_POWER = MAX30105_PULSEAMP_MEDIUM # Options: -# MAX30105_PULSEAMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch -# MAX30105_PULSEAMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch -# MAX30105_PULSEAMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch -# MAX30105_PULSEAMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch -self.setPulseAmplitudeRed(LED_POWER) -self.setPulseAmplitudeIR(LED_POWER) -self.setPulseAmplitudeGreen(LED_POWER) +# MAX30105_PULSE_AMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch +# MAX30105_PULSE_AMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch +# MAX30105_PULSE_AMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch +# MAX30105_PULSE_AMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch +self.set_pulse_amplitude_red(LED_POWER) +self.set_pulse_amplitude_it(LED_POWER) +self.set_pulse_amplitude_green(LED_POWER) # Set the LED brightness of all the active LEDs LED_POWER = MAX30105_PULSEAMP_MEDIUM # Options: -# MAX30105_PULSEAMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch -# MAX30105_PULSEAMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch -# MAX30105_PULSEAMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch -# MAX30105_PULSEAMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch -sensor.setActiveLEDsAmplitude(LED_POWER) +# MAX30105_PULSE_AMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch +# MAX30105_PULSE_AMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch +# MAX30105_PULSE_AMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch +# MAX30105_PULSE_AMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch +sensor.set_active_leds_amplitude(LED_POWER) ``` #### Data acquisition @@ -153,20 +164,20 @@ The `check()` method polls the sensor to check if new samples are available in t As a consequence, this is an example on how the library can be used to read data from the sensor: ```python -while(True): - # The check() method has to be continuously polled, to check if - # there are new readings into the sensor's FIFO queue. When new - # readings are available, this function will put them into the storage. - sensor.check() - - # Check if the storage contains available samples - if(sensor.available()): - # Access the storage FIFO and gather the readings (integers) - red_sample = sensor.popRedFromStorage() - ir_sample = sensor.popIRFromStorage() - - # Print the acquired data (can be plot with Arduino Serial Plotter) - print(red_sample, ",", ir_sample) +while (True): + # The check() method has to be continuously polled, to check if + # there are new readings into the sensor's FIFO queue. When new + # readings are available, this function will put them into the storage. + sensor.check() + + # Check if the storage contains available samples + if (sensor.available()): + # Access the storage FIFO and gather the readings (integers) + red_sample = sensor.pop_red_from_storage() + ir_sample = sensor.pop_ir_from_storage() + + # Print the acquired data (can be plot with Arduino Serial Plotter) + print(red_sample, ",", ir_sample) ``` #### Data acquisition rate @@ -183,7 +194,7 @@ The library computes this value, that can be accessed with: ```python # Get the estimated acquisition rate -acquisition_rate = sensor.getAcquisitionFrequency() +acquisition_rate = sensor.get_acquisition_frequency() ``` However, there are some limitations on sensor side and on micropocessor side that may affect the acquisition rate (see issue #6 for more details about it). Is is possible to measure the real throughput as in [this](https://github.com/sparkfun/SparkFun_MAX3010x_Sensor_Library/blob/master/examples/Example9_RateTesting/Example9_RateTesting.ino) example sketch by SparkFun, using the following snippet: @@ -195,33 +206,33 @@ from utime import ticks_diff, ticks_ms # Starting time of the acquisition t_start = ticks_ms() # Number of samples that has been collected -samples_n = 0 - -while(True): - sensor.check() - - if(sensor.available()): - # Access the storage FIFO and gather the readings (integers) - red_sample = sensor.popRedFromStorage() - ir_sample = sensor.popIRFromStorage() - - # We can compute the real frequency at which we receive data - if (compute_frequency): - samples_n=samples_n+1 - if ( ticks_diff(ticks_ms(), t_start) > 999 ): - f_HZ = samples_n/1 - samples_n = 0 - t_start = ticks_ms() - print("Acquisition frequency = ",f_HZ) +samples_n = 0 + +while (True): + sensor.check() + + if (sensor.available()): + # Access the storage FIFO and gather the readings (integers) + red_sample = sensor.pop_red_from_storage() + ir_sample = sensor.pop_ir_from_storage() + + # We can compute the real frequency at which we receive data + if (compute_frequency): + samples_n = samples_n + 1 + if (ticks_diff(ticks_ms(), t_start) > 999): + f_HZ = samples_n / 1 + samples_n = 0 + t_start = ticks_ms() + print("Acquisition frequency = ", f_HZ) ``` #### Die temperature reading -The `readTemperature()` method allows to read the internal die temperature. An example is proposed below. +The `read_temperature()` method allows to read the internal die temperature. An example is proposed below. ```python # Read the die temperature in Celsius degree -temperature_C = sensor.readTemperature() +temperature_C = sensor.read_temperature() print("Die temperature: ", temperature_C, "°C") ``` diff --git a/example/boot.py b/example/boot.py index 8b8ba15..394077e 100644 --- a/example/boot.py +++ b/example/boot.py @@ -1,6 +1,6 @@ -# This file is executed on every boot (including wake-boot from deepsleep) +# This file is executed on every boot (including wake-boot from deep sleep) -def do_connect(ssid:str, password:str): +def do_connect(ssid: str, password: str): import network wlan = network.WLAN(network.STA_IF) wlan.active(True) @@ -13,17 +13,17 @@ def do_connect(ssid:str, password:str): if __name__ == '__main__': - # Put yor WiFi credentials here + # Put yor Wi-Fi credentials here my_ssid = "my_ssid" my_pass = "my_password" try: - import max30102 + from max30102 import MAX30102 except: + print("'max30102' not found!") try: import upip do_connect(my_ssid, my_pass) upip.install("micropython-max30102") except: print("Unable to get 'micropython-max30102' package!") - diff --git a/example/main.py b/example/main.py index 59aaa5f..903bc90 100644 --- a/example/main.py +++ b/example/main.py @@ -1,15 +1,16 @@ # main.py from machine import sleep -from max30102 import MAX30102 from utime import ticks_diff, ticks_ms +from max30102 import MAX30102 + if __name__ == '__main__': # Sensor instance. If left empty, loads default ESP32 I2C configuration sensor = MAX30102() # Alternatively (for other boards): # sensor = MAX30102(i2cHexAddress = 0x57) # sensor = MAX30102(i2cHexAddress = 0x57, i2c = i2cInstance) - + # The default sensor configuration is: # Led mode: 2 (RED + IR) # ADC range: 16384 @@ -17,53 +18,53 @@ # Led power: maximum (50.0mA - Presence detection of ~12 inch) # Averaged samples: 8 # pulse width: 411 - - # It's possible to setup the sensor at once with the setup_sensor() method. + + # It's possible to set up the sensor at once with the setup_sensor() method. # If no parameters are supplied, the default config is loaded. print("Setting up sensor with default configuration.", '\n') sensor.setup_sensor() - + # It is also possible to tune the configuration parameters one by one. # Set the sample rate to 800: 800 samples/s are collected by the sensor - sensor.setSampleRate(800) + sensor.set_sample_rate(800) # Set the number of samples to be averaged per each reading - sensor.setFIFOAverage(8) - + sensor.set_fifo_average(8) + sleep(1) # The readTemperature() method allows to extract the die temperature in °C print("Reading temperature in °C.", '\n') - print(sensor.readTemperature()) - - # Select wether to compute the acquisition frequency or not - compute_frequency = False - + print(sensor.read_temperature()) + + # Select whether to compute the acquisition frequency or not + compute_frequency = True + print("Starting data acquisition from RED & IR registers...", '\n') sleep(1) - - t_start = ticks_ms() # Starting time of the acquisition - samples_n = 0 # Number of samples that has been collected - - while(True): + + t_start = ticks_ms() # Starting time of the acquisition + samples_n = 0 # Number of samples that has been collected + + while True: # The check() method has to be continuously polled, to check if # there are new readings into the sensor's FIFO queue. When new # readings are available, this function will put them into the storage. sensor.check() - + # Check if the storage contains available samples - if(sensor.available()): + if sensor.available(): # Access the storage FIFO and gather the readings (integers) - red_reading = sensor.popRedFromStorage() - IR_reading = sensor.popIRFromStorage() - - # Print the acquired data (can be plot with Arduino Serial Plotter) - print(red_reading, ",", IR_reading) - + red_reading = sensor.pop_red_from_storage() + ir_reading = sensor.pop_ir_from_storage() + + # Print the acquired data (so that it can be plotted with a Serial Plotter) + print(red_reading, ",", ir_reading) + # We can compute the real frequency at which we receive data - if (compute_frequency): - samples_n=samples_n+1 - if ( ticks_diff(ticks_ms(), t_start) > 999 ): - f_HZ = samples_n/1 + if compute_frequency: + samples_n = samples_n + 1 + if ticks_diff(ticks_ms(), t_start) > 999: + f_HZ = samples_n / 1 samples_n = 0 t_start = ticks_ms() - print("acquisition frequency = ",f_HZ) + print("acquisition frequency = ", f_HZ) diff --git a/max30102/__init__.py b/max30102/__init__.py index de4f56d..e69de29 100644 --- a/max30102/__init__.py +++ b/max30102/__init__.py @@ -1,807 +0,0 @@ -''' -This work is a lot based on: -- https://github.com/sparkfun/SparkFun_MAX3010x_Sensor_Library - Written by Peter Jansen and Nathan Seidle (SparkFun) - This is a library written for the Maxim MAX30105 Optical Smoke Detector - It should also work with the MAX30102. However, the MAX30102 does not have a Green LED. - These sensors use I2C to communicate, as well as a single (optional) - interrupt line that is not currently supported in this driver. - Written by Peter Jansen and Nathan Seidle (SparkFun) - BSD license, all text above must be included in any redistribution. - -- https://github.com/kandizzy/esp32-micropython/blob/master/PPG/ppg/MAX30105.py - A port of the library to MicroPython by kandizzy - -With this driver, I want to give an almost full access to Maxim MAX30102 sensor -functionalities. -This code is being tested on TinyPico Board with Maxim genuine sensors. - n-elia -''' - -from machine import Pin, SoftI2C -from ucollections import deque -from ustruct import unpack -from utime import sleep_ms, ticks_diff, ticks_ms - - -# These I2C default settings work for TinyPico (ESP32-based board) -MAX3010X_I2C_ADDRESS = 0x57 -I2C_SPEED_FAST = 400000 # 400kHz speed -I2C_SPEED_NORMAL = 100000 # 100kHz speed -I2C_DEF_SDA_PIN = 21 -I2C_DEF_SCL_PIN = 22 - -# Status Registers -MAX30105_INTSTAT1 = 0x00 -MAX30105_INTSTAT2 = 0x01 -MAX30105_INTENABLE1 = 0x02 -MAX30105_INTENABLE2 = 0x03 - -# FIFO Registers -MAX30105_FIFOWRITEPTR = 0x04 -MAX30105_FIFOOVERFLOW = 0x05 -MAX30105_FIFOREADPTR = 0x06 -MAX30105_FIFODATA = 0x07 - -# Configuration Registers -MAX30105_FIFOCONFIG = 0x08 -MAX30105_MODECONFIG = 0x09 -MAX30105_PARTICLECONFIG = 0x0A # Sometimes listed as 'SPO2' in datasheet (pag.11) -MAX30105_LED1_PULSEAMP = 0x0C # IR -MAX30105_LED2_PULSEAMP = 0x0D # RED -MAX30105_LED3_PULSEAMP = 0x0E # GREEN (when available) -MAX30105_LED_PROX_AMP = 0x10 -MAX30105_MULTILEDCONFIG1 = 0x11 -MAX30105_MULTILEDCONFIG2 = 0x12 - -# Die Temperature Registers -MAX30105_DIETEMPINT = 0x1F -MAX30105_DIETEMPFRAC = 0x20 -MAX30105_DIETEMPCONFIG = 0x21 - -# Proximity Function Registers -MAX30105_PROXINTTHRESH = 0x30 - -# Part ID Registers -MAX30105_REVISIONID = 0xFE -MAX30105_PARTID = 0xFF # Should always be 0x15. Identical for MAX30102. - -# MAX30105 Commands -# Interrupt configuration (datasheet pag 13, 14) -MAX30105_INT_A_FULL_MASK = ~0b10000000 -MAX30105_INT_A_FULL_ENABLE = 0x80 -MAX30105_INT_A_FULL_DISABLE = 0x00 - -MAX30105_INT_DATA_RDY_MASK = ~0b01000000 -MAX30105_INT_DATA_RDY_ENABLE = 0x40 -MAX30105_INT_DATA_RDY_DISABLE = 0x00 - -MAX30105_INT_ALC_OVF_MASK = ~0b00100000 -MAX30105_INT_ALC_OVF_ENABLE = 0x20 -MAX30105_INT_ALC_OVF_DISABLE = 0x00 - -MAX30105_INT_PROX_INT_MASK = ~0b00010000 -MAX30105_INT_PROX_INT_ENABLE = 0x10 -MAX30105_INT_PROX_INT_DISABLE = 0x00 - -MAX30105_INT_DIE_TEMP_RDY_MASK = ~0b00000010 -MAX30105_INT_DIE_TEMP_RDY_ENABLE = 0x02 -MAX30105_INT_DIE_TEMP_RDY_DISABLE = 0x00 - -# FIFO data queue configuration -MAX30105_SAMPLEAVG_MASK = ~0b11100000 -MAX30105_SAMPLEAVG_1 = 0x00 -MAX30105_SAMPLEAVG_2 = 0x20 -MAX30105_SAMPLEAVG_4 = 0x40 -MAX30105_SAMPLEAVG_8 = 0x60 -MAX30105_SAMPLEAVG_16 = 0x80 -MAX30105_SAMPLEAVG_32 = 0xA0 - -MAX30105_ROLLOVER_MASK = 0xEF -MAX30105_ROLLOVER_ENABLE = 0x10 -MAX30105_ROLLOVER_DISABLE = 0x00 -# Mask for 'almost full' interrupt (defaults to 32 samples) -MAX30105_A_FULL_MASK = 0xF0 - -# Mode configuration commands (page 19) -MAX30105_SHUTDOWN_MASK = 0x7F -MAX30105_SHUTDOWN = 0x80 -MAX30105_WAKEUP = 0x00 -MAX30105_RESET_MASK = 0xBF -MAX30105_RESET = 0x40 - -MAX30105_MODE_MASK = 0xF8 -MAX30105_MODE_REDONLY = 0x02 -MAX30105_MODE_REDIRONLY = 0x03 -MAX30105_MODE_MULTILED = 0x07 - -# Particle sensing configuration commands (pgs 19-20) -MAX30105_ADCRANGE_MASK = 0x9F -MAX30105_ADCRANGE_2048 = 0x00 -MAX30105_ADCRANGE_4096 = 0x20 -MAX30105_ADCRANGE_8192 = 0x40 -MAX30105_ADCRANGE_16384 = 0x60 - -MAX30105_SAMPLERATE_MASK = 0xE3 -MAX30105_SAMPLERATE_50 = 0x00 -MAX30105_SAMPLERATE_100 = 0x04 -MAX30105_SAMPLERATE_200 = 0x08 -MAX30105_SAMPLERATE_400 = 0x0C -MAX30105_SAMPLERATE_800 = 0x10 -MAX30105_SAMPLERATE_1000 = 0x14 -MAX30105_SAMPLERATE_1600 = 0x18 -MAX30105_SAMPLERATE_3200 = 0x1C - -MAX30105_PULSEWIDTH_MASK = 0xFC -MAX30105_PULSEWIDTH_69 = 0x00 -MAX30105_PULSEWIDTH_118 = 0x01 -MAX30105_PULSEWIDTH_215 = 0x02 -MAX30105_PULSEWIDTH_411 = 0x03 - -# LED brigthness level. It affects the distance of detection. -MAX30105_PULSEAMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch -MAX30105_PULSEAMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch -MAX30105_PULSEAMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch -MAX30105_PULSEAMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch - -# Multi-LED Mode configuration (datasheet pag 22) -MAX30105_SLOT1_MASK = 0xF8 -MAX30105_SLOT2_MASK = 0x8F -MAX30105_SLOT3_MASK = 0xF8 -MAX30105_SLOT4_MASK = 0x8F -SLOT_NONE = 0x00 -SLOT_RED_LED = 0x01 -SLOT_IR_LED = 0x02 -SLOT_GREEN_LED = 0x03 -SLOT_NONE_PILOT = 0x04 -SLOT_RED_PILOT = 0x05 -SLOT_IR_PILOT = 0x06 -SLOT_GREEN_PILOT = 0x07 - -MAX_30105_EXPECTEDPARTID = 0x15; - -TAG = 'MAX30105' - -# Size of the queued readings -STORAGE_QUEUE_SIZE = 4 - -# Very rough implementation of a circular buffer based on deque -class CircularBuffer(object): - def __init__(self, maxSize): - self.data = deque((), maxSize, True) - self.maxSize = maxSize - - def __len__(self): - return len(self.data) - - def isEmpty(self): - return not bool(self.data) - - def append(self, item): - try: - self.data.append(item) - except IndexError: - # deque full, popping 1st item out - self.data.popleft() - self.data.append(item) - - def pop(self): - return self.data.popleft() - - def clear(self): - self.data = deque((), self.maxSize, True) - - def popHead(self): - bufferSize = len(self.data) - temp = deque((), self.maxSize, True) - temp = self.data - if (bufferSize == 1): - pass - elif (bufferSize > 1): - self.data.clear() - for x in range(bufferSize - 1): - self.data = temp.popleft() - else: - return 0 - return temp.popleft() - -# Data structure to hold the last readings -class SensorData(): - def __init__(self): - self.red = CircularBuffer(STORAGE_QUEUE_SIZE) - self.IR = CircularBuffer(STORAGE_QUEUE_SIZE) - self.green = CircularBuffer(STORAGE_QUEUE_SIZE) - -# Sensor class -class MAX30102(object): - def __init__(self, - i2cHexAddress=MAX3010X_I2C_ADDRESS, - i2c=SoftI2C(sda=Pin(I2C_DEF_SDA_PIN), - scl=Pin(I2C_DEF_SCL_PIN), - freq=I2C_SPEED_FAST) - ): - self._address = i2cHexAddress - self._i2c = i2c - self._activeLEDs = None - self._pulseWidth = None - self._multiLedReadMode = None - # Store current config values to compute acquisition frequency - self._sampleRate = None - self._sampleAvg = None - self._acqFrequency = None - self._acqFrequencyinv = None - # Circular buffer of readings from the sensor - self.sense = SensorData() - - try: - self._i2c.readfrom(self._address, 1) - except OSError as error: - raise SystemExit(error) - - if not (self.checkPartID()): - raise SystemExit() - - # Sensor setup method - def setup_sensor(self, LED_MODE=2, ADC_RANGE=16384, SAMPLE_RATE=400, - LED_POWER=MAX30105_PULSEAMP_HIGH, SAMPLE_AVG=8, - PULSE_WIDTH=411): - # Reset the sensor's registers from previous configurations - self.softReset() - - # Set the number of samples to be averaged by the chip to 8 - self.setFIFOAverage(SAMPLE_AVG) - - # Allow FIFO queue to wrap/roll over - self.enableFIFORollover() - - # Set the LED mode to the default value of 2 (RED + IR) - # Note: the 3rd mode is available only with MAX30105 - self.setLEDMode(LED_MODE) - - # Set the ADC range to default value of 16384 - self.setADCRange(ADC_RANGE) - - # Set the sample rate to the default value of 400 - self.setSampleRate(SAMPLE_RATE) - - # Set the Pulse Width to the default value of 411 - self.setPulseWidth(PULSE_WIDTH) - - # Set the LED brightness to the default value of 'low' - self.setPulseAmplitudeRed(LED_POWER) - self.setPulseAmplitudeIR(LED_POWER) - self.setPulseAmplitudeGreen(LED_POWER) - self.setPulseAmplitudeProximity(LED_POWER) - - # Clears the FIFO - self.clearFIFO() - - def __del__(self): - self.shutDown() - - # Methods to read the two interrupt flags - def getINT1(self): - # Load the Interrupt 1 status (configurable) from the register - rev_id = self.i2c_read_register(MAX30105_INTSTAT1) - return rev_id - - def getINT2(self): - # Load the Interrupt 2 status (DIE_TEMP_DRY) from the register - rev_id = self.i2c_read_register(MAX30105_INTSTAT2) - return rev_id - - # Methods to setup the interrupt flags - def enableAFULL(self): - # Enable the almost full interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_A_FULL_MASK, - MAX30105_INT_A_FULL_ENABLE) - - def disableAFULL(self): - # Disable the almost full interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_A_FULL_MASK, - MAX30105_INT_A_FULL_DISABLE) - - def enableDATARDY(self): - # Enable the new FIFO data ready interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_DATA_RDY_MASK, - MAX30105_INT_DATA_RDY_ENABLE) - - def disableDATARDY(self): - # Disable the new FIFO data ready interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_DATA_RDY_MASK, - MAX30105_INT_DATA_RDY_DISABLE) - - def enableALCOVF(self): - # Enable the ambient light limit interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_ALC_OVF_MASK, - MAX30105_INT_ALC_OVF_ENABLE) - - def disableALCOVF(self): - # Disable the ambient light limit interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_ALC_OVF_MASK, - MAX30105_INT_ALC_OVF_DISABLE) - - def enablePROXINT(self): - # Enable the proximity interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_PROX_INT_MASK, - MAX30105_INT_PROX_INT_ENABLE) - - def disablePROXINT(self): - # Disable the proximity interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE1, - MAX30105_INT_PROX_INT_MASK, - MAX30105_INT_PROX_INT_DISABLE) - - def enableDIETEMPRDY(self): - # Enable the die temp. conversion finish interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE2, - MAX30105_INT_DIE_TEMP_RDY_MASK, - MAX30105_INT_DIE_TEMP_RDY_ENABLE) - - def disableDIETEMPRDY(self): - # Disable the die temp. conversion finish interrupt (datasheet pag. 13) - self.bitMask(MAX30105_INTENABLE2, - MAX30105_INT_DIE_TEMP_RDY_MASK, - MAX30105_INT_DIE_TEMP_RDY_DISABLE) - - # Configuration reset - def softReset(self): - TAG = 'softReset' - # When the RESET bit is set to one, all configuration, threshold, - # and data registers are reset to their power-on-state through - # a power-on reset. The RESET bit is cleared automatically back to zero - # after the reset sequence is completed. (datasheet pag. 19) - self.set_bitMask(MAX30105_MODECONFIG, - MAX30105_RESET_MASK, - MAX30105_RESET) - curr_status = -1 - while not ( (curr_status & MAX30105_RESET) == 0 ): - sleep_ms(10) - curr_status = ord(self.i2c_read_register(MAX30105_MODECONFIG)) - - # Power states methods - def shutDown(self): - # Put IC into low power mode (datasheet pg. 19) - # During shutdown the IC will continue to respond to I2C commands but - # will not update with or take new readings (such as temperature). - self.set_bitMask(MAX30105_MODECONFIG, - MAX30105_SHUTDOWN_MASK, - MAX30105_SHUTDOWN) - - def wakeUp(self): - # Pull IC out of low power mode (datasheet pg. 19) - self.set_bitMask(MAX30105_MODECONFIG, - MAX30105_SHUTDOWN_MASK, - MAX30105_WAKEUP) - - # LED Configuration - def setLEDMode(self, LED_mode): - # Set LED mode: select which LEDs are used for sampling - # Options: RED only, RED + IR only, or ALL (datasheet pag. 19) - if LED_mode == 1: - self.set_bitMask(MAX30105_MODECONFIG, - MAX30105_MODE_MASK, - MAX30105_MODE_REDONLY) - elif LED_mode == 2: - self.set_bitMask(MAX30105_MODECONFIG, - MAX30105_MODE_MASK, - MAX30105_MODE_REDIRONLY) - elif LED_mode == 3: - self.set_bitMask(MAX30105_MODECONFIG, - MAX30105_MODE_MASK, - MAX30105_MODE_MULTILED) - else: - raise ValueError('Wrong LED mode:{0}!'.format(LED_mode)) - - # Multi-LED Mode Configuration: enable the reading of the LEDs - # depending on the chosen mode - self.enableSlot(1, SLOT_RED_LED) - if (LED_mode > 1): - self.enableSlot(2, SLOT_IR_LED) - if (LED_mode > 2): - self.enableSlot(3, SLOT_GREEN_LED) - - # Store the LED mode used to control how many bytes to read from - # FIFO buffer in multiLED mode: a sample is made of 3 bytes - self._activeLEDs = LED_mode - self._multiLedReadMode = LED_mode * 3 - - # ADC Configuration - def setADCRange(self, ADC_range): - # ADC range: set the range of the conversion - # Options: 2048, 4096, 8192, 16384 - # Current draw: 7.81pA. 15.63pA, 31.25pA, 62.5pA per LSB. - if ADC_range == 2048: - range = MAX30105_ADCRANGE_2048 - elif ADC_range == 4096: - range = MAX30105_ADCRANGE_4096 - elif ADC_range == 8192: - range = MAX30105_ADCRANGE_8192 - elif ADC_range == 16384: - range = MAX30105_ADCRANGE_16384 - else: - raise ValueError('Wrong ADC range:{0}!'.format(ADC_range)) - - self.set_bitMask(MAX30105_PARTICLECONFIG, - MAX30105_ADCRANGE_MASK, - range) - - # Sample Rate Configuration - def setSampleRate(self, sample_rate): - TAG = 'setSampleRate' - # Sample rate: select the number of samples taken per second. - # Options: 50, 100, 200, 400, 800, 1000, 1600, 3200 - # Note: in theory, the resulting acquisition frequency for the end user - # is sampleRate/sampleAverage. However, it is worth testing it before - # assuming that the sensor can effectively sustain that frequency - # given its configuration. - if sample_rate == 50: - sr = MAX30105_SAMPLERATE_50 - elif sample_rate == 100: - sr = MAX30105_SAMPLERATE_100 - elif sample_rate == 200: - sr = MAX30105_SAMPLERATE_200 - elif sample_rate == 400: - sr = MAX30105_SAMPLERATE_400 - elif sample_rate == 800: - sr = MAX30105_SAMPLERATE_800 - elif sample_rate == 1000: - sr = MAX30105_SAMPLERATE_1000 - elif sample_rate == 1600: - sr = MAX30105_SAMPLERATE_1600 - elif sample_rate == 3200: - sr = MAX30105_SAMPLERATE_3200 - else: - raise ValueError('Wrong sample rate:{0}!'.format(sample_rate)) - - self.set_bitMask(MAX30105_PARTICLECONFIG, - MAX30105_SAMPLERATE_MASK, - sr) - - # Store the sample rate and recompute the acq. freq. - self._sampleRate = sample_rate - self.updateAcquisitionFrequency() - - # Pulse width Configuration - def setPulseWidth(self, pulse_width): - TAG = 'setPulseWidth' - # Pulse width of LEDs: The longer the pulse width the longer range of - # detection. At 69us and 0.4mA it's about 2 inches, - # at 411us and 0.4mA it's about 6 inches. - if pulse_width == 69: - pw = MAX30105_PULSEWIDTH_69 - elif pulse_width == 118: - pw = MAX30105_PULSEWIDTH_118 - elif pulse_width == 215: - pw = MAX30105_PULSEWIDTH_215 - elif pulse_width == 411: - pw = MAX30105_PULSEWIDTH_411 - else: - raise ValueError('Wrong pulse width:{0}!'.format(pulse_width)) - self.set_bitMask(MAX30105_PARTICLECONFIG, - MAX30105_PULSEWIDTH_MASK, - pw) - - # Store the pulse width - self._pulseWidth = pw - - # LED Pulse Amplitude Configuration methods - def setActiveLEDsAmplitude(self, amplitude): - if (self._activeLEDs > 0): - self.setPulseAmplitudeRed(amplitude) - if (self._activeLEDs > 1): - self.setPulseAmplitudeIR(amplitude) - if(self._activeLEDs > 2): - self.setPulseAmplitudeGreen(amplitude) - - def setPulseAmplitudeRed(self, amplitude): - self.i2c_set_register(MAX30105_LED1_PULSEAMP, amplitude) - - def setPulseAmplitudeIR(self, amplitude): - self.i2c_set_register(MAX30105_LED2_PULSEAMP, amplitude) - - def setPulseAmplitudeGreen(self, amplitude): - self.i2c_set_register(MAX30105_LED3_PULSEAMP, amplitude) - - def setPulseAmplitudeProximity(self, amplitude): - self.i2c_set_register(MAX30105_LED_PROX_AMP, amplitude) - - def setProximityThreshold(self, threshMSB): - # Set the IR ADC count that will trigger the beginning of particle- - # sensing mode.The threshMSB signifies only the 8 most significant-bits - # of the ADC count. (datasheet page 24) - self.i2c_set_register(MAX30105_PROXINTTHRESH, threshMSB) - - # FIFO averaged samples number Configuration - def setFIFOAverage(self, number_of_samples): - TAG = 'setFIFOAverage' - # FIFO sample avg: set the number of samples to be averaged by the chip. - # Options: MAX30105_SAMPLEAVG_1, 2, 4, 8, 16, 32 - if number_of_samples == 1: - ns = MAX30105_SAMPLEAVG_1 - elif number_of_samples == 2: - ns = MAX30105_SAMPLEAVG_2 - elif number_of_samples == 4: - ns = MAX30105_SAMPLEAVG_4 - elif number_of_samples == 8: - ns = MAX30105_SAMPLEAVG_8 - elif number_of_samples == 16: - ns = MAX30105_SAMPLEAVG_16 - elif number_of_samples == 32: - ns = MAX30105_SAMPLEAVG_32 - else: - raise ValueError( - 'Wrong number of samples:{0}!'.format(number_of_samples)) - self.set_bitMask(MAX30105_FIFOCONFIG, MAX30105_SAMPLEAVG_MASK, ns) - - # Store the number of averaged samples and recompute the acq. freq. - self._sampleAvg = number_of_samples - self.updateAcquisitionFrequency() - - def updateAcquisitionFrequency(self): - TAG = 'updateAcquisitionFrequency' - if (None in [self._sampleRate, self._sampleAvg]): - return - else: - self._acqFrequency = self._sampleRate / self._sampleAvg - from math import ceil - - # Compute the time interval to wait before taking a good measure - # (see note in setSampleRate() method) - self._acqFrequencyInv = int(ceil(1000/self._acqFrequency)) - - def getAcquisitionFrequency(self): - return self._acqFrequency - - def clearFIFO(self): - # Resets all points to start in a known state - # Datasheet page 15 recommends clearing FIFO before beginning a read - self.i2c_set_register(MAX30105_FIFOWRITEPTR, 0) - self.i2c_set_register(MAX30105_FIFOOVERFLOW, 0) - self.i2c_set_register(MAX30105_FIFOREADPTR, 0) - - def enableFIFORollover(self): - # FIFO rollover: enable to allow FIFO tro wrap/roll over - self.set_bitMask(MAX30105_FIFOCONFIG, - MAX30105_ROLLOVER_MASK, - MAX30105_ROLLOVER_ENABLE) - - def disableFIFORollover(self): - # FIFO rollover: disable to disallow FIFO tro wrap/roll over - self.set_bitMask(MAX30105_FIFOCONFIG, - MAX30105_ROLLOVER_MASK, - MAX30105_ROLLOVER_DISABLE) - - def setFIFOAlmostFull(self, number_of_samples): - # Set number of samples to trigger the almost full interrupt (page 18) - # Power on default is 32 samples. Note it is reverse: 0x00 is - # 32 samples, 0x0F is 17 samples - self.set_bitMask(MAX30105_FIFOCONFIG, - MAX30105_A_FULL_MASK, - number_of_samples) - - def getWritePointer(self): - # Read the FIFO Write Pointer from the register - wp = self.i2c_read_register(MAX30105_FIFOWRITEPTR) - return wp - - def getReadPointer(self): - # Read the FIFO Read Pointer from the register - wp = self.i2c_read_register(MAX30105_FIFOREADPTR) - return wp - - # Die Temperature method: returns the temperature in C - def readTemperature(self): - # DIE_TEMP_RDY interrupt must be enabled - # Config die temperature register to take 1 temperature sample - self.i2c_set_register(MAX30105_DIETEMPCONFIG, 0x01) - - # Poll for bit to clear, reading is then complete - reading = ord(self.i2c_read_register(MAX30105_INTSTAT2)) - sleep_ms(100); - while ((reading & MAX30105_INT_DIE_TEMP_RDY_ENABLE) > 0): - reading = ord(self.i2c_read_register(MAX30105_INTSTAT2)) - sleep_ms(1); - - # Read die temperature register (integer) - tempInt = ord(self.i2c_read_register(MAX30105_DIETEMPINT)) - # Causes the clearing of the DIE_TEMP_RDY interrupt - tempFrac = ord(self.i2c_read_register(MAX30105_DIETEMPFRAC)) - - # Calculate temperature (datasheet pg. 23) - return float(tempInt) + (float(tempFrac) * 0.0625) - - def setPROXINTTHRESH(self, val): - # Set the PROX_INT_THRESH (see proximity function on datasheet, pag 10) - self.i2c_set_register(MAX30105_PROXINTTHRESH, val) - - # DeviceID and Revision methods - def readPartID(self): - # Load the Device ID from the register - part_id = self.i2c_read_register(MAX30105_PARTID) - return part_id - - def checkPartID(self): - # Checks the correctness of the Device ID - part_id = ord(self.readPartID()) - return part_id == MAX_30105_EXPECTEDPARTID - - def getRevisionID(self): - # Load the Revision ID from the register - rev_id = self.i2c_read_register(MAX30105_REVISIONID) - return ord(rev_id) - - # Time slots management for multi-LED operation mode - def enableSlot(self, slotNumber, device): - # In multi-LED mode, each sample is split into up to four time slots, - # SLOT1 through SLOT4. These control registers determine which LED is - # active in each time slot. (datasheet pag 22) - # Devices are SLOT_RED_LED or SLOT_RED_PILOT (proximity) - # Assigning a SLOT_RED_LED will pulse LED - # Assigning a SLOT_RED_PILOT will detect the proximity - if (slotNumber == 1): - self.bitMask(MAX30105_MULTILEDCONFIG1, MAX30105_SLOT1_MASK, device) - elif (slotNumber == 2): - self.bitMask(MAX30105_MULTILEDCONFIG1, MAX30105_SLOT2_MASK, device << 4) - elif (slotNumber == 3): - self.bitMask(MAX30105_MULTILEDCONFIG2, MAX30105_SLOT3_MASK, device) - elif (slotNumber == 4): - self.bitMask(MAX30105_MULTILEDCONFIG2, MAX30105_SLOT4_MASK, device << 4) - else: - raise ValueError('Wrong slot number:{0}!'.format(slotNumber)) - - def disableSlots(self): - # Clear all the slots assignments - self.i2c_set_register(MAX30105_MULTILEDCONFIG1, 0) - self.i2c_set_register(MAX30105_MULTILEDCONFIG2, 0) - - # Low-level I2C Communication - def i2c_read_register(self, REGISTER, n_bytes=1): - self._i2c.writeto(self._address, bytearray([REGISTER])) - return self._i2c.readfrom(self._address, n_bytes) - - def i2c_set_register(self, REGISTER, VALUE): - self._i2c.writeto(self._address, bytearray([REGISTER, VALUE])) - return - - # Given a register, read it, mask it, and then set the thing - def set_bitMask(self, REGISTER, MASK, NEW_VALUES): - newCONTENTS = (ord(self.i2c_read_register(REGISTER)) & MASK) | NEW_VALUES - self.i2c_set_register(REGISTER, newCONTENTS) - return - - # Given a register, read it and mask it - def bitMask(self, reg, slotMask, thing): - originalContents = ord(self.i2c_read_register(reg)) - originalContents = originalContents & slotMask - self.i2c_set_register(reg, originalContents | thing) - - def FIFO_bytes_to_int(self, FIFO_bytes): - value = unpack(">i", b'\x00' + FIFO_bytes) - return (value[0] & 0x3FFFF) >> self._pulseWidth - - # Returns how many samples are available - def available(self): - numberOfSamples = len(self.sense.red) - return numberOfSamples - - # Get a new red value - def getRed(self): - # Check the sensor for new data for 250ms - if (self.safeCheck(250)): - return self.sense.red.popHead() - else: - # Sensor failed to find new data - return 0 - - # Get a new IR value - def getIR(self): - # Check the sensor for new data for 250ms - if (self.safeCheck(250)): - return self.sense.IR.popHead() - else: - # Sensor failed to find new data - return 0 - - # Get a new green value - def getGreen(self): - # Check the sensor for new data for 250ms - if (self.safeCheck(250)): - return self.sense.green.popHead() - else: - # Sensor failed to find new data - return 0 - - # Note: the following 3 functions are the equivalent of using 'getFIFO' - # methods of the SparkFun library - # Pops the next red value in storage (if available) - def popRedFromStorage(self): - if (len(self.sense.red) == 0): - return 0 - else: - return self.sense.red.pop() - - # Pops the next IR value in storage (if available) - def popIRFromStorage(self): - if (len(self.sense.IR) == 0): - return 0 - else: - return self.sense.IR.pop() - - # Pops the next green value in storage (if available) - def popGreenFromStorage(self): - if (len(self.sense.green) == 0): - return 0 - else: - return self.sense.green.pop() - - # (useless - for comparison purposes only) - def nextSample(self): - if(self.available()): - # With respect to the SparkFun library, using a deque object - # allows us to avoid manually advancing of the tail - return True - - # Polls the sensor for new data - def check(self): - TAG = "check" - - # Call continuously to poll the sensor for new data. - readPointer = ord(self.getReadPointer()) - writePointer = ord(self.getWritePointer()) - numberOfSamples = 0 - - # Do we have new data? - if (readPointer != writePointer): - # Calculate the number of readings we need to get from sensor - numberOfSamples = writePointer - readPointer - - # Wrap condition (return to the beginning of 32 samples) - if (numberOfSamples < 0): - numberOfSamples += 32 - - for i in range(numberOfSamples): - # Read a number of bytes equal to activeLEDs*3 (= 1 sample) - fifo_bytes = self.i2c_read_register(MAX30105_FIFODATA, - self._multiLedReadMode) - - # Convert the readings from bytes to integers, depending - # on the number of active LEDs - if (self._activeLEDs > 0): - self.sense.red.append( - self.FIFO_bytes_to_int(fifo_bytes[0:3]) - ) - - if (self._activeLEDs > 1): - self.sense.IR.append( - self.FIFO_bytes_to_int(fifo_bytes[3:6]) - ) - - if (self._activeLEDs > 2): - self.sense.green.append( - self.FIFO_bytes_to_int(fifo_bytes[6:9]) - ) - - return True - - else: - return False - - # Check for new data but give up after a certain amount of time - def safeCheck(self, maxTimeToCheck): - markTime = ticks_ms() - while(True): - if (ticks_diff(ticks_ms(), markTime) > maxTimeToCheck): - # Timeout reached - return False - if (self.check() == True): - # new data found - return True - sleep_ms(1) diff --git a/max30102/circular_buffer.py b/max30102/circular_buffer.py new file mode 100644 index 0000000..2e33ea9 --- /dev/null +++ b/max30102/circular_buffer.py @@ -0,0 +1,41 @@ +from ucollections import deque + + +class CircularBuffer(object): + ''' Very simple implementation of a circular buffer based on deque ''' + def __init__(self, max_size): + self.data = deque((), max_size, True) + self.max_size = max_size + + def __len__(self): + return len(self.data) + + def is_empty(self): + return not bool(self.data) + + def append(self, item): + try: + self.data.append(item) + except IndexError: + # deque full, popping 1st item out + self.data.popleft() + self.data.append(item) + + def pop(self): + return self.data.popleft() + + def clear(self): + self.data = deque((), self.max_size, True) + + def pop_head(self): + buffer_size = len(self.data) + temp = self.data + if buffer_size == 1: + pass + elif buffer_size > 1: + self.data.clear() + for x in range(buffer_size - 1): + self.data = temp.popleft() + else: + return 0 + return temp.popleft() \ No newline at end of file diff --git a/max30102/max30102.py b/max30102/max30102.py new file mode 100644 index 0000000..466d6ab --- /dev/null +++ b/max30102/max30102.py @@ -0,0 +1,717 @@ +# This work is a lot based on: +# - https://github.com/sparkfun/SparkFun_MAX3010x_Sensor_Library +# Written by Peter Jansen and Nathan Seidle (SparkFun) +# This is a library written for the Maxim MAX30105 Optical Smoke Detector +# It should also work with the MAX30105, which has a Green LED, too. +# These sensors use I2C to communicate, as well as a single (optional) +# interrupt line that is not currently supported in this driver. +# Written by Peter Jansen and Nathan Seidle (SparkFun) +# BSD license, all text above must be included in any redistribution. +# +# - https://github.com/kandizzy/esp32-micropython/blob/master/PPG/ppg/MAX30105.py +# A port of the library to MicroPython by kandizzy +# +# With this driver, I want to give almost full access to Maxim MAX30102 sensor +# functionalities. +# This code is being tested on TinyPico Board with Maxim genuine sensors. +# n-elia + +from machine import Pin, SoftI2C +from ustruct import unpack +from utime import sleep_ms, ticks_diff, ticks_ms + +from circular_buffer import CircularBuffer + +# These I2C default settings work for TinyPico (ESP32-based board) +MAX3010X_I2C_ADDRESS = 0x57 +I2C_SPEED_FAST = 400000 # 400kHz speed +I2C_SPEED_NORMAL = 100000 # 100kHz speed +I2C_DEF_SDA_PIN = 21 +I2C_DEF_SCL_PIN = 22 + +# Status Registers +MAX30105_INT_STAT_1 = 0x00 +MAX30105_INT_STAT_2 = 0x01 +MAX30105_INT_ENABLE_1 = 0x02 +MAX30105_INT_ENABLE_2 = 0x03 + +# FIFO Registers +MAX30105_FIFO_WRITE_PTR = 0x04 +MAX30105_FIFO_OVERFLOW = 0x05 +MAX30105_FIFO_READ_PTR = 0x06 +MAX30105_FIFO_DATA = 0x07 + +# Configuration Registers +MAX30105_FIFO_CONFIG = 0x08 +MAX30105_MODE_CONFIG = 0x09 +MAX30105_PARTICLE_CONFIG = 0x0A # Sometimes listed as 'SPO2' in datasheet (pag.11) +MAX30105_LED1_PULSE_AMP = 0x0C # IR +MAX30105_LED2_PULSE_AMP = 0x0D # RED +MAX30105_LED3_PULSE_AMP = 0x0E # GREEN (when available) +MAX30105_LED_PROX_AMP = 0x10 +MAX30105_MULTI_LED_CONFIG_1 = 0x11 +MAX30105_MULTI_LED_CONFIG_2 = 0x12 + +# Die Temperature Registers +MAX30105_DIE_TEMP_INT = 0x1F +MAX30105_DIE_TEMP_FRAC = 0x20 +MAX30105_DIE_TEMP_CONFIG = 0x21 + +# Proximity Function Registers +MAX30105_PROX_INT_THRESH = 0x30 + +# Part ID Registers +MAX30105_REVISION_ID = 0xFE +MAX30105_PART_ID = 0xFF # Should always be 0x15. Identical for MAX30102. + +# MAX30105 Commands +# Interrupt configuration (datasheet pag 13, 14) +MAX30105_INT_A_FULL_MASK = ~0b10000000 +MAX30105_INT_A_FULL_ENABLE = 0x80 +MAX30105_INT_A_FULL_DISABLE = 0x00 + +MAX30105_INT_DATA_RDY_MASK = ~0b01000000 +MAX30105_INT_DATA_RDY_ENABLE = 0x40 +MAX30105_INT_DATA_RDY_DISABLE = 0x00 + +MAX30105_INT_ALC_OVF_MASK = ~0b00100000 +MAX30105_INT_ALC_OVF_ENABLE = 0x20 +MAX30105_INT_ALC_OVF_DISABLE = 0x00 + +MAX30105_INT_PROX_INT_MASK = ~0b00010000 +MAX30105_INT_PROX_INT_ENABLE = 0x10 +MAX30105_INT_PROX_INT_DISABLE = 0x00 + +MAX30105_INT_DIE_TEMP_RDY_MASK = ~0b00000010 +MAX30105_INT_DIE_TEMP_RDY_ENABLE = 0x02 +MAX30105_INT_DIE_TEMP_RDY_DISABLE = 0x00 + +# FIFO data queue configuration +MAX30105_SAMPLE_AVG_MASK = ~0b11100000 +MAX30105_SAMPLE_AVG_1 = 0x00 +MAX30105_SAMPLE_AVG_2 = 0x20 +MAX30105_SAMPLE_AVG_4 = 0x40 +MAX30105_SAMPLE_AVG_8 = 0x60 +MAX30105_SAMPLE_AVG_16 = 0x80 +MAX30105_SAMPLE_AVG_32 = 0xA0 + +MAX30105_ROLLOVER_MASK = 0xEF +MAX30105_ROLLOVER_ENABLE = 0x10 +MAX30105_ROLLOVER_DISABLE = 0x00 +# Mask for 'almost full' interrupt (defaults to 32 samples) +MAX30105_A_FULL_MASK = 0xF0 + +# Mode configuration commands (page 19) +MAX30105_SHUTDOWN_MASK = 0x7F +MAX30105_SHUTDOWN = 0x80 +MAX30105_WAKEUP = 0x00 +MAX30105_RESET_MASK = 0xBF +MAX30105_RESET = 0x40 + +MAX30105_MODE_MASK = 0xF8 +MAX30105_MODE_RED_ONLY = 0x02 +MAX30105_MODE_RED_IR_ONLY = 0x03 +MAX30105_MODE_MULTI_LED = 0x07 + +# Particle sensing configuration commands (pgs 19-20) +MAX30105_ADC_RANGE_MASK = 0x9F +MAX30105_ADC_RANGE_2048 = 0x00 +MAX30105_ADC_RANGE_4096 = 0x20 +MAX30105_ADC_RANGE_8192 = 0x40 +MAX30105_ADC_RANGE_16384 = 0x60 + +MAX30105_SAMPLERATE_MASK = 0xE3 +MAX30105_SAMPLERATE_50 = 0x00 +MAX30105_SAMPLERATE_100 = 0x04 +MAX30105_SAMPLERATE_200 = 0x08 +MAX30105_SAMPLERATE_400 = 0x0C +MAX30105_SAMPLERATE_800 = 0x10 +MAX30105_SAMPLERATE_1000 = 0x14 +MAX30105_SAMPLERATE_1600 = 0x18 +MAX30105_SAMPLERATE_3200 = 0x1C + +MAX30105_PULSE_WIDTH_MASK = 0xFC +MAX30105_PULSE_WIDTH_69 = 0x00 +MAX30105_PULSE_WIDTH_118 = 0x01 +MAX30105_PULSE_WIDTH_215 = 0x02 +MAX30105_PULSE_WIDTH_411 = 0x03 + +# LED brightness level. It affects the distance of detection. +MAX30105_PULSE_AMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch +MAX30105_PULSE_AMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch +MAX30105_PULSE_AMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch +MAX30105_PULSE_AMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch + +# Multi-LED Mode configuration (datasheet pag 22) +MAX30105_SLOT1_MASK = 0xF8 +MAX30105_SLOT2_MASK = 0x8F +MAX30105_SLOT3_MASK = 0xF8 +MAX30105_SLOT4_MASK = 0x8F +SLOT_NONE = 0x00 +SLOT_RED_LED = 0x01 +SLOT_IR_LED = 0x02 +SLOT_GREEN_LED = 0x03 +SLOT_NONE_PILOT = 0x04 +SLOT_RED_PILOT = 0x05 +SLOT_IR_PILOT = 0x06 +SLOT_GREEN_PILOT = 0x07 + +MAX_30105_EXPECTED_PART_ID = 0x15 + +TAG = 'MAX30105' + +# Size of the queued readings +STORAGE_QUEUE_SIZE = 4 + + +# Data structure to hold the last readings +class SensorData: + def __init__(self): + self.red = CircularBuffer(STORAGE_QUEUE_SIZE) + self.IR = CircularBuffer(STORAGE_QUEUE_SIZE) + self.green = CircularBuffer(STORAGE_QUEUE_SIZE) + + +# Sensor class +class MAX30102(object): + def __init__(self, + i2cHexAddress=MAX3010X_I2C_ADDRESS, + i2c=SoftI2C(sda=Pin(I2C_DEF_SDA_PIN), + scl=Pin(I2C_DEF_SCL_PIN), + freq=I2C_SPEED_FAST) + ): + self._address = i2cHexAddress + self._i2c = i2c + self._active_leds = None + self._pulse_width = None + self._multi_led_read_mode = None + # Store current config values to compute acquisition frequency + self._sample_rate = None + self._sample_avg = None + self._acq_frequency = None + self._acq_frequency_inv = None + # Circular buffer of readings from the sensor + self.sense = SensorData() + + try: + self._i2c.readfrom(self._address, 1) + except OSError as error: + raise SystemExit(error) + + if not (self.check_part_id()): + raise SystemExit() + + # Sensor setup method + def setup_sensor(self, LED_MODE=2, ADC_RANGE=16384, SAMPLE_RATE=400, + LED_POWER=MAX30105_PULSE_AMP_HIGH, SAMPLE_AVG=8, + PULSE_WIDTH=411): + # Reset the sensor's registers from previous configurations + self.soft_reset() + + # Set the number of samples to be averaged by the chip to 8 + self.set_fifo_average(SAMPLE_AVG) + + # Allow FIFO queues to wrap/roll over + self.enable_fifo_rollover() + + # Set the LED mode to the default value of 2 (RED + IR) + # Note: the 3rd mode is available only with MAX30105 + self.set_led_mode(LED_MODE) + + # Set the ADC range to default value of 16384 + self.set_adc_range(ADC_RANGE) + + # Set the sample rate to the default value of 400 + self.set_sample_rate(SAMPLE_RATE) + + # Set the Pulse Width to the default value of 411 + self.set_pulse_width(PULSE_WIDTH) + + # Set the LED brightness to the default value of 'low' + self.set_pulse_amplitude_red(LED_POWER) + self.set_pulse_amplitude_it(LED_POWER) + self.set_pulse_amplitude_green(LED_POWER) + self.set_pulse_amplitude_proximity(LED_POWER) + + # Clears the FIFO + self.clear_fifo() + + def __del__(self): + self.shutdown() + + # Methods to read the two interrupt flags + def get_int_1(self): + # Load the Interrupt 1 status (configurable) from the register + rev_id = self.i2c_read_register(MAX30105_INT_STAT_1) + return rev_id + + def get_int_2(self): + # Load the Interrupt 2 status (DIE_TEMP_DRY) from the register + rev_id = self.i2c_read_register(MAX30105_INT_STAT_2) + return rev_id + + # Methods to set up the interrupt flags + def enable_a_full(self): + # Enable the almost full interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_A_FULL_MASK, MAX30105_INT_A_FULL_ENABLE) + + def disable_a_full(self): + # Disable the almost full interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_A_FULL_MASK, MAX30105_INT_A_FULL_DISABLE) + + def enable_data_rdy(self): + # Enable the new FIFO data ready interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_DATA_RDY_MASK, MAX30105_INT_DATA_RDY_ENABLE) + + def disable_data_rdy(self): + # Disable the new FIFO data ready interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_DATA_RDY_MASK, MAX30105_INT_DATA_RDY_DISABLE) + + def enable_alc_ovf(self): + # Enable the ambient light limit interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_ALC_OVF_MASK, MAX30105_INT_ALC_OVF_ENABLE) + + def disable_alc_ovf(self): + # Disable the ambient light limit interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_ALC_OVF_MASK, MAX30105_INT_ALC_OVF_DISABLE) + + def enable_prox_int(self): + # Enable the proximity interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_PROX_INT_MASK, MAX30105_INT_PROX_INT_ENABLE) + + def disable_prox_int(self): + # Disable the proximity interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_1, MAX30105_INT_PROX_INT_MASK, MAX30105_INT_PROX_INT_DISABLE) + + def enable_die_temp_rdy(self): + # Enable the die temp. conversion finish interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_2, MAX30105_INT_DIE_TEMP_RDY_MASK, MAX30105_INT_DIE_TEMP_RDY_ENABLE) + + def disable_die_temp_rdy(self): + # Disable the die temp. conversion finish interrupt (datasheet pag. 13) + self.bitmask(MAX30105_INT_ENABLE_2, MAX30105_INT_DIE_TEMP_RDY_MASK, MAX30105_INT_DIE_TEMP_RDY_DISABLE) + + # Configuration reset + + def soft_reset(self): + # When the RESET bit is set to one, all configuration, threshold, + # and data registers are reset to their power-on-state through + # a power-on reset. The RESET bit is cleared automatically back to zero + # after the reset sequence is completed. (datasheet pag. 19) + self.set_bitmask(MAX30105_MODE_CONFIG, MAX30105_RESET_MASK, MAX30105_RESET) + curr_status = -1 + while not ((curr_status & MAX30105_RESET) == 0): + sleep_ms(10) + curr_status = ord(self.i2c_read_register(MAX30105_MODE_CONFIG)) + + # Power states methods + def shutdown(self): + # Put IC into low power mode (datasheet pg. 19) + # During shutdown the IC will continue to respond to I2C commands but + # will not update with or take new readings (such as temperature). + self.set_bitmask(MAX30105_MODE_CONFIG, MAX30105_SHUTDOWN_MASK, MAX30105_SHUTDOWN) + + def wakeup(self): + # Pull IC out of low power mode (datasheet pg. 19) + self.set_bitmask(MAX30105_MODE_CONFIG, MAX30105_SHUTDOWN_MASK, MAX30105_WAKEUP) + + # LED Configuration + + def set_led_mode(self, LED_mode): + # Set LED mode: select which LEDs are used for sampling + # Options: RED only, RED + IR only, or ALL (datasheet pag. 19) + if LED_mode == 1: + self.set_bitmask(MAX30105_MODE_CONFIG, MAX30105_MODE_MASK, MAX30105_MODE_RED_ONLY) + elif LED_mode == 2: + self.set_bitmask(MAX30105_MODE_CONFIG, MAX30105_MODE_MASK, MAX30105_MODE_RED_IR_ONLY) + elif LED_mode == 3: + self.set_bitmask(MAX30105_MODE_CONFIG, MAX30105_MODE_MASK, MAX30105_MODE_MULTI_LED) + else: + raise ValueError('Wrong LED mode:{0}!'.format(LED_mode)) + + # Multi-LED Mode Configuration: enable the reading of the LEDs + # depending on the chosen mode + self.enable_slot(1, SLOT_RED_LED) + if LED_mode > 1: + self.enable_slot(2, SLOT_IR_LED) + if LED_mode > 2: + self.enable_slot(3, SLOT_GREEN_LED) + + # Store the LED mode used to control how many bytes to read from + # FIFO buffer in multiLED mode: a sample is made of 3 bytes + self._active_leds = LED_mode + self._multi_led_read_mode = LED_mode * 3 + + # ADC Configuration + def set_adc_range(self, ADC_range): + # ADC range: set the range of the conversion + # Options: 2048, 4096, 8192, 16384 + # Current draw: 7.81pA. 15.63pA, 31.25pA, 62.5pA per LSB. + if ADC_range == 2048: + r = MAX30105_ADC_RANGE_2048 + elif ADC_range == 4096: + r = MAX30105_ADC_RANGE_4096 + elif ADC_range == 8192: + r = MAX30105_ADC_RANGE_8192 + elif ADC_range == 16384: + r = MAX30105_ADC_RANGE_16384 + else: + raise ValueError('Wrong ADC range:{0}!'.format(ADC_range)) + + self.set_bitmask(MAX30105_PARTICLE_CONFIG, MAX30105_ADC_RANGE_MASK, r) + + # Sample Rate Configuration + def set_sample_rate(self, sample_rate): + # Sample rate: select the number of samples taken per second. + # Options: 50, 100, 200, 400, 800, 1000, 1600, 3200 + # Note: in theory, the resulting acquisition frequency for the end user + # is sampleRate/sampleAverage. However, it is worth testing it before + # assuming that the sensor can effectively sustain that frequency + # given its configuration. + if sample_rate == 50: + sr = MAX30105_SAMPLERATE_50 + elif sample_rate == 100: + sr = MAX30105_SAMPLERATE_100 + elif sample_rate == 200: + sr = MAX30105_SAMPLERATE_200 + elif sample_rate == 400: + sr = MAX30105_SAMPLERATE_400 + elif sample_rate == 800: + sr = MAX30105_SAMPLERATE_800 + elif sample_rate == 1000: + sr = MAX30105_SAMPLERATE_1000 + elif sample_rate == 1600: + sr = MAX30105_SAMPLERATE_1600 + elif sample_rate == 3200: + sr = MAX30105_SAMPLERATE_3200 + else: + raise ValueError('Wrong sample rate:{0}!'.format(sample_rate)) + + self.set_bitmask(MAX30105_PARTICLE_CONFIG, MAX30105_SAMPLERATE_MASK, sr) + + # Store the sample rate and recompute the acq. freq. + self._sample_rate = sample_rate + self.update_acquisition_frequency() + + # Pulse width Configuration + def set_pulse_width(self, pulse_width): + # Pulse width of LEDs: The longer the pulse width the longer range of + # detection. At 69us and 0.4mA it's about 2 inches, + # at 411us and 0.4mA it's about 6 inches. + if pulse_width == 69: + pw = MAX30105_PULSE_WIDTH_69 + elif pulse_width == 118: + pw = MAX30105_PULSE_WIDTH_118 + elif pulse_width == 215: + pw = MAX30105_PULSE_WIDTH_215 + elif pulse_width == 411: + pw = MAX30105_PULSE_WIDTH_411 + else: + raise ValueError('Wrong pulse width:{0}!'.format(pulse_width)) + self.set_bitmask(MAX30105_PARTICLE_CONFIG, MAX30105_PULSE_WIDTH_MASK, pw) + + # Store the pulse width + self._pulse_width = pw + + # LED Pulse Amplitude Configuration methods + def set_active_leds_amplitude(self, amplitude): + if self._active_leds > 0: + self.set_pulse_amplitude_red(amplitude) + if self._active_leds > 1: + self.set_pulse_amplitude_it(amplitude) + if self._active_leds > 2: + self.set_pulse_amplitude_green(amplitude) + + def set_pulse_amplitude_red(self, amplitude): + self.i2c_set_register(MAX30105_LED1_PULSE_AMP, amplitude) + + def set_pulse_amplitude_it(self, amplitude): + self.i2c_set_register(MAX30105_LED2_PULSE_AMP, amplitude) + + def set_pulse_amplitude_green(self, amplitude): + self.i2c_set_register(MAX30105_LED3_PULSE_AMP, amplitude) + + def set_pulse_amplitude_proximity(self, amplitude): + self.i2c_set_register(MAX30105_LED_PROX_AMP, amplitude) + + def set_proximity_threshold(self, thresh_msb): + # Set the IR ADC count that will trigger the beginning of particle- + # sensing mode.The threshMSB signifies only the 8 most significant-bits + # of the ADC count. (datasheet page 24) + self.i2c_set_register(MAX30105_PROX_INT_THRESH, thresh_msb) + + # FIFO averaged samples number Configuration + def set_fifo_average(self, number_of_samples): + # FIFO sample avg: set the number of samples to be averaged by the chip. + # Options: MAX30105_SAMPLE_AVG_1, 2, 4, 8, 16, 32 + if number_of_samples == 1: + ns = MAX30105_SAMPLE_AVG_1 + elif number_of_samples == 2: + ns = MAX30105_SAMPLE_AVG_2 + elif number_of_samples == 4: + ns = MAX30105_SAMPLE_AVG_4 + elif number_of_samples == 8: + ns = MAX30105_SAMPLE_AVG_8 + elif number_of_samples == 16: + ns = MAX30105_SAMPLE_AVG_16 + elif number_of_samples == 32: + ns = MAX30105_SAMPLE_AVG_32 + else: + raise ValueError( + 'Wrong number of samples:{0}!'.format(number_of_samples)) + self.set_bitmask(MAX30105_FIFO_CONFIG, MAX30105_SAMPLE_AVG_MASK, ns) + + # Store the number of averaged samples and recompute the acq. freq. + self._sample_avg = number_of_samples + self.update_acquisition_frequency() + + def update_acquisition_frequency(self): + if None in [self._sample_rate, self._sample_avg]: + return + else: + self._acq_frequency = self._sample_rate / self._sample_avg + from math import ceil + + # Compute the time interval to wait before taking a good measure + # (see note in setSampleRate() method) + self._acq_frequency_inv = int(ceil(1000 / self._acq_frequency)) + + def get_acquisition_frequency(self): + return self._acq_frequency + + def clear_fifo(self): + # Resets all points to start in a known state + # Datasheet page 15 recommends clearing FIFO before beginning a read + self.i2c_set_register(MAX30105_FIFO_WRITE_PTR, 0) + self.i2c_set_register(MAX30105_FIFO_OVERFLOW, 0) + self.i2c_set_register(MAX30105_FIFO_READ_PTR, 0) + + def enable_fifo_rollover(self): + # FIFO rollover: enable to allow FIFO tro wrap/roll over + self.set_bitmask(MAX30105_FIFO_CONFIG, MAX30105_ROLLOVER_MASK, MAX30105_ROLLOVER_ENABLE) + + def disable_fifo_rollover(self): + # FIFO rollover: disable to disallow FIFO tro wrap/roll over + self.set_bitmask(MAX30105_FIFO_CONFIG, MAX30105_ROLLOVER_MASK, MAX30105_ROLLOVER_DISABLE) + + def set_fifo_almost_full(self, number_of_samples): + # Set number of samples to trigger the almost full interrupt (page 18) + # Power on default is 32 samples. Note it is reverse: 0x00 is + # 32 samples, 0x0F is 17 samples + self.set_bitmask(MAX30105_FIFO_CONFIG, MAX30105_A_FULL_MASK, number_of_samples) + + def get_write_pointer(self): + # Read the FIFO Write Pointer from the register + wp = self.i2c_read_register(MAX30105_FIFO_WRITE_PTR) + return wp + + def get_read_pointer(self): + # Read the FIFO Read Pointer from the register + wp = self.i2c_read_register(MAX30105_FIFO_READ_PTR) + return wp + + # Die Temperature method: returns the temperature in C + def read_temperature(self): + # DIE_TEMP_RDY interrupt must be enabled + # Config die temperature register to take 1 temperature sample + self.i2c_set_register(MAX30105_DIE_TEMP_CONFIG, 0x01) + + # Poll for bit to clear, reading is then complete + reading = ord(self.i2c_read_register(MAX30105_INT_STAT_2)) + sleep_ms(100) + while (reading & MAX30105_INT_DIE_TEMP_RDY_ENABLE) > 0: + reading = ord(self.i2c_read_register(MAX30105_INT_STAT_2)) + sleep_ms(1) + + # Read die temperature register (integer) + tempInt = ord(self.i2c_read_register(MAX30105_DIE_TEMP_INT)) + # Causes the clearing of the DIE_TEMP_RDY interrupt + tempFrac = ord(self.i2c_read_register(MAX30105_DIE_TEMP_FRAC)) + + # Calculate temperature (datasheet pg. 23) + return float(tempInt) + (float(tempFrac) * 0.0625) + + def set_prox_int_tresh(self, val): + # Set the PROX_INT_THRESH (see proximity function on datasheet, pag 10) + self.i2c_set_register(MAX30105_PROX_INT_THRESH, val) + + # DeviceID and Revision methods + def read_part_id(self): + # Load the Device ID from the register + part_id = self.i2c_read_register(MAX30105_PART_ID) + return part_id + + def check_part_id(self): + # Checks the correctness of the Device ID + part_id = ord(self.read_part_id()) + return part_id == MAX_30105_EXPECTED_PART_ID + + def get_revision_id(self): + # Load the Revision ID from the register + rev_id = self.i2c_read_register(MAX30105_REVISION_ID) + return ord(rev_id) + + # Time slots management for multi-LED operation mode + def enable_slot(self, slot_number, device): + # In multi-LED mode, each sample is split into up to four time slots, + # SLOT1 through SLOT4. These control registers determine which LED is + # active in each time slot. (datasheet pag 22) + # Devices are SLOT_RED_LED or SLOT_RED_PILOT (proximity) + # Assigning a SLOT_RED_LED will pulse LED + # Assigning a SLOT_RED_PILOT will detect the proximity + if slot_number == 1: + self.bitmask(MAX30105_MULTI_LED_CONFIG_1, MAX30105_SLOT1_MASK, device) + elif slot_number == 2: + self.bitmask(MAX30105_MULTI_LED_CONFIG_1, MAX30105_SLOT2_MASK, device << 4) + elif slot_number == 3: + self.bitmask(MAX30105_MULTI_LED_CONFIG_2, MAX30105_SLOT3_MASK, device) + elif slot_number == 4: + self.bitmask(MAX30105_MULTI_LED_CONFIG_2, MAX30105_SLOT4_MASK, device << 4) + else: + raise ValueError('Wrong slot number:{0}!'.format(slot_number)) + + def disable_slots(self): + # Clear all the slots assignments + self.i2c_set_register(MAX30105_MULTI_LED_CONFIG_1, 0) + self.i2c_set_register(MAX30105_MULTI_LED_CONFIG_2, 0) + + # Low-level I2C Communication + def i2c_read_register(self, REGISTER, n_bytes=1): + self._i2c.writeto(self._address, bytearray([REGISTER])) + return self._i2c.readfrom(self._address, n_bytes) + + def i2c_set_register(self, REGISTER, VALUE): + self._i2c.writeto(self._address, bytearray([REGISTER, VALUE])) + return + + # Given a register, read it, mask it, and then set the thing + def set_bitmask(self, REGISTER, MASK, NEW_VALUES): + newCONTENTS = (ord(self.i2c_read_register(REGISTER)) & MASK) | NEW_VALUES + self.i2c_set_register(REGISTER, newCONTENTS) + return + + # Given a register, read it and mask it + def bitmask(self, reg, slotMask, thing): + originalContents = ord(self.i2c_read_register(reg)) + originalContents = originalContents & slotMask + self.i2c_set_register(reg, originalContents | thing) + + def fifo_bytes_to_int(self, fifo_bytes): + value = unpack(">i", b'\x00' + fifo_bytes) + return (value[0] & 0x3FFFF) >> self._pulse_width + + # Returns how many samples are available + def available(self): + number_of_samples = len(self.sense.red) + return number_of_samples + + # Get a new red value + def get_red(self): + # Check the sensor for new data for 250ms + if self.safe_check(250): + return self.sense.red.pop_head() + else: + # Sensor failed to find new data + return 0 + + # Get a new IR value + def get_ir(self): + # Check the sensor for new data for 250ms + if self.safe_check(250): + return self.sense.IR.pop_head() + else: + # Sensor failed to find new data + return 0 + + # Get a new green value + def get_green(self): + # Check the sensor for new data for 250ms + if self.safe_check(250): + return self.sense.green.pop_head() + else: + # Sensor failed to find new data + return 0 + + # Note: the following 3 functions are the equivalent of using 'getFIFO' + # methods of the SparkFun library + # Pops the next red value in storage (if available) + def pop_red_from_storage(self): + if len(self.sense.red) == 0: + return 0 + else: + return self.sense.red.pop() + + # Pops the next IR value in storage (if available) + def pop_ir_from_storage(self): + if len(self.sense.IR) == 0: + return 0 + else: + return self.sense.IR.pop() + + # Pops the next green value in storage (if available) + def pop_green_from_storage(self): + if len(self.sense.green) == 0: + return 0 + else: + return self.sense.green.pop() + + # (useless - for comparison purposes only) + def next_sample(self): + if self.available(): + # With respect to the SparkFun library, using a deque object + # allows us to avoid manually advancing of the tail + return True + + # Polls the sensor for new data + def check(self): + # Call continuously to poll the sensor for new data. + read_pointer = ord(self.get_read_pointer()) + write_pointer = ord(self.get_write_pointer()) + + # Do we have new data? + if read_pointer != write_pointer: + # Calculate the number of readings we need to get from sensor + number_of_samples = write_pointer - read_pointer + + # Wrap condition (return to the beginning of 32 samples) + if number_of_samples < 0: + number_of_samples += 32 + + for i in range(number_of_samples): + # Read a number of bytes equal to activeLEDs*3 (= 1 sample) + fifo_bytes = self.i2c_read_register(MAX30105_FIFO_DATA, + self._multi_led_read_mode) + + # Convert the readings from bytes to integers, depending + # on the number of active LEDs + if self._active_leds > 0: + self.sense.red.append( + self.fifo_bytes_to_int(fifo_bytes[0:3]) + ) + + if self._active_leds > 1: + self.sense.IR.append( + self.fifo_bytes_to_int(fifo_bytes[3:6]) + ) + + if self._active_leds > 2: + self.sense.green.append( + self.fifo_bytes_to_int(fifo_bytes[6:9]) + ) + + return True + + else: + return False + + # Check for new data but give up after a certain amount of time + def safe_check(self, max_time_to_check): + mark_time = ticks_ms() + while True: + if ticks_diff(ticks_ms(), mark_time) > max_time_to_check: + # Timeout reached + return False + if self.check(): + # new data found + return True + sleep_ms(1) diff --git a/sdist_upip.py b/sdist_upip.py index ce5877a..4cee05e 100644 --- a/sdist_upip.py +++ b/sdist_upip.py @@ -44,14 +44,14 @@ def gzip_4k(inf, fname): (None, r".+\.egg-info/.+"), ] - outbuf = io.BytesIO() + def filter_tar(name): fin = tarfile.open(name, "r:gz") fout = tarfile.open(fileobj=outbuf, mode="w") for info in fin: -# print(info) + # print(info) if not "/" in info.name: continue fname = info.name.split("/", 1)[1] @@ -82,42 +82,42 @@ def filter_tar(name): def make_resource_module(manifest_files): - resources = [] - # Any non-python file included in manifest is resource - for fname in manifest_files: - ext = fname.rsplit(".", 1) - if len(ext) > 1: - ext = ext[1] - else: - ext = "" - if ext != "py": - resources.append(fname) - - if resources: - print("creating resource module R.py") - resources.sort() - last_pkg = None - r_file = None - for fname in resources: - try: - pkg, res_name = fname.split("/", 1) - except ValueError: - print("not treating %s as a resource" % fname) - continue - if last_pkg != pkg: - last_pkg = pkg - if r_file: - r_file.write("}\n") - r_file.close() - r_file = open(pkg + "/R.py", "w") - r_file.write("R = {\n") - - with open(fname, "rb") as f: - r_file.write("%r: %r,\n" % (res_name, f.read())) - - if r_file: - r_file.write("}\n") - r_file.close() + resources = [] + # Any non-python file included in manifest is resource + for fname in manifest_files: + ext = fname.rsplit(".", 1) + if len(ext) > 1: + ext = ext[1] + else: + ext = "" + if ext != "py": + resources.append(fname) + + if resources: + print("creating resource module R.py") + resources.sort() + last_pkg = None + r_file = None + for fname in resources: + try: + pkg, res_name = fname.split("/", 1) + except ValueError: + print("not treating %s as a resource" % fname) + continue + if last_pkg != pkg: + last_pkg = pkg + if r_file: + r_file.write("}\n") + r_file.close() + r_file = open(pkg + "/R.py", "w") + r_file.write("R = {\n") + + with open(fname, "rb") as f: + r_file.write("%r: %r,\n" % (res_name, f.read())) + + if r_file: + r_file.write("}\n") + r_file.close() class sdist(_sdist): @@ -142,4 +142,4 @@ def run(self): if __name__ == "__main__": filter_tar(sys.argv[1]) outbuf.seek(0) - gzip_4k(outbuf, sys.argv[1]) \ No newline at end of file + gzip_4k(outbuf, sys.argv[1]) diff --git a/setup.py b/setup.py index a38fdfd..10d02d1 100644 --- a/setup.py +++ b/setup.py @@ -3,19 +3,20 @@ setup( name="micropython-max30102", - version="0.3.3", + version="0.3.4", description="MAX30102 driver for micropython.", long_description=open("README.md").read(), long_description_content_type='text/markdown', url="https://github.com/n-elia/MAX30102-MicroPython-driver", license="MIT", - keywords="micropython", + keywords=["micropython", "max30102", "max30105", "esp32"], author="Nicola Elia", maintainer="Nicola Elia", classifiers=[ + "Development Status :: 5 - Production/Stable", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: Implementation :: MicroPython", ],