In [1]:
from pyspark.sql import SparkSession

In [2]:
# Configure the application name, then create the session (or reuse one
# that is already running).
sessionBuilder = SparkSession.builder.appName('Linear Regression')
spark = sessionBuilder.getOrCreate()

In [3]:
# Echo the session object as the cell output.
spark

In [6]:
# Load the CSV: the first row supplies the column names and the column
# types are inferred from the values.
csvPath = '/content/CrewMemebers.csv'
data = spark.read.csv(csvPath, header=True, inferSchema=True)

In [7]:
# Print a preview of the loaded rows.
data.show()

+---+------+----------+------+----------+----------------+-------+------------+
|Age|Cabins|CruiseType|Length|Passengers|PassengerDensity|Tonnage|CrewRequired|
+---+------+----------+------+----------+----------------+-------+------------+
| 22|  0.33|     Costa|   2.8|      0.66|           50.62|  3.341|        0.59|
| 22|  0.33|     Costa|  2.79|      0.66|           50.62|  3.341|        0.59|
| 12|  0.45|     Costa|  2.96|      0.94|           24.78|  2.329|         0.6|
| 27|  0.74|     Costa|   4.4|      1.67|           32.04|   5.35|        0.88|
| 25|  0.74|     Costa|   4.4|      1.58|           33.86|   5.35|        0.88|
| 27|  0.88|     Costa|  4.36|      3.94|           31.73|   12.5|        1.46|
| 21|  1.04|     Costa|   4.4|      2.08|           48.08|   10.0|         1.6|
| 27|  1.04|     Costa|   4.4|      2.08|           48.08|   10.0|         1.6|
| 24|  1.04|     Costa|   4.4|      2.08|           48.08|   10.0|         1.6|
| 23|  1.56|     Costa|  6.17|      3.08

In [8]:
# Number of rows per CruiseType value, printed in ascending order of count.
cruiseTypeCounts = data.groupBy('CruiseType').count()
cruiseTypeCounts.orderBy('count').show()

+----------+-----+
|CruiseType|count|
+----------+-----+
|      Star|   15|
| Celebrity|   25|
|     Costa|   28|
|  Carnival|   45|
|  Princess|   45|
+----------+-----+



In [9]:
from pyspark.ml.feature import StringIndexer

In [10]:
# Map each distinct CruiseType string to a numeric index, written to a new
# 'CruiseType_Label' column.
# NOTE(review): the index is only a category id, yet this notebook later
# assembles it into the regression feature vector as a plain number, which
# imposes an arbitrary ordering on the categories. Consider one-hot encoding
# (pyspark.ml.feature.OneHotEncoder) instead - confirm before changing, since
# the fitted coefficients and metrics will differ.
indexer = StringIndexer(inputCol='CruiseType', outputCol='CruiseType_Label')

In [11]:
# Learn the string -> index mapping from the data, then append the index
# column to the same DataFrame.
indexerModel = indexer.fit(data)
data = indexerModel.transform(data)

In [12]:
# Show the first 5 rows to check the new 'CruiseType_Label' column.
data.show(5)

+---+------+----------+------+----------+----------------+-------+------------+----------------+
|Age|Cabins|CruiseType|Length|Passengers|PassengerDensity|Tonnage|CrewRequired|CruiseType_Label|
+---+------+----------+------+----------+----------------+-------+------------+----------------+
| 22|  0.33|     Costa|   2.8|      0.66|           50.62|  3.341|        0.59|             2.0|
| 22|  0.33|     Costa|  2.79|      0.66|           50.62|  3.341|        0.59|             2.0|
| 12|  0.45|     Costa|  2.96|      0.94|           24.78|  2.329|         0.6|             2.0|
| 27|  0.74|     Costa|   4.4|      1.67|           32.04|   5.35|        0.88|             2.0|
| 25|  0.74|     Costa|   4.4|      1.58|           33.86|   5.35|        0.88|             2.0|
+---+------+----------+------+----------+----------------+-------+------------+----------------+
only showing top 5 rows



In [13]:
# Replace the original string column with its numeric index, keeping the
# column name 'CruiseType'.
# The guard makes this cell safe to re-run: without it, a second execution
# would drop the already-renamed numeric 'CruiseType' column, and the rename
# would then silently do nothing because 'CruiseType_Label' no longer exists.
if 'CruiseType_Label' in data.columns:
    data = data.drop('CruiseType').withColumnRenamed('CruiseType_Label',
                                                     'CruiseType')

