In [2]:
import pyspark

In [34]:
# Import SparkSession to create Spark app and access DataFrame APIs

from pyspark.sql import SparkSession

In [35]:
# Create a Spark session with the name 'spark-pract'

spark = SparkSession.builder.appName('spark-pract').getOrCreate()

In [5]:
df_spark = spark.read.csv('food_consumption.csv',header=True,inferSchema=True)

In [6]:
df_spark.show()

+---------+--------------------+-----------+-------------+
|  country|       food_category|consumption|co2_emmission|
+---------+--------------------+-----------+-------------+
|Argentina|                Pork|      10.51|         37.2|
|Argentina|             Poultry|      38.66|        41.53|
|Argentina|                Beef|      55.48|       1712.0|
|Argentina|         Lamb & Goat|       1.56|        54.63|
|Argentina|                Fish|       4.36|         6.96|
|Argentina|                Eggs|      11.39|        10.46|
|Argentina|  Milk - inc. cheese|     195.08|       277.87|
|Argentina|Wheat and Wheat P...|     103.11|        19.66|
|Argentina|                Rice|       8.77|        11.22|
|Argentina|            Soybeans|        0.0|          0.0|
|Argentina|Nuts inc. Peanut ...|       0.49|         0.87|
|Australia|                Pork|      24.14|        85.44|
|Australia|             Poultry|      46.12|        49.54|
|Australia|                Beef|      33.86|      1044.8

In [7]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [36]:
# Data Quality Check: Count null values in each column
# This helps identify data completeness issues before modeling
# Remove rows where target variable (co2_emmission) or key features are missing
# Clean data is essential for reliable model training

In [8]:
from pyspark.sql.functions import col, isnan, when, count

df_spark.select([count(when(col(c).isNull(), c)).alias(c) for c in df_spark.columns]).show()

+-------+-------------+-----------+-------------+
|country|food_category|consumption|co2_emmission|
+-------+-------------+-----------+-------------+
|     10|           10|         10|           10|
+-------+-------------+-----------+-------------+



In [None]:
df_spark = df_spark.dropna(subset=["co2_emmission", "consumption"])  # Drop if important columns are null

In [37]:
# Convert categorical variables to numerical indices

indexer_country = StringIndexer(inputCol="country", outputCol="country_index")
indexer_food = StringIndexer(inputCol="food_category", outputCol="food_index")


In [39]:
# Combine all features into a single vector

assembler = VectorAssembler(
    inputCols=["country_index", "food_index", "consumption"],
    outputCol="features"
)


In [40]:
# Create linear regression model
lr = LinearRegression(featuresCol="features", labelCol="co2_emmission")


In [41]:
# Build ML pipeline with all preprocessing and modeling steps
pipeline = Pipeline(stages=[indexer_country, indexer_food, assembler, lr])


In [42]:
# Split data into training (80%) and testing (20%) sets
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)


In [43]:
# Train the complete pipeline
model = pipeline.fit(train_data)


In [44]:
# Make predictions on test data
predictions = model.transform(test_data)
predictions.select("co2_emmission", "prediction").show(10)


+-------------+-------------------+
|co2_emmission|         prediction|
+-------------+-------------------+
|         6.15|-21.333345844735447|
|        38.51|-1.4304751446120463|
|         9.96|   42.0567986837257|
|         5.97|-17.398738617472645|
|          3.8|   42.8396783426316|
|         1.02|  71.85188756389933|
|        18.62|  37.16262771361305|
|         6.96|-12.385141413677445|
|        10.74|   70.8119920998454|
|         6.96|-25.235507471695684|
+-------------+-------------------+
only showing top 10 rows


