# Big Data Project - PySpark Read
### Authors: 
Nadav Oren 316084599

Yarden Mizrahi 209521293

Michaella Ichak 209085422

## Imports

In [1]:
from confluent_kafka import  Consumer

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, when, col
from pyspark.sql.types import StructType, StructField,StringType, DoubleType, IntegerType, ArrayType


## Const Values

In [2]:
TOPIC = 'crop_data'
LABEL_INDEX_DATAFRAME_PATH = "label_index.csv"
PROCESSED_CROP_DATA_PATH = "processed_crop_data.csv"

## Set Consumer - PySpark stream

In [3]:

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CropOptimalConditionStreaming") \
    .getOrCreate()



### Define Kafka source
# Define Schema
cropSchema = StructType([
    StructField("N",IntegerType(), True),
    StructField("P",IntegerType(),True),
    StructField("K",IntegerType(),True),
    StructField("temperature",DoubleType(),True),
    StructField("humidity",DoubleType(),True),
    StructField("ph",DoubleType(),True),
    StructField("rainfall",DoubleType(),True),
    StructField("label",StringType(),True)
])

cropSchema_array = ArrayType(cropSchema)
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", TOPIC) \
    .load()

kafka_df_string = kafka_df.selectExpr("CAST(value AS STRING)")

# Assuming the value is in JSON format, convert it to a string
# Apply the schema to parse the JSON data
json_df = kafka_df_string.withColumn("jsonData", from_json(col("value"), cropSchema_array))

processed_df = json_df.selectExpr("explode(jsonData) as jsonData")

# Select the parsed data into a structured DataFrame
parsed_df = processed_df.select("jsonData.*")

### parsed_df = main data recievied ready


24/08/27 18:02:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Read Data and Preprocessing
note: trying the pyspark split instead of scklit learn split

In [None]:
import os

# Step 1: Load the existing label-to-index mapping from a CSV file
if not os.path.exists(LABEL_INDEX_DATAFRAME_PATH):
    label_index_df = spark.createDataFrame(
    [("rice",0)], schema=["label","index"])
else:
    label_index_df = spark.read.csv(LABEL_INDEX_DATAFRAME_PATH, header=True, inferSchema=True)
label_to_index = {row['label']: row['index'] for row in label_index_df.collect()}
next_index = max(label_to_index.values()) + 1 if label_to_index else 0

# Function to process each batch
def process_batch(batch_df, batch_id):
    global label_to_index, next_index

    # Step 2: Extract distinct labels from the current batch
    new_labels = batch_df.select("label").distinct().rdd.flatMap(lambda x: x).collect()

    # Update the label-to-index mapping
    for label in new_labels:
        if label not in label_to_index:
            label_to_index[label] = next_index
            next_index += 1

    # Convert the updated mapping back to a DataFrame
    new_label_index_df = spark.createDataFrame(
        [(label, index) for label, index in label_to_index.items()],
        schema=["label", "index"]
    )

    # Step 3: Manually transform the labels into indices
    mapping_expr = when(col("label").isNull(), -1)
    for label, index in label_to_index.items():
        mapping_expr = mapping_expr.when(col("label") == label, index)
    
    indexed_df = batch_df.withColumn("label_index", mapping_expr)

    # Step 4: Save the processed data to a CSV file
    indexed_df.write.mode("append").option("header","true").csv(PROCESSED_CROP_DATA_PATH)

    # Step 5: Update the label-to-index CSV file
    new_label_index_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(LABEL_INDEX_DATAFRAME_PATH)
process_data_query = parsed_df.writeStream \
    .foreachBatch(process_batch) \
    .start()

process_data_query.awaitTermination()

24/08/27 18:10:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4a807793-5af0-4ed8-90e9-ca16d68d6093. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/08/27 18:10:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/08/27 18:10:08 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/08/27 18:10:08 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/08/27 18:10:08 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/08/27 18:10:08 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con

In [5]:
process_data_query.stop()