In [1]:
import requests
import csv
from datetime import datetime
import os
import pandas as pd
import numpy as np
import warnings
import time
from tqdm import tqdm 
warnings.simplefilter(action='ignore', category=FutureWarning)

# ddi - Mini-Challenge zu LE3, NoSQL
Roman Studer, Simon Luder

## Use Case
<p align="center">
  <img src="./data/images/Konzept.png" alt="drawing" width="900"/>
</p>

Wir unterbreiten den Vorschlag eine NoSQL Datenbank (MongoDB) als Data Lake für die Speicherung von Time Series Daten zu verwenden und die Analyse besagter Daten auf einer Time-Series Datenbank durchzuführen. Die Analyse dieser Daten kann mittels einer auf Time Series optimierte Datenbank auf Abruf geschehen. Als Beispiel verwenden wir als Datenquelle openSenseMap, welche Messwerte und Sensormetainformationen über eine API zur Verfügung stellt. Ein aktiver Sensor sendet periodisch (je nach Sensor alle paar Sekunden oder Minuten) einen Messwert. Bei einem Intervall von 10 Sekunden sendet ein Sensor pro Jahr 3'153'600 Datenpunkte. 

OpenSenseMap erlaubt es ein ganzes Gebiet (Mittels Angabe von Breiten- und Längengrad) zu überwachen. Die Anzahl Sensoren, sowie deren Attribute kann sich über die Zeit ändern. Wenn zum Beispiel ein neuer Sensor im gleichen Gebiet in Betrieb genommen wird. Daher ist die Datenspeicherung in einer NoSQL, schemenlosen Datenbank geeignet. MongoDB ist dabei aufgrund des flexigblem Schema und einfacher horizontaler Skalierung gut geeignet. Dadurch sind wir auf Änderungen in den durch die API erhaltenen Attributen, sowie auf grosse Änderungen in der Datenmenge gewappnet. MongoDB ist allerdins nicht für die Analyse von Time Series geeignet. InfluxDB, eine Time Series Datenbank, ist für Time Series Daten optimiert und kann schnell Aggregationen über eine grosse Anzahl von Datenpunkten (über Timestamp Indexiert) durchführen. Ein Beispiel wäre ein Moving Average mit kleinem Fenster über mehrere Millionen Datenpunkte.

Weiter existiert ein MongoDB-Plugin auf Telegraf welche die Performance der MongoDB überwachen kann. Somit kann das Monitoring über Influx betrieben werden. 

Im Anschluss setzen wir sowohl eine MongoDB als auch eine InfluxDB auf. Über ein Script laden wir alle Sensordaten auf dem Gelände der ETH-Zürich, welche OpenSenseMap zur Verfügung stellt herunter und speichern diese in der MongoDB. Im Anschluss messen wir die Zeit für Aggregationen bei steigenden Datenpunkten einzeln für beide Datenbanken, sowie für den Fall wenn MongoDB als Datalake verwendet wird. Dadurch können wir einen Punkt identifizieren ab dem es nicht mehr sinnvoll ist nur mit einer MongoDB zu arbeiten, sondern die InfluxDB zur Analyse hinzuzuziehen.

### Vergleich MongoDB vs. InfluxDB

Einen Einblick in die beiden Datenbanken zu erhalten, erlaubt dass Dokument [1] "Benchmarking InfluxDB vs. MonoDB for Time Series Data, Metrics % Management" (siehe Anhang),

| Bereich  | InfluxDB  |MongoDB  | 
|---|---|---|
| **Verwendung** | Time-Series Datenbank | Dokumentenbasierte Datenbank, NoSQL Datenbank | 
| **Memory-Auslastung**  | ca 2-4 GB pro <100'000 Einträge [2]  | ca. 1 GB pro 100'000 Einträge [3] |
| **Festplattenauslastung über 24h** [1]  | 178 MB |  34820 MB  |
| **Abfragegeschwindigkeit** (Queries per Second, 1000 Einträge) [1] |  935 | 164  |
| **Einfügegeschwindigkeit** (Werte pro Sekunde) [1] | 2,800,990  | 1,114,616  |
| **Skalierbarkeit**  | Horizontal Skalierbar (Clustering) bei InfluxDB Enterprise [4] | Horizontal Skalierbar  |
| **Sicherheit**  |   |   |
| **Sprache**[1]  | C/C++  | Go |
| **Schema**[1]  | Schemaless   | Schemaless |

