In [1]:
! pip install kafka-python



In [2]:
! pip install twint nest_asyncio



In [2]:
import json
import os
import pprint

import nest_asyncio
import twint
nest_asyncio.apply()

from kafka import KafkaConsumer, KafkaProducer

In [3]:
def publish_message(producer_instance, topic_name, key, value, timeout=10):
    """Send one message to a Kafka topic and wait until the broker acknowledges it.

    Best-effort: failures are printed rather than raised, so a streaming loop keeps
    going when a single message cannot be delivered.

    Args:
        producer_instance: a ``KafkaProducer``. It may be ``None`` if
            ``connect_kafka_producer`` failed; the call then reports an error.
        topic_name: destination topic.
        key: message key as ``str`` (UTF-8 encoded) or ``bytes``; ``None`` sends
            an unkeyed message.
        value: message payload as ``str`` (UTF-8 encoded) or ``bytes``.
        timeout: seconds to wait for the broker acknowledgement.

    Returns:
        ``True`` if the broker acknowledged the message, ``False`` otherwise.
    """
    def _to_bytes(data):
        # str is UTF-8 encoded; bytes and None pass through unchanged.
        return data.encode('utf-8') if isinstance(data, str) else data

    try:
        future = producer_instance.send(topic_name, key=_to_bytes(key), value=_to_bytes(value))
        # send() only enqueues the record, and flush() does not raise on per-record
        # delivery errors. get() blocks until the broker acks and re-raises any failure.
        future.get(timeout=timeout)
        print('Message published successfully.')
        return True
    except Exception as ex:
        print('Exception in publishing message')
        print(ex)
        return False


def connect_kafka_producer(server, api_version=(0, 10)):
    """Create a KafkaProducer bootstrapped from a single broker address.

    Args:
        server: ``host:port`` of the bootstrap broker.
        api_version: broker API version passed to ``KafkaProducer``. The default
            ``(0, 10)`` is the same version the consumers in this notebook use.

    Returns:
        The producer, or ``None`` if it could not be created (the error is printed).
    """
    # The return sits inside try/except rather than in `finally`. A `return` in
    # `finally` would also swallow KeyboardInterrupt/SystemExit.
    try:
        return KafkaProducer(bootstrap_servers=[server], api_version=api_version)
    except Exception as ex:
        # Best-effort: report the problem and let the caller decide what to do with None.
        print('Exception while connecting Kafka')
        print(str(ex))
        return None

In [4]:
# Bootstrap broker used by every producer/consumer below. Set the KAFKA_BROKER
# environment variable to point the notebook at a different cluster.
kafka_broker = os.environ.get('KAFKA_BROKER', 'broker:9093')
kafka_producer = connect_kafka_producer(kafka_broker)
if kafka_producer is None:
    # connect_kafka_producer() already printed the cause. Make the consequence explicit
    # instead of leaving later cells to fail with NoneType errors.
    print('WARNING: no Kafka producer for', kafka_broker, '- publish_message calls will fail.')

In [6]:
pp = pprint.PrettyPrinter(indent=4)

# Events on this topic are produced by Neo4j Streams CDC.
# Read the topic from the earliest offset and stop iterating once no message has
# arrived for consumer_timeout_ms (1s). Then close the consumer so re-running the
# cell does not leave open broker connections behind.
consumer = KafkaConsumer("users2",
                         auto_offset_reset='earliest',
                         bootstrap_servers=[kafka_broker],
                         api_version=(0, 10),
                         consumer_timeout_ms=1000,
                         value_deserializer=json.loads)
try:
    for msg in consumer:
        pp.pprint(msg.value)
finally:
    consumer.close()

