# Assessment


In [83]:
# Importing required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, IntegerType
import pandas as pd


print("Libraries imported successfully")

Libraries imported successfully


In [3]:
# Create Spark Session local mode

spark = SparkSession.builder \
    .appName("pyspark-coding") \
    .master("local[*]") \
    .config("spark.executor.memory", "512m") \
    .getOrCreate()

print("Spark session created")
print(f"Spark Version: {spark.version}")

Spark session created
Spark Version: 4.0.2


In [4]:
# Load NYC Jobs dataset
file_path = r"file:///C:/Users/ASUS Zenbook 14 OLED/Workspace/Code_base/dataset/nyc-jobs.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True)

print(f"Data loaded successfully")
print(f"Total records: {df.count()}")
print(f"Total columns: {len(df.columns)}")

Data loaded successfully
Total records: 2946
Total columns: 28


In [84]:
# Show sample data on a dataframe
print("\nSample Data:")

df.show(10, truncate=True)



Sample Data:
+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+-------------------+---------------------+--------------------+----------+--------------------+--------------------+--------------+-------------+---------------+-------------------+------------+
|Job ID|              Agency|Posting Type|# Of Positions|      Business Title| Civil Service Title|Title Code No|Level|        Job Category|Full-Time/Part-Time indicator|Salary Range From|Salary Range To|Salary Frequency|       Work Location|  Division/Work Unit|     Job Description|Minimum Qual Requirements|    Preferred Skills|Additional Information|            To Apply

In [6]:
# Column type analysis


numerical_cols = [field.name for field in df.schema.fields 
                 if field.dataType.typeName() in ['integer', 'double', 'float', 'long']]
string_cols = [field.name for field in df.schema.fields 
              if field.dataType.typeName() == 'string']

print(f"\nNumerical columns:")
for col_name in numerical_cols:
    print(f"  - {col_name}")

print(f"\nString columns:")
for col_name in string_cols:
    print(f"  - {col_name}")


Numerical columns:
  - Job ID
  - # Of Positions
  - Salary Range From
  - Salary Range To

String columns:
  - Agency
  - Posting Type
  - Business Title
  - Civil Service Title
  - Title Code No
  - Level
  - Job Category
  - Full-Time/Part-Time indicator
  - Salary Frequency
  - Work Location
  - Division/Work Unit
  - Job Description
  - Minimum Qual Requirements
  - Preferred Skills
  - Additional Information
  - To Apply
  - Hours/Shift
  - Work Location 1
  - Recruitment Contact
  - Residency Requirement
  - Posting Date
  - Post Until
  - Posting Updated
  - Process Date


In [10]:
#Duplicate check
exact_dup_count = df.count() - df.dropDuplicates().count()
print(f"Exact duplicate rows: {exact_dup_count}")

Exact duplicate rows: 31


In [23]:
# Missing values analysis

missing_data = []
for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    total_count = df.count()
    null_pct = (null_count / total_count) * 100
    if null_count > 0:
        missing_data.append({
            'Column': column,
            'Null_Count': null_count
        })

missing_df = pd.DataFrame(missing_data).sort_values('Null_Count', ascending=False)
#print(missing_df.sort_index().to_string())
print(missing_df.to_string(index=False))

                       Column  Null_Count
          Recruitment Contact        1763
                   Post Until        1499
              Work Location 1        1138
                  Hours/Shift        1062
        Residency Requirement         678
       Additional Information         563
                 Posting Date         517
              Posting Updated         508
                 Process Date         425
             Preferred Skills         259
Full-Time/Part-Time indicator         195
                     To Apply         180
    Minimum Qual Requirements          18
                 Job Category           2


In [26]:
# Cleaning - Remove duplicates and clean salary columns - remove $ and commas, convert to numeric

print(f"BEfore removing duplicates: {df.count()}")
df = df.dropDuplicates()
print(f"After removing duplicate: {df.count()}")

salary_cols = ['Salary Range From', 'Salary Range To']

