In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Libraries

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# For Encoding
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline

# Boosting
from pyspark.ml.regression import GBTRegressor

# Evaluation
from pyspark.ml.evaluation import RegressionEvaluator 

### Initializing Spark

In [None]:
# Obtain the SparkContext through the session builder so this cell can be re-run:
# getOrCreate() hands back the already-running session (and its context), whereas
# calling SparkContext(...) a second time in the same kernel raises a ValueError.
spark_context = SparkSession.builder.appName("BIA678_D3").getOrCreate().sparkContext
spark_context

In [None]:
# Wrap the context created above in a SparkSession, the entry point used for
# reading the CSV files and running the SQL queries below.
spark_session = SparkSession(sparkContext=spark_context)
spark_session

In [None]:
# Legacy SQLContext built on the session's SparkContext.
# NOTE(review): sql_context is not referenced again in the cells shown here;
# the queries below go through spark_session.sql instead.
sql_context = SQLContext(sparkContext=spark_session.sparkContext)
sql_context



<pyspark.sql.context.SQLContext at 0x7fab279fe290>

### Loading Datasets

In [None]:
# Job attributes. The first CSV row is used as the header; no schema is given or
# inferred here, so the numeric-looking columns are cast explicitly in a later cell.
details_reader = spark_session.read.format("csv").option("header", True)
data_details = details_reader.load("train_dataset.csv")
data_details.show(n=5)
data_details.printSchema()

+----------------+---------+--------------+-----------+---------+--------+---------------+-------------------+
|           jobId|companyId|       jobType|     degree|    major|industry|yearsExperience|milesFromMetropolis|
+----------------+---------+--------------+-----------+---------+--------+---------------+-------------------+
|JOB1362684407687|   COMP37|           CFO|    MASTERS|     MATH|  HEALTH|             10|                 83|
|JOB1362684407688|   COMP19|           CEO|HIGH_SCHOOL|     NONE|     WEB|              3|                 73|
|JOB1362684407689|   COMP52|VICE_PRESIDENT|   DOCTORAL|  PHYSICS|  HEALTH|             10|                 38|
|JOB1362684407690|   COMP38|       MANAGER|   DOCTORAL|CHEMISTRY|    AUTO|              8|                 17|
|JOB1362684407691|    COMP7|VICE_PRESIDENT|  BACHELORS|  PHYSICS| FINANCE|              8|                 16|
+----------------+---------+--------------+-----------+---------+--------+---------------+-------------------+
only showing top 5 rows

In [None]:
# Salary per jobId. The first CSV row is used as the header; the schema printed
# below shows that both columns are read as strings.
salaries_reader = spark_session.read.format("csv").option("header", True)
data_salaries = salaries_reader.load("train_salaries.csv")
data_salaries.show(n=5)
data_salaries.printSchema()

+----------------+------+
|           jobId|salary|
+----------------+------+
|JOB1362684407687|   130|
|JOB1362684407688|   101|
|JOB1362684407689|   137|
|JOB1362684407690|   142|
|JOB1362684407691|   163|
+----------------+------+
only showing top 5 rows

root
 |-- jobId: string (nullable = true)
 |-- salary: string (nullable = true)



In [None]:
# As seen in the schemas above, 'salary' (data_salaries) and 'yearsExperience' /
# 'milesFromMetropolis' (data_details) were read as strings.
# These columns must be cast to a numeric datatype before analysis and modelling.

data_salaries = data_salaries.withColumn("salary", data_salaries["salary"].cast('int'))
data_salaries.printSchema()

for numeric_column in ("yearsExperience", "milesFromMetropolis"):
    data_details = data_details.withColumn(
        numeric_column, data_details[numeric_column].cast('int')
    )
data_details.printSchema()

root
 |-- jobId: string (nullable = true)
 |-- salary: integer (nullable = true)

root
 |-- jobId: string (nullable = true)
 |-- companyId: string (nullable = true)
 |-- jobType: string (nullable = true)
 |-- degree: string (nullable = true)
 |-- major: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- yearsExperience: integer (nullable = true)
 |-- milesFromMetropolis: integer (nullable = true)



### Merging the two pyspark dataframes

In [None]:
# List the columns of both frames; 'jobId' is the only one they share.
column_listing = {
    "Data Details Dataframe columns: ": data_details.columns,
    "Data Salaries Dataframe columns: ": data_salaries.columns,
}
for heading, column_names in column_listing.items():
    print(heading, column_names)

Data Details Dataframe columns:  ['jobId', 'companyId', 'jobType', 'degree', 'major', 'industry', 'yearsExperience', 'milesFromMetropolis']
Data Salaries Dataframe columns:  ['jobId', 'salary']


