In [1]:
# Storage SC

In [2]:
import logging

import smartpynector as sp
from utils import *

In [3]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("StorageSC")

In [4]:
# Constants
READ_URL = "https://graphdb.odissei.nl/repositories/MateuszTest"
WRITE_URL = "https://graphdb.odissei.nl/repositories/MateuszTest/statements"

THERMOSTAT_API_URL = "http://0.0.0.0:8001/thermostat"
PREFIXES = {
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    "saref": "https://w3id.org/saref#",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
}

MEAS_GRAPH_PATTERN = """?meas rdf:type saref:Measurement .
                        ?meas saref:hasValue ?temp .
                        ?meas saref:isMeasuredIn saref:TemperatureUnit .
                        ?meas saref:hasTimestamp ?timestamp .
                        ?meas saref:isMeasurementOf ?room_id .
                        ?meas saref:relatesToProperty saref:Temperature .
                        ?meas saref:measurementMadeBy ?device_id ."""

COMMAND_GRAPH_PATTERN = """?command rdf:type saref:GetMeterHistoryCommand .
                        ?command saref:hasTimestamp ?timestamp .
                        ?command saref:hasValue ?temp .
                        ?command saref:isCommandOf ?device_id .
                        ?command saref:hasTimestampStart ?startTimestamp .
                        ?command saref:hasTimestampEnd ?endTimestamp .
                        ?command saref:relatesToProperty saref:Temperature ."""

In [5]:
from typing import Dict, List


def extract_timestamp_and_temp(json_obj: Dict, q_binding) -> List[Dict[str, str]]:
    measurements = []
    for binding in json_obj["results"]["bindings"]:
        timestamp = binding["timestamp"]["value"]
        temp = binding["temp"]["value"]
        measurements.append(
            {
                "timestamp": f'"{timestamp}"',
                "temp": temp,
            }
        )
    result = [
        {
            "command": q_binding["command"],
            "device_id": q_binding["device_id"],
            "startTimestamp": q_binding["startTimestamp"],
            "endTimestamp": q_binding["endTimestamp"],
        },
        *measurements,
    ]
    return result

In [6]:
# Return measurements in datetime range from GraphDB
def handle_answer_measurements(query_bindings):
    for binding in query_bindings:
        logger.info(f"Answer query bindings: {binding}")

        start_timestamp = binding["startTimestamp"]
        end_timestamp = binding["endTimestamp"]
        sparql_query = f"""PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                            PREFIX saref: <https://w3id.org/saref#>
                            PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

                            SELECT *

                            WHERE {{
                                ?meas rdf:type saref:Measurement .
                                ?meas saref:hasValue ?temp .
                                ?meas saref:isMeasuredIn saref:TemperatureUnit .
                                ?meas saref:hasTimestamp ?timestamp .
                                FILTER (xsd:dateTime(?timestamp) >= "{start_timestamp}"^^xsd:dateTime && xsd:dateTime(?timestamp) <= "{end_timestamp}"^^xsd:dateTime)
                                ?meas saref:isMeasurementOf ?room_id .
                                ?meas saref:relatesToProperty saref:Temperature .
                                ?meas saref:measurementMadeBy ?device_id .
                                }}"""
        # logger.info(sparql_query)
        res = sp.run_sparql_query(READ_URL, sparql_query)
        result = extract_timestamp_and_temp(res, binding)
        # logger.info(res)
        logger.info(result)

    return result


def handle_react_measurements(bindings):
    sp.store_data_in_graphdb(
        graph_pattern=MEAS_GRAPH_PATTERN,
        binding_set=bindings,
        prefixes=PREFIXES,
        read_url=READ_URL,
        write_url=WRITE_URL,
    )
    for binding in bindings:
        logger.info(f"Saving measurement {binding['meas']}")
    return []

In [7]:
def start_storage_kb(kb_id, kb_name, kb_description, ke_endpoint):
    register_knowledge_base(kb_id, kb_name, kb_description, ke_endpoint)

    answer_measurements_ki = register_answer_knowledge_interaction(
        COMMAND_GRAPH_PATTERN,
        "answer-measurements",
        kb_id,
        ke_endpoint,
        PREFIXES,
    )

    react_measurements_ki = register_react_knowledge_interaction(
        MEAS_GRAPH_PATTERN,
        None,
        "react-measurements",
        kb_id,
        ke_endpoint,
        PREFIXES,
    )

    start_handle_loop(
        {
            answer_measurements_ki: handle_answer_measurements,
            react_measurements_ki: handle_react_measurements,
        },
        kb_id,
        ke_endpoint,
    )

In [8]:
start_storage_kb(
    "http://example.org/storage",
    "Storage",
    "GraphDB smart connector",
    "http://knowledge_engine:8280/rest/",
)

2023-06-20 23:20:03 INFO registered Storage
2023-06-20 23:20:03 INFO received issued knowledge interaction id: http://example.org/storage/interaction/answer-measurements
2023-06-20 23:20:03 INFO received issued knowledge interaction id: http://example.org/storage/interaction/react-measurements
2023-06-20 23:20:19 INFO Answer query bindings: {'device_id': '"http://0.0.0.0:8001/thermostat/devices/1"', 'endTimestamp': '2023-06-20T23:20:09+00:00', 'startTimestamp': '2023-06-20T21:55:55+00:00', 'command': '<http://0.0.0.0:8001/thermostat/command/641a977a-f897-4775-8971-042c47878786>'}
2023-06-20 23:20:19 INFO [{'command': '<http://0.0.0.0:8001/thermostat/command/641a977a-f897-4775-8971-042c47878786>', 'device_id': '"http://0.0.0.0:8001/thermostat/devices/1"', 'startTimestamp': '2023-06-20T21:55:55+00:00', 'endTimestamp': '2023-06-20T23:20:09+00:00'}, {'timestamp': '"2023-06-20T21:55:56+00:00"', 'temp': '13'}, {'timestamp': '"2023-06-20T21:56:02+00:00"', 'temp': '15'}, {'timestamp': '"2023-0

AssertionError: 