In [None]:
# notebooks/transform.ipynb
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
import pandas as pd
import json

# Create the SparkSession. The MongoDB connector's default input/output URIs
# point at stock_db.news and stock_db.processed_news, and the Kafka and MongoDB
# connector jars are pulled in through spark.jars.packages.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("StockSentimentAnalysis")
    .config("spark.mongodb.input.uri", "mongodb://mongodb:27017/stock_db.news")
    .config("spark.mongodb.output.uri", "mongodb://mongodb:27017/stock_db.processed_news")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

# Subscribe to the "stock_news" Kafka topic as a streaming DataFrame; the raw
# message payload is in its `value` column (decoded as bytes by parse_json).
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "stock_news",
    })
    .load()
)

# Parse the Kafka message payload (UTF-8 JSON) and extract its "title" field.
def parse_json(value):
    """Return the ``title`` field of a UTF-8 encoded JSON Kafka payload.

    Args:
        value: The raw message value (``bytes``/``bytearray``), an already
            decoded ``str``, or ``None`` for a null message value.

    Returns:
        The title as a string, or ``""`` when the value is missing, is not
        valid UTF-8 JSON, is not a JSON object, or has no non-null ``title``.
        Returning ``""`` instead of raising keeps one malformed message from
        failing the whole streaming micro-batch.
    """
    if value is None:
        return ""
    try:
        text = value.decode("utf-8") if isinstance(value, (bytes, bytearray)) else value
        data = json.loads(text)
    except (TypeError, ValueError):
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # TypeError covers payload types json.loads cannot accept.
        return ""
    if not isinstance(data, dict):
        # Valid JSON but not an object (list, number, ...): no "title" to read.
        return ""
    title = data.get("title")
    if title is None:
        # A null title would otherwise reach preprocess_text as None.
        return ""
    return title if isinstance(title, str) else str(title)

# Wrap parse_json as a string-returning Spark UDF and reduce each Kafka row to
# a single "title" column.
parse_udf = udf(f=parse_json, returnType=StringType())
df = df.select(parse_udf(df["value"]).alias("title"))

# Text-preprocessing resources.
# word_tokenize needs the Punkt tokenizer data; recent NLTK releases look it up
# as "punkt_tab" while older ones use "punkt", so fetch both.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
try:
    stop_words = set(stopwords.words('vietnamese'))
except (OSError, LookupError):
    # NOTE(review): NLTK's stopwords corpus has not shipped a Vietnamese list,
    # so this lookup fails at startup. Fall back to no stop-word filtering
    # instead of crashing; plug in a curated Vietnamese list here if needed.
    # Keep "tăng" out of any such list: process_batch looks for that word.
    stop_words = set()

def preprocess_text(text):
    """Normalize a headline for vectorization.

    Lower-cases ``text``, tokenizes it with NLTK's ``word_tokenize``, drops
    tokens that are not purely alphabetic (punctuation, numbers, mixed tokens)
    or that appear in ``stop_words``, and joins the rest with single spaces.
    ``str.isalpha`` is Unicode-aware, so letters with diacritics are kept.

    Args:
        text: The headline. ``None`` (a null column value) is treated as
            empty; any other non-string value is converted with ``str``.

    Returns:
        The cleaned, space-separated token string (``""`` for empty input).
    """
    if text is None:
        # Guard against nulls instead of failing the whole micro-batch.
        return ""
    tokens = word_tokenize(str(text).lower())
    return " ".join(tok for tok in tokens if tok.isalpha() and tok not in stop_words)

# Add a "cleaned_text" column with the preprocessed title.
# NOTE(review): the UDF executes in Spark worker processes; they presumably
# need the NLTK tokenizer data too, since nltk.download only runs in this
# driver process — confirm the executors have it installed.
preprocess_udf = udf(f=preprocess_text, returnType=StringType())
df_processed = df.withColumn("cleaned_text", preprocess_udf(df["title"]))