In [None]:
# Inner join on the shared key: only jobIds present in both frames are kept,
# and 'jobId' appears once in the result.
data = data_details.join(other=data_salaries, on="jobId", how="inner")

In [None]:
# Preview the joined frame.
data.show(n=5)

+----------------+---------+-------+-----------+---------+---------+---------------+-------------------+------+
|           jobId|companyId|jobType|     degree|    major| industry|yearsExperience|milesFromMetropolis|salary|
+----------------+---------+-------+-----------+---------+---------+---------------+-------------------+------+
|JOB1362684407687|   COMP37|    CFO|    MASTERS|     MATH|   HEALTH|             10|                 83|   130|
|JOB1362684407688|   COMP19|    CEO|HIGH_SCHOOL|     NONE|      WEB|              3|                 73|   101|
|JOB1362684407690|   COMP38|MANAGER|   DOCTORAL|CHEMISTRY|     AUTO|              8|                 17|   142|
|JOB1362684407693|   COMP15|    CFO|       NONE|     NONE|   HEALTH|             23|                 24|   178|
|JOB1362684407694|   COMP24| JUNIOR|  BACHELORS|CHEMISTRY|EDUCATION|              9|                 70|    73|
+----------------+---------+-------+-----------+---------+---------+---------------+-------------------+------+

### Managing missing values

In [None]:
# Some cells hold the placeholder string 'NONE' instead of a real value.
# Count the placeholder per column (one Spark count job per column) to see
# which columns need handling.
for column_name in data.columns:
  none_count = data.where(data[column_name] == 'NONE').count()
  print(f"{column_name} column's total 'NONE' values: ", none_count)

jobId column's total 'NONE' values:  0
companyId column's total 'NONE' values:  0
jobType column's total 'NONE' values:  0
degree column's total 'NONE' values:  236854
major column's total 'NONE' values:  532355
industry column's total 'NONE' values:  0
yearsExperience column's total 'NONE' values:  0
milesFromMetropolis column's total 'NONE' values:  0
salary column's total 'NONE' values:  0


In [None]:
# Dropping the rows with NONE values: keep a row only when both its degree
# and its major carry a real value.
has_degree = data["degree"] != 'NONE'
has_major = data["major"] != 'NONE'
data = data.where(has_degree & has_major)

### Dataset Insights

In [None]:
# Register the filtered frame as the temporary view "data" so the
# spark_session.sql(...) queries below can refer to it by name.
data.createOrReplaceTempView(name="data")

#### 1. Highest Paid Job

In [None]:
# Rows whose salary equals the overall maximum; ties are all returned.
highest_paid_query = '''
    SELECT salary, jobtype
    FROM data
    WHERE salary == (SELECT MAX(salary) FROM data)'''
highest_paid = spark_session.sql(highest_paid_query)
highest_paid.show()

+------+-------+
|salary|jobtype|
+------+-------+
|   301|    CFO|
|   301|    CTO|
+------+-------+



The highest-paid positions are CFO and CTO, each with a salary of $301,000 per year.

#### 2. Top 10 Highest Paying Jobs

In [None]:
# Ten rows with the largest salaries, highest first.
top_paying_query = '''
          SELECT salary, jobtype, industry
          FROM data
          ORDER BY salary DESC
          LIMIT 10'''
print("Top 10 Highest paying job:\n")
top_paying = spark_session.sql(top_paying_query)
top_paying.show()

Top 10 Highest paying job:

+------+-------+--------+
|salary|jobtype|industry|
+------+-------+--------+
|   301|    CFO|     OIL|
|   301|    CTO|     OIL|
|   298|    CEO|     OIL|
|   294|    CEO| FINANCE|
|   294|    CEO|     OIL|
|   293|    CEO| FINANCE|
|   293|    CEO|     OIL|
|   292|    CTO| FINANCE|
|   292|    CEO| FINANCE|
|   290|    CEO|     WEB|
+------+-------+--------+



#### 3. Top 10 Lowest Paying Jobs

In [None]:
# Ten rows with the smallest salaries, lowest first; zero salaries are excluded.
least_paying_query = '''
          SELECT salary, jobtype, industry
          FROM data
          WHERE salary != 0
          ORDER BY salary ASC
          LIMIT 10'''
print("Top 10 Least paying job:\n")
least_paying = spark_session.sql(least_paying_query)
least_paying.show()

Top 10 Least paying job:

