**Use case 4:- Rainfall Prediction for Crop recommendation**

**STEP 1:- Load & inspect the data**

In [0]:
# Fully qualified name of the table that holds the crop-recommendation records.
SOURCE_TABLE = "workspace.default.crop_recommendation"

df = spark.table(SOURCE_TABLE)

# Quick sanity check: column names/types first, then the first five rows.
df.printSchema()
df.show(5)

root
 |-- N: long (nullable = true)
 |-- P: long (nullable = true)
 |-- K: long (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- rainfall: double (nullable = true)
 |-- label: string (nullable = true)
 |-- soil_moisture: double (nullable = true)
 |-- soil_type: long (nullable = true)
 |-- sunlight_exposure: double (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- co2_concentration: double (nullable = true)
 |-- organic_matter: double (nullable = true)
 |-- irrigation_frequency: long (nullable = true)
 |-- crop_density: double (nullable = true)
 |-- pest_pressure: double (nullable = true)
 |-- fertilizer_usage: double (nullable = true)
 |-- growth_stage: long (nullable = true)
 |-- urban_area_proximity: double (nullable = true)
 |-- water_source_type: long (nullable = true)
 |-- frost_risk: double (nullable = true)
 |-- water_usage_efficiency: double (nullable = true)

+---+---+---+

**STEP 2:- Rename key columns for consistency**

In [0]:
# "label" gets a more descriptive name; every other listed column is simply
# upper-cased. N, P and K are already upper-case and are left untouched.
column_renames = {"label": "CROP_LABEL"}
column_renames.update({
    lower_name: lower_name.upper()
    for lower_name in [
        "temperature",
        "humidity",
        "ph",
        "rainfall",
        "soil_moisture",
        "soil_type",
        "sunlight_exposure",
        "wind_speed",
        "co2_concentration",
        "organic_matter",
        "irrigation_frequency",
        "crop_density",
        "pest_pressure",
        "fertilizer_usage",
        "growth_stage",
        "urban_area_proximity",
        "water_source_type",
        "frost_risk",
        "water_usage_efficiency",
    ]
})

# Apply the renames one at a time, in the order they were declared.
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

print(df.columns)

['N', 'P', 'K', 'TEMPERATURE', 'HUMIDITY', 'PH', 'RAINFALL', 'CROP_LABEL', 'SOIL_MOISTURE', 'SOIL_TYPE', 'SUNLIGHT_EXPOSURE', 'WIND_SPEED', 'CO2_CONCENTRATION', 'ORGANIC_MATTER', 'IRRIGATION_FREQUENCY', 'CROP_DENSITY', 'PEST_PRESSURE', 'FERTILIZER_USAGE', 'GROWTH_STAGE', 'URBAN_AREA_PROXIMITY', 'WATER_SOURCE_TYPE', 'FROST_RISK', 'WATER_USAGE_EFFICIENCY']


**STEP 3:- Clean and filter data**

In [0]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

# Continuous measurements that must be DoubleType. N, P and K are loaded as
# long; the remaining columns are already double, so their cast only makes the
# intent explicit.
double_columns = [
    "N", "P", "K",
    "TEMPERATURE", "HUMIDITY", "PH", "RAINFALL",
    "SOIL_MOISTURE", "SUNLIGHT_EXPOSURE", "WIND_SPEED",
    "CO2_CONCENTRATION", "ORGANIC_MATTER",
    "CROP_DENSITY", "PEST_PRESSURE", "FERTILIZER_USAGE",
    "URBAN_AREA_PROXIMITY", "FROST_RISK", "WATER_USAGE_EFFICIENCY",
]
for column_name in double_columns:
    # Replacing a column under its own name keeps its position in the schema.
    df = df.withColumn(column_name, col(column_name).cast(DoubleType()))

# Keep only rows with a known rainfall value (the prediction target) and a
# strictly positive temperature; a null temperature fails the comparison and
# is dropped as well.
has_rainfall = col("RAINFALL").isNotNull()
has_positive_temperature = col("TEMPERATURE") > 0
df = df.filter(has_rainfall).filter(has_positive_temperature)

display(df.limit(5))

N,P,K,TEMPERATURE,HUMIDITY,PH,RAINFALL,CROP_LABEL,SOIL_MOISTURE,SOIL_TYPE,SUNLIGHT_EXPOSURE,WIND_SPEED,CO2_CONCENTRATION,ORGANIC_MATTER,IRRIGATION_FREQUENCY,CROP_DENSITY,PEST_PRESSURE,FERTILIZER_USAGE,GROWTH_STAGE,URBAN_AREA_PROXIMITY,WATER_SOURCE_TYPE,FROST_RISK,WATER_USAGE_EFFICIENCY
90.0,42.0,43.0,20.87974371,82.00274423,6.502985292,202.9355362,rice,29.44606392,2,8.677355267,10.10987524,435.6112257,3.121394502,4,11.7439101,57.60730814,188.1949578,1,2.719614268,3,95.64998537,1.193293298
85.0,58.0,41.0,21.77046169,80.31964408,7.038096361,226.6555374,rice,12.85118264,3,5.754287955,12.0480498,401.4518597,2.142020929,4,16.79710124,74.73687901,70.96362942,1,4.714427327,2,77.26569366,1.752671682
60.0,55.0,44.0,23.00445915,82.3207629,7.840207144,263.9642476,rice,29.36391289,2,9.875230096,9.051348913,357.4179627,1.474973725,1,12.65439458,1.034478009,191.9760773,1,30.43173648,2,18.19216786,3.03554102
74.0,35.0,40.0,26.49109635,80.15836264,6.980400905,242.8640342,rice,26.20773239,3,8.023684684,7.963606057,363.6943055,8.393907172,1,10.86436018,24.09188793,55.76138848,3,10.86107128,3,82.81872017,1.273340646
78.0,42.0,42.0,20.13017482,81.60487287,7.628472891,262.7173405,rice,28.23623614,2,8.120511883,19.26413342,410.3564578,5.202285435,3,13.85291005,38.81148144,185.2597015,2,47.19077675,3,25.46649893,2.578671085


**STEP 4:- Feature engineering**

In [0]:
# Derive HIGH_RAINFALL: 1 when a row's rainfall is above the (approximate)
# median of the dataset, 0 otherwise.
# approxQuantile returns one value per requested probability; 0.01 is the
# relative error allowed for the estimate.
rainfall_quantiles = df.approxQuantile("RAINFALL", [0.5], 0.01)
median_rainfall = rainfall_quantiles[0]

is_above_median = col("RAINFALL") > median_rainfall
df = df.withColumn("HIGH_RAINFALL", when(is_above_median, 1).otherwise(0))

# Preview the new flag next to the values it was derived from.
preview_columns = ["CROP_LABEL", "TEMPERATURE", "RAINFALL", "HIGH_RAINFALL"]
display(df.select(*preview_columns).limit(5))

CROP_LABEL,TEMPERATURE,RAINFALL,HIGH_RAINFALL
rice,20.87974371,202.9355362,0
rice,21.77046169,226.6555374,0
rice,23.00445915,263.9642476,1
rice,26.49109635,242.8640342,1
rice,20.13017482,262.7173405,1


**STEP 5:- SQL exploratory analysis**

In [0]:
# Expose the cleaned DataFrame to Spark SQL under the name "crop_data".
df.createOrReplaceTempView("crop_data")

# Per-crop record count and average environmental conditions, wettest crop first.
per_crop_conditions_sql = """
  SELECT CROP_LABEL, COUNT(*) AS total_records,
         ROUND(AVG(TEMPERATURE),2) AS avg_temp,
         ROUND(AVG(HUMIDITY),2) AS avg_humidity,
         ROUND(AVG(RAINFALL),2) AS avg_rainfall,
         ROUND(AVG(PH),2) AS avg_ph
  FROM crop_data
  GROUP BY CROP_LABEL
  ORDER BY avg_rainfall DESC
"""
per_crop_conditions = spark.sql(per_crop_conditions_sql)
per_crop_conditions.show(20)


+----------+-------------+--------+------------+------------+------+
|CROP_LABEL|total_records|avg_temp|avg_humidity|avg_rainfall|avg_ph|
+----------+-------------+--------+------------+------------+------+
|      rice|           49|   23.59|       82.13|      238.94|  6.42|
+----------+-------------+--------+------------+------------+------+



**STEP 6:- Prepare ML features**

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

def _make_indexer(source_col, index_col):
    """Build a StringIndexer that writes numeric indices for `source_col` into `index_col`."""
    # handleInvalid="keep" is set on every indexer so transform does not fail
    # on labels that were not present when the indexer was fitted.
    return StringIndexer(inputCol=source_col, outputCol=index_col, handleInvalid="keep")


def _make_encoder(index_col, vector_col):
    """Build a OneHotEncoder that turns `index_col` into the one-hot vector `vector_col`."""
    return OneHotEncoder(inputCols=[index_col], outputCols=[vector_col])


# Categorical inputs are indexed first ...
crop_indexer = _make_indexer("CROP_LABEL", "CROP_idx")
soil_indexer = _make_indexer("SOIL_TYPE", "SOIL_TYPE_idx")
water_indexer = _make_indexer("WATER_SOURCE_TYPE", "WATER_SOURCE_idx")

# ... and then one-hot encoded.
crop_encoder = _make_encoder("CROP_idx", "CROP_ohe")
soil_encoder = _make_encoder("SOIL_TYPE_idx", "SOIL_TYPE_ohe")
water_encoder = _make_encoder("WATER_SOURCE_idx", "WATER_SOURCE_ohe")

# Numeric columns go into the feature vector as-is. RAINFALL and HIGH_RAINFALL
# are not in this list.
numeric_feature_cols = [
    "N", "P", "K", "TEMPERATURE", "HUMIDITY", "PH",
    "SOIL_MOISTURE", "SUNLIGHT_EXPOSURE", "WIND_SPEED",
    "CO2_CONCENTRATION", "ORGANIC_MATTER", "IRRIGATION_FREQUENCY",
    "CROP_DENSITY", "PEST_PRESSURE", "FERTILIZER_USAGE",
    "GROWTH_STAGE", "URBAN_AREA_PROXIMITY", "FROST_RISK",
    "WATER_USAGE_EFFICIENCY",
]
encoded_feature_cols = ["CROP_ohe", "SOIL_TYPE_ohe", "WATER_SOURCE_ohe"]

# Combine numeric and one-hot columns into a single "features" vector.
assembler = VectorAssembler(
    inputCols=numeric_feature_cols + encoded_feature_cols,
    outputCol="features",
)

# Scale the assembled vector with default StandardScaler settings.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

**STEP 7:- Model training**

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
# Regression tree that predicts RAINFALL from the scaled feature vector.
# NOTE(review): the recorded run of this notebook reports R-squared = -0.706
# with a 71-node tree, which suggests the tree is overfitting the small
# training split; consider a lower maxDepth or a higher minInstancesPerNode.
dt = DecisionTreeRegressor(
    labelCol="RAINFALL",
    featuresCol="scaledFeatures",
    maxDepth=10,
    seed=42,
)

# Stage order matters: index -> one-hot encode -> assemble -> scale -> fit tree.
pipeline_stages = [
    crop_indexer, soil_indexer, water_indexer,
    crop_encoder, soil_encoder, water_encoder,
    assembler,
    scaler,
    dt,
]
pipeline = Pipeline(stages=pipeline_stages)

# Seeded 80/20 split; all stages are fitted on the training part only.
train, test = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)
pred = model.transform(test)

**STEP 8:- Model evaluation**

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out predictions (`pred`) with three regression metrics.

# Evaluate RMSE (Root Mean Squared Error)
rmse_evaluator = RegressionEvaluator(labelCol="RAINFALL", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(pred)
print("RMSE:", round(rmse, 3))

# Evaluate R2 (R-squared)
r2_evaluator = RegressionEvaluator(labelCol="RAINFALL", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(pred)
print("R-squared:", round(r2, 3))

# Evaluate MAE (Mean Absolute Error)
mae_evaluator = RegressionEvaluator(labelCol="RAINFALL", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(pred)
print("MAE:", round(mae, 3))

# Extract Decision Tree Model and Feature Importance
dt_model = model.stages[-1]  # Last stage is the Decision Tree
print("\n=== Decision Tree Metrics ===")
print("Tree Depth:", dt_model.depth)
print("Number of Nodes:", dt_model.numNodes)
print("\nFeature Importances:")
feature_importances = dt_model.featureImportances
print(feature_importances)

# The raw vector above only shows slot indices, and one-hot encoding expands
# the inputs to more slots than there are source columns. Map each slot back
# to a readable name using the attribute metadata that VectorAssembler attaches
# to the "features" column (StandardScaler keeps the slot order). If that
# metadata is unavailable, fall back to generic "feature_<slot>" names.
features_metadata = pred.schema["features"].metadata
attrs_by_kind = features_metadata.get("ml_attr", {}).get("attrs", {})
slot_names = {
    attr["idx"]: attr.get("name", f"feature_{attr['idx']}")
    for attrs in attrs_by_kind.values()
    for attr in attrs
}
ranked_importances = sorted(
    enumerate(feature_importances.toArray()),
    key=lambda slot_and_value: slot_and_value[1],
    reverse=True,
)
print("\nFeature Importances by name (non-zero, highest first):")
for slot, importance in ranked_importances:
    if importance > 0:
        feature_name = slot_names.get(slot, f"feature_{slot}")
        print(f"  {feature_name:<40} {importance:.4f}")

# Make the predictions queryable from Spark SQL.
pred.createOrReplaceTempView("predictions")

# Ten predictions with the largest absolute error.
spark.sql("""
SELECT CROP_LABEL, TEMPERATURE, RAINFALL as actual_rainfall, ROUND(prediction,2) as predicted_rainfall,
       ROUND(ABS(RAINFALL - prediction),2) as error
FROM predictions
ORDER BY error DESC
LIMIT 10
""").show()

# Prediction accuracy breakdown: bucket rows by absolute error and report each
# bucket's count and share of all predictions.
spark.sql("""
SELECT 
  CASE 
    WHEN ABS(RAINFALL - prediction) < 10 THEN 'Excellent (< 10mm error)'
    WHEN ABS(RAINFALL - prediction) < 25 THEN 'Good (10-25mm error)'
    WHEN ABS(RAINFALL - prediction) < 50 THEN 'Fair (25-50mm error)'
    ELSE 'Poor (> 50mm error)'
  END as accuracy_category,
  COUNT(*) as count,
  ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
FROM predictions
GROUP BY accuracy_category
ORDER BY count DESC
""").show()

RMSE: 38.922
R-squared: -0.706
MAE: 29.047

=== Decision Tree Metrics ===
Tree Depth: 9
Number of Nodes: 71

Feature Importances:
(26,[0,1,2,3,5,6,7,8,11,13,14,16,17,23],[0.013454344038295771,5.1913679875187834e-05,0.02429589872929367,0.002314085094554618,0.2059686515965499,0.0006874853212328826,0.17418413849865588,0.004252695702186585,0.2345929995777087,0.0004764747855107204,0.20569915884918322,0.04472955846789457,0.04879080158081457,0.04050179407824376])
+----------+-----------+---------------+------------------+-----+
|CROP_LABEL|TEMPERATURE|actual_rainfall|predicted_rainfall|error|
+----------+-----------+---------------+------------------+-----+
|      rice|25.59704938|     200.834898|            276.66|75.82|
|      rice|22.30157427|    197.9791215|            271.36|73.38|
|      rice|21.94766735|    213.3560921|            276.66| 63.3|
|      rice|23.83067496|    298.5601175|            260.26| 38.3|
|      rice|26.49109635|    242.8640342|            276.66|33.79|
|      rice

**STEP 9:- Save model and results**

In [0]:
# Persist the crop label, a subset of the inputs, the actual rainfall and the
# model's prediction. The table is overwritten on every run.
saved_columns = ["CROP_LABEL", "TEMPERATURE", "HUMIDITY", "PH", "N", "P", "K", "RAINFALL", "prediction"]
predictions_to_save = pred.select(*saved_columns)
predictions_to_save.write.mode("overwrite").saveAsTable("crop_rainfall_predictions")

print("✅ Predictions saved as SQL table: crop_rainfall_predictions")


# Query saved results - overall performance per crop, wettest crop first.
overall_performance_sql = """
SELECT CROP_LABEL, 
       COUNT(*) as sample_count,
       ROUND(AVG(RAINFALL),2) AS actual_avg_rainfall, 
       ROUND(AVG(prediction),2) AS predicted_avg_rainfall,
       ROUND(AVG(ABS(RAINFALL - prediction)),2) AS avg_error
FROM crop_rainfall_predictions
GROUP BY CROP_LABEL
ORDER BY actual_avg_rainfall DESC
"""
spark.sql(overall_performance_sql).show()

# Best, worst and average absolute error per crop, most accurate crop first.
error_range_sql = """
SELECT CROP_LABEL,
       ROUND(MIN(ABS(RAINFALL - prediction)),2) as best_prediction_error,
       ROUND(MAX(ABS(RAINFALL - prediction)),2) as worst_prediction_error,
       ROUND(AVG(ABS(RAINFALL - prediction)),2) as avg_error
FROM crop_rainfall_predictions
GROUP BY CROP_LABEL
ORDER BY avg_error ASC
"""
spark.sql(error_range_sql).show()

✅ Predictions saved as SQL table: crop_rainfall_predictions
+----------+------------+-------------------+----------------------+---------+
|CROP_LABEL|sample_count|actual_avg_rainfall|predicted_avg_rainfall|avg_error|
+----------+------------+-------------------+----------------------+---------+
|      rice|          13|             238.65|                 256.0|    29.05|
+----------+------------+-------------------+----------------------+---------+

+----------+---------------------+----------------------+---------+
|CROP_LABEL|best_prediction_error|worst_prediction_error|avg_error|
+----------+---------------------+----------------------+---------+
|      rice|                 0.97|                 75.82|    29.05|
+----------+---------------------+----------------------+---------+



**STEP 10:- Overall summary table**

In [0]:
# Per-crop summary of the saved predictions: sample count, actual vs. predicted
# average rainfall, and mean absolute error.
overall_summary_sql = """
SELECT CROP_LABEL,
       COUNT(*) as sample_count,
       ROUND(AVG(RAINFALL),2) as actual_avg_rainfall,
       ROUND(AVG(prediction),2) as predicted_avg_rainfall,
       ROUND(AVG(ABS(RAINFALL - prediction)),2) as avg_error
FROM crop_rainfall_predictions
GROUP BY CROP_LABEL
ORDER BY actual_avg_rainfall DESC
"""
overall_summary_df = spark.sql(overall_summary_sql)

# Save (overwriting any previous run) and display.
summary_writer = overall_summary_df.write.mode("overwrite")
summary_writer.saveAsTable("dt_overall_performance")
print("✅ Table saved: dt_overall_performance")
display(overall_summary_df)

✅ Table saved: dt_overall_performance


CROP_LABEL,sample_count,actual_avg_rainfall,predicted_avg_rainfall,avg_error
rice,13,238.65,256.0,29.05


Databricks visualization. Run in Databricks to view.