<a href="https://colab.research.google.com/github/omisonawane619os/Smart-Farming-Crop-Yield-Prediction-Spark/blob/main/Smart_Farming_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==========================================
# PART 1: INIT & LOAD
# ==========================================
import findspark
findspark.init()

from pyspark.sql import SparkSession

# 1. Initialize Spark Session
# We use '*' to use all available cores to speed it up
spark = SparkSession.builder \
    .appName("SmartFarming_Fix") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("1. Spark Session Created Successfully.")

# 2. Load Data
# Make sure the filename matches EXACTLY what is in your folder
file_path = "Smart_Farming_Crop_Yield_2024.csv"

try:
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    print(f"2. Data Loaded. Total Rows: {df.count()}")

    # 3. Clean Data (Drop duplicates and nulls)
    df_clean = df.dropDuplicates().dropna()
    print(f"3. Data Cleaned. Rows remaining: {df_clean.count()}")

    # PRINT COLUMNS (Crucial for debugging)
    print("\n--- YOUR COLUMNS ---")
    print(df_clean.columns)

except Exception as e:
    print(f"ERROR: Could not load file. Make sure '{file_path}' is in the folder.")
    print(e)


1. Spark Session Created Successfully.
2. Data Loaded. Total Rows: 500
3. Data Cleaned. Rows remaining: 500

--- YOUR COLUMNS ---
['farm_id', 'region', 'crop_type', 'soil_moisture_%', 'soil_pH', 'temperature_C', 'rainfall_mm', 'humidity_%', 'sunlight_hours', 'irrigation_type', 'fertilizer_type', 'pesticide_usage_ml', 'sowing_date', 'harvest_date', 'total_days', 'yield_kg_per_hectare', 'sensor_id', 'timestamp', 'latitude', 'longitude', 'NDVI_index', 'crop_disease_status']


In [None]:
# ==========================================
# PART 2: PIPELINE & PROCESSING (FIXED)
# ==========================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
import sys

# 1. COLUMN DETECTIVE
# Let's see what columns are ACTUALLY in your dataframe
print("--- YOUR DATA COLUMNS ---")
actual_columns = df_clean.columns
print(actual_columns)

# We define lists of "Possible Names" to catch variations (spaces vs underscores)
possible_cats = [
    'Region', 'region',
    'Crop_Type', 'Crop Type', 'crop_type',
    'Irrigation_Type', 'Irrigation Type', 'irrigation_type',
    'Soil_Type', 'Soil Type', 'soil_type'
]

possible_nums = [
    'Soil_Moisture', 'Soil Moisture', 'soil_moisture',
    'Temperature_C', 'Temperature', 'temperature',
    'Rainfall_mm', 'Rainfall', 'rainfall',
    'Humidity', 'humidity',
    'Fertilizer_Used_kg_per_ha', 'Fertilizer', 'fertilizer'
]

# Filter: We only keep the ones that actually exist in your CSV
cat_cols = [c for c in possible_cats if c in actual_columns]
num_cols = [c for c in possible_nums if c in actual_columns]

print(f"\nSUCCESS MATCHED Categorical: {cat_cols}")
print(f"SUCCESS MATCHED Numerical: {num_cols}")

# SAFETY CHECK: If no columns were found, stop here so we don't crash Spark
if len(cat_cols) == 0 and len(num_cols) == 0:
    print("\nCRITICAL ERROR: No matching columns found!")
    print("Please manually update the 'possible_cats' and 'possible_nums' lists above to match your printed column names.")
    raise ValueError("Stopping execution: No features found.")

# 2. Build Pipeline Stages
stages = []

# Process Text Columns
for c in cat_cols:
    # Index: Text -> Number
    s_indexer = StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="skip")
    # Encode: Number -> Vector
    o_encoder = OneHotEncoder(inputCols=[f"{c}_idx"], outputCols=[f"{c}_vec"])
    stages += [s_indexer, o_encoder]

# Combine all features into one vector
assembler_inputs = [f"{c}_vec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="raw_features")
stages += [assembler]

# Scale features (Standardize numbers)
# Note: StandardScaler requires a vector. If we have features, this will now work.
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withStd=True, withMean=True)
stages += [scaler]

# 3. Run Pipeline
print("\nRunning Pipeline... (This may take a moment)")
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_clean)
final_df = model.transform(df_clean)

# Split Data for Training
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)

print("Pipeline Complete. Data is ready for ML.")

