In [1]:
%gui asyncio

In [2]:
from ipywidgets import IntSlider, Output
import ipywidgets as widgets
import pandas as pd
import requests
import asyncio
import io
import json
import os
from dateutil import parser
from dateutil.tz import *

In [3]:
# OMF_ENDPOINT = "http://httpbin.org/post"
OMF_ENDPOINT = "https://academicpi.azure-api.net/csv-ingress/messages"
API_KEY = "dummy" 

def omf_type(new_type):
    return (
        f"stream-{new_type}",
        {
            "id": f"stream-{new_type}",
            "description": "Timestamp and real-time value",
            "type": "object",
            "classification": "dynamic",
            "properties": {
                "IndexedDateTime": {
                    "type": "string",
                    "format": "date-time",
                    "isindex": True,
                },
                "value": {"type": f"{new_type}"},
            },
        },
    )


omf_number_typeid, omf_number_type = omf_type("number")


def container_id(asset, name):
    return f"{asset}.{name}"


def omf_container(asset, name, typeid):
    return {"id": container_id(asset, name), "typeid": f"{typeid}"}


def omf_data(asset, name, timestamp, value):
    ts = parser.parse(timestamp).astimezone(UTC)
    return {
        "containerid": container_id(asset, name),
        "values": [{"IndexedDateTime": f"{ts.isoformat()}", "value": value}],
    }


producer_token = os.environ.get("PRODUCER_TOKEN", "osi_unitops1")


def omf_headers(message_type, api_key=API_KEY, producer_token=producer_token):
    # message_type is "type", "container" or "data"
    return {
        "producertoken": f"{producer_token}",
        "messagetype": f"{message_type}",
        "messageformat": "json",
        "omfversion": "1.0",
        "Ocp-Apim-Subscription-Key": api_key,
    }


def send_omf_message(message_type, data, api_key, out=None):
    if not isinstance(data, list):
        data = [data]
    if out:
        out.append_stdout(f"!!send_omf_message {omf_headers(message_type, api_key)}\n")
        out.append_stdout(
            f"!!send_omf_message [{message_type}]:\n {json.dumps(data, indent=2)}\n\n"
        )
        return None

    return requests.post(
        OMF_ENDPOINT, headers=omf_headers(message_type, api_key), json=data
    )

In [4]:
asset = "Asset03"
sensor = "Temp01"
row_batch_size = 10

In [5]:
# r = send_omf_message("type", omf_number_type)
# print(f"type status {r.status_code} \n ==> len={len(r.text)}: {r.text}")

In [6]:
"""
r = send_omf_message("type", omf_number_type)
print(f"type status {r.status_code} \n ==> len={len(r.text)}: {r.text}")

r = send_omf_message("container", omf_container(asset, sensor, omf_number_typeid))
print(f"container status {r.status_code} \n ==> len={len(r.text)}: {r.text}")

message_data = omf_data(asset, sensor, "2020-01-29T00:00 -05:00", -10.0)
message_data

r = send_omf_message("data", omf_data(asset, sensor, "2020-01-29T00:00 -05:00", -10.0))
print(f"data status {r.status_code} \n ==> len={len(r.text)}: {r.text}")
"""
None

<img src="https://academichub.blob.core.windows.net/images/logo_rgb.png" align="left" alt="drawing" width="55"/>

##  CSV Load/Extract/Transfer for Academic Hub version 2.2

### 1) Enter API key provided by OSIsoft
### 2) Enter experiment or asset name for data
### 3) Click `Upload` button, then select CSV file to transfer 

If CSV is in a valid format, progress information will show up until completion. 

**Note: this web application can transfer one CSV at a time. Reload this web page to restart and upload a new CSV or after an error**

In [7]:
def wait_for_change(widget, value):
    future = asyncio.Future()

    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        widget.unobserve(getvalue, value)

    widget.observe(getvalue, value)
    return future

In [9]:
upload = widgets.FileUpload(accept=".csv")
api_key = widgets.Text(
    value="",
    placeholder="Type provided API key",
    description="API key:",
    disabled=False,
)
experiment = widgets.Text(
    value="",
    placeholder="Name of experiment or asset",
    description="Experiment:",
    disabled=False,
)
out = Output()


