In [1]:
import findspark
findspark.init('/home/raj/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('linreg').getOrCreate()

In [6]:
cruiseData=spark.read.csv('/home/raj/Documents/Udemy-Spark/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Linear_Regression/cruise_ship_info.csv',
                       header=True,inferSchema=True)

In [7]:
cruiseData.show(5)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
only showing top 5 rows



In [8]:
cruiseData.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [9]:
from pyspark.ml.linalg import Vectors

from pyspark.ml.feature import VectorAssembler

In [10]:
cruiseData.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

In [17]:
assembler =VectorAssembler(inputCols=[ 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density'],outputCol='cruiseFeatures' )

In [18]:
cruiseFeatures=assembler.transform(cruiseData)

In [20]:
cruiseFeatures.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|      cruiseFeatures|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|[6.0,30.276999999...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|[6.0,30.276999999...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|[26.0,47.262,14.8...|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|[11.0,110.0,29.74...|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|[17.0,101.353,26....|
|    Ecstasy|   Carnival| 22|            70.367|     20.

In [21]:
cleanedData=cruiseFeatures.select('cruiseFeatures','crew')

In [22]:
cleanedData.show()

+--------------------+----+
|      cruiseFeatures|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
|[22.0,70.367,20.5...| 9.2|
|[15.0,70.367,20.5...| 9.2|
|[23.0,70.367,20.5...| 9.2|
|[19.0,70.367,20.5...| 9.2|
|[6.0,110.23899999...|11.5|
|[10.0,110.0,29.74...|11.6|
|[28.0,46.052,14.5...| 6.6|
|[18.0,70.367,20.5...| 9.2|
|[17.0,70.367,20.5...| 9.2|
|[11.0,86.0,21.24,...| 9.3|
|[8.0,110.0,29.74,...|11.6|
|[9.0,88.5,21.24,9...|10.3|
|[15.0,70.367,20.5...| 9.2|
|[12.0,88.5,21.24,...| 9.3|
|[20.0,70.367,20.5...| 9.2|
+--------------------+----+
only showing top 20 rows



In [23]:
trainData,testData=cleanedData.randomSplit([0.75,0.25])

In [25]:
trainData.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               119|
|   mean|  7.65638655462186|
| stddev|3.2583472287140514|
|    min|              0.59|
|    max|              13.6|
+-------+------------------+



In [27]:
testData.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|               39|
|   mean|8.214615384615385|
| stddev|4.183755456118247|
|    min|             0.59|
|    max|             21.0|
+-------+-----------------+



In [29]:
from pyspark.ml.regression import LinearRegression
linReg= LinearRegression(labelCol='crew',featuresCol='cruiseFeatures')

In [30]:
linRegModel=linReg.fit(trainData)

In [31]:
testResults=linRegModel.evaluate(testData)

In [35]:
testResults.r2

0.8899825284294947

In [37]:
testResults.predictions.show()

+--------------------+-----+------------------+
|      cruiseFeatures| crew|        prediction|
+--------------------+-----+------------------+
|[4.0,220.0,54.0,1...| 21.0| 19.74123996217489|
|[5.0,86.0,21.04,9...|  8.0| 9.415042541351369|
|[5.0,122.0,28.5,1...|  6.7| 7.297502025076213|
|[6.0,30.276999999...| 3.55| 4.143441633133522|
|[6.0,158.0,43.7,1...| 13.6|13.813356536937706|
|[7.0,116.0,31.0,9...| 12.0|12.238561884666273|
|[9.0,85.0,19.68,9...| 8.69| 9.191970782194637|
|[10.0,58.825,15.6...|  7.0| 7.303600699421699|
|[10.0,105.0,27.2,...|10.68|10.814632403722364|
|[10.0,138.0,31.14...|11.85|12.757389927394417|
|[11.0,91.0,20.32,...| 9.99| 9.228461668339614|
|[11.0,91.62700000...|  9.0| 9.373192374036302|
|[11.0,108.977,26....| 12.0|10.908349848312763|
|[11.0,110.0,29.74...| 19.1| 11.75783515956023|
|[12.0,91.0,20.32,...| 9.99| 9.202899534003492|
|[13.0,30.27699999...|  4.0|3.8810553075566414|
|[13.0,91.0,20.32,...| 9.99| 9.177337399667369|
|[13.0,101.509,27....| 11.5|10.822335788

In [38]:
testResults.meanSquaredError

1.8763473266440143

In [39]:
testResults.rootMeanSquaredError

1.3697982795448438

In [40]:
from pyspark.ml.feature import StringIndexer

In [42]:
indexer=StringIndexer(inputCol='Cruise_line',outputCol='Cruise_line_Category')

In [43]:
indexedCruiseData=indexer.fit(cruiseData).transform(cruiseData)

In [46]:
indexedCruiseData.show(30)

+-------------+-----------+---+------------------+----------+------+------+-----------------+-----+--------------------+
|    Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density| crew|Cruise_line_Category|
+-------------+-----------+---+------------------+----------+------+------+-----------------+-----+--------------------+
|      Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64| 3.55|                16.0|
|        Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64| 3.55|                16.0|
|  Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8|  6.7|                 1.0|
|     Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99| 19.1|                 1.0|
|      Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36| 10.0|                 1.0|
|      Ecstasy|   Carnival| 22| 

In [47]:
from pyspark.ml.feature import OneHotEncoder

In [48]:
encoder =OneHotEncoder(inputCol='Cruise_line_Category',outputCol='Cruise_line_Features')

In [49]:
encodedCruiseData=encoder.transform(indexedCruiseData)

In [50]:
encodedCruiseData.show(5)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_Category|Cruise_line_Features|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|                16.0|     (19,[16],[1.0])|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|                16.0|     (19,[16],[1.0])|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|                 1.0|      (19,[1],[1.0])|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|                 1.0|      (19,[1],[1.0])|
|    Destiny|   Carnival| 1

In [51]:
encodedCruiseData.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_line_Category',
 'Cruise_line_Features']

In [54]:
encodedCruiseVector=VectorAssembler(inputCols=['Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_line_Features'],outputCol='encodedFeatures')

In [55]:
encodedCruiseDf=encodedCruiseVector.transform(encodedCruiseData)

In [56]:
encodedCruiseDf.show(5)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+--------------------+--------------------+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_Category|Cruise_line_Features|     encodedFeatures|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+--------------------+--------------------+--------------------+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|                16.0|     (19,[16],[1.0])|(26,[0,1,2,3,4,5,...|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|                16.0|     (19,[16],[1.0])|(26,[0,1,2,3,4,5,...|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|                 1.0|      (19,[1],[1.0])|(26,[0,1,2,3,4,5,...|
|   Conquest|   Carnival| 11|           

In [57]:
encodedCruiseDfCleamed=encodedCruiseDf.select('encodedFeatures','crew')

In [58]:
encodedCruiseDfCleamed.show(5)

+--------------------+----+
|     encodedFeatures|crew|
+--------------------+----+
|(26,[0,1,2,3,4,5,...|3.55|
|(26,[0,1,2,3,4,5,...|3.55|
|(26,[0,1,2,3,4,5,...| 6.7|
|(26,[0,1,2,3,4,5,...|19.1|
|(26,[0,1,2,3,4,5,...|10.0|
+--------------------+----+
only showing top 5 rows



In [59]:
encodedTrainData,encodedTestData=encodedCruiseDfCleamed.randomSplit([0.75,0.25])

In [60]:
encodedTrainData.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               121|
|   mean| 7.783471074380171|
| stddev|3.3581611314189517|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



In [61]:
linRegEncoded= LinearRegression(labelCol='crew',featuresCol='encodedFeatures')

In [62]:
linRegEncodedModel=linRegEncoded.fit(encodedTrainData)

In [63]:
testResultsEncoded=linRegEncodedModel.evaluate(encodedTestData)

In [65]:
testResultsEncoded.r2

1.0

In [67]:
testResultsEncoded.predictions.show()

+--------------------+-----+------------------+
|     encodedFeatures| crew|        prediction|
+--------------------+-----+------------------+
|(26,[0,1,2,3,4,5,...|11.85|11.849999999999993|
|(26,[0,1,2,3,4,5,...|11.85|11.849999999999993|
|(26,[0,1,2,3,4,5,...|11.85|11.849999999999993|
|(26,[0,1,2,3,4,5,...|  7.6| 7.599999999999998|
|(26,[0,1,2,3,4,5,...|  7.2| 7.200000000000001|
|(26,[0,1,2,3,4,5,...| 11.6|11.599999999999993|
|(26,[0,1,2,3,4,5,...| 19.1|19.100000000000097|
|(26,[0,1,2,3,4,5,...|10.29|10.290000000000006|
|(26,[0,1,2,3,4,5,...| 10.0| 9.999999999999988|
|(26,[0,1,2,3,4,5,...| 12.0|11.999999999999995|
|(26,[0,1,2,3,4,5,...|  9.0| 8.999999999999995|
|(26,[0,1,2,3,4,5,...| 12.0| 12.00000000000001|
|(26,[0,1,2,3,4,5,...|  8.0| 7.999999999999992|
|(26,[0,1,2,3,4,5,...| 5.31|5.3099999999999845|
|(26,[0,1,2,3,4,5,...| 5.57| 5.569999999999999|
|(26,[0,1,2,3,4,5,...| 11.0|11.000000000000009|
|(26,[0,1,2,3,4,5,...|  8.0| 7.999999999999983|
|(26,[0,1,2,3,4,5,...|  6.3| 6.299999999

In [68]:
testResultsEncoded.meanSquaredError

3.565058313384745e-28

In [69]:
from pyspark.sql.functions import corr

In [70]:
cruiseData.select(corr('crew','passengers')).show()

+----------------------+
|corr(crew, passengers)|
+----------------------+
|    0.9152341306065384|
+----------------------+



In [71]:
cruiseData.select(corr('crew','cabins')).show()

+------------------+
|corr(crew, cabins)|
+------------------+
|0.9508226063578497|
+------------------+

