#### In this notbeook I do some durther preprocessing and data engineering. There are still questions I want to answer about the final data.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, udf, year, month, dayofmonth, dayofweek, datediff, to_date, regexp_replace, length, unix_timestamp, from_unixtime, log
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

import re
import matplotlib.pyplot as plt
from constants import LOGISTICS_DATA, MATERIALS_DATA, PROJECTS_DATA, SUPPLIERS_DATA, DATA, FINAL_DATA_parquet

In [2]:
spark = SparkSession.builder \
    .appName("CO2 Emission ML Pipeline - Data Preprocessing") \
    .getOrCreate()

#reading df
final_df = spark.read.parquet(FINAL_DATA_parquet)


your 131072x1 screen size is bogus. expect trouble
23/11/05 01:56:51 WARN Utils: Your hostname, Kris resolves to a loopback address: 127.0.1.1; using 172.18.209.221 instead (on interface eth0)
23/11/05 01:56:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/05 01:56:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

#### Handling Date Columns
I noticed that Transaction_Date column is in string format. Upon lookin ginto it I notice that some dates are in the wrong format. Here, I'm fixing this issue and then converting the column to_date to continue my preprocessing.

There are 9175 dates with a format that is not recognized as dd/MM/yyyy, and 1025 dates are in the right dd/MM/yyyy format. The date parsing function is expecting a specific format and cannot parse the dates that do not conform to it.

..this step was a bit annoying and took longer than I expcted :@ 

In [3]:
# Replace hyphens with slashes in the transaction_date column
final_df = final_df.withColumn("transaction_date", regexp_replace("transaction_date", "-", "/"))

# Show the updated DataFrame to confirm the changes
final_df.select("transaction_date").show(10, False)

                                                                                

+----------------+
|transaction_date|
+----------------+
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
|2020/01/01      |
+----------------+
only showing top 10 rows



In [4]:
# Define a function to convert different formats to date type
def convert_date_format(df, col_name):
    return df.withColumn(
        col_name, 
        when(
            col(col_name).rlike("^\d{4}/\d{2}/\d{2}$"),  # matches date like 'yyyy/MM/dd'
            to_date(col(col_name), 'yyyy/MM/dd')
        ).otherwise(
            to_date(col(col_name), 'dd/MM/yyyy')  # assumes the date is in 'dd/MM/yyyy' if not 'yyyy/MM/dd'
        )
    )

# Apply the function to your dataframe
final_df = convert_date_format(final_df, "transaction_date")

# Show the results
final_df.select("transaction_date").show(10, False)


+----------------+
|transaction_date|
+----------------+
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
|2020-01-01      |
+----------------+
only showing top 10 rows



In [5]:
# Check for nulls after conversion
null_counts = final_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Transaction_Date", "project_start_date", "project_end_date"]])
null_counts.show()
print(final_df.dtypes)

[Stage 3:>                                                          (0 + 1) / 1]

+----------------+------------------+----------------+
|Transaction_Date|project_start_date|project_end_date|
+----------------+------------------+----------------+
|               0|                 0|               0|
+----------------+------------------+----------------+

[('material_id', 'int'), ('project_id', 'int'), ('project_budget', 'double'), ('transaction_id', 'int'), ('transaction_date', 'date'), ('quantity', 'int'), ('transport_mode', 'string'), ('distance_covered', 'double'), ('CO2_emission', 'double'), ('project_name', 'string'), ('project_start_date', 'date'), ('project_end_date', 'date'), ('project_location', 'string'), ('supplier_id', 'int'), ('material_name', 'string'), ('material_category', 'string'), ('supplier_name', 'string'), ('supplier_location', 'string'), ('supplier_rating', 'double')]


                                                                                

In [6]:
# Extract Year, Month, and Day from 'Transaction_Date'
final_df = final_df.withColumn("Transaction_Year", year("transaction_date"))
final_df = final_df.withColumn("Transaction_Month", month("transaction_date"))
final_df = final_df.withColumn("Transaction_Day", dayofmonth("transaction_date"))
final_df = final_df.withColumn("is_weekend", (dayofweek("transaction_date").isin([1, 7])).cast("int"))
final_df = final_df.withColumn("project_duration", datediff("project_end_date", "project_start_date"))

#### DATA SUMMARY

Based on the summary statistics results below, the project_budget and CO2_emission variables stand out as the maximum value is substantially larger than the mean, which suggests a long right tail, indicative of a skewed distribution. Usually in such cases, a log transformation helps in stabilizing the variance across levels of input variables and making the relationships more linear for the predictive modeling.
- CO2_emission: As the target variable, it's essential that the error terms in any model predicting it are normally distributed. If CO2_emission is highly skewed, a log transformation could help to achieve a more normally distributed error term.
- project_budget: The maximum value is substantially larger than the mean, which suggests a long right tail, indicative of a skewed distribution. A log transformation could normalize this, which would be particularly useful if project_budget is a feature in a model predicting CO2_emissions.

