In [1]:
from kafka import KafkaProducer
def connect_kafka_producer(server=['localhost:9092'], api=(0, 10)):
    producer = None
    try:
        producer = KafkaProducer(bootstrap_servers=server, api_version=api)
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return producer

In [2]:
def publish_message(producer_instance, topic_name, value, key=None):
    try:
        try:
            key_bytes = bytes(key, encoding='utf-8')
        except:
            key_bytes = None
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

In [3]:
from kafka import KafkaConsumer
def connect_kafka_consummer(topic, server=['localhost:9092'], api=(0, 10), reset='earliest'):
    consumer = None
    try:
        consumer = KafkaConsumer(topic, auto_offset_reset=reset,
                             bootstrap_servers=server, api_version=api, consumer_timeout_ms=1000)
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return consumer

## test send message to kafka   
json file configuration

```
{ 
  "personnel_id" : "",   
  "cards" : [ 
    { 
      "card_type" : "",         # AddInsight/ShareInsight 
      "card_rank" : "",         # Datascience will determine the rank for a card 
      "card_image" : "",        # TODO: We are yet to determine this process for fetching the image. 
      "card_location" : "",     # TODO: Location may be city, neighborhood or property. 
      "card_content" : { 
        "section_1" : "",       # Used as the title for a card 
        "section_2" : ""        # Used as the subtitle for a card 
      } 
    } 
  ] 
} 
```

In [4]:
card1 = {
  "personnel_id" : 1231,
  "cards" : [
    {
      "card_type" : "AddInsight",         # AddInsight/ShareInsight
      "card_rank" : 1,         # Datascience will determine the rank for a card
      "card_image" : "",        # TODO: We are yet to determine this process for fetching the image.
      "card_location" : "",     # TODO: Location may be city, neighborhood or property.
      "card_content" : {
        "section_1" : "Write local insight for city X1",       # Used as the title for a card
        "section_2" : ""        # Used as the subtitle for a card
      }
    }
  ]
}

card2 = {
  "personnel_id" : 1232,
  "cards" : [
    {
      "card_type" : "AddInsight",         # AddInsight/ShareInsight
      "card_rank" : 1,         # Datascience will determine the rank for a card
      "card_image" : "",        # TODO: We are yet to determine this process for fetching the image.
      "card_location" : "",     # TODO: Location may be city, neighborhood or property.
      "card_content" : {
        "section_1" : "Write local insight for city X2",       # Used as the title for a card
        "section_2" : ""        # Used as the subtitle for a card
      }
    }
  ]
}

card3 = {
  "personnel_id" : 1233,
  "cards" : [
    {
      "card_type" : "AddInsight",         # AddInsight/ShareInsight
      "card_rank" : 1,         # Datascience will determine the rank for a card
      "card_image" : "",        # TODO: We are yet to determine this process for fetching the image.
      "card_location" : "",     # TODO: Location may be city, neighborhood or property.
      "card_content" : {
        "section_1" : "Write local insight for city X3",       # Used as the title for a card
        "section_2" : ""        # Used as the subtitle for a card
      }
    }
  ]
}

In [5]:
cards = [card1, card2, card3]

In [6]:
import json
cards_json = []
for card in cards:
    cards_json.append(json.dumps(card))

### create kafka producer and write cards into kafka

In [7]:
def publish_messages_kafka(topic, messages):
    kafka_producer = connect_kafka_producer()
    for message in messages:
        publish_message(kafka_producer, topic, message)
        
    if kafka_producer is not None:
        kafka_producer.close()

In [8]:
publish_messages_kafka('AddInsight', cards_json)

Message published successfully.
Message published successfully.
Message published successfully.


### create consumer and get data from kafka

In [11]:
topic = 'AddInsight'
consumer = connect_kafka_consummer(topic)

In [12]:
kafka_rec = []
for msg in consumer:
    kafka_rec.append(msg.value)

In [13]:
kafka_rec

[b'{"personnel_id": 1231, "cards": [{"card_type": "AddInsight", "card_rank": 1, "card_image": "", "card_location": "", "card_content": {"section_1": "Write local insight for city X1", "section_2": ""}}]}',
 b'{"personnel_id": 1232, "cards": [{"card_type": "AddInsight", "card_rank": 1, "card_image": "", "card_location": "", "card_content": {"section_1": "Write local insight for city X2", "section_2": ""}}]}',
 b'{"personnel_id": 1233, "cards": [{"card_type": "AddInsight", "card_rank": 1, "card_image": "", "card_location": "", "card_content": {"section_1": "Write local insight for city X3", "section_2": ""}}]}']

In [14]:
### convert json file to dic
dic_rec = []
for file in kafka_rec:
    dic_rec.append(json.loads(file))

In [15]:
dic_rec

[{'personnel_id': 1231,
  'cards': [{'card_type': 'AddInsight',
    'card_rank': 1,
    'card_image': '',
    'card_location': '',
    'card_content': {'section_1': 'Write local insight for city X1',
     'section_2': ''}}]},
 {'personnel_id': 1232,
  'cards': [{'card_type': 'AddInsight',
    'card_rank': 1,
    'card_image': '',
    'card_location': '',
    'card_content': {'section_1': 'Write local insight for city X2',
     'section_2': ''}}]},
 {'personnel_id': 1233,
  'cards': [{'card_type': 'AddInsight',
    'card_rank': 1,
    'card_image': '',
    'card_location': '',
    'card_content': {'section_1': 'Write local insight for city X3',
     'section_2': ''}}]}]