In [1]:
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName("jupyter-nb-test").getOrCreate()

# Create data
data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
df = sparkSession.createDataFrame(data)

# Check dataframe
df.show()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+---+
|    _1| _2|
+------+---+
| First|  1|
|Second|  2|
| Third|  3|
|Fourth|  4|
| Fifth|  5|
+------+---+

In [2]:
%%local
import os
ai_server = os.environ["MOSAIC_AI_SERVER"]
project_id = os.environ["PROJECT_ID"]
token = os.environ["TOKEN"]

In [3]:
%%send_to_spark -i ai_server -t str -n ai_server

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'ai_server' as 'ai_server' to Spark kernel

In [4]:
%%send_to_spark -i project_id -t str -n project_id

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'project_id' as 'project_id' to Spark kernel

In [5]:
%%send_to_spark -i token -t str -n token

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'token' as 'token' to Spark kernel

In [6]:
import os
os.environ["MOSAIC_AI_SERVER"] = ai_server
os.environ["PROJECT_ID"] = project_id
os.environ["TOKEN"] = token

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Loading dataset from AZURE

In [7]:
storage_account_name = "pocmosaic"
storage_account_access_key = "iX6jZejeNh1bUejdSRYk+4X2S/pAbTy+idkTM5UAw2HxgxSnaYGJSN+ffzZAQ5/Q6xQe2Ja74Re9nKHMVmsGhA=="
file_location = "wasbs://container1@pocmosaic.blob.core.windows.net/catalog/local_file_upload/RatanBBostonHousing.csv"
file_type = "csv"
spark.conf.set("fs.azure.account.key."+storage_account_name+".blob.core.windows.net",storage_account_access_key)
dataset = spark.read.option("header", "true").format(file_type).option("inferSchema", "true").load(file_location)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')

output = assembler.transform(dataset)

#Input vs Output
finalized_data = output.select("Attributes","medv")

finalized_data.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows

In [9]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])


regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')

#Learn to fit the model from training set
regressor = regressor.fit(train_data)

#To predict the prices on testing set
pred = regressor.evaluate(test_data)

#Predict the model
pred.predictions.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.00632,18.0,2.3...|24.0| 30.67575840587645|
|[0.01096,55.0,2.2...|22.0|27.640508413982875|
|[0.01778,95.0,1.4...|32.9|31.080266438631806|
|[0.01951,17.5,1.3...|33.0|24.143165555744687|
|[0.01965,80.0,1.7...|20.1|20.143156573494434|
|[0.02009,95.0,2.6...|50.0|44.081283318029165|
|[0.02055,85.0,0.7...|24.7|25.115388133317964|
|[0.02177,82.5,2.0...|42.3|37.894735331004654|
|[0.02875,28.0,15....|25.0| 29.22501096088576|
|[0.03041,0.0,5.19...|18.5| 19.34918266144992|
|[0.03502,80.0,4.9...|28.5|33.643746853683275|
|[0.03584,80.0,3.3...|23.5|30.401468141319164|
|[0.03659,25.0,4.8...|24.8|26.138287304564713|
|[0.04203,28.0,15....|22.9|29.110558912576614|
|[0.04337,21.0,5.6...|20.5|24.143566725534058|
|[0.04819,80.0,3.6...|21.9|24.155284692441867|
|[0.05425,0.0,4.05...|24.6|29.401672809884996|
|[0.0566,0.0,3.41,...|23.6|31.302523690054272|
|[0.05789,12.

In [10]:
#coefficient of the regression model
coeff = regressor.coefficients

#X and Y intercept
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The coefficient of the model is : DenseVector([-0.1, 0.0425, 0.0279, 2.4716, -15.2808, 4.2783, -0.0005, -1.4009, 0.2692, -0.013, -0.9905, 0.0084, -0.5533])
The Intercept of the model is : 33.826427

In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RMSE: 4.744
MSE: 22.506
MAE: 3.430
r2: 0.594

In [12]:
from mosaicml import *
from mosaicml.constants import MLModelFlavours
from werkzeug.datastructures import FileStorage

@scoring_func
def score(model, request):
    data = request.json()
    accuracy = model.evaluate(data["test_data"])
    print("Test set accuracy = " + str(accuracy))
    return accuracy


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
register_model(regressor, score, "Boston_Regression_Analysis_101_3", "pyspark model", MLModelFlavours.pyspark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'created_by': 'ratan.boddu', 'created_on': '2021-02-02T08:02:06+00:00', 'deploymentstatus': False, 'description': 'pyspark model', 'flavour': 'pyspark', 'id': 'd3aeea57-de77-45fa-a6cb-4edce6414e4a', 'last_modified_by': 'ratan.boddu', 'last_modified_on': '2021-02-02T08:02:06+00:00', 'model_display': True, 'name': 'Boston_Regression_Analysis_101_3', 'project_id': '7c0d2ec3-b6fc-4166-9fba-6ae5cdacbd92', 'source': '', 'tags': None, 'type': 'model', 'versions': [{'created_by': 'ratan.boddu', 'created_on': '2021-02-02T08:02:06+00:00', 'datasource_name': '', 'dependent_model': None, 'deploy_info': None, 'deployments': [], 'description': None, 'docker_image_url': 'registry.lti-aiq.in:443/mosaic-ai-logistics/mosaic-ai-serving:1.0.0-07122021', 'gpu_docker_image_url': 'registry.lti-aiq.in:443/mosaic-ai-logistics/mosaic-ai-serving:gpu-1.0.0-07122021', 'id': '3fc0434b-55e9-4a6f-85fe-41d69fe932af', 'init_script': '"pip install --user absl-py==0.11.0\\n pip install --user alembic==1.5.3\\n pip insta

In [14]:
aws_id = 'AKIARZWAXCM2ANPGHXG6'
aws_key = '2Hk48QxHYY1133QeETm8TPh47FaUGK1YPVWGviIP'

# configure hadoop with s3 creds
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)

dataset = spark.read.option("header", "true").csv("s3n://mosaic-model-registry/287feeff-1c88-435c-b76b-24eb5c5ca231/BostonHousing.csv")
dataset.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm| age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632|  18| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|  24|
|0.02731|   0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729|   0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237|   0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905|   0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985|   0| 2.18|   0|0.458| 6.43|58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012|66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172|96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631| 100|6.0821|  5|311|   15.2|386.63|29.9

In [None]:
1+1