## Create Events from Plug Data

In [6]:
import json
import joule
from joule.errors import ApiError

In [2]:
# --- Notebook configuration -------------------------------------------------

# Joule node to connect to.
NODE = "vagrant"

# Data stream that is read, and event stream that receives the generated events
# (the event stream is created by a later cell if the lookup fails).
PLUG_STREAM = "/plugs/chest_freezer"
EVENT_STREAM = "/plugs/chest freezer2"

# Value stored in each event's 'name' field; also used to filter events for removal.
# NOTE(review): "Freeezer" looks like a typo. It is left unchanged here because
# the removal filter matches events by this exact string.
LOAD_NAME = "Chest Freeezer"

# Power threshold: a run starts above this value and ends below it.
MIN_WATTS = 20

# When True, previously written events matching the removal filter are deleted first.
REMOVE_EXISTING = True

In [3]:
node = joule.api.get_node(NODE)

plug_stream = await node.data_stream_get(PLUG_STREAM)
try:
    event_stream = await node.event_stream_get(EVENT_STREAM)
except ApiError:
    folder = "/".join(EVENT_STREAM.split('/')[:-1])
    name = EVENT_STREAM.split('/')[-1]
    event_stream = joule.api.EventStream(name=name, event_fields = {
        "name": "string",
        "average_power": "numeric",
        "kwh": "numeric",
        "peak_power": "numeric",
        "duration": "numeric"
    })
    event_stream = await node.event_stream_create(event_stream, folder)
await node.close()

In [10]:
node = joule.api.get_node()

if REMOVE_EXISTING:
    json_filter = json.dumps([[['name','is',LOAD_NAME],['peak_power','gt',100.5]]])
    await node.event_stream_remove(event_stream, json_filter=json_filter)
await node.close()

In [9]:
node = joule.api.get_node(NODE)
pipe = await node.data_read(plug_stream)
peak_power = None
start_row = None
power_acc = 0
run_ticks = 0
while not pipe.is_empty():
    data = await pipe.read(flatten=True)
    pipe.consume(len(data))
    for row in data:
        if row[3]>MIN_WATTS and start_row is None:
            start_row = row
            peak_power = row[3]
            power_acc = row[3]
            run_ticks = 1
        if start_row is not None:
            peak_power = max((peak_power, row[3]))
            power_acc += row[3]
            run_ticks += 1
        if start_row is not None and row[3]<MIN_WATTS:
            event = joule.api.Event(start_row[0],row[0])
            event.content = {
                'average_power': power_acc/run_ticks,
                'kwh': row[7]-start_row[7],
                'name': LOAD_NAME,
                'peak_power': peak_power,
                'duration': (row[0]-start_row[0])/1e6
            }
            await node.event_stream_write(event_stream, [event])
            print("added event")
            start_row = None
            peak_power = None
            power_acc = 0
            run_ticks = 0
    print(f"read {len(data)} rows")
await node.close()

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f45174b85b0>


read 445 rows
added event
added event
added event
added event
added event
added event
read 10751 rows
added event
added event
added event
added event
added event
read 7794 rows
