# Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum,when
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import MinMaxScaler,VectorAssembler,StringIndexer,OneHotEncoder
from pyspark.ml.regression import LinearRegression,DecisionTreeRegressor,RandomForestRegressor
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,DoubleType,StringType

# Create Spark session

In [2]:
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

spark = SparkSession.builder \
    .appName("Regression") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.bindAddress", "localhost") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .getOrCreate()

# Load Dataset

In [3]:
df=spark.read.csv('CarPrice_Assignment.csv',header=True,inferSchema=True)
df.show()

+------+---------+--------------------+--------+----------+----------+-----------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+
|car_ID|symboling|             CarName|fueltype|aspiration|doornumber|    carbody|drivewheel|enginelocation|wheelbase|carlength|carwidth|carheight|curbweight|enginetype|cylindernumber|enginesize|fuelsystem|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|    price|
+------+---------+--------------------+--------+----------+----------+-----------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+
|     1|        3|  alfa-romero giulia|     gas|       std|       two|convertible|       rwd|         front|     88.6|    168.8|    6

# Find unique values of each categorical column

In [4]:
columns=['fueltype','aspiration','doornumber','carbody','drivewheel','enginelocation','fuelsystem','enginetype','cylindernumber']
for c in columns: 
    df.select(c).distinct().show()


+--------+
|fueltype|
+--------+
|     gas|
|  diesel|
+--------+

+----------+
|aspiration|
+----------+
|       std|
|     turbo|
+----------+

+----------+
|doornumber|
+----------+
|       two|
|      four|
+----------+

+-----------+
|    carbody|
+-----------+
|      wagon|
|convertible|
|      sedan|
|  hatchback|
|    hardtop|
+-----------+

+----------+
|drivewheel|
+----------+
|       fwd|
|       rwd|
|       4wd|
+----------+

+--------------+
|enginelocation|
+--------------+
|         front|
|          rear|
+--------------+

+----------+
|fuelsystem|
+----------+
|      1bbl|
|      4bbl|
|       mfi|
|      2bbl|
|      spfi|
|       idi|
|      spdi|
|      mpfi|
+----------+

+----------+
|enginetype|
+----------+
|     rotor|
|         l|
|       ohc|
|      ohcf|
|     dohcv|
|      ohcv|
|      dohc|
+----------+

+--------------+
|cylindernumber|
+--------------+
|           two|
|         eight|
|          four|
|        twelve|
|           six|
|         three|

# Data preprocessing

# Check Null values

In [5]:
null_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()


+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+
|car_ID|symboling|CarName|fueltype|aspiration|doornumber|carbody|drivewheel|enginelocation|wheelbase|carlength|carwidth|carheight|curbweight|enginetype|cylindernumber|enginesize|fuelsystem|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|price|
+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+
|     0|        0|      0|       0|         0|         0|      0|         0|             0|        0|        0|       0|        0|         0|         0|             0|         0|         0|       

# Check for duplicate records

In [6]:
df.groupBy(df.columns).count().filter('count>1').show()

+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+-----+
|car_ID|symboling|CarName|fueltype|aspiration|doornumber|carbody|drivewheel|enginelocation|wheelbase|carlength|carwidth|carheight|curbweight|enginetype|cylindernumber|enginesize|fuelsystem|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|price|count|
+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+-----+
+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+

# Feature Engineering

In [7]:
def carNameExtracter(name):
    name=name.split()[0]
    return name
carNameExtracterUdf=udf(carNameExtracter,StringType())

df=df.withColumn('Brand',carNameExtracterUdf(col('CarName')))
df.select('Brand').distinct().show(28)

