In [0]:
# ==============================================================
# 🌾 AGRICULTURE INDUSTRY ANALYTICS - UNIFIED DATA PIPELINE
# Author: Onteddu Krishna Chaitanya
# Compatible with: Databricks Serverless + Unity Catalog
# ==============================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# --------------------------------------------------------------
# 1️⃣ Spark Session (already active in Databricks)
# --------------------------------------------------------------
spark = SparkSession.builder.appName("Agriculture_Data_Pipeline").getOrCreate()

# --------------------------------------------------------------
# 2️⃣ Data Ingestion (Fixed for Serverless - Read from Table)
# --------------------------------------------------------------
print("🚀 Ingesting Agriculture Dataset from Unity Catalog Table...")
df = spark.table("workspace.default.agriculture")  # your uploaded table name
print("✅ Data Ingested Successfully!")
display(df.limit(5))

# --------------------------------------------------------------
# 3️⃣ Data Cleaning
# --------------------------------------------------------------
print("🧹 Cleaning and Transforming Data...")

df_clean = df.dropna(how="any")
df_clean = df_clean.withColumn("Year", col("Year").cast("int")) \
                   .withColumn("Rainfall_mm", col("Rainfall_mm").cast("float")) \
                   .withColumn("Temperature_C", col("Temperature_C").cast("float")) \
                   .withColumn("Fertilizer_kg_per_acre", col("Fertilizer_kg_per_acre").cast("float")) \
                   .withColumn("Yield_ton_per_acre", col("Yield_ton_per_acre").cast("float"))

print("✅ Data Cleaning Completed Successfully!")
display(df_clean.limit(5))

# --------------------------------------------------------------
# 4️⃣ Use Case 1: Crop Yield Estimation
# --------------------------------------------------------------
print("🚜 Running Crop Yield Estimation...")

assembler = VectorAssembler(
    inputCols=["Rainfall_mm", "Temperature_C", "Fertilizer_kg_per_acre", "Soil_Health_Index"],
    outputCol="features"
)

feature_df = assembler.transform(df_clean.na.drop())

lr = LinearRegression(featuresCol="features", labelCol="Yield_ton_per_acre")
lr_model = lr.fit(feature_df)
predictions = lr_model.transform(feature_df)

yield_output = predictions.select("Crop_Type", "Yield_ton_per_acre", "prediction") \
                          .withColumnRenamed("prediction", "Predicted_Yield")

yield_output.createOrReplaceTempView("agri_crop_yield_predictions")
print("✅ Crop Yield Estimation Completed!")

# --------------------------------------------------------------
# 5️⃣ Use Case 2: Commodity Price Forecasting
# --------------------------------------------------------------
print("📈 Running Commodity Price Forecasting...")

price_forecast = df_clean.groupBy("Crop_Type", "Year") \
    .agg(avg("Average_Price_INR").alias("Avg_Price_INR")) \
    .orderBy("Crop_Type", "Year")

price_forecast.createOrReplaceTempView("agri_commodity_price_forecast")
print("✅ Commodity Price Forecasting Completed!")

# --------------------------------------------------------------
# 6️⃣ Use Case 3: Precision Farming using IoT Sensors
# --------------------------------------------------------------
print("🌱 Running Precision Farming Analytics...")

iot_df = df_clean.withColumn(
    "Sensor_Deviation_Score",
    ((when((col("Sensor_Soil_Moisture_%") < 35) | (col("Sensor_Soil_Moisture_%") > 60), 1).otherwise(0)) +
     (when((col("Sensor_Temperature_C") < 20) | (col("Sensor_Temperature_C") > 35), 1).otherwise(0)) +
     (when(col("NDVI_Index") < 0.6, 1).otherwise(0))) / 3
)

iot_df.createOrReplaceTempView("agri_precision_farming_monitoring")
print("✅ Precision Farming Analytics Completed!")

# --------------------------------------------------------------
# 7️⃣ Use Case 4: Agri-Insurance Claim Fraud Detection
# --------------------------------------------------------------
print("💰 Running Agri-Insurance Fraud Detection...")

fraud_df = df_clean.withColumn("Claim_to_Yield_Ratio",
                               col("Claim_Amount_INR") / (col("Yield_ton_per_acre") * col("Average_Price_INR"))) \
                   .withColumn("Fraud_Flag", when(col("Claim_to_Yield_Ratio") > 0.5, 1).otherwise(0))