In [45]:
# Evaluate model performance using multiple metrics
evaluator = RegressionEvaluator(
    labelCol="co2_emmission", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")


RMSE: 115.20849139249091


In [46]:
# Additional metrics for comprehensive evaluation

r2 = evaluator.setMetricName("r2").evaluate(predictions)
mae = evaluator.setMetricName("mae").evaluate(predictions)

print(f"R2: {r2}, MAE: {mae}")


R2: 0.15491332585713546, MAE: 63.228830160901886


As can be seen the label encoding , liner regression approach doesnt result into decent scores , as the models fails to learn on 15 % of variance.

We now shift to some advanced ML models , Random Forest

In [47]:
df_spark = spark.read.csv('food_consumption.csv',header=True,inferSchema=True)

In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import log1p
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [49]:
# 1. Log transform target and numeric feature
df_spark = df_spark.withColumn("log_co2", log1p("co2_emmission")) \
                   .withColumn("log_consumption", log1p("consumption"))

# 2. Drop rows with nulls in important columns
df_spark = df_spark.dropna(subset=["country", "food_category", "consumption", "co2_emmission"])


In [50]:
# String Indexing
indexer_country = StringIndexer(inputCol="country", outputCol="country_index")
indexer_food = StringIndexer(inputCol="food_category", outputCol="food_index")

# One-Hot Encoding
encoder_country = OneHotEncoder(inputCol="country_index", outputCol="country_ohe")
encoder_food = OneHotEncoder(inputCol="food_index", outputCol="food_ohe")

# Assemble features
assembler = VectorAssembler(
    inputCols=["country_ohe", "food_ohe", "log_consumption"],
    outputCol="features"
)


In [51]:
rf = RandomForestRegressor(featuresCol="features", labelCol="log_co2", numTrees=100, maxDepth=6)


In [54]:
pipeline = Pipeline(stages=[
    indexer_country,
    indexer_food,
    encoder_country,
    encoder_food,
    assembler,
    rf
])


In [55]:
df_spark.show()

+---------+--------------------+-----------+-------------+------------------+------------------+
|  country|       food_category|consumption|co2_emmission|           log_co2|   log_consumption|
+---------+--------------------+-----------+-------------+------------------+------------------+
|Argentina|                Pork|      10.51|         37.2|3.6428355156125294| 2.443216222733791|
|Argentina|             Poultry|      38.66|        41.53| 3.750209709265542|  3.68034312309165|
|Argentina|                Beef|      55.48|       1712.0|  7.44600149832412| 4.033886593184986|
|Argentina|         Lamb & Goat|       1.56|        54.63| 4.018722624087202|0.9400072584914712|
|Argentina|                Fish|       4.36|         6.96|2.0744289998562917|1.6789639750827106|
|Argentina|                Eggs|      11.39|        10.46|2.4388627112865935|2.5168896956410514|
|Argentina|  Milk - inc. cheese|     195.08|       277.87| 5.630745723412227|5.2785227392198575|
|Argentina|Wheat and Wheat P..

In [56]:
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)


In [57]:
predictions = model.transform(test_data)
predictions.select("co2_emmission", "prediction").show(10)


+-------------+------------------+
|co2_emmission|        prediction|
+-------------+------------------+
|         6.15|   2.3142473693444|
|        38.51|3.0568055867921315|
|         9.96| 2.647743908744865|
|         5.97|   2.3142473693444|
|          3.8|1.8846909515698747|
|         1.02|1.4492803731429416|
|        18.62|3.2977640185222974|
|         6.96|2.5429763385996362|
|        10.74|2.4305441509369943|
|         6.96|2.5429763385996362|
+-------------+------------------+
only showing top 10 rows


In [58]:
from pyspark.sql.functions import expm1

# Inverse log for actual and predicted
predictions = predictions.withColumn("prediction_exp", expm1("prediction"))
evaluator = RegressionEvaluator(labelCol="co2_emmission", predictionCol="prediction_exp")

print("📉 RMSE:", evaluator.setMetricName("rmse").evaluate(predictions))
print("📈 R2:", evaluator.setMetricName("r2").evaluate(predictions))
print("📉 MAE:", evaluator.setMetricName("mae").evaluate(predictions))


📉 RMSE: 80.10442487706983
📈 R2: 0.5914499317573629
📉 MAE: 26.199434662519803


Using the Random Forest Regressor , the variance capturing ability of the model increase by 4 times , and scaling the numerical features by log also added to the model capability.

In [59]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Step 10: Hyperparameter Tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [4, 6, 8]) \
    .addGrid(rf.minInstancesPerNode, [1, 2]) \
    .build()

In [60]:
evaluator = RegressionEvaluator(labelCol="log_co2", predictionCol="prediction", metricName="r2")

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=42)

# Step 11: Fit CrossValidator
cv_model = cv.fit(train_data)

In [32]:
# Step 12: Get Best Model from CrossValidator
best_model = cv_model.bestModel

print("✅ Best Model Params:")
print(f"- Num Trees: {best_model.stages[-1]._java_obj.getNumTrees()}")
print(f"- Max Depth: {best_model.stages[-1]._java_obj.getMaxDepth()}")

# Step 13: Retrain Best Model (Optional, skipped here since it's already trained on full training set)

# Step 14: Predict on Test Set
predictions = best_model.transform(test_data)

# Step 15: Evaluate (Inverse log to get back real CO2 emission values)
predictions = predictions.withColumn("prediction_exp", expm1("prediction"))


✅ Best Model Params:
- Num Trees: 50
- Max Depth: 8


In [61]:
# Metrics on real scale
evaluator_real = RegressionEvaluator(labelCol="co2_emmission", predictionCol="prediction_exp")

rmse = evaluator_real.setMetricName("rmse").evaluate(predictions)
r2 = evaluator_real.setMetricName("r2").evaluate(predictions)
mae = evaluator_real.setMetricName("mae").evaluate(predictions)

print(f"\n📊 Final Evaluation on Test Set:")
print(f"- RMSE: {rmse:.4f}")
print(f"- R²: {r2:.4f}")
print(f"- MAE: {mae:.4f}")


📊 Final Evaluation on Test Set:
- RMSE: 80.1044
- R²: 0.5914
- MAE: 26.1994


Upon Hyper Parameter tuning , The model has improved to 80% variance capture which makes the model more able with right set of parameters for the model.