In [12]:
import os, sys

# Point PySpark at the local Java / Spark / Hadoop installations (Windows paths).
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jre-17.0.17.10-hotspot"
os.environ["SPARK_HOME"] = r"C:\Spark\spark-3.5.7-bin-hadoop3"
os.environ["HADOOP_HOME"] = r"C:\hadoop"

# Put the JRE's bin directory first on PATH, but only when it is not already
# there: this cell gets re-run, and an unconditional prepend would add one more
# copy of the entry on every run.
java_bin = os.environ["JAVA_HOME"] + r"\bin"
if os.environ["PATH"].split(";")[0] != java_bin:
    os.environ["PATH"] = java_bin + ";" + os.environ["PATH"]

# Use the notebook kernel's interpreter for both the PySpark driver and workers.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable


In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the local SparkSession for this notebook.
spark: SparkSession = (
    SparkSession.builder
    .master("local[*]")
    .appName("KafkaStreamingNotebook")
    # =========================
    # 1. KAFKA CONNECTOR
    # =========================
    # The connector version is kept equal to the Spark version that SPARK_HOME
    # points at (spark-3.5.7); the Scala suffix (_2.12) must match the build too.
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7",
    )
    # =========================
    # 2. RESOURCES - low-spec machine
    # =========================
    .config("spark.driver.memory", "1g")
    .config("spark.driver.cores", "1")
    # NOTE(review): with master local[*] everything runs inside the driver JVM,
    # so the spark.executor.* values below likely have no effect -- confirm
    # before relying on them for sizing.
    .config("spark.executor.instances", "1")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "768m")
    .config("spark.executor.memoryOverhead", "256m")
    # =========================
    # 3. STREAMING TUNING
    # =========================
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "2")
    # NOTE(review): this key belongs to the DStream Kafka integration; the
    # Structured Streaming source is throttled by its maxOffsetsPerTrigger
    # option instead -- presumably a no-op here, verify.
    .config("spark.streaming.kafka.maxRatePerPartition", "50")
    # =========================
    # 4. STABILITY (disabled)
    # =========================
    # .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)



In [None]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, BooleanType

# Schema of the JSON document carried in each Kafka message value.
# The nested comment payload is built separately so that the nullable flag for
# the "data" field is passed to .add() itself. Previously the trailing `, True`
# sat outside that call, which turned `schema` into a (StructType, True) tuple
# that from_json() cannot accept.
comment_payload_schema = (
    StructType()
    .add("message", StringType(), True)
    .add("authorId", StringType(), True)
    .add("authorImage", StringType(), True)
    .add("authorName", StringType(), True)
    .add("timestamp", StringType(), True)
    .add("isModerator", BooleanType(), True)
)

schema = (
    StructType()
    .add("type", StringType(), True)
    .add("id", StringType(), True)
    .add("videoId", StringType(), True)
    .add("data", comment_payload_schema, True)
)

# Kafka source settings, applied in one call through DataStreamReader.options().
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9094",
    "subscribe": "comment",
    "MaxOffsetsPerTrigger": "10000",
    "startingOffsets": "earliest",
}

# Streaming DataFrame over the "comment" topic, read from the earliest offsets.
streaming_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)


In [None]:
from pyspark.sql.functions import col, from_json, current_timestamp

# 1. Parse the JSON value and keep the Kafka metadata columns alongside it.
parsed_payload = from_json(col("value").cast("string"), schema).alias("data")
raw_df = streaming_df.select(
    parsed_payload,
    col("timestamp").alias("kafka_timestamp"),  # Kafka record timestamp
    col("offset"),
    col("partition"),
)

# 2. Flatten the parsed struct, then drop rows whose id is null
#    (e.g. records that could not be parsed against the schema).
flattened_df = raw_df.select("data.*", "kafka_timestamp", "offset", "partition")
processed_df = flattened_df.filter("id IS NOT NULL")

# 3. Stamp each row with the processing time so pipeline latency can be measured.
final_df = processed_df.withColumn("processing_time", current_timestamp())

In [None]:
# Print the schema of the flattened stream.
# NOTE(review): final_df is built with kafka_timestamp, offset, partition and
# processing_time columns, none of which appear in the saved output below --
# the recorded output looks stale; re-run this cell to refresh it.
final_df.printSchema()

root
 |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- videoId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- authorId: string (nullable = true)
 |    |-- authorImage: string (nullable = true)
 |    |-- authorName: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- isModerator: boolean (nullable = true)



In [None]:
# Keep the envelope fields and expand the nested "data" struct into top-level columns.
df_exploded = final_df.select("type", "id", "videoId", "data.*")

In [17]:
from pyspark.sql.functions import pandas_udf
import pandas as pd
import unicodedata

