In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!ls

!tar xf spark-3.1.2-bin-hadoop3.2.tgz

!pwd
!ls /content/

# Set up Spark
!pip install -q findspark
!pip install py4j

!export JAVA_HOME=$(/usr/lib/jvm/java-8-openjdk-amd64 -v 1.8)
! echo $JAVA_HOME
import os

os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
import findspark
findspark.init("spark-3.1.2-bin-hadoop3.2")# SPARK_HOME


from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [73.9 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Ge

In [None]:
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import numpy as np
from pyspark.ml import Transformer, Pipeline, Estimator
from pyspark.ml.feature import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Data preprocessing: read the training CSV (header row, inferred column
# types) and give its seven columns fixed names. The last column is named
# "label", which is the column the later cells select and evaluate against.
print('running...')
session = (
    SparkSession.builder
    .appName("Spark SQL")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
data = (
    session.read
    .options(header=True, inferSchema=True)
    .csv('/content/train.csv')
    .toDF(
        "jobType",
        "degree",
        "major",
        "industry",
        "yearsExperience",
        "milesFromMetropolis",
        "label",
    )
)
print('finished')

running...
finished


In [None]:
# Preview the loaded DataFrame; the arguments spell out show()'s defaults
# (first 20 rows, long cell values truncated).
data.show(n=20, truncate=True)

+--------------+-----------+----------+---------+---------------+-------------------+------+
|       jobType|     degree|     major| industry|yearsExperience|milesFromMetropolis|salary|
+--------------+-----------+----------+---------+---------------+-------------------+------+
|           CFO|    MASTERS|      MATH|   HEALTH|             10|                 83|   130|
|           CEO|HIGH_SCHOOL|      NONE|      WEB|              3|                 73|   101|
|VICE_PRESIDENT|   DOCTORAL|   PHYSICS|   HEALTH|             10|                 38|   137|
|       MANAGER|   DOCTORAL| CHEMISTRY|     AUTO|              8|                 17|   142|
|VICE_PRESIDENT|  BACHELORS|   PHYSICS|  FINANCE|              8|                 16|   163|
|       MANAGER|   DOCTORAL|   COMPSCI|  FINANCE|              2|                 31|   113|
|           CFO|       NONE|      NONE|   HEALTH|             23|                 24|   178|
|        JUNIOR|  BACHELORS| CHEMISTRY|EDUCATION|              9|     

Encode

In [None]:
# Feature engineering: index and one-hot encode every categorical column,
# then assemble the encoded vectors together with the numeric columns into a
# single "features" vector column.
categoricalColumns = ["jobType", "degree", "major", "industry"]
numericCols = ["yearsExperience", "milesFromMetropolis"]

stages = []
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + "Index"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    encoder = OneHotEncoder(
        inputCols=[indexedCol],
        outputCols=[categoricalCol + "classVec"],
    )
    stages.extend([stringIndexer, encoder])

assemblerInputs = [f"{name}classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

# Fit the whole pipeline on the loaded data and apply it to the same data.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(data)
preppedDataDF = pipelineModel.transform(data)
preppedDataDF.printSchema()

# Keep only the two columns the regression stage consumes.
cols = data.columns
selectedcols = ["label", "features"]
dataset = preppedDataDF.select(*selectedcols)

# 70/30 train/test split with a fixed seed.
train, test = dataset.randomSplit([0.7, 0.3], seed=100)
print(train.count())
# Uncomment to print the test-set row count as well:
# print(test.count())
train.show()

root
 |-- jobType: string (nullable = true)
 |-- degree: string (nullable = true)
 |-- major: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- yearsExperience: integer (nullable = true)
 |-- milesFromMetropolis: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- jobTypeIndex: double (nullable = false)
 |-- jobTypeclassVec: vector (nullable = true)
 |-- degreeIndex: double (nullable = false)
 |-- degreeclassVec: vector (nullable = true)
 |-- majorIndex: double (nullable = false)
 |-- majorclassVec: vector (nullable = true)
 |-- industryIndex: double (nullable = false)
 |-- industryclassVec: vector (nullable = true)
 |-- features: vector (nullable = true)

699966
+-----+--------------------+
|label|            features|
+-----+--------------------+
|   17|(27,[4,8,11,22,26...|
|   18|(27,[4,7,11,22,25...|
|   18|(27,[4,8,11,22,26...|
|   19|(27,[4,7,11,22,26...|
|   19|(27,[4,8,11,22,25...|
|   19|(27,[4,8,11,22,26...|
|   19|(27,[4,8,11,22,26...|
|

Modeling

In [None]:
# Learning: fit a linear regression with elastic-net regularisation
# (maxIter=10, regParam=0.3, elasticNetParam=0.8) on the training split.
# featuresCol/labelCol are written out explicitly; they equal the defaults.
LR = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
LRmodel = LR.fit(train)

# Score the held-out test split and preview the predictions.
predictions = LRmodel.transform(test)
predictions.show()

# Evaluate with root-mean-squared error between "label" and "prediction".
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
RMSE = evaluator.evaluate(predictions)
print("LR Test set RMSE = ", RMSE)

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|   19|(27,[4,7,11,22,25...|20.679534799762806|
|   20|(27,[4,7,11,22,25...|16.353561402897057|
|   20|(27,[4,7,11,22,25...| 19.89561070439568|
|   20|(27,[4,7,11,22,26...|19.474866912018953|
|   20|(27,[4,7,11,22,26...|19.082904864335404|
|   20|(27,[4,7,11,22,26...|15.163284387499786|
|   20|(27,[4,7,11,22,26...|14.771322339816223|
|   20|(27,[4,8,11,22,25...|13.096137336200812|
|   20|(27,[4,8,11,22,26...|18.177253083740524|
|   20|(27,[4,8,11,22,26...|13.865670559221343|
|   21|(27,[4,7,11,22,25...| 24.97672645193542|
|   21|(27,[4,7,11,22,25...|  21.0571059750998|
|   21|(27,[4,7,11,22,25...|20.273181879732675|
|   21|(27,[4,7,11,22,25...|19.881219832049112|
|   21|(27,[4,7,11,22,25...|17.921409593631296|
|   21|(27,[4,7,11,22,26...| 29.27391810410802|
|   21|(27,[4,7,11,22,26...| 14.37936029213266|
|   21|(27,[4,8,11,22,26...|20.920987417