In [123]:
# Simple thermostat API using FastAPI
# importing libraries
from fastapi import FastAPI, Depends
import uvicorn
import nest_asyncio
from pydantic import BaseModel, AnyUrl

app = FastAPI()


In [124]:
class Thermostat(BaseModel):
    name: str
    desired_temperature: int = 0
    actual_temperature: int = 0
    humidity: int = 0
    mode: str = "heat"
    kb_id: AnyUrl


In [125]:
# Creating a thermostat object
thermostat = Thermostat(name="Thermostat",
                        desired_temperature=20,
                        #actual_temperature=18,
                        humidity=50,
                        mode="heat",
                         kb_id="http://127.0.0.1:8000/thermostat",
                        )

In [126]:
thermostat.dict()

{'name': 'Thermostat',
 'desired_temperature': 20,
 'actual_temperature': 0,
 'humidity': 50,
 'mode': 'heat',
 'kb_id': AnyUrl('http://127.0.0.1:8000/thermostat', scheme='http', host='127.0.0.1', host_type='ipv4', port='8000', path='/thermostat')}

In [127]:
# Defining the API endpoints
@app.get("/")
async def root():
    return {"message": "Welcome to the thermostat API"}


@app.get("/thermostat")
async def get_thermostat() -> Thermostat:
    return thermostat


@app.get("/thermostat/name")
async def get_name():
    return thermostat.name


@app.get("/thermostat/actual_temperature")
async def get_actual_temperature() -> int:
    return thermostat.desired_temperature


@app.get("/thermostat/desired_temperature")
async def get_desired_temperature() -> int:
    return thermostat.desired_temperature


@app.get("/thermostat/humidity")
async def get_humidity():
    return thermostat.humidity


@app.get("/thermostat/mode")
async def get_mode():
    return thermostat.mode


@app.get("/thermostat/kb_id")
async def get_kb_id():
    return thermostat.kb_id


@app.put("/thermostat")
async def set_thermostat(commons: Thermostat = Depends()) -> Thermostat:
    thermostat.__dict__.update(commons.__dict__)
    return thermostat


@app.put("/thermostat/desired_temperature")
async def set_desired_temperature(desired_temperature: int) -> int:
    thermostat.desired_temperature = desired_temperature
    return thermostat.desired_temperature


@app.put("/thermostat/humidity")
async def set_humidity(humidity: int):
    thermostat.humidity = humidity
    return thermostat.humidity


@app.put("/thermostat/mode")
async def set_mode(mode: str):
    thermostat.mode = mode
    return thermostat.mode


@app.put("/thermostat/name")
async def set_name(name: str):
    thermostat.name = name
    return thermostat.name


@app.put("/thermostat/kb_id")
async def set_kb_id(kb_id: str):
    thermostat.kb_id = kb_id
    return thermostat.kb_id


In [61]:
# Defining constants

# Knowledge engine REST API URL
URL = "http://127.0.0.1:8280/rest"

# Knowledge engine REST API headers
HEADERS = {
    'Content-Type': 'application/json',
    'Knowledge-Base-Id': thermostat.kb_id,
}


In [62]:
# register API in the knowledge engine
import httpx


# Creating a smart connector
def create_smart_connector():
    # Smart connector data
    sc_data = {
        "knowledgeBaseId": thermostat.kb_id,
        "knowledgeBaseName": thermostat.name,
        "knowledgeBaseDescription": "Thermostat API v0.0.1 for the Smart Home",
        "reasonerEnabled": "false",
    }

    # Registering the smart connector via the /sc endpoint
    response = httpx.post(URL + "/sc", headers=HEADERS, json=sc_data)

    print(response.status_code)
    print(response.text)


create_smart_connector()

200



In [63]:
# check if the smart connector is registered
def check_smart_connector():
    response = httpx.get(URL + "/sc", headers=HEADERS)
    print(response.status_code)
    print(response.text)


check_smart_connector()


200
[{"knowledgeBaseId":"http://127.0.0.1:8000/thermostat","knowledgeBaseName":"Thermostat","knowledgeBaseDescription":"Thermostat API v0.0.1 for the Smart Home","reasonerEnabled":false}]