fraud_df.createOrReplaceTempView("agri_insurance_fraud_detection")
print("✅ Fraud Detection Completed!")

# --------------------------------------------------------------
# 8️⃣ Use Case 5: Farm-to-Fork Supply Chain Optimization
# --------------------------------------------------------------
print("🚛 Running Supply Chain Optimization...")

supply_df = df_clean.withColumn("Supplier_Reliability", (0.7 + rand() * 0.3)) \
                    .withColumn("Lead_Time_days", (3 + rand() * 10)) \
                    .withColumn("Transport_Cost_per_ton", (2000 + rand() * 1000)) \
                    .withColumn(
                        "Supplier_Score",
                        round(
                            0.6 * col("Supplier_Reliability") +
                            0.2 * (1 - (col("Lead_Time_days") / 15)) +
                            0.2 * (1 - (col("Transport_Cost_per_ton") / 3000)), 3
                        )
                    )

supply_df.createOrReplaceTempView("agri_supply_chain_optimization")
print("✅ Supply Chain Optimization Completed!")

# --------------------------------------------------------------
# 9️⃣ Dashboard Summary Table
# --------------------------------------------------------------
print("📊 Creating Dashboard Summary Table...")

dashboard_summary = yield_output.join(price_forecast, "Crop_Type", "inner") \
    .join(iot_df.select("Crop_Type", "Sensor_Deviation_Score"), "Crop_Type", "inner") \
    .join(fraud_df.select("Crop_Type", "Fraud_Flag"), "Crop_Type", "inner") \
    .join(supply_df.select("Crop_Type", "Supplier_Score"), "Crop_Type", "inner") \
    .select("Crop_Type", "Predicted_Yield", "Avg_Price_INR", 
            "Sensor_Deviation_Score", "Fraud_Flag", "Supplier_Score")

dashboard_summary.createOrReplaceTempView("agri_dashboard_summary")

print("✅ Dashboard Summary Ready! All pipeline stages executed successfully.")
display(dashboard_summary.limit(10))


🚀 Ingesting Agriculture Dataset from Unity Catalog Table...
✅ Data Ingested Successfully!


Farmer_ID,District,Crop_Type,Year,Rainfall_mm,Fertilizer_kg_per_acre,Soil_Health_Index,Yield_ton_per_acre,Average_Price_INR,Temperature_C,Humidity_%,Soil_Moisture_%,Claim_Amount_INR,Fraud_Flag,Transport_Time_days,Storage_Duration_days,Quality_Score,Irrigation_Level_%,Pesticide_Litre_per_acre,Seed_Quality_Index,Sensor_Temperature_C,Sensor_Soil_Moisture_%,NDVI_Index,PH_Value,Expected_Yield_ton_per_acre,Yield_Variance,Market_Demand_Index,Export_Potential_Score,Claim_to_Yield_Ratio,Weather_Anomaly_Flag,Spoilage_Rate_%,Transport_Cost_INR,Revenue_per_Acre,Cost_of_Cultivation_INR,Profit_per_Acre,Sustainability_Score,Humidity_
1,Maharashtra,Sugarcane,2025,774,52,0.7669679903895565,3.678220667479192,2980,31.78964946729382,67,47,35000,0,3,11,89,60,2.25,0.86,32.219128667959794,48.35573407322144,0.56,7.42,2.29,1.01,1.14,1.03,3.53,0,1.44,1500,9900.0,7875,2025.0,0.71,67.56762893013929
2,Andhra Pradesh,Rice,2022,1029,50,0.8346848361668724,3.7569991321722287,1894,29.13046906582009,62,42,30000,0,3,12,90,93,4.26,0.73,28.452991550395875,41.20629481835661,0.85,6.2,2.44,1.06,1.18,0.68,4.29,0,1.51,1500,7000.0,7750,-750.0,0.77,62.716126894699
3,Tamil Nadu,Cotton,2024,1122,61,0.8688268383989872,3.44689646779122,6094,28.96988201453892,65,45,40000,1,5,14,92,72,3.65,0.85,29.073773894709067,44.09525744767596,0.86,6.81,2.84,1.06,1.12,2.12,1.65,0,1.72,2500,24180.0,9000,15180.0,0.81,62.79979093660284
4,Tamil Nadu,Cotton,2024,1052,64,0.852939474570495,3.668672482025307,5992,29.076972923797523,65,45,40000,1,5,14,92,70,2.55,0.79,29.735566265438507,45.29240273619952,0.67,6.6,2.84,1.06,0.89,2.12,1.65,0,1.72,2500,24180.0,9000,15180.0,0.75,67.69492933191937
5,Maharashtra,Sugarcane,2025,900,59,0.7765937924964239,2.822184742030301,2760,31.50913665241849,67,47,35000,0,3,11,89,64,1.79,0.97,31.03317565785571,47.40985162001283,0.79,7.29,2.29,1.01,0.52,1.03,3.53,0,1.44,1500,9900.0,7875,2025.0,0.78,63.87043748557523