**Sicherheit:** MongoDB bietet diverse Features die die Sicherheit der Datenbank erhöhen können. Die Datenbank unterstützt under anderem mehrere Authentifizierungsmechanismen wie `SCRAM` ("Salted Challenge Response Authentication Mechanism", mit SHA-1 oder SHA-256), womit Name, Passwort, Authentifizierungsdatenbank geprüft werden, aber auch das `x-509 Certificate`, die `LDAP`- und `Kerberos`-Authentifizierung. Zugriff kann durch eine rollenbasierte Zugriffskontrolle reguliert werden. So können Benutzer erstellt werden, welchen ähnlich zu einem Active Directory Rollen (wie zum Beispiel "Administrator") vergeben werden können, welche ihren Handlungsbereich einschränkt. Ein weiteres Feature ist die Client-Side Field Level Encryption. So können Felder eines Dokumentes vor der Übermittlung zum Server bereits verschlüsselt werden.(vgl. [5]) Die InfluxDB bietet ebenfalls Authentifizierungsmethoden an (darunter Authentifizierung über CLI, JWT Token oder über die API). Dabei wird ebenfalls ein Rollensystem verwendet welches die Verwaltung von Benutzern und derer Berechtigungen ermöglicht. Die Kommunikation zwischen Client und Server läuft über HTTPS, wobei die Authentizität des Servers geprüft wird. 

## Datenmodell

## Load Data

In [2]:
#get list of ifu boxes at eth
url = 'https://api.opensensemap.org/boxes?'
bbox = '8.50269672304309, 47.40598032642525,  8.512126181507432, 47.4113301084323 ' # boundary box around eth zurich
boxes = requests.get(url, params={'bbox':bbox, 'full':'false'}).json()

In [3]:
from_date = '2021-04-25T10:05:49.581Z'
to_date = '2021-05-02T10:05:49.581Z'
data_format = 'csv'

for box in tqdm(boxes):
    box_id = box['_id']
    box_name = box['name']
    location = box['currentLocation']['coordinates']
    lat, lon = location[0], location[1]
    for sensor in box['sensors']:
        try:
            sensor_id = sensor['_id']
            sensor_name = sensor['title']
            sensor_name.replace('/', '')
            sensor_unit = sensor['unit']
        except:
            pass
        
        #url = f'https://api.opensensemap.org/boxes/{box_id}/data/{sensor_id}?format={data_format}&download=true'
        url = f'https://api.opensensemap.org/boxes/{box_id}/data/{sensor_id}?from-date={from_date}&to-date={to_date}&download=true&format={data_format}'
        r = requests.get(url, stream=True)
        if (len(r.text) > 16): # check if sensor returns values (header has length 16)
            with open(f'./data/{box_name}_{sensor_name}.csv', 'wb') as f:
                for _, line in enumerate(r.iter_lines()):
                    if _ == 0: # define header
                        line = 'box_name,sensor_name,box_id,sensor_id,lat,lon,unit,current_time,value\n'
                    else:
                        time, value = (line.decode("utf-8").split(','))
                        time = time.replace('T', ' ').replace('Z', '')
                        time = datetime.strptime(time, '%Y-%m-%d %H:%M:%S.%f').strftime('%Y-%m-%d %H:%M:%S')
                        line= f'{box_name},{sensor_name},{box_id},{sensor_id},{lat},{lon},{sensor_unit},{time},{value}\n'
                    f.write(line.encode())

100%|██████████████████████████████████████████████████████████████████████████████████| 82/82 [02:38<00:00,  1.93s/it]


## Setup MongoDB

In [4]:
#!pip install pymongo
from pymongo import MongoClient
import pymongo

### Prepare data

In [5]:
def csv_to_dict(path, file):
    '''converts a csv file to a dictionary'''
    data = pd.read_csv(path + file)
    pd.to_datetime(data.current_time, format="%Y-%m-%d %H:%M:%S")
    dictionary = dict()
    dictionary["_id"] = file.replace('.csv', '')
    dictionary["box_name"] = data["box_name"][0]
    dictionary["sensor_name"] = data["sensor_name"][0]
    dictionary["box_id"] = data["box_id"][0]
    dictionary["sensor_id"] = data["sensor_id"][0]
    dictionary["lat"] = data["lat"][0]
    dictionary["lon"] = data["lon"][0]
    dictionary["unit"] = data["unit"][0]
    dictionary["measurments"] = dict(zip(data["current_time"], data["value"]))
    return dictionary

### Create db

In [6]:
mongo_client = pymongo.MongoClient('localhost', 27017)
mongo_db = mongo_client["ddi_mc2"]

### Populate db

