In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('lreg').getOrCreate()

In [3]:
from pyspark.ml.regression import LinearRegression

In [4]:
import os

In [5]:
data_file = os.path.join(os.curdir, 'data', 'sample_linear_regression_data.txt')

In [6]:
training = spark.read.format('libsvm').load(data_file)

In [7]:
training.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

In [8]:
lr = LinearRegression(featuresCol='features', labelCol='label',
                     predictionCol='prediction')

In [9]:
lr_model = lr.fit(training)

In [10]:
lr_model.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [11]:
lr_model.intercept

0.14228558260358093

In [12]:
training_summary = lr_model.summary

In [13]:
training_summary.r2

0.027839179518600154

In [14]:
training_summary.rootMeanSquaredError

10.16309157133015

In [15]:
# the example above took everything as training data which is wrong,
# so lets try with the split

total_data = spark.read.format('libsvm').load(data_file)
training_data, testing_data = total_data.randomSplit([0.7, 0.3])

In [16]:
training_data.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
|-28.571478869743427|(10,[0,1,2,3,4,5,...|
|-26.736207182601724|(10,[0,1,2,3,4,5,...|
|-22.837460416919342|(10,[0,1,2,3,4,5,...|
|-21.432387764165806|(10,[0,1,2,3,4,5,...|
|-20.212077258958672|(10,[0,1,2,3,4,5,...|
|-19.884560774273424|(10,[0,1,2,3,4,5,...|
| -19.66731861537172|(10,[0,1,2,3,4,5,...|
|-19.402336030214553|(10,[0,1,2,3,4,5,...|
|-18.845922472898582|(10,[0,1,2,3,4,5,...|
| -18.27521356600463|(10,[0,1,2,3,4,5,...|
|-17.494200356883344|(10,[0,1,2,3,4,5,...|
|-17.428674570939506|(10,[0,1,2,3,4,5,...|
| -17.32672073267595|(10,[0,1,2,3,4,5,...|
|-17.065399625876015|(10,[0,1,2,3,4,5,...|
| -16.71909683360509|(10,[0,1,2,3,4,5,...|
|-16.692207021311106|(10,[0,1,2,3,4,5,...|
|-16.151349351277112|(10,[0,1,2,3,4,5,...|
|-15.951512565794573|(10,[0,1,2,3,4,5,...|
| -15.86200932757056|(10,[0,1,2,3,4,5,...|
|-15.780685032623301|(10,[0,1,2,3,4,5,...|
+----------

In [17]:
# Label distribution in the training split ('features' is a vector and is not summarised).
training_data.describe('label').show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                354|
|   mean| 0.3779468351985323|
| stddev|  9.926059314739335|
|    min|-28.571478869743427|
|    max| 26.903524792043335|
+-------+-------------------+



In [18]:
# Same summary for the held-out split, to compare against the training split.
testing_data.describe('label').show()

+-------+--------------------+
|summary|               label|
+-------+--------------------+
|  count|                 147|
|   mean|-0.03463863769108...|
| stddev|  11.235881729265715|
|    min| -28.046018037776633|
|    max|   27.78383192005107|
+-------+--------------------+



In [19]:
correct_model = lr.fit(training_data)

In [20]:
test_results = correct_model.evaluate(testing_data)

In [21]:
test_results.rootMeanSquaredError

11.145820935518916

In [22]:
unlabeled_data = testing_data.select('features')

In [23]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [24]:
predictions = correct_model.transform(unlabeled_data)

In [25]:
predictions.show()

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|(10,[0,1,2,3,4,5,...| 0.5739708667781711|
|(10,[0,1,2,3,4,5,...| 0.2667139877745106|
|(10,[0,1,2,3,4,5,...|  1.739204628850703|
|(10,[0,1,2,3,4,5,...| 0.9905816311133762|
|(10,[0,1,2,3,4,5,...|  2.175000267789256|
|(10,[0,1,2,3,4,5,...| 2.3424385872833344|
|(10,[0,1,2,3,4,5,...|  0.783569294119348|
|(10,[0,1,2,3,4,5,...|0.42775376227374984|
|(10,[0,1,2,3,4,5,...|  -2.65098824946657|
|(10,[0,1,2,3,4,5,...| 0.5856119770984458|
|(10,[0,1,2,3,4,5,...| 0.8622437773143061|
|(10,[0,1,2,3,4,5,...| 0.6582382608920628|
|(10,[0,1,2,3,4,5,...| 1.5886934662423458|
|(10,[0,1,2,3,4,5,...|-0.5156440539526038|
|(10,[0,1,2,3,4,5,...|-0.4774615368963048|
|(10,[0,1,2,3,4,5,...|-1.4462448415695643|
|(10,[0,1,2,3,4,5,...| 0.6403626727296705|
|(10,[0,1,2,3,4,5,...|-2.5719444522509254|
|(10,[0,1,2,3,4,5,...|0.07301758850897627|
|(10,[0,1,2,3,4,5,...|-2.6531870557209967|
+----------

In [26]:
ecom_data_file = os.path.join(os.curdir, 'data', 'Ecommerce_Customers.csv')

In [27]:
ecom_data = spark.read.csv(ecom_data_file, inferSchema=True,
                          header=True)

In [28]:
ecom_data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [29]:
# Peek at the first record as a dict (column name -> value).
# NOTE(review): this displays Email/Address values; avoid rendering personal
# fields like these if the notebook is shared with real customer data.
item = ecom_data.take(1)[0]
item.asDict()

{'Email': 'mstephenson@fernandez.com',
 'Address': '835 Frank TunnelWrightmouth, MI 82180-9605',
 'Avatar': 'Violet',
 'Avg Session Length': 34.49726772511229,
 'Time on App': 12.65565114916675,
 'Time on Website': 39.57766801952616,
 'Length of Membership': 4.0826206329529615,
 'Yearly Amount Spent': 587.9510539684005}

In [30]:
# import vector assembler and vectors 

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [31]:
# Column names in schema order.
ecom_data.schema.names

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [32]:
# vectorization will work only on numerical columns
assembler = VectorAssembler(inputCols=['Avg Session Length',
                                       'Time on App',
                                       'Time on Website',
                                       'Length of Membership'],
                               outputCol='features')



In [33]:
output = assembler.transform(ecom_data)

In [34]:
output.select('features').show()

+--------------------+
|            features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
|[31.9365486184489...|
|[33.9925727749537...|
|[33.8793608248049...|
|[29.5324289670579...|
|[33.1903340437226...|
|[32.3879758531538...|
|[30.7377203726281...|
|[32.1253868972878...|
|[32.3388993230671...|
|[32.1878120459321...|
|[32.6178560628234...|
+--------------------+
only showing top 20 rows



In [35]:
# Confirm the assembled 'features' vector sits next to the original columns.
output.take(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005, features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826]))]

In [36]:
final_data = output.select(['features', 'Yearly Amount Spent'])

In [37]:
final_data.head(3)

[Row(features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826]), Yearly Amount Spent=587.9510539684005),
 Row(features=DenseVector([31.9263, 11.1095, 37.269, 2.664]), Yearly Amount Spent=392.2049334443264),
 Row(features=DenseVector([33.0009, 11.3303, 37.1106, 4.1045]), Yearly Amount Spent=487.54750486747207)]

In [38]:
training_data, testing_data = final_data.randomSplit([0.7, 0.3])

In [39]:
training_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                353|
|   mean|  503.5347568778912|
| stddev|  79.21238934442914|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [40]:
lr = LinearRegression(labelCol='Yearly Amount Spent')

In [41]:
lr_model = lr.fit(training_data)

In [42]:
test_results = lr_model.evaluate(testing_data)

In [43]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|  9.548040011913372|
|-13.015877423767165|
|  9.908448452407697|
|-18.733379845422974|
| -4.883238482667991|
| -6.503161419306537|
|   8.82563240826994|
| 2.2721020309444384|
| 2.0852228073894707|
| -6.271950221509087|
| -6.043070569242104|
|  -10.0795428415949|
|-17.804288834328872|
| -14.45311380712019|
|-26.826489253056593|
|-7.1977647944926275|
| -2.992913023858307|
| -6.605995123575838|
|-2.2893541116071674|
|-11.345129404639351|
+-------------------+
only showing top 20 rows



In [44]:
test_results.rootMeanSquaredError

10.771144390601414

In [45]:
test_results.r2

0.9812396717937315

In [46]:
cruise_data_file = os.path.join(os.curdir, 'data', 'cruise_ship_info.csv')

In [47]:
cruise_data = spark.read.csv(cruise_data_file, inferSchema=True,
                          header=True)

In [48]:
cruise_data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [50]:
# Ships per cruise line: Cruise_line is categorical, with few ships per level.
cruise_data.groupby('Cruise_line').count().show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
|        Silversea|    4|
|         Seabourn|    3|
| Holland_American|   14|
|         Windstar|    3|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|             Star|    6|
|  Royal_Caribbean|   23|
+-----------------+-----+



In [51]:
from pyspark.ml.feature import StringIndexer

In [53]:
# Map each cruise-line name to a numeric index. StringIndexer orders labels by
# descending frequency by default, so the index values carry no meaning beyond
# how common each line is.
indexer = StringIndexer(inputCol='Cruise_line', outputCol='cruise_cat')
indexed = (
    indexer
    .fit(cruise_data)
    .transform(cruise_data)
)
indexed.take(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, cruise_cat=16.0)]

In [54]:
# Column names in schema order, now including 'cruise_cat'.
indexed.schema.names

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'cruise_cat']

In [61]:
assembler = VectorAssembler(inputCols=[
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'cruise_cat'
], outputCol='features')

In [62]:
output = assembler.transform(indexed)

In [64]:
output.select('features', 'crew').show()

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
|[22.0,70.367,20.5...| 9.2|
|[15.0,70.367,20.5...| 9.2|
|[23.0,70.367,20.5...| 9.2|
|[19.0,70.367,20.5...| 9.2|
|[6.0,110.23899999...|11.5|
|[10.0,110.0,29.74...|11.6|
|[28.0,46.052,14.5...| 6.6|
|[18.0,70.367,20.5...| 9.2|
|[17.0,70.367,20.5...| 9.2|
|[11.0,86.0,21.24,...| 9.3|
|[8.0,110.0,29.74,...|11.6|
|[9.0,88.5,21.24,9...|10.3|
|[15.0,70.367,20.5...| 9.2|
|[12.0,88.5,21.24,...| 9.3|
|[20.0,70.367,20.5...| 9.2|
+--------------------+----+
only showing top 20 rows



In [65]:
final_data = output.select(['features', 'crew'])

In [66]:
training_data, testing_data = final_data.randomSplit([0.7, 0.3])

In [67]:
training_data.describe().show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              111|
|   mean|7.506756756756767|
| stddev|3.465529255700061|
|    min|             0.59|
|    max|             19.1|
+-------+-----------------+



In [68]:
lr = LinearRegression(labelCol='crew')
trained_ship_model = lr.fit(training_data)

In [69]:
ship_results = trained_ship_model.evaluate(testing_data)

In [70]:
ship_results.r2

0.9434563194260573

In [72]:
ship_results.rootMeanSquaredError

0.8318422756986975

In [73]:
# NOTE(review): this repeats the In [67] summary of the same training split.
# Drop it, or describe testing_data if a train/test comparison was intended.
training_data.describe('crew').show()

+-------+-----------------+
|summary|             crew|
+-------+-----------------+
|  count|              111|
|   mean|7.506756756756767|
| stddev|3.465529255700061|
|    min|             0.59|
|    max|             19.1|
+-------+-----------------+



In [74]:
from pyspark.sql.functions import corr

In [75]:
# Pearson correlation between crew size and cabin count, over all ships.
cruise_data.agg(corr('crew', 'cabins')).show()

+------------------+
|corr(crew, cabins)|
+------------------+
|0.9508226063578497|
+------------------+

