In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import joblib
import numpy as np
import pandas as pd
from azure.storage.blob import BlobServiceClient
import io
import os

# Obtain (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("ETL ML Training and Deployment")
    .getOrCreate()
)

# Names of the five numeric input columns, in the order they are assembled.
columns = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5']

# Four hand-written rows to score; each tuple lines up positionally with `columns`.
new_data = [
    (1.5, 2.3, 3.1, 4.0, 5.2),
    (1.4, 2.2, 3.0, 4.1, 5.1),
    (1.7, 2.5, 3.3, 4.3, 5.4),
    (1.8, 2.6, 3.4, 4.4, 5.5),
]

new_data_df = spark.createDataFrame(new_data, columns)

# Pack the five feature columns into one vector column named "features";
# generate_predictions() later selects exactly that column.
assembler = VectorAssembler(inputCols=columns, outputCol="features")
assembled_data = assembler.transform(new_data_df)

# Synthetic training set: 100 rows x 5 features of uniform-random values and an
# independent uniform-random target. The values come from a seeded generator so
# that the fitted model and the reported MSE are identical on every
# Restart-and-Run-All; the global NumPy random state is left untouched.
rng = np.random.default_rng(42)
X = rng.random((100, 5))
y = rng.random(100)

# Hold out 20% of the rows for evaluation; random_state pins the split itself.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit a linear regression on the training portion.
model = LinearRegression()
model.fit(X_train, y_train)

# Report the error on the held-out rows.
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error (MSE): {mse}")

# Azure Blob Storage destination for the trained model.
#
# The connection string embeds the storage account key, which grants full access
# to the account, so it must never be written into the notebook source. It is
# read from the AZURE_STORAGE_CONNECTION_STRING environment variable instead
# (e.g. set in the cluster configuration / backed by a secret store).
# Any account key that has previously been saved in notebook source should be
# treated as leaked and rotated.
connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError(
        "AZURE_STORAGE_CONNECTION_STRING is not set; provide the storage account "
        "connection string via the environment before running this cell."
    )
container_name = "mycontainer"
model_blob_name = "linear_regression_model.pkl"

blob_service_client = BlobServiceClient.from_connection_string(connection_string)

blob_client = blob_service_client.get_blob_client(container=container_name, blob=model_blob_name)

# Local staging file for the serialized model; defined once so the write and the
# subsequent read cannot drift apart.
local_model_path = "/dbfs/tmp/linear_regression_model.pkl"
os.makedirs(os.path.dirname(local_model_path), exist_ok=True)

# Serialize the fitted model to the staging file ...
with open(local_model_path, "wb") as model_file:
    joblib.dump(model, model_file)

# ... then stream that file to the blob, replacing any existing blob of the same name.
with open(local_model_path, "rb") as model_file:
    blob_client.upload_blob(model_file, overwrite=True)

print(f"Model uploaded successfully to Azure Blob Storage as {model_blob_name}")

def generate_predictions(model, assembled_data):
    """Score the rows of ``assembled_data`` with ``model``.

    Args:
        model: Object exposing ``predict(array)``; it is called once with all
            feature rows stacked into a single NumPy array.
        assembled_data: Spark DataFrame that has a ``features`` column.

    Returns:
        A Spark DataFrame (created through the notebook-level ``spark`` session)
        with one column, ``prediction``, holding the model output per input row.
    """
    # Only the vector column is collected to the driver; other columns are dropped.
    feature_rows = assembled_data.select('features').toPandas()['features'].tolist()

    # Stack the per-row vectors into one array and score them in a single call.
    scores = model.predict(np.array(feature_rows))

    scores_pdf = pd.DataFrame({'prediction': scores})
    return spark.createDataFrame(scores_pdf)

# Blob that will receive the CSV export of the predictions.
predictions_blob_name = "predictions.csv"
predictions_blob_client = blob_service_client.get_blob_client(
    container=container_name,
    blob=predictions_blob_name,
)

# Score the assembled sample rows and render the result in the notebook.
prediction_df = generate_predictions(model, assembled_data)

display(prediction_df)

# Bring the predictions back to pandas so they can be written out as CSV.
prediction_pandas_df = prediction_df.toPandas()

# Write the CSV into an in-memory buffer, rewind it, and upload it as the blob
# (replacing any existing blob of the same name). The buffer is always closed.
output = io.BytesIO()
try:
    prediction_pandas_df.to_csv(output, index=False)
    output.seek(0)
    predictions_blob_client.upload_blob(output, overwrite=True)
finally:
    output.close()

print(f"Predictions saved to Azure Blob Storage as {predictions_blob_name}")

Mean Squared Error (MSE): 0.09300374423452125
Model uploaded successfully to Azure Blob Storage as linear_regression_model.pkl


prediction
1.497724558074618
1.450763840579021
1.5426892711696023
1.5700672999067151


Predictions saved to Azure Blob Storage as predictions.csv
