## Purpose of script:
#### Basic script for PySpark ML, using linear regression as an example
#### Based on the LinkedIn Learning course "Apache Spark Essential Training":
#### https://www.linkedin.com/learning/apache-spark-essential-training/welcome?u=0

In [1]:
# Bootstrap: findspark locates the local Spark installation so the pyspark
# imports in the following cell can resolve (see the findspark docs).
import findspark
findspark.init()
# Last expression of the cell: displays the Spark home path that was found.
findspark.find()

'C:\\Users\\liamk\\Documents\\spark\\spark-3.3.1-bin-hadoop3'

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Create the Spark session for this notebook, or reuse one that is already
# running under the application name 'ml'.
spark = (
    SparkSession.builder
    .appName('ml')
    .getOrCreate()
)

In [5]:
# Location of the sales extract, relative to this notebook.
path = '../Datasets/cogsley_sales.csv'

# Read the CSV with a header row and let Spark infer each column's type.
data = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(path)
)

In [6]:
# Drop rows that contain nulls, then cache the *cleaned* frame. The cleaned
# frame is the one the later cells reuse, so caching it (rather than the raw
# read) avoids re-applying the null filter on every action and keeps only the
# rows that are actually needed in memory.
data = data.dropna().cache()

In [7]:
# Peek at the first row of the cleaned frame to sanity-check the load.
data.head()

Row(RowID=1, OrderID=3, OrderDate=datetime.datetime(2010, 10, 13, 0, 0), OrderMonthYear=datetime.datetime(2010, 10, 1, 0, 0), Quantity=6, Quote=1200, DiscountPct=0.04, Rate=200, SaleAmount=1152.0, CustomerName='Muhammed MacIntyre', CompanyName='CA Inc.', Sector='Technology', Industry='Computer Software: Prepackaged Software', City='Highland Park', ZipCode=60035, State='Illinois', Region='Central', ProjectCompleteDate=datetime.datetime(2010, 10, 20, 0, 0), DaystoComplete=7, ProductKey='Development - Big Data', ProductCategory='Development', ProductSubCategory='Big Data', Consultant='Ethan Bird', Manager='Josh Martinez', HourlyWage=60, RowCount=1, WageMargin=0.7)

In [8]:
# List every column with the data type Spark inferred for it.
for column in data.schema.fields:
    print(" , ".join((column.name, str(column.dataType))))

RowID , IntegerType()
OrderID , IntegerType()
OrderDate , TimestampType()
OrderMonthYear , TimestampType()
Quantity , IntegerType()
Quote , IntegerType()
DiscountPct , DoubleType()
Rate , IntegerType()
SaleAmount , DoubleType()
CustomerName , StringType()
CompanyName , StringType()
Sector , StringType()
Industry , StringType()
City , StringType()
ZipCode , IntegerType()
State , StringType()
Region , StringType()
ProjectCompleteDate , TimestampType()
DaystoComplete , IntegerType()
ProductKey , StringType()
ProductCategory , StringType()
ProductSubCategory , StringType()
Consultant , StringType()
Manager , StringType()
HourlyWage , IntegerType()
RowCount , IntegerType()
WageMargin , DoubleType()


In [9]:
# Total SaleAmount per OrderMonthYear, in chronological order.
summary = data.select("OrderMonthYear", "SaleAmount").groupby("OrderMonthYear").sum()\
    .orderBy("OrderMonthYear").toDF("OrderMonthYear", "SaleAmount")

# Encode the month timestamp as a yyyyMMdd integer (2009-01-01 -> 20090101).
# This uses native column expressions instead of a Python RDD lambda: the work
# stays inside Spark SQL, and a null timestamp yields null instead of raising.
# The cast to long keeps the same column type the RDD round-trip produced.
# NOTE(review): yyyyMMdd integers are not evenly spaced in time (20091201 is
# followed by 20100101), which distorts a linear fit; a sequential month index
# may be a better predictor.
results = summary.select(
    date_format("OrderMonthYear", "yyyyMMdd").cast("long").alias("OrderMonthYear"),
    "SaleAmount",
)

results.head()

Row(OrderMonthYear=20090101, SaleAmount=741024.2000000001)

In [10]:
# Legacy approach, kept for reference only and NOT executed: it built the
# features/label frame with LabeledPoint, which is not among this notebook's
# imports. The VectorAssembler cell that follows is the approach actually used,
# so any output shown under this cell is left over from an earlier run.
# dat = results.select("OrderMonthYear", "SaleAmount")\
#     .rdd.map(lambda r: LabeledPoint(r[1], [r[0]]))\
#     .toDF()

# dat.head()

Row(features=DenseVector([20090101.0]), label=741024.2000000001)

In [22]:
# Pack the single predictor column into a 'features' vector column, then keep
# only that vector and the SaleAmount label for modelling.
va = VectorAssembler(inputCols=['OrderMonthYear'], outputCol='features')
dat = va.transform(results).select(['features', 'SaleAmount'])
dat.show(3)

+-------------+-----------------+
|     features|       SaleAmount|
+-------------+-----------------+
|[2.0090101E7]|741024.2000000001|
|[2.0090201E7]|544241.1499999998|
|[2.0090301E7]|        563502.15|
+-------------+-----------------+
only showing top 3 rows



## Fit linear models

In [26]:
## Replaced the deprecated mllib.LabeledPoint with ml.feature.VectorAssembler.
## Referenced this tutorial:
## https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a

# Linear regression of SaleAmount on the assembled feature vector;
# regParam is overridden to 0.0 for this fit.
lr = LinearRegression(featuresCol='features', labelCol='SaleAmount')
modelA = lr.fit(dat, params={lr.regParam: 0.0})

# Score the same rows the model was trained on (no hold-out split is made here).
predA = modelA.transform(dat)
predA.show(10)

+-------------+-----------------+-----------------+
|     features|       SaleAmount|       prediction|
+-------------+-----------------+-----------------+
|[2.0090101E7]|741024.2000000001|607367.4185045543|
|[2.0090201E7]|544241.1499999998|607347.2708239248|
|[2.0090301E7]|        563502.15|607327.1231432953|
|[2.0090401E7]|619011.4000000001|607306.9754626658|
|[2.0090501E7]|641158.6999999998|607286.8277820363|
|[2.0090601E7]|        558288.55|607266.6801014068|
|[2.0090701E7]|673657.1000000002|607246.5324207773|
|[2.0090801E7]|        662651.85|607226.3847401477|
|[2.0090901E7]|650729.3500000001|607206.2370595182|
|[2.0091001E7]|        571600.35|607186.0893788887|
+-------------+-----------------+-----------------+
only showing top 10 rows



In [35]:
# Report the fitted slope(s) and intercept of modelA.
for label, value in (
    ("Coefficients: ", modelA.coefficients),
    ("Intercept: ", modelA.intercept),
):
    print(label + str(value))

Coefficients: [-0.20147680629502693]
Intercept: 4655056.806129081


In [36]:
# Fit-quality metrics computed on the training data itself.
trainingSummary = modelA.summary
for template, metric in (
    ("RMSE: %f", trainingSummary.rootMeanSquaredError),
    ("r2: %f", trainingSummary.r2),
):
    print(template % metric)

RMSE: 69171.467009
r2: 0.001060


In [37]:
# Shut down the Spark session now that the analysis is complete.
spark.stop()