-
Notifications
You must be signed in to change notification settings - Fork 112
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Light switch example Send notifications when button is pressed and light changes state * Add write callback * add return values for charc and descs (#82) * Blinkt 100 example (#83) * add return values for charc and descs * added level 100 linkt example * Add eddystone beacon * CPU temperature service * Correct DBus signature for Descriptors * pep8 fixes * Include pycodestyle in pass/fail criteria * Give overall pass/fail when running local tests * Add summary of new examples
- Loading branch information
Showing
7 changed files
with
377 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from bluezero import GATT | ||
from bluezero import localGATT | ||
from bluezero import advertisement | ||
from bluezero import adapter | ||
from bluezero import tools | ||
import dbus | ||
from blinkt import set_pixel, set_all, show | ||
|
||
WEB_BLINKT = 'https://goo.gl/wQOjbe' | ||
TX_POWER = 0x08 | ||
EDDYSTONE = 'FEAA' | ||
SERVICE_UUID = '0000FFF0-0000-1000-8000-00805F9B34FB' | ||
CHAR_UUID = '0000FFF3-0000-1000-8000-00805F9B34FB' | ||
|
||
|
||
class blinkt: | ||
def __init__(self, ble): | ||
self.ble = ble | ||
|
||
def on_ble_write(self, *args, **kwargs): | ||
try: | ||
# bytes=[0x07, 0x02, 0x00, 0x01, 0x00, 0x0FF, 0x00] | ||
bytes = args[1]["Value"] | ||
if len(bytes) > 2: | ||
cmd = (bytes[0] << 8) + (bytes[1] & 0xff) | ||
|
||
if cmd == 0x0702: | ||
if len(bytes) >= 7: | ||
set_pixel(bytes[3] - 1, bytes[4], bytes[5], bytes[6]) | ||
elif cmd == 0x0601: | ||
if len(bytes) >= 5: | ||
set_all(bytes[2], bytes[3], bytes[4]) | ||
show() | ||
except Exception as inst: | ||
print(type(inst)) | ||
print(inst.args) | ||
print(inst) | ||
return 0 | ||
|
||
|
||
class ble: | ||
def __init__(self): | ||
self.bus = dbus.SystemBus() | ||
self.app = localGATT.Application() | ||
self.srv = localGATT.Service(1, SERVICE_UUID, True) | ||
|
||
self.charc = localGATT.Characteristic(1, | ||
CHAR_UUID, | ||
self.srv, | ||
[0xBB], | ||
True, | ||
['write']) | ||
|
||
self.charc.service = self.srv.path | ||
self.app.add_managed_object(self.srv) | ||
self.app.add_managed_object(self.charc) | ||
|
||
self.srv_mng = GATT.GattManager(adapter.list_adapters()[0]) | ||
self.srv_mng.register_application(self.app, {}) | ||
|
||
self.dongle = adapter.Adapter(adapter.list_adapters()[0]) | ||
advert = advertisement.Advertisement(1, 'peripheral') | ||
|
||
advert.service_UUIDs = [SERVICE_UUID] | ||
eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER) | ||
advert.service_data = {EDDYSTONE: eddystone_data} | ||
if not self.dongle.powered: | ||
self.dongle.powered = True | ||
ad_manager = advertisement.AdvertisingManager(self.dongle.path) | ||
ad_manager.register_advertisement(advert, {}) | ||
|
||
def add_call_back(self, callback): | ||
self.charc.PropertiesChanged = callback | ||
|
||
def start_bt(self): | ||
# self.light.StartNotify() | ||
tools.start_mainloop() | ||
|
||
if __name__ == '__main__': | ||
link = ble() | ||
blinkt_ble = blinkt(link) | ||
link.charc.add_call_back(blinkt_ble.on_ble_write) | ||
|
||
link.start_bt() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
# Standard modules | ||
import os | ||
import dbus | ||
from gi.repository import GLib | ||
import random | ||
|
||
# Bluezero modules | ||
from bluezero import tools | ||
from bluezero import constants | ||
from bluezero import adapter | ||
from bluezero import advertisement | ||
from bluezero import localGATT | ||
from bluezero import GATT | ||
|
||
# constants | ||
CPU_TMP_SRVC = '12341000-1234-1234-1234-123456789abc' | ||
CPU_TMP_CHRC = '2A6E' | ||
CPU_FMT_DSCP = '2904' | ||
|
||
|
||
def get_cpu_temperature(): | ||
# return random.randrange(3200, 5310, 10) / 100 | ||
cpu_temp = os.popen('vcgencmd measure_temp').readline() | ||
return float(cpu_temp.replace('temp=', '').replace("'C\n", '')) | ||
|
||
|
||
def sint16(value): | ||
return int(value * 100).to_bytes(2, byteorder='little', signed=True) | ||
|
||
|
||
def cpu_temp_sint16(value): | ||
answer = [] | ||
value_int16 = sint16(value[0]) | ||
for bytes in value_int16: | ||
answer.append(dbus.Byte(bytes)) | ||
|
||
return answer | ||
|
||
|
||
class TemperatureChrc(localGATT.Characteristic): | ||
def __init__(self, service): | ||
localGATT.Characteristic.__init__(self, | ||
1, | ||
CPU_TMP_CHRC, | ||
service, | ||
[get_cpu_temperature()], | ||
False, | ||
['read', 'notify']) | ||
|
||
def temperature_cb(self): | ||
reading = [get_cpu_temperature()] | ||
print('Getting new temperature', | ||
reading, | ||
self.props[constants.GATT_CHRC_IFACE]['Notifying']) | ||
self.props[constants.GATT_CHRC_IFACE]['Value'] = reading | ||
|
||
self.PropertiesChanged(constants.GATT_CHRC_IFACE, | ||
{'Value': dbus.Array(cpu_temp_sint16(reading))}, | ||
[]) | ||
print('Array value: ', cpu_temp_sint16(reading)) | ||
return self.props[constants.GATT_CHRC_IFACE]['Notifying'] | ||
|
||
def _update_temp_value(self): | ||
if not self.props[constants.GATT_CHRC_IFACE]['Notifying']: | ||
return | ||
|
||
print('Starting timer event') | ||
GLib.timeout_add(500, self.temperature_cb) | ||
|
||
def ReadValue(self, options): | ||
return dbus.Array( | ||
cpu_temp_sint16(self.props[constants.GATT_CHRC_IFACE]['Value']) | ||
) | ||
|
||
def StartNotify(self): | ||
if self.props[constants.GATT_CHRC_IFACE]['Notifying']: | ||
print('Already notifying, nothing to do') | ||
return | ||
print('Notifying on') | ||
self.props[constants.GATT_CHRC_IFACE]['Notifying'] = True | ||
self._update_temp_value() | ||
|
||
def StopNotify(self): | ||
if not self.props[constants.GATT_CHRC_IFACE]['Notifying']: | ||
print('Not notifying, nothing to do') | ||
return | ||
|
||
print('Notifying off') | ||
self.props[constants.GATT_CHRC_IFACE]['Notifying'] = False | ||
self._update_temp_value() | ||
|
||
|
||
class ble: | ||
def __init__(self): | ||
self.bus = dbus.SystemBus() | ||
self.app = localGATT.Application() | ||
self.srv = localGATT.Service(1, CPU_TMP_SRVC, True) | ||
|
||
self.charc = TemperatureChrc(self.srv) | ||
|
||
self.charc.service = self.srv.path | ||
|
||
cpu_format_value = dbus.Array([dbus.Byte(0x0E), | ||
dbus.Byte(0xFE), | ||
dbus.Byte(0x2F), | ||
dbus.Byte(0x27), | ||
dbus.Byte(0x01), | ||
dbus.Byte(0x00), | ||
dbus.Byte(0x00)]) | ||
self.cpu_format = localGATT.Descriptor(4, | ||
CPU_FMT_DSCP, | ||
self.charc, | ||
cpu_format_value, | ||
['read']) | ||
|
||
self.app.add_managed_object(self.srv) | ||
self.app.add_managed_object(self.charc) | ||
self.app.add_managed_object(self.cpu_format) | ||
|
||
self.srv_mng = GATT.GattManager(adapter.list_adapters()[0]) | ||
self.srv_mng.register_application(self.app, {}) | ||
|
||
self.dongle = adapter.Adapter(adapter.list_adapters()[0]) | ||
advert = advertisement.Advertisement(1, 'peripheral') | ||
|
||
advert.service_UUIDs = [CPU_TMP_SRVC] | ||
# eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER) | ||
# advert.service_data = {EDDYSTONE: eddystone_data} | ||
if not self.dongle.powered: | ||
self.dongle.powered = True | ||
ad_manager = advertisement.AdvertisingManager(self.dongle.path) | ||
ad_manager.register_advertisement(advert, {}) | ||
|
||
def add_call_back(self, callback): | ||
self.charc.PropertiesChanged = callback | ||
|
||
def start_bt(self): | ||
# self.light.StartNotify() | ||
tools.start_mainloop() | ||
|
||
if __name__ == '__main__': | ||
print('CPU temperature is {}C'.format(get_cpu_temperature())) | ||
print(sint16(get_cpu_temperature())) | ||
pi_cpu_monitor = ble() | ||
pi_cpu_monitor.start_bt() |
Oops, something went wrong.