# Persist the parsed titles and their cleaned text to stock_db.raw_news.
#
# The mongo-spark-connector 3.x pulled in via spark.jars.packages has no
# Structured Streaming sink (streaming arrived in connector 10.x under the
# "mongodb" format), so writeStream.format("mongo") cannot start. Instead,
# each micro-batch is written with the batch "mongo" writer via foreachBatch.
# Streaming queries also require a checkpoint location; it additionally lets
# the query resume from its last committed Kafka offsets after a restart.
# NOTE(review): /tmp is node-local and not durable; point this at shared,
# persistent storage for a real deployment.
def _write_raw_batch(batch_df, batch_id):
    """Append one micro-batch of ``df_processed`` rows to stock_db.raw_news."""
    batch_df.write.format("mongo") \
        .mode("append") \
        .option("database", "stock_db") \
        .option("collection", "raw_news") \
        .save()


query_raw = df_processed.writeStream \
    .foreachBatch(_write_raw_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/raw_news") \
    .start()

# Sentiment classification.
def process_batch(df_batch, batch_id):
    """Label one micro-batch with a 0/1 sentiment and append it to MongoDB.

    Used as a ``foreachBatch`` callback. Each row's ``cleaned_text`` is
    vectorized with TF-IDF (at most 5000 features) and labeled by a
    Multinomial Naive Bayes model. The model is fit on *this batch only*,
    using keyword labels (1 if the text contains "tăng", else 0) as targets,
    and then predicts on those same rows. The rows plus a ``sentiment``
    column are appended to stock_db.processed_news.

    NOTE(review): because the model is trained and evaluated on the same rows
    with heuristic labels, its predictions essentially reproduce the keyword
    rule, and nothing is learned across batches. Consider a model trained
    offline on labeled data.

    Args:
        df_batch: The micro-batch DataFrame; must contain ``cleaned_text``.
        batch_id: Micro-batch id supplied by Spark; unused.
    """
    pandas_df = df_batch.toPandas()
    if pandas_df.empty:
        return

    # A null cleaned_text would make TfidfVectorizer fail; treat it as empty.
    texts = pandas_df['cleaned_text'].fillna("").astype(str)
    labels = [1 if "tăng" in text else 0 for text in texts]

    tfidf = TfidfVectorizer(max_features=5000)
    try:
        features = tfidf.fit_transform(texts)
    except ValueError:
        # Empty vocabulary: no document has a token the vectorizer keeps
        # (e.g. every title in the batch was empty). Then no text can contain
        # "tăng", so the keyword labels are all 0. Use them directly instead
        # of letting the exception terminate the streaming query.
        predictions = labels
    else:
        model = MultinomialNB()
        model.fit(features, labels)
        predictions = model.predict(features)

    pandas_df['sentiment'] = predictions
    # Build the output with the batch's own session rather than a global.
    spark_df = df_batch.sparkSession.createDataFrame(pandas_df)
    spark_df.write.format("mongo") \
        .mode("append") \
        .option("database", "stock_db") \
        .option("collection", "processed_news") \
        .save()

# Run process_batch on every micro-batch of the cleaned stream.
# Structured Streaming refuses to start a foreachBatch query without a
# checkpoint location (unless spark.sql.streaming.checkpointLocation is set).
# The checkpoint also lets the query resume from its last committed Kafka
# offsets after a restart.
# NOTE(review): /tmp is node-local and not durable; use shared persistent
# storage in a real deployment, distinct from every other query's checkpoint.
query_processed = df_processed.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/processed_news") \
    .start()

# Block until the streaming queries finish.
# Waiting on query_raw alone would hide a failure of query_processed, and the
# second awaitTermination could not be reached while query_raw runs. So wait
# for whichever query terminates first; awaitAnyTermination re-raises that
# query's exception if it failed. Afterwards (including when the cell is
# interrupted), stop the remaining query so nothing keeps running unattended.
try:
    spark.streams.awaitAnyTermination()
finally:
    for _query in (query_raw, query_processed):
        if _query.isActive:
            _query.stop()

: 