In [0]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


df = spark.read.table('sales_data_sample_csv')


In [0]:
df.dtypes

Out[82]: [('ORDERNUMBER', 'int'),
 ('QUANTITYORDERED', 'int'),
 ('PRICEEACH', 'double'),
 ('ORDERLINENUMBER', 'int'),
 ('SALES', 'double'),
 ('ORDERDATE', 'string'),
 ('STATUS', 'string'),
 ('QTR_ID', 'int'),
 ('MONTH_ID', 'int'),
 ('YEAR_ID', 'int'),
 ('PRODUCTLINE', 'string'),
 ('MSRP', 'int'),
 ('PRODUCTCODE', 'string'),
 ('CUSTOMERNAME', 'string'),
 ('PHONE', 'string'),
 ('ADDRESSLINE1', 'string'),
 ('ADDRESSLINE2', 'string'),
 ('CITY', 'string'),
 ('STATE', 'string'),
 ('POSTALCODE', 'string'),
 ('COUNTRY', 'string'),
 ('TERRITORY', 'string'),
 ('CONTACTLASTNAME', 'string'),
 ('CONTACTFIRSTNAME', 'string'),
 ('DEALSIZE', 'string')]

In [0]:
display(df.select('QUANTITYORDERED', 'PRICEEACH', 'ORDERLINENUMBER','SALES'))


QUANTITYORDERED,PRICEEACH,ORDERLINENUMBER,SALES
30,95.7,2,2871.0
34,81.35,5,2765.9
41,94.74,2,3884.34
45,83.26,6,3746.7
49,100.0,14,5205.27
36,96.66,1,3479.76
29,86.13,9,2497.77
48,100.0,1,5512.32
22,98.57,2,2168.54
41,100.0,14,4708.44


In [0]:
# Drop unnecessary columns
df = df.drop("PHONE", "ADDRESSLINE1", "ADDRESSLINE2", "STATE", "POSTALCODE", "COUNTRY", "CONTACTFIRSTNAME", "CONTACTLASTNAME")
# Replace null values with 0
df = df.na.fill(0)
# Create a new column "total_sales" by multiplying "QUANTITYORDERED" and "PRICEEACH"
df = df.withColumn("total_sales", col("QUANTITYORDERED") * col("PRICEEACH"))


In [0]:
# Filter out any negative or zero values in "total_sales"
df = df.filter(col("total_sales") > 0)
# Select only the required columns for the model
df = df.select("QUANTITYORDERED", "PRICEEACH", "total_sales")

In [0]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['QUANTITYORDERED', 'PRICEEACH'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(df)
vhouse_df = vhouse_df.select(['features', 'total_sales'])
display(vhouse_df)

features,total_sales
"Map(vectorType -> dense, length -> 2, values -> List(30.0, 95.7))",2871.0
"Map(vectorType -> dense, length -> 2, values -> List(34.0, 81.35))",2765.9
"Map(vectorType -> dense, length -> 2, values -> List(41.0, 94.74))",3884.34
"Map(vectorType -> dense, length -> 2, values -> List(45.0, 83.26))",3746.7
"Map(vectorType -> dense, length -> 2, values -> List(49.0, 100.0))",4900.0
"Map(vectorType -> dense, length -> 2, values -> List(36.0, 96.66))",3479.76
"Map(vectorType -> dense, length -> 2, values -> List(29.0, 86.13))",2497.77
"Map(vectorType -> dense, length -> 2, values -> List(48.0, 100.0))",4800.0
"Map(vectorType -> dense, length -> 2, values -> List(22.0, 98.57))",2168.54
"Map(vectorType -> dense, length -> 2, values -> List(41.0, 100.0))",4100.0


In [0]:
splits = vhouse_df.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]


display(test_df)

features,total_sales
"Map(vectorType -> dense, length -> 2, values -> List(16.0, 75.48))",1207.68
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 34.19))",683.8
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 36.42))",728.4000000000001
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 40.66))",813.1999999999999
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 48.62))",972.4
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 54.33))",1086.6
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 56.12))",1122.4
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 60.54))",1210.8
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 62.47))",1249.4
"Map(vectorType -> dense, length -> 2, values -> List(20.0, 66.99))",1339.8


