In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression


In [0]:
spark = SparkSession.builder \
    .appName("crop price Prediction") \
    .getOrCreate()


In [0]:
# Replace with your dataset path
data_path = "/Volumes/workspace/default/market_data/data.csv"
df = spark.read.option("header", True).option("inferSchema", False).csv(data_path)

# Check schema
df.printSchema()
df.show(5)


root
 |-- event_id: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- city: string (nullable = true)
 |-- crop: string (nullable = true)
 |-- market: string (nullable = true)
 |-- soil_moisture: string (nullable = true)
 |-- soil_ph: string (nullable = true)
 |-- rain_mm: string (nullable = true)
 |-- temp_c: string (nullable = true)
 |-- yield_kg: string (nullable = true)
 |-- pest_flag: string (nullable = true)
 |-- key: string (nullable = true)

+--------------------+--------------------+---------+---------+------------+------------------+------------------+------------------+------+--------+---------+--------------------+
|            event_id|          event_time|     city|     crop|      market|     soil_moisture|           soil_ph|           rain_mm|temp_c|yield_kg|pest_flag|                 key|
+--------------------+--------------------+---------+---------+------------+------------------+------------------+------------------+------+--------+---------+----

In [0]:
# Numeric columns (features + label)
numeric_cols = ['temp_c', 'soil_ph', 'rain_mm', 'yield_kg']

# Categorical columns
categorical_cols = ['city', 'market', 'crop']  # update as per your dataset


In [0]:
# Only keep numeric values, drop malformed strings like 'False', 'Rice', etc.
for c in numeric_cols:
    df = df.withColumn(
        c,
        when(col(c).rlike("^[+-]?([0-9]*[.])?[0-9]+$"), col(c).cast("double")).otherwise(None)
    )

# Drop rows with NULLs in any numeric column
df = df.dropna(subset=numeric_cols)

df.show(5)


+--------------------+--------------------+---------+---------+------------+------------------+------------------+------------------+------+--------+---------+--------------------+
|            event_id|          event_time|     city|     crop|      market|     soil_moisture|           soil_ph|           rain_mm|temp_c|yield_kg|pest_flag|                 key|
+--------------------+--------------------+---------+---------+------------+------------------+------------------+------------------+------+--------+---------+--------------------+
|358902c2-9064-49d...|2025-01-25T04:12:...|Bengaluru|     Rice|Yeshwanthpur| 18.35185578515821|7.1423795106084516|18.206146588044817|  22.0|   833.0|    False|Bengaluru|Rice|Ye...|
|f62f6c3a-f04e-4d5...|2025-01-26T13:11:...|Bengaluru|    Wheat|   APMC_Yard| 27.59256148616627| 6.506742918454209| 44.68756167043308|  25.0|  4960.0|    False|Bengaluru|Wheat|A...|
|c11e0dfd-e555-44a...|2025-01-03T02:53:...|    Delhi|    Wheat|       Okhla|7.9316311249474625|

In [0]:
indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c+"_index", outputCol=c+"_vec") for c in categorical_cols]

