diff --git a/.travis.yml b/.travis.yml index 2300474..d8ab643 100755 --- a/.travis.yml +++ b/.travis.yml @@ -37,7 +37,8 @@ script: - python -m unittest -v tests.test_url_to_hex - python -m unittest -v tests.test_adapter - python -m unittest -v tests.test_adapter -after_success: - "pycodestyle bluezero" - "pycodestyle examples" - "pycodestyle tests" +after_success: + # diff --git a/README.rst b/README.rst index 2172f81..7e74149 100644 --- a/README.rst +++ b/README.rst @@ -62,7 +62,8 @@ This is a simple example of how to read the Ti Sensortag CC2650 microbit_button.py ****************** -This example +This example simply reads the button values from the micro:bit and displays them on +LEDs attached to the hardware Beacon ------ @@ -82,3 +83,20 @@ fatbeacon.py ************ Experiment with Eddystone FatBeacon. Contains service for beacon to connect to that sends html page. +lightswitch.py +************** +A simple light switch example using an LED and a switch. +Write a value to the switch characteristic to change the state of the light + +blinkt_ble.py +************* +This can be controlled from web page via web bluetooth. +This example advertises the URL via Eddystone URL. Once you attached you can change the +colours of the LEDs on the Pimoroni Blinkt. + +cpu_temperature.py +****************** +This example transmits the temperature of the CPU over the single characteristic. +If your hardware does not support the `vcgencmd` then change the `get_cpu_temperature()` +function to use the randomly generated temperature. +Values are only updated when notification are switched on. diff --git a/bluezero/localGATT.py b/bluezero/localGATT.py index e812b62..0e57559 100644 --- a/bluezero/localGATT.py +++ b/bluezero/localGATT.py @@ -209,7 +209,7 @@ def PropertiesChanged(self, interface, changed, invalidated): This signal is registered with the D-Bus at ``org.freedesktop.DBus.Properties``. """ - pass + print('Service properties changed: ', interface, changed, invalidated) class Characteristic(dbus.service.Object): @@ -267,6 +267,9 @@ def get_path(self): """Return the DBus object path""" return dbus.ObjectPath(self.path) + def add_call_back(self, callback): + self.PropertiesChanged = callback + @dbus.service.method(constants.DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') @@ -325,15 +328,20 @@ def Set(self, interface_name, property_name, value, *args, **kwargs): self.props[constants.GATT_CHRC_IFACE][property_name] = value - if bool(self.props[constants.GATT_CHRC_IFACE]['Notifying']) is True: - self.EmitSignal(dbus.PROPERTIES_IFACE, - 'PropertiesChanged', - 'sa{sv}as', - [interface_name, - dbus.Dictionary({property_name: value}, - signature='sv'), - dbus.Array([], signature='s') - ]) + return self.PropertiesChanged(interface_name, + dbus.Dictionary({property_name: value}, + signature='sv'), + dbus.Array([], signature='s')) + + @dbus.service.signal(constants.DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + """Emit a Properties Changed notification signal. + + This signal is registered with the D-Bus at + ``org.freedesktop.DBus.Properties``. + """ + print('Char Prop Changed') @dbus.service.method(constants.GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay') @@ -360,7 +368,7 @@ def StartNotify(self): DBus method for enabling notifications of the characteristic value. :return: value """ - if not self.props['Notifying'] is True: + if not self.props[constants.GATT_CHRC_IFACE]['Notifying'] is True: print('Notifying already, nothing to do') return @@ -375,7 +383,7 @@ def StopNotify(self): DBus method for disabling notifications of the characteristic value. :return: value """ - if self.props['Notifying'] is False: + if self.props[constants.GATT_CHRC_IFACE]['Notifying'] is False: print('Not Notifying, nothing to do') return @@ -494,10 +502,10 @@ def Set(self, interface_name, property_name, value, *args, **kwargs): self.props[interface_name][property_name] = value - self.PropertiesChanged(interface_name, - dbus.Dictionary({property_name: value}, - signature='sv'), - dbus.Array([], signature='s')) + return self.PropertiesChanged(interface_name, + dbus.Dictionary({property_name: value}, + signature='sv'), + dbus.Array([], signature='s')) @dbus.service.signal(constants.DBUS_PROP_IFACE, signature='sa{sv}as') @@ -507,11 +515,11 @@ def PropertiesChanged(self, interface, changed, invalidated): This signal is registered with the D-Bus at ``org.freedesktop.DBus.Properties``. """ - pass + return 0 @dbus.service.method(constants.GATT_DESC_IFACE, - in_signature='', out_signature='v') - def ReadValue(self): + in_signature='a{sv}', out_signature='ay') + def ReadValue(self, options): """ DBus method for getting the characteristic value :return: value @@ -519,8 +527,8 @@ def ReadValue(self): return self.GetAll(constants.GATT_DESC_IFACE)['Value'] @dbus.service.method(constants.GATT_DESC_IFACE, - in_signature='v', out_signature='') - def WriteValue(self, value): + in_signature='aya{sv}', out_signature='') + def WriteValue(self, value, options): """ DBus method for setting the descriptor value :return: diff --git a/examples/level100/blinkt_ble.py b/examples/level100/blinkt_ble.py new file mode 100644 index 0000000..4932991 --- /dev/null +++ b/examples/level100/blinkt_ble.py @@ -0,0 +1,84 @@ +from bluezero import GATT +from bluezero import localGATT +from bluezero import advertisement +from bluezero import adapter +from bluezero import tools +import dbus +from blinkt import set_pixel, set_all, show + +WEB_BLINKT = 'https://goo.gl/wQOjbe' +TX_POWER = 0x08 +EDDYSTONE = 'FEAA' +SERVICE_UUID = '0000FFF0-0000-1000-8000-00805F9B34FB' +CHAR_UUID = '0000FFF3-0000-1000-8000-00805F9B34FB' + + +class blinkt: + def __init__(self, ble): + self.ble = ble + + def on_ble_write(self, *args, **kwargs): + try: + # bytes=[0x07, 0x02, 0x00, 0x01, 0x00, 0x0FF, 0x00] + bytes = args[1]["Value"] + if len(bytes) > 2: + cmd = (bytes[0] << 8) + (bytes[1] & 0xff) + + if cmd == 0x0702: + if len(bytes) >= 7: + set_pixel(bytes[3] - 1, bytes[4], bytes[5], bytes[6]) + elif cmd == 0x0601: + if len(bytes) >= 5: + set_all(bytes[2], bytes[3], bytes[4]) + show() + except Exception as inst: + print(type(inst)) + print(inst.args) + print(inst) + return 0 + + +class ble: + def __init__(self): + self.bus = dbus.SystemBus() + self.app = localGATT.Application() + self.srv = localGATT.Service(1, SERVICE_UUID, True) + + self.charc = localGATT.Characteristic(1, + CHAR_UUID, + self.srv, + [0xBB], + True, + ['write']) + + self.charc.service = self.srv.path + self.app.add_managed_object(self.srv) + self.app.add_managed_object(self.charc) + + self.srv_mng = GATT.GattManager(adapter.list_adapters()[0]) + self.srv_mng.register_application(self.app, {}) + + self.dongle = adapter.Adapter(adapter.list_adapters()[0]) + advert = advertisement.Advertisement(1, 'peripheral') + + advert.service_UUIDs = [SERVICE_UUID] + eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER) + advert.service_data = {EDDYSTONE: eddystone_data} + if not self.dongle.powered: + self.dongle.powered = True + ad_manager = advertisement.AdvertisingManager(self.dongle.path) + ad_manager.register_advertisement(advert, {}) + + def add_call_back(self, callback): + self.charc.PropertiesChanged = callback + + def start_bt(self): + # self.light.StartNotify() + tools.start_mainloop() + +if __name__ == '__main__': + link = ble() + blinkt_ble = blinkt(link) + link.charc.add_call_back(blinkt_ble.on_ble_write) + + link.start_bt() diff --git a/examples/level100/cpu_temperature.py b/examples/level100/cpu_temperature.py new file mode 100644 index 0000000..0c61fbf --- /dev/null +++ b/examples/level100/cpu_temperature.py @@ -0,0 +1,145 @@ +# Standard modules +import os +import dbus +from gi.repository import GLib +import random + +# Bluezero modules +from bluezero import tools +from bluezero import constants +from bluezero import adapter +from bluezero import advertisement +from bluezero import localGATT +from bluezero import GATT + +# constants +CPU_TMP_SRVC = '12341000-1234-1234-1234-123456789abc' +CPU_TMP_CHRC = '2A6E' +CPU_FMT_DSCP = '2904' + + +def get_cpu_temperature(): + # return random.randrange(3200, 5310, 10) / 100 + cpu_temp = os.popen('vcgencmd measure_temp').readline() + return float(cpu_temp.replace('temp=', '').replace("'C\n", '')) + + +def sint16(value): + return int(value * 100).to_bytes(2, byteorder='little', signed=True) + + +def cpu_temp_sint16(value): + answer = [] + value_int16 = sint16(value[0]) + for bytes in value_int16: + answer.append(dbus.Byte(bytes)) + + return answer + + +class TemperatureChrc(localGATT.Characteristic): + def __init__(self, service): + localGATT.Characteristic.__init__(self, + 1, + CPU_TMP_CHRC, + service, + [get_cpu_temperature()], + False, + ['read', 'notify']) + + def temperature_cb(self): + reading = [get_cpu_temperature()] + print('Getting new temperature', + reading, + self.props[constants.GATT_CHRC_IFACE]['Notifying']) + self.props[constants.GATT_CHRC_IFACE]['Value'] = reading + + self.PropertiesChanged(constants.GATT_CHRC_IFACE, + {'Value': dbus.Array(cpu_temp_sint16(reading))}, + []) + print('Array value: ', cpu_temp_sint16(reading)) + return self.props[constants.GATT_CHRC_IFACE]['Notifying'] + + def _update_temp_value(self): + if not self.props[constants.GATT_CHRC_IFACE]['Notifying']: + return + + print('Starting timer event') + GLib.timeout_add(500, self.temperature_cb) + + def ReadValue(self, options): + return dbus.Array( + cpu_temp_sint16(self.props[constants.GATT_CHRC_IFACE]['Value']) + ) + + def StartNotify(self): + if self.props[constants.GATT_CHRC_IFACE]['Notifying']: + print('Already notifying, nothing to do') + return + print('Notifying on') + self.props[constants.GATT_CHRC_IFACE]['Notifying'] = True + self._update_temp_value() + + def StopNotify(self): + if not self.props[constants.GATT_CHRC_IFACE]['Notifying']: + print('Not notifying, nothing to do') + return + + print('Notifying off') + self.props[constants.GATT_CHRC_IFACE]['Notifying'] = False + self._update_temp_value() + + +class ble: + def __init__(self): + self.bus = dbus.SystemBus() + self.app = localGATT.Application() + self.srv = localGATT.Service(1, CPU_TMP_SRVC, True) + + self.charc = TemperatureChrc(self.srv) + + self.charc.service = self.srv.path + + cpu_format_value = dbus.Array([dbus.Byte(0x0E), + dbus.Byte(0xFE), + dbus.Byte(0x2F), + dbus.Byte(0x27), + dbus.Byte(0x01), + dbus.Byte(0x00), + dbus.Byte(0x00)]) + self.cpu_format = localGATT.Descriptor(4, + CPU_FMT_DSCP, + self.charc, + cpu_format_value, + ['read']) + + self.app.add_managed_object(self.srv) + self.app.add_managed_object(self.charc) + self.app.add_managed_object(self.cpu_format) + + self.srv_mng = GATT.GattManager(adapter.list_adapters()[0]) + self.srv_mng.register_application(self.app, {}) + + self.dongle = adapter.Adapter(adapter.list_adapters()[0]) + advert = advertisement.Advertisement(1, 'peripheral') + + advert.service_UUIDs = [CPU_TMP_SRVC] + # eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER) + # advert.service_data = {EDDYSTONE: eddystone_data} + if not self.dongle.powered: + self.dongle.powered = True + ad_manager = advertisement.AdvertisingManager(self.dongle.path) + ad_manager.register_advertisement(advert, {}) + + def add_call_back(self, callback): + self.charc.PropertiesChanged = callback + + def start_bt(self): + # self.light.StartNotify() + tools.start_mainloop() + +if __name__ == '__main__': + print('CPU temperature is {}C'.format(get_cpu_temperature())) + print(sint16(get_cpu_temperature())) + pi_cpu_monitor = ble() + pi_cpu_monitor.start_bt() diff --git a/examples/level100/light_switch.py b/examples/level100/light_switch.py new file mode 100644 index 0000000..0746181 --- /dev/null +++ b/examples/level100/light_switch.py @@ -0,0 +1,83 @@ +from bluezero import GATT +from bluezero import localGATT +from bluezero import advertisement +from bluezero import adapter +from bluezero import tools +from bluezero import constants + +import dbus + +from gpiozero import LED +from gpiozero import Button +from time import sleep +from signal import pause + +SERVICE_UUID = '12341000-1234-1234-1234-123456789abc' +LIGHT_UUID = '12341002-1234-1234-1234-123456789abc' +SWITCH_UUID = '12341004-1234-1234-1234-123456789abc' + + +class board: + def __init__(self, ble): + self.led = LED(22) + self.button = Button(25) + self.ble = ble + + def switch_led(self, *args, **kwargs): + print(self.led.is_lit, args, kwargs) + if self.led.is_lit: + self.led.off() + self.ble.light.Set(constants.GATT_CHRC_IFACE, 'Value', [0x00]) + else: + self.led.on() + self.ble.light.Set(constants.GATT_CHRC_IFACE, 'Value', [0x01]) + + +class ble: + def __init__(self): + self.bus = dbus.SystemBus() + self.app = localGATT.Application() + self.srv = localGATT.Service(1, SERVICE_UUID, True) + self.light = localGATT.Characteristic(1, + LIGHT_UUID, + self.srv, + [0xAA], + True, + ['read', 'notify']) + self.switch = localGATT.Characteristic(2, + SWITCH_UUID, + self.srv, + [0xBB], + True, + ['read', 'write', 'notify']) + + self.light.service = self.srv.path + self.app.add_managed_object(self.srv) + self.app.add_managed_object(self.light) + self.app.add_managed_object(self.switch) + + self.srv_mng = GATT.GattManager(adapter.list_adapters()[0]) + self.srv_mng.register_application(self.app, {}) + + self.dongle = adapter.Adapter(adapter.list_adapters()[0]) + advert = advertisement.Advertisement(1, 'peripheral') + advert.service_UUIDs = [SERVICE_UUID] + if not self.dongle.powered: + self.dongle.powered = True + ad_manager = advertisement.AdvertisingManager(self.dongle.path) + ad_manager.register_advertisement(advert, {}) + + def add_call_back(self, callback): + self.switch.PropertiesChanged = callback + + def start_bt(self): + # self.light.StartNotify() + tools.start_mainloop() + +if __name__ == '__main__': + link = ble() + hat = board(link) + hat.button.when_pressed = hat.switch_led + link.switch.add_call_back(hat.switch_led) + + link.start_bt() diff --git a/run_local_tests.sh b/run_local_tests.sh index 84d6315..9fe2aae 100755 --- a/run_local_tests.sh +++ b/run_local_tests.sh @@ -1,10 +1,25 @@ #!/usr/bin/env bash python -m unittest -v tests.test_url_to_hex +test1=$? python3 -m unittest -v tests.test_url_to_hex +test2=$? python -m unittest -v tests.test_adapter +test3=$? python3 -m unittest -v tests.test_adapter +test4=$? python -m unittest -v tests.test_device +test5=$? python3 -m unittest -v tests.test_device +test6=$? pycodestyle -v bluezero +test7=$? pycodestyle -v examples +test8=$? pycodestyle -v tests +test9=$? + +if [ $((test1 + test2 + test3 + test4 + test5 + test6 + test7 + test8 + test9)) -ne 0 ]; then + echo -e "\n\n### A test has failed!! ###\n" +else + echo -e "\n\nSuccess!!!\n" +fi