In [45]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/user/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression_adv_lche329_ridge').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.regression import LinearRegression

In [46]:
df = spark.read.load('./Sales_data/sales_en.csv',format='csv',header='true',inferSchema = True)

In [47]:
# Print the schema of the DataFrame. You can see potential features as well as the predictor.
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Sales Representative: string (nullable = true)
 |-- Hospital Name: string (nullable = true)
 |-- Hospital Attribute: string (nullable = true)
 |-- Hospital Code: string (nullable = true)
 |-- Purchasing Price: double (nullable = true)
 |-- Selling Price: double (nullable = true)
 |-- IMF: string (nullable = true)
 |-- Hospital Class: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Client Name: string (nullable = true)
 |-- Sales Volume: double (nullable = true)
 |-- Profits: double (nullable = true)
 |-- Satisfaction: string (nullable = true)



In [48]:
df.head()

Row(ID=1, Year=2016, Product Name='Corbrin Capsule', Unit='Dept. 2', Region='Wuhan', Sales Representative='Xiongting', Hospital Name='Huazhongkejidaxuetongjiyixueyuanfushuxieheyiyuan', Hospital Attribute='Ministerial hospital', Hospital Code='ADXH', Purchasing Price=47.14, Selling Price=63.14, IMF='1571181790', Hospital Class='Third Class', Department='Shenneike', Client Name='Denganguo', Sales Volume=236.0, Profits=3776.0, Satisfaction='Y')

In [49]:
# A simple for loop allows us to make it even clearer. 
for item in df.head():
    print(item)

1
2016
Corbrin Capsule
Dept. 2
Wuhan
Xiongting
Huazhongkejidaxuetongjiyixueyuanfushuxieheyiyuan
Ministerial hospital
ADXH
47.14
63.14
1571181790
Third Class
Shenneike
Denganguo
236.0
3776.0
Y


In [50]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [51]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
assembler = VectorAssembler(
    inputCols=["Sales Representative", "Hospital Attribute", 
               "Selling Price","Hospital Class","Department","Sales Volume"],
    outputCol="features")

In [52]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Sales Representative", outputCol="Sales_Representative_Index")
df_indexed = indexer.fit(df).transform(df)
indexer = StringIndexer(inputCol="Hospital Attribute", outputCol="Hospital_Attribute_Index")
df_indexed = indexer.fit(df_indexed).transform(df_indexed)
indexer = StringIndexer(inputCol="Hospital Class", outputCol="Hospital_Class_Index")
df_indexed = indexer.fit(df_indexed).transform(df_indexed)
indexer = StringIndexer(inputCol="Department", outputCol="Department_Index")
df_indexed = indexer.fit(df_indexed).transform(df_indexed)



In [53]:
df_indexed.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Sales Representative: string (nullable = true)
 |-- Hospital Name: string (nullable = true)
 |-- Hospital Attribute: string (nullable = true)
 |-- Hospital Code: string (nullable = true)
 |-- Purchasing Price: double (nullable = true)
 |-- Selling Price: double (nullable = true)
 |-- IMF: string (nullable = true)
 |-- Hospital Class: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Client Name: string (nullable = true)
 |-- Sales Volume: double (nullable = true)
 |-- Profits: double (nullable = true)
 |-- Satisfaction: string (nullable = true)
 |-- Sales_Representative_Index: double (nullable = true)
 |-- Hospital_Attribute_Index: double (nullable = true)
 |-- Hospital_Class_Index: double (nullable = true)
 |-- Department_Index: double (nullable = true)



In [54]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
assembler = VectorAssembler(
    inputCols=["Sales_Representative_Index", "Hospital_Attribute_Index", 
               "Selling Price","Hospital_Class_Index","Department_Index","Sales Volume"],
    outputCol="features")

In [55]:
# Now that we've created the assembler variable, let's actually transform the data.
output = assembler.transform(df_indexed)

In [56]:
# Using print schema, you see that the features output column has been added. 
output.printSchema()

# You can see that the features column is a dense vector that combines the various features as expected.
output.head(1)

root
 |-- ID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Sales Representative: string (nullable = true)
 |-- Hospital Name: string (nullable = true)
 |-- Hospital Attribute: string (nullable = true)
 |-- Hospital Code: string (nullable = true)
 |-- Purchasing Price: double (nullable = true)
 |-- Selling Price: double (nullable = true)
 |-- IMF: string (nullable = true)
 |-- Hospital Class: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Client Name: string (nullable = true)
 |-- Sales Volume: double (nullable = true)
 |-- Profits: double (nullable = true)
 |-- Satisfaction: string (nullable = true)
 |-- Sales_Representative_Index: double (nullable = true)
 |-- Hospital_Attribute_Index: double (nullable = true)
 |-- Hospital_Class_Index: double (nullable = true)
 |-- Department_Index: double (nullable = true)
 |-- features