for col_name in salary_cols:
    if col_name in df.columns:
        df = df.withColumn(
            col_name,
            regexp_replace(col(col_name), "[$,]", "").cast(DoubleType())
        )

# Create average salary column
df = df.withColumn(
    'Average_Salary',
    (col('Salary Range From') + col('Salary Range To')) / 2
)

print("Salary columns cleaned and Average_Salary created")

# Clean text columns - trim whitespace
text_columns = ['Business Title', 'Job Category', 'Agency', 
               'Civil Service Title', 'Title Classification']

for col_name in text_columns:
    if col_name in df.columns:
        df = df.withColumn(col_name, trim(col(col_name)))

print(" Text columns cleaned")

# Show cleaned data sample
df.select('Business Title', 'Salary Range From', 'Salary Range To', 'Average_Salary').show(5)

BEfore removing duplicates: 2946
After removing duplicate: 2915
Salary columns cleaned and Average_Salary created
 Text columns cleaned
+--------------------+-----------------+---------------+--------------+
|      Business Title|Salary Range From|Salary Range To|Average_Salary|
+--------------------+-----------------+---------------+--------------+
|  Software Architect|          95000.0|       110000.0|      102500.0|
|Quality Analyst L...|          85000.0|        98000.0|       91500.0|
|Manager, IT Inven...|          79471.0|       100000.0|       89735.5|
|Project Manager -...|          85000.0|        98000.0|       91500.0|
| Resiliency Engineer|          85000.0|       105000.0|       95000.0|
+--------------------+-----------------+---------------+--------------+
only showing top 5 rows


In [28]:
#Whats the number of jobs posting per category (Top 10)
jobs = df.groupBy('Job Category') \
    .agg(count('*').alias('Job_Count')) \
    .orderBy(desc('Job_Count')) \
    .limit(10)

jobs.show()


+--------------------+---------+
|        Job Category|Job_Count|
+--------------------+---------+
|Engineering, Arch...|      497|
|Technology, Data ...|      312|
|       Legal Affairs|      224|
|Public Safety, In...|      179|
|Building Operatio...|      177|
|Finance, Accounti...|      168|
|Administration & ...|      131|
|Constituent Servi...|      129|
|              Health|      125|
|Policy, Research ...|      124|
+--------------------+---------+



In [38]:
#Whats the salary distribution per job category?
sal_grp = df.filter(col('Average_Salary').isNotNull()) \
    .groupBy('Job Category') \
    .agg(
        round(min('Average_Salary'),2).alias('Min_Salary'),
        round(max('Average_Salary'),2).alias('Max_Salary'),
        count('*').alias('Job_Count')
    ) \
    .limit(15)

sal_grp.show()


+--------------------+----------+----------+---------+
|        Job Category|Min_Salary|Max_Salary|Job_Count|
+--------------------+----------+----------+---------+
|Health Policy, Re...|   94889.0|  162500.0|        4|
|Administration & ...|   69040.5|   69040.5|        2|
|Engineering, Arch...|    539.12|  103050.0|        8|
|Administration & ...|   53075.5|   53075.5|        2|
|Constituent Servi...|      17.7|  135000.0|      129|
|Building Operatio...|      17.7|  169011.0|      177|
|Constituent Servi...|   56463.5|   67029.5|        8|
|       Legal Affairs|     21.41|  191913.0|      224|
|Administration & ...|     30.02|   53094.0|        6|
|Maintenance & Ope...|      35.0|   65458.8|        8|
|Public Safety, In...|   49399.5|   67021.0|        9|
|Administration & ...|  218587.0|  218587.0|        2|
|Administration & ...|   70661.0|   70661.0|        2|
|Communications & ...|   56867.5|   56867.5|        2|
|              Health|     16.28| 170133.84|      125|
+---------