+-----------+
|      Brand|
+-----------+
|     jaguar|
|      buick|
| mitsubishi|
|     toyota|
|       saab|
|         vw|
|    peugeot|
|   plymouth|
|  vokswagen|
|       audi|
|alfa-romero|
|        bmw|
|      dodge|
|    toyouta|
|      maxda|
|      mazda|
|      isuzu|
|    porsche|
|  chevrolet|
|      honda|
| volkswagen|
|    mercury|
|   porcshce|
|    renault|
|     nissan|
|     subaru|
|     Nissan|
|      volvo|
+-----------+



In [8]:
brand_list = [row['Brand'] for row in df.select("Brand").distinct().collect()]
print(brand_list)

['jaguar', 'buick', 'mitsubishi', 'toyota', 'saab', 'vw', 'peugeot', 'plymouth', 'vokswagen', 'audi', 'alfa-romero', 'bmw', 'dodge', 'toyouta', 'maxda', 'mazda', 'isuzu', 'porsche', 'chevrolet', 'honda', 'volkswagen', 'mercury', 'porcshce', 'renault', 'nissan', 'subaru', 'Nissan', 'volvo']


# Transformation

## Encoding Categorical columns 

In [9]:
string_idxr=StringIndexer(inputCols=['fueltype','aspiration','carbody',
                                     'drivewheel','enginelocation','enginetype','fuelsystem','Brand'],
                          outputCols=['enc_fueltype','enc_aspiration','enc_carbody',
                                      'enc_drivewheel','enc_enginelocation','enc_enginetype',
                                      'enc_fuelsystem','enc_brand'])
string_idxr=string_idxr.fit(df)
df=string_idxr.transform(df)

In [11]:
df.show()

+------+---------+--------------------+--------+----------+----------+-----------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+------------+--------------+-----------+--------------+------------------+--------------+--------------+---------+
|car_ID|symboling|             CarName|fueltype|aspiration|doornumber|    carbody|drivewheel|enginelocation|wheelbase|carlength|carwidth|carheight|curbweight|enginetype|cylindernumber|enginesize|fuelsystem|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|    price|      Brand|enc_fueltype|enc_aspiration|enc_carbody|enc_drivewheel|enc_enginelocation|enc_enginetype|enc_fuelsystem|enc_brand|
+------+---------+--------------------+--------+----------+----------+-----------+----------+--------------+---------+---------+--------+---------+----------+----------+-

In [12]:
def cylinder_num (num):
    if num=='two':
        return 2
    elif num=='three':
        return 3
    elif num=='four':
        return 4
    elif num=='five':
        return 5
    elif num=='six':
        return 6
    elif num== 'eight':
        return 8
    elif num== 'twelve':
        return 12
    else:
        return 0
    
def door_num(num):
    if num=='two':
        return 2
    elif num=='four':
        return 4    

cylinder_num_udf=udf(cylinder_num,IntegerType())
door_num_udf=udf(door_num,IntegerType())

df=df.withColumn('cylinder_num',cylinder_num_udf(col('cylindernumber')))
df=df.withColumn('doornumber',cylinder_num_udf(col('doornumber')))  
    


In [13]:
oh_encoder=OneHotEncoder(inputCols=['enc_carbody','enc_fuelsystem','enc_enginetype','enc_brand'],
                         outputCols=['oh_carbody','oh_fuelsystem','oh_enginetype','oh_brand'])
oh_encoder=oh_encoder.fit(df)
df=oh_encoder.transform(df)

## After encoding, removing unwanted columns

In [14]:
df=df.drop('fueltype','aspiration','carbody',
          'drivewheel','enginelocation','enginetype','cylindernumber',
          'CarName','symboling','car_ID','fuelsystem',
          'enc_carbody','enc_fuelsystem','enc_enginetype','enc_brand')
df.show()

+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+------------+--------------+--------------+------------------+------------+-------------+-------------+-------------+---------------+
|doornumber|wheelbase|carlength|carwidth|carheight|curbweight|enginesize|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|    price|      Brand|enc_fueltype|enc_aspiration|enc_drivewheel|enc_enginelocation|cylinder_num|   oh_carbody|oh_fuelsystem|oh_enginetype|       oh_brand|
+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+------------+--------------+--------------+------------------+------------+-------------+-------------+-------------+---------------+
|         2|     88.6|    168.8|    64.1|     48.8|      2548|       130|     3.47|  2.68|       

