In [1]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.2/4.2 MB 420.2 kB/s eta 0:00:00
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11
Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession

# Stop a session left over from an earlier run of this notebook, if there is one.
# This is best-effort: a failure is reported but does not block session creation.
try:
    spark = SparkSession.getActiveSession()
    if spark:
        spark.stop()
        print("Stopped existing Spark session")
except Exception as exc:
    # Catch Exception (not a bare `except:`) so KeyboardInterrupt/SystemExit
    # still propagate, and say what went wrong instead of hiding it.
    print(f"Could not stop existing Spark session: {exc}")

# Build (or fetch) the session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("KafkaToHDFS_And_PostgreSQL_SmartFarming")
    .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
    .getOrCreate()
)

print("✓ New Spark session created")
print(f"Spark version: {spark.version}")

✓ New Spark session created
Spark version: 3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType
import psycopg2
import os
import shutil

# --- Reset state left behind by a previous run of this cell -------------------

# Stop every streaming query still active on the current session before their
# checkpoint directories are removed below.
spark = SparkSession.getActiveSession()
if spark:
    for query in spark.streams.active:
        print(f"Stopping query: {query.name}")
        query.stop()
    print("All queries stopped")

# Remove old checkpoint directories so the queries start without saved progress.
# Combined with startingOffsets="earliest" below, a fresh run re-reads the topic
# from the beginning.
# NOTE(review): os.path / shutil act on the local filesystem; confirm Spark also
# resolves these checkpoint paths locally (and not against HDFS).
# NOTE(review): the parquet output directory is NOT cleared here; confirm that
# restarting the file sink with a fresh checkpoint behaves as intended.
checkpoint_paths = [
    "/tmp/checkpoints/kafka_to_hdfs_smartfarming",
    "/tmp/checkpoints/postgres_checkpoint",
    "/tmp/checkpoints/hdfs_checkpoint"
]

for path in checkpoint_paths:
    if os.path.exists(path):
        shutil.rmtree(path)
        print(f"Cleared checkpoint: {path}")

# --- Schema of the JSON payload carried in each Kafka message value ----------
# "timestamp" is kept as a string here; it is cast when written to PostgreSQL.
sensor_schema = (
    StructType()
    .add("sensor_id", StringType())
    .add("timestamp", StringType())
    .add("soil_moisture", DoubleType())
    .add("soil_pH", DoubleType())
    .add("temperature", DoubleType())
    .add("rainfall", DoubleType())
    .add("humidity", DoubleType())
    .add("sunlight_intensity", DoubleType())
    .add("pesticide_usage_ml", DoubleType())
    .add("farm_id", StringType())
    .add("region", StringType())
    .add("crop_type", StringType())
)

# getOrCreate() returns the active session when one exists, so this also covers
# the case where getActiveSession() above returned None.
spark = (
    SparkSession.builder
    .appName("KafkaToHDFS_And_PostgreSQL_SmartFarming")
    .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
    .getOrCreate()
)

topic_name = "smart_farming_data"

kafka_bootstrap = "broker:29092"

print(f"Attempting to connect to Kafka at: {kafka_bootstrap}")

# Define the Kafka source. Each trigger reads at most 1000 offsets.
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.request.timeout.ms", "40000")
    .option("kafka.default.api.timeout.ms", "60000")
    .option("maxOffsetsPerTrigger", "1000")
    .load()
)

# load() only defines the streaming source; the broker is contacted once a query
# is started, so reaching this line does not prove that Kafka is reachable.
print("Kafka source configured (the broker is contacted when a streaming query starts)")

# Decode the message value as a string, parse it with sensor_schema and flatten
# the resulting struct into top-level columns.
df_parsed = (
    df_raw.selectExpr("CAST(value AS STRING) AS json_str")
    .withColumn("data", from_json(col("json_str"), sensor_schema))
    .select("data.*")
)

