## Category 8: Scalable Machine Learning - Linear Regression using Spark

> You've been contracted by Hyundai Heavy Industries to help them build a predictive model for some ships. Hyundai Heavy Industries is one of the world's largest shipbuilding companies and builds cruise liners. You've been flown to their headquarters in Ulsan, South Korea, to help them accurately estimate how many crew members a ship will require.
> They are currently building new ships for some customers and want you to create a model and use it to predict how many crew members the ships will need.

> Here is a description of the data: measurements of ship size, capacity, crew, and age for 158 cruise ships.

> **Variables/Columns** (the number ranges are character positions in the original fixed-width data file)
- Ship Name   1-20
- Cruise Line   21-40
- Age (as of 2013)   46-48
- Tonnage (1000s of tons)   50-56
- Passengers (100s)   58-64
- Length (100s of feet)   66-72
- Cabins (100s)   74-80
- Passenger Density   82-88
- Crew (100s)   90-96

**Step 1: Import PySpark and start the session**

In [39]:
import findspark
findspark.init(r'C:\spark')  # raw string so the backslash in the Windows path is not read as an escape
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lr_crewprediction').getOrCreate()

In [40]:
from pyspark.ml.regression import LinearRegression

**Step 2: Read in the training data and explore**

In [41]:
# Use Spark to read in the training data
ships = spark.read.csv("cruise_ship_train.csv",inferSchema=True,header=True)

In [42]:
# Print the Schema of the DataFrame
ships.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [43]:
ships.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [44]:
ships.head()

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)

In [45]:
for ship in ships.head(5):
    print(ship)
    print("\n")

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)


Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)


Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, Tonnage=47.262, passengers=14.86, length=7.22, cabins=7.43, passenger_density=31.8, crew=6.7)


Row(Ship_name='Conquest', Cruise_line='Carnival', Age=11, Tonnage=110.0, passengers=29.74, length=9.53, cabins=14.88, passenger_density=36.99, crew=19.1)


Row(Ship_name='Destiny', Cruise_line='Carnival', Age=17, Tonnage=101.353, passengers=26.42, length=8.92, cabins=13.21, passenger_density=38.36, crew=10.0)
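The `passenger_density` column appears to be tonnage per passenger. The plain-Python check below converts the rows printed above to raw units (tonnage in 1000s of tons, passengers in 100s) and divides; this reading of the column is an inference from the data, not a documented definition, and it does not hold exactly for the two Azamara ships (Journey and Quest).

```python
# Rows copied from the head(5) output above:
# (name, tonnage in 1000s of tons, passengers in 100s, listed passenger_density)
rows = [
    ("Journey", 30.276999999999997, 6.94, 42.64),
    ("Quest", 30.276999999999997, 6.94, 42.64),
    ("Celebration", 47.262, 14.86, 31.8),
    ("Conquest", 110.0, 29.74, 36.99),
    ("Destiny", 101.353, 26.42, 38.36),
]

def tons_per_passenger(tonnage_k, passengers_100s):
    """Convert both columns to raw units and divide: tons of ship per passenger."""
    return (tonnage_k * 1000) / (passengers_100s * 100)

checks = {}
for name, tonnage, passengers, density in rows:
    derived = round(tons_per_passenger(tonnage, passengers), 2)
    checks[name] = (derived, density)
    print(f"{name:12s} derived={derived:6.2f} listed={density:6.2f}")
```

If this holds across the dataset, `passenger_density` is largely a function of `Tonnage` and `passengers`, which is worth keeping in mind when reading the regression coefficients later.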




In [46]:
ships.groupBy('Cruise_line').count().show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
| Holland_American|   14|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|  Royal_Caribbean|   10|
+-----------------+-----+



In [47]:
ships.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

In [48]:
from pyspark.ml.feature import StringIndexer

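# Note: the resulting index is used below as one numeric feature, which treats
# cruise lines as if they were ordered; one-hot encoding avoids that assumption.
indexer = StringIndexer(inputCol="Cruise_line", outputCol="CruiseLineIndex")
ships = indexer.fit(ships).transform(ships)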
ships.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|CruiseLineIndex|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|            0.0|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|            0.0|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|            0.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
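By default (`stringOrderType="frequencyDesc"`), `StringIndexer` gives the most frequent label index `0.0`, which is why Carnival (22 ships) maps to `0.0`. The plain-Python sketch below reproduces that rule from the `groupBy` counts above. The alphabetical tie-break is an assumption of this sketch, though it is consistent with Azamara (tied at 2 ships with Crystal and Disney) receiving `12.0`. It also shows that an indexer fitted on a different DataFrame can assign a different index to the same label.

```python
# Ship counts per cruise line, copied from the groupBy('Cruise_line').count() output above
counts = {
    "Costa": 11, "P&O": 6, "Cunard": 3, "Regent_Seven_Seas": 5, "MSC": 8,
    "Carnival": 22, "Crystal": 2, "Orient": 1, "Princess": 17,
    "Holland_American": 14, "Disney": 2, "Norwegian": 13, "Oceania": 3,
    "Azamara": 2, "Celebrity": 10, "Royal_Caribbean": 10,
}

def frequency_desc_index(label_counts):
    """Most frequent label -> 0.0, next -> 1.0, ...; ties broken alphabetically (assumed)."""
    ordered = sorted(label_counts, key=lambda label: (-label_counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

train_index = frequency_desc_index(counts)
print(train_index["Carnival"], train_index["Azamara"])

# Hypothetical counts for another DataFrame in which Royal_Caribbean dominates
other_index = frequency_desc_index({"Royal_Caribbean": 5, "Carnival": 1})
print(train_index["Royal_Caribbean"], other_index["Royal_Caribbean"])
```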

In [49]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [50]:
assembler = VectorAssembler(
    inputCols=["Age", "Tonnage", "passengers",
               "length","cabins","passenger_density"],
    outputCol="numerical_features")

In [51]:
ships = assembler.transform(ships)
ships.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|CruiseLineIndex|  numerical_features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|[6.0,30.276999999...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|[6.0,30.276999999...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|            0.0|[26.0,47.262,14.8...|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|            0.0|[11.0,110.0,29.74...|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8

In [52]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="numerical_features",
                        outputCol="scaled_numerical_features",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(ships)

# Normalize each feature to have unit standard deviation.
ships = scalerModel.transform(ships)
ships.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+--------------------+-------------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|CruiseLineIndex|  numerical_features|scaled_numerical_features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+--------------------+-------------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|[6.0,30.276999999...|     [0.76548338226047...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|[6.0,30.276999999...|     [0.76548338226047...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|            0.0|[26.0,47.262,14.8...|     [3.31709465646204...|
|   Conquest|   Carnival| 11|           
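With `withStd=True, withMean=False`, `StandardScaler` divides each feature by its standard deviation without subtracting the mean; Spark documents this as the corrected (n − 1) sample standard deviation. The output above is consistent with that: Journey's Age 6 becomes 0.7654..., and Celebration's Age 26 becomes 26/6 times that, 3.3170.... A plain-Python sketch of the same arithmetic on the five ages printed earlier (a tiny sample, not the full training set) is below.

```python
import statistics

def fit_std(column):
    """Corrected sample standard deviation (n - 1 denominator)."""
    return statistics.stdev(column)

def scale_without_centering(column, std):
    """withStd=True, withMean=False: divide by std, do not subtract the mean.
    Constant columns (std == 0) map to 0.0 here, an assumption of this sketch."""
    if std == 0:
        return [0.0 for _ in column]
    return [x / std for x in column]

# Ages of the five ships printed by ships.head(5) above
ages = [6, 6, 26, 11, 17]
age_std = fit_std(ages)
scaled_ages = scale_without_centering(ages, age_std)
print(round(age_std, 4), [round(a, 4) for a in scaled_ages])
```

Because there is no centering, ratios between values are preserved (26 / 6 before and after scaling), and every scaled age stays positive.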

**Step 3: Set up the DataFrame for Machine Learning**

In [53]:
# Spark ML expects all predictors packed into a single vector column ("features");
# the label column (here "crew") is passed separately via labelCol.
assembler = VectorAssembler(
    inputCols=["scaled_numerical_features","CruiseLineIndex"],
    outputCol="features")
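`VectorAssembler` concatenates its input columns, in order, into one vector and flattens any vector-valued inputs. The plain-Python sketch below mimics both assembly stages on the Journey row; the standard deviations used to stand in for the scaler are hypothetical placeholders, since the scaled vectors are truncated in the output above. The final vector has 7 entries (six scaled numerical features followed by `CruiseLineIndex`), matching the 7 coefficients the model reports.

```python
def assemble(row, input_cols):
    """Concatenate the named columns of a dict-based row into one flat list of floats."""
    vector = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            vector.extend(float(v) for v in value)  # vector-valued column: flatten it
        else:
            vector.append(float(value))  # numeric column: append as one entry
    return vector

# The Journey row from the output above, as a plain dict
row = {"Age": 6, "Tonnage": 30.277, "passengers": 6.94, "length": 5.94,
       "cabins": 3.55, "passenger_density": 42.64, "CruiseLineIndex": 12.0}

# Stage 1: six numeric columns -> one 6-element vector
row["numerical_features"] = assemble(
    row, ["Age", "Tonnage", "passengers", "length", "cabins", "passenger_density"])

# Stand-in for the scaler output: hypothetical per-feature standard deviations
hypothetical_stds = [7.8, 37.0, 9.8, 1.9, 4.8, 8.6]
row["scaled_numerical_features"] = [
    x / s for x, s in zip(row["numerical_features"], hypothetical_stds)]

# Stage 2: 6-element vector + 1 scalar -> the 7-element "features" vector
features = assemble(row, ["scaled_numerical_features", "CruiseLineIndex"])
print(len(features), features[-1])
```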

In [54]:
shipsFinal = assembler.transform(ships)

In [55]:
shipsFinal.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+--------------------+-------------------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|CruiseLineIndex|  numerical_features|scaled_numerical_features|            features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+---------------+--------------------+-------------------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|[6.0,30.276999999...|     [0.76548338226047...|[0.76548338226047...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           12.0|[6.0,30.276999999...|     [0.76548338226047...|[0.76548338226047...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7

In [56]:
final_data = shipsFinal.select("features",'crew')

In [57]:
train_data,test_data = final_data.randomSplit([0.8,0.2])

In [58]:
train_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              106|
|   mean| 8.05245283018869|
| stddev|2.867268423437111|
|    min|              0.6|
|    max|             13.6|
+-------+-----------------+



In [59]:
test_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|               23|
|   mean|8.443478260869563|
| stddev|3.654825712416708|
|    min|             3.55|
|    max|             19.1|
+-------+-----------------+
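`randomSplit` assigns each row at random according to the normalized weights, so the split sizes are only approximately 80/20: here 106 and 23 of the 129 training rows rather than about 103 and 26. Because no seed is passed, the split, and every metric computed from it below, can change between runs. The plain-Python sketch below illustrates the idea with a fixed seed; it is a simplification, and Spark's per-partition sampling is not reproduced exactly.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by comparing one uniform draw to cumulative normalized weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding at the top bound
    return splits

rows = list(range(129))  # same row count as the training file
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))
```

In PySpark the equivalent is to pass a seed, e.g. `final_data.randomSplit([0.8, 0.2], seed=42)`, so the split can be reproduced.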



**Step 4: Perform Machine Learning**

In [60]:
# Create a Linear Regression Model object
lr = LinearRegression(labelCol='crew')

In [61]:
# Fit the model to the data and call this model lrModel
lrModel = lr.fit(train_data)

In [62]:
# Print the coefficients and intercept for linear regression
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [-0.0746549583622,0.0210093186735,-0.329535986141,0.86395614975,2.24889167554,0.113933937898,0.00130756968944] Intercept: -1.9614311761858967
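A fitted linear regression model predicts with prediction = intercept + Σ coefficient_i × feature_i over the 7 assembled features. The sketch below applies the printed coefficients and intercept to a hypothetical feature vector, since the real scaled vectors are truncated in the outputs above.

```python
# Coefficients and intercept copied from the printed lrModel output above
coefficients = [-0.0746549583622, 0.0210093186735, -0.329535986141,
                0.86395614975, 2.24889167554, 0.113933937898, 0.00130756968944]
intercept = -1.9614311761858967

def predict(features):
    """Linear model: intercept plus the dot product of coefficients and features."""
    if len(features) != len(coefficients):
        raise ValueError("expected %d features" % len(coefficients))
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Hypothetical vector: six scaled numerical features, then CruiseLineIndex = 0.0
example = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]
print(predict(example))
```

Because the features are scaled, each coefficient is the change in predicted crew (in hundreds) per one standard deviation of that feature, not per raw unit.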


In [63]:
test_results = lrModel.evaluate(test_data)

In [64]:
# Residuals (label minus prediction) on the held-out split; note the one large positive residual
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|6.376340023432903E-4|
| -0.3743970384759421|
| -0.5594560369944777|
|  1.2918705092364409|
|-0.38544177771136745|
| -0.9059779778791821|
|  0.7221487095357286|
|    7.31465341888387|
|-0.49810573628670607|
|-0.07129351064908995|
|  0.6562234197638173|
|-0.05682172491318571|
| 0.21457921707809957|
| -1.2618964611452643|
|   0.682828174517228|
|  0.5373521619429908|
|    0.54687668361459|
| -1.0711961547324895|
|    0.56592572695779|
|  0.3775354697549238|
+--------------------+
only showing top 20 rows
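Each residual is the label minus the prediction, so the 7.31 entry above is a ship whose crew the model underestimates by roughly 731 people (crew is in hundreds). One simple way to surface such rows is to flag residuals larger than a multiple of the RMSE. The sketch below uses the 20 residuals shown above and the RMSE reported in the next cell; the threshold of 2 × RMSE is an arbitrary choice for illustration.

```python
# The 20 residuals displayed above (label - prediction, crew in hundreds)
residuals = [6.376340023432903e-4, -0.3743970384759421, -0.5594560369944777,
             1.2918705092364409, -0.38544177771136745, -0.9059779778791821,
             0.7221487095357286, 7.31465341888387, -0.49810573628670607,
             -0.07129351064908995, 0.6562234197638173, -0.05682172491318571,
             0.21457921707809957, -1.2618964611452643, 0.682828174517228,
             0.5373521619429908, 0.54687668361459, -1.0711961547324895,
             0.56592572695779, 0.3775354697549238]
rmse = 1.6503107713336111  # rootMeanSquaredError reported for the same split

def flag_outliers(values, scale, k=2.0):
    """Return (position, value) pairs whose magnitude exceeds k * scale."""
    return [(i, r) for i, r in enumerate(values) if abs(r) > k * scale]

print(flag_outliers(residuals, rmse))
```

A single residual this large also inflates the MSE and RMSE of a 23-row test split noticeably, so it is worth inspecting that ship's row before drawing conclusions from the metrics.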



In [65]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("RSquared: {}".format(test_results.r2))

RMSE: 1.6503107713336111
MSE: 2.7235256419797387
RSquared: 0.7868411932070525
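The three metrics are related: MSE is the mean of the squared residuals, RMSE is its square root (1.6503² ≈ 2.7235, matching the printed values), and R² = 1 − SS_res / SS_tot compares the squared error with the spread of the labels around their mean. A plain-Python sketch on hypothetical labels and predictions is below.

```python
import math

def regression_metrics(labels, predictions):
    """Return (mse, rmse, r2) with residual = label - prediction."""
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(r * r for r in residuals)
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    mse = ss_res / n
    return mse, math.sqrt(mse), 1.0 - ss_res / ss_tot

# Hypothetical labels and predictions (crew in hundreds)
labels = [3.0, 5.0, 7.0, 9.0]
predictions = [3.5, 4.5, 7.5, 8.5]
mse, rmse, r2 = regression_metrics(labels, predictions)
print(mse, rmse, r2)
```

An R² of about 0.79 on the held-out split therefore means the model's squared error is roughly a fifth of the labels' total squared deviation from their mean.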


**Step 5: Predict on the test data**

In [66]:
# Use Spark to read in the testing data
ships_test = spark.read.csv("cruise_ship_test.csv",inferSchema=True,header=True)

In [67]:
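# Caution: fitting a new indexer on the test file derives the label-to-index mapping
# from the test data's own frequencies, so indices can differ from training
# (Royal_Caribbean is 0.0 here, while Carnival was 0.0 in training). Transforming
# with the indexer fitted on the training data keeps the encoding consistent.
indexer = StringIndexer(inputCol="Cruise_line", outputCol="CruiseLineIndex")
ships_test = indexer.fit(ships_test).transform(ships_test)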
ships_test.show()

+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+
|Ship_name|    Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density| crew|CruiseLineIndex|
+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+
|  Liberty|Royal_Caribbean|  6|  158.0|      43.7| 11.25|  18.0|            36.16| 13.6|            0.0|
|  Majesty|Royal_Caribbean| 21| 73.941|     27.44|   8.8| 11.75|            26.95| 8.22|            0.0|
|  Mariner|Royal_Caribbean| 10|  138.0|     31.14|  10.2| 15.57|            44.32|11.85|            0.0|
|  Monarch|Royal_Caribbean| 22| 73.941|     27.44|   8.8| 11.77|            30.94| 8.22|            0.0|
|Navigator|Royal_Caribbean| 11|  138.0|     31.14|  10.2| 15.57|            44.32|11.85|            0.0|
|    Oasis|Royal_Caribbean|  4|  220.0|      54.0| 11.82|  27.0|            40.74| 21.0|            0.0|
| Radiance|Royal_Caribbean| 12|  90.09|     25.01|  9.6

In [68]:
assembler = VectorAssembler(
    inputCols=["Age", "Tonnage", "passengers",
               "length","cabins","passenger_density"],
    outputCol="numerical_features")

In [69]:
ships_test = assembler.transform(ships_test)
ships_test.show()

+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+--------------------+
|Ship_name|    Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density| crew|CruiseLineIndex|  numerical_features|
+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+--------------------+
|  Liberty|Royal_Caribbean|  6|  158.0|      43.7| 11.25|  18.0|            36.16| 13.6|            0.0|[6.0,158.0,43.7,1...|
|  Majesty|Royal_Caribbean| 21| 73.941|     27.44|   8.8| 11.75|            26.95| 8.22|            0.0|[21.0,73.941,27.4...|
|  Mariner|Royal_Caribbean| 10|  138.0|     31.14|  10.2| 15.57|            44.32|11.85|            0.0|[10.0,138.0,31.14...|
|  Monarch|Royal_Caribbean| 22| 73.941|     27.44|   8.8| 11.77|            30.94| 8.22|            0.0|[22.0,73.941,27.4...|
|Navigator|Royal_Caribbean| 11|  138.0|     31.14|  10.2| 15.57|            44.32|11.85|            0.0|[11.0,138.0,31

In [70]:
scaler = StandardScaler(inputCol="numerical_features",
                        outputCol="scaled_numerical_features",
                        withStd=True, withMean=False)

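# Caution: this fits a new scaler on the test data, so test features are divided by
# test-set standard deviations rather than the training ones the model was fitted with;
# reusing the scalerModel fitted on the training data avoids that mismatch.
scalerModel = scaler.fit(ships_test)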

# Normalize each feature to have unit standard deviation.
ships_test = scalerModel.transform(ships_test)
ships_test.show()

+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+--------------------+-------------------------+
|Ship_name|    Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density| crew|CruiseLineIndex|  numerical_features|scaled_numerical_features|
+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+--------------------+-------------------------+
|  Liberty|Royal_Caribbean|  6|  158.0|      43.7| 11.25|  18.0|            36.16| 13.6|            0.0|[6.0,158.0,43.7,1...|     [0.94731348381523...|
|  Majesty|Royal_Caribbean| 21| 73.941|     27.44|   8.8| 11.75|            26.95| 8.22|            0.0|[21.0,73.941,27.4...|     [3.31559719335332...|
|  Mariner|Royal_Caribbean| 10|  138.0|     31.14|  10.2| 15.57|            44.32|11.85|            0.0|[10.0,138.0,31.14...|     [1.57885580635872...|
|  Monarch|Royal_Caribbean| 22| 73.941|     27.44|   8.8| 11.77|            30.94| 8.22|
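Refitting the scaler on the test file changes how the same raw value is encoded: Age 6 became 0.7654... for Journey during training but 0.9473... for Liberty here. The arithmetic below backs out the implied standard deviations from those two printed values (assuming the printed digits are accurate) and maps Liberty's scaled age back into training units, i.e. the age the trained model effectively sees.

```python
# Scaled Age values printed above (first vector element), both for a raw Age of 6
train_scaled_age6 = 0.76548338226047  # Journey, scaler fitted on the training data
test_scaled_age6 = 0.94731348381523   # Liberty, scaler refitted on the test data

implied_train_std = 6 / train_scaled_age6
implied_test_std = 6 / test_scaled_age6

# The Age coefficient was learned on training-scaled values, so convert the
# test-scaled value back with the training std to get the age the model "sees".
effective_age = test_scaled_age6 * implied_train_std
print(round(implied_train_std, 3), round(implied_test_std, 3), round(effective_age, 2))
```

The same distortion applies to every numerical feature, each by its own ratio of standard deviations, which helps explain why some test predictions below look implausible.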

In [71]:
# Spark ML expects all predictors packed into a single vector column ("features");
# the label column (here "crew") is passed separately via labelCol.
assembler = VectorAssembler(
    inputCols=["scaled_numerical_features","CruiseLineIndex"],
    outputCol="features")

In [72]:
ships_test_Final = assembler.transform(ships_test)

In [73]:
ships_test_Final.show()

+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+--------------------+-------------------------+--------------------+
|Ship_name|    Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density| crew|CruiseLineIndex|  numerical_features|scaled_numerical_features|            features|
+---------+---------------+---+-------+----------+------+------+-----------------+-----+---------------+--------------------+-------------------------+--------------------+
|  Liberty|Royal_Caribbean|  6|  158.0|      43.7| 11.25|  18.0|            36.16| 13.6|            0.0|[6.0,158.0,43.7,1...|     [0.94731348381523...|[0.94731348381523...|
|  Majesty|Royal_Caribbean| 21| 73.941|     27.44|   8.8| 11.75|            26.95| 8.22|            0.0|[21.0,73.941,27.4...|     [3.31559719335332...|[3.31559719335332...|
|  Mariner|Royal_Caribbean| 10|  138.0|     31.14|  10.2| 15.57|            44.32|11.85|            0.0|[10.0,138.0,31.14...|     [1.57

In [74]:
final_data_test = ships_test_Final.select("features")

In [75]:
predictions = lrModel.transform(final_data_test)

In [76]:
predictions.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|[0.94731348381523...|   7.113161336166961|
|[3.31559719335332...|   4.288233561507203|
|[1.57885580635872...|   6.268263608786496|
|[3.47348277398920...|   4.323437067758272|
|[1.73674138699460...|   6.256476667338136|
|[0.63154232254349...|  10.160454540001936|
|[1.89462696763047...|   4.400673478219362|
|[2.52616929017396...|   4.004229909669224|
|[1.57885580635872...|   4.424247361116082|
|[3.94713951589682...|   4.080175586179859|
|[2.68405487080983...|  3.6004819619861412|
|[2.36828370953809...|   4.016016851117584|
|[2.21039812890222...|   6.221115842993054|
|[3.31559719335332...| 0.03466256198301121|
|[4.26291067716856...|-0.03605908670715019|
|[3.78925393526094...|-6.98262362069268...|
|[2.99982603208158...|  0.5179003535744204|
|[2.05251254826634...|  1.0870010269947694|
|[1.89462696763047...|   1.087127037164676|
|[2.99982603208158...|  0.517900
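Predictions are in hundreds of crew members, like the `crew` label, and an unconstrained linear model can return values below zero, as in a couple of rows above. A minimal post-processing sketch, assuming that clamping at zero and rounding to whole people is acceptable for reporting (a choice made here, not part of the original workflow), converts them to headcounts.

```python
def to_headcount(prediction_hundreds):
    """Convert a crew prediction in hundreds to whole crew members, clamped at zero."""
    return max(0, round(prediction_hundreds * 100))

# A few predictions copied from the output above
for p in [7.113161336166961, 10.160454540001936, -0.03605908670715019]:
    print(p, "->", to_headcount(p))
```

Clamping only hides the symptom; negative predictions here are a sign that the test features are encoded differently from the training features.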