# Line Item Price Prediction for Procurement/Sourcing- Satya Das

In [28]:
import numpy as np
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer,StandardScaler,MinMaxScaler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

from pyspark.ml.clustering import KMeans,KMeansModel
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

sqlContext = SQLContext(sc)
df = sqlContext.read.csv("D:\\bigdata\\spark\\testdata\\lineitem_sample.csv", header=True, inferSchema=True)



#as view columns are string categorical, we will change to double


df=df.withColumn('Spec_5_d',when(df.Spec_5 == 'Windows',1.0).otherwise(2.0)).drop(df.Spec_5)
#df=df.withColumn('Spec_3_d',when(df.Spec_3 == 'i-5',1.0).otherwise(2.0)).drop(df.Spec_3)
df = df.withColumn("Spec_1", df["Spec_1"].cast(DoubleType()))


def man_com_convert(cname):
    if cname == 'IBM':
        return 1.0
    if cname == 'DELL':
        return 2.0
    if cname == 'HP':
        return 3.0
    return 0

man_com_convert_udf = udf(man_com_convert, DoubleType())
df = df.withColumn('Manufactured_Company_d',man_com_convert_udf(df['Manufactured_Company'])).drop(df.Manufactured_Company)
def supp_country_convert(cname):
    if cname == 'India':
        return 1.0
    if cname == 'USA':
        return 2.0
    return 0

supp_country_convert_udf = udf(supp_country_convert, DoubleType())
df = df.withColumn('Supplier_Country_d',supp_country_convert_udf(df['Supplier Country']))

columns_to_drop = ['Supplier Country', 'Name', 'Supplier Name','Category','Sub-Category','Currency','Spec_3','Spec_4']
df = df.drop(*columns_to_drop)

df.na.drop()


DataFrame[SNo: int, Delivery_in_days: int, Year_of_Mfg: int, Expiry_Year: int, Spec_1: double, Spec_2: int, Price: int, Spec_5_d: double, Manufactured_Company_d: double, Supplier_Country_d: double]

In [29]:
# Check mean values
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
SNo,31,16.0,9.092121131323903,1,31
Delivery_in_days,31,3.967741935483871,1.016001016001524,3,5
Year_of_Mfg,31,2017.8064516129032,0.40160966445124957,2017,2018
Expiry_Year,31,2021.8064516129032,0.40160966445124957,2021,2022
Spec_1,31,11.35483870967742,4.012882481016808,8.0,16.0
Spec_2,31,709.6774193548387,250.8051550635505,500,1000
Price,31,58161.290322580644,12709.830248521674,30000,90000
Spec_5_d,31,1.032258064516129,0.1796053020267749,1.0,2.0
Manufactured_Company_d,31,2.3548387096774195,0.9503819266229828,1.0,3.0


# Now First Step is Feature Selection

In [30]:


