**Import libraries and initialize Spark session**

In [1]:
import shutil

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark


# Create (or reuse) the Spark session that every later cell reads data and
# trains models through.
spark = (
    SparkSession.builder
    .appName("######")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/01 20:23:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/01 20:23:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


**Set up MLflow tracking and UI**

In [2]:
# Set the MLflow tracking URI and experiment name.
# Order matters: the tracking URI is configured first so that the experiment
# is selected on that tracking server.
mlflow.set_tracking_uri("######")
# Left as the bare last expression on purpose: its return value (an
# Experiment object) is what the notebook displays as this cell's output.
mlflow.set_experiment("######")

<Experiment: artifact_location='mlflow-artifacts:/825739814655208411', creation_time=1719876015359, experiment_id='825739814655208411', last_update_time=1719876015359, lifecycle_stage='active', name='######', tags={}>

**Load the train and test data**

In [3]:
# Read both CSV splits the same way: first row is the header, column types
# are inferred from the data.
csv_options = dict(header=True, inferSchema=True)
train_data = spark.read.csv('Data/mining_train_data.csv', **csv_options)
test_data = spark.read.csv('Data/mining_test_data.csv', **csv_options)

# Every column except the target becomes a model input; the feature list is
# taken from the training split and reused for the test split.
target_column = '% Iron Concentrate'
feature_columns = [name for name in train_data.columns if name != target_column]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Keep only the assembled feature vector and the target in each split.
train_data = assembler.transform(train_data).select('features', target_column)
test_data = assembler.transform(test_data).select('features', target_column)

                                                                                

**Define models**

In [4]:
# Candidate regressors, keyed by the display name used for the MLflow run.
# Insertion order is the order in which the models are trained and compared.

# Fixed seed for the tree-based models so repeated runs are comparable.
# Without it PySpark falls back to a default seed derived from Python's
# hash() of the class name, which can differ between kernel restarts when
# string hash randomization is active.
SEED = 42

# Column wiring shared by every estimator.
shared_columns = dict(featuresCol='features', labelCol='% Iron Concentrate')

models = {
    # Linear Regression uses a non-zero regParam (regularization strength).
    "Linear Regression": LinearRegression(regParam=0.1, **shared_columns),
    "Decision Tree": DecisionTreeRegressor(seed=SEED, **shared_columns),
    "Random Forest": RandomForestRegressor(seed=SEED, **shared_columns),
}

**Model experimentation and selection**

In [5]:
# Experiment with models: fit every candidate, score it by RMSE, log the run
# to MLflow, and remember the candidate with the lowest RMSE.
#
# NOTE(review): the winner is chosen on test_data, so the reported RMSE of the
# best model is an optimistic estimate; consider selecting on a separate
# validation split.
best_model = None
best_rmse = float('inf')
best_model_name = ""
evaluator = RegressionEvaluator(labelCol='% Iron Concentrate', predictionCol='prediction', metricName='rmse')

for name, estimator in models.items():
    # One MLflow run per candidate, named after the model.
    with mlflow.start_run(run_name=name):
        # Train the model. The unfitted estimator and the fitted model get
        # different names; `model` still refers to the last fitted model
        # after the loop.
        model = estimator.fit(train_data)

        # Make predictions
        predictions = model.transform(test_data)

        # Evaluate RMSE
        rmse = evaluator.evaluate(predictions)

        # Log parameters, metrics, and the model
        mlflow.log_param("model", name)
        mlflow.log_metric("rmse", rmse)
        mlflow.spark.log_model(model, artifact_path=name)

        # Identify the best model (strict '<' keeps the earlier model on ties)
        if rmse < best_rmse:
            best_rmse = rmse
            best_model = model
            best_model_name = name

# No candidate beat the initial infinite RMSE (e.g. `models` is empty or every
# RMSE was NaN): fail with a clear message instead of passing None on.
if best_model is None:
    raise RuntimeError("No model produced a valid RMSE; there is no best model to save.")

# Save the best model. The target directory is fixed, so clear any copy left
# by a previous run first; otherwise re-running the notebook can fail or leave
# files from an older model next to the new one.
best_model_path = "best_model"
shutil.rmtree(best_model_path, ignore_errors=True)
mlflow.spark.save_model(best_model, best_model_path)

print(f"Model experimentation complete. Best model: {best_model_name} with RMSE: {best_rmse}")

24/07/01 20:23:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/07/01 20:23:56 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/07/01 20:24:00 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.ap

Model experimentation complete. Best model: Decision Tree with RMSE: 0.6313495279143028
