## E-Commerce Customer Analysis
Examine a dataset of e-commerce customer data for a company's website and mobile app. Build a regression model that predicts each customer's yearly spend on the company's product.

### Importing pyspark

In [74]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook is not tied to one machine; fall back to the original install path.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.1.1-bin-hadoop3.2'))

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Start (or reuse) the Spark session for this notebook; the application is named 'lr'.
spark = (
    SparkSession.builder
    .appName('lr')
    .getOrCreate()
)

In [5]:
from pyspark.ml.regression import LinearRegression

### Importing data

In [6]:
# Load the customer CSV: the first row is the header and column types are inferred.
data = spark.read.csv(
    'Ecommerce_Customers.csv',
    header=True,
    inferSchema=True,
)

In [7]:
# Preview the first 5 rows of the raw data.
data.show(5)

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

In [8]:
# Print each column's name, inferred type and nullability.
data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [9]:
# Fetch the first row as a list containing one Row object.
data.head(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)]

In [10]:
# Summary statistics (count, mean, stddev, min, max) for each column
data.describe().show()

+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|summary|            Email|             Address|     Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|  count|              500|                 500|        500|               500|               500|               500|                 500|                500|
|   mean|             null|                null|       null| 33.05319351819619|12.052487937166134| 37.06044542094859|   3.533461555915055|  499.3140382585909|
| stddev|             null|                null|       null|0.9925631110845354|0.9942156084725424|1.0104889067564033|  0.9992775024112585|   79.3147815497068|
|    min|aaron04@yahoo.com|0001 Mack MillNor..

In [11]:
# Shape of the dataset as (rows, columns)
n_rows = data.count()
n_cols = len(data.columns)
print((n_rows, n_cols))

(500, 8)


In [12]:
# Before Spark ML can use the data it has to be arranged as two columns:
#   - "features": a single vector column holding all predictors
#   - "label":    the value to predict
# VectorAssembler (imported below) is used to build the "features" column.

# Import VectorAssembler and Vectors

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [13]:
# Listing columns
data.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [14]:
# Pack the four numeric predictors into one vector column.
#   inputCols: the independent (predictor) columns
#   outputCol: name of the assembled vector column; any name works, 'features' is used here
feature_columns = ['Avg Session Length', 'Time on App', 'Time on Website', 'Length of Membership']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [15]:
# Apply the VectorAssembler to the whole dataset; the result keeps the original
# columns and adds the assembled 'features' column.
output = assembler.transform(data)

In [16]:
# Display the assembled 'features' column (show() prints the top 20 rows by default).
output.select('features').show()

