# Producer

The data is generated from the following formula (the code below additionally adds a uniform random noise term in [0, 1) to every new value):

$$
y(t+1) = -0.4 \frac{(3 - y(t)^2)}{(1 + y(t)^2)} + 0.6 \frac{(3 - (y(t-1) - 0.5)^3)}{(1 + (y(t-1) - 0.5)^3)}
$$


In [1]:
from kafka import KafkaProducer
import time
import numpy as np
import json
from message import Message
from messageManager import MessageManager
from messageManagerFactory import MessageManagerFactory

In [2]:
# Producer client shared by the cells below; it is configured with the
# address of the local broker.
broker_address = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=broker_address)

In [3]:
def get_y_future(y_current, y_before):
    """
    Produce the next sample y(t+1) of the series from its two predecessors.

    The result is the sum of a term driven by y(t), a term driven by
    y(t-1) and one draw from ``np.random.random()`` (uniform in [0, 1)).

    Parameters
    ----------
    y_current: float
        represents y(t)
    y_before: float
        represents y(t-1)

    Returns
    -------
    float
        y(t+1), including the random draw
    """
    squared = pow(y_current, 2)
    shifted_cubed = pow(y_before - 0.5, 3)

    # Each term is evaluated as (coefficient * numerator) / denominator.
    current_term = -0.4 * (3 - squared) / (1 + squared)
    before_term = 0.6 * (3 - shifted_cubed) / (1 + shifted_cubed)

    # The random draw happens last, after both deterministic terms.
    noise = np.random.random()

    return current_term + before_term + noise


In [4]:
def send_message(message, producer, topic_name):
    """
    Serialise a message to JSON and publish it through the given producer.

    Parameters
    ----------
    message: Message
        object whose attribute dictionary (``__dict__``) is serialised
    producer: KafkaProducer
        client whose ``send`` method receives the encoded payload
    topic_name: string
        topic the payload is sent to

    Returns
    -------
    string
        JSON message that was sent
    """
    attributes = message.__dict__
    serialized = json.dumps(attributes)

    # The producer is handed bytes, so the JSON text is encoded first
    # (str.encode() defaults to UTF-8).
    encoded_payload = serialized.encode()
    producer.send(topic_name, encoded_payload)

    return serialized

In [6]:
# Driver cell: seeds the generator, primes the series with two random start
# values and then keeps generating samples and publishing full messages
# until the cell is interrupted.
np.random.seed(2452020515) # Fix seed to ensure repeatability

time_delay = 5  # seconds to pause after each message that is sent
messages_sent = 0

message_manager_factory = MessageManagerFactory()
# NOTE(review): 1000 is presumably the number of inputs collected per message
# (the printed SIZE field) - confirm against MessageManagerFactory.create.
message_manager_factory.create(1000, topic_name = 'timeSerieS')

# y(t-1), y(t), y(t+1)
init_range = 100
y_before = np.random.random()*init_range
y_current = np.random.random()*init_range
y_future = None

# The two start values are part of the series as well.
message_manager_factory.add_input(y_before)
message_manager_factory.add_input(y_current)

# Infinite loop for sending messages to Kafka; it only ends when the cell is
# interrupted (e.g. KeyboardInterrupt).
try:
    while True:

        for message_manager in message_manager_factory.get_all():
            # if the data is enough send the message
            if message_manager.has_reached_input_size_limit():
                message_manager.increment_message_sent()
                sent_message = send_message(message_manager.message, producer, message_manager.topic_name)
                print('Message on topic', message_manager.topic_name)
                print('\t ID ',message_manager.message.id,' SIZE ', message_manager.input_vector_size, ' INPUT ', message_manager.message.x[:5])
                print('\r\n')
                message_manager.reset_message()
                time.sleep(time_delay) # wait a bit to send the next message

        # Advance the series by one step: y(t) becomes y(t-1), y(t+1) becomes y(t).
        y_future = get_y_future(y_current, y_before)
        y_before = y_current
        y_current = y_future

        message_manager_factory.add_input(y_future)
finally:
    # KafkaProducer.send() is asynchronous, so records may still be buffered
    # when the loop is interrupted. Flush them so that nothing already
    # "sent" is lost. The producer is deliberately not closed, so this cell
    # can be run again with the same producer object.
    producer.flush()
    

Message on topic timeSerieS
	 ID  1  SIZE  1000  INPUT  [0.24151530439742164, 2.7939322560110558, 2.1120984617709713, 0.3665895884885007, -1.0459761231321218]