def load_extract_transfer(value, api_key, timezone="-07:00"):
    r = send_omf_message("type", omf_number_type, api_key)
    if r.status_code != 200:
        if r.status_code == 401:
            out.append_stdout(f"@@ please correct API key: current key is {api_key}\n")
        else:
            out.append_stdout(
                f"\n\n !#!# error with type definition: status={r.status_code}\n\n >>> {r.text}\n\n"
        )
        return "@@error with OMF type or API key, reload URL"
    else:
        out.append_stdout(">> OMF type definition OK\n")

    # print(f"type status {r.status_code} \n ==> len={len(r.text)}: {r.text}")
    asset = experiment.value
    if len(asset) == 0:
        out.append_stdout("@@ experiment/asset name should not be empty\n")
        return "error: experiment/asset name is empty"
    file_key = list(value.keys())[0]
    out.append_stdout(f">> Filename: {file_key} " + "\n")
    data = value[file_key]["content"].decode("utf-8")
    df = pd.read_csv(io.StringIO(data))

    fixed_column_names = [x.replace(".", "_") for x in list(df.columns)]
    df.columns = fixed_column_names
    # out.append_stdout(f"\n\n @@Columns: {fixed_column_names}\n")
    buf = io.StringIO()
    df.info(buf=buf)
    out.append_stdout(
        f"\n------ Info about Dataframe built from CSV ------- \n"
        + str(buf.getvalue())
        + "\n\n"
    )
    #
    # First two columns for Utah are Date and Time, so skip them
    #
    containers = [
        omf_container(asset, sensor, omf_number_typeid)
        for sensor in list(df.columns)[2:]
    ]
    r = send_omf_message("container", containers, api_key)  # , out=out)
    if r.status_code != 200:
        out.append_stdout(
            f"\n\n !#!# error with column definition: status={r.status_code}\n\n >>> {r.text}\n\n"
        )
        return "@@error with OMF containers"
    else:
        out.append_stdout(">> column definitions OK\n")

    count = 0
    out.append_stdout(">> processing row: ")
    row_batch_data = []
    for r in df.itertuples():
        row_dict = r._asdict()
        #
        # Time format is HH:MM:SS:ss, needs to be HH:MM:SS.ss
        #
        last_colon = r.Time.rfind(":")
        new_time = r.Time[:last_colon] + "." + r.Time[last_colon + 1 :]
        # out.append_stdout(f"new_time = {new_time}\n")
        t = parser.parse(r.Date + " " + new_time + " " + timezone)
        # out.append_stdout(f"## {t.astimezone(UTC).isoformat()} \n")

        row_omf_data = [
            omf_data(asset, sensor, t.astimezone(UTC).isoformat(), row_dict[sensor])
            for sensor in list(df.columns)[2:]
        ]
        # out.append_stdout(f"row omf data: {row_omf_data}\n")
        count += 1
        row_batch_data.extend(row_omf_data)
        if count % row_batch_size == 0:
            out.append_stdout(f"[{count}]")
            r = send_omf_message("data", row_batch_data, api_key)  # , out=out)
            row_batch_data = []
            if r:
                if r.status_code != 200:
                    out.append_stdout(
                        f"\n\n !#!# error with row #{count}: status={r.status_code}\n\n >>> {r.text}\n\n"
                    )
    out.append_stdout(f"[last rows {len(row_batch_data)}]\n")
    if len(row_batch_data) > 0:
        r = send_omf_message("data", row_batch_data, api_key)  # , out=out)
        if r.status_code != 200:
            out.append_stdout(
                f"\n\n !#!# error with last rows #{count}: status={r.status_code}\n\n >>> {r.text}\n\n"
            )
        else:
            out.append_stdout(
                f"\n\nLoading-Extract-Transfer to Hub done, status OK, #rows = {count}\n"
            )
    else:
        out.append_stdout("[no more data]\n")

    return f"OK {file_key}"


async def f():
    out.append_stdout("Click *Upload* button to select CSV file for transfer to Hub \n")
    x = await wait_for_change(upload, "value")
    out.append_stdout(
        "working on load, extract and tranfer... "
        + str(list(upload.value.keys())[0])
        + "\n"
    )
    status = load_extract_transfer(upload.value, api_key=api_key.value)
    out.append_stdout(f"Upload status {status}\n")


asyncio.ensure_future(f())

display(api_key)
display(experiment)
display(upload)
display(out)

Text(value='', description='API key:', placeholder='Type provided API key')

Text(value='', description='Experiment:', placeholder='Name of experiment or asset')

FileUpload(value={}, accept='.csv', description='Upload')

Output()