+--------------------+
|            features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
|[31.9365486184489...|
|[33.9925727749537...|
|[33.8793608248049...|
|[29.5324289670579...|
|[33.1903340437226...|
|[32.3879758531538...|
|[30.7377203726281...|
|[32.1253868972878...|
|[32.3388993230671...|
|[32.1878120459321...|
|[32.6178560628234...|
+--------------------+
only showing top 20 rows



In [17]:
# Inspect one assembled row: 'features' holds the values of all the inputCols as a single vector.
output.head(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005, features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826]))]

In [18]:
# Keep only what the model needs: the assembled predictors and the target column.
# This (features, label) layout is the format Spark ML accepts.
final_data = output.select('features', 'Yearly Amount Spent')

In [19]:
# Preview the first 5 (features, label) rows.
final_data.show(5)

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
+--------------------+-------------------+
only showing top 5 rows



### Train Test Split

In [20]:
# Splitting data into training (70%) and testing (30%).
# A fixed seed makes the split - and every metric computed from it - repeatable across runs.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=101)

In [21]:
# Summary statistics of the label in the training split.
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                357|
|   mean|  502.3288849753759|
| stddev|  77.28917838640147|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [22]:
# Summary statistics of the label in the test split.
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                143|
|   mean| 491.78746288871474|
| stddev|   83.9673406494717|
|    min|  282.4712457199145|
|    max|  744.2218671047146|
+-------+-------------------+



### Model Building

In [23]:
# Linear regression estimator. featuresCol is spelled out although 'features' is
# already the default; only the label column differs from the defaults.
lr = LinearRegression(
    featuresCol='features',
    labelCol='Yearly Amount Spent',
)

In [24]:
# Fit the estimator on the training split; the result is the trained model.
lr_model = lr.fit(train_data)

In [25]:
# Report the fitted coefficients (one per feature) and the intercept
print(f"Coefficients: {lr_model.coefficients} Intercept: {lr_model.intercept}")

Coefficients: [25.774479636352776,38.85209275021159,0.3897432102027075,61.75704088974337] Intercept: -1053.081688359479


### Evaluating Metrics

In [26]:
# Evaluate the trained model on the held-out test split; the returned summary
# exposes the residuals and error metrics used below.
test_results = lr_model.evaluate(test_data)

In [27]:
# Residuals - per-row difference between the actual label and the predicted value
# (label - prediction); show the first 5.
test_results.residuals.show(5)

+-------------------+
|          residuals|
+-------------------+
| 10.190272543561832|
|  -5.14248411783484|
|-0.2235811995388417|
| -4.281641232615357|
|  21.31139210940478|
+-------------------+
only showing top 5 rows



In [28]:
# Root Mean Squared Error on the test split
test_results.rootMeanSquaredError

9.779586610718548

In [29]:
# R2 score on the test split - about 0.986 in the recorded run, i.e. the model
# explains roughly 98.6% of the variance in the label
test_results.r2

0.9863394592824732

### Predicting new data

In [30]:
# Simulate new, unlabeled data by keeping only the 'features' column of the test split.
unlabeled_data = test_data.select('features')

In [31]:
# Preview the first 5 unlabeled feature vectors.
unlabeled_data.show(5)

+--------------------+
|            features|
+--------------------+
|[29.5324289670579...|
|[30.4925366965402...|
|[30.5743636841713...|
|[31.2681042107507...|
|[31.2834474760581...|
+--------------------+
only showing top 5 rows



In [32]:
# Predict the target variable; transform() appends a 'prediction' column to the input.
predictions = lr_model.transform(unlabeled_data)

In [33]:
# Preview the first 5 predictions next to their feature vectors.
predictions.show(5)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[29.5324289670579...|398.45007852906565|
|[30.4925366965402...|287.61372983774936|
|[30.5743636841713...| 442.2879949576045|
|[31.2681042107507...| 427.7521744064393|
|[31.2834474760581...| 570.4696973162627|
+--------------------+------------------+
only showing top 5 rows



<hr>

## Consulting Project

You've been contracted by Hyundai Heavy Industries to help them build a predictive model for some ships. [Hyundai Heavy Industries](http://www.hyundai.eu/en) is one of the world's largest ship manufacturing companies and builds cruise liners.

You've been flown to their headquarters in Ulsan, South Korea to help them give accurate estimates of how many crew members a ship will require.

They are currently building new ships for some customers and want you to create a model and use it to predict how many crew members the ships will need.

Here is what the data looks like so far:

    Description: Measurements of ship size, capacity, crew, and age for 158 cruise
    ships.


    Variables/Columns
    Ship Name     1-20
    Cruise Line   21-40
    Age (as of 2013)   46-48
    Tonnage (1000s of tons)   50-56
    passengers (100s)   58-64
    Length (100s of feet)  66-72
    Cabins  (100s)   74-80
    Passenger Density   82-88
    Crew  (100s)   90-96
    
Your job is to create a regression model that will help predict how many crew members will be needed for future ships.

In [34]:
# Load the cruise-ship CSV: the first row is the header and column types are inferred.
df = spark.read.csv(
    'cruise_ship_info.csv',
    header=True,
    inferSchema=True,
)

In [35]:
# Preview the first 5 ships.
df.show(5)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
only showing top 5 rows



### Exploratory Data Analysis

In [36]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [37]:
# Summary statistics for every column of the cruise-ship data.
df.describe().show()

+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|summary|Ship_name|Cruise_line|               Age|           Tonnage|       passengers|           length|            cabins|passenger_density|             crew|
+-------+---------+-----------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|  count|      158|        158|               158|               158|              158|              158|               158|              158|              158|
|   mean| Infinity|       null|15.689873417721518| 71.28467088607599|18.45740506329114|8.130632911392404| 8.830000000000005|39.90094936708861|7.794177215189873|
| stddev|     null|       null| 7.615691058751413|37.229540025907866|9.677094775143416|1.793473548054825|4.4714172221480615| 8.63921711391542|3.503486564627034|
|    min|Adventure|    Azamara|   

In [38]:
# Shape of the cruise-ship dataset as (rows, columns)
ship_rows = df.count()
ship_cols = len(df.columns)
print((ship_rows, ship_cols))

(158, 9)


In [39]:
# Number of ships per cruise line
ships_per_line = df.groupBy('Cruise_line').count()
ships_per_line.show()

+-----------------+-----+
|      Cruise_line|count|
+-----------------+-----+
|            Costa|   11|
|              P&O|    6|
|           Cunard|    3|
|Regent_Seven_Seas|    5|
|              MSC|    8|
|         Carnival|   22|
|          Crystal|    2|
|           Orient|    1|
|         Princess|   17|
|        Silversea|    4|
|         Seabourn|    3|
| Holland_American|   14|
|         Windstar|    3|
|           Disney|    2|
|        Norwegian|   13|
|          Oceania|    3|
|          Azamara|    2|
|        Celebrity|   10|
|             Star|    6|
|  Royal_Caribbean|   23|
+-----------------+-----+



### Handling Categorical Variable

In [40]:
from pyspark.ml.feature import StringIndexer

In [41]:
# Map each distinct 'Cruise_line' string to a numeric index stored in 'cruise_cat'.
indexer = StringIndexer(
    inputCol='Cruise_line',
    outputCol='cruise_cat',
)

In [42]:
# Learn the string-to-index mapping from df, then apply it to the same DataFrame.
indexer_model = indexer.fit(df)
indexed = indexer_model.transform(df)

In [43]:
# Show one row to confirm the new 'cruise_cat' column was added.
indexed.show(1)

+---------+-----------+---+------------------+----------+------+------+-----------------+----+----------+
|Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|cruise_cat|
+---------+-----------+---+------------------+----------+------+------+-----------------+----+----------+
|  Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|      16.0|
+---------+-----------+---+------------------+----------+------+------+-----------------+----+----------+
only showing top 1 row



### Vector Assembler

In [44]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [45]:
# List the available columns before choosing the model inputs.
indexed.columns

['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'cruise_cat']

In [46]:
# Assemble the six numeric ship measurements plus the indexed cruise line into 'features'.
# NOTE(review): 'cruise_cat' is a StringIndexer category index, so the regression treats it
# as an ordinary number - consider one-hot encoding it instead.
ship_feature_cols = ['Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density', 'cruise_cat']
assembler = VectorAssembler(inputCols=ship_feature_cols, outputCol='features')

In [47]:
# Add the assembled 'features' column to the indexed ship data.
output = assembler.transform(indexed)

In [48]:
# Preview the first 5 (features, crew) pairs.
output.select(['features','crew']).show(5)

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
+--------------------+----+
only showing top 5 rows



In [49]:
# Keep only the assembled predictors and the target ('crew') for modelling.
final_data = output.select('features', 'crew')

In [50]:
# Preview the first 5 rows of the modelling dataset.
final_data.show(5)

+--------------------+----+
|            features|crew|
+--------------------+----+
|[6.0,30.276999999...|3.55|
|[6.0,30.276999999...|3.55|
|[26.0,47.262,14.8...| 6.7|
|[11.0,110.0,29.74...|19.1|
|[17.0,101.353,26....|10.0|
+--------------------+----+
only showing top 5 rows



### Train Test Split

In [52]:
# 70/30 train/test split; the seed is fixed so the split is repeatable.
split_weights = [0.7, 0.3]
train_data, test_data = final_data.randomSplit(split_weights, seed=101)

In [53]:
# Summary statistics of 'crew' in the training split.
train_data.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               108|
|   mean| 7.715833333333342|
| stddev|3.4453859501940265|
|    min|              0.59|
|    max|              21.0|
+-------+------------------+



### Model Building

In [54]:
from pyspark.ml.regression import LinearRegression

In [57]:
# Linear regression estimator predicting 'crew'; featuresCol is spelled out
# although 'features' is already the default.
lr = LinearRegression(
    featuresCol='features',
    labelCol='crew',
)

In [59]:
# Fit the estimator on the training split.
lr_model = lr.fit(train_data)

### Evaluation Metrics

In [61]:
# Evaluate the trained model on the held-out test split.
test_results = lr_model.evaluate(test_data)

In [65]:
# First 5 residuals on the test split.
test_results.residuals.show(5)

+--------------------+
|           residuals|
+--------------------+
| 0.04555816042846139|
| -1.3989028593182269|
|  -0.979321621629067|
|-0.47165845979649035|
|  1.0853984945178325|
+--------------------+
only showing top 5 rows



In [63]:
# Root mean squared error on the test split
test_results.rootMeanSquaredError

1.345437036441198

In [66]:
# R2 score on the test split
test_results.r2

0.8617799547107838

In [67]:
# Mean squared error on the test split
test_results.meanSquaredError

1.8102008190276737

In [68]:
# Mean absolute error on the test split
test_results.meanAbsoluteError

0.731118803460324

### Correlation between Variables

In [69]:
from pyspark.sql.functions import corr

In [73]:
# Correlation between crew size and number of cabins
crew_cabins_corr = corr('crew', 'cabins')
df.select(crew_cabins_corr).show()

+------------------+
|corr(crew, cabins)|
+------------------+
|0.9508226063578497|
+------------------+

