Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: load testing and gunicorn support #354

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
FROM python:3.8.3-alpine3.12 as base
FROM base as builder
RUN apk --no-cache --update-cache add gcc python3 python3-dev py-pip build-base wget
RUN apk --no-cache --update-cache add gcc python3 python3-dev py-pip build-base wget libffi-dev
RUN ln -s /usr/include/locale.h /usr/include/xlocale.h
RUN pip install pipenv
RUN pip install pipenv gunicorn gevent
RUN mkdir -p /src/ngsi-timeseries-api
COPY Pipfile /src/ngsi-timeseries-api/Pipfile
COPY Pipfile.lock /src/ngsi-timeseries-api/Pipfile.lock
RUN cd /src/ngsi-timeseries-api && { pipenv lock -r > /requirements.txt; }
RUN pip install -r /requirements.txt

FROM base
RUN apk --no-cache add curl
RUN apk --no-cache add curl supervisor

COPY --from=builder /usr/local /usr/local
COPY . /src/ngsi-timeseries-api/
WORKDIR /src/ngsi-timeseries-api/src
ENV PYTHONPATH=$PWD:$PYTHONPATH
COPY conf/supervisord.conf /etc/supervisord.conf

EXPOSE 8668

CMD python app.py
CMD ["supervisord", "-c", "/etc/supervisord.conf"]
270 changes: 154 additions & 116 deletions Pipfile.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions conf/supervisord.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[supervisord]
; Run supervisord in the foreground so it can serve as the container's PID 1.
nodaemon=true

[program:quantumleap]
; Serve the QuantumLeap WSGI app with gunicorn + gevent workers.
; NOTE(review): "uwsgi" here is the WSGI module gunicorn imports (i.e. a
; uwsgi.py exposing the app callable next to gunicorn.conf.py) — confirm that
; module exists; app.py exposes `application = app.app`, which suggests the
; target may need to be "app:application" instead.
; --worker-connections 10000 caps concurrent greenlets per gevent worker;
; the worker count itself comes from gunicorn.conf.py.
command=gunicorn -b 0.0.0.0:8668 uwsgi --log-level DEBUG --worker-class gevent --worker-connections 10000 --config gunicorn.conf.py
directory=/src/ngsi-timeseries-api/src
autostart=true
autorestart=true
; Consider the process "started" only after it stays up 10s; retry 3 times.
startsecs=10
startretries=3
2 changes: 2 additions & 0 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python
from utils.hosts import LOCAL


