<a href="https://colab.research.google.com/github/somesh-ghaturle/PySpark/blob/master/Linear_Regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 63kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 46.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=72b3f8aacc5e9e6be3d457e7ea127be2b57c2448119bf7d6b88bc49aaa98bc71
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [6]:
import pyspark 
from pyspark.sql import SparkSession 
#SparkSession is now the entry point of Spark 
#SparkSession can also be construed as gateway to spark libraries 

In [7]:
#create instance of spark class 
spark=SparkSession.builder.appName('housing_price_model').getOrCreate() 

In [8]:
#create spark dataframe of input csv file 
df=spark.read.csv('/content/cruise_ship_info.csv'
                  ,inferSchema=True,header=True) 

In [9]:
df

DataFrame[Ship_name: string, Cruise_line: string, Age: int, Tonnage: double, passengers: double, length: double, cabins: double, passenger_density: double, crew: double]

In [10]:
df.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [11]:
#prints structure of dataframe along with datatype 
df.printSchema() 

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [12]:
#In our predictive model, below are the columns 
df.columns 

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

In [13]:
#columns identified as features are as below: 
#['Cruise_line','Age','Tonnage','passengers','length','cabins','passenger_density'] 
#to work on the features, spark MLlib expects every value to be in numeric form 
#feature 'Cruise_line is string datatype 
#using StringIndexer, string type will be typecast to numeric datatype 
#import library strinindexer for typecasting 
  
from pyspark.ml.feature import StringIndexer 
indexer=StringIndexer(inputCol='Cruise_line',outputCol='cruise_cat') 
indexed=indexer.fit(df).transform(df) 
  
#above code will convert string to numeric feature and create a new dataframe 
#new dataframe contains a new feature 'cruise_cat' and can be used further 
#feature cruise_cat is now vectorized and can be used to fed to model 
for item in indexed.head(5): 
    print(item) 
    print('\n') 

Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)


Row(Ship_name='Quest', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)


Row(Ship_name='Celebration', Cruise_line='Carnival', Age=26, Tonnage=47.262, passengers=14.86, length=7.22, cabins=7.43, passenger_density=31.8, crew=6.7, cruise_cat=1.0)


Row(Ship_name='Conquest', Cruise_line='Carnival', Age=11, Tonnage=110.0, passengers=29.74, length=9.53, cabins=14.88, passenger_density=36.99, crew=19.1, cruise_cat=1.0)


Row(Ship_name='Destiny', Cruise_line='Carnival', Age=17, Tonnage=101.353, passengers=26.42, length=8.92, cabins=13.21, passenger_density=38.36, crew=10.0, cruise_cat=1.0)




In [14]:
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler 
#creating vectors from features 
#Apache MLlib takes input if vector form 
assembler=VectorAssembler(inputCols=['Age', 
 'Tonnage', 
 'passengers', 
 'length', 
 'cabins', 
 'passenger_density', 
 'cruise_cat'],outputCol='features') 
output=assembler.transform(indexed) 
output.select('features','crew').show(5) 
#output as below 

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
+--------------------+----+
only showing top 5 rows



In [15]:
#final data consist of features and label which is crew. 
final_data=output.select('features','crew') 
#splitting data into train and test 
train_data,test_data=final_data.randomSplit([0.7,0.3]) 
train_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               109|
|   mean| 7.733302752293589|
| stddev|3.6461222564999387|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



In [16]:
test_data.describe().show() 

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|                49|
|   mean| 7.929591836734693|
| stddev|3.1950703951449926|
|    min|              0.88|
|    max|              13.6|
+-------+------------------+



In [17]:
#import LinearRegression library 
from pyspark.ml.regression import LinearRegression 
#creating an object of class LinearRegression 
#object takes features and label as input arguments 
ship_lr=LinearRegression(featuresCol='features',labelCol='crew') 
#pass train_data to train model 
trained_ship_model=ship_lr.fit(train_data) 
#evaluating model trained for Rsquared error 
ship_results=trained_ship_model.evaluate(train_data) 
  
print('Rsquared Error :',ship_results.r2) 
#R2 value shows accuracy of model is 92% 
#model accuracy is very good and can be use for predictive analysis 

Rsquared Error : 0.9192641497121865


In [18]:
#testing Model on unlabeled data 
#create unlabeled data from test_data 
#testing model on unlabeled data 
unlabeled_data=test_data.select('features') 
unlabeled_data.show(5) 

+--------------------+
|            features|
+--------------------+
|[6.0,90.0,20.0,9....|
|[6.0,110.23899999...|
|[6.0,112.0,38.0,9...|
|[6.0,113.0,37.82,...|
|[6.0,158.0,43.7,1...|
+--------------------+
only showing top 5 rows



In [19]:
predictions=trained_ship_model.transform(unlabeled_data) 
predictions.show() 
#below are the results of output from test data 

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[6.0,90.0,20.0,9....|10.085183395990937|
|[6.0,110.23899999...|11.041930487908328|
|[6.0,112.0,38.0,9...|11.282734936034396|
|[6.0,113.0,37.82,...|11.624760178680154|
|[6.0,158.0,43.7,1...|13.942724164619564|
|[8.0,77.499,19.5,...| 8.498483178246657|
|[8.0,91.0,22.44,9...|10.052075733830627|
|[9.0,81.0,21.44,9...| 9.488605999652357|
|[9.0,113.0,26.74,...|11.355149881091215|
|[9.0,113.0,26.74,...|11.355149881091215|
|[10.0,46.0,7.0,6....|2.5964752951506616|
|[10.0,58.825,15.6...| 7.236968156525389|
|[10.0,86.0,21.14,...| 9.653372184504596|
|[10.0,91.62700000...|  9.11364728891679|
|[10.0,138.0,31.14...|13.050707392547716|
|[11.0,90.0,22.4,9...|10.026937764976722|
|[11.0,138.0,31.14...| 13.04950076469163|
|[12.0,50.0,7.0,7....| 4.234312150109949|
|[12.0,88.5,21.24,...|10.337008070177365|
|[12.0,91.0,20.32,...|   9.1868695542363|
+--------------------+------------