# **Employee Details Dataset**

1. Observe the dataset.
2. This dataset contains employee details.
3. Specify the appropriate data types.
4. What is the target variable in this problem?
5. What type of regression problem is this?
6. Choose appropriate error metrics based on the regression problem type.
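For reference, the error metrics reported later in this notebook (R², MSE, MAE and RMSE) follow the usual definitions below, where $y_i$ is the actual salary, $\hat{y}_i$ the predicted salary, $\bar{y}$ the mean actual salary and $n$ the number of rows. MSE and RMSE weight large errors more heavily than MAE, and R² is the fraction of the label variance explained by the model.

```latex
\begin{aligned}
\mathrm{MSE} &= \frac{1}{n}\sum_{i=1}^{n}\left(y_i-\hat{y}_i\right)^2,
\qquad \mathrm{RMSE} = \sqrt{\mathrm{MSE}}, \\
\mathrm{MAE} &= \frac{1}{n}\sum_{i=1}^{n}\left|y_i-\hat{y}_i\right|,
\qquad R^2 = 1-\frac{\sum_{i=1}^{n}\left(y_i-\hat{y}_i\right)^2}{\sum_{i=1}^{n}\left(y_i-\bar{y}\right)^2}.
\end{aligned}
```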

In [1]:
# Install the Required Packages
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
Collecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=64e17eea145030ba09b4423dbb370b438d62f54ce23c393043fca7e689a982c6
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
# Mount the Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Create a SparkSession with a descriptive application name
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Employees_Data").getOrCreate()

In [4]:
# Import the Spark SQL data types used in the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [5]:
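# Define an explicit schema for the CSV file (column name, data type, nullable).
# Note: file-based readers such as spark.read.csv treat every column as nullable,
# which is why printSchema() below reports nullable = true.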
EmpSchema = StructType([
  StructField("EmployeeID", IntegerType(), False),
  StructField("Name", StringType(), False),
  StructField("Age", IntegerType(), False),
  StructField("DeptID", IntegerType(), False),
  StructField("Salary", IntegerType(), False)
])



In [6]:
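# Read the CSV file with the explicit schema.
# The file name suggests there is no header row; if so, use header=False,
# otherwise Spark skips the first data record as if it were a header.
emp = spark.read.csv('/content/drive/My Drive/sparkml/EmployeeDataNoHeader.csv', schema=EmpSchema, header=True)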
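The stdlib sketch below (not Spark) illustrates what an explicit schema and the header flag do: each field is cast to its declared type, and when the header flag is set the first line is consumed as column names, so a headerless file silently loses its first record. The two sample lines are copied from the `emp.show()` output below; `SCHEMA` and `parse_csv` are hypothetical names used only for illustration.

```python
import csv
import io

# Column names and Python casts mirroring EmpSchema (IntegerType -> int, StringType -> str)
SCHEMA = [("EmployeeID", int), ("Name", str), ("Age", int), ("DeptID", int), ("Salary", int)]

def parse_csv(text, header):
    """Parse CSV text into typed dicts; discard the first line when header=True."""
    reader = csv.reader(io.StringIO(text))
    if header:
        next(reader, None)  # the first line is treated as column names and dropped
    rows = []
    for fields in reader:
        rows.append({name: cast(value) for (name, cast), value in zip(SCHEMA, fields)})
    return rows

# A headerless sample: two records taken from the emp.show() output
sample = "111,bbb,45,2,3000\n112,aaa,50,1,2500\n"

print(len(parse_csv(sample, header=False)))  # 2: both records kept
print(len(parse_csv(sample, header=True)))   # 1: the first record is lost
```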

In [7]:
emp.show(20)

+----------+----+---+------+------+
|EmployeeID|Name|Age|DeptID|Salary|
+----------+----+---+------+------+
|       111| bbb| 45|     2|  3000|
|       112| aaa| 50|     1|  2500|
|       113| bbb| 35|     2|  3000|
|       104| aaa| 40|     1|  2000|
|       105| bbb| 40|     2|  3500|
|       106| aaa| 35|     1|  2000|
|       107| bbb| 45|     2|  4500|
|       108| aaa| 50|     1|  2000|
|       109| bbb| 35|     2|  3000|
+----------+----+---+------+------+



In [8]:
# Print Schema
emp.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- DeptID: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [9]:
# Display Column Names
emp.columns

['EmployeeID', 'Name', 'Age', 'DeptID', 'Salary']

In [10]:
# Pearson correlation of each numeric column with the target (Salary)
print(emp.corr('EmployeeID','Salary'))
print(emp.corr('Age','Salary'))
print(emp.corr('DeptID','Salary'))

0.07150969419341961
5.037797649556499e-17
0.8104432008587533
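`DataFrame.corr` computes the Pearson correlation coefficient. As a cross-check, the plain-Python sketch below recomputes it from the nine rows shown earlier. It confirms that DeptID is strongly correlated with Salary (about 0.81) while Age is uncorrelated: the tiny `5e-17` value printed above is floating-point noise around zero. The helper `pearson` is a hypothetical name.

```python
import math

# Age, DeptID and Salary columns copied from the emp.show() output
age    = [45, 50, 35, 40, 40, 35, 45, 50, 35]
dept   = [2, 1, 2, 1, 2, 1, 2, 1, 2]
salary = [3000, 2500, 3000, 2000, 3500, 2000, 4500, 2000, 3000]

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = math.sqrt(sum((a - mx) ** 2 for a in x))
    sy = math.sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)

print(pearson(dept, salary))  # about 0.8104
print(pearson(age, salary))   # about 0 (floating-point noise)
```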


In [11]:
# Import VectorAssembler and use it
# Assign a variable 'df1': combine Age and DeptID into a single feature vector column "Newinputcolumn"
from pyspark.ml.feature import VectorAssembler
df1=VectorAssembler(inputCols=['Age','DeptID'],outputCol='Newinputcolumn')
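For numeric input columns, `VectorAssembler` builds one feature vector per row by concatenating the listed columns, in order, as doubles. The plain-Python sketch below shows that idea only: it ignores vector-typed inputs and the `handleInvalid` option that the real transformer supports, and `assemble` is a hypothetical helper.

```python
def assemble(rows, input_cols):
    """Concatenate the given numeric columns of each row into a list of floats."""
    return [[float(row[col]) for col in input_cols] for row in rows]

# Two records from the emp DataFrame shown earlier
rows = [
    {"EmployeeID": 111, "Name": "bbb", "Age": 45, "DeptID": 2, "Salary": 3000},
    {"EmployeeID": 112, "Name": "aaa", "Age": 50, "DeptID": 1, "Salary": 2500},
]

print(assemble(rows, ["Age", "DeptID"]))  # [[45.0, 2.0], [50.0, 1.0]]
```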

In [12]:
# Transform the data using the assembler assigned above
data=df1.transform(emp)

In [13]:
# Display the transformed Data
data.show()

+----------+----+---+------+------+--------------+
|EmployeeID|Name|Age|DeptID|Salary|Newinputcolumn|
+----------+----+---+------+------+--------------+
|       111| bbb| 45|     2|  3000|    [45.0,2.0]|
|       112| aaa| 50|     1|  2500|    [50.0,1.0]|
|       113| bbb| 35|     2|  3000|    [35.0,2.0]|
|       104| aaa| 40|     1|  2000|    [40.0,1.0]|
|       105| bbb| 40|     2|  3500|    [40.0,2.0]|
|       106| aaa| 35|     1|  2000|    [35.0,1.0]|
|       107| bbb| 45|     2|  4500|    [45.0,2.0]|
|       108| aaa| 50|     1|  2000|    [50.0,1.0]|
|       109| bbb| 35|     2|  3000|    [35.0,2.0]|
+----------+----+---+------+------+--------------+



In [14]:
# Select the assembled feature column (newinputcolumn) and the target (Salary) into a new DataFrame 'df2'
df2 = data.select("newinputcolumn", "Salary")


In [15]:
# Display the transformed data
df2.limit(10).show()

+--------------+------+
|newinputcolumn|Salary|
+--------------+------+
|    [45.0,2.0]|  3000|
|    [50.0,1.0]|  2500|
|    [35.0,2.0]|  3000|
|    [40.0,1.0]|  2000|
|    [40.0,2.0]|  3500|
|    [35.0,1.0]|  2000|
|    [45.0,2.0]|  4500|
|    [50.0,1.0]|  2000|
|    [35.0,2.0]|  3000|
+--------------+------+



In [25]:
# Import a suitable regression estimator
from pyspark.ml.regression import LinearRegression
# Split the data 75/25 into train_data and test_data.
# randomSplit is random, so the split changes between runs; pass seed=... for a reproducible split.
splits = df2.randomSplit([0.75, 0.25])

train_data = splits[0]
test_data = splits[1]

train_rows = train_data.count()
test_rows = test_data.count()

print("Training rows count:", train_rows, " Testing rows count:", test_rows)

Training rows count: 6  Testing rows count: 3
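`randomSplit` does not cut the data at an exact 75/25 boundary: conceptually it draws a random number for each row and assigns the row to a split according to the normalised weights, so with only 9 rows the split sizes can vary between runs (6/3 here). The pure-Python sketch below illustrates that idea with a fixed seed. It is a simplification, not Spark's actual per-partition sampler, and `weighted_split` is a hypothetical name. In PySpark, passing `seed=` to `randomSplit` makes the split reproducible.

```python
import random

def weighted_split(rows, weights, seed):
    """Assign each row to a split by comparing one uniform draw with cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # e.g. [0.75, 1.0] for weights [0.75, 0.25]
    rng = random.Random(seed)  # fixed seed -> same split on every run
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # the last split also catches any float rounding just below 1.0
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(9))  # stand-ins for the 9 employee records
train, test = weighted_split(rows, [0.75, 0.25], seed=42)
print(len(train), len(test))  # sizes depend on the seed and only approximate 75/25
```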


In [26]:
train_data.show()

+--------------+------+
|newinputcolumn|Salary|
+--------------+------+
|    [35.0,2.0]|  3000|
|    [40.0,1.0]|  2000|
|    [45.0,2.0]|  3000|
|    [45.0,2.0]|  4500|
|    [50.0,1.0]|  2000|
|    [50.0,1.0]|  2500|
+--------------+------+



In [27]:
test_data.show()

+--------------+------+
|newinputcolumn|Salary|
+--------------+------+
|    [35.0,1.0]|  2000|
|    [35.0,2.0]|  3000|
|    [40.0,2.0]|  3500|
+--------------+------+



In [28]:
# Create the linear regression estimator, specifying the feature and target (label) columns
lr = LinearRegression(featuresCol='newinputcolumn', labelCol='Salary')


In [29]:
# Fit the model on the training data
train_model = lr.fit(train_data)
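With the default `regParam=0.0`, `LinearRegression` fits an ordinary least-squares model, and for so few features Spark's default `solver="auto"` typically uses the normal-equation solver. As a cross-check, the stdlib sketch below solves the normal equations $(X^\top X)\beta = X^\top y$ for the six training rows shown by `train_data.show()`, with a leading column of ones for the intercept. It should reproduce the coefficients and intercept printed in the next cells. `solve` and `fit_ols` are hypothetical helpers, not Spark APIs.

```python
def solve(a, b):
    """Solve the linear system a @ x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back substitution
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x

def fit_ols(features, labels):
    """Return (intercept, coefficients) minimising the sum of squared errors."""
    X = [[1.0] + [float(v) for v in f] for f in features]  # prepend intercept column
    p = len(X[0])
    xtx = [[sum(row[i] * row[j] for row in X) for j in range(p)] for i in range(p)]
    xty = [sum(row[i] * y for row, y in zip(X, labels)) for i in range(p)]
    beta = solve(xtx, xty)
    return beta[0], beta[1:]

# Training rows copied from train_data.show(): [Age, DeptID] -> Salary
train_X = [[35, 2], [40, 1], [45, 2], [45, 2], [50, 1], [50, 1]]
train_y = [3000, 2000, 3000, 4500, 2000, 2500]

intercept, coefs = fit_ols(train_X, train_y)
print(intercept, coefs)  # approximately -1750.0 and [50.0, 1583.33]
```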

In [30]:
train_model

LinearRegressionModel: uid=LinearRegression_af2be1d46907, numFeatures=2

In [31]:
# Display Coefficients
print("Coefficients: " + str(train_model.coefficients))

Coefficients: [49.99999999999955,1583.3333333333298]


In [32]:
# Display Intercept
print("Intercept: " + str(train_model.intercept))

Intercept: -1749.9999999999754
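Putting the printed coefficients and intercept together, the fitted model (values rounded) is shown below. Plugging in the first test row (Age = 35, DeptID = 1) reproduces the prediction of about 1583.33 that `pred.show()` reports for that row.

```latex
\begin{aligned}
\widehat{\mathrm{Salary}} &= -1750 + 50\,\mathrm{Age} + 1583.33\,\mathrm{DeptID} \\
\widehat{\mathrm{Salary}}(\mathrm{Age}=35,\ \mathrm{DeptID}=1) &= -1750 + 50 \times 35 + 1583.33 \times 1 = 1583.33
\end{aligned}
```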


In [33]:
# Evaluate the model on the training data
results=train_model.evaluate(train_data)
print('The r2 in train data set is',results.r2)
print('The MSE in the train data set is',results.meanSquaredError)
print('The MAE in train data set is',results.meanAbsoluteError)
print('The RMSE in train data set is',results.rootMeanSquaredError)

The r2 in train data set is 0.6923076923076923
The MSE in the train data set is 222222.22222222225
The MAE in train data set is 388.8888888888887
The RMSE in train data set is 471.4045207910317


In [41]:
# Generate and display predictions on the test data
pred = train_model.transform(test_data)
pred.show()


+--------------+------+------------------+
|newinputcolumn|Salary|        prediction|
+--------------+------+------------------+
|    [35.0,1.0]|  2000|1583.3333333333385|
|    [35.0,2.0]|  3000| 3166.666666666669|
|    [40.0,2.0]|  3500| 3416.666666666666|
+--------------+------+------------------+



In [42]:
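# Compute R² on the test predictions with RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="Salary", metricName="r2")
test_r2 = evaluator.evaluate(pred)  # same value as results1.r2 in the next cell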


In [44]:
# Display the error metrics on the test data: R², squared errors (MSE, RMSE) and absolute error (MAE)
results1=train_model.evaluate(test_data)
print('The r2 in test data set is',results1.r2)
print('The MSE in the test data set is',results1.meanSquaredError)
print('The MAE in test data set is',results1.meanAbsoluteError)
print('The RMSE in test data set is',results1.rootMeanSquaredError)

The r2 in test data set is 0.8214285714285744
The MSE in the test data set is 69444.44444444329
The MAE in test data set is 222.2222222222214
The RMSE in test data set is 263.52313834736276
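The same four metrics can be recomputed by hand from the three test labels and predictions printed by `pred.show()`. This plain-Python sketch follows the usual definitions and should match the values above up to floating-point rounding; `regression_metrics` is a hypothetical helper.

```python
import math

def regression_metrics(y_true, y_pred):
    """Return R^2, MSE, MAE and RMSE for paired lists of labels and predictions."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    sse = sum(e * e for e in errors)            # sum of squared errors
    mean_y = sum(y_true) / n
    sst = sum((t - mean_y) ** 2 for t in y_true)  # total sum of squares
    mse = sse / n
    return {
        "r2": 1 - sse / sst,
        "mse": mse,
        "mae": sum(abs(e) for e in errors) / n,
        "rmse": math.sqrt(mse),
    }

# Test labels and predictions copied from the pred.show() output
y_true = [2000, 3000, 3500]
y_pred = [1583.3333333333385, 3166.666666666669, 3416.666666666666]

print(regression_metrics(y_true, y_pred))
```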