{   'meta': {   'operation': 'deleted',
                'source': {'hostname': 'neo4j'},
                'timestamp': 1558278302233,
                'txEventId': 0,
                'txEventsCount': 27,
                'txId': 7226,
                'username': 'neo4j'},
    'payload': {   'after': None,
                   'before': {   'labels': ['User'],
                                 'properties': {   'bio': 'a geek mother of '
                                                          'two girls   hold '
                                                          'aaPhD from '
                                                          '@INSIGHT_Centre  '
                                                          'Galway '
                                                          '@yordangunawan wife '
                                                          'works @its_campus',
                                                   'followers': 2570,
                                       

{   'meta': {   'operation': 'created',
                'source': {'hostname': 'neo4j'},
                'timestamp': 1558278316559,
                'txEventId': 12,
                'txEventsCount': 21,
                'txId': 7229,
                'username': ''},
    'payload': {   'after': {   'labels': ['User'],
                                'properties': {   'id': 22467617,
                                                  'username': 'neo4j'}},
                   'before': None,
                   'id': '5566',
                   'type': 'node'},
    'schema': {'constraints': None, 'properties': []}}


In [7]:
# Events on this topic are filtered by KSQL
consumer = KafkaConsumer("USERS2_CREATED", 
                         auto_offset_reset='earliest',
                         bootstrap_servers=[kafka_broker], 
                         api_version=(0, 10),                     
                         consumer_timeout_ms=1000,
                        value_deserializer = json.loads)

for msg in consumer:
    document = msg.value
    
    c = twint.Config()
    c.Username = document["PAYLOAD"]["AFTER"]["properties"]["username"]
    c.Store_json = False
    c.User_full = True
    c.Store_object = True
    c.Hide_output = True

    twint.run.Lookup(c)
    
    user = twint.output.user_object[0].__dict__
    print(user["username"], user)
    
    publish_message(kafka_producer, "users_enriched", user["name"], json.dumps(user))
    
    twint.output.user_object = []

rvanbruggen {'id': '18263413', 'name': 'Rik Van Bruggen', 'username': 'rvanbruggen', 'bio': 'Mad about graphs. Working on and with Neo4j. Write about that on http://blog.bruggen.com\xa0.', 'location': 'Antwerp', 'url': 'http://www.neo4j.com', 'join_date': '19 Dec 2008', 'join_time': '11:43 PM', 'tweets': 3402, 'following': 1176, 'followers': 2010, 'likes': 3978, 'media_count': 545, 'is_private': 0, 'is_verified': 0, 'avatar': 'https://pbs.twimg.com/profile_images/799599347633684481/IdlOGInc_400x400.jpg', 'background_image': 'https://pbs.twimg.com/profile_banners/18263413/1383756105/1500x500'}
Message published successfully.
GraphConnect {'id': '599665849', 'name': 'GraphConnect', 'username': 'GraphConnect', 'bio': 'The largest global gathering of graph technology experts and enthusiasts.', 'location': '', 'url': 'http://graphconnect.com', 'join_date': '4 Jun 2012', 'join_time': '4:50 PM', 'tweets': 5251, 'following': 2627, 'followers': 4884, 'likes': 1419, 'media_count': 1479, 'is_priv

In [None]:
# Events on this topic are filtered by KSQL
consumer = KafkaConsumer("USERS2_CREATED", 
                         auto_offset_reset='earliest',
                         bootstrap_servers=[kafka_broker], 
                         api_version=(0, 10),                     
                         consumer_timeout_ms=1000,
                        value_deserializer = json.loads)

for msg in consumer:
    document = msg.value
    username = document["PAYLOAD"]["AFTER"]["properties"]["username"]
    print(username)
    
    c = twint.Config()
    c.Username = username
    c.User_full = False
    c.Store_object = True
    c.Hide_output = True

    twint.run.Followers(c)
    followers = twint.output.follow_object

    if not username in followers:
        followers[username] = {"followers": []}
    
    user = {"followers": followers[username]["followers"], "username": username}
    
    publish_message(kafka_producer, "followers", username, json.dumps(user))
    
    twint.output.follow_object = {}

In [None]:
# Events on this topic are filtered by KSQL
consumer = KafkaConsumer("USERS2_CREATED", 
                         auto_offset_reset='earliest',
                         bootstrap_servers=[kafka_broker], 
                         api_version=(0, 10),                     
                         consumer_timeout_ms=1000,
                        value_deserializer = json.loads)

for msg in consumer:
    document = msg.value
    username = document["PAYLOAD"]["AFTER"]["properties"]["username"]
    print(username)
    
    c = twint.Config()
    c.Username = username
    c.User_full = False
    c.Store_object = True
    c.Hide_output = True

    twint.run.Following(c)
    following = twint.output.follow_object

    if not username in following:
        following[username] = {"following": []}
    
    user = {"following": following[username]["following"], "username": username}
    
    publish_message(kafka_producer, "following", username, json.dumps(user))
    
    twint.output.follow_object = {}