In [0]:
# Install all the dependencies in Colab environment i.e. Apache Spark 2.4.4 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Setup Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
# Start Spark Session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
#Necessary Python Libraries
import pandas as pd

In [7]:
#Upload Student_Grades_Data.csv file from local system to remote colab location
from google.colab import files
files.upload()

Saving Restaurant_Profit_Data.csv to Restaurant_Profit_Data.csv


{'Restaurant_Profit_Data.csv': b'Miscellaneous_Expenses,Food_Innovation_Spend,Advertising,City,Profit\r\n138671.8,167497.2,475918.1,Chicago,202443.83\r\n153151.59,164745.7,448032.53,Mumbai,201974.06\r\n102919.55,155589.51,412068.54,Tokyo,201232.39\r\n120445.85,146520.41,387333.62,Chicago,193083.99\r\n93165.77,144255.34,370302.42,Tokyo,176369.94\r\n101588.71,134024.9,366995.36,Chicago,167173.12\r\n148972.87,136763.46,131850.82,Mumbai,166304.51\r\n147304.06,132446.13,328010.68,Tokyo,165934.6\r\n150492.95,122690.52,315747.29,Chicago,162393.77\r\n110453.17,125482.88,309115.62,Mumbai,159941.96\r\n112368.11,104061.08,233294.95,Tokyo,156303.95\r\n93564.61,102819.96,253878.55,Mumbai,154441.4\r\n129094.38,96011.75,253973.44,Tokyo,151767.52\r\n137269.07,94140.39,256798.93,Mumbai,144489.35\r\n158321.42,122091.24,260646.92,Tokyo,142784.65\r\n124390.84,116671.61,265910.23,Chicago,140099.04\r\n123371.55,80161.11,268480.06,Mumbai,137174.93\r\n146851.58,96805.16,286708.31,Chicago,135552.37\r\n115949.7

In [0]:
#Loading the Student_Grades_Data.csv file, uploaded in previous step
data = spark.read.csv('Restaurant_Profit_Data.csv', header=True, inferSchema=True)

In [9]:
#Taking a look at data type of each column to see what data types inferSchema=TRUE paramter has set for each column
data.printSchema()

root
 |-- Miscellaneous_Expenses: double (nullable = true)
 |-- Food_Innovation_Spend: double (nullable = true)
 |-- Advertising: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Profit: double (nullable = true)



In [10]:
#Display first few rows of data
data.show()

+----------------------+---------------------+-----------+-------+---------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+----------------------+---------------------+-----------+-------+---------+
|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|             147304.06|            132446.13|  328010.68|  Tokyo| 165934.6|
|             150492.95|            122690.52|  315747.29|Chicago|162393.77|
|             110453.17|            125482.88|  309115.62| Mumbai|159941.96|

In [11]:
#Display data types of the data columns.
data.dtypes

[('Miscellaneous_Expenses', 'double'),
 ('Food_Innovation_Spend', 'double'),
 ('Advertising', 'double'),
 ('City', 'string'),
 ('Profit', 'double')]

In [16]:
#Create features storing categorical & numerical variables, omitting the last column
categorical_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in data.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

['City']
['Miscellaneous_Expenses', 'Food_Innovation_Spend', 'Advertising']


In [17]:
#Print number of categorical as well as numerical features.
print(str(len(categorical_cols)) + '  categorical features')
print(str(len(numerical_cols)) + '  numerical features')

1  categorical features
3  numerical features


In [0]:
# First using StringIndexer to convert string/text values into numerical values followed by OneHotEncoderEstimator 
# Spark MLLibto convert each Stringindexed or transformed values into One Hot Encoded values.
# VectorAssembler is being used to assemble all the features into one vector from multiple columns that contain type double 
# Also appending every step of the process in a stages array
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
stages = []
for categoricalCol in categorical_cols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    OHencoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_catVec"])
stages += [stringIndexer, OHencoder]
assemblerInputs = [c + "_catVec" for c in categorical_cols] + numerical_cols
Vectassembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [Vectassembler]

In [0]:
# Using a Spark MLLib pipeline to apply all the stages of transformation
from pyspark.ml import Pipeline
cols = data.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(data)
data = pipelineModel.transform(data)
selectedCols = ['features']+cols
data = data.select(selectedCols)
pd.DataFrame(data.take(5), columns=data.columns)

Unnamed: 0,features,Miscellaneous_Expenses,Food_Innovation_Spend,Advertising,City,Profit
0,"[1.0, 0.0, 138671.8, 167497.2, 475918.1]",138671.8,167497.2,475918.1,Chicago,202443.83
1,"[0.0, 1.0, 153151.59, 164745.7, 448032.53]",153151.59,164745.7,448032.53,Mumbai,201974.06
2,"[0.0, 0.0, 102919.55, 155589.51, 412068.54]",102919.55,155589.51,412068.54,Tokyo,201232.39
3,"[1.0, 0.0, 120445.85, 146520.41, 387333.62]",120445.85,146520.41,387333.62,Chicago,193083.99
4,"[0.0, 0.0, 93165.77, 144255.34, 370302.42]",93165.77,144255.34,370302.42,Tokyo,176369.94


In [0]:
#Display the data having additional column named features. Since it's a multiple linear regression problem, hence all the
# independent variable values are shown as one vector
data.show()

+--------------------+----------------------+---------------------+-----------+-------+---------+
|            features|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+--------------------+----------------------+---------------------+-----------+-------+---------+
|[1.0,0.0,138671.8...|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|[0.0,1.0,153151.5...|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|[0.0,0.0,102919.5...|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|[1.0,0.0,120445.8...|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|[0.0,0.0,93165.77...|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|[1.0,0.0,101588.7...|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|[0.0,1.0,148972.8...|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|[0.0,0.0,147304.0..

In [0]:
#Select only Features and Label from previous dataset as we need these two entities for building machine learning model
finalized_data = data.select("features","Profit")

finalized_data.show()

+--------------------+---------+
|            features|   Profit|
+--------------------+---------+
|[1.0,0.0,138671.8...|202443.83|
|[0.0,1.0,153151.5...|201974.06|
|[0.0,0.0,102919.5...|201232.39|
|[1.0,0.0,120445.8...|193083.99|
|[0.0,0.0,93165.77...|176369.94|
|[1.0,0.0,101588.7...|167173.12|
|[0.0,1.0,148972.8...|166304.51|
|[0.0,0.0,147304.0...| 165934.6|
|[1.0,0.0,150492.9...|162393.77|
|[0.0,1.0,110453.1...|159941.96|
|[0.0,0.0,112368.1...|156303.95|
|[0.0,1.0,93564.61...| 154441.4|
|[0.0,0.0,129094.3...|151767.52|
|[0.0,1.0,137269.0...|144489.35|
|[0.0,0.0,158321.4...|142784.65|
|[1.0,0.0,124390.8...|140099.04|
|[0.0,1.0,123371.5...|137174.93|
|[1.0,0.0,146851.5...|135552.37|
|[0.0,0.0,115949.7...| 134448.9|
|[1.0,0.0,155288.1...|132958.86|
+--------------------+---------+
only showing top 20 rows



In [0]:
#Split the data into training and test model with 70% obs. going in training and 30% in testing
train_dataset, test_dataset = finalized_data.randomSplit([0.7, 0.3])

In [0]:
#Import Linear Regression class called LinearRegression
from pyspark.ml.regression import LinearRegression

In [0]:
#Create the Multiple Linear Regression object named MLR having feature column as features and Label column as Profit
MLR = LinearRegression(featuresCol="features", labelCol="Profit")

In [0]:
#Train the model on the training using fit() method.
model = MLR.fit(train_dataset)

In [0]:
#Predict the Profit on Test Dataset using the evulate method
pred = model.evaluate(test_dataset)

In [0]:
#Show the predicted Grade values along side actual Grade values
pred.predictions.show()

+--------------------+---------+------------------+
|            features|   Profit|        prediction|
+--------------------+---------+------------------+
|[0.0,0.0,93165.77...|176369.94|179064.27054716123|
|[0.0,0.0,102919.5...|201232.39|189845.34056382696|
|[0.0,0.0,107525.0...|118915.99|117008.74395330332|
|[0.0,0.0,112368.1...|156303.95|142557.38734089918|
|[0.0,0.0,117415.2...|110119.59|105881.04987600169|
|[0.0,0.0,129094.3...|151767.52| 137125.9342443147|
|[0.0,1.0,53057.14...|100131.14| 95218.20303011732|
|[0.0,1.0,97963.63...| 81680.49| 78221.58315015868|
|[0.0,1.0,115641.3...|128656.03|126171.48795409381|
|[0.0,1.0,137269.0...|144489.35| 137993.1943693853|
|[0.0,1.0,148972.8...|166304.51|168805.09079625408|
|[0.0,1.0,153151.5...|201974.06|201788.20465140056|
|[1.0,0.0,67721.93...| 91411.06| 74219.59182840605|
|[1.0,0.0,86821.44...|106661.51| 96255.78280649436|
|[1.0,0.0,101588.7...|167173.12|171295.04851100093|
|[1.0,0.0,125927.0...| 75108.08|54200.535416892046|
|[1.0,0.0,15

In [0]:
#Find out coefficient value
coefficient = model.coefficients
print ("The coefficients of the model are : %a" %coefficient)

The coefficients of the model are : DenseVector([414.6953, 2084.9539, 0.0254, 0.8105, 0.0323])


In [0]:
#Find out intercept Value
intercept = model.intercept
print ("The Intercept of the model is : %f" %intercept)

The Intercept of the model is : 47837.870166


In [0]:
#Evaluate the model using metric like Mean Absolute Error(MAE), Root Mean Square Error(RMSE) and R-Square
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="Profit", predictionCol="prediction")

# r2 - coefficient of determination
r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

r2: 0.939


In [0]:
#Create Unlabeled dataset  to contain only feature column
unlabeled_dataset = test_dataset.select('features')

In [0]:
#Display the content of unlabeled_dataset
unlabeled_dataset.show()

+--------------------+
|            features|
+--------------------+
|[0.0,0.0,93165.77...|
|[0.0,0.0,102919.5...|
|[0.0,0.0,107525.0...|
|[0.0,0.0,112368.1...|
|[0.0,0.0,117415.2...|
|[0.0,0.0,129094.3...|
|[0.0,1.0,53057.14...|
|[0.0,1.0,97963.63...|
|[0.0,1.0,115641.3...|
|[0.0,1.0,137269.0...|
|[0.0,1.0,148972.8...|
|[0.0,1.0,153151.5...|
|[1.0,0.0,67721.93...|
|[1.0,0.0,86821.44...|
|[1.0,0.0,101588.7...|
|[1.0,0.0,125927.0...|
|[1.0,0.0,154806.0...|
|[1.0,0.0,155547.4...|
+--------------------+



In [0]:
#Predict the model output for fresh & unseen test data using transform() method
new_predictions = model.transform(unlabeled_dataset)

In [0]:
#Display the new prediction values
new_predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.0,0.0,93165.77...|179064.27054716123|
|[0.0,0.0,102919.5...|189845.34056382696|
|[0.0,0.0,107525.0...|117008.74395330332|
|[0.0,0.0,112368.1...|142557.38734089918|
|[0.0,0.0,117415.2...|105881.04987600169|
|[0.0,0.0,129094.3...| 137125.9342443147|
|[0.0,1.0,53057.14...| 95218.20303011732|
|[0.0,1.0,97963.63...| 78221.58315015868|
|[0.0,1.0,115641.3...|126171.48795409381|
|[0.0,1.0,137269.0...| 137993.1943693853|
|[0.0,1.0,148972.8...|168805.09079625408|
|[0.0,1.0,153151.5...|201788.20465140056|
|[1.0,0.0,67721.93...| 74219.59182840605|
|[1.0,0.0,86821.44...| 96255.78280649436|
|[1.0,0.0,101588.7...|171295.04851100093|
|[1.0,0.0,125927.0...|54200.535416892046|
|[1.0,0.0,154806.0...|110689.29224789032|
|[1.0,0.0,155547.4...| 127281.2801372062|
+--------------------+------------------+