Message on topic timeSerieS
	 ID  2  SIZE  1000  INPUT  [0.76443002977776, -0.9866562025684653, 2.1339469580945667, -1.3119028089737994, -0.0976704554160352]


Message on topic timeSerieS
	 ID  3  SIZE  1000  INPUT  [0.036384613531617704, 0.6699173767554893, 1.63854288419193, 1.8558471429768026, 0.8818686500344783]


Message on topic timeSerieS
	 ID  4  SIZE  1000  INPUT  [2.0940141761140953, 1.017793704291555, -0.4718155633757626, 1.5734994870856742, 29.17306484310754]


Message on topic timeSerieS
	 ID  5  SIZE  1000  INPUT  [-2.1672846845291325, -2.2121082414568844, 0.1291836956621305, -1.438602881695864, 1.9986252403014035]


Message on topic timeSerieS
	 ID  6  SIZE  1000  INPUT  [2.047067385516998, -0.4244321440657831, -0.484478796855225, 10.49325814041427, 52.52970564041169]


Message on topic timeSerieS
	 ID  7  SIZE  1

Message on topic timeSerieS
	 ID  52  SIZE  1000  INPUT  [-0.8929235221518731, -0.07020216693096559, -3.07025627726276, 2.7244543233756073, -0.2032946864514802]


Message on topic timeSerieS
	 ID  53  SIZE  1000  INPUT  [-0.4056914722664119, -0.2513078191104746, 8.005318910555225, 4.342310106324302, -0.05097776863716891]


Message on topic timeSerieS
	 ID  54  SIZE  1000  INPUT  [-1.09330561796591, 2.344488819234952, -0.39547223624918815, -0.487785785623327, 7.502056996023756]


Message on topic timeSerieS
	 ID  55  SIZE  1000  INPUT  [3.4564046596600626, 0.23998859315153764, -1.2605300300302638, 2.3886084714892144, -0.8450886468265584]


Message on topic timeSerieS
	 ID  56  SIZE  1000  INPUT  [-0.656142920363111, 3.232459428809385, -3.8467911888615283, 0.09394879034677861, -1.4169397280372231]


Message on topic timeSerieS
	 ID  57  SIZE  1000  INPUT  [-0.9009450127122759, 0.022748756034161155, -2.332559583767063, 2.835450245474034, -0.26103906559786916]


Message on topic timeSerieS

Message on topic timeSerieS
	 ID  103  SIZE  1000  INPUT  [3.5456587928375702, 0.3992903391076722, -1.4400052290292031, 1.770421399151474, -0.12180498393845696]


Message on topic timeSerieS
	 ID  104  SIZE  1000  INPUT  [-0.12202187295448852, -1.0471768396640586, 2.2070282448369434, -0.6437099309684089, -0.8701920946747751]


Message on topic timeSerieS
	 ID  105  SIZE  1000  INPUT  [2.156473593271312, 2.485517031410983, 0.19592880300338184, -1.1593070210208387, 1.9182170162738275]


Message on topic timeSerieS
	 ID  106  SIZE  1000  INPUT  [0.3547108215003407, -0.377500444835295, 0.8277412827039513, 7.179786408928938, 2.2199826221440517]


Message on topic timeSerieS
	 ID  107  SIZE  1000  INPUT  [2.97747893492188, 0.3872626563731463, -0.8702891526524743, 1.8531735725137906, -1.6998963506491669]


Message on topic timeSerieS
	 ID  108  SIZE  1000  INPUT  [-1.018530584079942, 0.29422833032605966, -2.4591136999193, 2.811645317930953, 0.2270689570987]


Message on topic timeSerieS
	 ID 

Message on topic timeSerieS
	 ID  154  SIZE  1000  INPUT  [0.39908216603251245, -0.8480051123238007, 1.5670092823222277, -2.223888671829796, 0.8637129678394588]


Message on topic timeSerieS
	 ID  155  SIZE  1000  INPUT  [-2.5506016883348934, 2.3289346345390958, -0.21391574624126874, -0.42791882463668707, 2.520546943412168]


Message on topic timeSerieS
	 ID  156  SIZE  1000  INPUT  [0.03905624513387923, -1.6028143657155396, 3.008825224908315, -0.22026494292220433, -0.97605826534403]


