In [None]:
pip install kafka-python

In [None]:
pip install transformers

In [None]:
from kafka.consumer import KafkaConsumer

In [None]:
from kafka.producer import KafkaProducer

In [None]:
from kafka.errors import KafkaError
from transformers import pipeline
import ssl
import json
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from datetime import datetime

In [None]:
bootstrap_servers = ['XXXXXXXXXX:443']
topic = 'consume-topic'           # topic the classifier reads from
produce_topic = 'language-topic'  # topic abusive messages are forwarded to
# Prefer credentials from the environment so secrets need not be stored in the
# notebook; fall back to the original placeholder values.
username = os.environ.get('KAFKA_USERNAME', 'XXXXXXXXXX')
password = os.environ.get('KAFKA_PASSWORD', 'XXXXXXXXXX')
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Set up a Kafka consumer: start from the newest offset and auto-commit,
# so only messages arriving after startup are processed.
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    sasl_plain_username=username,
    sasl_plain_password=password,
    security_protocol=security_protocol,
    sasl_mechanism=sasl_mechanism,
    auto_offset_reset='latest',
    enable_auto_commit=True,
)

# Hugging Face download cache location.
# transformers resolves its cache directory when it is imported (already done
# above), so setting TRANSFORMERS_CACHE here does not redirect downloads by
# itself. The directory is therefore also passed explicitly via cache_dir below.
transformers_cache_dir = '/cache/'
os.environ['TRANSFORMERS_CACHE'] = transformers_cache_dir

# Set up a Kafka producer using the same SASL_SSL credentials as the consumer.
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    sasl_plain_username=username,
    sasl_plain_password=password,
    security_protocol=security_protocol,
    sasl_mechanism=sasl_mechanism
)

# Load the classification model and its tokenizer, then move the model to the
# GPU when one is available.
model_name = 'Hate-speech-CNERG/english-abusive-MuRIL'
tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=transformers_cache_dir)
model = AutoModelForSequenceClassification.from_pretrained(model_name, cache_dir=transformers_cache_dir)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Identifiers attached to every outgoing message. They are loop-invariant, so
# they are set once.
# NOTE(review): hard-coded placeholders; presumably these should come from the
# message payload or configuration — confirm.
customer_id = 1023
product_id = 1

# Continuously consume Kafka messages, classify each text, and forward only the
# ones classified as abusive to `produce_topic`.
for message in consumer:
    # Kafka record timestamps are milliseconds since the epoch; this produces a
    # naive local-time datetime.
    timestamp = datetime.fromtimestamp(message.timestamp / 1000.0)
    print(timestamp)

    # Records with no value (e.g. tombstones) carry no text to classify.
    if message.value is None:
        continue
    # Replace invalid UTF-8 rather than letting a single bad record raise and
    # terminate the consumer loop.
    text = message.value.decode('utf-8', errors='replace')
    print(text)

    # Tokenize the text message (truncated to the model's 512-token limit).
    inputs = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors='pt')
    inputs = inputs.to(device)

    # Inference only: no_grad avoids building and retaining an autograd graph
    # for every message.
    with torch.no_grad():
        outputs = model(**inputs)
    print(outputs)

    predictions = torch.softmax(outputs.logits, dim=1).cpu().numpy()
    # Shift the predicted class index down by one: index 0 -> -1 (reported as
    # Non-Abusive); index 1 -> 0 (reported as Abusive and forwarded below).
    sentiment = int(predictions.argmax(axis=1)[0]) - 1
    print(sentiment)

    # NOTE(review): not used by this loop; kept in case a later cell reads it.
    sentiment_output = f"{customer_id},{product_id},{sentiment}"

    # Produce a response message with the classification.
    response_message = f"{timestamp} {customer_id},{product_id}, {text} ({'Non-Abusive' if sentiment < 0 else 'Abusive'})"
    # Only messages with sentiment == 0 (class index 1) are forwarded.
    if sentiment == 0:
        producer.send(produce_topic, response_message.encode('utf-8'))