In [None]:
import getpass
from datetime import datetime, timedelta
from tapipy.tapis import Tapis
from random import gauss, randint
from time import time_ns
import random
import pandas as pd
from io import StringIO
import matplotlib.pyplot as plt
import matplotlib.dates as md

In [None]:
# Prompt for all credentials interactively so none of them are stored in the
# notebook source. getpass does not echo what is typed.
username = getpass.getpass(prompt = "Username: ", stream=None)
password = getpass.getpass(prompt = "Password: ", stream=None)
vm_password = getpass.getpass(prompt = "VM Password: ", stream=None)

base_url = "https://training.tapis.io"

client = Tapis(
    base_url = base_url,
    username = username,
    password = password
)

# Generate an access token for this client.
client.get_tokens()

# Confirm that a token is present WITHOUT displaying it: cell outputs are saved
# with the notebook, so echoing the raw token would leak a live credential to
# anyone the notebook is shared with.
client.access_token is not None

In [None]:
# All resource identifiers in this notebook share one suffix: the username plus
# an integer drawn by randint(0, 1000000). Re-running this cell draws a new
# integer and therefore produces a new set of identifiers.
unique_name = f"{username}_{randint(0, 1000000)}"

# Streams hierarchy: project -> site -> instrument.
project_id = f"smart_data_workshop_project_{unique_name}"
site_id = f"smart_data_workshop_site_{unique_name}"
inst_id = f"smart_data_workshop_instrument_{unique_name}"

# Streams channels.
trigger_channel_id = f"{unique_name}_trigger_channel"
discord_channel_id = f"{unique_name}_discord_channel"

# Systems and the app that runs on the execution system.
storage_system_id = f"{unique_name}_tutorial_vm_storage_system"
execution_system_id = f"{unique_name}_tutorial_vm_execution_system"
app_id = f"{unique_name}_img_classifier"

In [None]:
# Build the keyword arguments describing the Streams project, then create it.
# The current user is recorded as both owner and PI.
project = dict(
    project_name=project_id,
    description=f"A smart data workshop project for user {username}",
    owner=username,
    pi=username,
    active=True,
    metadata={},
)
proj = client.streams.create_project(**project)

# Display the service response.
proj

In [None]:
# Define and create the site inside the project created above.
#
# The request definition is kept in its own name (site_definition) so that
# `site` only ever holds the service response. Previously `site` was rebound
# from the request dict to the response, so running the cell a second time
# failed when unpacking the response with **.
site_definition = {
    "project_id": project_id,
    "request_body": [{
        "site_name": site_id,
        "site_id": site_id,
        "description": f"A smart data workshop site for user {username}",
        "latitude": 19.89,
        # Western longitudes are negative. 19.89, -155.58 is on the island of
        # Hawaii, which is presumably the intended location; the previous
        # value of +155.58 lies in the open western Pacific.
        "longitude": -155.58,
        "elevation": 10,
        "metadata": {}
    }]
}
site = client.streams.create_site(**site_definition)
site

In [None]:
# Describe one instrument attached to the site above. The same generated ID is
# used for both its name and its identifier.
instrument = dict(
    project_id=project_id,
    site_id=site_id,
    request_body=[
        dict(
            inst_name=inst_id,
            inst_id=inst_id,
            inst_description=f"A smart data workshop instrument for user {username}",
            metadata={},
        )
    ],
)
inst = client.streams.create_instrument(**instrument)

# Display the service response.
inst

In [None]:
# Two variables are registered on the instrument: rainfall and temperature.
rainfall_var = {
    "var_name": "rainfall",
    "var_id": "rainfall",
    "units": "mm",
    "shortname": "rf",
    "metadata": {}
}
temperature_var = {
    "var_name": "temperature",
    "var_id": "temperature",
    "units": "C",
    "shortname": "temp",
    "metadata": {}
}

# Both definitions are submitted in a single request.
variables = {
    "project_id": project_id,
    "site_id": site_id,
    "inst_id": inst_id,
    "request_body": [rainfall_var, temperature_var]
}
streams_vars = client.streams.create_variable(**variables)
streams_vars