## Assembling for numerical features scaling

In [15]:
columns=['wheelbase','carlength','carwidth','carheight','curbweight','enginesize',
         'boreratio','stroke','compressionratio','horsepower','peakrpm','citympg','highwaympg']

assembler_x=VectorAssembler(inputCols=columns,outputCol='scale_input_x')
df=assembler_x.transform(df)

assembler_y=VectorAssembler(inputCols=['price'],outputCol='scale_input_y')
df=assembler_y.transform(df)
df.show()

+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+------------+--------------+--------------+------------------+------------+-------------+-------------+-------------+---------------+--------------------+-------------+
|doornumber|wheelbase|carlength|carwidth|carheight|curbweight|enginesize|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|    price|      Brand|enc_fueltype|enc_aspiration|enc_drivewheel|enc_enginelocation|cylinder_num|   oh_carbody|oh_fuelsystem|oh_enginetype|       oh_brand|       scale_input_x|scale_input_y|
+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+---------+-----------+------------+--------------+--------------+------------------+------------+-------------+-------------+-------------+---------------+--------------------+-------

# Train,Test split | scaling features

In [16]:
train,test=df.randomSplit([0.8,0.2])

scaler_x=MinMaxScaler(inputCol='scale_input_x',outputCol='scaled_feature')
scaler_y=MinMaxScaler(inputCol='scale_input_y',outputCol='scaled_target')

scaler_x=scaler_x.fit(train)
scaler_y=scaler_y.fit(train)
train=scaler_x.transform(train)
train=scaler_y.transform(train)

test=scaler_x.transform(test)
test=scaler_y.transform(test)

train.show()
test.show()

+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-------+-----------+------------+--------------+--------------+------------------+------------+-------------+-------------+-------------+---------------+--------------------+-------------+--------------------+--------------------+
|doornumber|wheelbase|carlength|carwidth|carheight|curbweight|enginesize|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|  price|      Brand|enc_fueltype|enc_aspiration|enc_drivewheel|enc_enginelocation|cylinder_num|   oh_carbody|oh_fuelsystem|oh_enginetype|       oh_brand|       scale_input_x|scale_input_y|      scaled_feature|       scaled_target|
+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-------+-----------+------------+--------------+--------------+------------------+------------+--------

# Assembling all features for training

In [17]:
assembler_all=VectorAssembler(inputCols=['scaled_feature','enc_fueltype','enc_aspiration','doornumber',
                                     'enc_drivewheel','enc_enginelocation','cylinder_num','oh_carbody',
                                     'oh_fuelsystem','oh_enginetype','oh_brand'],outputCol='feature_inp')

train=assembler_all.transform(train)
test=assembler_all.transform(test)
train.show()
test.show()

+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-------+-----------+------------+--------------+--------------+------------------+------------+-------------+-------------+-------------+---------------+--------------------+-------------+--------------------+--------------------+--------------------+
|doornumber|wheelbase|carlength|carwidth|carheight|curbweight|enginesize|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|  price|      Brand|enc_fueltype|enc_aspiration|enc_drivewheel|enc_enginelocation|cylinder_num|   oh_carbody|oh_fuelsystem|oh_enginetype|       oh_brand|       scale_input_x|scale_input_y|      scaled_feature|       scaled_target|         feature_inp|
+----------+---------+---------+--------+---------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-------+-----------+------------+--------------+-------------

## Converting Vector target to Single value

In [18]:
train=train.withColumn("scaled_target", vector_to_array("scaled_target")[0])
test=test.withColumn("scaled_target", vector_to_array("scaled_target")[0])

# Linear Regression | Model Evaluation