In [7]:
# Calculate summary statistics
summary_stats = final_df.describe()

# Fetch the statistics for each column from the summary DataFrame
means = {row['summary']: row.asDict() for row in summary_stats.collect()}["mean"]
maxs = {row['summary']: row.asDict() for row in summary_stats.collect()}["max"]
mins = {row['summary']: row.asDict() for row in summary_stats.collect()}["min"]
stddevs = {row['summary']: row.asDict() for row in summary_stats.collect()}["stddev"]

# calculate median for each numerical column
medians = {}
for column in final_df.columns:
    # Check if column is numerical by trying to cast it to a double; skip if casting fails (essentially changing the var type)
    try:
        final_df_numerical = final_df.withColumn(column, final_df[column].cast('double'))
        medians[column] = final_df_numerical.approxQuantile(column, [0.5], 0.0)[0]
    except Exception as e:
        continue

# Display stats for each column
for column in final_df_numerical.columns:
    if column in means:  # This checks if the column is numerical (present in the summary stats)
        print(f"Statistics for {column}:")
        print(f"Mean: {means[column]}")
        print(f"Max: {maxs[column]}")
        print(f"Min: {mins[column]}")
        print(f"Standard Deviation: {stddevs[column]}")
        print(f"Median: {medians.get(column, 'N/A')}")
        print("--------------")


23/11/05 01:57:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Statistics for material_id:
Mean: 496.76872549019606
Max: 1000
Min: 1
Standard Deviation: 290.96310745073055
Median: 497.0
--------------
Statistics for project_id:
Mean: 50.42980392156863
Max: 100
Min: 1
Standard Deviation: 28.75723049387397
Median: 51.0
--------------
Statistics for project_budget:
Mean: 295840.8306163639
Max: 1950581.5483643755
Min: 53805.64013990858
Standard Deviation: 209570.71331281099
Median: 270037.4610087137
--------------
Statistics for transaction_id:
Mean: 5006.130980392157
Max: 10000
Min: 1
Standard Deviation: 2884.331836773124
Median: 5009.0
--------------
Statistics for quantity:
Mean: 50.396470588235296
Max: 99
Min: 1
Standard Deviation: 28.470351337471453
Median: 50.0
--------------
Statistics for transport_mode:
Mean: None
Max: Truck
Min: Drone
Standard Deviation: None
Median: N/A
--------------
Statistics for distance_covered:
Mean: 304.99896981195866
Max: 4989.07372254418
Min: 10.00929808408966
Standard Deviation: 409.6849422645481
Median: 266.75006

#### Log Transformation

Doing Log transform of project_budget and CO2_emission

In [8]:
# Adding log-transformed columns for project_budget and CO2_emission
final_df = final_df.withColumn("log_project_budget", log("project_budget"))
final_df = final_df.withColumn("log_CO2_emission", log("CO2_emission"))

#### CORRELATION BETWEEN NUMERICAL FEATURES AND TARGET FEATURE CO2_EMISSIONS

In [9]:
# List of features
features = final_df.columns
features.remove('CO2_emission')  # Remove the target variable

# Calculate correlations with the target variable
correlations = [(feature, final_df.stat.corr(feature, 'CO2_emission')) for feature in features if final_df.select(feature).dtypes[0][1] in ('double', 'int')]

for feature in correlations:
    print(f"The correlation between CO2_emissions and {feature[0]} is: {feature[1]}")

The correlation between CO2_emissions and material_id is: 0.013309843960719402
The correlation between CO2_emissions and project_id is: 0.015389057139567884
The correlation between CO2_emissions and project_budget is: -0.008704442492230928
The correlation between CO2_emissions and transaction_id is: 0.011334588931078056
The correlation between CO2_emissions and quantity is: 0.5558180227071717
The correlation between CO2_emissions and distance_covered is: 0.21013827019533382
The correlation between CO2_emissions and supplier_id is: -0.01578169754900137
The correlation between CO2_emissions and supplier_rating is: 0.00353117673371521
The correlation between CO2_emissions and Transaction_Year is: 0.011702046301184451
The correlation between CO2_emissions and Transaction_Month is: -0.0010327637222996807
The correlation between CO2_emissions and Transaction_Day is: 4.095459896112875e-06
The correlation between CO2_emissions and is_weekend is: -0.014622640639648167
The correlation between CO

Given that "Quantity" and "Distance Covered" are somewhat correlated with the target variable, I can create polynomial features to capture non-linear relationship. An interaction term can capture the combined effect of two variables. For example, the interaction between "Quantity" and "Distance Covered" might be informative.

