# Spark Setup

In [2]:
import findspark
# Raw string: keeps the Windows backslashes literal instead of relying on
# unrecognised escape sequences (\o, \s), which newer Python versions warn about.
findspark.init(r'C:\opt\spark\spark-3.0.0-preview2-bin-hadoop2.7')

In [3]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

In [6]:
# Create (or reuse) the Spark session used throughout this notebook
spark = (
    SparkSession.builder
    .appName('regressionDemo')
    .getOrCreate()
)

# Input Dataset

In [8]:
# Load the e-commerce CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
inpurFile = spark.read.csv(
    r'D:\Spring 2020\BAX423BigDataAnalytics\Extra\ecommerce.csv',
    header=True,
    inferSchema=True,
)
# Equivalent long form:
# spark.read.option('header','true').option('inferSchema','true').csv(<path>)

In [11]:
# Print Schema
inpurFile.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [12]:
# Summary statistics (count, mean, stddev, min, ...) for every column
inpurFile.describe().show()

+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|summary|            Email|             Address|     Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|  count|              500|                 500|        500|               500|               500|               500|                 500|                500|
|   mean|             null|                null|       null| 33.05319351819619|12.052487937166134| 37.06044542094859|   3.533461555915055|  499.3140382585909|
| stddev|             null|                null|       null|0.9925631110845354|0.9942156084725424|1.0104889067564033|  0.9992775024112585|   79.3147815497068|
|    min|aaron04@yahoo.com|0001 Mack MillNor..

In [13]:
# Preview the first 5 rows of the raw input
inpurFile.show(5)

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

# Linear Regression Model

In [14]:
from pyspark.ml.regression import LinearRegression

In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [16]:
# List the column names to choose the feature columns from
inpurFile.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [23]:
# Select the numeric predictor columns (the Xs) and assemble them into a
# single vector column named 'features'
assembler = VectorAssembler(inputCols = [
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership'],outputCol = 'features')

In [24]:
# Append the 'features' vector column to the input DataFrame
output = assembler.transform(inpurFile)

In [25]:
# Preview the first 5 rows, now including the 'features' column
output.show(5)

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+--------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|            features|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+--------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|[34.4972677251122...|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|[31.9262720263601...|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37

In [26]:
# Inspect the first row in full (show() truncates long values)
output.head(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005, features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826]))]

In [27]:
# Build the modelling dataset [X, Y]: keep only the feature vector (X)
# and the label column 'Yearly Amount Spent' (Y)
finalData = output.select('features','Yearly Amount Spent')

In [28]:
# Preview the first 5 rows of the modelling dataset
finalData.show(5)

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
+--------------------+-------------------+
only showing top 5 rows



In [29]:
# Split the dataset roughly 70/30 into training and test sets.
# A fixed seed makes the split, and every metric computed from it,
# reproducible across kernel restarts and re-runs.
trainData, testData = finalData.randomSplit([0.7,0.3], seed=42)

In [31]:
# Configure the linear regression estimator (not yet fitted, despite the
# variable name): the label is 'Yearly Amount Spent', the inputs come from 'features'
lrModel = LinearRegression(labelCol = 'Yearly Amount Spent', featuresCol = 'features')

In [32]:
# Fit on the training split; the returned object (named lrEstimator here)
# is the fitted model that is evaluated on the test split below
lrEstimator = lrModel.fit(trainData)

In [33]:
# Evaluate the fitted model on the held-out test split; the returned summary
# exposes residuals, rootMeanSquaredError and r2 (inspected in the next cells)
teatResults = lrEstimator.evaluate(testData)

In [35]:
# Residuals on the test split (show() prints only the first 20 rows)
teatResults.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -6.760924253450014|
|-18.239254901877644|
|-3.9109787042870607|
|  6.106708018372274|
| -1.177059632632222|
| -4.888259626929084|
| 21.964462045444634|
|  2.951870073913767|
| -5.199725789115121|
|-6.0912498333360645|
| -9.114138665437338|
|-14.226104796437767|
|-1.8874535788046387|
|  6.713877091514121|
| -4.447998707076806|
| -2.647343321680637|
|-1.4520613132079916|
|  11.09694229850453|
|  5.365042655148159|
| 7.6750773025777335|
+-------------------+
only showing top 20 rows



In [36]:
# Root mean squared error on the test split
teatResults.rootMeanSquaredError

9.484507599686053

In [37]:
# R-squared (coefficient of determination) on the test split
teatResults.r2

0.9852003453469674

In [7]:
# df = spark.sql("select 'spark' as hello")
# df.show()

# Deal with Categorical Data

In [38]:
# Assemble the model inputs, now including the categorical Avatar information.
# VectorAssembler cannot consume a raw string column, so use the one-hot
# encoded 'AvatarDummies' column produced by AvatarEncoder instead of 'Avatar'.
assembler = VectorAssembler(inputCols = ['AvatarDummies',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership'],outputCol = 'features')

In [39]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [40]:
# Map each distinct 'Avatar' string to a numeric index, stored in 'AvatarIndexer'
AvatarIndexer = StringIndexer(inputCol='Avatar',outputCol='AvatarIndexer')

In [41]:
# One-hot encode the numeric category index into a dummy vector.
# Illustration with three categories:
#   category:  A  B  C
#   index:     0  1  2
#   A  ->  [1,0,0]
# NOTE(review): OneHotEncoder defaults to dropLast=True, so the actual vector
# has one fewer slot than the number of categories — confirm this is intended.
AvatarEncoder = OneHotEncoder(inputCol='AvatarIndexer',outputCol='AvatarDummies')

In [42]:
from pyspark.ml import Pipeline

In [43]:
# Chain the stages in order: index Avatar -> one-hot encode -> assemble features
pipeline = Pipeline(stages=[AvatarIndexer,AvatarEncoder,assembler])

In [44]:
# Fit the pipeline stages on the input data.
# NOTE(review): fit() returns a fitted pipeline model rather than a DataFrame;
# call .transform(inpurFile) on it to obtain the transformed rows.
NewOutput = pipeline.fit(inpurFile)