# Unicode normalization implemented in Python and exposed to Spark as a pandas UDF.
@pandas_udf("string")
def unicode_normalize_udf(s: pd.Series) -> pd.Series:
    """Return `s` with every truthy value converted to Unicode NFC form.

    Falsy entries (None, empty string) are passed through unchanged.
    """
    def _to_nfc(value):
        if not value:
            return value
        return unicodedata.normalize('NFC', value)

    return s.apply(_to_nfc)

In [None]:
from pyspark.sql.functions import trim, lower, col, regexp_replace, when, length, expr

# 1. Patterns. These are Java regular expressions: Spark's regexp_replace is
#    evaluated by java.util.regex, not by Python's `re`.
url_pattern = r"https?://\S+|www\.\S+"
email_pattern = r"\S+@\S+"
# A "special" character sandwiched between two letters (e.g. "hello.world").
# Java's \w is ASCII-only by default, so the previous [^\w\s] class also matched
# accented letters and split words such as Vietnamese ones in the middle.
# Unicode property classes keep letters (\p{L}), combining marks (\p{M}) and
# digits (\p{N}) intact.
special_char_pattern = r"(?<=\p{L})[^\p{L}\p{M}\p{N}_\s](?=\p{L})"

# 2. Cleaning steps as Column expressions, applied in this order:
#    lowercase -> strip URLs -> strip e-mails -> special chars to space ->
#    collapse whitespace -> trim.
message_lower = lower(col("message_clean"))
message_no_urls = regexp_replace(message_lower, url_pattern, "")
message_no_emails = regexp_replace(message_no_urls, email_pattern, "")
message_no_specials = regexp_replace(message_no_emails, special_char_pattern, " ")
message_squished = regexp_replace(message_no_specials, r"\s+", " ")

# 3. Pipeline: drop rows without a message, NFC-normalize, then clean.
df_comment_cleaned = (
    df_exploded
    .dropna(subset=["message"])
    .withColumn("message_clean", unicode_normalize_udf(col("message")))
    .withColumn("message_clean", trim(message_squished))
)

In [None]:
import requests
import json
from pyspark.sql.functions import col, lit, struct, to_json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# 1. HTTP session factory with a retry policy.
def get_http_session():
    """Create a new `requests.Session` that retries transient server errors.

    A fresh session is built on every call, so the caller owns it and should
    close it (or use it as a context manager).

    Returns:
        requests.Session: session whose http:// and https:// adapters retry up
        to 3 times with exponential backoff on 500/502/503/504 responses.
    """
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504],
        # urllib3 does not retry POST on status codes by default, and POST is
        # the only method this notebook sends, so it is allowed explicitly on
        # top of the default retryable methods.
        allowed_methods=Retry.DEFAULT_ALLOWED_METHODS | {"POST"},
    )
    adapter = HTTPAdapter(max_retries=retries)
    # Mount for both schemes so a plain-http endpoint gets the same policy.
    session.mount('https://', adapter)
    session.mount('http://', adapter)
    return session

def model_inference_api(batch_df, batch_id):
    """Send one micro-batch to the model-serving endpoint in a single POST.

    Args:
        batch_df: DataFrame exposing `id` and `model_input` columns; each
            `model_input` value is read via `.feature_vector.toArray()`.
        batch_id: Micro-batch identifier; not used by this function.

    Returns:
        None. Non-200 responses and request failures are printed, not raised.
    """
    MODEL_URL = "https://api.model-serving.local/v1/predict"
    TIMEOUT = 2.0  # seconds; must stay below the batch interval

    # NOTE: collect() pulls every row of the batch into the calling process.
    records = batch_df.select("id", "model_input").collect()

    if not records:
        return

    payload = [
        {"id": row.id, "features": row.model_input.feature_vector.toArray().tolist()}
        for row in records
    ]

    # The session is used as a context manager so its pooled connections are
    # released when the call finishes; previously it was never closed.
    with get_http_session() as session:
        try:
            response = session.post(MODEL_URL, json=payload, timeout=TIMEOUT)
            if response.status_code == 200:
                # Expected shape per the original note: list of {id, score, label}.
                # TODO: join the results back onto the batch (not implemented yet).
                results = response.json()
            else:
                print(f"API Error: {response.status_code}")
        except Exception as e:
            print(f"Request Failed: {e}")
            # TODO: send the failed batch to a dead-letter queue.
        # Send to DLQ

In [None]:
# =========================================
# 5. FAN-OUT SINKS (ForeachBatch Implementation)
# =========================================