[Row(ID=1, Year=2016, Product Name='Corbrin Capsule', Unit='Dept. 2', Region='Wuhan', Sales Representative='Xiongting', Hospital Name='Huazhongkejidaxuetongjiyixueyuanfushuxieheyiyuan', Hospital Attribute='Ministerial hospital', Hospital Code='ADXH', Purchasing Price=47.14, Selling Price=63.14, IMF='1571181790', Hospital Class='Third Class', Department='Shenneike', Client Name='Denganguo', Sales Volume=236.0, Profits=3776.0, Satisfaction='Y', Sales_Representative_Index=139.0, Hospital_Attribute_Index=3.0, Hospital_Class_Index=0.0, Department_Index=3.0, features=DenseVector([139.0, 3.0, 63.14, 0.0, 3.0, 236.0]))]

In [57]:
# Let's select two columns (the feature and predictor).
# This is now in the appropriate format to be processed by Spark.
final_data = output.select("features",'Profits')
final_data.show()

+--------------------+-------+
|            features|Profits|
+--------------------+-------+
|[139.0,3.0,63.14,...| 3776.0|
|[139.0,3.0,63.14,...| 5072.0|
|[139.0,3.0,63.14,...| 4592.0|
|[139.0,3.0,63.14,...| 2560.0|
|[139.0,3.0,63.14,...| 1920.0|
|[139.0,3.0,63.14,...| 1104.0|
|[139.0,3.0,63.14,...| 2064.0|
|[139.0,3.0,63.14,...| 2752.0|
|[139.0,3.0,63.14,...| 3392.0|
|[139.0,3.0,63.14,...| 4960.0|
|[139.0,3.0,63.14,...| 2448.0|
|[139.0,3.0,63.14,...|  288.0|
|[139.0,3.0,63.14,...|    0.0|
|[139.0,3.0,63.14,...|    0.0|
|[139.0,3.0,63.14,...| 3536.0|
|[139.0,3.0,63.14,...|    0.0|
|[139.0,3.0,63.14,...| 9488.0|
|[139.0,3.0,63.14,...|17760.0|
|[139.0,3.0,63.14,...|  336.0|
|[139.0,3.0,63.14,...|  336.0|
+--------------------+-------+
only showing top 20 rows



In [58]:
# Let's do a randomised 70/30 split. 
# Remember, you can use other splits depending on how easy/difficult it is to train your model.
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [59]:
# Let's see our training data.
train_data.describe().show()

# And our testing data.
test_data.describe().show()

+-------+------------------+
|summary|           Profits|
+-------+------------------+
|  count|             28260|
|   mean|232.25109366979714|
| stddev| 939.2765494938908|
|    min|           -130.02|
|    max|           38656.0|
+-------+------------------+

+-------+------------------+
|summary|           Profits|
+-------+------------------+
|  count|             12025|
|   mean|240.44791155954343|
| stddev| 853.1370655273175|
|    min|           -130.02|
|    max|          19714.56|
+-------+------------------+



In [60]:
#elasticNetParam=0,the trained model reduces to a ridge regression model.
lr = LinearRegression(maxIter=10,regParam=0.3,elasticNetParam=0,labelCol='Profits')

In [61]:
# Fit the model to the data.
lrModel = lr.fit(train_data)

In [62]:
# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [0.525608526986735,1.3505251313385196,0.2710413982144721,43.80632294458547,2.8323708701814834,12.126948646119379] Intercept: -80.16871978574002


In [63]:
# Let's evaluate the model against the test data.
test_results = lrModel.evaluate(test_data)

In [64]:
# Interesting results! This shows the difference between the predicted value and the test data.
test_results.residuals.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME_Ridge: {}".format(test_results.rootMeanSquaredError))

+------------------+
|         residuals|
+------------------+
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
|62.003948848504784|
| 64.40334337751017|
| 61.03158838372214|
| 61.03158838372214|
| 53.59421241671703|
| 53.59421241671703|
| 53.59421241671703|
| 53.27980439478824|
| 44.34445943601375|
| 49.93689062337567|
| 40.45399924204865|
| 40.45399924204865|
+------------------+
only showing top 20 rows

RSME_Ridge: 585.0944375559258


In [65]:
# We can also get the R2 value. 
print("R2_Ridge: {}".format(test_results.r2))

R2_Ridge: 0.5296182813868564


In [66]:
final_data.describe().show()

+-------+------------------+
|summary|           Profits|
+-------+------------------+
|  count|             40285|
|   mean|234.69782905826548|
| stddev| 914.4112274804912|
|    min|           -130.02|
|    max|           38656.0|
+-------+------------------+

