# Kafka Producer

In [2]:
from kafka.producer import KafkaProducer
from datetime import datetime 
from time import sleep
from random import randint
import geohash 
import json

In [3]:
# create a producer
def connect_kafka_producer(bootstrap_servers=None):
    """Create a KafkaProducer that JSON-encodes every message value.

    Args:
        bootstrap_servers: Broker address(es) to connect to. When omitted,
            the single local broker ``['localhost:9092']`` is used.

    Returns:
        A ``KafkaProducer``, or ``None`` if constructing it raised an
        ``Exception`` (the error is printed rather than re-raised).
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    try:
        # value_serializer turns each value into UTF-8 JSON bytes, so callers
        # can pass plain Python dicts to ``send``
        return KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
        # returning here rather than from a ``finally`` clause: a ``finally``
        # return would also silently swallow KeyboardInterrupt/SystemExit
        return None
    
# send message from producer
def publish_message(producer_instance, topic_name, value):
    """Publish one climate record to ``topic_name`` and flush the producer.

    Args:
        producer_instance: Producer used for sending; this function calls its
            ``send(topic, value)`` and ``flush()`` methods.
        topic_name: Kafka topic to publish to.
        value: Sequence of fields for one record, in this order: latitude,
            longitude, air temperature, relative humidity, wind speed (knots),
            max wind speed, precipitation, created time, sender id.
            Latitude and longitude must be convertible to ``float``.

    Any exception raised while building or sending the record is printed and
    not re-raised, so one bad record does not stop the stream.
    """
    try:
        # JSON encoding is done by the producer's value_serializer (configured
        # in connect_kafka_producer), so a plain dict is sent here
        content = {
            'latitude': value[0],
            'longitude': value[1],
            'air_temperature_celcius': value[2],
            'relative_humidity': value[3],
            'windspeed_knots': value[4],
            'max_wind_speed': value[5],
            # drop the first character; in the captured output this field
            # carries a leading space (e.g. ' 0.12G')
            'precipitation': value[6][1:],
            # 5-character geohash of the record's coordinates
            'geohash': geohash.encode(float(value[0]), float(value[1]), precision=5),
            'created_time': value[7],
            'sender_id': value[8],
        }

        # send through the producer that was passed in, not a global one
        producer_instance.send(topic_name, content)
        producer_instance.flush()

        print('Message published successfully. Data: ' + str(value))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        
# publish the records of a file one at a time, in random order
def sendcsv(producer, f, sender_id, topic_name="climate", interval=5):
    """Publish every line of ``f`` in random order, pausing before each send.

    Args:
        producer: Producer passed to ``publish_message`` for each record.
        f: List of CSV lines (header already removed). Lines are popped from
            this list as they are sent, so it is empty when this returns.
        sender_id: Identifier appended to every record (converted to ``str``).
        topic_name: Topic to publish to; defaults to ``"climate"``.
        interval: Seconds to sleep before each message; defaults to 5.
    """
    sender_id = str(sender_id)

    # publish while the list still has records left
    while f:
        sleep(interval)
        # randint is inclusive at both ends, so the last valid index is
        # len(f) - 1; randint(1, len(f)) could never pick index 0 and raised
        # IndexError (ending the stream early) whenever it returned len(f)
        index = randint(0, len(f) - 1)

        formated_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # remove the chosen line and append send time and sender id as two
        # extra CSV columns
        line = f.pop(index) + ',' + formated_time + ',' + sender_id
        publish_message(producer, topic_name, line.split(','))

    print('No more climate streaming data.')


if __name__ == '__main__':

    producer = connect_kafka_producer()

    # read all lines with the file closed afterwards; [1:] drops the header row
    with open('../Data/climate_streaming.csv', encoding='utf-8') as csv_file:
        f = csv_file.read().splitlines()[1:]

    if producer is None:
        # connect_kafka_producer already printed the reason
        print('Kafka producer unavailable; no data will be sent.')
    else:
        try:
            sendcsv(producer, f, 1)
        finally:
            # flush pending messages and release the broker connection even
            # if streaming is interrupted (e.g. KeyboardInterrupt)
            producer.close()

Message published successfully. Data: ['-36.8741', '143.91', '9', '41.9', '9.2', '14', ' 0.12G', '2019-05-24 16:23:35', '1']
Message published successfully. Data: ['-37.57', '148.034', '11', '40.4', '19.3', '26', ' 0.00G', '2019-05-24 16:23:40', '1']
Message published successfully. Data: ['-37.236', '141.176', '14', '48.1', '13.7', '19', ' 0.35G', '2019-05-24 16:23:45', '1']
Message published successfully. Data: ['-36.759', '145.179', '18', '55.6', '12.8', '19', ' 0.00I', '2019-05-24 16:23:50', '1']
Message published successfully. Data: ['-37.0585', '143.8589', '19', '59.7', '7.4', '14', ' 0.63G', '2019-05-24 16:23:55', '1']
Message published successfully. Data: ['-36.3769', '144.7505', '14', '50.9', '5.9', '13', ' 0.00I', '2019-05-24 16:24:00', '1']
Message published successfully. Data: ['-37.61', '149.279', '20', '57.4', '10.9', '22', ' 0.00I', '2019-05-24 16:24:05', '1']
Message published successfully. Data: ['-37.295', '144.377', '16', '54.8', '8.3', '9.9', ' 0.00G', '2019-05-24 16

KeyboardInterrupt: 