+------+-------+---------+
|salary|jobtype| industry|
+------+-------+---------+
|    35| JUNIOR|EDUCATION|
|    37| JUNIOR|  SERVICE|
|    37| JUNIOR|  SERVICE|
|    38| JUNIOR|EDUCATION|
|    38| JUNIOR|  SERVICE|
|    39| JUNIOR|EDUCATION|
|    40| JUNIOR|EDUCATION|
|    40| JUNIOR|EDUCATION|
|    40| JUNIOR|EDUCATION|
|    40| JUNIOR|  SERVICE|
+------+-------+---------+



#### 4. Ranking industries based on highest average salaries.

In [None]:
# Mean salary per industry, best-paying industry first.
industry_ranking_query = '''
          SELECT industry, AVG(salary) as average_salary
          FROM data
          GROUP BY industry
          ORDER BY average_salary DESC'''
industry_ranking = spark_session.sql(industry_ranking_query)
industry_ranking.show()

+---------+------------------+
| industry|    average_salary|
+---------+------------------+
|      OIL| 146.8513955582233|
|  FINANCE|146.39576890323158|
|      WEB| 138.4103885627213|
|   HEALTH|131.36158099222953|
|     AUTO|123.86654947412329|
|  SERVICE|  118.948209747515|
|EDUCATION|113.92040607917342|
+---------+------------------+



#### 5. Finding correlation between years of experience and salary.

In [None]:
# Lists the highest salaries next to years of experience for eyeballing the
# relationship; the query sorts by salary and does not compute a coefficient.
experience_query = '''
    SELECT salary, jobtype, industry, yearsExperience
    FROM data
    ORDER BY salary DESC
    '''
experience_vs_salary = spark_session.sql(experience_query)
experience_vs_salary.show()

+------+-------+--------+---------------+
|salary|jobtype|industry|yearsExperience|
+------+-------+--------+---------------+
|   301|    CFO|     OIL|             23|
|   301|    CTO|     OIL|             24|
|   298|    CEO|     OIL|             22|
|   294|    CEO| FINANCE|             24|
|   294|    CEO|     OIL|             19|
|   293|    CEO|     OIL|             24|
|   293|    CEO| FINANCE|             16|
|   292|    CEO| FINANCE|             22|
|   292|    CTO| FINANCE|             24|
|   290|    CEO|     WEB|             20|
|   289|    CTO|     WEB|             24|
|   289|    CFO|     OIL|             23|
|   289|    CEO| FINANCE|             23|
|   288|    CFO| FINANCE|             24|
|   288|    CFO| FINANCE|             21|
|   287|    CEO| FINANCE|             19|
|   287|    CEO|     OIL|             22|
|   286|    CTO| FINANCE|             24|
|   286|    CEO|     OIL|             16|
|   286|    CEO| FINANCE|             24|
+------+-------+--------+---------------+

#### 6. Finding correlation between degree and salary. | NEEDS GRAPH

In [None]:
# Lowest non-zero salaries with their degree and job type (ascending by salary);
# only the first 10 rows are displayed.
degree_query = '''
                  SELECT degree, salary, jobtype
                  FROM data
                  WHERE salary != 0
                  ORDER BY salary
                  '''
degree_vs_salary = spark_session.sql(degree_query)
degree_vs_salary.show(10)

+---------+------+-------+
|   degree|salary|jobtype|
+---------+------+-------+
|BACHELORS|    35| JUNIOR|
|BACHELORS|    37| JUNIOR|
|BACHELORS|    37| JUNIOR|
|BACHELORS|    38| JUNIOR|
|BACHELORS|    38| JUNIOR|
|BACHELORS|    39| JUNIOR|
|BACHELORS|    40| JUNIOR|
|BACHELORS|    40| JUNIOR|
|BACHELORS|    40| JUNIOR|
|BACHELORS|    40| JUNIOR|
+---------+------+-------+
only showing top 10 rows



#### 7. Finding correlation between major and salary. | NEEDS GRAPH

In [None]:
# TODO: add the major-vs-salary analysis (query and graph) here.

### Predicting Salaries | ML

In [None]:
# Preview the cleaned frame that feeds the model.
data.show(n=5)

+----------------+---------+--------------+---------+---------+---------+---------------+-------------------+------+
|           jobId|companyId|       jobType|   degree|    major| industry|yearsExperience|milesFromMetropolis|salary|
+----------------+---------+--------------+---------+---------+---------+---------------+-------------------+------+
|JOB1362684407687|   COMP37|           CFO|  MASTERS|     MATH|   HEALTH|             10|                 83|   130|
|JOB1362684407690|   COMP38|       MANAGER| DOCTORAL|CHEMISTRY|     AUTO|              8|                 17|   142|
|JOB1362684407694|   COMP24|        JUNIOR|BACHELORS|CHEMISTRY|EDUCATION|              9|                 70|    73|
|JOB1362684407696|   COMP41|VICE_PRESIDENT|BACHELORS|CHEMISTRY|     AUTO|             17|                 68|   104|
|JOB1362684407700|   COMP54|        JUNIOR|  MASTERS|     MATH|  FINANCE|             21|                 26|   193|
+----------------+---------+--------------+---------+---------+---------+---------------+-------------------+------+