# Assemble feature columns
feature_cols = [c for c in numeric_cols if c != 'yield_kg'] + [c+"_vec" for c in categorical_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Build pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit and transform dataset
df = pipeline.fit(df).transform(df)
df.select('features', 'yield_kg').show(5)


+--------------------+--------+
|            features|yield_kg|
+--------------------+--------+
|(24,[0,1,2,3,8,21...|   833.0|
|(24,[0,1,2,3,7,22...|  4960.0|
|(24,[0,1,2,6,13,2...|  7979.0|
|(24,[0,1,2,4,12,2...|  3206.0|
|(24,[0,1,2,4,12,2...|  5210.0|
+--------------------+--------+
only showing top 5 rows


In [0]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)


In [0]:
lr = LinearRegression(featuresCol='features', labelCol='yield_kg')
model = lr.fit(train_data)


In [0]:
predictions = model.transform(test_data)
predictions.select("yield_kg", "prediction").show(5)

# Optional: Model metrics
trainingSummary = model.summary
print("RMSE:", trainingSummary.rootMeanSquaredError)
print("R2:", trainingSummary.r2)


+--------+------------------+
|yield_kg|        prediction|
+--------+------------------+
|  5420.0|3905.2220403208566|
|  3336.0| 3914.865871230568|
|  2112.0| 3913.567709652408|
|   303.0|3928.1131542602056|
|  3379.0|3902.6100208247644|
+--------+------------------+
only showing top 5 rows
RMSE: 2283.5124100625417
R2: 3.512244040504431e-05


In [0]:
# ------------------------------------------------------------
# 🌾 Agricultural Yield Prediction - Clean & Train Version
# ------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, regexp_replace, when, isnan
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# ------------------------------------------------------------
# 1️⃣  Create Spark session
# ------------------------------------------------------------
spark = SparkSession.builder.appName("CropYieldPredictor").getOrCreate()

# ------------------------------------------------------------
# 2️⃣  Load data safely
# ------------------------------------------------------------
path = "/Volumes/workspace/default/market_data/data.csv"  # update if needed
df = spark.read.option("header", True).csv(path)
print("✅ Data loaded successfully")
print("Available columns:", df.columns)

# ------------------------------------------------------------
# 3️⃣  Clean up numeric columns (remove invalid strings like 'False')
# ------------------------------------------------------------
numeric_cols = ["soil_moisture", "soil_ph", "rain_mm", "temp_c", "yield_kg"]

for c in numeric_cols:
    # Replace non-numeric values (like 'False', 'NA', '') with NULL
    df = df.withColumn(
        c,
        when(
            col(c).rlike("^[0-9.]+$"),  # keep only valid numbers
            col(c)
        ).otherwise(None)
    )
    # Cast to double
    df = df.withColumn(c, col(c).cast(DoubleType()))

print("✅ Cleaned data (numeric columns casted properly)")
df.select("rain_mm", "temp_c", "yield_kg").show(5)

# ------------------------------------------------------------
# 4️⃣  Drop rows with nulls in important numeric fields
# ------------------------------------------------------------
df = df.na.drop(subset=numeric_cols)

# ------------------------------------------------------------
# 5️⃣  Feature engineering
# ------------------------------------------------------------
feature_cols = ["soil_moisture", "soil_ph", "rain_mm", "temp_c"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# ------------------------------------------------------------
# 6️⃣  Split train/test
# ------------------------------------------------------------
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# ------------------------------------------------------------
# 7️⃣  Train model
# ------------------------------------------------------------
lr = LinearRegression(
    featuresCol="features",
    labelCol="yield_kg",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.5
)

model = lr.fit(train_data)
print("✅ Model trained successfully")

# ------------------------------------------------------------
# 8️⃣  Evaluate model
# ------------------------------------------------------------
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="yield_kg", predictionCol="prediction")

r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

print(f"📈 R² Score: {r2:.4f}")
print(f"📉 RMSE: {rmse:.4f}")

# ------------------------------------------------------------
# 9️⃣  Save model (optional)
# ------------------------------------------------------------
model.save("/Volumes/workspace/default/market_data/agri_yield_lr_model")
print(" Model saved successfully at /Volumes/workspace/default/market_data/agri_yield_lr_model")


✅ Data loaded successfully
Available columns: ['event_id', 'event_time', 'city', 'crop', 'market', 'soil_moisture', 'soil_ph', 'rain_mm', 'temp_c', 'yield_kg', 'pest_flag', 'key']
✅ Cleaned data (numeric columns casted properly)
+------------------+------+--------+
|           rain_mm|temp_c|yield_kg|
+------------------+------+--------+
|18.206146588044817|  22.0|   833.0|
| 44.68756167043308|  25.0|  4960.0|
| 1.845755525791223|  12.0|  7979.0|
| 32.07795372309581|  42.0|  3206.0|
| 10.86139235717659|  33.0|  5210.0|
+------------------+------+--------+
only showing top 5 rows
✅ Model trained successfully
📈 R² Score: -0.0001
📉 RMSE: 2287.8152
 Model saved successfully at /Volumes/workspace/default/market_data/agri_yield_lr_model


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Define schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_time", StringType(), True),
    StructField("city", StringType(), True),
    StructField("crop", StringType(), True),
    StructField("market", StringType(), True),
    StructField("soil_moisture", StringType(), True),
    StructField("soil_ph", StringType(), True),
    StructField("rain_mm", StringType(), True),
    StructField("temp_c", StringType(), True),
    StructField("yield_kg", StringType(), True),
    StructField("pest_flag", StringType(), True),
    StructField("key", StringType(), True)
])

# Sample data
data = [
    ("E001", "2025-10-16 10:00", "CityA", "Wheat", "Market1", "0.3", "6.5", "25.0", "30.0", "5000", "0", "K001"),
    ("E002", "2025-10-16 11:00", "CityB", "Rice", "Market2", "0.4", "6.8", "40.0", "32.0", "6000", "1", "K002")
]

df = spark.createDataFrame(data, schema=schema)

# Save to your volume
df.write.csv("/Volumes/workspace/default/market_data/new_product.csv", header=True, mode="overwrite")

print("✅ new_product.csv created successfully")


✅ new_product.csv created successfully


In [0]:
# ------------------------------------------------------------
# 1️⃣ Import required libraries
# ------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

# ------------------------------------------------------------
# 2️⃣ Initialize Spark session (if not already)
# ------------------------------------------------------------
spark = SparkSession.builder.getOrCreate()

# ------------------------------------------------------------
# 3️⃣ Load new data for prediction
# ------------------------------------------------------------
new_data_path = "/Volumes/workspace/default/market_data/new_product.csv"

new_data = spark.read.option("header", True).option("inferSchema", True).csv(new_data_path)
print("✅ New data loaded successfully")
print("Columns:", new_data.columns)
new_data.show(5)

# ------------------------------------------------------------
# 4️⃣ Convert columns to float/double if needed
# ------------------------------------------------------------
numeric_cols = ["soil_moisture", "soil_ph", "rain_mm", "temp_c"]

for col in numeric_cols:
    new_data = new_data.withColumn(col, new_data[col].cast("double"))

# ------------------------------------------------------------
# 5️⃣ Assemble features
# ------------------------------------------------------------
feature_cols = ["soil_moisture", "soil_ph", "rain_mm", "temp_c"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
new_data_prepared = assembler.transform(new_data)

# ------------------------------------------------------------
# 6️⃣ Load the trained Linear Regression model
# ------------------------------------------------------------
lr_model_path = "/Volumes/workspace/default/market_data/agri_yield_lr_model"
lr_model = LinearRegressionModel.load(lr_model_path)

# ------------------------------------------------------------
# 7️⃣ Make predictions
# ------------------------------------------------------------
predictions = lr_model.transform(new_data_prepared)

# ------------------------------------------------------------
# 8️⃣ Show predictions
# ------------------------------------------------------------
predictions.select("event_id", "crop", "market", "prediction").show()

# ------------------------------------------------------------
# 9️⃣ Optionally save predictions
# ------------------------------------------------------------
predicted_output_path = "/Volumes/workspace/default/market_data/predicted_trends.csv"
predictions.select("event_id", "crop", "market", "prediction")\
    .write.mode("overwrite").option("header", True).csv(predicted_output_path)

print(f"✅ Predictions saved at {predicted_output_path}")




✅ New data loaded successfully
Columns: ['event_id', 'event_time', 'city', 'crop', 'market', 'soil_moisture', 'soil_ph', 'rain_mm', 'temp_c', 'yield_kg', 'pest_flag', 'key']
+--------+-------------------+-----+-----+-------+-------------+-------+-------+------+--------+---------+----+
|event_id|         event_time| city| crop| market|soil_moisture|soil_ph|rain_mm|temp_c|yield_kg|pest_flag| key|
+--------+-------------------+-----+-----+-------+-------------+-------+-------+------+--------+---------+----+
|    E001|2025-10-16 10:00:00|CityA|Wheat|Market1|          0.3|    6.5|   25.0|  30.0|    5000|        0|K001|
|    E002|2025-10-16 11:00:00|CityB| Rice|Market2|          0.4|    6.8|   40.0|  32.0|    6000|        1|K002|
+--------+-------------------+-----+-----+-------+-------------+-------+-------+------+--------+---------+----+

+--------+-----+-------+------------------+
|event_id| crop| market|        prediction|
+--------+-----+-------+------------------+
|    E001|Wheat|Marke

In [0]:
# Load the predictions CSV
pred_df = spark.read.option("header", True).csv("/Volumes/workspace/default/market_data/predicted_trends.csv")

# Show the top rows
pred_df.show()



+--------+-----+-------+------------------+
|event_id| crop| market|        prediction|
+--------+-----+-------+------------------+
|    E001|Wheat|Market1| 3909.222742136389|
|    E002| Rice|Market2|3912.3059472572713|
+--------+-----+-------+------------------+