In [7]:
VERBOSE = False
path = './data/'
for file in tqdm(os.listdir("./data")):
#     print(file.split("_")[0])
    if file.endswith('.csv'): # check for filetype
        if not mongo_db[file.split("_")[0]].count_documents({"_id":file.replace('.csv', '')}) > 0:
            dictionary = csv_to_dict(path, file)
            mongo_db[file.split("_")[0]].insert_one(dictionary)
            if VERBOSE:
                print("populate:", file)

100%|██████████████████████████████████████████████████████████████████████████████| 161/161 [00:00<00:00, 2927.42it/s]


In [8]:
mongo_db.list_collection_names()

['IfU SenseBox2021 3A',
 'IFU Sensebox2021 2A',
 'IfU SenseBox2021 5A',
 'IfU SenseBox 7A',
 'IfU SenseBox2021 8B',
 'IKG Particulate Matter',
 'IfU SenseBox2021 2B',
 'IfU SenseBox2021 12',
 'IfU SenseBox2021 5B',
 'IfU SenseBox2021 1A',
 'IfU SenseBox2021 11A',
 'IfU SenseBox2021 dab',
 'IfU SenseBox2021 04 B',
 'IfU SenseBox2021 9A',
 'ifU SenseBox2021 10A',
 'IfU SenseBox2021 04 A']

In [9]:
mongo_db["IfU SenseBox2021 11A"].find().distinct('_id')

['IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_0',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_10',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_100',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1000',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1001',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1002',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1003',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1004',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1005',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1006',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1007',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1008',
 'IfU SenseBox2021 11A_11A Dexolved Oxygen AtlasScientific_1009',
 'IfU SenseBox2021 11A_1

## Setup InfluxDB 

In [10]:
# !pip install influxdb-client
from datetime import datetime

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# You can generate a Token from the "Tokens Tab" in the UI
token = "uZei_UmVg7IGcllVQdvKbmbCjwx5s0pe7KfTafVspsL0qGWIg6fmB34JNwWmsEGdt9aFr2Qio6ltOB9_ZrCDDw=="
org = "ddi"
bucket = "ddi"

client = InfluxDBClient(url="http://localhost:8086", token=token)
write_api = client.write_api(write_options=SYNCHRONOUS)

# RS: uZei_UmVg7IGcllVQdvKbmbCjwx5s0pe7KfTafVspsL0qGWIg6fmB34JNwWmsEGdt9aFr2Qio6ltOB9_ZrCDDw==
# SL: AW-zLqzOTpQW4sRYaKbdXpSxBLkxT8rT-RZA-IS5MYo41RZ40YoOCoNYTyu9S2La5W4KpcDzDgCfj53fk6aZuw==

In [11]:
client.health()

{'checks': [],
 'commit': '4db98b4c9a',
 'message': 'ready for queries and writes',
 'name': 'influxdb',
 'status': 'pass',
 'version': '2.0.6'}

### Query MongoDB, insert into InfluxDB

In [12]:
# select box and get all measurements
col = mongo_db.list_collection_names()[0]
col

'IfU SenseBox2021 3A'

**Get all sensors within a selected box**

In [13]:
sensor_names = mongo_db[col].find().distinct('_id')
sensor_names

['IfU SenseBox2021 3A_3A Humidity SHT31 0',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_0',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_10',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_100',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1000',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1001',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1002',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1003',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1004',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1005',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1006',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1007',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1008',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1009',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_101',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1010',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1011',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1012',
 'IfU SenseBox2021 3A_3A Humidity SHT31 0_1013',
 'IfU SenseBox2021 3A_3A Humidity S

In [14]:
# result = mongo_db[col].find_one({},{'measurments':1}) # select first sensor with field 'measurments'
# result

**Get sensor names within the Box**

In [None]:
results = list()
for i in sensor_names:
    print(i)
    results.append(mongo_db[col].find_one({'_id':i},{'measurments'}))
len(results)

IfU SenseBox2021 3A_3A Humidity SHT31 0
IfU SenseBox2021 3A_3A Humidity SHT31 0_0
IfU SenseBox2021 3A_3A Humidity SHT31 0_1
IfU SenseBox2021 3A_3A Humidity SHT31 0_10
IfU SenseBox2021 3A_3A Humidity SHT31 0_100
IfU SenseBox2021 3A_3A Humidity SHT31 0_1000
IfU SenseBox2021 3A_3A Humidity SHT31 0_1001
IfU SenseBox2021 3A_3A Humidity SHT31 0_1002
IfU SenseBox2021 3A_3A Humidity SHT31 0_1003
IfU SenseBox2021 3A_3A Humidity SHT31 0_1004
IfU SenseBox2021 3A_3A Humidity SHT31 0_1005
IfU SenseBox2021 3A_3A Humidity SHT31 0_1006
IfU SenseBox2021 3A_3A Humidity SHT31 0_1007
IfU SenseBox2021 3A_3A Humidity SHT31 0_1008
IfU SenseBox2021 3A_3A Humidity SHT31 0_1009
IfU SenseBox2021 3A_3A Humidity SHT31 0_101
IfU SenseBox2021 3A_3A Humidity SHT31 0_1010
IfU SenseBox2021 3A_3A Humidity SHT31 0_1011
IfU SenseBox2021 3A_3A Humidity SHT31 0_1012
IfU SenseBox2021 3A_3A Humidity SHT31 0_1013
IfU SenseBox2021 3A_3A Humidity SHT31 0_1014
IfU SenseBox2021 3A_3A Humidity SHT31 0_1015
IfU SenseBox2021 3A_3A Hu

In [None]:
for result in tqdm(results):
    for observation in result['measurments'].items():
        point = Point(result['_id'].split('_')[0]) \
          .tag("sensor_name", result['_id'].split('_')[1]) \
          .field("_value", observation[1])\
          .time(datetime.strptime(observation[0], "%Y-%m-%d %H:%M:%S"), WritePrecision.S)

        write_api.write(bucket, org, point)

#### Query InfluxDB

In [None]:
times = pd.date_range(from_date,to_date,freq='4h')

def to_RFC3339Date(x):
    x = str(x)
    x = x.replace(' ','T')
    x = x.replace('000+00:00','Z')
    return x

times = [i for i in times][1:] # remove first time as it is equal to from_date
times = [to_RFC3339Date(i) for i in times]

In [None]:
len(times)

In [None]:
sensor_name = "9A"
box_name = "IfU SenseBox2021 9A"

mean_runtimes = []
std_runtimes = []
n_init = 10

api = client.query_api()

for i in tqdm(range(len(times))):   
    runtimes = [] 
    n_datepoints = []
    
    for _ in range(n_init):

        start = time.perf_counter()

        query = f'''
            from(bucket: "{bucket}")
              |> range(start: time(v: "{from_date}"), stop: time(v: "{times[i]}"))
              |> filter(fn: (r) => r["_measurement"] == "IFU Sensebox2021 2A")
              |> filter(fn: (r) => r["_field"] == "_value")
              |> filter(fn: (r) => r["sensor_name"] == "Lufttemperatur 2-BME680")
              |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
              |> yield(name: "mean")'''

        table = api.query_data_frame(query, org=org)

        end = time.perf_counter()

        runtimes.append(end-start)
        n_datepoints.append(len(table))
    
    mean_runtimes.append(np.mean(runtimes))
    std_runtimes.append(np.std(runtimes))

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
plt.plot(range(0, len(mean_runtimes)), mean_runtimes)
plt.fill_between(range(0, len(mean_runtimes)), np.subtract(mean_runtimes,std_runtimes), np.add(mean_runtimes,std_runtimes), alpha=.2)

#### Quellen

[1]«MongoDB vs InfluxDB | InfluxData Time Series Workloads», InfluxData, Dez. 18, 2018. https://www.influxdata.com/blog/influxdb-is-27x-faster-vs-mongodb-for-time-series-workloads/ (zugegriffen Juni 17, 2021).

[2]«MongoDB disk and memory requirements», Documentation & User Guides | FotoWare, Nov. 17, 2015. https://learn.fotoware.com/On-Premises/FotoWeb/05_Configuring_sites/Setting_the_MongoDB_instance_that_FotoWeb_uses/MongoDB_disk_and_memory_requirements (zugegriffen Juni 17, 2021).

[3]H. 16 A. 2018 at 12:26, «InfluxDB design guidelines to avoid performance issues», Service  Engineering (ICCLab & SPLab). https://blog.zhaw.ch/icclab/influxdb-design-guidelines-to-avoid-performance-issues/ (zugegriffen Juni 17, 2021).

[4]P. Dix, «InfluxDB Clustering - High Availability and Scalability», InfluxData, Sep. 10, 2020. https://www.influxdata.com/blog/influxdb-clustering/ (zugegriffen Juni 17, 2021).

[5]«Security — MongoDB Manual», https://github.com/mongodb/docs-bi-connector/blob/DOCSP-3279/source/index.txt. https://docs.mongodb.com/manual/security/ (zugegriffen Juni 21, 2021).