In [42]:
# correlation between the higher degree and the salary
df_edu = df.withColumn(
    'Education_Level',
    when(upper(col('Minimum Qual Requirements')).contains('BACHELOR'), 'Bachelor')
    .when(upper(col('Minimum Qual Requirements')).contains('MASTER'), 'Master')
    .when(upper(col('Minimum Qual Requirements')).contains('DOCTORAL'), 'Doctoral') 
    .when(upper(col('Minimum Qual Requirements')).contains('PHD'), 'Doctoral')
    .when(upper(col('Minimum Qual Requirements')).contains('ASSOCIATE'), 'Associate')
    .when(upper(col('Minimum Qual Requirements')).contains('HIGH SCHOOL'), 'High School')
    .otherwise('Other/Not Specified')
)

correlation = df_edu.filter(col('Average_Salary').isNotNull()) \
    .groupBy('Education_Level') \
    .agg(
        avg('Average_Salary').alias('Avg_Salary'),
        count('*').alias('Job_Count')
    ) \
    .orderBy(desc('Avg_Salary'))

correlation.show()

+-------------------+------------------+---------+
|    Education_Level|        Avg_Salary|Job_Count|
+-------------------+------------------+---------+
|           Bachelor| 90758.95603053436|      131|
|             Master| 84403.69259159022|      654|
|Other/Not Specified| 72093.75999386892|      946|
|        High School|64665.978645618845|      913|
|          Associate| 60497.77874889298|      271|
+-------------------+------------------+---------+



In [85]:
# Whats the job posting having the highest salary per agency?
window_spec = Window.partitionBy('Agency').orderBy(desc('Average_Salary'))

job_pos = df.filter(col('Average_Salary').isNotNull()) \
    .withColumn('rank', row_number().over(window_spec)) \
    .filter(col('rank') == 1) \
    .select('Job ID','Agency', 'Business Title', 'Average_Salary', 'Job Category') \
    .orderBy(desc('Average_Salary')) \
    .limit(15)

job_pos.show()


+------+--------------------+--------------------+--------------+--------------------+
|Job ID|              Agency|      Business Title|Average_Salary|        Job Category|
+------+--------------------+--------------------+--------------+--------------------+
|396521|DEPT OF ENVIRONME...|Deputy Commission...|      218587.0|Administration & ...|
|415583|   POLICE DEPARTMENT|Deputy Commission...|      217201.0|Constituent Servi...|
|425494|DISTRICT ATTORNEY...|Co-Chief Informat...|      191913.0|       Legal Affairs|
|391188|NYC HOUSING AUTHO...|executive Vice Pr...|      169011.0|Building Operatio...|
|404375|OFFICE OF THE COM...|Head of Short Ter...|      167500.0|Finance, Accounti...|
|425828|      LAW DEPARTMENT|Deputy Borough Ch...|      153666.0|       Legal Affairs|
|416442|DEPT OF DESIGN & ...|Associate Commiss...|      151795.0|Engineering, Arch...|
|415326|DEPARTMENT FOR TH...|Executive Agency ...|      150371.0|       Legal Affairs|
|423210|HOUSING PRESERVAT...|Assistant Comm

In [74]:
# Whats the job positings average salary per agency for the last 2 years?


In [52]:
# What are the highest paid skills in the US market?

df_skl = df.select("Preferred Skills","Work Location").distinct()
df_skl.show(truncate=True)
#very noisy data need more time to work on this.

+--------------------+--------------------+
|    Preferred Skills|       Work Location|
+--------------------+--------------------+
| all candidates m...|335 Adams Street,...|
|               ""2""|59-17 Junction Bl...|
|1.\tMinimum 15 ye...|96-05 Horace Hard...|
| to be eligible f...|28-11 Queens Plaz...|
| candidates must ...|120 Broadway, New...|
| ""2"" or ""3"" a...|335 Adams Street,...|
|Ability to commun...|   55 Water St Ny Ny|
|PREFERRED SKILLS ...|421 East 26th Str...|
| individuals must...|421 East 26th Str...|
| all candidates m...|600 W 168Th St., ...|
|A valid or abilit...|10 Walker Rd, Val...|
|QUALIFICATIONS:  ...|255 Greenwich Street|
| all candidates m...|100 Church St., N.Y.|
| ""b"" and ""c"" ...|Office for Exec P...|
|Proven working ex...|100 Church St., N.Y.|
| including the 18...|96-05 Horace Hard...|
| including one ye...|        2 Metro Tech|
| all candidates m...|1 Police Plaza, N.Y.|
|-\tGood writing a...|   42-09 28th Street|
|We are looking fo...|33 Beaver 