Expand All @@ -10,3 +11,4 @@
# validate_responses=True, strict_validation=True
)
app.run(host=LOCAL, port=8668)
application = app.app
3 changes: 3 additions & 0 deletions src/gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Gunicorn configuration: scale the worker count with available CPU cores."""
import multiprocessing

# Four workers per core plus one; gevent workers are I/O-bound, so this runs
# well above gunicorn's classic (2 * cores + 1) rule of thumb.
_WORKERS_PER_CORE = 4

workers = _WORKERS_PER_CORE * multiprocessing.cpu_count() + 1
90 changes: 76 additions & 14 deletions src/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,57 @@
import requests
import time


# INPUT VARIABLES
QL_URL = os.environ.get("QL_URL", "http://localhost:8668")
ORION_URL = os.environ.get("ORION_URL", "http://localhost:1026")
ORION_URL_4QL = os.environ.get("ORION_URL_4QL", "http://orion:1026")
QL_URL_4ORION = os.environ.get("QL_URL_4ORION", "http://quantumleap:8668")
QL_DEFAULT_DB = os.environ.get("QL_DEFAULT_DB", "crate")

# HELPER VARIABLES
ENTITY_TYPE = "IntegrationTestEntity"


class IntegrationTestEntity:
def __init__(self, e_id, fiware_service=None, fiware_servicepath=None):
def __init__(self, e_id, fiware_service=None, fiware_servicepath=None, keyValues=True):
self.FIWARE_SERVICEPATH_KEY = 'Fiware-ServicePath'
self.id = e_id
self.type = ENTITY_TYPE

self.keyValues = keyValues
self.fiware_service = fiware_service
self.fiware_servicepath = fiware_servicepath

self.attrs = {
"int_attr": 120,
"float_attr": 0.5,
"text_attr": "blabla",
"bool_attr": False,
"obj_attr": {},
}
if keyValues:
self.attrs = {
"int_attr": 120,
"float_attr": 0.5,
"text_attr": "blabla",
"bool_attr": False,
"obj_attr": {},
}
else:
self.attrs = {
"int_attr": {
"value": 120,
"type" : "Number"
},
"float_attr": {
"value": 0.5,
"type": "Number"
},
"text_attr": {
"value": "blabla",
"type": "Text"
},
"bool_attr": {
"value": False,
"type": "Boolean"
},
"obj_attr": {
"value": {},
"type": "StructuredValue"
}
}

def headers(self):
h = {}
Expand All @@ -46,10 +70,10 @@ def payload(self):
def update(self):
self.attrs['int_attr'] += random.choice((1, -1))
return {
"int_attr": {
"value": self.attrs['int_attr'],
"type": "Number"
}
"int_attr": {
"value": self.attrs['int_attr'],
"type": "Number"
}
}

def __repr__(self):
Expand Down Expand Up @@ -130,6 +154,36 @@ def load_data(is_old_ql_image=False):
return entities


def send_data_ql(entities, batch=False, is_old_ql_image=False):
    """Post test entities straight to QuantumLeap's notify endpoint.

    :param entities: iterable of IntegrationTestEntity instances to send.
    :param batch: when True, pack all entities into a single notification;
        otherwise send one HTTP request per entity.
    :param is_old_ql_image: forwarded to check_ql_url so the availability
        check matches older QL images.
    :raises AssertionError: if any POST does not return a 2xx response.
    """
    check_ql_url(is_old_ql_image)

    url = "{}/v2/notify".format(QL_URL)

    def _notify(payloads, headers):
        # QL expects an NGSI notification body: {"data": [entity, ...]}.
        res = requests.post(url, data=json.dumps({"data": payloads}),
                            headers=headers)
        assert res.ok, res.text

    if batch:
        # NOTE(review): a batched request carries the headers of the *last*
        # entity only — this assumes all entities share the same fiware
        # service / service path; confirm with callers.
        headers = None
        payloads = []
        for e in entities:
            headers = {'Content-Type': 'application/json', **e.headers()}
            payloads.append(e.payload())
        _notify(payloads, headers)
    else:
        for e in entities:
            headers = {'Content-Type': 'application/json', **e.headers()}
            _notify([e.payload()], headers)


def check_data(entities):
check_orion_url()
check_ql_url()
Expand Down Expand Up @@ -160,6 +214,11 @@ def check_data(entities):


def unload_data(entities):
delete_orion(entities)
delete_ql(entities)


def delete_orion(entities):
errors = []
# Cleanup all Subscriptions
for e in entities:
Expand All @@ -179,6 +238,9 @@ def unload_data(entities):
if not r.ok:
errors.append(r.text)


def delete_ql(entities):
errors = []
# Cleanup Historical Records
deleted = set([])
for e in entities:
Expand Down
6 changes: 4 additions & 2 deletions src/tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ services:
- "27017:27017"

quantumleap:
image: ${QL_IMAGE:-smartsdk/quantumleap}
image: ${QL_IMAGE:-smartsdk/quantumleap:latest}
sysctls:
net.core.somaxconn: 4096
ports:
- "8668:8668"
depends_on:
Expand All @@ -33,7 +35,7 @@ services:
- USE_GEOCODING=True
- REDIS_HOST=redis
- REDIS_PORT=6379
- LOGLEVEL=DEBUG
- LOGLEVEL=ERROR

crate:
image: crate:${CRATE_VERSION:-4.1.4}
Expand Down
16 changes: 16 additions & 0 deletions src/tests/run_load_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Run k6 load tests against a locally composed QuantumLeap stack.
# Requires: docker, docker-compose, and script.js in the working directory.

# Fail fast on errors, unset variables and broken pipes.
set -euo pipefail

# Guarantee teardown even if a k6 run or compose command fails.
trap 'docker-compose down' EXIT

docker-compose up -d
# Orion and Mongo are not needed: load is injected directly into QL's
# /v2/notify endpoint, so stop them to free resources on the test host.
docker-compose stop orion
docker-compose stop mongo
# Give the remaining services time to settle before applying load.
sleep 10

# Warm-up run: 10 virtual users for 60 seconds.
docker run -i loadimpact/k6 run --vus 10 --duration 60s - <script.js

sleep 10

# Main run: 100 virtual users for 120 seconds.
docker run -i loadimpact/k6 run --vus 100 --duration 120s - <script.js

sleep 10
# docker-compose down runs via the EXIT trap above.
49 changes: 49 additions & 0 deletions src/tests/script.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import http from 'k6/http';
import { check, sleep } from 'k6';

// Target endpoint, overridable per run via `k6 run -e QL_URL=...`.
// NOTE(review): the 192.0.0.1 default looks like a placeholder address —
// point QL_URL at the real QuantumLeap host when running the tests.
const URL = __ENV.QL_URL || 'http://192.0.0.1:8668/v2/notify';

// Time budget (seconds) for one VU iteration: 100 posts plus padding sleep.
const T = 30;

// The notification body is identical on every request, so serialize it once
// instead of rebuilding the objects inside the loop.
const PAYLOAD = JSON.stringify({
    data: [
        {
            id: 'Room:1',
            type: 'Room',
            temperature: {
                value: 23,
                type: 'Float',
            },
            pressure: {
                value: 720,
                type: 'Integer',
            },
        },
    ],
});

const PARAMS = {
    headers: {
        'Content-Type': 'application/json',
    },
};

export default function() {
    const before = new Date().getTime();

    // Fire 100 back-to-back notifications per iteration.
    for (let i = 0; i < 100; i++) {
        const res = http.post(URL, PAYLOAD, PARAMS);
        check(res, { 'status was 200': r => r.status == 200 });
    }

    // Pad the iteration out to T seconds so each VU produces a steady rate.
    const diff = (new Date().getTime() - before) / 1000;
    const remainder = T - diff;
    if (remainder > 0) {
        sleep(remainder);
    } else {
        console.warn(
            `Timer exhausted! The execution time of the test took longer than ${T} seconds`
        );
    }
}
28 changes: 17 additions & 11 deletions src/translators/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from translators.sql_translator import NGSI_ISO8601, NGSI_DATETIME, \
NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, TIME_INDEX, \
METADATA_TABLE_NAME, FIWARE_SERVICEPATH
from translators.table_cache import TableCacheManager
import logging
from .crate_geo_query import from_ngsi_query
from utils.cfgreader import EnvReader, StrVar, IntVar
Expand All @@ -22,24 +23,25 @@
"Boolean": 'boolean',
# TODO since CRATEDB 4.0 timestamp is deprecated. Should be replaced with timestampz
# This means that to maintain both version, we will need a different mechanism
NGSI_ISO8601: 'timestamp',
NGSI_DATETIME: 'timestamp',
NGSI_ISO8601: 'timestamptz',
NGSI_DATETIME: 'timestamptz',
"Integer": 'long',
NGSI_GEOJSON: 'geo_shape',
NGSI_GEOPOINT: 'geo_point',
"Number": 'float',
NGSI_TEXT: 'string',
NGSI_STRUCTURED_VALUE: 'object',
TIME_INDEX: 'timestamp'
TIME_INDEX: 'timestamptz'
}


CRATE_TO_NGSI = dict((v, k) for (k,v) in NGSI_TO_SQL.items())
CRATE_TO_NGSI['string_array'] = 'Array'

CACHE = TableCacheManager()

class CrateTranslator(sql_translator.SQLTranslator):

class CrateTranslator(sql_translator.SQLTranslator):

NGSI_TO_SQL = NGSI_TO_SQL

Expand All @@ -66,8 +68,9 @@ def dispose(self):


def get_db_version(self):
self.cursor.execute("select version['number'] from sys.nodes")
return self.cursor.fetchall()[0][0]
#self.cursor.execute("select version['number'] from sys.nodes")
#return self.cursor.fetchall()[0][0]
return "4.1.4"


def get_health(self):
Expand Down Expand Up @@ -129,11 +132,14 @@ def _preprocess_values(self, e, table, col_names, fiware_servicepath):
return values

def _prepare_data_table(self, table_name, table, fiware_service):
columns = ', '.join('"{}" {}'.format(cn.lower(), ct)
for cn, ct in table.items())
stmt = "create table if not exists {} ({}) with " \
"(number_of_replicas = '2-all', column_policy = 'dynamic')".format(table_name, columns)
self.cursor.execute(stmt)
cached_metadata = CACHE.get(table_name+'-create-stmt')
if not cached_metadata:
columns = ', '.join('"{}" {}'.format(cn.lower(), ct)
for cn, ct in table.items())
stmt = "create table if not exists {} ({}) with " \
"(number_of_replicas = '2-all', column_policy = 'dynamic')".format(table_name, columns)
self.cursor.execute(stmt)
CACHE.add(table_name+'-create-stmt', stmt)

def _should_insert_original_entities(self, insert_error: Exception) -> bool:
return isinstance(insert_error, exceptions.ProgrammingError)
Expand Down
Loading