This repository contains an example of a real-time Twitter analysis pipeline.
- High level design
- Dependencies
- How to start
- How to see the results
- Twitter Source Job Description
- Flink Sentiment Analysis Pipeline Description (including ML model)
- Flink Geo Aggregation Pipeline Description
- UI Description
Dependencies
- Kubernetes
- Helm
- Google Cloud SDK (optional, only needed to create a Kubernetes cluster)
How to start
- Install Google Cloud SDK (optional)
- Install kubectl
- Install helm:
curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash
- Log in to Twitter
- Apply for a developer account
- Create an application
- Write the application's keys and tokens into k8s/.env:
echo -n {CONSUMER_KEY} > k8s/.env/CONSUMER_KEY
echo -n {CONSUMER_SECRET} > k8s/.env/CONSUMER_SECRET
echo -n {TOKEN} > k8s/.env/TOKEN
echo -n {TOKEN_SECRET} > k8s/.env/TOKEN_SECRET
- Create a Kubernetes cluster (skip this step if you already have one):
make cluster
- Install Helm and a secret with the Twitter credentials:
make initialize
- Wait a couple of minutes for Helm to initialize
- Install Kafka, Cassandra, Flink and Elasticsearch:
make infra
- Wait until the following command shows that all 3 instances of both Cassandra and Kafka are up and running:
kubectl get pods
- Start the Twitter source job, the Flink pipelines and the UI:
make start
How to see the results
- UI: get the external IP address of the UI service with
kubectl get service/ui-app-svc -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
and open http://{ip}:8080
- Kibana: get the external IP address of the Kibana service with
kubectl get service/kibana-rest -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
and open http://{ip}:5601
- Cassandra: count the rows in the ks.twitter_sentiment_analysis table with
echo "select count(*) from ks.twitter_sentiment_analysis; exit" | kubectl exec -i cassandra-client -- cqlsh cassandra
Twitter Source Job Description
The Twitter Source job is simple: it uses Twitter's client library to connect to the Twitter real-time API and writes every received tweet into Kafka.
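A minimal sketch of the forwarding loop, in plain Scala. It assumes the streaming client pushes raw tweet JSON into a `LinkedBlockingQueue` (the pattern used by Twitter's Hosebird client; the repository may wire it differently) and uses `send` as a hypothetical stand-in for a Kafka producer. `forwardTweets` and its idle timeout are illustrative only: the real job runs indefinitely.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object TwitterSourceSketch {
  // Takes raw tweet JSON strings from `queue` (filled by the streaming client)
  // and hands each one to `send` (hypothetical stand-in for a Kafka producer).
  // Returns the number of forwarded tweets once the queue has stayed empty for
  // `idleTimeoutMs`; the real job would keep running instead.
  def forwardTweets(queue: LinkedBlockingQueue[String],
                    send: String => Unit,
                    idleTimeoutMs: Long): Int = {
    var forwarded = 0
    var msg = queue.poll(idleTimeoutMs, TimeUnit.MILLISECONDS)
    while (msg != null) {
      send(msg)
      forwarded += 1
      msg = queue.poll(idleTimeoutMs, TimeUnit.MILLISECONDS)
    }
    forwarded
  }
}
```

Decoupling the client thread from the producer through a queue lets the client keep reading from the connection even when a Kafka write is briefly slow.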
Flink Sentiment Analysis Pipeline Description (including ML model)
This pipeline performs a simple task: it applies a sentiment prediction model to every tweet. The model is probably the most interesting part of this pipeline.
This is how the model is created (you can see this process in detail here):
- First, we need a dataset of texts labeled with sentiment; I used Sentiment140
- We also need a set of pre-trained word embeddings; I used GloVe
- Using the pre-trained word embeddings and the labeled dataset, we can train an XGBoost classifier that predicts the sentiment for a given word embedding
- Since the trained model is a Python object and we need to use it in a JVM environment, I used the m2cgen library to generate Java code for the model (I'm also a co-author of this library)
- The only thing left is to export the word embeddings so they can be used in the Flink pipeline
- Now we can apply the model to every tweet in real time.
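The per-tweet scoring step can be sketched in plain Scala as follows. Two assumptions, since the text does not spell them out: a tweet's feature vector is the average of the embeddings of its known words, and `score` is a hypothetical stand-in for the m2cgen-generated Java model that returns the probability of positive sentiment. The 0.5 threshold and all names are illustrative.

```scala
object SentimentSketch {
  // Assumption: a tweet's feature vector is the mean of the embeddings of its
  // known words; tweets with no known words get a zero vector.
  def tweetVector(text: String,
                  embeddings: Map[String, Array[Double]],
                  dim: Int): Array[Double] = {
    val vectors = text.toLowerCase.split("\\W+").toSeq.flatMap(embeddings.get)
    if (vectors.isEmpty) Array.fill(dim)(0.0)
    else {
      val sum = vectors.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
      sum.map(_ / vectors.size)
    }
  }

  // `score` is a hypothetical stand-in for the m2cgen-generated model and is
  // assumed to return the probability of positive sentiment.
  def predict(text: String,
              embeddings: Map[String, Array[Double]],
              dim: Int,
              score: Array[Double] => Double,
              threshold: Double = 0.5): String =
    if (score(tweetVector(text, embeddings, dim)) >= threshold) "positive"
    else "negative"
}
```

In the Flink job this logic would run inside a map function, with the exported embeddings loaded once per task rather than once per tweet.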
Flink Geo Aggregation Pipeline Description
This pipeline aggregates tweets by geographical location and by 5-second time intervals. Its output is a stream of messages in the following format:
{
"created": "2019-09-08 07:01:14.999 UTC",
"coordinates": {"lon":43.20000076293945, "lat":43.20000076293945},
"numberOfEvents": 4
}
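One way to produce this message shape, sketched in plain Scala with `java.time`. `GeoMessageJson` and `toJson` are hypothetical names, and the JSON is assembled by hand only to keep the example dependency-free; a real pipeline would more likely use a JSON library.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object GeoMessageJson {
  // Matches the "2019-09-08 07:01:14.999 UTC" layout of the `created` field.
  private val CreatedFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS 'UTC'").withZone(ZoneOffset.UTC)

  // Builds one output message from epoch milliseconds, a position and a count.
  def toJson(createdMs: Long, lon: Double, lat: Double, numberOfEvents: Int): String = {
    val created = CreatedFormat.format(Instant.ofEpochMilli(createdMs))
    s"""{"created": "$created", "coordinates": {"lon":$lon, "lat":$lat}, "numberOfEvents": $numberOfEvents}"""
  }
}
```

For example, `toJson` with the epoch milliseconds of 2019-09-08T07:01:14.999Z, `lon = 5.0`, `lat = 45.0` and a count of 4 yields a message with `"created": "2019-09-08 07:01:14.999 UTC"`.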
The aggregation is done in the following steps:
- Deserialize the JSON string representing a single tweet into a Scala object
- Since we want to process tweets in real time and group them by the time they were created, we need to use EventTime, which in turn requires extracting timestamps and watermarks from the stream of tweets. Since tweets might arrive out of order, we need to allow for some lateness. After some experiments I found that a bounded lateness of 3.5 seconds provides the best trade-off between the number of tweets arriving after their window has been closed and the number of open windows.
- To group tweets by location and by time, the following transformations are performed:
  - Split the world map into cells
  - Convert each tweet into a (cellX, cellY) tuple
  - Apply a time window of 5 seconds
  - Calculate the number of tweets per window/cell
- At this point we have a stream of ((cellX, cellY), numberOfEvents) tuples
- Now all we need to do is convert (cellX, cellY) back to (lon, lat) and add the window's timestamp to the final message
- Serialize the final message into JSON and push it into Kafka.
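The windowing steps above can be simulated without Flink in plain Scala. This sketch processes tweets in arrival order, keeps the watermark 3.5 seconds behind the largest timestamp seen so far, assigns each tweet to a 5-second tumbling window and a grid cell, drops tweets whose window has already fired, and emits a count once the watermark passes a window's last millisecond. It only approximates Flink's event-time semantics; the square grid, the cell-centre conversion back to (lon, lat) and all names are assumptions. Stamping each count with the window's last millisecond is also an assumption, though it is consistent with the `.999` in the sample output.

```scala
import scala.collection.mutable

object GeoAggregationSketch {
  final case class Tweet(lon: Double, lat: Double, createdMs: Long)
  final case class GeoCount(createdMs: Long, lon: Double, lat: Double, numberOfEvents: Int)

  val WindowMs = 5000L            // 5-second tumbling windows
  val MaxOutOfOrdernessMs = 3500L // bounded lateness of 3.5 seconds

  // Split the map into square cells of `cellDeg` degrees (assumed grid layout).
  def toCell(lon: Double, lat: Double, cellDeg: Double): (Int, Int) =
    (math.floor((lon + 180) / cellDeg).toInt, math.floor((lat + 90) / cellDeg).toInt)

  // Convert a cell back to (lon, lat) using the cell's centre (an assumption).
  def cellCentre(cell: (Int, Int), cellDeg: Double): (Double, Double) =
    (cell._1 * cellDeg - 180 + cellDeg / 2, cell._2 * cellDeg - 90 + cellDeg / 2)

  // Processes tweets in arrival order. Returns the emitted counts and the
  // number of tweets dropped because their window had already fired.
  def aggregate(tweets: Seq[Tweet], cellDeg: Double): (List[GeoCount], Int) = {
    val open = mutable.Map.empty[(Long, (Int, Int)), Int] // (windowStart, cell) -> count
    val out = mutable.ListBuffer.empty[GeoCount]
    var maxTs = Long.MinValue
    var watermark = Long.MinValue
    var dropped = 0

    // Fire every open window whose last millisecond is covered by the watermark.
    def fire(wm: Long): Unit = {
      val ready = open.keys.filter { case (start, _) => start + WindowMs - 1 <= wm }.toList.sorted
      for (key <- ready) {
        val (start, cell) = key
        val (lon, lat) = cellCentre(cell, cellDeg)
        out += GeoCount(start + WindowMs - 1, lon, lat, open(key))
        open -= key
      }
    }

    for (t <- tweets) {
      val start = t.createdMs - Math.floorMod(t.createdMs, WindowMs)
      if (start + WindowMs - 1 <= watermark) dropped += 1 // window already closed
      else {
        val key = (start, toCell(t.lon, t.lat, cellDeg))
        open(key) = open.getOrElse(key, 0) + 1
      }
      maxTs = math.max(maxTs, t.createdMs)
      watermark = maxTs - MaxOutOfOrdernessMs
      fire(watermark)
    }
    fire(Long.MaxValue) // end of input: flush the remaining windows
    (out.toList, dropped)
  }
}
```

Raising `MaxOutOfOrdernessMs` lowers `dropped` but keeps each window in `open` for longer, which is the trade-off described in the watermark step.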
UI Description
The UI consists of 2 parts: a web client written in JS, which connects to a NodeJS server over WebSockets. The implementation of both is straightforward:
- The NodeJS server connects to Kafka at startup
- When a user opens the page in the browser, the web client establishes a WebSocket connection to the NodeJS server
- The server keeps a set of current WebSocket connections from the web clients
- For every message consumed from Kafka, the server writes it to every WebSocket connection
- When a WebSocket connection is closed, the server removes it from the set of current connections.
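The server's connection bookkeeping does not depend on the language, so it is sketched here in Scala to keep one language across the examples. `Connection` is a hypothetical stand-in for a WebSocket connection, and `onOpen`, `onClose` and `onKafkaMessage` are illustrative hooks, not the NodeJS server's actual API.

```scala
import scala.collection.mutable

object UiBroadcastSketch {
  // Hypothetical stand-in for one WebSocket connection from a web client.
  trait Connection {
    def send(message: String): Unit
  }

  // The server's bookkeeping: the set of current connections plus a broadcast.
  final class Broadcaster {
    private val connections = mutable.LinkedHashSet.empty[Connection]

    // A web client opened the page and established a WebSocket connection.
    def onOpen(c: Connection): Unit = synchronized { connections += c }

    // The connection was closed: stop writing to it.
    def onClose(c: Connection): Unit = synchronized { connections -= c }

    // Called for every message consumed from Kafka; returns how many
    // connections the message was written to.
    def onKafkaMessage(message: String): Int = {
      val current = synchronized { connections.toList }
      current.foreach(_.send(message))
      current.size
    }
  }
}
```

Sending to a snapshot of the set means a connection that opens or closes during a broadcast does not interfere with the iteration.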