# BIG DATA MIDTERM EXAM
## Spark Streaming
- Name: Kurnia Cahya Febryanto
- Student ID: 5025201073
- Class: Big Data A
- Lecturer: Abdul Munif, S.Kom., M.Sc.

## Install Libraries and Set Up the Environment

In [17]:
# Install necessary packages
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [18]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [19]:
# Create (or reuse, via getOrCreate) the Spark session that runs every
# streaming query in this notebook. `local[*]` runs Spark in-process on this
# machine using all available cores.
# The placeholder `.config("spark.some.config.option", "some-value")` copied
# from the Spark docs example was removed: it set a meaningless option.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("News Streaming")
    .getOrCreate()
)

In [20]:
# Silence Spark's INFO/WARN chatter so only errors show up in cell output.
spark.sparkContext.setLogLevel(logLevel="ERROR")

## Define the Data Schema

In [21]:
# Schema of one JSON news record: six nullable string columns, in file order.
# `date` stays a string here and is parsed into a timestamp later on.
schema = StructType([
    StructField(field_name, StringType(), nullable=True)
    for field_name in (
        "link",
        "headline",
        "category",
        "short_description",
        "authors",
        "date",
    )
])

## Upload the Dataset from the Local Computer and Extract It in Colab

In [28]:
# Open Colab's browser file picker so the dataset archive can be uploaded from
# the local computer. `uploaded` maps each uploaded file name to its raw bytes;
# the extraction cell below iterates over it. Colab may rename repeated uploads
# (e.g. "news (3).zip"), so later code should not rely on the exact file name.
from google.colab import files

uploaded = files.upload()

Saving news.zip to news (3).zip


In [29]:
import io
import warnings
import zipfile

# Extract every uploaded .zip archive into the folder the stream reads from.
# Only the extension is checked (case-insensitively), because Colab may rename
# repeated uploads, e.g. "news (3).zip", and "NEWS.ZIP" must not be skipped.
extracted_archives = 0
for file_name, file_bytes in uploaded.items():
    if not file_name.lower().endswith('.zip'):
        continue
    with zipfile.ZipFile(io.BytesIO(file_bytes), 'r') as z:
        z.extractall('./input-5025201073')
    extracted_archives += 1

# Without an archive nothing new reaches the input folder, and the stream
# would quietly have no fresh files to process, so say so explicitly.
if extracted_archives == 0:
    warnings.warn("No .zip archive was uploaded; nothing was extracted to ./input-5025201073")


## Set Input and Output Paths

In [30]:
# Where the JSON stream reads news files from (filled by the extraction step),
# and where the aggregated JSON results are written.
input_folder, output_folder = (
    "./input-5025201073",
    "/content/output-5025201073",
)

## Read JSON Files as a Stream

In [32]:
# Treat every JSON file that lands in the input folder as streaming input.
# The explicit schema is required because file-based streaming sources do not
# infer one by default.
news_df = (
    spark.readStream
    .format("json")
    .schema(schema)
    .load(input_folder)
)

## Spark Streaming Process

In [33]:
# Running count of news items per category, with no time window.
# NOTE(review): `grouped_news_df` is reassigned by the windowed aggregation in
# the next cell before any sink reads it, so this result is never written out.
grouped_news_df = news_df.groupBy(news_df["category"]).count()

In [41]:
from pyspark.sql.functions import to_timestamp, window

# Parse the "yyyy-MM-dd" date string straight into an event-time timestamp
# (one call rather than unix_timestamp(...).cast("timestamp")).
news_df_with_timestamp = news_df.withColumn("timestamp", to_timestamp("date", "yyyy-MM-dd"))

# Every event time is midnight of its date, so each 1-minute window holds
# exactly one calendar day's news, split by category.
# NOTE(review): in append mode a window is emitted only after the watermark
# (latest event time seen minus 1 minute) passes the window's end. The newest
# day's counts therefore appear only once a later date arrives. Rows in later
# micro-batches dated before the watermark may be dropped as late data.
# Confirm this is intended for historical news data.
watermarked_news_df = news_df_with_timestamp.withWatermark("timestamp", "1 minutes")

grouped_news_df = watermarked_news_df.groupBy(window("timestamp", "1 minutes"), "category").count()

# Write the aggregated results to the output folder. The checkpoint records
# the query's progress so a restart resumes instead of reprocessing all files.
query = (
    grouped_news_df.writeStream
    .format("json")
    .option("path", output_folder)
    .option("checkpointLocation", "/content/checkpoints")
    .outputMode("append")
    .start()
)


In [None]:
# Block this cell until the JSON query stops or fails; there is no timeout, so
# the cell runs until the query ends or the cell is interrupted.
# NOTE(review): interrupting the cell does not stop the query itself.
query.awaitTermination(timeout=None)

In [None]:
# Run the aggregation with two sinks, both firing a micro-batch every 5 seconds:
# the console (to watch results live) and the JSON output folder.

# Stop queries still running from earlier cells (e.g. `query`, which keeps
# running in the background after its awaitTermination() cell is interrupted).
# Otherwise the JSON sink below would compete with it for the same checkpoint
# and output folder.
for running_query in spark.streams.active:
    running_query.stop()
# Forget those terminations so awaitAnyTermination() below waits for the new
# queries instead of returning immediately.
spark.streams.resetTerminated()

# Write the aggregated results to the console
console_query = (
    grouped_news_df.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

# Write the aggregated results to the output folder. Reusing the checkpoint
# lets this resume from where the earlier JSON query left off.
json_query = (
    grouped_news_df.writeStream
    .format("json")
    .option("path", output_folder)
    .option("checkpointLocation", "/content/checkpoints")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

# Wait until either query stops. Awaiting console_query alone (which runs
# until stopped) would never reach json_query and would hide a JSON-sink failure.
spark.streams.awaitAnyTermination()

## Summary


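1. Install packages and import libraries
2. Create the Spark session and define the JSON schema
3. Upload and extract the dataset, then set the input and output folders
4. Read the JSON files as a stream
5. Aggregate news counts per category and time window
6. Write the results as JSON and wait for the stream to terminate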