import six
for i in df.columns:
    if not( isinstance(df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to Price for ", i, df.stat.corr('Price',i))

Correlation to Price for  SNo 0.5463283742179583
Correlation to Price for  Delivery_in_days -0.22416004987181634
Correlation to Price for  Year_of_Mfg 0.613639956721719
Correlation to Price for  Expiry_Year 0.613639956721719
Correlation to Price for  Spec_1 0.5955367470521578
Correlation to Price for  Spec_2 0.4386833982108931
Correlation to Price for  Price 1.0
Correlation to Price for  Spec_5_d 0.02684930224701973
Correlation to Price for  Manufactured_Company_d 0.4780280907969442
Correlation to Price for  Supplier_Country_d 0.2241600498718162


In [31]:

vectorAssembler = VectorAssembler(inputCols = ['Year_of_Mfg', 'Expiry_Year', 'Spec_1', 'Spec_2', 'Manufactured_Company_d', 'Supplier_Country_d'], outputCol = 'features')
df = vectorAssembler.transform(df)


In [5]:
df.select('Delivery_in_days', 'Year_of_Mfg', 'Expiry_Year', 'Spec_1', 'Spec_2', 'Spec_5_d', 'Manufactured_Company_d', 'Supplier_Country_d').show()



+----------------+-----------+-----------+------+------+--------+----------------------+------------------+
|Delivery_in_days|Year_of_Mfg|Expiry_Year|Spec_1|Spec_2|Spec_5_d|Manufactured_Company_d|Supplier_Country_d|
+----------------+-----------+-----------+------+------+--------+----------------------+------------------+
|               5|       2018|       2022|   8.0|   500|     1.0|                   1.0|               1.0|
|               5|       2017|       2021|   8.0|   500|     1.0|                   1.0|               1.0|
|               5|       2018|       2022|   8.0|  1000|     1.0|                   1.0|               1.0|
|               5|       2018|       2022|  16.0|  1000|     1.0|                   1.0|               1.0|
|               5|       2018|       2022|  16.0|   500|     1.0|                   1.0|               1.0|
|               5|       2017|       2021|   8.0|   500|     1.0|                   3.0|               1.0|
|               5|       201

In [6]:
dfnew = df.select(['features', 'Price'])
dfnew.show()

+--------------------+-----+
|            features|Price|
+--------------------+-----+
|[5.0,2018.0,2022....|40000|
|[5.0,2017.0,2021....|30000|
|[5.0,2018.0,2022....|50000|
|[5.0,2018.0,2022....|60000|
|[5.0,2018.0,2022....|55000|
|[5.0,2017.0,2021....|40000|
|[5.0,2018.0,2022....|60000|
|[5.0,2018.0,2022....|55000|
|[5.0,2018.0,2022....|52000|
|[5.0,2018.0,2022....|65000|
|[5.0,2017.0,2021....|50000|
|[5.0,2018.0,2022....|70000|
|[5.0,2018.0,2022....|65000|
|[5.0,2018.0,2022....|62000|
|[5.0,2018.0,2022....|75000|
|[3.0,2018.0,2022....|45000|
|[3.0,2017.0,2021....|35000|
|[3.0,2018.0,2022....|55000|
|[3.0,2018.0,2022....|65000|
|[3.0,2018.0,2022....|60000|
+--------------------+-----+
only showing top 20 rows



# First Step - we will use Linear Regression

In [32]:
splits = dfnew.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

lr = LinearRegression(featuresCol = 'features', labelCol='Price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [-1128.4066787008942,7185.416355524314,7185.416651897027,1526.6513784159288,8.843205351639696,-17545.41708797923,6276.102850753308,2256.8133574478547]
Intercept: -28987218.444887202
RMSE: 4465.783575
r2: 0.881976


In [33]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","Price").show()

+-----------------+-----+
|       prediction|Price|
+-----------------+-----+
|46539.50953545794|55000|
|60910.34254287556|67000|
|73123.55357020348|65000|
|42025.88282060251|40000|
|42025.88282060251|50000|
|43844.51012651622|40000|
|56396.71582802385|52000|
|68609.92685535178|60000|
+-----------------+-----+



In [34]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Price",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

R Squared (R2) on test data = 0.524867


In [35]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 6619.94


In [36]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.5, 0.3723252375486345, 0.17440322370019581, 0.11071190053855992, 0.06869976291590323, 0.061615305283811206, 0.059399111387475195, 0.05912579431788037, 0.059060107535151805, 0.05904960568591024, 0.05904926859360576]
+------------------+
|         residuals|
+------------------+
| 1012.696166049689|
|-1539.509535457939|
| -3358.13684136793|
|-3910.342542875558|
| 2220.260482814163|
|-5331.945218693465|
| 4668.054781306535|
|-571.3478686958551|
| 1876.446429796517|
| 7.049455486238003|
| -7545.15624602139|
| 12454.84375397861|
|0.2608419582247734|
| 526.3228809051216|
| 5603.284171976149|
|1733.8871976658702|
|-5818.318503841758|
| 4181.681496158242|
|-1057.721153844148|
|1390.0731446482241|
+------------------+
only showing top 20 rows



# we achieved worse RMSE and R squared on the test set and traing set


# Now we will try different algorithm-Decision tree regression

In [37]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Price')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 9033.12


In [41]:
dt_model.featureImportances
#['Year_of_Mfg', 'Expiry_Year', 'Spec_1', 'Spec_2', 'Manufactured_Company_d', 'Supplier_Country_d']

SparseVector(8, {0: 0.0545, 1: 0.5218, 3: 0.1527, 4: 0.0212, 5: 0.0489, 6: 0.201})

In [42]:
#Spec_1 seems to be most important feature for this 

# Now we will try for Gradient-boosted tree regression

In [43]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'Price', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

gbt_evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 8545.09


In [44]:
gbt_predictions.select('Price','prediction').show()

+-----+-----------------+
|Price|       prediction|
+-----+-----------------+
|55000|          45000.0|
|67000|          57000.0|
|65000|75447.39242666666|
|40000|          45000.0|
|50000|          45000.0|
|40000|          45000.0|
|52000|          62000.0|
|60000|          70000.0|
+-----+-----------------+