In [14]:
# Fetch the first 3 rows as Row objects to confirm the column swap.
data.take(3)

[Row(Age=22, Cabins=0.33, Length=2.8, Passengers=0.66, PassengerDensity=50.62, Tonnage=3.341, CrewRequired=0.59, CruiseType=2.0),
 Row(Age=22, Cabins=0.33, Length=2.79, Passengers=0.66, PassengerDensity=50.62, Tonnage=3.341, CrewRequired=0.59, CruiseType=2.0),
 Row(Age=12, Cabins=0.45, Length=2.96, Passengers=0.94, PassengerDensity=24.78, Tonnage=2.329, CrewRequired=0.6, CruiseType=2.0)]

In [15]:
from pyspark.ml.feature import VectorAssembler

In [16]:
# Predictor columns that are packed into a single 'Features' vector column.
# 'CrewRequired' is deliberately absent from this list.
featureColumns = [
    'Age',
    'Cabins',
    'CruiseType',
    'Length',
    'Passengers',
    'PassengerDensity',
    'Tonnage',
]
features = VectorAssembler(inputCols=featureColumns, outputCol='Features')

In [17]:
# Append the assembled 'Features' vector column.
# Any 'Features' column left over from a previous run of this cell is dropped
# first; drop() ignores names that are not in the schema, so the first run is
# unchanged, while a re-run no longer fails because the output column already
# exists.
data = features.transform(data.drop('Features'))

In [18]:
# Keep only the feature vector and the regression target.
modelData = data.select('Features', 'CrewRequired')

In [19]:
# Preview the two-column modelling frame.
modelData.show()

