In [1]:
# Bản dựng 1

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Kafka settings
kafka_topic_name = "stack_exchange"
kafka_bootstrap_servers = "localhost:39092"

# Định nghĩa schema cho dữ liệu
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("content", StringType(), True),
        StructField("time", StringType(), True),
        StructField("category", StringType(), True),
        StructField("views", StringType(), True),
        StructField("num_answer", StringType(), True),
        StructField("votes", StringType(), True),
        StructField("solved", StringType(), True),
    ]
)

# Khởi tạo Spark Session
spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()

# Định nghĩa schema cho dữ liệu
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("content", StringType(), True),
        StructField("time", StringType(), True),
        StructField("category", StringType(), True),
        StructField("views", StringType(), True),
        StructField("num_answer", StringType(), True),
        StructField("votes", StringType(), True),
        StructField("solved", StringType(), True),
    ]
)

# Đọc dữ liệu từ Kafka
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:39092")
    .option("subscribe", "stack_exchange")
    .load()
)

# Chuyển đổi dữ liệu JSON từ Kafka thành DataFrame
df_json = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Lưu dữ liệu vào file JSON
query = (
    df_json.writeStream.outputMode("append")
    .format("json")
    .option("checkpointLocation", "./checkpoint_folder")
    .option("path", "./data.json")
    .start()
)

# Chờ query kết thúc
query.awaitTermination()

In [None]:
# Bản dựng 2

import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Kafka settings
kafka_topic_name = "stack_exchange"
kafka_bootstrap_servers = "localhost:39092"

# Define schema for the data
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("content", StringType(), True),
        StructField("time", StringType(), True),
        StructField("category", StringType(), True),
        StructField("views", StringType(), True),
        StructField("num_answer", StringType(), True),
        StructField("votes", StringType(), True),
        StructField("solved", StringType(), True),
    ]
)

# Initialize Spark Session
spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()

# Read data from Kafka
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .load()
)


# Function to process HTML and extract text
def process_html(content):
    # Use regular expressions to remove HTML tags
    cleaned_text = re.sub(r"<.*?>", "", content)
    # Replace multiple whitespaces with a single space
    cleaned_text = re.sub(r"\s+", " ", cleaned_text).strip()
    return cleaned_text


# Define UDF for HTML to text conversion
process_html_udf = udf(process_html, StringType())


# Function to convert views, num_answer, votes to numeric values
def convert_to_numeric(value):
    try:
        if value.endswith("k"):
            return int(float(value[:-1]) * 1000)
        elif value.endswith("m"):
            return int(float(value[:-1]) * 1000000)
        elif value.endswith("b"):
            return int(float(value[:-1]) * 1000000000)
        else:
            return int(value)
    except ValueError:
        return 0


# Define UDF for conversion
convert_to_numeric_udf = udf(convert_to_numeric, IntegerType())

# Convert JSON data from Kafka to DataFrame and apply HTML to text conversion
df_json = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("content", process_html_udf("content"))
    .withColumn("views", convert_to_numeric_udf("views"))
    .withColumn("num_answer", convert_to_numeric_udf("num_answer"))
    .withColumn("votes", convert_to_numeric_udf("votes"))
)

# Write data to a JSON file
query = (
    df_json.writeStream.outputMode("append")
    .format("json")
    .option("checkpointLocation", "./checkpoint_folder")
    .option("path", "./data")
    .start()
)

# Wait for the query to finish
query.awaitTermination()


In [None]:
# Bản dựng 3

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
import os

# Import UDFs from preprocess.py
import preprocess

# AWS S3 settings
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
s3_bucket = "btl-bigdata"
s3_output_path = "s3a://{}/test/data".format(s3_bucket)

# Kafka settings
kafka_topic_name = "stack_exchange"
kafka_bootstrap_servers = "localhost:39092"

# Initialize Spark Session with package configurations
spark = (
    SparkSession.builder.appName("KafkaConsumer")
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")


# Define schema for the data
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("content", StringType(), True),
        StructField("time", StringType(), True),
        StructField("category", StringType(), True),
        StructField("views", StringType(), True),
        StructField("num_answer", StringType(), True),
        StructField("votes", StringType(), True),
        StructField("solved", StringType(), True),
    ]
)


# Read data from Kafka
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Convert JSON data from Kafka to DataFrame and apply HTML to text conversion
df_json = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("content", preprocess.process_html("content"))
    .withColumn("views", preprocess.convert_to_numeric("views"))
    .withColumn("num_answer", preprocess.convert_to_numeric("num_answer"))
    .withColumn("votes", preprocess.convert_to_numeric("votes"))
)

# # Write data to Amazon S3
# query = (
#     df_json.repartition(1).writeStream.outputMode("append")
#     .format("json")  # Chọn định dạng là JSON
#     .option("checkpointLocation", "s3a://{}/test/checkpoint".format(s3_bucket))
#     .option("path", s3_output_path)  # Đặt tên thư mục là s3_output_path
#     .start()
# )

# Write data to local directory
df_json.show()

# Write data to local directory
query = (
    df_json.writeStream.outputMode("append")
    .format("json")  # Chọn định dạng là JSON
    .option("checkpointLocation", "./output/checkpoint")  # Đặt checkpoint local
    .option("path", "./output/data")  # Đặt đường dẫn local
    .start()
)
# Wait for the query to finish
query.awaitTermination()


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Import UDFs from preprocess.py
import preprocess

# Kafka settings
kafka_topic_name = "stack_exchange"
kafka_bootstrap_servers = "localhost:39092"

# Initialize Spark Session with package configurations
spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Define schema for the data
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("content", StringType(), True),
        StructField("time", StringType(), True),
        StructField("category", StringType(), True),
        StructField("views", StringType(), True),
        StructField("num_answer", StringType(), True),
        StructField("votes", StringType(), True),
        StructField("solved", StringType(), True),
    ]
)

# Read data from Kafka
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
)

# Convert JSON data from Kafka to DataFrame and apply preprocessing
df_json = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("content", preprocess.process_html("content"))
    .withColumn("views", preprocess.convert_to_numeric("views"))
    .withColumn("num_answer", preprocess.convert_to_numeric("num_answer"))
    .withColumn("votes", preprocess.convert_to_numeric("votes"))
)

# Write data to a JSON file
query = (
    df_json.writeStream.outputMode("append")
    .format("json")
    .option("checkpointLocation", "./output/checkpoint_folder")
    .option("path", "./output/data")
    .start()
)

# Wait for the query to finish
query.awaitTermination()

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.