🧹 Cleaning and Transforming Data...
✅ Data Cleaning Completed Successfully!


Farmer_ID,District,Crop_Type,Year,Rainfall_mm,Fertilizer_kg_per_acre,Soil_Health_Index,Yield_ton_per_acre,Average_Price_INR,Temperature_C,Humidity_%,Soil_Moisture_%,Claim_Amount_INR,Fraud_Flag,Transport_Time_days,Storage_Duration_days,Quality_Score,Irrigation_Level_%,Pesticide_Litre_per_acre,Seed_Quality_Index,Sensor_Temperature_C,Sensor_Soil_Moisture_%,NDVI_Index,PH_Value,Expected_Yield_ton_per_acre,Yield_Variance,Market_Demand_Index,Export_Potential_Score,Claim_to_Yield_Ratio,Weather_Anomaly_Flag,Spoilage_Rate_%,Transport_Cost_INR,Revenue_per_Acre,Cost_of_Cultivation_INR,Profit_per_Acre,Sustainability_Score,Humidity_
1,Maharashtra,Sugarcane,2025,774.0,52.0,0.7669679903895565,3.6782207,2980,31.78965,67,47,35000,0,3,11,89,60,2.25,0.86,32.219128667959794,48.35573407322144,0.56,7.42,2.29,1.01,1.14,1.03,3.53,0,1.44,1500,9900.0,7875,2025.0,0.71,67.56762893013929
2,Andhra Pradesh,Rice,2022,1029.0,50.0,0.8346848361668724,3.756999,1894,29.130468,62,42,30000,0,3,12,90,93,4.26,0.73,28.452991550395875,41.20629481835661,0.85,6.2,2.44,1.06,1.18,0.68,4.29,0,1.51,1500,7000.0,7750,-750.0,0.77,62.716126894699
3,Tamil Nadu,Cotton,2024,1122.0,61.0,0.8688268383989872,3.4468966,6094,28.969883,65,45,40000,1,5,14,92,72,3.65,0.85,29.073773894709067,44.09525744767596,0.86,6.81,2.84,1.06,1.12,2.12,1.65,0,1.72,2500,24180.0,9000,15180.0,0.81,62.79979093660284
4,Tamil Nadu,Cotton,2024,1052.0,64.0,0.852939474570495,3.6686726,5992,29.076973,65,45,40000,1,5,14,92,70,2.55,0.79,29.735566265438507,45.29240273619952,0.67,6.6,2.84,1.06,0.89,2.12,1.65,0,1.72,2500,24180.0,9000,15180.0,0.75,67.69492933191937
5,Maharashtra,Sugarcane,2025,900.0,59.0,0.7765937924964239,2.8221848,2760,31.509136,67,47,35000,0,3,11,89,64,1.79,0.97,31.03317565785571,47.40985162001283,0.79,7.29,2.29,1.01,0.52,1.03,3.53,0,1.44,1500,9900.0,7875,2025.0,0.78,63.87043748557523


🚜 Running Crop Yield Estimation...
✅ Crop Yield Estimation Completed!
📈 Running Commodity Price Forecasting...
✅ Commodity Price Forecasting Completed!
🌱 Running Precision Farming Analytics...
✅ Precision Farming Analytics Completed!
💰 Running Agri-Insurance Fraud Detection...
✅ Fraud Detection Completed!
🚛 Running Supply Chain Optimization...
✅ Supply Chain Optimization Completed!
📊 Creating Dashboard Summary Table...
✅ Dashboard Summary Ready! All pipeline stages executed successfully.


Crop_Type,Predicted_Yield,Avg_Price_INR,Sensor_Deviation_Score,Fraud_Flag,Supplier_Score
Sugarcane,3.1728518399432475,3003.65,0.3333333333333333,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.3333333333333333,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
Sugarcane,3.1728518399432475,3003.65,0.0,1,0.72