Creating polynomial features can be a useful step in capturing non-linear relationships between the features and the target variable. In a linear model, each feature is multiplied by a weight and summed up to make a prediction. This assumes that the relationship between each feature and the target variable is linear, but the real-world relationship between variables can often be more complex and non-linear.

For example, the relationship between "Quantity" and "CO2_emission" might not be a straight line; it could be a curve. In such cases, simply using the "Quantity" feature as-is in a linear model might not capture this curve effectively. But if you add a new feature that is "Quantity" squared, the model has a better chance of capturing this curved relationship.

The same logic applies to "Distance Covered," or any interaction terms between "Quantity" and "Distance Covered." By including these polynomial and interaction terms, you allow the model to fit to a more flexible, potentially non-linear function, which could result in a more accurate model.

In [10]:
# Create polynomial features for 'Quantity' and 'Distance Covered'
final_df = final_df.withColumn("Quantity_Squared", col("Quantity")**2)
final_df = final_df.withColumn("Distance_Covered_Squared", col("Distance_Covered")**2)

# Create interaction term between 'Quantity' and 'Distance Covered'
final_df = final_df.withColumn("Quantity_Distance_Interaction", col("Quantity") * col("Distance_Covered"))


#### TRAIN/TEST SPLIT - ONE HOT ENCODING AND SCALING

In [11]:
# Split the data into training and test sets
train_df, test_df = final_df.randomSplit([0.80, 0.20], seed=42)

identifiers dont need to be scaled (material_id, project_id)

In [39]:
# List of categorical columns to one hot encode
categorical_columns = ["transport_mode", "project_location", "material_category", "supplier_location"]
# list of columns to scale
columns_to_scale = ["Quantity_Squared", "Distance_Covered_Squared", "Quantity_Distance_Interaction", "supplier_rating", "Transaction_Year", "Transaction_Month", "project_duration"]

In [40]:
# Stages in the pipeline
stages = []

# One-Hot Encoding for categorical columns
for categoricalCol in categorical_columns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "Vec"])
    stages += [stringIndexer, encoder]

# Assemble numerical columns that need to be scaled
assembler_for_scaling = VectorAssembler(inputCols=columns_to_scale, outputCol="features_to_scale")
stages += [assembler_for_scaling]

# Scale the numerical columns
scaler = StandardScaler(inputCol="features_to_scale", outputCol="scaledFeatures")
stages += [scaler]

# Assemble all features into one vector column
assembledInputs = [c + "Vec" for c in categorical_columns] + ["scaledFeatures"] + ["log_project_budget"]
final_assembler = VectorAssembler(inputCols=assembledInputs, outputCol="features")
stages += [final_assembler]

# Create a Pipeline with the stages defined above
pipeline = Pipeline(stages=stages)

# Fit the pipeline on the training data
pipeline_model = pipeline.fit(train_df)

# Transform the training and test data
train_transformed = pipeline_model.transform(train_df) # use this to train model
test_transformed = pipeline_model.transform(test_df) # use this to test model

                                                                                

In [41]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the RandomForest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="log_CO2_emission")

# Train the model
rf_model = rf.fit(train_transformed)

# Predict on the test data
predictions = rf_model.transform(test_transformed)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="log_CO2_emission", predictionCol="prediction", metricName="rmse")

# Calculate RMSE
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

# If you want to evaluate R-squared
evaluator = RegressionEvaluator(labelCol="log_CO2_emission", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(f"R-squared on test data = {r2}")




                                                                                

Root Mean Squared Error (RMSE) on test data = 0.5787417089628518
R-squared on test data = 0.796317652770236


In [37]:
# Get the feature importances
importances = rf_model.featureImportances.toArray()

# Start with an empty list for feature names
feature_names = []

# Add names for the one-hot encoded categorical features
# You need to know the number of categories in each categorical feature after one-hot encoding
for categoricalCol in categorical_columns:
    # Assuming we know the number of categories for each column (replace with actual number)
    num_categories = train_transformed.select(categoricalCol + "Vec").head()[0].size
    feature_names += [f"{categoricalCol}_{i}" for i in range(num_categories)]

# Add names for the scaled numerical features
feature_names += columns_to_scale

# Add the log-transformed features if they are also included
feature_names.append("log_project_budget")

# The length of feature names should now match the length of importances
assert len(feature_names) == len(importances), f"Length of feature names ({len(feature_names)}) does not match the number of importances ({len(importances)})"

# Now you can match the importances to the feature names
named_importances = sorted(zip(feature_names, importances), key=lambda x: x[1], reverse=True)

# Print the feature importances
for name, importance in named_importances:
    print(f"{name}: {importance}")


AssertionError: Length of feature names (18) does not match the number of importances (17)