Skip to content
Mehdi Lefebvre edited this page Dec 19, 2019 · 1 revision

How to inject data:

import requests
from uuid import uuid4 as uuid
from random import randint as rand
from datetime import datetime
from datetime import timedelta


def id():
    """Return a newly generated random UUID rendered as a string.

    NOTE: this name shadows the built-in ``id``; it is kept because
    other code in this file calls it under this name.
    """
    fresh = uuid()
    return str(fresh)


def gen_lang():
    """Return one of the language codes 'en', 'fr', 'de' or 'it', chosen at random."""
    choices = ('en', 'fr', 'de', 'it')
    last = len(choices) - 1
    # rand is random.randint, so both bounds are inclusive.
    position = rand(0, last)
    return choices[position]


def gen_source():
    """Return either 'livechat' or 'chatbot', chosen at random."""
    channels = ('livechat', 'chatbot')
    last = len(channels) - 1
    # rand is random.randint, so both bounds are inclusive.
    position = rand(0, last)
    return channels[position]


def next_rate():
    """Return a random integer rating from 1 to 5, both bounds included."""
    lowest, highest = 1, 5
    return rand(lowest, highest)


def gen_timestamp():
    """Return a random naive datetime, expressed in UTC, from the recent past.

    The result is the current UTC time minus a random offset built from
    independently drawn microseconds, milliseconds, seconds, minutes, hours
    and days (up to 97 days plus the smaller components).

    Returns:
        datetime: a timezone-naive datetime whose wall-clock value is UTC.
    """
    # Local import: the module-level imports only bring in datetime/timedelta.
    from datetime import timezone

    # Start from UTC rather than local time: the caller in this file appends
    # a literal "Z" to the ISO string, which labels the value as UTC. The
    # tzinfo is stripped so isoformat() does not emit "+00:00" before that "Z".
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Components are drawn in the same order as before. Values that exceed
    # their usual range (e.g. up to 60 hours) are normalised by timedelta.
    offset = timedelta(
        microseconds=rand(0, 1000),
        milliseconds=rand(0, 1000),
        seconds=rand(0, 60),
        minutes=rand(0, 60),
        hours=rand(0, 60),
        days=rand(0, 97),
    )
    return now - offset


def gen_rating():
    """Build one randomly generated rating record as a dict.

    Keys, in order: "conversationId" (from id()), "rating" (from
    next_rate()), "timestamp" (ISO-format text from gen_timestamp() with a
    literal "Z" appended), "lang" (from gen_lang()) and "source" (from
    gen_source()).
    """
    # Helpers are called in the same order as the keys are inserted so the
    # sequence of random draws is unchanged.
    conversation = id()
    score = next_rate()
    stamp = gen_timestamp().isoformat()
    language = gen_lang()
    origin = gen_source()

    record = {}
    record["conversationId"] = conversation
    record["rating"] = score
    record["timestamp"] = "%sZ" % stamp
    record["lang"] = language
    record["source"] = origin
    return record

#{
#  "conversationId": "054E31C0-453B-4107-A7B6-0C52FC4BE3F5",
#  "source": "chatbot",
#  "lang": "fr",
#  "timestamp": "2019-12-18T08:44:50Z",
#  "rating": 4
#}

#{
#  "conversationId": "AE835494-C69B-4A6B-8A02-7885DBC6CC87",
#  "source": "livechat",
#  "lang": "en",
#  "timestamp": "2019-12-18T08:44:50Z",
#  "rating": 5
#}

def send(url, data, timeout=10):
    """POST *data* as a JSON body to *url* and return the response.

    Args:
        url: target endpoint.
        data: JSON-serialisable object sent as the request body.
        timeout: seconds to wait for the server before giving up. Without
            a timeout the request could block indefinitely.

    Returns:
        The ``requests.Response`` for the POST.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or timeout.
    """
    headers = {'user-agent': 'python-processor/0.0.1', 'content-type': 'application/json'}
    response = requests.post(url, json=data, headers=headers, timeout=timeout)
    # Surface rejected payloads instead of silently discarding the response.
    response.raise_for_status()
    return response


def run(url="http://localhost:8765/ratings", nb_rating=2000):
    """Generate random ratings and POST each one to *url*.

    Args:
        url: endpoint that receives the ratings; defaults to the local
            service address this script was written against.
        nb_rating: number of ratings to generate and send.
    """
    for _ in range(nb_rating):
        send(url, gen_rating())


# Inject the data only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    run()
Clone this wiki locally