<a href="https://colab.research.google.com/github/romirahmanw112/BigData2/blob/main/Praktek_big_data_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Install Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Kafka
!wget -q https://archive.apache.org/dist/kafka/3.6.0/kafka_2.12-3.6.0.tgz
!tar -xzf kafka_2.12-3.6.0.tgz

# Download Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Install Python libraries
!pip install -q findspark kafka-python


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/326.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m326.3/326.3 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["KAFKA_HOME"] = "/content/kafka_2.12-3.6.0"

findspark.init()


In [4]:
# Jalankan Zookeeper
!nohup $KAFKA_HOME/bin/zookeeper-server-start.sh \
$KAFKA_HOME/config/zookeeper.properties > /dev/null 2>&1 &

import time
print("Menunggu Zookeeper start (10 detik)...")
time.sleep(10)

# Jalankan Kafka Broker
!nohup $KAFKA_HOME/bin/kafka-server-start.sh \
$KAFKA_HOME/config/server.properties > /dev/null 2>&1 &

print("Menunggu Kafka Broker start (10 detik)...")
time.sleep(10)

print("Kafka Server SIAP!")


Menunggu Zookeeper start (10 detik)...
Menunggu Kafka Broker start (10 detik)...
Kafka Server SIAP!


In [5]:
!$KAFKA_HOME/bin/kafka-topics.sh \
--create \
--topic transaksi-toko \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1

print("Topik transaksi-toko berhasil dibuat")


Created topic transaksi-toko.
Topik transaksi-toko berhasil dibuat


In [6]:
from kafka import KafkaProducer
import json, time, random
from datetime import datetime

def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=json_serializer
)

products = ["Laptop", "Mouse", "Keyboard", "Monitor", "HDMI Cable"]

def send_stream_data(topic_name, num_messages=50):
    for i in range(num_messages):
        data = {
            "transaction_id": i,
            "product": random.choice(products),
            "price": random.randint(100000, 5000000),
            "quantity": random.randint(1, 5),
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        producer.send(topic_name, data)
        time.sleep(0.1)

    print("Data berhasil dikirim ke Kafka")


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, sum as _sum

spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
    ) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("Spark Session aktif")


Spark Session aktif


In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("transaction_id", IntegerType()),
    StructField("product", StringType()),
    StructField("price", IntegerType()),
    StructField("quantity", IntegerType()),
    StructField("timestamp", StringType())
])

df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transaksi-toko") \
    .option("startingOffsets", "earliest") \
    .load()

df_parsed = df_raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

df_parsed.printSchema()


root
 |-- transaction_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- timestamp: string (nullable = true)



In [9]:
df_with_revenue = df_parsed.withColumn(
    "revenue",
    col("price") * col("quantity")
)

df_analysis = df_with_revenue.groupBy("product") \
    .agg(_sum("revenue").alias("total_sales")) \
    .orderBy("total_sales", ascending=False)


In [10]:
query = df_analysis.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sales_table") \
    .start()

print("Streaming berjalan di background")


Streaming berjalan di background


In [11]:
send_stream_data("transaksi-toko", num_messages=50)

time.sleep(5)

result = spark.sql("SELECT * FROM sales_table")
result.show()


Data berhasil dikirim ke Kafka
+-------+-----------+
|product|total_sales|
+-------+-----------+
+-------+-----------+



In [12]:
query.stop()
print("Streaming dihentikan")


Streaming dihentikan
