# Part 1: RDF Stream Processing with the CSPARQL Engine
In this part of the tutorial, we will see how to query RDF Streams using an RDF Stream Processing (RSP) engine.
The **CSPARQL engine** is an RSP engine that does not offer reasoning capabilities.
Although extensions have been proposed, in this part of the tutorial we 
focus on plain **continuous** query answering.

## RSPLib
RSPLib is the Python library that we are going to use in this tutorial.
It offers abstractions to manipulate RSP sources,
e.g., RDF streams, and to interact with RSP engines, e.g.,
registering streams and queries.

RSPLib APIs are still under development. In this tutorial we are going to use version 0.3.4, which
is available at https://pypi.python.org/pypi/rsplib/.

To upgrade it, run `!pip install rsplib --upgrade` in a notebook cell (or `!pip install rsplib==0.3.4` to install exactly the version used here).

The source code is available at https://github.com/streamreasoning/rsplib.
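To check which RSPLib version the notebook kernel actually sees, here is a minimal sketch using the standard library's `importlib.metadata`. It assumes Python 3.8 or later, and `installed_version` is a hypothetical helper, not part of RSPLib:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

EXPECTED_RSPLIB_VERSION = "0.3.4"  # version this tutorial was written against


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


found = installed_version("rsplib")
if found is None:
    print("rsplib is not installed; try: pip install rsplib==0.3.4")
elif found != EXPECTED_RSPLIB_VERSION:
    print(f"rsplib {found} is installed, but this tutorial uses {EXPECTED_RSPLIB_VERSION}")
else:
    print(f"rsplib {found} matches the tutorial")
```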

In [2]:
from rsplib.processing import RSPSource, RSPEngine

## RDF Streams with TripleWave
[TripleWave](https://github.com/streamreasoning/triplewave) is a resource designed to
publish RDF streams on the web.
RSPLab integrates TripleWave as the default tool for stream provisioning.

**Have you already started the streams in the `streamer/citybench` folder?**

Notably, since we are dealing with **replayed** RDF streams, we need to start them using a specific API.
Wild streams, i.e., those naturally available on the web (e.g., Wikipedia changes), do not
need to be started.

In [5]:
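stream0 = RSPSource("http://aarhustrafficdata182955", 4000)  # define the source so this cell runs top to bottom
stream0.start()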

{'msg': 'streamCreated'}

In [None]:
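stream1 = RSPSource("http://aarhustrafficdata158505", 4001)  # define the source so this cell runs top to bottom
stream1.start()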

Now that we have started our streams, we can treat them as wild RDF streams that we can consume and query using an RSP engine.

RSPLib abstracts RDF streams as **RSPSource** objects. Using this facade, it is possible to access the stream description published
by TripleWave: the **sGraph**.

In [33]:
stream0 = RSPSource("http://aarhustrafficdata182955", 4000)
stream0.sgraph()

{'@context': {'generatedAt': {'@id': 'http://www.w3.org/ns/prov#generatedAtTime',
   '@type': 'http://www.w3.org/2001/XMLSchema#dateTime'},
  'sld': 'http://streamreasoning.org/ontologies/SLD4TripleWave#'},
 '@id': 'tr:sGraph',
 'sld:contains': {'@list': [{'@id': 'http://aarhustrafficdata182955:4000/20784581-ETc1c2bfa9d858284d55495dedf00b0bbf',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:42:26.670+0000'}},
   {'@id': 'http://aarhustrafficdata182955:4000/20784581-CL480af360b106131076ade0499ab7eab4',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:42:26.675+0000'}},
   {'@id': 'http://aarhustrafficdata182955:4000/20784581-AS92a4ebbf3b1ec8d6acf6ae9e16a412fc',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:42:26.835+0000'}},
   {'@id': 'http://aarhustrafficdata182955:4000/20784581-VC61882443e514edab1296a9b26122edfe',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:42:26.972+0000

In [34]:
stream1 = RSPSource("http://aarhustrafficdata158505", 4001)
stream1.sgraph()

{'@context': {'generatedAt': {'@id': 'http://www.w3.org/ns/prov#generatedAtTime',
   '@type': 'http://www.w3.org/2001/XMLSchema#dateTime'},
  'sld': 'http://streamreasoning.org/ontologies/SLD4TripleWave#'},
 '@id': 'tr:sGraph',
 'sld:contains': {'@list': [{'@id': 'http://aarhustrafficdata158505:4001/20782963-ET1657bf8575dd719875339ea15a815e8d',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:41:56.431+0000'}},
   {'@id': 'http://aarhustrafficdata158505:4001/20782963-ASc286221da954320a2674dc5bcfd94a85',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:41:56.436+0000'}},
   {'@id': 'http://aarhustrafficdata158505:4001/20782963-VCb787fa20e0f9ca22a68169db58ff344b',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:41:56.574+0000'}},
   {'@id': 'http://aarhustrafficdata158505:4001/20782963-CLa151656fac5864feb0060a9f5c64afe4',
    'generatedAt': {'@type': 'xsd:dateTime',
     '@value': '2017-11-04T17:41:57.172+0000

Are you curious to see what is inside the RDF streams?

You might have noticed that the **sGraph** has an *sld:contains* property that contains the URIs of
the stream items, called **iGraphs**.

However, in order to access the content of the iGraphs, one has to dereference their URIs.

Click on the following link to see how to consume the content in a push-based manner, as RSP engines do.

http://localhost:8888/notebooks/work/streamapp/Observing%20The%20Stream.ipynb
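As a first step towards dereferencing, here is a minimal sketch that pulls the iGraph URIs out of the dictionary returned by `sgraph()`. It assumes the JSON-LD shape printed above (an `sld:contains` key holding an `@list` of items with `@id` and `generatedAt`); `igraph_uris` is a hypothetical helper, not part of RSPLib, and the sample reuses two entries from the `stream1.sgraph()` output. Each returned URI can then be fetched with an ordinary HTTP GET.

```python
from datetime import datetime
from typing import List


def igraph_uris(sgraph: dict) -> List[str]:
    """Return the iGraph URIs listed under sld:contains, oldest first."""
    items = sgraph.get("sld:contains", {}).get("@list", [])

    def generated_at(item: dict) -> datetime:
        # Timestamps look like '2017-11-04T17:41:56.431+0000'
        return datetime.strptime(item["generatedAt"]["@value"], "%Y-%m-%dT%H:%M:%S.%f%z")

    return [item["@id"] for item in sorted(items, key=generated_at)]


# Two entries copied from the stream1.sgraph() output, deliberately listed out of order
sample_sgraph = {
    "@id": "tr:sGraph",
    "sld:contains": {"@list": [
        {"@id": "http://aarhustrafficdata158505:4001/20782963-ASc286221da954320a2674dc5bcfd94a85",
         "generatedAt": {"@type": "xsd:dateTime", "@value": "2017-11-04T17:41:56.436+0000"}},
        {"@id": "http://aarhustrafficdata158505:4001/20782963-ET1657bf8575dd719875339ea15a815e8d",
         "generatedAt": {"@type": "xsd:dateTime", "@value": "2017-11-04T17:41:56.431+0000"}},
    ]},
}

for uri in igraph_uris(sample_sgraph):
    print(uri)
```

Sorting by `generatedAt` is a defensive choice; the `@list` printed above already appears to be in time order. In the notebook you would call `igraph_uris(stream1.sgraph())`.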

# RSP Engine: CSPARQL
Now that our RDF streams are running, we can start thinking about how to query them.
To this end, we are going to use the CSPARQL engine.

**Have you started CSPARQL in the `consumer` folder?**

RSPLib offers a facade to communicate with RSP engines via RSP Services (a RESTful interface for RSP engines).


In [27]:
csparql = RSPEngine("http://csparql", 8182)
csparql.status()


{'backloop': False,
 'empty_results': False,
 'host': 'csparql/',
 'inference': False,
 'name': 'csparql',
 'num_datasets': 0,
 'num_queries': 0,
 'num_streams': 0,
 'port': 8182,
 'runUUID': '27457187-c185-11e7-9837-1a00a83a5601',
 'timestam_function': False}
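Since `status()` returns a plain dictionary, it is easy to check that the engine starts from a clean state. A minimal sketch; `engine_is_idle` is a hypothetical helper, the keys are the ones printed above, and `sample_status` copies that output:

```python
def engine_is_idle(status: dict) -> bool:
    """True when the engine reports no registered streams, datasets, or queries."""
    return all(status.get(key, 0) == 0 for key in ("num_streams", "num_datasets", "num_queries"))


# Copy of the status() output shown above
sample_status = {'backloop': False, 'empty_results': False, 'host': 'csparql/',
                 'inference': False, 'name': 'csparql', 'num_datasets': 0,
                 'num_queries': 0, 'num_streams': 0, 'port': 8182,
                 'runUUID': '27457187-c185-11e7-9837-1a00a83a5601',
                 'timestam_function': False}

print(engine_is_idle(sample_status))
```

Inside the notebook you would call `engine_is_idle(csparql.status())`, assuming `status()` keeps returning the dictionary shown above.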

## Registering the Streams

A first important step to answer queries over streams is registration.
As we saw during the initial part of this tutorial, streams are unbounded sequences of
data. Therefore, in order to access them we need special systems such as RSP engines.

RSP engines need to receive the data in a push-based manner.
This process is initiated by registering the streams with the engine.


In [28]:
csparql.register_stream("AarhusTrafficData182955", "http://aarhustrafficdata182955:4000/sgraph")


'Stream AarhusTrafficData182955 succesfully registered with IRI http://aarhustrafficdata182955:4000/sgraph'

### Register A Second Stream

In [29]:
csparql.register_stream("AarhusTrafficData158505", "http://aarhustrafficdata158505:4001/sgraph")


'Stream AarhusTrafficData158505 succesfully registered with IRI http://aarhustrafficdata158505:4001/sgraph'
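Both registrations follow the same pattern: the sGraph IRI is the host and port given to `RSPSource`, followed by `/sgraph`. A minimal sketch that derives the IRIs from that pattern; the pattern is inferred from the two cells above rather than from RSPLib documentation, and `sgraph_iri` is a hypothetical helper:

```python
from typing import List, Tuple


def sgraph_iri(host: str, port: int) -> str:
    """Build a TripleWave sGraph IRI from the host/port pair passed to RSPSource."""
    return f"{host.rstrip('/')}:{port}/sgraph"


# (stream name used by the engine, TripleWave host, TripleWave port), as in the cells above
sources: List[Tuple[str, str, int]] = [
    ("AarhusTrafficData182955", "http://aarhustrafficdata182955", 4000),
    ("AarhusTrafficData158505", "http://aarhustrafficdata158505", 4001),
]

for name, host, port in sources:
    iri = sgraph_iri(host, port)
    print(name, iri)
    # csparql.register_stream(name, iri)  # uncomment inside the notebook to register
```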

### Available Streams

In [23]:
csparql.streams()


[{'status': 'RUNNING', 'streamURL': 'AarhusTrafficData158505'},
 {'status': 'RUNNING', 'streamURL': 'AarhusTrafficData182955'}]
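Before registering a query, it can help to confirm that every expected stream is listed and `RUNNING`. A minimal sketch over the list printed above; `stream_problems` is a hypothetical helper, and it assumes each entry keeps the `streamURL` and `status` keys shown:

```python
from typing import Iterable, List, Tuple


def stream_problems(listing: List[dict], expected: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (missing, not_running): expected names absent from the listing,
    and listed streams whose status is not RUNNING."""
    registered = {s["streamURL"] for s in listing}
    missing = sorted(set(expected) - registered)
    not_running = sorted(s["streamURL"] for s in listing if s.get("status") != "RUNNING")
    return missing, not_running


# Copy of the csparql.streams() output shown above
listing = [{'status': 'RUNNING', 'streamURL': 'AarhusTrafficData158505'},
           {'status': 'RUNNING', 'streamURL': 'AarhusTrafficData182955'}]

print(stream_problems(listing, ["AarhusTrafficData182955", "AarhusTrafficData158505"]))
```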

## Register a Query

Now that our engine receives the data from the registered streams, we can register a query for it to answer.

As we saw in the second part of this tutorial, queries over streams are different from queries
over static data: **they are continuous queries**.

Continuous query answering produces results over time, therefore a query is **registered**
and evaluated again and again.
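To make the contrast with one-shot SPARQL concrete, here is a toy sketch in plain Python: the query is a function registered once and then evaluated again on every new batch of data, yielding one answer per evaluation instead of a single final result. The triples, `count_subjects`, and `run_continuously` are all hypothetical; a real RSP engine also bounds each evaluation with windows.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Triple = Tuple[str, str, str]


def count_subjects(triples: List[Triple]) -> int:
    """Toy query: number of distinct subjects in the data it is evaluated on."""
    return len({s for s, _, _ in triples})


def run_continuously(query: Callable[[List[Triple]], int],
                     batches: Iterable[List[Triple]]) -> Iterator[Tuple[int, int]]:
    """Evaluate the same registered query on each new batch, yielding (tick, answer)."""
    for tick, batch in enumerate(batches):
        yield tick, query(batch)


# Hypothetical data: one batch of triples per tick
batches = [
    [("ex:obs1", "ex:value", "12"), ("ex:obs2", "ex:value", "7")],
    [("ex:obs3", "ex:value", "9")],
]

for tick, answer in run_continuously(count_subjects, batches):
    print(f"tick {tick}: {answer} distinct subjects")
```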

In the current example, we are using the CSPARQL engine to query our RDF Streams.
This particular engine consumes queries written in a continuous extension of
SPARQL called *CSPARQL*. An example query looks like this:


In [30]:
with open('q1.csparql.txt', 'r') as query_file:
    body = query_file.read()
    print(body)

CONSTRUCT {?s ?p ?o }
FROM STREAM <AarhusTrafficData182955> [RANGE 3s STEP 1s] 
FROM STREAM <AarhusTrafficData158505> [RANGE 3s STEP 1s] 
WHERE 
    {
        ?s ?p ?o
        
}

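The `[RANGE 3s STEP 1s]` clause defines a sliding window: every second a window covering the last three seconds of the stream is closed and the query is evaluated on it, so each item can take part in up to three consecutive evaluations. A simplified sketch of which items land in which window, assuming half-open intervals `(t - RANGE, t]` and integer timestamps in seconds; CSPARQL's exact boundary handling may differ, and `window_contents`/`evaluate` are hypothetical names:

```python
from typing import List, Tuple


def window_contents(items: List[Tuple[int, str]], t: int, range_s: int) -> List[str]:
    """Return the items whose timestamp lies in the half-open interval (t - range_s, t]."""
    return [item for ts, item in items if t - range_s < ts <= t]


def evaluate(items: List[Tuple[int, str]], range_s: int = 3, step_s: int = 1,
             until: int = 5) -> List[Tuple[int, List[str]]]:
    """Close a window every step_s seconds up to `until` and report its contents."""
    return [(t, window_contents(items, t, range_s))
            for t in range(step_s, until + 1, step_s)]


# Hypothetical arrivals: (timestamp in seconds, item label)
arrivals = [(1, "a"), (2, "b"), (4, "c")]

for t, content in evaluate(arrivals):
    print(f"window closing at t={t}s: {content}")
```

Because RANGE is larger than STEP, consecutive windows overlap: item `a` appears in the windows closing at 1s, 2s, and 3s.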

In order to register our query, we have to give it a **name**, e.g., *givemeall*, and choose its **type**.
The query type can be either **STREAM** or **QUERY**.
The former produces a stream as a result, i.e., each output is timestamped;
the latter simply pushes out the results.

This simple difference is actually very important, since the output of a **STREAM** query can be consumed by another
RSP engine, while the output of a **QUERY** cannot.


In [31]:
csparql.register_query("givemeall", "STREAM", body) 

{'queryBody': 'REGISTER STREAM givemeall AS CONSTRUCT {?s ?p ?o }\nFROM STREAM <AarhusTrafficData182955> [RANGE 3s STEP 1s] \nFROM STREAM <AarhusTrafficData158505> [RANGE 3s STEP 1s] \nWHERE \n    {\n        ?s ?p ?o\n        \n}'}

'Query givemeall succesfully registered'
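The printed `queryBody` above shows that the body read from the file was sent to the engine prefixed with `REGISTER STREAM givemeall AS`. A minimal sketch that reproduces that format and rejects unknown types; `registration_text` is a hypothetical helper, and the assumption that a **QUERY** registration uses `REGISTER QUERY` in the same position is not confirmed by the output above:

```python
VALID_QUERY_TYPES = ("STREAM", "QUERY")


def registration_text(name: str, query_type: str, body: str) -> str:
    """Prefix a CSPARQL body with a REGISTER clause, mirroring the queryBody printed above."""
    query_type = query_type.upper()
    if query_type not in VALID_QUERY_TYPES:
        raise ValueError(f"query type must be one of {VALID_QUERY_TYPES}, got {query_type!r}")
    return f"REGISTER {query_type} {name} AS {body}"


# A shortened body, named so it does not overwrite the notebook's `body` variable
example_body = "CONSTRUCT {?s ?p ?o }\nWHERE { ?s ?p ?o }"
print(registration_text("givemeall", "STREAM", example_body))
```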

## Register Observer

A final step in our query answering is observing the results.
Since we registered our query with type **STREAM**, we need an observer
that exposes the results as an RDF stream:


In [32]:
csparql.register_observer("givemeall", "default", {"host":"csparql","type":"ws","port":8283,"name":"default"})


'http://csparql:8182/queries/givemeall/observers/default'
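The observer configuration determines where results are published: the clean-up output below reports this observer as `ws://csparql:8283/givemeall`, i.e. host, port, and query name from the call above. A minimal sketch that derives that WebSocket URL from the configuration dictionary; `observer_ws_url` is a hypothetical helper, and the URL pattern is inferred from that output, not from RSPLib documentation:

```python
def observer_ws_url(query_name: str, config: dict) -> str:
    """WebSocket URL on which a 'ws' observer publishes the results of `query_name`."""
    if config.get("type") != "ws":
        raise ValueError("only WebSocket ('ws') observers are handled here")
    return f"ws://{config['host']}:{config['port']}/{query_name}"


observer_config = {"host": "csparql", "type": "ws", "port": 8283, "name": "default"}
print(observer_ws_url("givemeall", observer_config))
```

Reading the results requires a WebSocket client, for example the third-party `websocket-client` package; note that the host name `csparql` presumably resolves only inside the tutorial's container network.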

**Are you curious what the output stream looks like?**

http://localhost:8888/notebooks/work/streamapp/Observing%20The%20Output%20-%20Part%201.ipynb

# Clean Up

In [35]:
csparql.unregister_observer("givemeall", "default")


'Observer default (ws://csparql:8283/givemeall) succesfully unregistered'

In [36]:
csparql.unregister_query("givemeall")


'Query givemeall and stream csparql/queries/givemeall succesfully unregistered'

In [37]:
csparql.unregister_stream("AarhusTrafficData158505")


'Stream AarhusTrafficData158505 succesfully unregistered'

In [38]:
csparql.unregister_stream("AarhusTrafficData182955")


'Stream AarhusTrafficData182955 succesfully unregistered'

In [40]:
csparql.status()


{'backloop': False,
 'empty_results': False,
 'host': 'csparql/',
 'inference': False,
 'name': 'csparql',
 'num_datasets': 0,
 'num_queries': 0,
 'num_streams': 0,
 'port': 8182,
 'runUUID': '27457187-c185-11e7-9837-1a00a83a5601',
 'timestam_function': False}
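The clean-up cells undo the registrations in reverse order, which unwinds the dependencies: the observer depends on the query, and the query reads from the streams. A minimal sketch bundling them into one hypothetical `tear_down` function; it only calls the RSPLib methods already used in this notebook, and `FakeEngine` is an offline stand-in so the helper can be tried without a running engine (in the notebook you would pass `csparql` instead):

```python
from typing import Iterable


def tear_down(engine, query_name: str, observer_name: str, stream_names: Iterable[str]) -> dict:
    """Unregister in reverse order of registration: observer, query, then streams."""
    engine.unregister_observer(query_name, observer_name)
    engine.unregister_query(query_name)
    for name in stream_names:
        engine.unregister_stream(name)
    # Return the final status so the caller can check that nothing is left registered
    return engine.status()


class FakeEngine:
    """Offline stand-in that records calls instead of contacting CSPARQL."""

    def __init__(self):
        self.calls = []

    def unregister_observer(self, query_name, observer_name):
        self.calls.append(("observer", query_name, observer_name))

    def unregister_query(self, query_name):
        self.calls.append(("query", query_name))

    def unregister_stream(self, name):
        self.calls.append(("stream", name))

    def status(self):
        return {"num_streams": 0, "num_queries": 0, "num_datasets": 0}


fake = FakeEngine()
final_status = tear_down(fake, "givemeall", "default",
                         ["AarhusTrafficData158505", "AarhusTrafficData182955"])
print(fake.calls)
print(final_status)
```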