# PostgreSQL writer function
def write_to_postgres(batch_df, epoch_id):
    """Insert one micro-batch of parsed sensor rows into public.sensor_data.

    Used as the ``foreachBatch`` callback of the PostgreSQL streaming query.

    Connection settings default to the values previously hardcoded here and can
    be overridden with the POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD and
    POSTGRES_HOST environment variables.

    Args:
        batch_df: The micro-batch DataFrame; each row must expose the twelve
            sensor attributes read below.
        epoch_id: Batch identifier passed in by Spark; used only in log output.

    Returns:
        None. Database errors are printed rather than raised; the transaction
        is rolled back and the connection is always closed.
    """
    # Collect once and test the resulting list instead of calling isEmpty() and
    # then collect(), which would evaluate the batch twice.
    rows = batch_df.collect()
    if not rows:
        print(f"Batch {epoch_id} is empty")
        return

    conn = None
    cur = None
    try:
        conn = psycopg2.connect(
            dbname=os.environ.get("POSTGRES_DB", "smart_farming"),
            user=os.environ.get("POSTGRES_USER", "admin"),
            password=os.environ.get("POSTGRES_PASSWORD", "password"),
            host=os.environ.get("POSTGRES_HOST", "postgres"),
        )
        cur = conn.cursor()

        # Counts rows submitted; rows skipped by ON CONFLICT are still counted.
        rows_written = 0
        for row in rows:
            cur.execute("""
                INSERT INTO public.sensor_data (
                    sensor_id, timestamp, soil_moisture, soil_ph, 
                    temperature, rainfall, humidity, sunlight_intensity, 
                    pesticide_usage_ml, farm_id, region, crop_type
                )
                VALUES (%s::uuid, %s::timestamp, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (sensor_id) DO NOTHING;
            """, (
                row.sensor_id,
                row.timestamp,
                row.soil_moisture,
                row.soil_pH,
                row.temperature,
                row.rainfall,
                row.humidity,
                row.sunlight_intensity,
                row.pesticide_usage_ml,
                row.farm_id,
                row.region,
                row.crop_type
            ))
            rows_written += 1

        # All rows of the batch are committed together in a single transaction.
        conn.commit()
        print(f"Batch {epoch_id}: {rows_written} records to PostgreSQL")
    except Exception as e:
        # Discard the partial transaction before reporting the failure.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection already unusable; the original error is reported below
        # NOTE(review): the error is swallowed, so the caller is not told that
        # this batch failed — confirm that dropping a failed batch is acceptable.
        print(f"Error batch {epoch_id}: {str(e)}")
    finally:
        # Always release the cursor and connection, including on failure.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

# Start streaming to PostgreSQL: each micro-batch is handed to write_to_postgres.
postgres_query = (
    df_parsed.writeStream
    .foreachBatch(write_to_postgres)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/postgres_checkpoint")
    .trigger(processingTime='10 seconds')
    .start()
)

# Start streaming to HDFS: the same parsed stream is appended as parquet files.
hdfs_output_path = "hdfs://namenode:9000/user/smart_farming_data"

hdfs_query = (
    df_parsed.writeStream
    .format("parquet")
    .option("path", hdfs_output_path)
    .option("checkpointLocation", "/tmp/checkpoints/hdfs_checkpoint")
    .outputMode("append")
    .trigger(processingTime='10 seconds')
    .start()
)

print("Streaming started:")
print("  PostgreSQL: smart_farming.sensor_data")
print(f"  HDFS: {hdfs_output_path}")

# Block until one of the queries terminates. Whatever ends the wait (a query
# finishing, an error, or the user interrupting the cell), stop both queries so
# none is left running in the kernel after this cell returns.
try:
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    print("Interrupted - stopping streaming queries")
finally:
    for query in (postgres_query, hdfs_query):
        if query.isActive:
            query.stop()

All queries stopped
Attempting to connect to Kafka at: broker:29092
Successfully connected to Kafka!
Streaming started:
  PostgreSQL: smart_farming.sensor_data
  HDFS: hdfs://namenode:9000/user/smart_farming_data
Batch 0: 438 records to PostgreSQL
Batch 1: 7 records to PostgreSQL
Batch 2: 4 records to PostgreSQL
Batch 3: 10 records to PostgreSQL
Batch 4: 10 records to PostgreSQL
Batch 5: 10 records to PostgreSQL
Batch 6: 10 records to PostgreSQL
Batch 7: 10 records to PostgreSQL
Batch 8: 10 records to PostgreSQL
Batch 9: 10 records to PostgreSQL
Batch 10: 10 records to PostgreSQL
Batch 11: 10 records to PostgreSQL
Batch 12: 10 records to PostgreSQL
Batch 13: 9 records to PostgreSQL
Batch 14: 10 records to PostgreSQL
Batch 15: 10 records to PostgreSQL
Batch 16: 10 records to PostgreSQL
Batch 17: 10 records to PostgreSQL
Batch 18: 10 records to PostgreSQL
Batch 19: 10 records to PostgreSQL
Batch 20: 10 records to PostgreSQL
Batch 21: 10 records to PostgreSQL
Batch 22: 10 records to Postg

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
