## Tapis PEARC20 Demo

In this notebook, we use Tapis to store and analyze streaming data generated from code simulating a sensor. We introduce a number of Tapis services and concepts along the way.

![Alt text](images/tapis_demo_overview.png "Tapis Demo Overview")

### Tapis Python SDK, Tenants and Authentication

In this notebook, we will use the official Tapis Python SDK for all of our interactions with the services. The Python SDK provides Python-native methods and objects for making HTTP requests and parsing HTTP responses to and from the Tapis API. 

In order to do just about anything with Tapis, we will need to authenticate. Tapis makes heavy use of the notion of "tenants" in order to provide isolation for different projects. By setting the base_url variable, you indicate to the Tapis SDK which tenant you wish to interact with.

For the demo, we will be using the "dev" tenant which has a base URL of "https://dev.tapis.io". This tenant is an internal tenant the Tapis core development team can use to try out services. The demo will make use of two test accounts in the dev tenant.

The "TACC tenant", with base URL "https://tacc.tapis.io", allows individuals to authenticate using any valid TACC account. For other tenants, the authentication rules could be different. 

Authentication in the "dev" and "TACC" tenants uses OAuth2 (again, this could be different in other tenants), but the Tapis Python SDK hides some of the complexity inherent in OAuth2 by providing convenience functions for common use cases. For example, we can generate an access token from just a username and password via the convenience function `get_tokens()`. We do this below:

In [None]:
import datetime
import getpass
import os

permitted_username = getpass.getpass(prompt='Permitted Username: ', stream=None)
permitted_user_password = getpass.getpass(prompt='Permitted Password: ', stream=None)
non_permitted_username = getpass.getpass(prompt='Non Permitted Username: ', stream=None)
non_permitted_password = getpass.getpass(prompt='Non Permitted Password: ', stream=None)
base_url = 'https://dev.tapis.io'
tenant_id='dev'

from tapipy.tapis import Tapis

# Create the Python Tapis client for the permitted user
permitted_client = Tapis(base_url=base_url, username=permitted_username, password=permitted_user_password,
                         download_latest_specs=True, spec_dir='/home/tapis', resource_set='dev')
# *** Tapis v3: Call to Tokens API
permitted_client.get_tokens()

# Create the Python Tapis client for the non-permitted user
nonpermitted_client = Tapis(base_url=base_url, username=non_permitted_username, password=non_permitted_password,
                            download_latest_specs=True, spec_dir='/home/tapis', resource_set='dev')
# *** Tapis v3: Call to Tokens API
nonpermitted_client.get_tokens()



In Tapis, access tokens (and refresh tokens) are simply JSON Web Tokens (JWTs). The access_token Python object created and managed by the Python SDK has attributes on it that include the "raw" JWT string as well as claims associated with the JWT. Services use the claims to determine what actions a user is authorized to take. In particular, the "sub" (subject) claim uniquely identifies a user inside Tapis. 

In [None]:
permitted_client.access_token
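
To make the idea of "claims" concrete, here is a minimal offline sketch of how the payload of any JWT can be read. It builds a fake, unsigned token rather than using a real Tapis token, and the `sub` value `demo_user@dev` is a hypothetical placeholder. The helper names are ours, not part of the Tapis SDK, and the sketch deliberately skips signature verification, which real services must perform.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def decode_jwt_claims(raw_jwt: str) -> dict:
    """Return the claims (payload) of a JWT WITHOUT verifying its signature."""
    _header_b64, payload_b64, _signature = raw_jwt.split(".")
    # Restore the padding that base64url encoding strips from JWT segments.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Build a fake, unsigned token so the example runs offline (hypothetical claims).
fake_header = b64url_encode(json.dumps({"alg": "none", "typ": "JWT"}).encode())
fake_payload = b64url_encode(json.dumps({"sub": "demo_user@dev"}).encode())
fake_jwt = f"{fake_header}.{fake_payload}.signature-placeholder"

claims = decode_jwt_claims(fake_jwt)
print(claims["sub"])  # demo_user@dev
```

In the notebook itself the SDK already exposes the claims as attributes, so a helper like this is only useful for inspecting a raw token string by hand.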

Note also the ttl (time-to-live) claim; Tapis tokens have a finite lifetime, typically a few hours, configurable by tenant. After the token expires, we will need to get a new token in order to continue interacting with Tapis. The Python SDK has convenience methods for managing tokens and even automatically refreshing a token.
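
As a rough illustration of what "refreshing before expiry" means, the sketch below decides whether a token is close enough to its expiry time to warrant a refresh. The function name, the 5-minute margin, and the 4-hour lifetime are all assumptions made for the example; the Python SDK's own refresh logic may work differently.

```python
import datetime as dt


def needs_refresh(expires_at: dt.datetime, now: dt.datetime,
                  margin: dt.timedelta = dt.timedelta(minutes=5)) -> bool:
    """Return True when the token expires within `margin` of `now`."""
    return expires_at - now <= margin


issued = dt.datetime(2020, 7, 27, 12, 0, tzinfo=dt.timezone.utc)
expires = issued + dt.timedelta(hours=4)  # hypothetical 4-hour lifetime

print(needs_refresh(expires, now=issued))                              # False
print(needs_refresh(expires, now=expires - dt.timedelta(minutes=2)))   # True
```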

If we look at the token for the nonpermitted client, we see it represents a different user.

In [None]:
nonpermitted_client.access_token

In the following code block, we set up some basic variables that will be used to create and interact with the systems, files, actors and streams objects throughout the rest of the notebook.

In [None]:
# Setup Variables that are used in the rest of the notebook
# ensure project_id is unique each time we execute the notebook to ensure no collisions 
project_id ='wq_demo_tapis_proj1'+ str(datetime.datetime.today().isoformat())
site_id = 'wq_demo_site'
instrument_id = 'Ohio_River_Robert_C_Byrd_Locks'+  str(datetime.datetime.today().isoformat()).replace(':','_').replace('.','_')
channel_id = 'demo_wq_channel'+  str(datetime.datetime.today().isoformat()).replace(':','_').replace('.','_')
template_id = 'demo_channel_template'
actor_id = "JEbJj6EZPWNo3"
storage_id = "dev.tapisv3.storage.system"
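
The cell above makes identifiers unique by appending an ISO timestamp, and for the instrument and channel it also replaces `:` and `.` with `_`. A small helper can capture that pattern in one place; this is only a sketch, and the name `timestamped_id` is ours. The time is passed in explicitly so the result is reproducible.

```python
import datetime as dt


def timestamped_id(prefix: str, now: dt.datetime) -> str:
    """Append an ISO 8601 timestamp to `prefix`, replacing ':' and '.' with '_'."""
    stamp = now.isoformat().replace(":", "_").replace(".", "_")
    return prefix + stamp


fixed_now = dt.datetime(2020, 7, 27, 14, 30, 15, 123456)
print(timestamped_id("demo_wq_channel", fixed_now))
# demo_wq_channel2020-07-27T14_30_15_123456
```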

## Systems API

In [None]:
# Look at the system description for our pre-registered S3 bucket:

# *** Tapis v3: Call to Systems API
permitted_client.systems.getSystem(systemId=storage_id)

In [None]:
# Code used to actually create the storage system -- we will not run here.

# the description of an S3 bucket
s3_bucket = {
  "name":storage_id,
  "description":"Joe's Bucket",
  "host":"https://tapis-files-test.s3.us-east-1.amazonaws.com/",
  "systemType":"OBJECT_STORE",
  "defaultAccessMethod":"ACCESS_KEY",
  "effectiveUserId":"testuser2",
  "bucketName":"tapis-files-bucket",
  "rootDir":"/",
  "jobCanExec": False,
  "transferMethods":["S3"],
  "accessCredential":
  {
    "accessKey":"***",
    "accessSecret":"***"
  }
}

# create the system in Tapis
# Demo Step (0)(a)
# *** Tapis v3: Call to Systems API
# permitted_client.systems.createSystem(**s3_bucket)


## Files API

In [None]:
# List the files on the storage system
# *** Tapis v3: Call to Files API
permitted_client.files.listFiles(systemId=storage_id, path="/")

## Streams API

![Alt text](images/streams-api.png "a title")

### Project and Metadata Setup
Projects sit at the top level of the hierarchy of Streams resources. A user registers a project by providing metadata such as the principal investigator, project URL, funding resource, etc. A list of authorized users can be added to various project roles to control access to the project resources. When a project is first registered, a collection is created in the back-end MongoDB. User permissions to access this collection are then set up in the security kernel. Every request to access the project resource or the documents within it (i.e., sites, instruments, variables) goes through a security kernel check, and only requests from authorized users are processed.

In [None]:
## Create Project
# Demo Step (0)(a)
# *** Tapis v3: Call to Streams API
result, debug = permitted_client.streams.create_project(project_name=project_id,
                                                        description='project for early adopters demo',
                                                        owner='testuser2', 
                                                        pi='ajamthe', 
                                                        funding_resource='tapis', 
                                                        project_url='test.tacc.utexas.edu',
                                                        active=True,
                                                        _tapis_debug=True)
print(result)

![Alt text](images/stream-mongo.png "a title")

#### Create Site
A site is a geographical location that may hold one or more instruments. Sites are next in the Streams hierarchy, and they inherit their permissions from the project. Project owners can create sites by providing geographical information such as the latitude, longitude and elevation of the site, or GeoJSON-encoded spatial information. This spatial information is useful when searching sites or data based on location. In the back-end database, a site is represented as a JSON document within the project collection.


In [None]:
## Create Site
# Demo Step (0)(a)
# *** Tapis v3: Call to Streams API
result, debug = permitted_client.streams.create_site(project_id=project_id,
                                                     request_body=[{
                                                     "site_name":site_id, 
                                                     "site_id":site_id,
                                                     "latitude":50, 
                                                     "longitude":10, 
                                                     "elevation":2,
                                                     "description":'test_site'
                                                    }], _tapis_debug=True)
print(result)
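
The site description mentions GeoJSON-encoded spatial information as an alternative to separate latitude, longitude and elevation fields. As a sketch of what that encoding looks like, the helper below turns the values used in the cell above into a GeoJSON Point. Note that GeoJSON (RFC 7946) orders coordinates as longitude, latitude, then elevation. The function name is ours, and how the Streams API expects GeoJSON to be passed is not shown in this notebook, so this only illustrates the geometry object itself.

```python
import json


def site_to_geojson_point(latitude: float, longitude: float, elevation: float) -> dict:
    """Build a GeoJSON Point; RFC 7946 orders positions as [lon, lat, elevation]."""
    return {"type": "Point", "coordinates": [longitude, latitude, elevation]}


point = site_to_geojson_point(latitude=50, longitude=10, elevation=2)
print(json.dumps(point))
# {"type": "Point", "coordinates": [10, 50, 2]}
```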

#### Create Instrument
Instruments are physical entities that may have one or more embedded sensors to sense various parameters such as temperature, relative humidity, specific conductivity, etc. These sensors, referred to as variables in the Streams API, generate measurements, which are stored in InfluxDB along with an ISO 8601 timestamp. Instruments are associated with specific sites and projects. Information about the instruments, such as site and project ids, name and description of the instrument, etc., is stored in the MongoDB sites JSON document.

In [None]:
## Create Instruments
# Demo Step (0)(a)
# *** Tapis v3: Call to Streams API
result, debug = permitted_client.streams.create_instrument(project_id=project_id,
                                                           site_id=site_id,
                                                           request_body=[{
                                                           "inst_name":instrument_id,
                                                           "inst_description":"demo instrument",
                                                           "inst_id":instrument_id
                                                           }], _tapis_debug=True)
print(result)

#### Create Variables
Variables are associated with specific instruments. When a variable is created, the user provides information such as the name of the variable, the property measured, the unit of measurement, etc. For example, a variable for a temperature sensor can store measurements in degrees Celsius or Fahrenheit.

In [None]:
#Create variables in bulk
result, debug = permitted_client.streams.create_variable(project_id=project_id,
                                                         site_id=site_id,
                                                         inst_id=instrument_id,
                                                         request_body=[
                                                         {
                                                         "topic_category_id" :"2",
                                                         "var_name":"temperature", 
                                                         "shortname":"temp","var_id":"temp"
                                                         },
                                                         {
                                                          "topic_category_id" :"2",
                                                         "var_name":"ph_level", 
                                                         "shortname":"ph","var_id":"ph"
                                                         },{
                                                          "topic_category_id" :"2",
                                                         "var_name":"battery", 
                                                         "shortname":"batv","var_id":"batv"
                                                         },{
                                                         "topic_category_id" :"2",
                                                         "var_name":"turbidity", 
                                                         "shortname":"turb","var_id":"turb"
                                                         },{
                                                         "topic_category_id" :"2",
                                                         "var_name":"specific_conductivity", 
                                                         "shortname":"spc","var_id":"spc"
                                                         }
                                                         ],_tapis_debug=True)
print(result)
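
The five variable definitions in the cell above all share the same shape, so the request body can also be generated from a compact mapping. The sketch below assumes, as in that cell, that `shortname` and `var_id` are identical and that every variable uses `topic_category_id` "2"; the helper name `build_variable_specs` is ours.

```python
def build_variable_specs(variables: dict, topic_category_id: str = "2") -> list:
    """Expand {var_id: var_name} into the list-of-dicts shape used above."""
    return [
        {
            "topic_category_id": topic_category_id,
            "var_name": var_name,
            "shortname": var_id,
            "var_id": var_id,
        }
        for var_id, var_name in variables.items()
    ]


specs = build_variable_specs({
    "temp": "temperature",
    "ph": "ph_level",
    "batv": "battery",
    "turb": "turbidity",
    "spc": "specific_conductivity",
})
print(len(specs))           # 5
print(specs[0]["var_id"])   # temp
```

The resulting list could then be passed as `request_body` in place of the hand-written literal.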

### Stream Permissions
Project roles and permissions are stored in the Tapis v3 Security Kernel.

In [None]:
#Project Roles
# *** Tapis v3: Call to Security Kernel API
permitted_client.sk.getUserRoles(user=permitted_username, tenant=tenant_id)

In [None]:
# *** Tapis v3: Call to Security Kernel API
nonpermitted_client.sk.getUserRoles(user=non_permitted_username, tenant=tenant_id)


In [None]:
# *** Tapis v3: Call to Streams API
nonpermitted_client.streams.list_projects()

In [None]:
## Site Role  - non-permissioned user
# *** Tapis v3: Call to Streams API
result = nonpermitted_client.streams.get_site(project_id=project_id, site_id=site_id)
print(result)

In [None]:
## Site Role - permitted user
# *** Tapis v3: Call to Streams API
result = permitted_client.streams.get_site(project_id=project_id, site_id=site_id)
print(result)
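
The two `get_site` calls above contrast a user without a role on the project with one who has it. If we wanted to turn that contrast into a simple yes/no check, a wrapper like the following could be used. This is a sketch under assumptions: it presumes that a denied request surfaces as a raised exception, and it catches `Exception` broadly because the SDK's specific exception types are not assumed here. The stub clients exist only so the example runs offline; they are not Tapis clients.

```python
from types import SimpleNamespace


def can_read_site(client, project_id: str, site_id: str) -> bool:
    """Return True if client.streams.get_site succeeds, False if it raises."""
    try:
        client.streams.get_site(project_id=project_id, site_id=site_id)
    except Exception:  # broad on purpose: SDK exception types are not assumed
        return False
    return True


# Stand-in behaviours for the offline stubs (hypothetical).
def _allow(project_id, site_id):
    return {"site_id": site_id}


def _deny(project_id, site_id):
    raise PermissionError("hypothetical: user lacks a role on this project")


stub_permitted = SimpleNamespace(streams=SimpleNamespace(get_site=_allow))
stub_nonpermitted = SimpleNamespace(streams=SimpleNamespace(get_site=_deny))

print(can_read_site(stub_permitted, "proj", "site"))      # True
print(can_read_site(stub_nonpermitted, "proj", "site"))   # False
```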

### Write Measurements
Measurements are the actual values reported for the variables, and they are stored in the time series database InfluxDB. Project owners or users can download these measurements by providing a time window of measurement creation, and can retrieve the data in CSV or JSON format. This data can be processed in real time with the help of the Channels API.

In [None]:
#Write Measurements - this is our sensor simulator
import datetime  # module import; `from datetime import datetime` would shadow the module imported in the first cell
import random
from random import randint

variables = []
for _ in range(10):
    datetime_now = datetime.datetime.now().isoformat()
    variables.append({"temp": randint(85, 89),
                      "spc": randint(240, 300),
                      "turb": randint(10, 19),
                      "ph": randint(1, 10),
                      "batv": round(random.uniform(10, 13), 2),
                      "datetime": datetime_now
                      })
# *** Tapis v3: Call to Streams API
result = permitted_client.streams.create_measurement(inst_id=instrument_id, vars=variables)
print(result)
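
The simulator above stamps every reading with the current time, so the ten points land within a few microseconds of each other. For a plot that looks more like a real sensor feed, the readings can be spaced at a fixed interval. The sketch below generates the same fields with the same value ranges; the function name, the seed, and the 10-second interval are our own choices for illustration.

```python
import datetime as dt
import random


def simulate_readings(count: int, start: dt.datetime, step_seconds: int, seed: int = 0) -> list:
    """Generate `count` readings spaced `step_seconds` apart, starting at `start`."""
    rng = random.Random(seed)  # seeded so repeated runs give the same values
    readings = []
    for i in range(count):
        timestamp = start + dt.timedelta(seconds=i * step_seconds)
        readings.append({
            "temp": rng.randint(85, 89),
            "spc": rng.randint(240, 300),
            "turb": rng.randint(10, 19),
            "ph": rng.randint(1, 10),
            "batv": round(rng.uniform(10, 13), 2),
            "datetime": timestamp.isoformat(),
        })
    return readings


readings = simulate_readings(count=3, start=dt.datetime(2021, 6, 1, 12, 0, 0), step_seconds=10)
print([r["datetime"] for r in readings])
# ['2021-06-01T12:00:00', '2021-06-01T12:00:10', '2021-06-01T12:00:20']
```

The returned list has the same structure as `variables` above, so it could be passed as `vars` to `create_measurement`.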

### Download Measurements
Download the measurements we just created from our instrument.

In [None]:
# Download measurements as CSV
# *** Tapis v3: Call to Streams API
result = permitted_client.streams.list_measurements(inst_id=instrument_id,
                                                    project_id=project_id, 
                                                    site_id=site_id,
                                                    start_date='2021-01-01T00:00:00Z',
                                                    end_date='2025-12-30T22:19:25Z',
                                                    format='csv')
result

In [None]:
#Read Measurements to Data Frame
import pandas as pd
from io import StringIO
csv_buffer = StringIO(str(result, 'utf-8'))  # named csv_buffer to avoid shadowing the built-in input()
df = pd.read_csv(csv_buffer)
df['datetime']=pd.to_datetime(df['time'])
df.set_index('datetime',inplace=True)
df.pop('time')
df
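
When pandas is not available, the same CSV bytes can be parsed with the standard library alone. The sketch below uses a small hypothetical sample; only the `time` column name is taken from the cell above, and the other columns and values are made up for illustration.

```python
import csv
import io

# Hypothetical sample; only the 'time' column name comes from the notebook.
sample_bytes = (
    b"time,temp,ph\n"
    b"2021-06-01T12:00:00Z,86,7\n"
    b"2021-06-01T12:00:10Z,88,6\n"
)


def parse_measurements(raw: bytes) -> list:
    """Decode CSV bytes and return one dict per row, keyed by column name."""
    reader = csv.DictReader(io.StringIO(raw.decode("utf-8")))
    return list(reader)


rows = parse_measurements(sample_bytes)
print(rows[0]["time"], rows[0]["temp"])
# 2021-06-01T12:00:00Z 86
```

Unlike `pd.read_csv`, `csv.DictReader` leaves every value as a string, so numeric columns need an explicit conversion before plotting.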

In [None]:
# Plot Measurements in the DataFrame
import matplotlib.pyplot as plt
import matplotlib.dates as md
%matplotlib inline
xfmt = md.DateFormatter('%H:%M:%S')
df.plot(lw=1, colormap='jet', marker='.', 
        markersize=12, title='Timeseries Stream Output', rot=90).xaxis.set_major_formatter(xfmt)
plt.tight_layout()
plt.legend(loc='best')
plt.savefig('test2.png')

### Create Actor

We create an Abaco actor to automatically execute code whenever a sensor registers a temperature reading beyond a predefined temperature threshold, in this case, 90 degrees. The actor is a standalone function that is written in Python and packaged as a Docker container.

The code for the actor is available from within the actor directory. When executed, the actor code retrieves all stream data points for the given instrument. It then makes a plot of the data points and uploads the plot to a Tapis S3 bucket.


In [None]:
# Review actor code

### Create Channel
Channels are created using the variable resources. When the data for a specific variable meets a condition defined in the channel, an alert or notification is raised. The Channels API leverages Kapacitor to create tasks that process real-time streaming data from InfluxDB.

##### Channels require a "template"; we will use an existing one, 'demo_channel_template', that accepts a single condition

In [None]:
##Create Channel
# Demo Step (0)(b)
# *** Tapis v3: Call to Streams API
result =  permitted_client.streams.create_channels(channel_id=channel_id, channel_name='demo.wq.channel', template_id=template_id,
                                              triggers_with_actions=[{"inst_ids":[instrument_id],
                                                                      "condition":{"key":instrument_id+".temp","operator":">", "val":90}, 
                                                                      "action":{"method":"ACTOR","actor_id" :actor_id,
                                                                                "message":"Instrument: Ohio_River_Robert_C_Byrd_Locks temp exceeded threshold", 
                                                                                "abaco_base_url":"https://api.tacc.utexas.edu"}}]
                                              ,_tapis_debug=True)
print(result)
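
To illustrate what the single condition in the channel above expresses, here is a toy evaluator for a condition of the form `{"key": "<inst_id>.<var_id>", "operator": ..., "val": ...}`. This is only a model of the semantics, not how Kapacitor or the Channels API evaluates tasks. Only the `>` operator appears in this notebook; the other operators in the table, the function name, and the `demo_instrument` id are assumptions added for the example.

```python
import operator

# Toy model of a single-condition trigger; NOT Kapacitor's implementation.
# Only ">" is used in this notebook; the remaining operators are assumed.
OPERATORS = {">": operator.gt, "<": operator.lt, ">=": operator.ge,
             "<=": operator.le, "==": operator.eq}


def condition_met(condition: dict, inst_id: str, measurement: dict) -> bool:
    """Check a {"key": "<inst_id>.<var_id>", "operator", "val"} condition."""
    key_inst, var_id = condition["key"].rsplit(".", 1)
    if key_inst != inst_id or var_id not in measurement:
        return False
    compare = OPERATORS[condition["operator"]]
    return compare(measurement[var_id], condition["val"])


condition = {"key": "demo_instrument.temp", "operator": ">", "val": 90}
print(condition_met(condition, "demo_instrument", {"temp": 150}))  # True
print(condition_met(condition, "demo_instrument", {"temp": 88}))   # False
```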

## Alerts
As events are generated, an "Alert" is created and notifications are sent to the data processing endpoints via HTTP POST, with details of the data that raised the alert.

![Alt text](images/alert-abaco.png "a title")

In [None]:
# list empty Alerts
# *** Tapis v3: Call to Streams API
permitted_client.streams.list_alerts(channel_id=channel_id)

In [None]:
# Trigger Alert Measurement
import datetime  # module import, so earlier cells that call datetime.datetime keep working
import random
from random import randint

datetime_now = datetime.datetime.now().isoformat()
# Demo Step (2)
# *** Tapis v3: Call to Streams API
result = permitted_client.streams.create_measurement(inst_id=instrument_id,
                                      vars=[{"temp": 150,  # above the channel threshold of 90
                                            "spc": randint(240, 300),
                                            "turb": randint(10, 19),
                                            "ph": randint(1, 10),
                                            "batv": round(random.uniform(10, 13), 2),
                                            "datetime": datetime_now}])
print(result)

# Follow up with five readings below the threshold
variables = []
for _ in range(5):
    datetime_now = datetime.datetime.now().isoformat()
    variables.append({"temp": randint(85, 89),
                      "spc": randint(240, 300),
                      "turb": randint(10, 19),
                      "ph": randint(1, 10),
                      "batv": round(random.uniform(10, 13), 2),
                      "datetime": datetime_now})
# *** Tapis v3: Call to Streams API
result = permitted_client.streams.create_measurement(inst_id=instrument_id, vars=variables)
print(result)

### List Alerts

In [None]:
#list alerts after trigger
# Demo Step (4)
# *** Tapis v3: Call to Streams API
permitted_client.streams.list_alerts(channel_id=channel_id)

In [None]:
# Demo Step (4)
# *** Tapis v3: Call to Files API
permitted_client.files.listFiles(systemId=storage_id, path="/")

### Angular UI Filebrowser
Go to the file browser to preview the generated plot: https://tapis-project.github.io/ng-tapis-files-browser