In [None]:
# Write measurements: 10000 synthetic records spaced one minute apart, the last
# of which is stamped with the time captured at the start of this cell.
window_start = datetime.now() - timedelta(minutes = 10000)

# Each record holds a random integer temperature in [60, 89], a random integer
# rainfall in [10, 200] and an ISO-8601 timestamp.
variables = [
    {
        "temperature": randint(60, 89),
        "rainfall": randint(10, 200),
        "datetime": (window_start + timedelta(minutes = offset)).isoformat()
    }
    for offset in range(1, 10001)
]

# Send every record to the measurements endpoint of our instrument in one call.
result = client.streams.create_measurement(inst_id=inst_id, vars=variables)

result

In [None]:
# Download measurements as CSV.
#
# The end of the query window is computed when the cell runs instead of being
# a fixed calendar date: a hard-coded end date silently excludes freshly
# written measurements once that date has passed. The two-day margin is there
# to absorb any offset between local time (used when the measurements were
# written) and the "Z" suffix used in the query.
query_end = (datetime.now() + timedelta(days=2)).strftime('%Y-%m-%dT%H:%M:%SZ')

result = client.streams.list_measurements(inst_id=inst_id,
                                          project_id=project_id,
                                          site_id=site_id,
                                          start_date='2021-01-01T00:00:00Z',
                                          end_date=query_end,
                                          format='csv')
result

In [None]:
# Read the downloaded CSV bytes into a DataFrame indexed by timestamp.
#
# The buffer is named csv_buffer rather than `input`: binding `input` at notebook
# level shadows the builtin input() for every later cell in the kernel session.
csv_buffer = StringIO(str(result, 'utf-8'))
df = pd.read_csv(csv_buffer)

# Parse the 'time' column into a 'datetime' column, use it as the index and drop
# the original text column. Written as a chain instead of in-place mutation.
df['datetime'] = pd.to_datetime(df['time'])
df = df.set_index('datetime').drop(columns='time')
df

In [None]:
# Plot Measurements in the DataFrame
%matplotlib inline
xfmt = md.DateFormatter('%H:%M:%S')
df.plot(lw=1, colormap='jet', marker='.', 
        markersize=12, title='Timeseries Stream Output', rot=90).xaxis.set_major_formatter(xfmt)
plt.tight_layout()
plt.legend(loc='best')
plt.savefig('test.png')

Go to the Tapis UI to view the created measurements: https://ikewai.github.io/tapis_ui_streams_training_deploy/#/streams
Log in with your Tapis credentials.

In [None]:
# Register a storage system in Tapis so we can upload and download data from
# our server. The system ID must be unique across Tapis; storage_system_id is
# built from the generated unique_name.
storage_system_definition = dict(
    id=storage_system_id,
    description="VM storage",
    host="129.114.17.159",
    systemType="LINUX",
    defaultAuthnMethod="PASSWORD",
    effectiveUserId=username,
    rootDir=f"/home/{username}/",
    canExec=False,
)
system = client.systems.createSystem(**storage_system_definition)

# Display the service response.
system

In [None]:
# Attach login credentials to the storage system so Tapis can access it.
# SSH keys are the normal choice; this tutorial uses password authentication.
client.systems.createUserCredential(
    systemId=storage_system_id,
    userName=username,
    password=vm_password,
)

In [None]:
# List what is at "/" on the storage system. The system was created with
# rootDir set to the user's home directory, so this is the home directory.
client.files.listFiles(
    systemId=storage_system_id,
    path="/",
)

In [None]:
# Create an Actor that a channel can execute when a threshold triggers.
# Its default environment names the storage system and the destination path.
actor_environment = {"system_id": storage_system_id, "destination_path": "/"}

my_actor = client.actors.create_actor(
    image="mcleanj/workshop_actor:0.0.12",
    name="Plot Streams Data-1",
    description="Actor that plots streams measurements.",
    default_environment=actor_environment,
)

# Display the service response.
my_actor

In [None]:
# Create a channel that watches this instrument's temperature and executes our
# Actor when the value is greater than 100.
temperature_condition = {"key": inst_id + ".temperature",
                         "operator": ">",
                         "val": 100}