In [19]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="feature_inp", labelCol="scaled_target")
lr_model = lr.fit(train)
lr_predictions = lr_model.evaluate(test)

print(f'MSE is {lr_predictions.meanSquaredError}')
print(f'MAE is {lr_predictions.meanAbsoluteError}')
print(f'R2 score is {lr_predictions.r2}')


MSE is 0.0039520336078594665
MAE is 0.03991633247987977
R2 score is 0.9076643106053869


# Decision Tree | Model Evaluation

In [20]:
#Decision tree regression
dt = DecisionTreeRegressor(featuresCol="feature_inp", labelCol="scaled_target")
dt_model = dt.fit(train)
dt_predictions = dt_model.transform(test)

# Evaluate predictions
evaluator = RegressionEvaluator(labelCol="scaled_target", predictionCol="prediction")

# R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(dt_predictions)
print(f'R2 score is {r2}')

# MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(dt_predictions)
print(f'MAE is {mae}')


# MSE
evaluator.setMetricName("mse")
mse = evaluator.evaluate(dt_predictions)
print(f'MSE is {mse}')

R2 score is 0.8871935568874105
MAE is 0.045050965239462865
MSE is 0.004828196521702145


# Random Forest | Model Evaluation

In [21]:
rf = RandomForestRegressor(featuresCol="feature_inp", labelCol="scaled_target", numTrees=100)
rf_model = rf.fit(train)
rf_predictions = rf_model.transform(test)

# R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(rf_predictions)
print(f'R2 score is {r2}')

# MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(rf_predictions)
print(f'MAE is {mae}')


# MSE
evaluator.setMetricName("mse")
mse = evaluator.evaluate(rf_predictions)
print(f'MSE is {mse}')


R2 score is 0.9236859358148928
MAE is 0.037302723979554316
MSE is 0.003266296579245371


# XGBoost | Model Evaluation

In [22]:
xgb = SparkXGBRegressor(
    label_col="scaled_target",
    features_col="feature_inp",
    objective='reg:squarederror',
    num_workers=2
)

xgb_model = xgb.fit(train)
xgb_predictions = xgb_model.transform(test)

# R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(xgb_predictions)
print(f'R2 score is {r2}')

# MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(xgb_predictions)
print(f'MAE is {mae}')


# MSE
evaluator.setMetricName("mse")
mse = evaluator.evaluate(xgb_predictions)
print(f'MSE is {mse}')


2025-05-20 03:24:43,091 INFO XGBoost-PySpark: _fit Running xgboost-3.0.1 on 2 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-05-20 03:24:51,665 INFO XGBoost-PySpark: _fit Finished xgboost training!


R2 score is 0.8922181883274196
MAE is 0.043787334727436775
MSE is 0.00461313869900957


# Demonstration

In [23]:
# Convert back values to it's original form 
def inverseTransform(predictions):
    min_val = float(scaler_y.originalMin[0])
    max_val = float(scaler_y.originalMax[0])
    
    # Cap the values just for safety
    predictions = predictions.withColumn(
        "prediction",
        when(col("prediction") < 0, 0)
        .when(col("prediction") > 1, 1)
        .otherwise(col("prediction"))
        )
        

    def inverse_min_max(scaled_val):
        return scaled_val * (max_val - min_val) + min_val

    inverse_udf = udf(inverse_min_max, DoubleType())

    predictions = predictions.withColumn("original_prediction", inverse_udf(col("prediction")))
    return predictions.select(['original_prediction','price'])

og_pred=inverseTransform(rf_predictions)
og_pred.show()


