In [19]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import VectorUDT
import mlflow
import mlflow.spark

# Create a Spark session
spark = SparkSession.builder.appName("LinearRegressionExample").getOrCreate()

def train_linear_regression_model(data, train_fraction=0.8):
    # Create a Spark session
    spark = SparkSession.builder.appName("LinearRegressionExample").getOrCreate()

    # Specify the MLflow YAML configuration file
    mlflow_config_file = "/Users/salvadorgarcia/Repos/kaggle_experiments/spark/mlflow.yaml"

    # Load the MLflow configuration from the YAML file
    mlflow.set_tracking_uri("file://{}".format(mlflow_config_file))

    # Create an MLflow run
    with mlflow.start_run():
        mlflow.log_param("train_fraction", train_fraction)
        
        # Create a DataFrame with the correct data type
        df = spark.createDataFrame(data, ["label", "features"]).withColumn("features", col("features").cast(VectorUDT()))

        # Split the dataset into training and testing sets
        train_data, test_data = df.randomSplit([train_fraction, 1 - train_fraction], seed=123)

        # Create a LinearRegression model
        lr = LinearRegression(featuresCol="features", labelCol="label")

        # Fit the model to the training data
        lr_model = lr.fit(train_data)

        # Log model parameters
        mlflow.log_param("model_type", "LinearRegression")

        # Log the model itself
        mlflow.spark.log_model(lr_model, "model")

        return lr_model, test_data, spark

def score_linear_regression_model(model, test_data,spark):
    # Make predictions on the test data
    predictions = model.transform(test_data)
    return predictions

def evaluate_linear_regression_model(predictions):
    # Check if there are any predictions
    if predictions.count() > 0:
        # Evaluate the model
        evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
        rmse = evaluator.evaluate(predictions)
        
        # Log RMSE as a metric
        mlflow.log_metric("RMSE", rmse)

        return rmse
    else:
        return None
    
def main():
    # Create a larger synthetic dataset with 50 data points
    data = [(1.0, Vectors.dense(0.1, 0.2)),
            (2.0, Vectors.dense(0.2, 0.3)),
            (3.0, Vectors.dense(0.3, 0.4)),
            (4.0, Vectors.dense(0.4, 0.5)),
            (5.0, Vectors.dense(0.5, 0.6)),
            (6.0, Vectors.dense(0.6, 0.7)),
            (7.0, Vectors.dense(0.7, 0.8)),
            (8.0, Vectors.dense(0.8, 0.9)),
            (9.0, Vectors.dense(0.9, 1.0)),
            (10.0, Vectors.dense(1.0, 1.1)),
            (11.0, Vectors.dense(1.1, 1.2)),
            (12.0, Vectors.dense(1.2, 1.3)),
            (13.0, Vectors.dense(1.3, 1.4)),
            (14.0, Vectors.dense(1.4, 1.5)),
            (15.0, Vectors.dense(1.5, 1.6)),
            (16.0, Vectors.dense(1.6, 1.7)),
            (17.0, Vectors.dense(1.7, 1.8)),
            (18.0, Vectors.dense(1.8, 1.9)),
            (19.0, Vectors.dense(1.9, 2.0)),
            (20.0, Vectors.dense(2.0, 2.1)),
            (21.0, Vectors.dense(2.1, 2.2)),
            (22.0, Vectors.dense(2.2, 2.3)),
            (23.0, Vectors.dense(2.3, 2.4)),
            (24.0, Vectors.dense(2.4, 2.5)),
            (25.0, Vectors.dense(2.5, 2.6)),
            (26.0, Vectors.dense(2.6, 2.7)),
            (27.0, Vectors.dense(2.7, 2.8)),
            (28.0, Vectors.dense(2.8, 2.9)),
            (29.0, Vectors.dense(2.9, 3.0)),
            (30.0, Vectors.dense(3.0, 3.1)),
            (31.0, Vectors.dense(3.1, 3.2)),
            (32.0, Vectors.dense(3.2, 3.3)),
            (33.0, Vectors.dense(3.3, 3.4)),
            (34.0, Vectors.dense(3.4, 3.5)),
            (35.0, Vectors.dense(3.5, 3.6)),
            (36.0, Vectors.dense(3.6, 3.7)),
            (37.0, Vectors.dense(3.7, 3.8)),
            (38.0, Vectors.dense(3.8, 3.9)),
            (39.0, Vectors.dense(3.9, 4.0)),
            (40.0, Vectors.dense(4.0, 4.1)),
            (41.0, Vectors.dense(4.1, 4.2)),
            (42.0, Vectors.dense(4.2, 4.3)),
            (43.0, Vectors.dense(4.3, 4.4)),
            (44.0, Vectors.dense(4.4, 4.5)),
            (45.0, Vectors.dense(4.5, 4.6)),
            (46.0, Vectors.dense(4.6, 4.7)),
            (47.0, Vectors.dense(4.7, 4.8)),
            (48.0, Vectors.dense(4.8, 4.9)),
            (49.0, Vectors.dense(4.9, 5.0)),
            (50.0, Vectors.dense(5.0, 5.1))]

    # Train the Linear Regression model
    lr_model, test_data, spark = train_linear_regression_model(data, train_fraction=0.8)

    # Score the model on the test data
    predictions = score_linear_regression_model(lr_model, test_data, spark)

    # Evaluate the model
    rmse = evaluate_linear_regression_model(predictions)

    if rmse is not None:
        print("Root Mean Squared Error (RMSE):", rmse)
    else:
        print("No predictions were made. Check your data or model.")

    # Stop the Spark session
    spark.stop()

if __name__ == "__main__":
    main()



NotADirectoryError: [Errno 20] Not a directory: '/Users/salvadorgarcia/Repos/kaggle_experiments/spark/mlflow.yaml/.trash'

In [6]:
#!mlflow ui

[2023-10-26 23:19:35 -0600] [31923] [INFO] Starting gunicorn 21.2.0
[2023-10-26 23:19:35 -0600] [31923] [INFO] Listening at: http://127.0.0.1:5000 (31923)
[2023-10-26 23:19:35 -0600] [31923] [INFO] Using worker: sync
[2023-10-26 23:19:35 -0600] [31924] [INFO] Booting worker with pid: 31924
[2023-10-26 23:19:35 -0600] [31925] [INFO] Booting worker with pid: 31925
[2023-10-26 23:19:35 -0600] [31926] [INFO] Booting worker with pid: 31926
[2023-10-26 23:19:35 -0600] [31927] [INFO] Booting worker with pid: 31927
^C
[2023-10-26 23:19:44 -0600] [31923] [INFO] Handling signal: int
[2023-10-26 23:19:44 -0600] [31925] [INFO] Worker exiting (pid: 31925)
[2023-10-26 23:19:44 -0600] [31924] [INFO] Worker exiting (pid: 31924)
[2023-10-26 23:19:44 -0600] [31927] [INFO] Worker exiting (pid: 31927)
[2023-10-26 23:19:44 -0600] [31926] [INFO] Worker exiting (pid: 31926)
