# Iteration 4 BDAS Step 4

## xzha710 408571028

Log Transformation and Data Reduction

In [1]:
from pyspark.sql import SparkSession

# Create or retrieve a Spark session
spark = SparkSession.builder.appName("agrofood_co2_emission").getOrCreate()

# Load the dataset
final_data = spark.read.csv("final_agrofood_co2_emission.csv", header=True, inferSchema=True)

from pyspark.sql.functions import log1p, rand

# Log transformation on 'total_emission' column
final_data = final_data.withColumn('log_total_emission', log1p('total_emission'))

# Randomly sample 75% of the records
sampled_data = final_data.sample(False, 0.75, seed=42)

# Save the transformed and sampled data
sampled_data.write.csv("processed_agrofood_co2_emission.csv", header=True)


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/10 07:02:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/10 07:02:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


AnalysisException: path file:/home/ubuntu/test/processed_agrofood_co2_emission.csv already exists.

Feature Importance Calculation with Random Forest

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor

# Convert all features into a single vector column
input_columns = [col for col in final_data.columns if col not in ['Area', 'Year', 'Average Temperature °C', 'Yearly Average Temperature °C']]

# Use the handleInvalid parameter to skip rows with null values
assembler = VectorAssembler(inputCols=input_columns, outputCol='features', handleInvalid="skip")

# RandomForestRegressor expects the target variable to be named 'label'
data_renamed = final_data.withColumnRenamed('Average Temperature °C', 'label')

# Build and fit the random forest model
rf = RandomForestRegressor(numTrees=100, seed=42)
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(data_renamed)

# Extract feature importances
importances = model.stages[-1].featureImportances.toArray()
feature_importance_list = list(zip(input_columns, importances))
sorted_feature_importance = sorted(feature_importance_list, key=lambda x: x[1], reverse=True)
for feature, importance in sorted_feature_importance:
    print(f"{feature}: {importance}")


[Stage 16:>                                                         (0 + 1) / 1]

Food Retail: 0.15079007346392076
IPPU: 0.08734389312608137
Rice Cultivation: 0.07900474781117581
Food Transport: 0.07483611376187287
Manure left on Pasture: 0.0723557829455519
Forest fires: 0.05613027276949092
Manure applied to Soils: 0.04831403091097261
On-farm Electricity Use: 0.04257742328286308
Net Forest conversion: 0.04149277488518622
Manure Management: 0.03600984961297173
Fertilizers Manufacturing: 0.030971579163071698
Rural population: 0.03047718534007001
Food Household Consumption: 0.030109285508795335
Forestland: 0.027338367817635816
Agrifood Systems Waste Disposal: 0.024434375814709215
Food Packaging: 0.023594612191859838
total_emission: 0.023072093145928823
Savanna fires: 0.021767927727322004
log_total_emission: 0.02070770754788654
Urban population: 0.020266024221879314
Food Processing: 0.018298980862755797
Total Population - Male: 0.01439661571663916
Total Population - Female: 0.013763795611651856
Crop Residues: 0.011946486759707225


                                                                                