In [96]:
# spark installed in different folder on VM, locating it
import findspark
findspark.init('/home/ionut/spark-2.1.1-bin-hadoop2.7/')
from pyspark.sql import SparkSession

In [97]:
# intiating session
spark = SparkSession.builder.appName('linreg').getOrCreate()

In [140]:
# importing pandas to read csv - Spark sometimes makes weird changes to it
import pandas as pd

In [None]:
# this to import through Spark
# df = spark.read.csv('cruise_ship_info.csv',inferSchema=True,header=True)

In [99]:
df = pd.read_csv('cruise_ship_info.csv')

In [144]:
df.head()

Unnamed: 0,Ship_name,Cruise_line,Age,Tonnage,passengers,length,cabins,passenger_density,crew
0,Journey,Azamara,6,30.277,6.94,5.94,3.55,42.64,3.55
1,Quest,Azamara,6,30.277,6.94,5.94,3.55,42.64,3.55
2,Celebration,Carnival,26,47.262,14.86,7.22,7.43,31.8,6.7
3,Conquest,Carnival,11,110.0,29.74,9.53,14.88,36.99,19.1
4,Destiny,Carnival,17,101.353,26.42,8.92,13.21,38.36,10.0


In [146]:
df.describe()

Unnamed: 0,Age,Tonnage,passengers,length,cabins,passenger_density,crew
count,158.0,158.0,158.0,158.0,158.0,158.0,158.0
mean,15.689873,71.284671,18.457405,8.130633,8.83,39.900949,7.794177
std,7.615691,37.22954,9.677095,1.793474,4.471417,8.639217,3.503487
min,4.0,2.329,0.66,2.79,0.33,17.7,0.59
25%,10.0,46.013,12.535,7.1,6.1325,34.57,5.48
50%,14.0,71.899,19.5,8.555,9.57,39.085,8.15
75%,20.0,90.7725,24.845,9.51,10.885,44.185,9.99
max,48.0,220.0,54.0,11.82,27.0,71.43,21.0


In [147]:
# will need to change Cruise Line to numeric categorical variables
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 158 entries, 0 to 157
Data columns (total 9 columns):
Ship_name            158 non-null object
Cruise_line          158 non-null object
Age                  158 non-null int64
Tonnage              158 non-null float64
passengers           158 non-null float64
length               158 non-null float64
cabins               158 non-null float64
passenger_density    158 non-null float64
crew                 158 non-null float64
dtypes: float64(6), int64(1), object(2)
memory usage: 11.2+ KB


In [101]:
# moving pandas df to spark
data = spark.createDataFrame(df)

In [102]:
# StringIndexer to change to numeric categories for Cruise Line
from pyspark.ml.feature import StringIndexer

In [103]:
# selecting target column and output
indexer = StringIndexer(inputCol='Cruise_line', outputCol='Cruise_indexer')

In [115]:
# initiating transformation and creating new df
indexed = indexer.fit(data).transform(data)

In [141]:
# visualising changes
indexed.head(1)

[Row(Age=6, Tonnage=30.277, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, Cruise_indexer=16.0)]

In [121]:
# dropping ship name and cruise line as they will not be used
indexed = indexed.drop('Ship_name')
indexed = indexed.drop('Cruise_line')

In [123]:
# schema looks fine, proceed to fitting model
indexed.printSchema()

root
 |-- Age: long (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Cruise_indexer: double (nullable = true)



In [124]:
indexed.columns

['Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_indexer']

In [125]:
# grouping features together in one column using VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [126]:
assembler = VectorAssembler(inputCols=['Age','Tonnage','passengers','length','cabins','passenger_density','Cruise_indexer'],
                            outputCol='features')

In [127]:
# applying assembler
output = assembler.transform(indexed)

In [128]:
# selecting the features and labels for the final df
final_output = output.select('features','crew')

In [131]:
# split the data
train, test = final_output.randomSplit([0.7,0.3])

In [132]:
# importing linear regression model
from pyspark.ml.regression import LinearRegression

In [133]:
# pointing the name of the labels column, as it is different from the default one
lr = LinearRegression(labelCol='crew')

In [134]:
# fitting linear regression
lr_model = lr.fit(train)

In [136]:
# evaluating test results
test_results = lr_model.evaluate(test)

In [143]:
# predictions evaluation
print('R squared: ', test_results.r2)
print('Mean squared error: ', test_results.rootMeanSquaredError)
final_output.describe().show()

R squared:  0.901218017186186
Mean squared error:  1.0445964013215396
+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              158|
|   mean|7.794177215189873|
| stddev|3.503486564627034|
|    min|             0.59|
|    max|             21.0|
+-------+-----------------+

