Skip to content
No description, website, or topics provided.
Branch: master
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
kafka/data
neo4j
README.adoc
docker-compose.yml
graph.png

README.adoc

Kafka Connect - Neo4j

This repository shows how to load Tweets from the twint library into Kafka, and then from Kafka into Neo4j using the Kafka Connect Sink.

Launch Kafka Cluster + Neo4j

git clone git@github.com:mneedham/kafka-connect-neo4j.git && cd kafka-connect-neo4j
docker-compose up

Dependencies

pip install --upgrade -e git+https://github.com/twintproject/twint.git@origin/master#egg=twint
pip install confluent-kafka[avro]

Loading Data into Kafka

import twint
import sys
import json

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
         { "name": "id",        "type": "long" },
         { "name": "tweet",     "type": "string" },
         { "name": "datetime",  "type": "long" },
         { "name": "username",  "type": "string" },
         { "name": "user_id",   "type": "long" },
         { "name": "hashtags",  "type": {"type": "array", "items": "string"} }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

kafka_broker = 'localhost:9092'
schema_registry = 'http://localhost:8081'

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)

producer = AvroProducer({
    'bootstrap.servers': kafka_broker,
    'schema.registry.url': schema_registry
    }, default_key_schema=key_schema, default_value_schema=value_schema)


module = sys.modules["twint.storage.write"]

def Json(obj, config):
    tweet = obj.__dict__
    print(tweet)
    producer.produce(topic='tweets10', value=tweet, key={"name": "Key"})
    producer.flush()

module.Json = Json

c = twint.Config()
c.Search = "neo4j OR \"graph database\" OR \"graph databases\" OR graphdb OR graphconnect OR @neoquestions OR @Neo4jDE OR @Neo4jFr OR neotechnology"
c.Store_json = True
c.Custom["user"] = ["id", "tweet", "user_id", "username", "hashtags", "mentions"]
c.User_full = True
c.Output = "tweets.json"
c.Since = "2019-05-20"
c.Hide_output = True

twint.run.Search(c)

Create Neo4j Sink Connector

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "connect.sink.neo4j.tweets",
      "config": {
        "topics": "tweets10",
        "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
        "neo4j.server.uri": "bolt://neo4j:7687",
        "neo4j.authentication.basic.username": "neo4j",
        "neo4j.authentication.basic.password": "neo",
        "neo4j.topic.cypher.tweets10": "WITH event AS data MERGE (t:Tweet {id: data.id}) SET t.text = data.tweet, t.createdAt = datetime({epochmillis:data.datetime}) MERGE (u:User {username: data.username}) SET u.id = data.user_id   MERGE (u)-[:POSTED]->(t) FOREACH (ht IN data.hashtags | MERGE (hashtag:HashTag {value: ht}) MERGE (t)-[:HAS_HASHTAG]->(hashtag))"
      }
    }'

Browse Data in Neo4j

Navigate to http://localhost:7474 and paste the following into the query pane:

MATCH path = (u:User)-[:POSTED]->(t:Tweet)-[:HAS_HASHTAG]->(ht)
RETURN path
LIMIT 100
graph
You can’t perform that action at this time.