# Action taken when the condition holds: run the actor created earlier.
actor_action = {"method": "ACTOR",
                "actor_id": my_actor.id,
                "message": f"Instrument: {inst_id} exceeded Temperature threshold"}

temperature_trigger = {"inst_ids": [inst_id],
                       "condition": temperature_condition,
                       "action": actor_action}

channel = client.streams.create_channels(
    channel_id=trigger_channel_id,
    channel_name=trigger_channel_id,
    template_id="default_threshold",
    triggers_with_actions=[temperature_trigger],
)
channel

In [None]:
# Write one measurement whose temperature (230) is above the channel threshold,
# stamped with the current time.
date_s = datetime.now().isoformat()
trigger_record = {
    "temperature": 230,
    "rainfall": 0,
    "datetime": date_s
}
variables = [trigger_record]

# Send the record to the measurements endpoint of our instrument.
result = client.streams.create_measurement(inst_id=inst_id, vars=variables)
print(result)

In [None]:
# Show the alerts that have been issued on our trigger channel.
client.streams.list_alerts(
    channel_id=channel.channel_id,
)

In [None]:
# Fetch the channel's alerts again and keep the first entry of the list.
# NOTE(review): presumably the first entry is the most recent alert — confirm
# the ordering returned by the service.
alerts_response = client.streams.list_alerts(channel_id=channel.channel_id)
alert = alerts_response.alerts[0]

In [None]:
# Retrieve the log of the actor execution referenced by the alert fetched above.
client.actors.get_execution_logs(
    actor_id=alert.actor_id,
    execution_id=alert.execution_id,
)

In [None]:
# List the storage system root again — the new plot file should now be there.
client.files.listFiles(
    systemId=storage_system_id,
    path="/",
)

In [None]:
# Download the plot produced by the actor so it can be viewed in the notebook.
#
# The plot's file name embeds the time of the actor run, so it differs on every
# run. It is looked up from the directory listing instead of being hard-coded
# (the previous fixed name only existed for one past run).
# Assumes each listing entry exposes a `name` attribute — confirm against the
# Tapis Files listing schema.
root_listing = client.files.listFiles(systemId=storage_system_id, path="/")
plot_names = sorted(
    entry.name for entry in root_listing
    if entry.name.startswith("plot_") and entry.name.endswith(".png")
)
if not plot_names:
    raise FileNotFoundError(
        "No plot_*.png file found on the storage system yet; wait for the "
        "actor execution to finish and re-run this cell."
    )

# The timestamp in the name sorts lexicographically, so the last name is the
# newest plot.
plot_path = f"/{plot_names[-1]}"

fileb = client.files.getContents(systemId=storage_system_id, path=plot_path)

# The with-block closes the file on exit; no explicit close() is needed.
with open("download.png", "wb") as f:
    f.write(fileb)

In [None]:
# Create a second channel that posts to Discord when rainfall is greater than 150.
#
# A Discord webhook URL embeds a secret token: anyone holding the URL can post
# to the Discord channel. It is therefore prompted for at run time (getpass does
# not echo it) instead of being committed in the notebook source.
discord_webhook_url = getpass.getpass(prompt = "Discord webhook URL: ", stream=None)

# NOTE(review): _tapis_debug=True looks like a leftover debugging flag — confirm
# whether it is still wanted before removing it.
discord_channel = client.streams.create_channels(channel_id=discord_channel_id,
                            channel_name=discord_channel_id,
                            template_id="default_threshold",
                            triggers_with_actions=[
                                {"inst_ids":[inst_id],
                                 "condition":{"key":inst_id+".rainfall",
                                              "operator":">",
                                              "val":150},
                                 "action":{"method":"DISCORD","webhook_url" : discord_webhook_url,
                                           "message":"My Instrument exceeded Rainfall threshold val ${ r.value}"}}], _tapis_debug=True)
discord_channel

Please join the Discord server at https://discord.gg/x8B5JZNm to view the alerts.

In [None]:
# Write one measurement whose rainfall (151) is just above the Discord
# channel's threshold of 150, stamped with the current time.
date_s = datetime.now().isoformat()
rainfall_record = {
    "temperature": 80,
    "rainfall": 151,
    "datetime": date_s
}
variables = [rainfall_record]

