In [1]:
import findspark

In [2]:
findspark.init('/home/pushya/spark-2.1.0-bin-hadoop2.7')

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName('lr_example').getOrCreate()

In [6]:
from pyspark.ml.regression import LinearRegression

In [7]:
# Load training data
data = spark.read.csv("cruise_ship_info.csv",inferSchema=True,header=True)

In [8]:
data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [9]:
data.head(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)]

In [10]:
data.head(2)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55),
 Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)]

In [19]:
data.createOrReplaceTempView('abc')

In [24]:
data.select('Cruise_line').distinct().show()

+-----------------+
|      Cruise_line|
+-----------------+
|            Costa|
|              P&O|
|           Cunard|
|Regent_Seven_Seas|
|              MSC|
|         Carnival|
|          Crystal|
|           Orient|
|         Princess|
|        Silversea|
|         Seabourn|
| Holland_American|
|         Windstar|
|           Disney|
|        Norwegian|
|          Oceania|
|          Azamara|
|        Celebrity|
|             Star|
|  Royal_Caribbean|
+-----------------+



In [25]:
from pyspark.ml.feature import StringIndexer

In [27]:
Indexer =  StringIndexer(inputCol= 'Cruise_line', outputCol='Cruise_line_1')

In [28]:
Indexed =Indexer.fit(data).transform(data)

In [33]:
Indexed.select('Cruise_line_1','Cruise_line').distinct().sort('Cruise_line_1').show()

+-------------+-----------------+
|Cruise_line_1|      Cruise_line|
+-------------+-----------------+
|          0.0|  Royal_Caribbean|
|          1.0|         Carnival|
|          2.0|         Princess|
|          3.0| Holland_American|
|          4.0|        Norwegian|
|          5.0|            Costa|
|          6.0|        Celebrity|
|          7.0|              MSC|
|          8.0|             Star|
|          9.0|              P&O|
|         10.0|Regent_Seven_Seas|
|         11.0|        Silversea|
|         12.0|          Oceania|
|         13.0|         Seabourn|
|         14.0|         Windstar|
|         15.0|           Cunard|
|         16.0|          Azamara|
|         17.0|           Disney|
|         18.0|          Crystal|
|         19.0|           Orient|
+-------------+-----------------+



In [34]:
Indexed.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Cruise_line_1: double (nullable = true)



In [35]:
Indexed.head(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, Cruise_line_1=16.0)]

In [36]:
Total_data = Indexed.select('Cruise_line_1','Age','Tonnage','Tonnage',
                           'passengers','length','cabins','passenger_density',
                           'crew')

In [37]:
Total_data.printSchema()

root
 |-- Cruise_line_1: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [39]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [44]:
assembler = VectorAssembler(inputCols=['Cruise_line_1',
                                       'Age',
                                       'Tonnage',
                                       'Tonnage',
                                       'passengers',
                                       'length',
                                       'cabins',
                                       'passenger_density']
                                ,outputCol='features')

In [45]:
output = assembler.transform(Total_data)

In [46]:
output.printSchema()

root
 |-- Cruise_line_1: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- features: vector (nullable = true)



In [48]:
final_data = output.select('features','crew')

In [50]:
final_data.show()

