## Kafka Consumer
Reads tweet text published by the Kafka producer, performs named-entity recognition (NER) and relation extraction on it, and saves the results in MongoDB.

Requires:
- topic - the Kafka topic, e.g. 'covid' or 'blm'; must be the same as the one specified in the producer
- database - the name of the MongoDB database to write to

In [None]:
import json
import spacy
from spacy import displacy
from kafka import KafkaConsumer
from pymongo import MongoClient
from openie import StanfordOpenIE
import pandas as pd

In [None]:
def insert_mongodb(my_dict, collection):
    """Insert one document into a collection of the module-level ``db``.

    Failures are reported on stdout instead of being raised, so a failed
    insert does not stop the caller.

    Args:
        my_dict: The document to store.
        collection: Name of the target collection inside ``db``.
    """
    try:
        rec_id1 = db[collection].insert_one(my_dict)
    except Exception as err:
        # Best-effort insert: report the cause and carry on. Catching
        # Exception rather than using a bare ``except`` lets
        # KeyboardInterrupt/SystemExit propagate so the run can be stopped.
        print("Could not insert record in Mongo collection: " + collection,
              "-", repr(err))
    else:
        # Print the id of the stored document rather than the result object.
        print("Data inserted with record ids", rec_id1.inserted_id)

In [None]:
# Kafka topic to consume; it has to match the topic used by the producer.
topic = 'covid'
# Name of the MongoDB database that receives the results.
database = 'analysisDB'

# Every collection name is derived from the topic: one collection is named
# after the topic itself, the others append a fixed suffix to it.
twitter_collection = topic
vertices_collection, edges_collection, ne_collection = (
    topic + suffix for suffix in ("_vertices", "_edges", "_nes")
)

In [None]:
# Connect to mongoDB
try:
    client = MongoClient('mongodb://mongo:27017/')
    db = client[database]
    # MongoClient connects lazily: constructing it does not contact the
    # server, so an unreachable host would go unnoticed here. A cheap 'ping'
    # command forces a round-trip and makes the message below truthful.
    # NOTE: if the server is down this waits for the client's server
    # selection timeout before failing.
    client.admin.command('ping')
    print("Connected to database: " + database)
except Exception as err:
    # Report the cause instead of hiding it; KeyboardInterrupt/SystemExit are
    # no longer swallowed as they were by the bare ``except``.
    print("Could not connect to MongoDB:", repr(err))

In [None]:
# Sanity check: ask the server for the names of its databases. As the last
# expression of the cell, the returned list is shown as the cell output.
client.list_database_names()

In [None]:
# Subscribe to the topic. Message payloads are UTF-8 encoded JSON and are
# decoded into Python values by the deserializer; 'latest' means only messages
# produced after start-up are read when the group has no committed offset.
consumer = KafkaConsumer(topic, group_id='test-group',
                         bootstrap_servers=['172.27.1.16:9092'],
                         auto_offset_reset='latest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))

# start openie and spacy
openie_client = StanfordOpenIE()
nlp = spacy.load('en_core_web_sm')

# Process messages until interrupted. For every tweet: store the raw text,
# extract relation triples and named entities, and store those as well.
try:
    for msg in consumer:
        # Get and save tweet text
        text = msg.value
        tweet = {'text': text}
        insert_mongodb(tweet, twitter_collection)

        # The JSON payload may decode to something other than a string; the
        # text processing below needs a string, so skip anything else
        # instead of letting one bad message stop the consumer.
        if not isinstance(text, str):
            print("Skipping analysis of non-text message:", repr(text))
            continue

        # Both extraction steps work on the same lower-cased, '#'-free text,
        # so compute it once.
        cleaned_text = text.lower().replace('#', '')

        # get relation triples
        reln_triples = openie_client.annotate(cleaned_text)

        # get named entities
        doc = nlp(cleaned_text)
        entities = doc.ents

        # Without triples the frame would have no 'subject'/'relation'
        # columns to de-duplicate on, so only build it when there is data.
        if reln_triples:
            df = pd.DataFrame(reln_triples)
            # Keep one edge per (subject, relation) pair: the last one seen.
            drop_df = df.drop_duplicates(['subject', 'relation'], keep='last')

            # Vertices are the distinct subject/object values of ALL triples
            # (before de-duplication).
            for vertex in pd.unique(df[['subject', 'object']].values.ravel('K')):
                # NOTE(review): stored as a (vertex, vertex) pair - presumably
                # an (id, label) pair for a downstream graph tool; confirm.
                temp_dict = {'vertex': (vertex, vertex)}
                insert_mongodb(temp_dict, vertices_collection)
                print(temp_dict)

            for _, row in drop_df.iterrows():
                temp_dict = {"edge": (row['subject'], row['object'], row["relation"])}
                insert_mongodb(temp_dict, edges_collection)
                print(temp_dict)

        for entity in entities:
            temp_dict = {'type': entity.label_, 'text': entity.text}
            insert_mongodb(temp_dict, ne_collection)
            print('=>', temp_dict)
            print()
finally:
    # Release the Kafka connection even when the loop is interrupted
    # (e.g. by stopping the cell) or fails.
    consumer.close()