Message on topic timeSerieS
	 ID  157  SIZE  1000  INPUT  [1.9666388912012178, -1.524390873747971, 0.3920998902756927, -0.9477931411746091, 2.3236322750902945]


Message on topic timeSerieS
	 ID  158  SIZE  1000  INPUT  [-1.4524064730901758, 0.2579745409848234, -2.047968613807118, 2.7944330140175264, 0.35074315073579]


Message on topic timeSerieS
	 ID  159  SIZE  1000  INPUT  [-1.6213381456227927, 3.425025782158004, -0.4755366606568684, -1.2205328156575106, 32.93192222888022]


Message on topic timeSeri

Message on topic timeSerieS
	 ID  205  SIZE  1000  INPUT  [1.4032419191639294, -4.021209017124167, 1.6234708049494708, 0.20337366659451128, 0.013587149397822573]


Message on topic timeSerieS
	 ID  206  SIZE  1000  INPUT  [-0.4378843761595249, -0.3921975049695202, 12.933256315547194, 8.52954020844591, 0.17206877099488777]


Message on topic timeSerieS
	 ID  207  SIZE  1000  INPUT  [-0.10909158381912643, 0.45315125920566934, 2.100274752068624, 2.9023142569759424, 0.6865907060059082]


Message on topic timeSerieS
	 ID  208  SIZE  1000  INPUT  [1.4829884173303531, -1.6664546095813317, 1.0567260291459766, -0.7828716996724013, 1.7002182025385013]


Message on topic timeSerieS
	 ID  209  SIZE  1000  INPUT  [4.373408994401316, 0.3746216913047309, -0.6246594671688176, 1.8093489952098, -5.614325246174709]


Message on topic timeSerieS
	 ID  210  SIZE  1000  INPUT  [-1.0537972869900736, -6.816252664224712, -0.8699674792893045, -0.30028734750459496, -3.015651931607759]


Message on topic timeSeri

Message on topic timeSerieS
	 ID  256  SIZE  1000  INPUT  [-0.2854385896306296, 1.250378240737993, 4.133129426299071, 1.4544246977337996, -0.2706341091569854]


Message on topic timeSerieS
	 ID  257  SIZE  1000  INPUT  [-1.7471696690215168, 2.9554554619973055, 0.18394345354996078, -0.8136309905252305, 2.014594249639842]


Message on topic timeSerieS
	 ID  258  SIZE  1000  INPUT  [-0.13460273450718985, 1.0281537022364389, 2.6201018363389026, 1.708763947859845, -0.2509101606851357]


Message on topic timeSerieS
	 ID  259  SIZE  1000  INPUT  [-7.035914409417922, -1.4105492815543645, 0.15203500777403034, -1.2153573393283772, 2.473855896373496]


Message on topic timeSerieS
	 ID  260  SIZE  1000  INPUT  [2.101518858123037, -2.1771674908428094, 0.01035284500446959, -1.6496465557230986, 2.5275586009650732]


Message on topic timeSerieS
	 ID  261  SIZE  1000  INPUT  [-0.32037972602317866, -1.95279020277068, 5.148454099273755, -0.3332767455903229, -1.2303867999377789]


Message on topic timeSer

Message on topic timeSerieS
	 ID  307  SIZE  1000  INPUT  [-2.958781144870981, -0.022376830637695078, -1.438902239813037, 2.094636650108314, -0.6940792527866957]


Message on topic timeSerieS
	 ID  308  SIZE  1000  INPUT  [0.6062058296120739, -0.9567989557966298, 1.9211037619715277, -1.4314277835089568, 0.7521338963103635]


Message on topic timeSerieS
	 ID  309  SIZE  1000  INPUT  [0.11292032761149817, 6.726997027098609, 2.708296819109284, 0.5254920829897609, -0.8142261506728895]


Message on topic timeSerieS
	 ID  310  SIZE  1000  INPUT  [3.041144996070983, 0.08242169916654823, -0.7385771281628488, 1.444434407703059, -2.492722662601854]


Message on topic timeSerieS
	 ID  311  SIZE  1000  INPUT  [-0.07378226662902432, -1.1001845225676536, 2.0865818917717522, -0.6175582972428928, 0.1155326849346281]


Message on topic timeSerieS
	 ID  312  SIZE  1000  INPUT  [1.1122011928548832, -0.6403640572376725, 1.3854152173561656, -4.88305000332238, 1.1985304743553442]


Message on topic timeSeri

KeyboardInterrupt: 