# **1. Install and import necessary libraries** #

In [1]:
# Install pyspark
!pip install pyspark



In [2]:
# Import SparkSession
from pyspark.sql import SparkSession

# Import functions/Classes for sparkml

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

# Import functions/Classes for pipeline creation and persistence

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

# Import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Start a Spark session for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("PipelineMachineLearningwithSpark")
    .getOrCreate()
)

# Display the version of the Spark runtime backing the session
spark.version

'3.5.0'

# **2. Load Dataset** #

In [4]:
# Mount Google Drive inside the Colab filesystem so that the dataset and the
# saved-model folder under /content/drive can be reached by path
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Read the MPG CSV from Google Drive: the first line is treated as the header
# and column types are inferred from the data
df_mpg = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/Machine_Learning_wtih_Spark/dataset/mpg.csv")
)

# Display the column names and the inferred types
df_mpg.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [6]:
# Preview the first 5 records as a list of Row objects
df_mpg.take(5)

[Row(MPG=15.0, Cylinders=8, Engine Disp=390.0, Horsepower=190, Weight=3850, Accelerate=8.5, Year=70, Origin='American'),
 Row(MPG=21.0, Cylinders=6, Engine Disp=199.0, Horsepower=90, Weight=2648, Accelerate=15.0, Year=70, Origin='American'),
 Row(MPG=18.0, Cylinders=6, Engine Disp=199.0, Horsepower=97, Weight=2774, Accelerate=15.5, Year=70, Origin='American'),
 Row(MPG=16.0, Cylinders=8, Engine Disp=304.0, Horsepower=150, Weight=3433, Accelerate=12.0, Year=70, Origin='American'),
 Row(MPG=14.0, Cylinders=8, Engine Disp=455.0, Horsepower=225, Weight=3086, Accelerate=10.0, Year=70, Origin='American')]

# **3. Define Pipeline Stages** #

In [7]:
# Stage 1: combine the three numeric predictor columns into one vector column
vectorAssembler = (
    VectorAssembler()
    .setInputCols(["Weight", "Horsepower", "Engine Disp"])
    .setOutputCol("features")
)

# Stage 2: scale the assembled vector (scaler left at its default settings)
scaler = (
    StandardScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
)

# Stage 3: linear regression that predicts MPG from the scaled features
lr = (
    LinearRegression()
    .setFeaturesCol("scaledFeatures")
    .setLabelCol("MPG")
)

# **4. Build the Pipeline and Split data** #

In [8]:
# Chain the stages in execution order: assemble -> scale -> regress
pipeline = Pipeline().setStages([vectorAssembler, scaler, lr])

In [9]:
# Randomly split the rows with 0.7 / 0.3 weights into training and testing
# sets; a fixed seed is passed to the split
training_data, testing_data = df_mpg.randomSplit(weights=[0.7, 0.3], seed=42)

# **5. Fit the pipeline** #

In [10]:
# Fit all pipeline stages on the training split; the result is the fitted pipeline model
model = pipeline.fit(dataset=training_data)

# **6. Evaluate model** #

In [11]:
# Apply the fitted pipeline to the held-out test rows
predictions = model.transform(dataset=testing_data)

# Preview the first 5 scored rows
predictions.take(5)

[Row(MPG=10.0, Cylinders=8, Engine Disp=360.0, Horsepower=215, Weight=4615, Accelerate=14.0, Year=70, Origin='American', features=DenseVector([4615.0, 215.0, 360.0]), scaledFeatures=DenseVector([5.7482, 5.8701, 3.6233]), prediction=8.164398550808087),
 Row(MPG=11.0, Cylinders=8, Engine Disp=429.0, Horsepower=208, Weight=4633, Accelerate=11.0, Year=72, Origin='American', features=DenseVector([4633.0, 208.0, 429.0]), scaledFeatures=DenseVector([5.7706, 5.6789, 4.3178]), prediction=8.19606223042593),
 Row(MPG=12.0, Cylinders=8, Engine Disp=350.0, Horsepower=180, Weight=4499, Accelerate=12.5, Year=73, Origin='American', features=DenseVector([4499.0, 180.0, 350.0]), scaledFeatures=DenseVector([5.6037, 4.9145, 3.5227]), prediction=10.751185817515413),
 Row(MPG=12.0, Cylinders=8, Engine Disp=383.0, Horsepower=180, Weight=4955, Accelerate=11.5, Year=71, Origin='American', features=DenseVector([4955.0, 180.0, 383.0]), scaledFeatures=DenseVector([6.1717, 4.9145, 3.8548]), prediction=8.2359941479

In [12]:
# Compare the "prediction" column against the true "MPG" label using RMSE
evaluator = (
    RegressionEvaluator()
    .setLabelCol("MPG")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
)

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)

Root Mean Squared Error (RMSE) = 3.8756646183839187


# **7. Save model** #

In [13]:
# Persist the fitted pipeline model to the "model_saved" folder on Google
# Drive, overwriting anything already stored at that path
(
    model.write()
    .overwrite()
    .save("/content/drive/MyDrive/Machine_Learning_wtih_Spark/model_saved/")
)

# **8. Load model** #

In [14]:
# Load the persisted pipeline model back from Google Drive.
# PipelineModel is imported in the notebook's top import cell together with
# the other pyspark.ml names, so no import is needed here.
loaded_model = PipelineModel.load("/content/drive/MyDrive/Machine_Learning_wtih_Spark/model_saved/")

In [15]:
# Score the same test rows with the reloaded model
predictions_loaded_model = loaded_model.transform(dataset=testing_data)

# Preview the first 5 scored rows
predictions_loaded_model.take(5)

[Row(MPG=10.0, Cylinders=8, Engine Disp=360.0, Horsepower=215, Weight=4615, Accelerate=14.0, Year=70, Origin='American', features=DenseVector([4615.0, 215.0, 360.0]), scaledFeatures=DenseVector([5.7482, 5.8701, 3.6233]), prediction=8.164398550808087),
 Row(MPG=11.0, Cylinders=8, Engine Disp=429.0, Horsepower=208, Weight=4633, Accelerate=11.0, Year=72, Origin='American', features=DenseVector([4633.0, 208.0, 429.0]), scaledFeatures=DenseVector([5.7706, 5.6789, 4.3178]), prediction=8.19606223042593),
 Row(MPG=12.0, Cylinders=8, Engine Disp=350.0, Horsepower=180, Weight=4499, Accelerate=12.5, Year=73, Origin='American', features=DenseVector([4499.0, 180.0, 350.0]), scaledFeatures=DenseVector([5.6037, 4.9145, 3.5227]), prediction=10.751185817515413),
 Row(MPG=12.0, Cylinders=8, Engine Disp=383.0, Horsepower=180, Weight=4955, Accelerate=11.5, Year=71, Origin='American', features=DenseVector([4955.0, 180.0, 383.0]), scaledFeatures=DenseVector([6.1717, 4.9145, 3.8548]), prediction=8.2359941479

# **9. Stop Spark Session** #

In [16]:
# Stop the Spark session to release its resources now that the notebook is done.
# Re-run the "Create Spark Session" cell to get a fresh session if more work is needed.
spark.stop()