--- YOUR DATA COLUMNS ---
['farm_id', 'region', 'crop_type', 'soil_moisture_%', 'soil_pH', 'temperature_C', 'rainfall_mm', 'humidity_%', 'sunlight_hours', 'irrigation_type', 'fertilizer_type', 'pesticide_usage_ml', 'sowing_date', 'harvest_date', 'total_days', 'yield_kg_per_hectare', 'sensor_id', 'timestamp', 'latitude', 'longitude', 'NDVI_index', 'crop_disease_status']

SUCCESS MATCHED Categorical: ['region', 'crop_type', 'irrigation_type']
SUCCESS MATCHED Numerical: []

Running Pipeline... (This may take a moment)
Pipeline Complete. Data is ready for ML.


In [None]:
# ==========================================
# PART 3: REGRESSION MODELS (FIXED)
# ==========================================
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

print("Starting Model Training...")

# CORRECT COLUMN NAME: lowercase 'y' based on your error log
target_col = "yield_kg_per_hectare"

# Define Evaluator (R2 Score)
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction", metricName="r2")

# 1. Linear Regression
print("Training Linear Regression...")
lr = LinearRegression(featuresCol="features", labelCol=target_col)
lr_model = lr.fit(train_data)
lr_preds = lr_model.transform(test_data)
print(f"Linear Regression R2: {evaluator.evaluate(lr_preds):.4f}")

# 2. Random Forest (Usually better)
print("Training Random Forest...")
rf = RandomForestRegressor(featuresCol="features", labelCol=target_col, numTrees=20)
rf_model = rf.fit(train_data)
rf_preds = rf_model.transform(test_data)
print(f"Random Forest R2: {evaluator.evaluate(rf_preds):.4f}")

# 3. Gradient Boosted Trees (GBT)
print("Training GBT...")
gbt = GBTRegressor(featuresCol="features", labelCol=target_col, maxIter=10)
gbt_model = gbt.fit(train_data)
gbt_preds = gbt_model.transform(test_data)
print(f"GBT R2: {evaluator.evaluate(gbt_preds):.4f}")

Starting Model Training...
Training Linear Regression...
Linear Regression R2: -0.0189
Training Random Forest...
Random Forest R2: -0.0157
Training GBT...
GBT R2: -0.1223


In [None]:
# ==========================================
# PART 4: EXPORT DATA (FIXED)
# ==========================================
import pandas as pd
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

print("Exporting data for Tableau...")

# 1. Export Regression Results (Using GBT predictions)
# We convert to Pandas to save as a simple CSV
# NOTE: We use the lowercase 'yield_kg_per_hectare' here too
print("Processing Regression Export...")
reg_df = gbt_preds.select("region", "crop_type", "yield_kg_per_hectare", "prediction").toPandas()
reg_df["Residual"] = reg_df["yield_kg_per_hectare"] - reg_df["prediction"]
reg_df.to_csv("Tableau_Regression_Results.csv", index=False)
print("-> Saved: Tableau_Regression_Results.csv")

# 2. Quick Clustering for Visualization
print("Running Clustering...")
# Re-define numerical columns based on your error log (using lowercase or correct names)
# I am grabbing the ones visible in your error log:
cluster_cols = ['soil_moisture_%', 'temperature_C', 'rainfall_mm', 'humidity_%']
# Ensure these exist in the dataframe before using
valid_cluster_cols = [c for c in cluster_cols if c in df_clean.columns]

if valid_cluster_cols:
    clust_assembler = VectorAssembler(inputCols=valid_cluster_cols, outputCol="cluster_features")
    clust_data = clust_assembler.transform(df_clean)

    kmeans = KMeans(featuresCol="cluster_features", k=3, seed=1)
    km_model = kmeans.fit(clust_data)
    clust_res = km_model.transform(clust_data)

    # Export Clustering
    clust_pdf = clust_res.select("region", "crop_type", "prediction", *valid_cluster_cols).toPandas()
    clust_pdf.to_csv("Tableau_Cluster_Results.csv", index=False)
    print("-> Saved: Tableau_Cluster_Results.csv")
else:
    print("Skipping clustering export: Could not find matching sensor columns.")

print("\nALL DONE! Check your folder for the CSV files.")

Exporting data for Tableau...
Processing Regression Export...
-> Saved: Tableau_Regression_Results.csv
Running Clustering...
-> Saved: Tableau_Cluster_Results.csv

ALL DONE! Check your folder for the CSV files.
