In [1]:
from pyspark.sql import SparkSession
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('crew_members')
    .getOrCreate()
)

In [14]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [3]:
# Load the cruise-ship data from CSV. The first row holds the column names,
# and Spark infers each column's type from its values.
# The options are passed as real booleans rather than the strings 'True'.
data = spark.read.csv(
    'cruise_ship_info.csv',
    header=True,
    inferSchema=True,
)

In [5]:
data.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [7]:
data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [9]:
# StringIndexer encodes a string column of labels as a column of label indices:
# here Cruise_line is the input and Cruise_line_index is the new output column.
indexer = StringIndexer(
    inputCol='Cruise_line',
    outputCol='Cruise_line_index',
)

In [10]:
# Fit a StringIndexer on the raw data and append the Cruise_line_index column.
# The estimator is built inside this cell because the cell rebinds `indexer`
# to the resulting DataFrame (later cells read it under that name). Calling
# `indexer.fit(...)` here would raise AttributeError if the cell were re-run,
# since a DataFrame has no `fit` method.
cruise_line_indexer = StringIndexer(inputCol='Cruise_line', outputCol='Cruise_line_index')
indexer = cruise_line_indexer.fit(data).transform(data)

In [13]:
indexer.show()


+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_index|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+-----------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|             16.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|              1.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|              1.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|              1.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|       

In [16]:
indexer.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_line_index']