def write_to_postgres(df, epoch_id):
    """Append the id, message_clean, timestamp and label columns to the
    Postgres table `comments` over JDBC.

    Args:
        df: DataFrame to write.
        epoch_id: Batch identifier; not used by this function.
    """
    # NOTE(review): credentials are hardcoded placeholders -- load them from
    # the environment or a secrets store before any real use.
    connection_properties = dict(
        user="user",
        password="password",
        driver="org.postgresql.Driver",
    )
    columns = ("id", "message_clean", "timestamp", "label")

    # Plain append; the original note says upserts would go through a
    # temporary table in production.
    projected = df.select(*columns)
    projected.write.jdbc(
        url="jdbc:postgresql://postgres:5432/warehouse",
        table="comments",
        mode="append",
        properties=connection_properties,
    )

def write_to_elasticsearch(df, epoch_id):
    """Append the id, message_clean and label columns to the Elasticsearch
    resource `comments/_doc`, using `id` as the document id.

    Args:
        df: DataFrame to write.
        epoch_id: Batch identifier; not used by this function.
    """
    connector_settings = {
        "es.nodes": "elasticsearch",
        "es.port": "9200",
        "es.resource": "comments/_doc",
        "es.mapping.id": "id",
    }

    documents = df.select("id", "message_clean", "label")
    writer = documents.write.format("org.elasticsearch.spark.sql")
    writer = writer.options(**connector_settings)
    writer.mode("append").save()

def write_to_clickhouse(df, epoch_id):
    """Append the id, message_clean and label columns to the ClickHouse table
    `comments_analytics` over JDBC.

    Args:
        df: DataFrame to write.
        epoch_id: Batch identifier; not used by this function.
    """
    connection_properties = dict(
        user="default",
        password="",
        driver="com.clickhouse.jdbc.ClickHouseDriver",
    )
    columns = ("id", "message_clean", "label")

    analytics_rows = df.select(*columns)
    analytics_rows.write.jdbc(
        url="jdbc:clickhouse://clickhouse:8123/default",
        table="comments_analytics",
        mode="append",
        properties=connection_properties,
    )

def write_to_s3(df, epoch_id):
    """Append the whole DataFrame as Parquet to the data lake path
    `s3a://datalake/comments/`, partitioned by the `timestamp` column.

    Args:
        df: DataFrame to write.
        epoch_id: Batch identifier; not used by this function.
    """
    # NOTE(review): partitioning by the raw `timestamp` column presumably yields
    # one directory per distinct value -- confirm this granularity is intended.
    writer = df.write.mode("append")
    writer = writer.partitionBy("timestamp")
    writer.parquet("s3a://datalake/comments/")

def master_process_batch(batch_df, epoch_id):
    """Fan one micro-batch out to every sink: Postgres, Elasticsearch,
    ClickHouse and S3, in that order.

    The batch is persisted once so the sinks share the cached data. Each sink
    runs in its own try/except, so one failing sink is reported on stdout and
    the remaining sinks still run.

    Args:
        batch_df: DataFrame holding the current micro-batch.
        epoch_id: Batch identifier, forwarded to each sink and used in logs.
    """
    # Cache the batch for the multiple sinks below.
    batch_df.persist()
    try:
        # 1. Model inference is intentionally skipped here (see
        #    model_inference_api); mock or enable it as needed.

        # 2. Fan-out. The count is reported for visibility.
        print(f"Processing Batch {epoch_id}, Size: {batch_df.count()}")

        # Sinks run sequentially for safety; parallelize in production.
        try:
            write_to_postgres(batch_df, epoch_id)
        except Exception as e:
            print(f"Postgres Error: {e}")

        try:
            write_to_elasticsearch(batch_df, epoch_id)
        except Exception as e:
            print(f"ES Error: {e}")

        try:
            write_to_clickhouse(batch_df, epoch_id)
        except Exception as e:
            print(f"ClickHouse Error: {e}")

        try:
            write_to_s3(batch_df, epoch_id)
        except Exception as e:
            print(f"S3 Error: {e}")
    finally:
        # Always release the cache, even when count() or anything else above
        # raises; previously an exception there skipped unpersist().
        batch_df.unpersist()


In [None]:

# Start the streaming query: console sink, one micro-batch every 10 seconds.
# NOTE(review): this writes the raw Kafka frame (streaming_df), not the cleaned
# frame, and does not go through master_process_batch -- confirm that is
# intended for this debugging run.
query = (
    streaming_df.writeStream
    .format("console")  # or "delta", "parquet"
    # The checkpoint location is what lets the query resume after a restart.
    # The s3a:// scheme matches the data lake path used by write_to_s3; stock
    # Hadoop 3 builds ship no filesystem for the bare s3:// scheme.
    # NOTE(review): s3a presumably also needs the hadoop-aws package on the
    # classpath, which the session config does not add -- verify.
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/comment_stream/")
    .trigger(processingTime="10 seconds")
    .start()
)

# Block this cell until the query stops or fails.
query.awaitTermination()