In [None]:
# jobId is a row identifier, not a predictor, so it is removed before modelling.
identifier_columns = ['jobId']
data = data.drop(*identifier_columns)

In [None]:
# Remember the pre-encoding column names; they are re-selected next to
# 'features' once the pipeline has added its intermediate columns.
columns = list(data.columns)

In [None]:
# Encoding labels for columns representing class.
# NOTE(review): the next cell resets `stages` to [], and the one-hot cell after it
# reassigns class_columns, numericCols, assemblerInputs and assembler, so nothing
# built here reaches the fitted pipeline as the notebook currently stands.
class_columns = ["companyId","jobType", "degree", "major", "industry"]
numericCols =  ["yearsExperience", "milesFromMetropolis"]

# One StringIndexer per categorical column, writing to "<column>Index".
stages = [
    StringIndexer(inputCol = name, outputCol = name + 'Index')
    for name in class_columns
]

# Assemble the indexed categoricals and the numeric columns into one vector.
assemblerInputs = [name + "Index" for name in class_columns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [None]:
# Start the pipeline stage list afresh with a label index for 'degree'
# (written to 'degreeIndex'); the one-hot cell below appends to this list.
stringIndexer = StringIndexer(inputCol = "degree", outputCol = "degreeIndex")
stages = [stringIndexer]

In [None]:
# One Hot Encoding
# Each remaining categorical column is first indexed ("<column>Index") and then
# one-hot encoded ("<column>classVec"); both stages are appended to the `stages`
# list started in the previous cell.
class_columns = ["companyId","jobType", "major", "industry"]
numericCols =  ["yearsExperience", "milesFromMetropolis"]

for column_name in class_columns:
    index_col = column_name + 'Index'
    vector_col = column_name + "classVec"
    stages.extend([
        StringIndexer(inputCol = column_name, outputCol = index_col),
        OneHotEncoder(inputCols=[index_col], outputCols=[vector_col]),
    ])

# Feature vector layout: one-hot vectors, then numeric columns, then degreeIndex.
assemblerInputs = [column_name + "classVec" for column_name in class_columns] + numericCols + ["degreeIndex"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [None]:
# Fit the indexers/encoders on the full frame and add the assembled 'features'
# column, then keep only 'features' plus the original columns (the intermediate
# *Index / *classVec columns are dropped by the select).
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(data)
selectedCols = ['features'] + columns
data = pipelineModel.transform(data).select(selectedCols)

In [None]:
# Inspect one row to confirm the assembled 'features' vector is present.
data.show(n=1)

+--------------------+---------+-------+-------+-----+--------+---------------+-------------------+------+
|            features|companyId|jobType| degree|major|industry|yearsExperience|milesFromMetropolis|salary|
+--------------------+---------+-------+-------+-----+--------+---------------+-------------------+------+
|(84,[61,78,81,82]...|   COMP37|    CFO|MASTERS| MATH|  HEALTH|             10|                 83|   130|
+--------------------+---------+-------+-------+-----+--------+---------------+-------------------+------+
only showing top 1 row



In [None]:
# Splitting data into training and testing sets (80% / 20%).
# A fixed seed pins the random sampling so that re-running the notebook does not
# draw a different split each time, which keeps the reported metrics comparable.
train, test = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Boosting: gradient-boosted trees regressor predicting 'salary' from 'features'.
gbt_params = {
    "featuresCol": "features",
    "labelCol": "salary",
    "maxBins": 20,
    "maxDepth": 12,
}
gbt = GBTRegressor(**gbt_params)

In [None]:
# Train the regressor on the training split.
model = gbt.fit(dataset=train)

In [None]:
# Score the held-out split; the evaluators below read the 'prediction' column
# from this frame.
predicted_salary = model.transform(dataset=test)

In [None]:
# Evaluation Root Mean Squared: RMSE of 'prediction' against 'salary'
# on the test-split predictions.
evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction")
evaluator = evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predicted_salary)
rmse

21.592444756565165

In [None]:
# Evaluation R squared: R^2 of 'prediction' against 'salary'
# on the test-split predictions.
evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction")
evaluator = evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predicted_salary)
r2

0.6331882787408196