In [17]:
# Combine the individual predictor columns into a single vector column, 'features'.
# NOTE(review): Cruise_line_index is a category index but enters the vector as an
# ordinary number — confirm this is intended (one-hot encoding is the usual alternative).
feature_columns = [
    'Cruise_line_index',
    'Age',
    'Tonnage',
    'passengers',
    'length',
    'cabins',
    'passenger_density',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [18]:
# Append the assembled 'features' vector column to the indexed DataFrame
# (`indexer` holds the DataFrame produced by the StringIndexer step).
output = assembler.transform(indexer)

In [19]:
# Keep only the feature vector and the regression target ('crew') for modelling.
final_data = output.select('features','crew')

In [20]:
final_data.show()

+--------------------+----+
|            features|crew|
+--------------------+----+
|[16.0,6.0,30.2769...|3.55|
|[16.0,6.0,30.2769...|3.55|
|[1.0,26.0,47.262,...| 6.7|
|[1.0,11.0,110.0,2...|19.1|
|[1.0,17.0,101.353...|10.0|
|[1.0,22.0,70.367,...| 9.2|
|[1.0,15.0,70.367,...| 9.2|
|[1.0,23.0,70.367,...| 9.2|
|[1.0,19.0,70.367,...| 9.2|
|[1.0,6.0,110.2389...|11.5|
|[1.0,10.0,110.0,2...|11.6|
|[1.0,28.0,46.052,...| 6.6|
|[1.0,18.0,70.367,...| 9.2|
|[1.0,17.0,70.367,...| 9.2|
|[1.0,11.0,86.0,21...| 9.3|
|[1.0,8.0,110.0,29...|11.6|
|[1.0,9.0,88.5,21....|10.3|
|[1.0,15.0,70.367,...| 9.2|
|[1.0,12.0,88.5,21...| 9.3|
|[1.0,20.0,70.367,...| 9.2|
+--------------------+----+
only showing top 20 rows



In [21]:
# Split the data into train and test sets (about 70% / 30%).
# randomSplit assigns rows at random, so a fixed seed keeps the split, and every
# metric computed from it, the same from one run of the notebook to the next.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [22]:
train_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               120|
|   mean| 7.783000000000004|
| stddev|3.5890378383408015|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



In [23]:
test_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                38|
|   mean|7.8294736842105275|
| stddev|3.2638867125348776|
|    min|               0.6|
|    max|              13.6|
+-------+------------------+



In [24]:
# Linear regression estimator: 'crew' is the label, predicted from the
# assembled 'features' vector column.
lr = LinearRegression(featuresCol='features', labelCol='crew')

In [25]:
# Fit the regression on the training split only.
lr_model = lr.fit(train_data)

In [26]:
# Evaluate the fitted model on the held-out test split. The returned summary
# exposes residuals, predictions, RMSE and R², which the following cells read.
test_results = lr_model.evaluate(test_data)

In [48]:
# Residuals (label - predicted value)
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|-0.24226959013916272|
| -1.1232560768388762|
|-0.27735848658274254|
|  1.0991731520015113|
|  0.7014632821589668|
|  -0.538217910678437|
|  0.5716759755780636|
|  0.6212310768922666|
|   0.610743212642924|
|  0.6272351710644486|
| 0.38193671605442603|
|  1.0168626271083063|
|-0.16289679428984982|
|-0.23943682023321067|
| -1.0420840636540634|
| 0.39889645942297847|
| -0.6556831122750904|
| -0.8192003325367061|
|  0.6895570229130348|
|  0.6752155707231164|
+--------------------+
only showing top 20 rows



In [49]:
#show labels and predictions
test_results.predictions.show()

+--------------------+-----+------------------+
|            features| crew|        prediction|
+--------------------+-----+------------------+
|[0.0,6.0,158.0,43...| 13.6|13.842269590139162|
|[0.0,14.0,138.0,3...|11.76|12.883256076838876|
|[0.0,17.0,70.0,20...|  7.2| 7.477358486582743|
|[0.0,23.0,48.563,...| 6.71| 5.610826847998489|
|[1.0,9.0,88.5,21....| 10.3| 9.598536717841034|
|[1.0,10.0,110.0,2...| 11.6|12.138217910678437|
|[1.0,15.0,70.367,...|  9.2| 8.628324024421936|
|[1.0,18.0,70.367,...|  9.2| 8.578768923107733|
|[1.0,26.0,47.262,...|  6.7| 6.089256787357076|
|[1.0,28.0,46.052,...|  6.6| 5.972764828935551|
|[2.0,8.0,77.499,1...|  9.0| 8.618063283945574|
|[2.0,9.0,113.0,26...|12.38|11.363137372891694|
|[2.0,9.0,116.0,26...| 11.0| 11.16289679428985|
|[2.0,10.0,91.6270...|  9.0|  9.23943682023321|
|[3.0,14.0,63.0,14...| 5.61| 6.652084063654064|
|[4.0,6.0,93.0,23....|11.09|10.691103540577021|
|[5.0,10.0,105.0,2...|10.68| 11.33568311227509|
|[5.0,17.0,75.166,...| 7.66| 8.479200332

In [28]:
# Root-mean-squared error of the model's predictions on the test split.
test_results.rootMeanSquaredError

0.7085529233099417

In [33]:
final_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              158|
|   mean|7.794177215189873|
| stddev|3.503486564627034|
|    min|             0.59|
|    max|             21.0|
+-------+-----------------+



In [29]:
# Take only the features from the test data (drop the 'crew' label) so the
# frame looks like new, unlabeled input.
unlabeled_data = test_data.select('features')

In [30]:
# Apply the fitted model to the unlabeled features; the result has an added
# 'prediction' column.
predictions = lr_model.transform(unlabeled_data)

In [31]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.0,6.0,158.0,43...|13.842269590139162|
|[0.0,14.0,138.0,3...|12.883256076838876|
|[0.0,17.0,70.0,20...| 7.477358486582743|
|[0.0,23.0,48.563,...| 5.610826847998489|
|[1.0,9.0,88.5,21....| 9.598536717841034|
|[1.0,10.0,110.0,2...|12.138217910678437|
|[1.0,15.0,70.367,...| 8.628324024421936|
|[1.0,18.0,70.367,...| 8.578768923107733|
|[1.0,26.0,47.262,...| 6.089256787357076|
|[1.0,28.0,46.052,...| 5.972764828935551|
|[2.0,8.0,77.499,1...| 8.618063283945574|
|[2.0,9.0,113.0,26...|11.363137372891694|
|[2.0,9.0,116.0,26...| 11.16289679428985|
|[2.0,10.0,91.6270...|  9.23943682023321|
|[3.0,14.0,63.0,14...| 6.652084063654064|
|[4.0,6.0,93.0,23....|10.691103540577021|
|[5.0,10.0,105.0,2...| 11.33568311227509|
|[5.0,17.0,75.166,...| 8.479200332536706|
|[6.0,5.0,122.0,28...| 6.010442977086965|
|[6.0,11.0,91.0,20...| 9.314784429276884|
+--------------------+------------

In [32]:
# R² (coefficient of determination) of the model on the test split.
test_results.r2

0.951598780025373

In [86]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
import pandas as pd 
