# Check the Publisher Endpoint

In [1]:
from rsplib import RSPEngine, RSPPublisher, Stream, rdf_table,load_graph, accessURL, Task, URIRef

In [2]:
streamhub = RSPPublisher("http://streamhub:9292/streamhub")

## Publication Query - SPARQL-like Syntax

In [3]:
vocals = '''
BASE  <http://streamhub:9292/streamhub>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX frmt: <http://www.w3.org/ns/formats/>
PREFIX vocals: <http://w3id.org/rsp/vocals#>
PREFIX vsd: <http://w3id.org/rsp/vocals-sd#>
REGISTER STREAM :pizzastream
FROM SOURCE <ws://pizzastream:8080>
WHERE {

           {this} a vocals:StreamDescriptor .

           {publisher} a vsd:PublishingService ;
                         vsd:hasFeature vsd:replaying ;
                         vsd:resultFormat frmt:JSON-LD .

           :PizzaEndpoint a vocals:StreamEndpoint ;
                            dcat:title "A Pizza Stream Endpoint"^^xsd:string ;
                            dcat:description "Streaming endpoint to consume Pizza via WebSocket"^^xsd:string ;
                            dcat:license <https://creativecommons.org/licenses/by-nc/4.0/> ;
                            dcat:format frmt:JSON-LD ;
                            dcat:accessURL {source} ;
                            vsd:publishedBy {publisher} .

           {stream} a vocals:RDFStream ;
                    dcat:title "Pizza Stream"^^xsd:string ;
                    dcat:description "Stream of pizza"^^xsd:string ;
                    dcat:publisher {publisher} ;
                    dcat:landingPage <https://example.org/rw/pizza/> ;
                    vocals:hasEndpoint :PizzaEndpoint  .

}'''

In [4]:
streamhub.publish(vocals)

{
    "@id": "streams/pizzastream",
    "@type": "vocals:StreamDescriptor",
    "dcat:dataset": {
        "@id": "streams/pizzastream",
        "@type": "vocals:RDFStream",
        "dcat:description": "Stream of pizza",
        "dcat:landingPage": "https://example.org/rw/pizza/",
        "dcat:publisher": "http://streamhub:9292/streamhub",
        "dcat:title": "Pizza Stream",
        "vocals:hasEndpoint": [
            [
                {
                    "@id": "http://streamhub:9292/streamhub/PizzaEndpoint",
                    "@type": "http://w3id.org/rsp/vocals#StreamEndpoint",
                    "dcat:title": "A Pizza Stream Endpoint",
                    "dcat:description": "Streaming endpoint to consume Pizza via WebSocket",
                    "dcat:license": "https://creativecommons.org/licenses/by-nc/4.0/",
                    "dcat:format": "http://www.w3.org/ns/formats/JSON-LD",
                    "dcat:accessURL": "ws://pizzastream:8080",
                    "vsd:pu

## Checking if there is any stream

In [5]:
streamhub.lists()

[http://streamhub:9292/streamhub/streams/pizzastream]

In [6]:
streamhub.lists()[0].endpoints()

[GET ws://pizzastream:8080]

## Getting the stream just registered

In [7]:
pizzastream = streamhub.lists()[0].endpoints()[0]

In [8]:
pizzastream.call()



{
    "@context": {
        "pizza": "http://www.co-ode.org/ontologies/pizza/pizza.owl#",
        "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
    },
    "@id": "668dd661-a305-4285-98d2-c091ca1f467a",
    "@type": "pizza:Pepperoni"
}

## Setup jasper

In [9]:
jasper = RSPEngine("http://jasper:8181/jasper")

In [10]:
jasper.streams()

[]

## Simple select query

In [11]:
qid  = "simple"
tbox = "http://notebook:8888/files/pizza.owl"
frmt = "JSON"
body ="""
PREFIX : <http://www.co-ode.org/ontologies/pizza/pizza.owl#>
SELECT ?p
FROM NAMED WINDOW <pw> ON <http://streamhub:9292/streamhub/streams/pizzastream> [RANGE PT15S STEP PT5S]
WHERE {
    WINDOW <pw> {
        ?p a :NamedPizza .
    }
}
"""

In [12]:
query = jasper.create(qid, body, tbox, frmt)

In [13]:
query.sources()

[GET http://streamhub:9292/streamhub/streams/pizzastream]

In [14]:
r = jasper.expose(qid, 'WEBSOCKET')

In [15]:
jasper.streams()

[http://streamhub:9292/streamhub/streams/pizzastream,
 http://jasper:8181/jasper/streams/simple]

In [16]:
resp = r.endpoints()[0].call()
resp



{
    "p": "http://jasper:8181/68eb16f9-88d3-4b29-a89c-e1b5ffa83fe6",
    "eventTime": 1589467675950,
    "processingTime": 1589467675952
}

## Join uning Ontop SPARQL Endpoint

In [17]:
qid  = "complex"
tbox = "http://notebook:8888/files/pizza.owl"
frmt = "JSON"
body ="""
PREFIX : <http://www.co-ode.org/ontologies/pizza/pizza.owl#>
PREFIX cook: <http://linkeddata.stream/ontologies/cooking#>
SELECT ?p ?t
FROM NAMED WINDOW <pw> ON <http://streamhub:9292/streamhub/streams/pizzastream> [RANGE PT15S STEP PT5S]
WHERE {
    WINDOW <pw> {
        ?p a :Margherita .
    }
    SERVICE <http://ontop:8080/sparql> {
        :Margherita cook:tempAvg ?t .
    }  
}
"""

In [18]:
query = jasper.create(qid, body, tbox, frmt)

In [19]:
query.sources()

[GET http://streamhub:9292/streamhub/streams/pizzastream]

In [20]:
r = jasper.expose(qid, 'WEBSOCKET')

In [21]:
jasper.streams()

[http://streamhub:9292/streamhub/streams/pizzastream,
 http://jasper:8181/jasper/streams/simple,
 http://jasper:8181/jasper/streams/complex]

In [None]:
resp = r.endpoints()[0].call()
resp

## Deleting the stream registered