In [77]:
# Created a salary bucket for simplification
df = df.withColumn(
    'Salary_Bucket',
    when(col('Average_Salary') < 50000, 'Entry Level')
    .when((col('Average_Salary') >= 50000) & (col('Average_Salary') < 80000), 'Mid Level')
    .when((col('Average_Salary') >= 80000) & (col('Average_Salary') < 120000), 'Senior Level')
    .when(col('Average_Salary') >= 120000, 'Executive Level')
    .otherwise('Not Specified')
)
print("Salary_Bucket created")


# Employment Type Standardization
df = df.withColumn(
    'Employment_Type',
    when(upper(col('Full-Time/Part-Time indicator')).contains('F'), 'Full-Time')
    .when(upper(col('Full-Time/Part-Time indicator')).contains('P'), 'Part-Time')
    .otherwise('Not Specified')
)
print("Employment_Type standardized")

# Linkedin like Posted Since 
df = df.filter(
    trim(col('Posting Date')).rlike('^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}$')
)

df = df.withColumn(
    'format_date', 
    to_timestamp(trim(col('Posting Date')), "yyyy-MM-dd'T'HH:mm:ss.SSS")
).withColumn(
    'Posted_since',
    datediff(current_date(), col('format_date'))
)


# Show sample with new features
df.select(
    'Salary_Bucket', 
    'Employment_Type',
    'Posting Date',
    'Posted_since'
).show(10, truncate=True)

Salary_Bucket created
Employment_Type standardized
+-------------+---------------+--------------------+------------+
|Salary_Bucket|Employment_Type|        Posting Date|Posted_since|
+-------------+---------------+--------------------+------------+
|    Mid Level|      Full-Time|2018-03-27T00:00:...|        2875|
|    Mid Level|      Full-Time|2018-04-09T00:00:...|        2862|
|  Entry Level|      Full-Time|2016-03-14T00:00:...|        3618|
| Senior Level|      Full-Time|2016-12-19T00:00:...|        3338|
|    Mid Level|      Full-Time|2017-08-30T00:00:...|        3084|
|    Mid Level|      Full-Time|2018-05-02T00:00:...|        2839|
|  Entry Level|      Full-Time|2018-06-25T00:00:...|        2785|
|  Entry Level|  Not Specified|2015-07-14T00:00:...|        3862|
| Senior Level|      Full-Time|2017-08-15T00:00:...|        3099|
|  Entry Level|      Full-Time|2017-11-01T00:00:...|        3021|
+-------------+---------------+--------------------+------------+
only showing top 10 rows


In [82]:
output_path = "/opt/workspace/processed_data"

# Save as CSV with single partition
df.coalesce(1).write.mode('overwrite').option('header', 'true').csv(output_path)

print(f"Processed data saved to: {output_path}")
print(f"Total records saved: {df.count()}")
print(f"Total features: {len(df.columns)}")

#Getting error while writing 
#Py4JJavaError: An error occurred while calling o3468.csv.
#: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems

Py4JJavaError: An error occurred while calling o3468.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:190)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:268)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:426)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
		at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
		at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
		at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:190)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:268)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
		at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
		at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
		at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
		at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
		at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
		at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		... 1 more
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:601)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:622)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:645)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:742)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1954)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1912)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1885)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$install$1(ShutdownHookManager.scala:194)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.Option.fold(Option.scala:263)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:195)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:55)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:53)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:159)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala:63)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:250)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:99)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:379)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:961)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:204)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:96)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1132)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1141)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:521)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:492)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:569)
	... 27 more
