In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import corr
spark = SparkSession.builder.appName('lrex').getOrCreate()

In [2]:
dtFrame = spark.read.csv(path='/Users/manoj/Desktop/Naveena Project/Course Work -- BD & DV.csv/cleaneddata.csv', header = True, inferSchema = True)

In [3]:
print((dtFrame.count(), len(dtFrame.columns)))

(419890, 7)


In [4]:
dtFrame.printSchema()

root
 |-- Event_Time: string (nullable = true)
 |-- Order_ID: long (nullable = true)
 |-- Product_ID: long (nullable = true)
 |-- Category_Code: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- User_ID: double (nullable = true)



In [5]:
dtFrame.describe().show(5)

+-------+----------+--------------------+--------------------+------------------+--------+------------------+--------------------+
|summary|Event_Time|            Order_ID|          Product_ID|     Category_Code|   Brand|             Price|             User_ID|
+-------+----------+--------------------+--------------------+------------------+--------+------------------+--------------------+
|  count|    419890|              419890|              419890|            419890|  419890|            419890|              419890|
|   mean|      null|2.370508869981049...|1.676752749673554...|              null|    null|254.28161868586594|1.515915649999444...|
| stddev|      null| 2.01414377327802E16|3.171233999238092...|              null|    null| 321.1676131705486|                 0.0|
|    min|2020-01-05| 2294359932054536986| 1515966223509088493|   accessories.bag|   acana|               0.0|       1.51591565E18|
|    max|2020-11-21| 2388440981134693944| 2388434452476865703|stationery.stapler|zw

In [6]:
dtFrame.na.drop().show()

+----------+-------------------+-------------------+--------------------+-------+-------+-------------+
|Event_Time|           Order_ID|         Product_ID|       Category_Code|  Brand|  Price|      User_ID|
+----------+-------------------+-------------------+--------------------+-------+-------+-------------+
|2020-04-24|2294359932054536986|1515966223509089906|  electronics.tablet|samsung| 162.01|1.51591565E18|
|2020-04-24|2294359932054536986|1515966223509089906|  electronics.tablet|samsung| 162.01|1.51591565E18|
|2020-04-24|2294444024058086220|2273948319057183658|electronics.audio...| huawei|  77.52|1.51591565E18|
|2020-04-24|2294444024058086220|2273948319057183658|electronics.audio...| huawei|  77.52|1.51591565E18|
|2020-04-26|2295716521449619559|1515966223509261697|furniture.kitchen...|maestro|  39.33|1.51591565E18|
|2020-04-26|2295740594749702229|1515966223509104892|electronics.smart...|  apple|1387.01|1.51591565E18|
|2020-04-26|2295740594749702229|1515966223509104892|electronics.

In [7]:
dtFrame.select(corr('Order_ID','Price')).show()

+---------------------+
|corr(Order_ID, Price)|
+---------------------+
|  0.06470901042722335|
+---------------------+



In [8]:
assembler = VectorAssembler(inputCols=['Price'], outputCol='features')
dtFrame1 = assembler.transform(dtFrame)

In [9]:
dtFrame2 = dtFrame1.select(['features','Price'])

In [10]:
train,test = dtFrame2.randomSplit([0.7, 0.3])

In [11]:
train.show(5)

+--------+-----+
|features|Price|
+--------+-----+
|   [0.0]|  0.0|
|  [0.23]| 0.23|
|  [0.23]| 0.23|
|  [0.23]| 0.23|
|  [0.23]| 0.23|
+--------+-----+
only showing top 5 rows



In [12]:
test.show(5)

+--------+-----+
|features|Price|
+--------+-----+
|   [0.0]|  0.0|
|   [0.0]|  0.0|
|  [0.02]| 0.02|
|  [0.23]| 0.23|
|  [0.23]| 0.23|
+--------+-----+
only showing top 5 rows



In [13]:
lr = LinearRegression(featuresCol = 'features', labelCol='Price')
lrModel = lr.fit(train)

In [14]:
print(f'Intercept: {lrModel.intercept}\nCoefficient: {lrModel.coefficients.values}')

Intercept: 1.1576099662839685e-13
Coefficient: [1.]


In [15]:
trainSummary = lrModel.summary
print("RMSE: %f" % trainSummary.rootMeanSquaredError)
print("\nr2: %f" % trainSummary.r2)

RMSE: 0.000000

r2: 1.000000


In [16]:
from  pyspark.sql.functions import abs
predictions = lrModel.transform(test)
x =((predictions['Price']-predictions['prediction'])/predictions['Price'])*100
predictions = predictions.withColumn('Accuracy',abs(x))
predictions.select("prediction","Price","Accuracy","features").show(5)

+--------------------+-----+--------------------+--------+
|          prediction|Price|            Accuracy|features|
+--------------------+-----+--------------------+--------+
|1.157609966283968...|  0.0|                null|   [0.0]|
|1.157609966283968...|  0.0|                null|   [0.0]|
|0.020000000000115755| 0.02|5.787731405249019...|  [0.02]|
|  0.2300000000001157| 0.23|5.029793007214839...|  [0.23]|
|  0.2300000000001157| 0.23|5.029793007214839...|  [0.23]|
+--------------------+-----+--------------------+--------+
only showing top 5 rows



In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
pred_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Price",metricName="r2")
print("R Squared (R2) on test data = %g" % pred_evaluator.evaluate(predictions))

R Squared (R2) on test data = 1


In [18]:
r2 = trainSummary.r2
n = dtFrame.count()
p = len(dtFrame.columns)
adjusted_r2 = 1-(1-r2)*(n-1)/(n-p-1)

In [19]:
lin_reg = LinearRegression(featuresCol = 'features', labelCol='Price',maxIter=50, regParam=0.12, elasticNetParam=0.2)
linear_model = lin_reg.fit(train)

In [20]:
linear_model.summary.rootMeanSquaredError

0.11996397032091072