# Chapter 4: Datasources

Dealing with datasources is the most complex part of the framework, because datasources vary widely:
- Different content formats: plain text, JSON, ...
- Different protocols: UDP, FTP, HTTP, ...
- Different modes: the agent either requests the datasource (client mode) or is requested by it (server mode).

## Dive into the Source class

The Source class provides a common interface for all datasources.

As datasources have very little in common, the framework makes only one assumption:

**Every datasource is iterable**

In practical terms, a Source relies on a Python generator.
The *pyngsi.source* package offers many generic sources, and you can easily create a custom Source by extending the Source class.

A Source iterates over rows.
A row is composed of two parts:
- the record: the incoming content itself (the payload)
- the provider: a string that records the origin of the row

These two concepts, iterable sources and the row definition, are the foundation of the framework.
This common interface is what allows agents to consume any Source, as we saw in the previous chapter.
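To make these two ideas concrete without depending on pyngsi, here is a minimal sketch. It assumes a hypothetical `Row` named tuple with `provider` and `record` fields (mirroring the attributes used later in this chapter) and a hypothetical `ListSource` class; neither is the pyngsi implementation.

```python
from typing import Any, Iterator, NamedTuple


class Row(NamedTuple):
    """Hypothetical stand-in for pyngsi's Row: where it came from, and what it carries."""
    provider: str
    record: Any


class ListSource:
    """Hypothetical source: iterable because __iter__ is a generator."""

    def __init__(self, provider: str, items: list):
        self.provider = provider
        self.items = items

    def __iter__(self) -> Iterator[Row]:
        # each call returns a fresh generator, so the source can be consumed again
        for item in self.items:
            yield Row(self.provider, item)


src = ListSource("demo", [{"id": 1}, {"id": 2}])
for row in src:
    print(row.provider, row.record)
```

Any consumer that only relies on `for row in src` works with such a class, which is exactly the property agents depend on.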

### The Row

```python
from pyngsi.source import Row

help(Row.__init__)
```

### Sources provided by the framework

```python
import pyngsi.source

print([x for x in dir(pyngsi.source) if x.startswith("Source")])
```

- Source is the Source base class

- SourceSampleOrion is the Source dedicated to the tutorial

- SourceStdin takes incoming data from standard input

- SourceFile takes incoming data from a local file (supports zip & gzip compression)

- SourceJson takes incoming JSON data from stdin or from a file (supports zip & gzip compression).
  If the incoming JSON is an array, SourceJson iterates over its elements

- SourceIter takes incoming data from any Python Sequence (list, tuple, ...) provided to the constructor

- SourceSingle takes incoming data from the argument provided to the constructor
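The array rule of SourceJson can be pictured with a few lines of standard-library code. This is a hypothetical illustration of the behaviour described above, not pyngsi's implementation; `json_rows` is an illustrative name and rows are plain `(provider, record)` tuples here.

```python
import json
from typing import Any, Iterator, Tuple


def json_rows(text: str, provider: str) -> Iterator[Tuple[str, Any]]:
    """A JSON array yields one row per element; any other JSON value yields a single row."""
    content = json.loads(text)
    if isinstance(content, list):
        for element in content:
            yield provider, element
    else:
        yield provider, content


for provider, record in json_rows('[{"room": "Room1"}, {"room": "Room2"}]', "stdin"):
    print(provider, record)
```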


## Example 1: Process a local file

Here the Source reads incoming data from a gzip-compressed JSON file.

As the JSON content is an array, the Source iterates over its elements and yields one row per element.
The provider is set to the name of the file.

```python
from pyngsi.source import Source

# returns a SourceJson guessed from the extension
src = Source.create_source_from_file("files/colors.json.gz")

for row in src:
    print(row)
```
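The extension-based guess and the decompression can be sketched with the standard library alone. This is a hypothetical helper (`read_json_file`, `file_rows`), not what `Source.create_source_from_file` does internally; it assumes a zip archive holds a single JSON member and uses the base name of the file as the provider.

```python
import gzip
import json
import os
import zipfile
from typing import Any, Iterator, Tuple


def read_json_file(path: str) -> Any:
    """Decompress according to the extension, then parse the JSON content."""
    if path.endswith(".gz"):
        with gzip.open(path, "rt", encoding="utf-8") as f:
            return json.load(f)
    if path.endswith(".zip"):
        with zipfile.ZipFile(path) as zf:
            # assumption: the archive holds a single JSON member
            with zf.open(zf.namelist()[0]) as f:
                return json.loads(f.read().decode("utf-8"))
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def file_rows(path: str) -> Iterator[Tuple[str, Any]]:
    """Yield (provider, record) pairs; a JSON array gives one pair per element."""
    content = read_json_file(path)
    records = content if isinstance(content, list) else [content]
    provider = os.path.basename(path)
    for record in records:
        yield provider, record
```

With the tutorial file, `for provider, record in file_rows("files/colors.json.gz"): print(provider, record)` should walk the same array as the pyngsi cell above, although the exact provider string pyngsi uses may differ.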

## Example 2: Process FTP files

A fairly complex task made easy.

The FTP server used in this example serves RFC files.
Each RFC comes with a brief description in a JSON file.
Have a look at rfc959.json in the *files/* folder.

We will output NGSI entities to Orion using a datamodel that exposes the title, the publication date and the page count.
We will focus on only two of them: RFC 958 and RFC 2228.

### Define our datamodel

```python
from datetime import datetime

from pyngsi.source import Row
from pyngsi.ngsi import DataModel

def build_entity(row: Row) -> DataModel:
    rfc = row.record
    m = DataModel(id=rfc["doc_id"], type="RFC")
    m.add("dataProvider", row.provider)
    m.add("title", rfc["title"])
    m.add("publicationDate", datetime.strptime(rfc["pub_date"], "%B %Y"))
    m.add("pageCount", int(rfc["page_count"]))
    return m
```
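The type conversions in `build_entity` can be checked on a single record without pyngsi. The sketch below returns a plain dict instead of a `DataModel`; the field names are the ones `build_entity` reads, but the sample values are hypothetical, not copied from rfc959.json.

```python
from datetime import datetime


def rfc_fields(record: dict) -> dict:
    """Convert the raw strings of an RFC description into typed values."""
    return {
        "id": record["doc_id"],
        "title": record["title"],
        # "%B %Y" expects a full month name followed by a 4-digit year;
        # the day is missing, so strptime defaults it to 1
        "publicationDate": datetime.strptime(record["pub_date"], "%B %Y"),
        "pageCount": int(record["page_count"]),
    }


# hypothetical sample record, values for illustration only
sample = {"doc_id": "RFC959", "title": "File Transfer Protocol",
          "pub_date": "October 1985", "page_count": "69"}
print(rfc_fields(sample))
```

Note that `%B` matches month names of the current locale, so this parsing assumes an English (or default C) locale.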

### Let's use our datamodel in our Agent

```python
from pyngsi.source import SourceFtp
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent

# help(SourceFtp) for more info
src = SourceFtp("ftp.ps.pl", paths=["/pub/rfc"],
                f_match=lambda x: x.endswith("rfc958.json") or x.endswith("rfc2228.json"))

# if you have an Orion server available, just replace SinkStdout() with SinkOrion()
sink = SinkStdout()

# the source has auto-detected that we deal with JSON files, hence it parses the JSON for us
agent = NgsiAgent.create_agent(src, sink, process=build_entity)
agent.run()

# free resources:
# here the agent removes the temporary directory where the files were downloaded
agent.close()

# get statistics
print(agent.stats)
```



Sample output (truncated):

```text
2020-03-27 11:38:25.977 | DEBUG    | pyngsi.ftpclient:__init__:22 - Connect to FTP server
2020-03-27 11:38:27.881 | INFO     | pyngsi.source:_retrieve_filelist:308 - Found 2 matching files
2020-03-27 11:38:27.882 | DEBUG    | pyngsi.ftpclient:download:46 - Download file /pub/rfc/rfc2228.json
2020-03-27 11:38:28.666 | DEBUG    | pyngsi.ftpclient:download:46 - Download file /pub/rfc/rfc958.json
2020-03-27 11:38:29.492 | DEBUG    | pyngsi.ftpclient:close:57 - Disconnect from FTP server
2020-03-27 11:38:29.591 | INFO     | pyngsi.agent:__init__:110 - init NGSI agent
2020-03-27 11:38:29.593 | INFO     | pyngsi.agent:__init__:112 - source = [SourceFtp]
2020-03-27 11:38:29.595 | INFO     | pyngsi.agent:__init__:114 - sink = [SinkStdout]
2020-03-27 11:38:29.597 | INFO     | pyngsi.agent:run:123 - start to acquire data
2020-03-27 11:38:29.599 | INFO     | pyngsi.source:__iter__:296 - process local /tmp/tmptzzan6uh/rfc2228.json
2020-03-27 11:38:29.605 | INFO     | pyngsi.source:__iter__:296 - pr
```
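The `endswith` lambda passed as `f_match` would also accept a name such as `xrfc958.json`. A stricter predicate compares the base name of the path against an explicit set. This is a sketch under the assumption that `f_match` receives a file name or path as a string and keeps the file when it returns `True`, which is how the lambda above uses it; `match_rfc` and `WANTED` are illustrative names.

```python
from pathlib import PurePosixPath

# hypothetical set of wanted file names
WANTED = {"rfc958.json", "rfc2228.json"}


def match_rfc(path: str) -> bool:
    """Keep a remote file only if its base name is exactly one of the wanted names."""
    return PurePosixPath(path).name in WANTED
```

It could then be passed as `SourceFtp("ftp.ps.pl", paths=["/pub/rfc"], f_match=match_rfc)`. `PurePosixPath` is used because FTP paths use forward slashes regardless of the local operating system.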

## Example 3: Expose a REST API (Server mode)

This time the Agent does not fetch incoming data from the datasource.

In server mode, the datasource sends requests to the Agent.
From the datasource's point of view, this could also be called push mode.

In this example, our temperature/pressure sensors store measurements locally and periodically send a JSON file to the Agent.

### Define our datamodel

```python
from pyngsi.source import Row
from pyngsi.ngsi import DataModel

def build_entity(row: Row) -> DataModel:
    r = row.record
    m = DataModel(id=r["room"], type="Room")
    m.add("temperature", r["temperature"])
    m.add("pressure", r["pressure"])
    return m
```

### Let's use our datamodel in our Agent

```python
from pyngsi.server import ServerHttpUpload
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent

# help(ServerHttpUpload) for more info
src = ServerHttpUpload()

# if you have an Orion server available, just replace SinkStdout() with SinkOrion()
sink = SinkStdout()

# the agent processes JSON content received from the client
agent = NgsiAgent.create_agent(src, sink, process=build_entity)

# You must push data to the source: here we send POST requests to the server
# For example, in a bash shell, type:
# curl -X POST -H "Content-Type: application/json" -d '{"room":"Room1","temperature":23.0,"pressure":710}' http://127.0.0.1:8080/upload
# You could also send a JSON Array. For example, type:
# curl -X POST -H "Content-Type: application/json" -d '[{"room":"Room1","temperature":23.0,"pressure":710},{"room":"Room2","temperature":21.0,"pressure":711}]' http://127.0.0.1:8080/upload
# You can also send a file; the provider is then set to the file name
# curl -v -F file=@test.json http://127.0.0.1:8080/upload
# CTRL-C to stop the server
agent.run()

# free resources
agent.close()

# the agent provides global statistics on its execution
print(agent.stats)
```
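If you prefer pushing data from Python rather than curl, the first curl command above translates to a plain `urllib.request` call. This is a sketch using only the standard library; `build_upload_request` is an illustrative name, and the URL is the one from the curl examples. Since `agent.run()` blocks until CTRL-C, the request must be sent from another process, such as a second terminal or notebook.

```python
import json
import urllib.request


def build_upload_request(url: str, payload) -> urllib.request.Request:
    """Build a POST request carrying a JSON body (a single object or an array)."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


req = build_upload_request(
    "http://127.0.0.1:8080/upload",
    {"room": "Room1", "temperature": 23.0, "pressure": 710},
)
# with the agent running in another process, send it with:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```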

## Our first custom Source

```python
from pyngsi.source import Row, Source


class CustomSource(Source):
    def __init__(self, rooms):
        self.rooms = rooms

    def __iter__(self):
        # yield one row per input line; "custom" is the provider
        for record in self.rooms:
            yield Row("custom", record)
```

Let's use it

```python
# our CSV lines
rooms = ["Room1;23;720", "Room2;21;711"]

# init the source
src = CustomSource(rooms)

# consume the source and print rows
for row in src:
    print(row)
```
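The rows above still carry raw strings. A custom source can also do the parsing itself, so that records arrive as dicts with the same keys the Example 3 `build_entity` reads. The sketch below assumes the field order is `room;temperature;pressure` (inferred from the sample values) and uses illustrative names `parse_room` and `csv_rows` with plain tuples instead of pyngsi's `Row`.

```python
from typing import Iterable, Iterator, Tuple


def parse_room(line: str) -> dict:
    """Split a 'room;temperature;pressure' line into typed fields."""
    room, temperature, pressure = line.split(";")
    return {"room": room, "temperature": float(temperature), "pressure": int(pressure)}


def csv_rows(lines: Iterable[str], provider: str = "custom") -> Iterator[Tuple[str, dict]]:
    """Yield (provider, parsed record) pairs, one per input line."""
    for line in lines:
        yield provider, parse_room(line)


rooms = ["Room1;23;720", "Room2;21;711"]
for provider, record in csv_rows(rooms):
    print(provider, record)
```

Inside `CustomSource.__iter__`, the same idea would read `yield Row("custom", parse_room(record))`.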

Your own custom Sources rely on the same principles.
The only difference is that you have to focus on how to acquire your data.