In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='total_sales', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [84.7024604476237,34.948468297447626]
Intercept: -2957.0399945225813


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 192.029455
r2: 0.970068


In [0]:
lr_predictions = lr_model.transform(test_df)
display(lr_predictions.select("prediction","total_sales","features"))
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="total_sales",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

prediction,total_sales,features
1036.1097597307448,1207.68,"Map(vectorType -> dense, length -> 2, values -> List(16.0, 75.48))"
-68.10265448037308,683.8,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 34.19))"
9.832429822934955,728.4000000000001,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 36.42))"
158.013935404113,813.1999999999999,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 40.66))"
436.2037430517962,972.4,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 48.62))"
635.7594970302221,1086.6,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 54.33))"
698.3172552826532,1122.4,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 56.12))"
852.7894851573715,1210.8,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 60.54))"
920.2400289714456,1249.4,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 62.47))"
1078.2071056759091,1339.8,"Map(vectorType -> dense, length -> 2, values -> List(20.0, 66.99))"


R Squared (R2) on test data = 0.968436


In [0]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","total_sales","features").show()

+------------------+------------------+------------+
|        prediction|       total_sales|    features|
+------------------+------------------+------------+
|1036.1097597307448|           1207.68|[16.0,75.48]|
|-68.10265448037308|             683.8|[20.0,34.19]|
| 9.832429822934955| 728.4000000000001|[20.0,36.42]|
|  158.013935404113| 813.1999999999999|[20.0,40.66]|
| 436.2037430517962|             972.4|[20.0,48.62]|
| 635.7594970302221|            1086.6|[20.0,54.33]|
| 698.3172552826532|1122.3999999999999|[20.0,56.12]|
| 852.7894851573715|            1210.8|[20.0,60.54]|
| 920.2400289714456|            1249.4|[20.0,62.47]|
|1078.2071056759091|            1339.8|[20.0,66.99]|
|  1287.54843077762|1459.6000000000001|[20.0,72.98]|
| 1330.535046783481|1484.1999999999998|[20.0,74.21]|
|1884.4682692980255|            1801.2|[20.0,90.06]|
|1902.2919881297234|1811.3999999999999|[20.0,90.57]|
| 1939.337364525018|            1832.6|[20.0,91.63]|
|2231.8560441746554|            2000.0|[20.0,1

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'total_sales')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="total_sales", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 200.317


In [0]:
dt_predictions.show()

+------------+------------------+-----------------+
|    features|       total_sales|       prediction|
+------------+------------------+-----------------+
|[16.0,75.48]|           1207.68| 1378.47027027027|
|[20.0,34.19]|             683.8|862.3243243243244|
|[20.0,36.42]| 728.4000000000001|862.3243243243244|
|[20.0,40.66]| 813.1999999999999|862.3243243243244|
|[20.0,48.62]|             972.4|1180.580810810811|
|[20.0,54.33]|            1086.6|1180.580810810811|
|[20.0,56.12]|1122.3999999999999| 1378.47027027027|
|[20.0,60.54]|            1210.8| 1378.47027027027|
|[20.0,62.47]|            1249.4| 1378.47027027027|
|[20.0,66.99]|            1339.8| 1378.47027027027|
|[20.0,72.98]|1459.6000000000001| 1378.47027027027|
|[20.0,74.21]|1484.1999999999998| 1378.47027027027|
|[20.0,90.06]|            1801.2|1800.730185185185|
|[20.0,90.57]|1811.3999999999999|1800.730185185185|
|[20.0,91.63]|            1832.6|1800.730185185185|
|[20.0,100.0]|            2000.0|1800.730185185185|
|[20.0,100.0

In [0]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'total_sales', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'total_sales', 'features').show(5)

+------------------+-----------------+------------+
|        prediction|      total_sales|    features|
+------------------+-----------------+------------+
|1368.9620402367184|          1207.68|[16.0,75.48]|
| 737.9307326935431|            683.8|[20.0,34.19]|
| 737.9307326935431|728.4000000000001|[20.0,36.42]|
| 748.1421950610031|813.1999999999999|[20.0,40.66]|
|1100.2644041092585|            972.4|[20.0,48.62]|
+------------------+-----------------+------------+
only showing top 5 rows

