# Joule API Demonstration Notebook

This notebook shows how to use the Joule Application Programming Interface (API).

<div class="alert alert-block alert-info"> 
    <p>
<b>NOTE</b> Before running this notebook, you must authorize API access for the current user. Run the following command from the terminal to authorize API access:
    </p>
<code>$> sudo -E joule admin authorize</code>
</div>

  


In [None]:
# run this cell first to import the packages
import joule.api
# convenience imports to make code more compact
from joule.api import EventStream, Event, DataStream, Element, Annotation
from joule.errors import EmptyPipeError
# utility libraries
import numpy as np
from matplotlib import pyplot as plt

To use the API you must have access to a Joule node. To list the accessible nodes, run the following command:
    
    $> joule node list


In [None]:
# get_node() returns the default node; pass a name parameter to request a specific one
node = joule.api.get_node()

# all node methods are async so you must use the await keyword
info = await node.info()

print("Node [%s] running joule %s" % (info.name, info.version))

### Create data streams and write data
Create a two-element stream of 5 Hz sine and cosine waveforms.

In [None]:
freq = 5.0
t = np.arange(0,1,0.001) # 1 ms sample period (1 kHz sample rate)
sine = np.sin(freq*2*np.pi*t)
cosine = np.cos(freq*2*np.pi*t)
plt.plot(t, sine, 'r', t, cosine, 'g')
plt.xlabel('Time (sec)')
plt.show()

Create a stream on the Joule Node that can store this data.
<div class="alert alert-block alert-info"> 
<b>NOTE</b> Running the code below more than once will generate an error because the stream already exists.
</div>

In [None]:
stream = DataStream(name="waves", elements=[Element(name="sine"), Element(name="cosine")])
stream = await node.data_stream_create(stream,"/api_demo") # now stream is a registered model and can be used with API calls

Package the data into a 2D numpy array and write it to the data stream. 

Each row is a timestamped set of data: `[ts, sine, cosine]`. 
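A quick way to confirm the row layout before writing is to build a tiny array the same way and check its shape. This sketch uses three illustrative samples; `np.column_stack` is shown as an equivalent alternative to the `vstack(...).T` idiom used in the next cell.

```python
import numpy as np

# three illustrative samples of the 5 Hz waveforms, 1 ms apart
t = np.array([0.0, 0.001, 0.002])
sine = np.sin(5.0 * 2 * np.pi * t)
cosine = np.cos(5.0 * 2 * np.pi * t)

# stack the vectors as rows, then transpose so each row is [ts, sine, cosine]
data = np.vstack((t * 1e6, sine, cosine)).T

# column_stack builds the same Nx3 layout without the transpose
same = np.column_stack((t * 1e6, sine, cosine))

print(data.shape)                  # one row per sample, one column per field
print(np.array_equal(data, same))  # both constructions give identical arrays
```

Either form works; `column_stack` reads closer to the "one column per field" description, while `vstack(...).T` matches the cell below.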
<div class="alert alert-block alert-info"> 
<b>NOTES:</b>
    <ul>
        <li>All Joule timestamps are in UNIX microseconds. </li>
        <li>All timestamps must be unique in a datastream (only run the code below once).</li>
    </ul>
</div>
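Because Joule timestamps are UNIX microseconds, real measurements usually need converting from wall-clock time. The helpers below are a minimal standard-library sketch; the names `to_unix_us`, `from_unix_us` and `timestamps_unique` are hypothetical and not part of the Joule API. Note that the demo data starts at timestamp 0, which is the UNIX epoch (1970-01-01 UTC).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_unix_us(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer UNIX microseconds (hypothetical helper)."""
    # dividing one timedelta by another yields an exact integer, avoiding float rounding
    return (dt - EPOCH) // timedelta(microseconds=1)

def from_unix_us(ts: int) -> datetime:
    """Convert integer UNIX microseconds back to a UTC datetime (hypothetical helper)."""
    return EPOCH + timedelta(microseconds=ts)

def timestamps_unique(timestamps) -> bool:
    """Return True if no timestamp appears more than once (hypothetical helper)."""
    return len(set(timestamps)) == len(timestamps)
```

For example, `to_unix_us(datetime(2024, 1, 1, tzinfo=timezone.utc))` returns `1704067200000000`. Integer timedelta division is used instead of `dt.timestamp() * 1e6` because the float route can introduce rounding errors at microsecond resolution.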

In [None]:
# we need to put the data in an Nx3 numpy array:
#  [ ts sine cosine
#    ts sine cosine
#    ...           ]
#
# There are many ways to do this; the following is rather concise.
# The timestamp vector 't' is in seconds, multiply by 1e6 to convert to microseconds
#
data = np.vstack((t*1e6, sine, cosine)).T

#
# add data to the stream by using an input pipe
#
stream = await node.data_stream_get('/api_demo/waves')
pipe = await node.data_write(stream)
await pipe.write(data) # timestamps must be in microseconds
await pipe.close() # make sure to close the pipe after writing

Refresh the node in Lumen to see the new stream with data.

### Manipulate data streams and data

In [None]:
# get information about a stream
print("Stream Info:\t", await node.data_stream_info(stream))

# get the data intervals (regions of the stream with data)
print("Intervals:\t", await node.data_intervals(stream))

# change the display type of an element to discrete
stream.elements[1].display_type="discrete"
await node.data_stream_update(stream) # refresh the node to see this change

# remove data from a stream 
# ***DANGEROUS: OMITTING START and END will remove ALL DATA***
await node.data_delete(stream,start=0.2*1e6, end=0.4*1e6)
print("--removed data--")
print("Intervals:\t", await node.data_intervals(stream))

# ...many more methods are available, see API docs
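
To reason about what `data_intervals` should report after a `data_delete`, it helps to model intervals as `[start, end)` pairs and subtract the removed range. This is a hypothetical pure-Python model (`subtract_interval` is not a Joule function) that assumes half-open intervals; Joule's exact boundary handling may differ.

```python
def subtract_interval(intervals, start, end):
    """Remove [start, end) from a list of [s, e) intervals (hypothetical model)."""
    result = []
    for s, e in intervals:
        if e <= start or s >= end:
            result.append([s, e])      # no overlap: keep the whole interval
            continue
        if s < start:
            result.append([s, start])  # keep the part before the removed range
        if e > end:
            result.append([end, e])    # keep the part after the removed range
    return result
```

Removing 0.2 s to 0.4 s from a single interval spanning the first second, `subtract_interval([[0, 1_000_000]], 200_000, 400_000)`, leaves two intervals, `[[0, 200000], [400000, 1000000]]`, which is why the stream now shows a gap.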

### Data Annotations

<div class="alert alert-block alert-warning"> 
<b>Important</b> Create a stream annotation in Lumen before running the cell below.
</div>


In [None]:
# retrieve a list of annotations (include start,end parameters to limit query to a time range)
annotations = await node.annotation_get(stream)

if len(annotations) == 0:
    print("ERROR: Create an annotation in Lumen then run this cell")
elif annotations[0].end is None:
    print("ERROR: Annotate a range in Lumen, not an event")
else:
    annotation = annotations[0]

    # read the data associated with the annotation
    pipe = await node.data_read(stream,start=annotation.start, end=annotation.end)
    data = await pipe.read_all() # this automatically closes the pipe

    # plot the data
    plt.plot(data['timestamp']/1e6, data['data'])
    plt.title(annotation.title)
    plt.xlabel('Time (sec)')
    plt.show()

# Annotations can also be created with the API
# (an annotation with a start but no end is an event, not a range)
annotation = Annotation(title='Created by API', start=0.8*1e6)
await node.annotation_create(annotation, stream)



Refresh the annotations in the Plot Tab of Lumen to see the `Created by API` annotation.
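The annotation cell above only inspects `annotations[0]`, so it reports an error if the first annotation is an event even when a range annotation exists later in the list. A small helper could select the first range instead. `FakeAnnotation` is a hypothetical stand-in that mimics only the `title`, `start` and `end` attributes used above.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeAnnotation:
    """Hypothetical stand-in with the title/start/end attributes used above."""
    title: str
    start: int
    end: Optional[int] = None

def first_range_annotation(annotations):
    """Return the first annotation that covers a time range, or None."""
    # event annotations have no end, so skip them
    return next((a for a in annotations if a.end is not None), None)
```

With the real API, `first_range_annotation(await node.annotation_get(stream))` would replace the `annotations[0]` lookup, and a `None` result means no range annotation exists.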

### Explore data streams and read data

In [None]:
# Nodes can be explored through the API
#
root = await node.folder_root()

def print_folder(folder, indent=0):
    for child in folder.children:
        print("  "*indent + child.name)
        print_folder(child, indent+1)
    for stream in folder.data_streams:
        print("  "*indent + "[%s: %s]" % (stream.name, stream.layout))
        
# print the folder directory structure
print_folder(root)
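
`print_folder` walks the folder tree recursively; the same recursion can collect full stream paths such as `/api_demo/waves`, the form accepted by calls like `node.data_stream_info`. The sketch uses hypothetical stand-in classes `FakeFolder` and `FakeStream` that mimic only the `name`, `children` and `data_streams` attributes used above, and it assumes the root folder's own name is not part of a path.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class FakeStream:
    """Hypothetical stand-in for a data stream (only the name is used)."""
    name: str

@dataclass
class FakeFolder:
    """Hypothetical stand-in for a folder with the attributes print_folder uses."""
    name: str
    children: List["FakeFolder"] = field(default_factory=list)
    data_streams: List[FakeStream] = field(default_factory=list)

def stream_paths(folder, prefix=""):
    """Yield the full path of every data stream below folder."""
    for stream in folder.data_streams:
        yield f"{prefix}/{stream.name}"
    for child in folder.children:
        yield from stream_paths(child, f"{prefix}/{child.name}")
```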


---

*Reading Data Option 1:* [`pipe.read_all()`](https://wattsworth.net/joule/pipes.html#joule.models.Pipe.read_all)




In [None]:
# streams can be accessed by API object (as shown in previous cells) or by path
info = await node.data_stream_info("/api_demo/waves")
print("The demo stream has %d rows of data" % info.rows)

pipe = await node.data_read("/api_demo/waves")
data = await pipe.read_all()
print(f"retrieved {len(data)} rows of data")

---

*Reading Data Option 2:* [`pipe.read()`](https://wattsworth.net/joule/pipes.html#joule.models.Pipe.read)

`read_all()` loads every row into memory at once, so it may fail if there is too much data. In general you should treat a pipe as an infinite data source and read it in chunks. This requires more code, but it scales to very large datasets and is the only way to work with realtime data sources.

In [None]:
# read_all() may fail on large datasets; reading chunk by chunk scales to any size
#
print("-- reading data --")
pipe = await node.data_read("/api_demo/waves")
while not await pipe.is_empty():
    data = await pipe.read()
    plt.plot(data['timestamp']/1e6,data['data'])
    pipe.consume(len(data))
    print("%d rows of data" % len(data))
    # for large data sources a chunk may or may not end at an interval boundary,
    # so check explicitly whether this chunk is the end of an interval:
    if pipe.end_of_interval:
        print(" data boundary")

plt.xlabel('Time (sec)')
plt.title('Data showing interval break')
plt.show()
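
Since each chunk is released after `pipe.consume()`, any statistic over the whole stream has to be accumulated across iterations. A minimal sketch of a running mean, with plain lists standing in for a `data['data']` column and a hypothetical generator `fake_chunks` standing in for successive `pipe.read()` calls:

```python
def chunked_mean(chunks):
    """Mean of all values across chunks without keeping earlier chunks in memory."""
    total = 0.0
    count = 0
    for chunk in chunks:
        total += sum(chunk)   # fold this chunk into the running totals
        count += len(chunk)
    if count == 0:
        raise ValueError("no data")
    return total / count

def fake_chunks():
    # hypothetical stand-in for successive pipe.read() results
    yield [1.0, 2.0, 3.0]
    yield [4.0, 5.0]
```

Only `total` and `count` survive between chunks, so memory use stays constant no matter how long the stream is.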


## Event Streams

Event streams store JSON data with start and end timestamps. Specify the structure of the JSON with the `event_fields` parameter to enable event filtering in Lumen. The `event_fields` parameter is a dictionary mapping field names to their datatypes: `{'field_name': datatype, ...}`. The valid datatypes are:
* `numeric`: integer or floating point value
* `string`: text value
* `category`: one of a fixed set of strings, written as `category:` followed by a JSON list of the allowed values (see the `color` field below)
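Joule does not type check event content against `event_fields` (see the note in the cell below), so a client-side check can catch mistakes before writing. This is a hypothetical sketch, not a Joule function; it parses the `category:` suffix as JSON and assumes fields missing from the content are acceptable.

```python
import json

def validate_content(content, event_fields):
    """Return a list of problems found when checking content against event_fields."""
    problems = []
    for name, spec in event_fields.items():
        if name not in content:
            continue  # assumption: missing fields are allowed
        value = content[name]
        if spec == "numeric":
            # bool is a subclass of int in Python, so exclude it explicitly
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                problems.append(f"{name}: expected a number, got {value!r}")
        elif spec == "string":
            if not isinstance(value, str):
                problems.append(f"{name}: expected a string, got {value!r}")
        elif spec.startswith("category:"):
            allowed = json.loads(spec[len("category:"):])
            if value not in allowed:
                problems.append(f"{name}: {value!r} is not one of {allowed}")
    return problems
```

Applied to the `event_fields` used below, the second event's `'green'` color would be flagged because the category only allows `red` and `blue`.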

In [None]:
# retrieve or create an event stream on the Joule node
stream = await node.event_stream_get("/api_demo/events", create=True, 
                                     event_fields={'height':'numeric',
                                                   'description':'string',
                                                   'color':'category:["red","blue"]'})

Create events and add them to the event stream.

In [None]:
# events have a start, end, and content which can contain any JSON serializable data
# NOTE: The JSON data is not type checked against the event_fields parameter specified above
event1 = Event(start_time=0, end_time=0.1e6, 
               content={'height':1,'color':'red','description':'first one'})
event2 = Event(start_time=.3e6, end_time=0.5e6, 
               content={'height':4,'color':'green','description':'second one'})
event3 = Event(start_time=.3e6, end_time=1e6, 
               content={'height':2,'color':'blue','description':'third one'})

events = await node.event_stream_write(stream, [event1, event2, event3])
# each event now contains an ID 
print(events[0])

### Explore event streams and read data

Retrieve information about an event stream, such as the time extents and total number of events.

In [None]:
event_stream_info = await node.event_stream_info(stream)
print(event_stream_info)

---

*Reading Events Option 1:* [`node.event_stream_read_list(stream)`](https://wattsworth.net/joule/events.html#TODO)

In [None]:
events = await node.event_stream_read_list(stream)
for event in events:
    print(f"Event {event.id}: color={event.content['color']}")

---

*Reading Events Option 2:* [`node.event_stream_read(stream,...)`](https://wattsworth.net/joule/events.html#TODO)

For streams with too many events to handle comfortably in a single list (e.g., more than 1,000), use this function to iterate over the events asynchronously.
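The advantage of `async for` is that only the current event needs to be held in memory. The pattern can be sketched with a hypothetical async generator, `fake_event_stream`, standing in for `node.event_stream_read`; the events here are plain dicts rather than Joule `Event` objects.

```python
import asyncio

async def fake_event_stream(n):
    """Hypothetical stand-in that yields n events one at a time."""
    for i in range(n):
        await asyncio.sleep(0)  # hand control back to the event loop, as a network read would
        yield {"id": i, "color": "red" if i % 2 == 0 else "blue"}

async def count_red(events):
    """Count red events without building a list of all events."""
    count = 0
    async for event in events:
        if event["color"] == "red":
            count += 1
    return count
```

In this notebook, where an event loop is already running, call it with `await count_red(fake_event_stream(5000))`; in a plain script use `asyncio.run(count_red(fake_event_stream(5000)))`.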

In [None]:
import json
# use filters to query specific event types
async for event in node.event_stream_read(stream,json_filter=json.dumps([[["color","is","red"]]])):
    print(f"Event {event.id}: color={event.content['color']}")
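
The `json_filter` strings in this notebook wrap each `[field, operator, value]` condition in two levels of lists. A small hypothetical helper can build them from tuples so the nesting is not typed by hand. It places all conditions in a single inner group, reproducing the nesting used here; how Joule combines multiple conditions is not covered in this notebook, so check the Joule documentation before relying on multi-condition filters.

```python
import json

def json_filter(*conditions):
    """Build a json_filter string from (field, operator, value) tuples (hypothetical helper)."""
    # every condition goes into one inner group, matching [[["color","is","red"]]]
    return json.dumps([[list(c) for c in conditions]])
```

For example, `json_filter(("color", "is", "red"))` produces the same string as the `json.dumps(...)` call in the cell above.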

### Manipulate event streams and events

In [None]:
stream = await node.event_stream_get("/api_demo/events")
stream.description = "updated description"
await node.event_stream_update(stream)
print(f"Changed description to '{stream.description}'")

To modify an event, write it again. The write operation performs an upsert, so any event with a valid `id` is updated rather than inserted.
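The upsert behavior can be modeled in memory: an event whose `id` is already stored replaces the stored copy, and an event without one is inserted and assigned a new `id`. `FakeEventStore` is a hypothetical model using plain dicts, not the Joule implementation; treating an unknown `id` as an insert is an assumption.

```python
import itertools

class FakeEventStore:
    """Hypothetical in-memory model of upsert-by-id semantics."""

    def __init__(self):
        self._events = {}
        self._next_id = itertools.count(1)

    def write(self, events):
        for event in events:
            if event.get("id") in self._events:
                self._events[event["id"]] = event  # known id: update in place
            else:
                event["id"] = next(self._next_id)  # no (or unknown) id: insert with a new id
                self._events[event["id"]] = event
        return events

    def get(self, event_id):
        return self._events[event_id]

    def count(self):
        return len(self._events)
```

Writing the same events twice leaves `count()` unchanged, which is what the `event_stream_info` printout in the next cell is meant to show.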

In [None]:
# retrieve all events with height > 3
tall_events = await node.event_stream_read_list(stream,json_filter=json.dumps([[["height","gt","3"]]]))
# change the color to yellow
for event in tall_events:
    event.content['color']='yellow'

# because these events already have IDs, this write updates the existing events
updated_events = await node.event_stream_write(stream, tall_events)
print(await node.event_stream_info(stream)) # total count remains the same

In [None]:
# remove operations can have time bounds and/or JSON filters
await node.event_stream_remove(stream,start=0.2e6) # remove all events after 0.2 seconds
print(await node.event_stream_count(stream)) # only the event from 0-0.1s is left
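
The expected result can be checked by hand. This sketch assumes, hypothetically, that `start=0.2e6` removes events whose `start_time` is at or after 0.2 s. None of this notebook's events straddle 0.2 s, so any reasonable overlap rule leaves the same single event; `remove_after` is a hypothetical helper, not a Joule function.

```python
def remove_after(events, start):
    """Keep events that start before `start` (assumed removal rule)."""
    return [e for e in events if e["start_time"] < start]

# start/end times of the three events written earlier, in microseconds
events = [
    {"start_time": 0, "end_time": 0.1e6},
    {"start_time": 0.3e6, "end_time": 0.5e6},
    {"start_time": 0.3e6, "end_time": 1e6},
]
remaining = remove_after(events, 0.2e6)
print(len(remaining))  # only the 0-0.1 s event is left
```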

### Reset the node to its original state
**Run this cell to undo all changes created by this notebook**

In [None]:
await node.folder_delete("/api_demo")