+--------------------+----+
|            features|crew|
+--------------------+----+
|[16.0,6.0,30.2769...|3.55|
|[16.0,6.0,30.2769...|3.55|
|[1.0,26.0,47.262,...| 6.7|
|[1.0,11.0,110.0,1...|19.1|
|[1.0,17.0,101.353...|10.0|
|[1.0,22.0,70.367,...| 9.2|
|[1.0,15.0,70.367,...| 9.2|
|[1.0,23.0,70.367,...| 9.2|
|[1.0,19.0,70.367,...| 9.2|
|[1.0,6.0,110.2389...|11.5|
|[1.0,10.0,110.0,1...|11.6|
|[1.0,28.0,46.052,...| 6.6|
|[1.0,18.0,70.367,...| 9.2|
|[1.0,17.0,70.367,...| 9.2|
|[1.0,11.0,86.0,86...| 9.3|
|[1.0,8.0,110.0,11...|11.6|
|[1.0,9.0,88.5,88....|10.3|
|[1.0,15.0,70.367,...| 9.2|
|[1.0,12.0,88.5,88...| 9.3|
|[1.0,20.0,70.367,...| 9.2|
+--------------------+----+
only showing top 20 rows



In [51]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [55]:
train_data.count()


105

In [54]:
test_data.count()

53

In [56]:
from pyspark.ml.regression import LinearRegression 

In [59]:
lr = LinearRegression(labelCol='crew')

In [60]:
lr_model= lr.fit(train_data)

In [61]:
test_results = lr_model.evaluate(test_data)

In [63]:
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
| -0.4188747886761366|
| -0.3410632707280534|
| -0.9531812951925396|
| -0.9828483449025196|
|  -1.707395693876224|
| -0.9517468474571835|
| -0.4287863150202993|
| -0.6825877034902721|
|-0.30580305422562937|
|-0.17659486763905186|
|   7.238581659619982|
|  0.8488441574892889|
| -0.8907063427080271|
|  0.6778700570278495|
|  0.7180920238878628|
|  0.7382030073178694|
|  0.8286288307309686|
|  0.6303611028847165|
|   0.250040783728652|
| 0.06021159595415071|
+--------------------+
only showing top 20 rows



In [78]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R2: {}".format(test_results.r2))

RMSE: 1.398395890936903
MSE: 1.955511067789215
R2: 0.836831119681319


In [67]:
unlabeled_data=test_data.select('features')

In [71]:
predictions = lr_model.transform(unlabeled_data)

In [77]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.0,6.0,158.0,15...|14.018874788676136|
|[0.0,7.0,158.0,15...|13.941063270728053|
|[0.0,10.0,138.0,1...| 12.80318129519254|
|[0.0,13.0,138.0,1...| 12.74284834490252|
|[0.0,15.0,78.491,...| 8.307395693876224|
|[0.0,16.0,74.137,...| 8.551746847457183|
|[0.0,17.0,70.0,70...|7.6287863150202995|
|[0.0,21.0,73.941,...| 8.902587703490273|
|[1.0,8.0,110.0,11...|11.905803054225629|
|[1.0,11.0,86.0,86...| 9.476594867639053|
|[1.0,11.0,110.0,1...| 11.86141834038002|
|[1.0,12.0,88.5,88...|  9.44115584251071|
|[1.0,12.0,88.5,88...|10.190706342708028|
|[1.0,15.0,70.367,...|  8.52212994297215|
|[1.0,17.0,70.367,...| 8.481907976112137|
|[1.0,18.0,70.367,...|  8.46179699268213|
|[1.0,23.0,70.367,...|  8.37137116926903|
|[1.0,26.0,47.262,...| 6.069638897115284|
|[2.0,15.0,108.806...|10.849959216271348|
|[2.0,16.0,77.499,...|  8.93978840404585|
+--------------------+------------

In [73]:
test_data.show()

+--------------------+-----+
|            features| crew|
+--------------------+-----+
|[0.0,6.0,158.0,15...| 13.6|
|[0.0,7.0,158.0,15...| 13.6|
|[0.0,10.0,138.0,1...|11.85|
|[0.0,13.0,138.0,1...|11.76|
|[0.0,15.0,78.491,...|  6.6|
|[0.0,16.0,74.137,...|  7.6|
|[0.0,17.0,70.0,70...|  7.2|
|[0.0,21.0,73.941,...| 8.22|
|[1.0,8.0,110.0,11...| 11.6|
|[1.0,11.0,86.0,86...|  9.3|
|[1.0,11.0,110.0,1...| 19.1|
|[1.0,12.0,88.5,88...|10.29|
|[1.0,12.0,88.5,88...|  9.3|
|[1.0,15.0,70.367,...|  9.2|
|[1.0,17.0,70.367,...|  9.2|
|[1.0,18.0,70.367,...|  9.2|
|[1.0,23.0,70.367,...|  9.2|
|[1.0,26.0,47.262,...|  6.7|
|[2.0,15.0,108.806...| 11.1|
|[2.0,16.0,77.499,...|  9.0|
+--------------------+-----+
only showing top 20 rows



In [82]:
# R2 of 0.86 is pretty good, let's check the data a little closer
from pyspark.sql.functions import corr

In [83]:
data.select(corr('crew','passengers')).show()

+----------------------+
|corr(crew, passengers)|
+----------------------+
|    0.9152341306065384|
+----------------------+



In [85]:
data.select(corr('crew','cabins')).show()

+------------------+
|corr(crew, cabins)|
+------------------+
|0.9508226063578497|
+------------------+

