In [0]:
spark

Get Head of the file to understand the Schema.

In [0]:
file_path = "/FileStore/tables/ecommerce/ecommerce.csv"
dbutils.fs().head(file_path)

Out[2]: 'Email,Avatar,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent\r\nmstephenson@fernandez.com,Violet,34.49726773,12.65565115,39.57766802,4.082620633,587.951054\r\nhduke@hotmail.com,DarkGreen,31.92627203,11.10946073,37.26895887,2.664034182,392.2049334\r\npallen@yahoo.com,Bisque,33.00091476,11.33027806,37.11059744,4.104543202,487.5475049\r\nriverarebecca@gmail.com,SaddleBrown,34.30555663,13.71751367,36.72128268,3.120178783,581.852344\r\nmstephens@davidson-herman.com,MediumAquaMarine,33.33067252,12.79518855,37.5366533,4.446308318,599.406092\r\nalvareznancy@lucas.biz,FloralWhite,33.87103788,12.02692534,34.47687763,5.493507201,637.1024479\r\nkatherine20@yahoo.com,DarkSlateBlue,32.0215955,11.36634831,36.68377615,4.685017247,521.5721748\r\nawatkins@yahoo.com,Aqua,32.73914294,12.35195897,37.37335886,4.434273435,549.9041461\r\nvchurch@walter-martinez.com,Salmon,33.9877729,13.38623528,37.53449734,3.273433578,570.200409\r\nbonnie69@lin.biz,Brown,31.93

In [0]:
schema = "email string, avatar string, avg_session_length double, time_on_app double, time_on_website double, length_of_membership double, yearly_amount_spent double"
ecommerce_df = spark.read.csv(schema=schema, header=True, path = file_path)
ecommerce_df.show()

+--------------------+----------------+------------------+-----------+---------------+--------------------+-------------------+
|               email|          avatar|avg_session_length|time_on_app|time_on_website|length_of_membership|yearly_amount_spent|
+--------------------+----------------+------------------+-----------+---------------+--------------------+-------------------+
|mstephenson@ferna...|          Violet|       34.49726773|12.65565115|    39.57766802|         4.082620633|         587.951054|
|   hduke@hotmail.com|       DarkGreen|       31.92627203|11.10946073|    37.26895887|         2.664034182|        392.2049334|
|    pallen@yahoo.com|          Bisque|       33.00091476|11.33027806|    37.11059744|         4.104543202|        487.5475049|
|riverarebecca@gma...|     SaddleBrown|       34.30555663|13.71751367|    36.72128268|         3.120178783|         581.852344|
|mstephens@davidso...|MediumAquaMarine|       33.33067252|12.79518855|     37.5366533|         4.4463083

Convert String columns to Indexed columns

In [0]:
from pyspark.ml.feature import *

email_string_indexer = StringIndexer(inputCol="email", outputCol="email_index")
avatar_string_indexer = StringIndexer(inputCol="avatar", outputCol="avatar_index")

In [0]:
email_indexed_df = email_string_indexer.fit(ecommerce_df).transform(ecommerce_df)
indexed_df = avatar_string_indexer.fit(email_indexed_df).transform(email_indexed_df)

Split Data into two - 0.7 for Training and 0.3 for Testing

In [0]:
split_df = indexed_df.randomSplit([0.7, 0.3])
training_split_df = split_df[0]
testing_split_df = split_df[1]

print(f"Training Split: {training_split_df.count()} and Testing Split: {testing_split_df.count()}")

Training Split: 377 and Testing Split: 123


Use Training Split data to assemble as Features and Label

In [0]:
from pyspark.sql.functions import *

assembler = VectorAssembler(outputCol="features", \
    inputCols =["email_index", "avatar_index", "time_on_app", "time_on_website", "avg_session_length", "length_of_membership"])
training_df = assembler.transform(training_split_df).select("features", col("yearly_amount_spent").alias("label"))
training_df.show()



+--------------------+-----------+
|            features|      label|
+--------------------+-----------+
|[0.0,68.0,10.1631...|521.2407802|
|[2.0,101.0,12.005...|576.4776072|
|[4.0,21.0,11.4489...|420.7376732|
|[5.0,69.0,10.7353...|476.1914133|
|[6.0,17.0,10.9731...|404.8245289|
|[7.0,65.0,11.7644...|482.1449969|
|[8.0,133.0,12.190...|494.5518611|
|[9.0,99.0,12.8779...|419.9387748|
|[10.0,1.0,14.7153...|452.3156755|
|[11.0,71.0,12.600...|501.9282649|
|[12.0,36.0,10.256...|256.6705823|
|[13.0,0.0,11.4815...|445.7498412|
|[14.0,48.0,12.387...| 684.163431|
|[16.0,72.0,10.963...|439.9978799|
|[17.0,49.0,12.026...|637.1024479|
|[18.0,24.0,13.130...| 583.977802|
|[21.0,2.0,11.5886...| 483.673308|
|[22.0,76.0,11.546...| 448.340425|
|[23.0,19.0,13.338...|492.6060127|
|[24.0,44.0,12.447...| 584.105885|
+--------------------+-----------+
only showing top 20 rows



In [0]:
testing_df = assembler.transform(testing_split_df).select("features", col("yearly_amount_spent").alias("true_label"))
testing_df.show()

+--------------------+-----------+
|            features| true_label|
+--------------------+-----------+
|[1.0,12.0,13.4577...|503.9783791|
|[3.0,18.0,10.1016...|418.6027421|
|[15.0,105.0,12.03...|  497.81193|
|[19.0,48.0,10.719...|378.3309069|
|[20.0,0.0,10.8048...|375.3984554|
|[27.0,58.0,11.761...|402.1671222|
|[34.0,12.0,11.371...|502.7710746|
|[42.0,115.0,12.48...|497.3895578|
|[43.0,5.0,12.3519...|549.9041461|
|[44.0,83.0,11.673...|442.7228916|
|[46.0,100.0,13.45...|549.0082269|
|[48.0,18.0,10.969...|403.7669021|
|[58.0,28.0,11.851...|266.0863409|
|[59.0,51.0,11.520...|474.5323294|
|[61.0,13.0,12.931...|570.4517259|
|[65.0,27.0,13.387...|588.7126055|
|[71.0,65.0,11.887...|491.9115051|
|[74.0,72.0,13.485...|  511.97986|
|[76.0,5.0,12.3262...|482.6024673|
|[77.0,12.0,11.388...|451.6286105|
+--------------------+-----------+
only showing top 20 rows



Apply Linear Regression Model

In [0]:
from pyspark.ml.regression import *

lr = LinearRegression(featuresCol= 'features', labelCol = 'label', maxIter = 100, regParam=0.3)
model = lr.fit(training_df)
prediction = model.transform(testing_df)


In [0]:
prediction.createOrReplaceTempView("ecommerce_prediction")

In [0]:
%sql
select prediction, true_label from ecommerce_prediction;

prediction,true_label
500.1979523910889,503.9783791
424.1263605906324,418.6027421
474.4785225081757,497.81193
389.453390025569,378.3309069
373.7547570309964,375.3984554
423.5808075445152,402.1671222
494.3829074304758,502.7710746
482.1817217742623,497.3895578
556.0038640870462,549.9041461
436.25246948162567,442.7228916


Output can only be rendered in Databricks

In [0]:
from pyspark.ml.evaluation import *

evaluator = RegressionEvaluator(predictionCol= 'prediction', labelCol= 'true_label', metricName = 'rmse')
rmse = evaluator.evaluate(prediction)

print(f"Root Mean Square Error: {rmse}")

Root Mean Square Error: 10.464807018848928