In [64]:
# register an ANSWER Knowledge Interaction with the smart connector
def register_answer_ki():
    answer_ki_data = {
        "knowledgeInteractionType": "AnswerKnowledgeInteraction",
        "prefixes": {"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
                     "saref": "https://w3id.org/saref#",
                     },

        "graphPattern": '''?meas rdf:type saref:Measurement .
                            ?meas saref:hasValue ?temp .
                            ?meas saref:isMeasuredIn saref:TemperatureUnit .
                            ?meas saref:hasTimestamp ?timestamp .
                            ?meas saref:isMeasurementOf ?room_id .
                            ?meas saref:relatesToProperty saref:Temperature .
                            ?meas saref:measurementMadeBy ?device_id .'''
    }

    response = httpx.post(URL + "/sc/ki", headers=HEADERS, json=answer_ki_data)
    print(response.status_code)
    print(response.text)


register_answer_ki()


200
{"knowledgeInteractionId":"http://127.0.0.1:8000/thermostat/interaction/489aa242-b1e5-4309-9a1d-c464ff068095"}


In [65]:
# check if the Knowledge Interaction is registered
def list_all_kis():
    response = httpx.get(URL + "/sc/ki", headers=HEADERS)
    return response.json()


list_all_kis()

[{'knowledgeInteractionId': 'http://127.0.0.1:8000/thermostat/interaction/489aa242-b1e5-4309-9a1d-c464ff068095',
  'knowledgeInteractionType': 'AnswerKnowledgeInteraction',
  'communicativeAct': {'requiredPurposes': ['https://w3id.org/knowledge-engine/InformPurpose'],
   'satisfiedPurposes': ['https://w3id.org/knowledge-engine/InformPurpose']},
  'graphPattern': '?meas <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://w3id.org/saref#Measurement> . ?meas <https://w3id.org/saref#hasValue> ?temp . ?meas <https://w3id.org/saref#isMeasuredIn> <https://w3id.org/saref#TemperatureUnit> . ?meas <https://w3id.org/saref#hasTimestamp> ?timestamp . ?meas <https://w3id.org/saref#isMeasurementOf> ?room_id . ?meas <https://w3id.org/saref#relatesToProperty> <https://w3id.org/saref#Temperature> . ?meas <https://w3id.org/saref#measurementMadeBy> ?device_id . '}]

In [66]:
# Start waiting for a handle request for the given Knowledge Base Id.
def handle_and_answer():
    response = httpx.get(URL + "/sc/handle", headers=HEADERS, timeout=None)
    print(response.status_code)
    print(response.text)
    ki_id = list_all_kis()[-1]['knowledgeInteractionId']
    answer_headers = {
        'Content-Type': 'application/json',
        'Knowledge-Base-Id': thermostat.kb_id,
        'Knowledge-Interaction-Id': ki_id,
    }
    handle_request_id = response.json()["handleRequestId"]
    answer_binding_set = [{
        "meas": "<https://www.example.org/meas1>",
        "temp": f'"{thermostat.temperature}"',
        "timestamp": '"2016-12-01T15:31:10-05:00"',
        "room_id": '"1"',
        "device_id": '"id1"',
    }]

    # Answering the handle request
    answer_data = {
        "handleRequestId": handle_request_id,
        "bindingSet": answer_binding_set,
    }

    response = httpx.post(URL + "/sc/handle", headers=answer_headers, json=answer_data)
    print(response.status_code)
    print(response.text)


handle_and_answer()

200
{"knowledgeInteractionId":"http://127.0.0.1:8000/thermostat/interaction/489aa242-b1e5-4309-9a1d-c464ff068095","handleRequestId":1,"bindingSet":[],"requestingKnowledgeBaseId":"http://127.0.0.1:8000/subscriber"}
200



In [128]:
# Running the API
if __name__ == "__main__":
    nest_asyncio.apply()
    #uvicorn.run(app, host="0.0.0.0", port=8001)
    uvicorn.run(app)

INFO:     Started server process [12495]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:34260 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:34260 - "GET /openapi.json HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [12495]