+--------------------+------------+
|            Features|CrewRequired|
+--------------------+------------+
|[22.0,0.33,2.0,2....|        0.59|
|[22.0,0.33,2.0,2....|        0.59|
|[12.0,0.45,2.0,2....|         0.6|
|[27.0,0.74,2.0,4....|        0.88|
|[25.0,0.74,2.0,4....|        0.88|
|[27.0,0.88,2.0,4....|        1.46|
|[21.0,1.04,2.0,4....|         1.6|
|[27.0,1.04,2.0,4....|         1.6|
|[24.0,1.04,2.0,4....|         1.6|
|[23.0,1.56,2.0,6....|         1.8|
|[19.0,1.48,2.0,5....|        1.97|
|[19.0,1.48,2.0,5....|         2.1|
|[16.0,1.6,2.0,5.1...|        2.11|
|[12.0,1.94,2.0,5....|        2.87|
|[13.0,1.94,2.0,5....|        2.95|
|[36.0,3.83,2.0,5....|        2.97|
|[14.0,2.45,2.0,5....|        3.24|
|[48.0,4.25,2.0,5....|         3.5|
|[6.0,3.55,2.0,5.9...|        3.55|
|[6.0,3.55,2.0,5.9...|        3.55|
+--------------------+------------+
only showing top 20 rows



In [20]:
# Random split with weights 0.7 (train) and 0.3 (test), using a fixed seed.
splitWeights = [0.7, 0.3]
train, test = modelData.randomSplit(splitWeights, seed=2529)

In [21]:
from pyspark.ml.regression import LinearRegression

In [22]:
# Unfitted linear-regression estimator (despite the variable name): it reads
# predictors from 'Features' and the target from 'CrewRequired'.
regressionColumns = {
    'featuresCol': 'Features',
    'labelCol': 'CrewRequired',
}
linearRegressionModel = LinearRegression(**regressionColumns)

In [23]:
# Fit the estimator on the training split; `model` is the fitted result.
model = linearRegressionModel.fit(train)

In [24]:
# Display the fitted coefficients (one per assembled feature) and intercept.
model.coefficients, model.intercept

(DenseVector([-0.0106, 0.8446, 0.1815, 0.3497, -0.1585, 0.0039, 0.0148]),
 -0.927041345031894)

In [25]:
# Score the held-out rows using only their feature vectors (no target column).
testFeatures = test.select('Features')
yPrediction = model.transform(testFeatures)

In [26]:
# Preview the predictions next to the feature vectors they came from.
yPrediction.show()

+--------------------+------------------+
|            Features|        prediction|
+--------------------+------------------+
|[5.0,10.22,3.0,9....| 9.565700841109525|
|[5.0,18.17,4.0,11...|15.760250373502673|
|[6.0,10.29,3.0,9....| 9.952165053736875|
|[6.0,11.97,0.0,9....|10.226287046308107|
|[6.0,18.0,4.0,11....| 14.42487661572244|
|[7.0,15.57,4.0,9....|13.149254856703376|
|[8.0,11.22,0.0,9....| 9.786330267943683|
|[8.0,14.87,0.0,9....|11.930677736605881|
|[9.0,10.62,0.0,9....| 9.419477985250465|
|[9.0,13.0,0.0,9.5...|11.051838370510529|
|[9.0,13.37,4.0,9....|11.919444047393705|
|[9.0,13.37,4.0,9....|11.919444047393705|
|[10.0,13.56,0.0,8...|10.924494284833893|
|[10.0,15.57,0.0,1...|12.963094266855252|
|[11.0,10.5,3.0,9....|  9.24204399300724|
|[12.0,3.54,1.0,7....| 4.502820462041679|
|[12.0,7.83,1.0,8....|  7.15184623816765|
|[12.0,9.75,0.0,9....| 8.854713495816261|
|[12.0,11.62,0.0,9...| 10.23223866611498|
|[13.0,1.94,2.0,5....|3.0413664328561936|
+--------------------+------------

In [27]:
# Evaluate the fitted model on the test split; the summary object exposes the
# metrics, predictions and residuals inspected in the following cells.
testResult = model.evaluate(test)

In [28]:
# Root mean squared error on the test split.
testResult.rootMeanSquaredError

0.9753630331281856

In [29]:
# Mean squared error on the test split.
testResult.meanSquaredError

0.9513330463930141

In [30]:
# Mean absolute error on the test split.
testResult.meanAbsoluteError

0.692428067881579

In [31]:
# R-squared on the test split.
testResult.r2

0.9233256294119825

In [32]:
# Adjusted R-squared on the test split.
testResult.r2adj

0.908819667408844

In [33]:
# Show actual CrewRequired values side by side with the model's predictions.
testResult.predictions.show()

+--------------------+------------+------------------+
|            Features|CrewRequired|        prediction|
+--------------------+------------+------------------+
|[5.0,10.22,3.0,9....|         8.0| 9.565700841109525|
|[5.0,18.17,4.0,11...|        13.6|15.760250373502673|
|[6.0,10.29,3.0,9....|         9.0| 9.952165053736875|
|[6.0,11.97,0.0,9....|       11.09|10.226287046308107|
|[6.0,18.0,4.0,11....|        13.6| 14.42487661572244|
|[7.0,15.57,4.0,9....|        12.0|13.149254856703376|
|[8.0,11.22,0.0,9....|        11.0| 9.786330267943683|
|[8.0,14.87,0.0,9....|        11.6|11.930677736605881|
|[9.0,10.62,0.0,9....|        10.3| 9.419477985250465|
|[9.0,13.0,0.0,9.5...|        11.0|11.051838370510529|
|[9.0,13.37,4.0,9....|       12.38|11.919444047393705|
|[9.0,13.37,4.0,9....|       12.38|11.919444047393705|
|[10.0,13.56,0.0,8...|       10.68|10.924494284833893|
|[10.0,15.57,0.0,1...|       11.85|12.963094266855252|
|[11.0,10.5,3.0,9....|        8.48|  9.24204399300724|
|[12.0,3.5

In [34]:
# Show the per-row residuals (CrewRequired minus prediction).
testResult.residuals.show()

+--------------------+
|           residuals|
+--------------------+
| -1.5657008411095248|
| -2.1602503735026737|
| -0.9521650537368753|
|   0.863712953691893|
| -0.8248766157224399|
| -1.1492548567033758|
|  1.2136697320563172|
|-0.33067773660588173|
|  0.8805220147495358|
|-0.05183837051052...|
| 0.46055595260629545|
| 0.46055595260629545|
| -0.2444942848338929|
| -1.1130942668552528|
| -0.7620439930072394|
|-0.05282046204167923|
|-0.15184623816764997|
|   1.135286504183739|
|   -0.93223866611498|
|-0.09136643285619339|
+--------------------+
only showing top 20 rows



In [35]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()