In [None]:
pip install pyspark



In [None]:
# Dataset: https://github.com/DeltaOptimist/Multiple_Linear_Regression_PySpark/blob/main/cruise_ship_info.csv

import pyspark
from pyspark.sql import SparkSession
# SparkSession is the entry point to Spark and the gateway to its libraries.

# Location of the input CSV, kept in one place so it is easy to change.
# NOTE(review): '/content/...' looks like a Colab runtime path - adjust when
# running elsewhere.
DATA_PATH = '/content/cruise_ship_info.csv'

# Create (or reuse) the Spark session. The application name is the label this
# job is registered under, so it should describe the cruise-ship crew model.
spark = SparkSession.builder.appName('cruise_ship_crew_model').getOrCreate()

# Load the CSV into a Spark DataFrame: the header row supplies the column
# names and the column types are inferred from the data.
df = spark.read.csv(DATA_PATH, inferSchema=True, header=True)
df.show(10)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [None]:
# Print the DataFrame's schema only: each column's name, data type and nullability.
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [None]:
# List the column names of the loaded DataFrame; the model's features and
# label are chosen from these.
df.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

In [None]:
# Columns selected as model features:
#   ['Cruise_line', 'Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']
# Spark ML works on numeric values, so the string column 'Cruise_line' is
# mapped to a numeric category index with StringIndexer.

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='Cruise_line', outputCol='cruise_cat')
# fit() learns the string -> index mapping from df; transform() then returns a
# new DataFrame with the extra 'cruise_cat' column.
indexer_model = indexer.fit(df)
indexed = indexer_model.transform(df)

In [None]:
# `indexed` is a new DataFrame with every original column plus 'cruise_cat',
# the numeric index derived from the string column 'Cruise_line'.
# Print the first five rows, each followed by blank lines, to inspect it.

sample_rows = indexed.head(5)
for row in sample_rows:
    print(row)
    print('\n')

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)


Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)


Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, Tonnage=47.262, passengers=14.86, length=7.22, cabins=7.43, passenger_density=31.8, crew=6.7, cruise_cat=1.0)


Row(Ship_name='Conquest', Cruise_line='Carnival', Age=11, Tonnage=110.0, passengers=29.74, length=9.53, cabins=14.88, passenger_density=36.99, crew=19.1, cruise_cat=1.0)


Row(Ship_name='Destiny', Cruise_line='Carnival', Age=17, Tonnage=101.353, passengers=26.42, length=8.92, cabins=13.21, passenger_density=38.36, crew=10.0, cruise_cat=1.0)




In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Spark ML estimators take their inputs as a single vector column, so the
# individual numeric feature columns are packed into one 'features' vector.
feature_columns = [
    'Age',
    'Tonnage',
    'passengers',
    'length',
    'cabins',
    'passenger_density',
    'cruise_cat',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output = assembler.transform(indexed)

# Preview the assembled feature vector next to the label column.
output.select('features', 'crew').show(5)

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
+--------------------+----+
only showing top 5 rows



In [None]:
# Keep only what the model needs: the assembled feature vector and the label
# column ('crew').
final_data = output.select('features', 'crew')

# Split roughly 70/30 into training and test sets. A fixed seed is passed so
# the split - and every metric computed from it - is repeatable between runs
# instead of changing on each execution.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Summary statistics of the training split.
train_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              107|
|   mean|7.444672897196272|
| stddev|3.319543136830337|
|    min|             0.59|
|    max|             19.1|
+-------+-----------------+



In [None]:
# Summary statistics of the test split, for comparison with the training split.
test_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|               51|
|   mean|8.527450980392157|
| stddev|3.790878970970851|
|    min|             0.88|
|    max|             21.0|
+-------+-----------------+



In [None]:
# LinearRegression estimator from Spark ML.
from pyspark.ml.regression import LinearRegression

# Configure the estimator: 'features' is the input vector column and 'crew'
# is the label to predict.
ship_lr = LinearRegression(featuresCol='features', labelCol='crew')

# Fit the model on the training split only.
trained_ship_model = ship_lr.fit(train_data)

# Evaluate on the held-out test split. Scoring the data the model was fitted
# on would give an optimistic R-squared that says little about how well the
# model generalizes to unseen ships.
ship_results = trained_ship_model.evaluate(test_data)

In [None]:
# Report the R-squared value (ship_results.r2) from the evaluation summary
# computed in the previous cell. R-squared is a goodness-of-fit score, not an
# error measure, despite the wording of the printed label.
print('Rsquared Error :',ship_results.r2)

Rsquared Error : 0.9233287630913424


In [None]:
# Build unlabeled data for testing the model by selecting only the 'features'
# column of the test split (the 'crew' label is left out), then preview it.
unlabeled_data = test_data.select('features')
unlabeled_data.show(n=5)

+--------------------+
|            features|
+--------------------+
|[4.0,220.0,54.0,1...|
|[5.0,86.0,21.04,9...|
|[5.0,115.0,35.74,...|
|[6.0,93.0,23.94,9...|
|[6.0,110.23899999...|
+--------------------+
only showing top 5 rows



In [None]:
# Run the trained model over the unlabeled feature vectors taken from the
# test data; the result is displayed below (show() prints the first 20 rows).
predictions = trained_ship_model.transform(unlabeled_data)
predictions.show(n=20)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[4.0,220.0,54.0,1...|21.285957787777253|
|[5.0,86.0,21.04,9...| 9.117000919998318|
|[5.0,115.0,35.74,...|11.715218618053164|
|[6.0,93.0,23.94,9...|10.366553946532335|
|[6.0,110.23899999...|11.042283105412547|
|[6.0,113.0,37.82,...|11.572995563614256|
|[6.0,158.0,43.7,1...|14.044016157011319|
|[8.0,91.0,22.44,9...|  9.93326237393997|
|[9.0,90.09,25.01,...| 9.200488371118364|
|[9.0,113.0,26.74,...|11.406507685151512|
|[10.0,68.0,10.8,7...| 6.326552694804576|
|[10.0,77.0,20.16,...| 8.569987752181065|
|[10.0,90.09,25.01...| 8.840103987364431|
|[10.0,151.4,26.2,...|11.358602276547547|
|[11.0,58.6,15.66,...|7.1763388874252305|
|[11.0,90.0,22.4,9...| 9.900465120363044|
|[11.0,138.0,31.14...|13.204015272428313|
|[12.0,25.0,3.88,5...| 2.874850925144705|
|[12.0,58.6,15.66,...| 7.178435209405044|
|[12.0,77.104,20.0...| 8.697733423529334|
+--------------------+------------