# Send the record to the measurements endpoint of our instrument.
result = client.streams.create_measurement(inst_id=inst_id, vars=variables)
print(result)

In [None]:
# Register an execution system in Tapis on the same host as the storage system.
# The system ID must be unique across Tapis; execution_system_id is built from
# the generated unique_name.

# Container runtimes declared for jobs on this system.
job_runtimes = [
    {"runtimeType": "DOCKER", "version": "0.0.1d"},
    {"runtimeType": "SINGULARITY", "version": "0.0.1"},
]

# The single logical batch queue, which is also named as the default below.
normal_queue = {
    "name": "tapisNormal",
    "hpcQueueName": "debug",
    "maxJobs": 5,
    "maxJobsPerUser": 1,
    "minNodeCount": 1,
    "maxNodeCount": 1,
    "minCoresPerNode": 1,
    "maxCoresPerNode": 4,
    "minMemoryMB": 1,
    "maxMemoryMB": 4096,
    "minMinutes": 1,
    "maxMinutes": 60
}

# "${apiUserId}" is passed through as a literal string (this is not an
# f-string); presumably Tapis substitutes the calling user — confirm.
system_config = {
    "id": execution_system_id,
    "description": "System for testing jobs in training tenant",
    "systemType": "LINUX",
    "host": "129.114.17.159",
    "effectiveUserId": "${apiUserId}",
    "defaultAuthnMethod": "PASSWORD",
    "rootDir": "/home",
    "canExec": True,
    "canRunBatch": True,
    "jobRuntimes": job_runtimes,
    "jobWorkingDir": "${apiUserId}/workdir",
    "batchScheduler": "SLURM",
    "batchLogicalQueues": [normal_queue],
    "batchDefaultLogicalQueue": "tapisNormal"
}
system = client.systems.createSystem(**system_config)

# Display the service response.
system

In [None]:
# Attach the VM password to the execution system so Tapis can log in to it.
client.systems.createUserCredential(
    systemId=execution_system_id,
    userName=username,
    password=vm_password,
)

In [None]:
# List our home directory through the execution system. Its rootDir is "/home",
# so the path is the username directory beneath it.
client.files.listFiles(
    systemId=execution_system_id,
    path=f"/{username}/",
)

In [None]:
# Command-line arguments for the container: the --image_file flag followed by
# the URL of the image to classify.
app_args = [
    { "name": "arg1", "arg": "--image_file" },
    { "name": "inputFile",
      "arg": "https://s3.amazonaws.com/cdn-origin-etr.akc.org/wp-content/uploads/2017/11/12231410/Labrador-Retriever-On-White-01.jpg"
    }
]

parameter_set = {
    "appArgs": app_args,
    "archiveFilter": { "includeLaunchFiles": True }
}

# Job defaults. The exec, input and output directories are all built from the
# ${JobWorkingDir} and ${JobUUID} placeholders, which are passed as literal
# strings (these are not f-strings).
job_attributes = {
    "execSystemId": execution_system_id,
    "execSystemExecDir": "${JobWorkingDir}/jobs/${JobUUID}",
    "execSystemInputDir": "${JobWorkingDir}/jobs/${JobUUID}/data",
    "execSystemOutputDir": "${JobWorkingDir}/jobs/${JobUUID}/output",
    "parameterSet": parameter_set,
    "memoryMB": 2048,
    "nodeCount": 1,
    "coresPerNode": 4,
    "maxMinutes": 10
}

# Version 0.1 of a Docker-based image classifier app, run as a FORK job on the
# execution system registered earlier.
app_config = {
    "id": app_id,
    "version": "0.1",
    "description": "An image classifier run via FORK using docker.",
    "runtime": "DOCKER",
    "jobType": "FORK",
    "containerImage": "tapis/img-classify2:0.1",
    "jobAttributes": job_attributes
}
app = client.apps.createAppVersion(**app_config)

# Display the service response.
app

Go to the Tapis UI to run the job: https://ikewai.github.io/tapis_ui_streams_training_deploy/#/apps