+-------------------+-------+
|original_prediction|  price|
+-------------------+-------+
|   6612.58046453164| 6855.0|
|    6878.0370984611| 5195.0|
|  6540.145158603766| 6095.0|
| 6432.5567242140905| 5399.0|
|  8710.370188187648| 7957.0|
|  6571.848674694544| 8916.5|
| 10363.604444796645| 9298.0|
| 10489.967325876929| 9538.0|
|  6603.491799297301| 5348.0|
| 14362.237345493782|14489.0|
| 12144.916406508853|11048.0|
| 10683.579917673247| 9895.0|
|  8485.514835719616| 9095.0|
|  9031.594110440648| 7975.0|
|   8411.02184577244| 7775.0|
|  11118.75162127618| 8449.0|
| 12281.397891955923|11199.0|
| 12633.778472947735|17669.0|
| 16547.565473399904|18399.0|
|  17682.26422229837|20970.0|
+-------------------+-------+
only showing top 20 rows



In [24]:
df.columns

['doornumber',
 'wheelbase',
 'carlength',
 'carwidth',
 'carheight',
 'curbweight',
 'enginesize',
 'boreratio',
 'stroke',
 'compressionratio',
 'horsepower',
 'peakrpm',
 'citympg',
 'highwaympg',
 'price',
 'Brand',
 'enc_fueltype',
 'enc_aspiration',
 'enc_drivewheel',
 'enc_enginelocation',
 'cylinder_num',
 'oh_carbody',
 'oh_fuelsystem',
 'oh_enginetype',
 'oh_brand',
 'scale_input_x',
 'scale_input_y']

In [25]:
def predict(data):
    inp_df=spark.createDataFrame([data])
    inp_df=string_idxr.transform(inp_df)
    inp_df=oh_encoder.transform(inp_df)
    inp_df=inp_df.drop('fueltype','aspiration','carbody',
          'drivewheel','enginelocation','enginetype','cylindernumber',
          'CarName','symboling','car_ID','fuelsystem',
          'enc_carbody','enc_fuelsystem','enc_enginetype','enc_brand')
    
    inp_df=assembler_x.transform(inp_df)
    
    inp_df=scaler_x.transform(inp_df)
    inp_df=assembler_all.transform(inp_df)
    prediction = rf_model.transform(inp_df).select('prediction').collect()[0][0]
    if prediction<0:
          prediction=0
    elif prediction>1:
          prediction=1      
    min_val = float(scaler_y.originalMin[0])
    max_val = float(scaler_y.originalMax[0])
    prediction=prediction * (max_val - min_val) + min_val
    return prediction

data={'wheelbase':88.6,'carlength':168.8,'carwidth':64.1,'carheight':48.8,'curbweight':2548,'enginesize':130,
      'boreratio':3.47,'stroke':2.68,'compressionratio':9.0,'horsepower':111,'peakrpm':5000,'citympg':21,
      'highwaympg':27,'cylinder_num':4,'Brand':'alfa-romero','fueltype':'gas','aspiration':'std',
      'doornumber':2,'drivewheel':'rwd','enginelocation':'front','carbody':'convertible','fuelsystem':'mpfi',
      'enginetype':'dohc'}  

predict(data)  

14352.477385314138

In [26]:
# Choose a directory to save each component
base_path = "models/"

# Save StringIndexer
string_idxr.write().overwrite().save(base_path + "string_idxr")

# Save OneHotEncoder
oh_encoder.write().overwrite().save(base_path + "oh_encoder")

# Save Assembler for features
assembler_x.write().overwrite().save(base_path + "assembler_x")

# Save Scalers
scaler_x.write().overwrite().save(base_path + "scaler_x")
scaler_y.write().overwrite().save(base_path + "scaler_y")

# Save assembler for all features
assembler_all.write().overwrite().save(base_path + "assembler_all")

# Save the model (Random Forest or XGBoost)
rf_model.write().overwrite().save(base_path + "rf_model")


Py4JJavaError: An error occurred while calling o1476.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
	at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:265)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:79)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.feature.StringIndexerModel$StringIndexModelWriter.saveImpl(StringIndexer.scala:496)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:377)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:969)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:199)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:222)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1125)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1134)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
