# Heartbeat
---

This project serves as a way to measure the "heartbeat" of the Internet. In this case, the stethoscope is this AI-enabled system, and we are measuring the state via Twitter.

The system runs in a local Kubernetes (K8s) cluster, but can conceivably be pushed to the cloud with ease. Since running a somewhat intensive K8s cluster is not an easy task, I will demonstrate the main mechanics of the project in this walkthrough and mock the Kubernetes services.

Inside the K8s cluster is a Kafka service at the center of it all. This is also difficult to set up, so this will also be mocked here.

If running from Google Colab, run the command below to install the necessary Python dependencies to get started.

In [249]:
!pip install tweepy pyyaml "transformers[torch]" sty scipy

Collecting scipy
  Downloading scipy-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (42.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.3/42.3 MB[0m [31m19.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: scipy
Successfully installed scipy-1.8.0


# Mocking

## Kafka

In [257]:
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple, Union

from sty import fg, rs

Kafka has a number of brokers that deal in messages based on topics. Producers and consumers can operate on these message streams by requesting or supplying data based on the desired topic.

Any data that enters Kafka is serialized by some method. In the real system, data is serialized by custom numeric serializers and an [Apache Avro](https://avro.apache.org/docs/current/spec.html) serializer. All tweet and sentiment data payloads sent via Kafka are stored as Avro data, which is a data serialization system for arbitrary data. To use it, I define schemas based on how the data is expected to present itself. 

These schemas are registered with a Kafka SchemaRegistry service running in the K8s cluster. Here, I will just convert to and from normal dictionaries.

In [365]:
class KafkaStream:
    def __init__(self):
        print(fg.cyan + "KAFKA " + rs.all, end="")
        print("Connected to new Kafka broker")
        self.data = defaultdict(list)
        self.lock = threading.Lock()

    def add(self, topic: str, key: int, value: dict) -> None:
        self.lock.acquire()
        print(fg.cyan + "KAFKA " + rs.all, end="")
        print(fg.magenta + f"{topic} " + rs.all, end="")
        print(fg.green + "ADD" + rs.all + f" {key} = {value}")
        self.data[topic].append((key, value))
        self.lock.release()
        time.sleep(0.1)

    def get(self, topic) -> Tuple[int, str]:
        self.lock.acquire()
        print(fg.cyan + "KAFKA " + rs.all, end="")
        print(fg.magenta + f"{topic} " + rs.all, end="")
        if len(self.data[topic]) > 0:
            res = self.data[topic].pop(0)
            print(fg.red + "GET" + rs.all + f" {res[0]} = {res[1]}")
        else:
            res = None
            print(fg.red + "GET" + rs.all + " EMPTY")
        self.lock.release()
        time.sleep(0.1)
        return res 

Producers, as their name would suggest, produce data to given topics in Kafka. Consumers pull the data out of Kafka as it is produced by the producers.

In [316]:
class KafkaProducer:
    def __init__(self, stream: KafkaStream, serialize: Callable):
        self.stream = stream
        self.serialize = serialize
        self.buffer = []
    
    def produce(self, topic, key, value) -> None:
        self.buffer.append((topic, key, self.serialize(value, None)))
        
    def poll(self) -> int:
        size = len(self.buffer)
        for item in self.buffer:
            self.stream.add(*item)
        self.buffer = []
        return size

In [372]:
class KafkaConsumer:
    def __init__(self, stream: KafkaStream, deserialize: Callable):
        self.stream = stream
        self.deserialize = deserialize
        self.topic = None
        
    def subscribe(self, topic) -> None:
        self.topic = topic
    
    def poll(self) -> Any:
        res = self.stream.get(self.topic)
        if res is None:
            return None
        key, value = res
        return key, self.deserialize(value, None)

## Twitter

Here is a client that will produce random fake Tweet data for this demonstration. Access to the actual API requires authorization dependent on the user. I have an account with [Elevated](https://developer.twitter.com/en/docs/twitter-api/getting-started/about-twitter-api#v2-access-level) access that I personally requested from Twitter, so I figure it would not be a good idea to publicly post the tokens on GitHub.

In [318]:
import random
from dataclasses import dataclass
from datetime import datetime
from threading import Thread

import requests

In [319]:
@dataclass
class TweepyTweet:
    id: int
    text: str
    created_at: datetime

In [320]:
class TweepyClient(ABC):
    def __init__(self):
        word_site = "https://www.mit.edu/~ecprice/wordlist.10000"
        response = requests.get(word_site)
        self.vocab = [b.decode("utf-8") for b in response.content.splitlines()]

    def start(self, num: int = 100):
        return Thread(target=self.generate, args=(num,)).start()

    def generate(self, num: int):
        for i in range(num):
            tweet = TweepyTweet(
                id=random.randint(0, 10000),
                text=" ".join(random.sample(self.vocab, 15)),
                created_at=datetime.utcnow(),
            )
            self.on_tweet(tweet)

    @abstractmethod
    def on_tweet(self, tweet):
        pass

# Framework

This code here is similar to what I was actually writing. Here is an abstract data type used to define some form of data being stored in Kafka.

In [374]:
from abc import ABC, abstractmethod


class ADT(ABC):
    def to_dict(self, ctx):
        return self.__dict__

    @classmethod
    def from_dict(cls, obj, ctx):
        if obj is None:
            return None
        return cls(**obj)

    @classmethod
    @property
    @abstractmethod
    def schema(cls) -> str:
        pass

In [375]:
class Producer(KafkaProducer):
    def __init__(self, topic: str, data: ADT, stream: KafkaStream):
        self.topic = topic
        super().__init__(stream, data.to_dict)
        
    def produce(self, key: int, value: Any) -> None:
        super().produce(self.topic, key, value)

In [376]:
class Consumer(KafkaConsumer):
    def __init__(self, data: ADT, stream: KafkaStream):
        super().__init__(stream, data.from_dict)

# Twitter Ingest

In [344]:
import tweepy

The first step in the system is to ingest content from Twitter. In this case, data comes from the Twitter filtered stream endpoint via `POST /2/tweets/search/stream`.

## Data

Data is represented using one of my abstract data types. I also define a schema for the Kafka SchemaRegistry service so that it can properly understand the incoming and outgoing data.

Twitter data only needs 3 attributes for representing Tweets:

* **task** - the Heartbeat tasking. This is the subject of the search. In our case, I was searching for Russia-Ukraine information.
* **content** - the text content of the Tweet.
* **time** - timestamp with only second precision. High precision is not really needed for this purpose.

In [345]:
class Tweet(ADT):
    def __init__(self, task: str, content: str, time: int):
        self.task = task
        self.content = content
        self.time = time

    @classmethod
    @property
    def schema(cls) -> str:
        return """
        {
            "name": "tweet",
            "type": "record",
            "namespace": "heartbeat",
            "fields": [
                {
                    "name": "time",
                    "type": {
                        "type": "int", 
                        "logicalType": "timestamp-millis"
                    }
                },
                {"name": "content", "type": "string"},
                {"name": "task", "type": "string"}
            ]
        }
        """

## Tweepy - Kafka Interaction

Withe some of the mocking and setup out of the way, we can look at the services. The first part of the Heartbeat system involves ingesting the data from Twitter and producing it to the "ingest" topic in Kafka.

This simple service just connects to the Twitter stream endpoint, and as Tweets are received, pushes structured data to the topic. Data comes in from a Python wrapper for the Twitter API called `Tweepy`. This library allowed for very simple access to the Twitter API. The work here was just a matter of connecting the stream of data coming from Twitter and Tweepy to Kafka.

In [379]:
class TwitterIngest(TweepyClient):
    def __init__(self, task: str, producer: Producer):
        self.task = task
        self.producer = producer
        super().__init__()

    def on_tweet(self, tweet):
        payload = Tweet(self.task, tweet.text, tweet.created_at.timestamp())
        self.producer.produce(key=tweet.id, value=payload)

    def poll(self) -> None:
        empty = False
        while not empty:
            time.sleep(0.3)
            empty = self.producer.poll() == 0
        print(">>> Exiting Twitter ingest")

## Demo

In [380]:
stream = KafkaStream()

[36mKAFKA [0mConnected to new Kafka broker


In [381]:
producer = Producer("ingest", Tweet, stream)
ingest = TwitterIngest("RU-UKR", producer)

In [382]:
t = ingest.start(5)
ingest.poll()

[36mKAFKA [0m[35mingest [0m[32mADD[0m 7868 = {'task': 'RU-UKR', 'content': 'slovakia vibrators acrobat boy each reconstruction leaf bugs operated health army rendering audio bath continues', 'time': 1652072799.379839}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 7489 = {'task': 'RU-UKR', 'content': 'oldest copper romantic thats defense largely pas qty perceived uses playlist totally human tapes keyboard', 'time': 1652072799.379868}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 8968 = {'task': 'RU-UKR', 'content': 'dried pill expiration attached symbol estimated past instrumental conservation tickets litigation souls docs penetration outstanding', 'time': 1652072799.379883}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 5836 = {'task': 'RU-UKR', 'content': 'slideshow add associates decade learned bonds kruger hollow vinyl conditional flashers encryption instantly awards legislature', 'time': 1652072799.379897}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 2722 = {'task': 'RU-UKR', 'conten

So now we have an incoming stream of Tweets with a very simple data schema. Next, we just have to process this data for sentiment.

# Sentiment Analysis

## Data

As before, there will be some more data structures to deal with. This time, we will be using data storage for sentiment analysis results.

This time, we are still storing the task information, but we are now also storing 3 different floating-point values for Tweet sentiments.

* **task** - the Heartbeat tasking. This is the subject of the search. In our case, I was searching for Russia-Ukraine information.
* **time** - timestamp with only second precision. High precision is not really needed for this purpose. Should still be the timestamp from Twitter, not one that we create.
* **pos** - likelihood of positive sentiment. Values range from 0 to 1 such that higher values indicate higher likelihood. Positive sentiment indicates favorable opinion of the tasking.
* **neu** - likelihood of neutral sentiment. Neutral sentiment indicates no particular positive or negative opinion.
* **neg** - likelihood of negative sentiment. Negative sentiment indicates some degree of dislike with the tasking.

In [351]:
class Sentiment(ADT):
    def __init__(
        self, task: str, time: int, pos: float, neu: float, neg: float
    ):
        self.task = task
        self.time = time
        self.pos = pos
        self.neu = neu
        self.neg = neg

    @classmethod
    @property
    def schema(cls) -> str:
        return """
        {
            "name": "sentiment",
            "type": "record",
            "namespace": "heartbeat",
            "fields": [
                {
                    "name": "time",
                    "type": {"type": "int", "logicalType": "timestamp-millis"}
                },
                {"name": "task", "type": "string"},
                {"name": "pos", "type": "float"},
                {"name": "neu", "type": "float"},
                {"name": "neg", "type": "float"}
            ]
        }
        """

## Model Inference

In [352]:
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig
import numpy as np
from scipy.special import softmax


class SentimentAnalyzer:
    def __init__(self):
        model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.config = AutoConfig.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name
        )

    def score(self, text: str) -> List[float]:
        processed = SentimentAnalyzer.preprocess(text)
        encoded_input = self.tokenizer(processed, return_tensors="pt")
        output = self.model(**encoded_input)
        scores = output[0][0].detach().numpy()
        return softmax(scores) # negative, neutral, positive

    @staticmethod
    def preprocess(text: str) -> str:
        # Preprocess text (username and link placeholders)
        new_text = []
        for t in text.split(" "):
            t = "@user" if t.startswith("@") and len(t) > 1 else t
            t = "http" if t.startswith("http") else t
            new_text.append(t)
        return " ".join(new_text)

In [383]:
def analyze(stream, analyzer):
    consumer = Consumer(Tweet, stream)
    consumer.subscribe("ingest")
    producer = Producer("sentiment", Sentiment, stream)

    empty = 0
    while empty < 3:
        msg = consumer.poll()
        if msg is None:
            empty += 1
            continue

        key, tweet = msg
        if tweet is not None:
            negative, neutral, positive = analyzer.score(tweet.content)
            sentiment = Sentiment(
                task=tweet.task,
                time=tweet.time,
                pos=positive,
                neu=neutral,
                neg=negative,
            )
            producer.produce(key=key, value=sentiment)
        producer.poll()
    print(">>> Exiting sentiment analysis")

## Demo

Next, we just have to load the model from [HuggingFace](https://huggingface.co/cardiffnlp/twitter-roberta-base-sentiment-latest), where pretrained models are made extremely accessible for users.

In [333]:
# Set up sentiment analysis
analyzer = SentimentAnalyzer()

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


With the models loaded, we can determine sentiment from the text. Note that the return order from the model is [negative, neutral, positive].

In [354]:
analyzer.score("We are absolutely loving this!")

array([0.00420294, 0.00935337, 0.98644376], dtype=float32)

In [355]:
analyzer.score("This is terrible")

array([0.8545521 , 0.11869626, 0.02675165], dtype=float32)

In [356]:
analyzer.score("HuggingFace is a machine learning library")

array([0.02685222, 0.7657344 , 0.20741342], dtype=float32)

We can even attach this analyzer to the Tweet data. The text is random gibberish, so the sentiment outputs are not very useful, but the system can still be demonstrated in this manner.

In [357]:
class DemoAnalyzer(TweepyClient):
    def __init__(self, analyzer, *args, **kwargs):
        self.analyzer = analyzer
        super().__init__(*args, **kwargs)

    def on_tweet(self, tweet):
        print(f"> TEXT: {tweet.text}")
        pos, neu, neg = self.analyzer.score(tweet.text)
        print(f"  SENTIMENT: pos={pos:.3f}, neu={neu:.3f}, neg={neg:.3f}")

In [358]:
da = DemoAnalyzer(analyzer)

In [359]:
da.start(5)

> TEXT: breath richard peru compiled sacramento quiet coding cache identified john pd being automotive sisters mountain
  SENTIMENT: pos=0.030, neu=0.900, neg=0.070
> TEXT: neural special broker cio leading collect handled audi widescreen newspapers representative em buried flower ri
  SENTIMENT: pos=0.025, neu=0.928, neg=0.047
> TEXT: mattress gays sea rss whether interests describes logs recovered beverage reviews fr support acceptable be
  SENTIMENT: pos=0.041, neu=0.920, neg=0.040
> TEXT: properties thereafter harper wicked colour chile weekends liberal liabilities rank bukkake seller commodities belize portal
  SENTIMENT: pos=0.637, neu=0.347, neg=0.016
> TEXT: proper sacred shirts highs nasdaq buzz adrian contrast gd cart around preventing player not blogging
  SENTIMENT: pos=0.105, neu=0.781, neg=0.114


# Database Storage

The last step is transfer of the results from Kafka to a database for storage. For this project, I used [InfluxDB](https://github.com/influxdata/influxdb), an efficient time-series database for storing the sentiments according to the timestamps at which they were retrieved. This will again be mocked.

In [389]:
class Database:
    def __init__(self, lock):
        self.data = []
        self.lock = lock
        
    def write(self, item):
        self.lock.acquire()
        print(fg.grey + "DB " + rs.all, end="")
        print(fg.green + "ADD" + rs.all + f" {item}")
        
        self.lock.release()
        self.data.append(item)
        
    def get_all(self):
        print("Database dump...")
        for idx, item in enumerate(self.data):
            print(f"{idx}) {item}")

In [394]:
def transfer(stream: KafkaStream, db: Database):
    consumer = Consumer(Sentiment, stream)
    consumer.subscribe("sentiment")

    empty = 0
    while empty < 10:
        msg = consumer.poll()
        if msg is None:
            empty += 1
            continue

        key, sent = msg
        db.write([key, sent.task, sent.time, sent.pos, sent.neu, sent.neg])
    print(">>> Exiting data storage")

# Full Demo

Now let's put it all together and let the ingest service retrieve data from "Twitter" and push it to "Kafka". Meanwhile, we will let the analyzer service retrieve data from "Kafka" and produce sentiment analysis results.

In [395]:
stream = KafkaStream()
db = Database(stream.lock)
producer = Producer("ingest", Tweet, stream)
ingest = TwitterIngest("RU-UKR", producer)

t = ingest.start(5)
Thread(target=ingest.poll()).start()
Thread(target=analyze, args=(stream, analyzer)).start()
Thread(target=transfer, args=(stream, db)).start()

[36mKAFKA [0mConnected to new Kafka broker
[36mKAFKA [0m[35mingest [0m[32mADD[0m 1783 = {'task': 'RU-UKR', 'content': 'catalogs frozen creatures pontiac fast quarters cite earn saskatchewan performing manually thats closed meet robinson', 'time': 1652073462.491736}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 9088 = {'task': 'RU-UKR', 'content': 'skilled going lie anonymous military rehab convinced heavy mw indonesian top entered museums sixth sensitive', 'time': 1652073462.491762}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 3479 = {'task': 'RU-UKR', 'content': 'piss casual buyers rise soap fa audit ranks monica association buried annotation sao buy killed', 'time': 1652073462.491772}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 3815 = {'task': 'RU-UKR', 'content': 'baths dan precise pot independent difficulty gear lower transit apparently chicago spider mesa crm antiques', 'time': 1652073462.491781}
[36mKAFKA [0m[35mingest [0m[32mADD[0m 289 = {'task': 'RU-UKR', 'content': 

And then, of course, we can get the results in the database and easily inspect them. InfluxDB makes this very easy as well, as it has built-in dashboards for visualization of time series data.

In [396]:
db.get_all()

Database dump...
0) [1783, 'RU-UKR', 1652073462.491736, 0.023776477, 0.82779956, 0.14842393]
1) [9088, 'RU-UKR', 1652073462.491762, 0.03266897, 0.6172506, 0.35008043]
2) [3479, 'RU-UKR', 1652073462.491772, 0.008746965, 0.12436981, 0.8668834]
3) [3815, 'RU-UKR', 1652073462.491781, 0.054548632, 0.9132545, 0.032196872]
4) [289, 'RU-UKR', 1652073462.49179, 0.105619214, 0.86781275, 0.026568046]
