using Python 3.9.7


In [None]:
!pip install pyspark
!pip install spark-nlp
!pip install pymongo
!pip install kafka-python
!pip install torch
!pip install transformers


Collecting spark-nlp
  Downloading spark_nlp-5.5.2-py2.py3-none-any.whl.metadata (19 kB)
Downloading spark_nlp-5.5.2-py2.py3-none-any.whl (636 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m636.3/636.3 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-5.5.2
Collecting pymongo
  Downloading pymongo-4.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m35.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstall

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
import json
import os

from kafka import KafkaConsumer
from pymongo import MongoClient

# NOTE(security): every connection setting below can be supplied through an
# environment variable so real credentials do not have to be stored in the
# notebook. The literal fallbacks are the values that were previously hardcoded
# here and keep existing runs working; rotate those credentials and remove the
# fallbacks once all environments define the variables.

# MongoDB Configuration
mongo_uri = os.environ.get(
    "MONGO_URI",
    "mongodb+srv://admin:admin@cluster0.zfe25.mongodb.net/ContentModerationSystem?retryWrites=true&w=majority&appName=Cluster0",
)
mongo_client = MongoClient(mongo_uri)
mongo_db = mongo_client['ContentModerationSystem']
# Collection that receives one document per classified chat message.
mongo_collection = mongo_db['rcvdata']

# Kafka Configuration
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "103.48.193.225:9094")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "youtube-live-chat")
KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME", "admin")
KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD", "admin")


In [2]:
import sparknlp
from pyspark.sql import SparkSession

# Obtain the Spark session through Spark NLP's start helper.
spark = sparknlp.start()

# Report which Spark NLP release this kernel is using.
print("Spark NLP Version: " + str(sparknlp.version()))


Spark NLP Version: 4.2.8


In [3]:
import torch
# Environment check: installed PyTorch build, then whether CUDA is usable
# (one value per output line).
print(torch.__version__, torch.cuda.is_available(), sep="\n")

2.5.1+cu118
True


In [8]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# Local directory the tokenizer and the classifier are loaded from.
# model_path = '/content/drive/MyDrive/models/pb'
model_path = './phoBert_model'

# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

tokenizer = AutoTokenizer.from_pretrained(model_path)
# Load the sequence classifier and place it on the selected device.
model = AutoModelForSequenceClassification.from_pretrained(model_path).to(device)


In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def predict_sentiment(text):
    """Classify one text and return the predicted class index as a string.

    Uses the module-level ``tokenizer``, ``model`` and ``device`` objects.

    Args:
        text: The text to classify. ``None`` is accepted so the function can be
            applied to nullable columns when wrapped as a Spark UDF.

    Returns:
        ``str`` of the index of the highest-scoring logit (e.g. ``"0"``), or
        ``None`` when ``text`` is ``None``.
    """
    # Null in, null out: the tokenizer cannot encode None, so without this guard
    # a single null value would raise when the function runs as a UDF.
    if text is None:
        return None

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    # Input tensors have to be on the same device as the model.
    inputs = {key: value.to(device) for key, value in inputs.items()}
    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    # .item() needs a single-element tensor, i.e. one text per call.
    prediction = torch.argmax(logits, dim=1).item()
    return str(prediction)

# Spark SQL wrapper around predict_sentiment; the result column is a string.
predict_sentiment_udf = udf(f=predict_sentiment, returnType=StringType())


In [5]:
def _decode_chat_payload(raw_bytes):
    """Decode a raw record value (UTF-8 encoded JSON bytes) into a Python object."""
    return json.loads(raw_bytes.decode('utf-8'))


# Keyword settings for the consumer, grouped by concern.
_consumer_settings = {
    # Where to connect and how to decode record values.
    "bootstrap_servers": [KAFKA_BROKER],
    "value_deserializer": _decode_chat_payload,
    # SASL/PLAIN authentication over the SASL_PLAINTEXT protocol.
    "security_protocol": "SASL_PLAINTEXT",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": KAFKA_USERNAME,
    "sasl_plain_password": KAFKA_PASSWORD,
    # Consumer-group membership and offset handling.
    "group_id": "nlp-group",
    "auto_offset_reset": "earliest",
    "enable_auto_commit": True,
    # Liveness / polling timeouts (milliseconds).
    "session_timeout_ms": 30000,
    "heartbeat_interval_ms": 5000,
    "max_poll_interval_ms": 300000,
}

consumer = KafkaConsumer(KAFKA_TOPIC, **_consumer_settings)

In [6]:
import json
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pymongo import MongoClient
from kafka.errors import KafkaError

In [9]:
# Consume chat messages, classify each one and store the result in MongoDB.
# The loop runs until it is interrupted or an error is raised.
try:
    print("Waiting for data from Kafka...")

    # Iterate over records as the consumer delivers them.
    for message in consumer:
        payload = message.value

        # Only JSON objects carry the 'author' / 'message' keys. Any other
        # value (null, list, bare string, ...) is skipped here; otherwise the
        # AttributeError from .get() would reach the blanket handler below and
        # end the whole loop because of one malformed record.
        if not isinstance(payload, dict):
            print("Unexpected message payload. Skipping...")
            continue

        author = payload.get('author', 'Unknown')
        text = payload.get('message', '')

        if not text:
            print("No message content. Skipping...")
            continue

        predicted_label = predict_sentiment(text)

        # predict_sentiment returns the label as a string; it is stored as an int.
        result = {
            'author': author,
            'message': text,
            'label': int(predicted_label)
        }
        mongo_collection.insert_one(result)

        print(f"Message classified. Author: {author}, Label: {predicted_label}")

except KeyboardInterrupt:
    # Raised when the cell is interrupted by the user.
    print("Program stopped (KeyboardInterrupt).")
except KafkaError as e:
    print(f"Kafka error: {e}")
except Exception as e:
    # Anything else (e.g. a failed MongoDB insert) is reported and stops the loop.
    print(f"Error: {e}")

Waiting for data from Kafka...
Message classified. Author: Just little one :33, Label: 0
Message classified. Author: Just D3skyyy, Label: 0
Message classified. Author: Hào PiPy, Label: 0
Message classified. Author: Phương Nam Nguyễn, Label: 0
Message classified. Author: abcdefgiklm, Label: 0
Message classified. Author: MinhPoi, Label: 0
Message classified. Author: lmeomeo333, Label: 0
Message classified. Author: OKEBAYBE, Label: 0
Message classified. Author: Minh Huy Phạm, Label: 0
Message classified. Author: Dược Sỹ Lâm Pink, Label: 0
Message classified. Author: Hieu Dang, Label: 0
Message classified. Author: Mbrakk, Label: 0
Message classified. Author: Toản Trần, Label: 0
Message classified. Author: Dược Sỹ Lâm Pink, Label: 0
Message classified. Author: duc uwu do, Label: 0
Message classified. Author: Đức Nguyễn, Label: 0
Message classified. Author: Huỳnh Ngọc Duy, Label: 0
Message classified. Author: Hoang Le Minh, Label: 0
Message classified. Author: Trương Đức Nhâm, Label: 0
Messa