# Producer 3: Feed hotspot data from the TERRA satellite

In [None]:
# import statements
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import datetime as dt
import pandas as pd

# method for publishing data
def publish_message(producer_instance, topic_name, data, timeout=10):
    """Publish one record to a Kafka topic and report the outcome on stdout.

    ``send`` on a Kafka producer is asynchronous: it only queues the record
    and returns a future. The success message is therefore printed only after
    that future has resolved, so a delivery failure is reported as a failure
    instead of being silently lost.

    Args:
        producer_instance: Object exposing ``send(topic, value=...)`` that
            returns a future with ``get(timeout=...)``. May be ``None``
            (e.g. when no connection could be established), in which case
            the failure is reported and nothing is sent.
        topic_name: Name of the topic to publish to.
        data: Payload handed to the producer as the record value.
        timeout: Seconds to wait for the send to be acknowledged.

    Returns:
        None. Errors are printed, never raised.
    """
    if producer_instance is None:
        print('Exception in publishing message.')
        print('Kafka producer is not connected.')
        return

    try:
        future = producer_instance.send(topic_name, value=data)
        # Block until the send is acknowledged (or fails / times out).
        future.get(timeout=timeout)
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
    else:
        print('Message published successfully. Data: ' + str(data))

# method for creating kafka connection
def connect_kafka_producer():
    """Create a Kafka producer for the broker at ``localhost:9092``.

    Record values are serialized by dumping them to JSON and encoding the
    result as ASCII bytes.

    Returns:
        The ``KafkaProducer`` instance, or ``None`` if construction raised an
        ``Exception`` (the error is printed to stdout).
    """
    # Return from the try/except branches rather than from a ``finally``
    # clause: a ``return`` inside ``finally`` would discard any in-flight
    # exception, including KeyboardInterrupt and SystemExit.
    try:
        return KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x: dumps(x).encode('ascii'),
                             api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
        return None
    
if __name__ == '__main__':
    # Script entry point: stream randomly chosen hotspot records from the CSV
    # file to the Kafka topic, one record every 10-30 seconds, until the
    # process is interrupted.

    # read csv file
    aqua_terra_streaming = pd.read_csv('hotspot_TERRA_streaming.csv')
    # set up topic and the fixed identifier of this producer
    topic = 'StopFire'
    sender_id = 'producer_3'
    print('Publishing records..')
    producer = connect_kafka_producer()
    if producer is None:
        # Without a connection every publish would fail; stop instead of
        # looping forever printing errors.
        raise SystemExit('Kafka producer is unavailable; nothing was published.')

    try:
        while True:
            # random feed interval between 10 and 30 seconds, both inclusive
            # (randrange(10, 30) would never produce 30)
            random_time = random.randint(10, 30)
            # locale-formatted time of day at which the record was created
            created_time = dt.datetime.now().strftime("%X")

            # append timestamp and sender_id to the randomly selected record
            dict_hotspot = aqua_terra_streaming.sample(1).to_dict('records')[0]
            dict_hotspot.update({'created_time': created_time, 'sender_id': sender_id})
            publish_message(producer, topic, dict_hotspot)

            # feed the data to the stream based on the random interval
            sleep(random_time)
    finally:
        # Release the producer's resources when the loop is interrupted.
        producer.close()

Publishing records..
Message published successfully. Data: {'sender_id': 'producer_3', 'longitude': 148.07299999999998, 'confidence': 63.0, 'created_time': '21:35:44', 'latitude': -37.554, 'surface_temperature_celcius': 37.0}
Message published successfully. Data: {'sender_id': 'producer_3', 'longitude': 144.5934, 'confidence': 70.0, 'created_time': '21:36:02', 'latitude': -36.5223, 'surface_temperature_celcius': 45.0}
Message published successfully. Data: {'sender_id': 'producer_3', 'longitude': 146.4774, 'confidence': 64.0, 'created_time': '21:36:23', 'latitude': -36.0047, 'surface_temperature_celcius': 42.0}
Message published successfully. Data: {'sender_id': 'producer_3', 'longitude': 143.1764, 'confidence': 74.0, 'created_time': '21:36:39', 'latitude': -37.7905, 'surface_temperature_celcius': 47.0}
Message published successfully. Data: {'sender_id': 'producer_3', 'longitude': 142.2606, 'confidence': 77.0, 'created_time': '21:37:06', 'latitude': -36.4811, 'surface_temperature_celciu