In [1]:
import pyspark

In [29]:
from pyspark.sql import SparkSession
import json
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Load the raw donation records and preview the first 50 rows.
# NOTE(review): the recorded output shows a "[" line landing in _corrupt_record,
# which suggests the file is a JSON array rather than JSON Lines -- confirm
# whether multi-line parsing was intended.
donations_path = "donation_np.json"
df = spark.read.format("json").load(donations_path)
df.show(50)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|          null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH|       2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...|       2014-15|    V K Ramachandran|        Y|CPI(M)|Others|        

In [4]:
# Replace the space-separated source headers with snake_case column names.
column_renames = {
    "Contribution Mode": "mode_of_payment",
    "Financial Year": "fin_year",
    "PAN Given": "pan_given",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)
df.show()



+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |    

In [5]:
# Drop every row whose Name is null. In the recorded output this removes the
# leading "[" line that was parsed as a corrupt record.
df = df.dropna(subset=["Name"])
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [6]:
# Replace the Address column with its SHA-256 hex digest.
# This is a one-way hash, not reversible encryption.
# The wildcard import is kept because later cells use the names it provides
# (sum, max, avg, count, lit, when, col); it shadows Python builtins.
from pyspark.sql.functions import *

# NOTE(review): concat_ws over a single column never uses the "||" separator;
# it presumably also turns a null address into "" before hashing -- confirm.
address_digest = sha2(concat_ws("||", col("Address")), 256)
df = df.withColumn("Address", address_digest)
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [8]:
# Remove the parser's corrupt-record column and the stray fieldN columns.
unused_columns = [
    "_corrupt_record",
    "field10",
    "field11",
    "field12",
    "field13",
    "field14",
    "field9",
]
df = df.drop(*unused_columns)
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [9]:
# Collapse the free-text payment description into CHEQUE / CASH / BANK / OTHER.
# Branches are evaluated in order, so a cheque match wins over CASH and BANK.
# The patterns are case-sensitive; anything unmatched (or null) becomes OTHER.
from pyspark.sql.functions import *

payment_text = col("mode_of_payment")
is_cheque = (
    payment_text.like("Ch.%")
    | payment_text.like("ch.%")
    | payment_text.like("%Cheque%")
)
payment_category = (
    when(is_cheque, "CHEQUE")
    .when(payment_text.like("%CASH%"), "CASH")
    .when(payment_text.like("%Bank%"), "BANK")
    .otherwise("OTHER")
)
df = df.withColumn("mode_of_payment", payment_category)
df.show()
    


+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           BANK| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|         CHEQUE| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           BANK| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           BANK| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          OTHER| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|          OTHER| 2011-12|   Uma Shankar Gupta|     

In [18]:
# Aggregations: cast Amount to an integer column before any numeric aggregate.
from pyspark.sql.types import IntegerType

amount_as_int = df.Amount.cast(IntegerType())
df = df.withColumn("Amount", amount_as_int)

def party_aggregates(party_name):
    """Return ``[sum, max, avg, count]`` of ``Amount`` for one party.

    Reads the notebook-global ``df`` at call time and keeps only the rows
    whose ``Party`` equals ``party_name``.

    Args:
        party_name: Exact value of the ``Party`` column to aggregate.

    Returns:
        A four-element list in the order sum, max, average, count. For a
        party with no rows the first three entries are None and count is 0.
    """
    # Imported locally so the function does not depend on the notebook's
    # wildcard import having shadowed the builtin sum/max.
    from pyspark.sql import functions as F

    # One aggregation pass computes all four statistics, instead of four
    # separate Spark jobs over the same filtered rows.
    party_stats = (
        df.where(df.Party == party_name)
        .agg(
            F.sum("Amount"),
            F.max("Amount"),
            F.avg("Amount"),
            F.count("Amount"),
        )
        .collect()[0]
    )
    return list(party_stats)

# Lifetime aggregates for each party of interest, one list per party as
# returned by party_aggregates.
(
    result_list_INC,
    result_list_BJP,
    result_list_NCP,
    result_list_CPI,
    result_list_CPIM,
) = [party_aggregates(party) for party in ("INC", "BJP", "NCP", "CPI", "CPI(M)")]

def creating_new_column(df,col_name,party_name,aggregated):
    """Add a column holding ``aggregated`` on one party's rows and "0" elsewhere.

    Args:
        df: DataFrame with a ``Party`` column.
        col_name: Name of the column to add (or replace if it already exists).
        party_name: Exact ``Party`` value whose rows receive ``aggregated``.
        aggregated: Scalar to place on the matching rows.

    Returns:
        A new DataFrame with ``col_name`` added.
    """
    # Imported locally so the function does not rely on the notebook's
    # wildcard import being in effect.
    from pyspark.sql import functions as F

    # Plain equality rather than LIKE: a party name containing % or _ must
    # not be treated as a wildcard pattern.
    belongs_to_party = df.Party == party_name

    # The earlier lit(aggregated) column was overwritten immediately by this
    # expression, so a single withColumn gives the same result.
    # NOTE(review): the "0" fallback is a string, which presumably makes the
    # whole column string-typed -- confirm before changing it to a numeric 0.
    return df.withColumn(col_name, F.when(belongs_to_party, aggregated).otherwise("0"))

# Add <PREFIX>_{SUM,MAX,AVG,COUNT}_LTD columns for each party, in the same
# order the aggregate lists are laid out.
party_results = [
    ("INC", "INC", result_list_INC),
    ("BJP", "BJP", result_list_BJP),
    ("NCP", "NCP", result_list_NCP),
    ("CPI", "CPI", result_list_CPI),
    ("CPIM", "CPI(M)", result_list_CPIM),
]
stat_suffixes = ("SUM", "MAX", "AVG", "COUNT")

for column_prefix, party, party_values in party_results:
    for suffix, value in zip(stat_suffixes, party_values):
        df = creating_new_column(df, column_prefix + "_" + suffix + "_LTD", party, value)

df.show(50)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+------------+------------+-----------------+--------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|       BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPIM_SUM_LTD|CPIM_MAX_LTD|     CPIM_AVG_LTD|CPIM_COUNT_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+---------

In [19]:
# Top donor per party: the Name on the party's largest single donation, placed
# on that party's rows ("N.A" on every other row).
from pyspark.sql import functions as F


def _top_donor_name(frame, party, default="N.A"):
    """Return the Name on the largest-Amount row of ``party`` in ``frame``.

    Falls back to ``default`` when the party has no row with a non-null
    Amount, instead of raising IndexError on an empty result. If several
    rows tie on the maximum, whichever row Spark returns first is used.
    """
    party_rows = frame.where(frame.Party == party)
    top_amount = party_rows.select(F.max("Amount")).collect()[0][0]
    if top_amount is None:
        return default
    top_row = party_rows.where(party_rows.Amount == top_amount).select("Name").first()
    return top_row[0] if top_row is not None else default


def _add_top_donor_column(frame, col_name, party, donor_name):
    """Add ``col_name`` with ``donor_name`` on ``party`` rows and "N.A" elsewhere."""
    # Equality rather than LIKE so the party name is never read as a pattern.
    return frame.withColumn(
        col_name, F.when(frame.Party == party, donor_name).otherwise("N.A")
    )


#TOP DONOR BJP
bjp_top_donor_name = _top_donor_name(df, "BJP")
df = _add_top_donor_column(df, "BJP_TOP_DONOR", "BJP", bjp_top_donor_name)

#TOP DONOR NCP
ncp_top_donor_name = _top_donor_name(df, "NCP")
df = _add_top_donor_column(df, "NCP_TOP_DONOR", "NCP", ncp_top_donor_name)

#TOP DONOR CPI
cpi_top_donor_name = _top_donor_name(df, "CPI")
df = _add_top_donor_column(df, "CPI_TOP_DONOR", "CPI", cpi_top_donor_name)

#TOP DONOR INC
inc_top_donor_name = _top_donor_name(df, "INC")
df = _add_top_donor_column(df, "INC_TOP_DONOR", "INC", inc_top_donor_name)

#TOP DONOR CPI(M)
cpim_top_donor_name = _top_donor_name(df, "CPI(M)")
df = _add_top_donor_column(df, "CPIM_TOP_DONOR", "CPI(M)", cpim_top_donor_name)

df.show(50)


+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+------------+------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|       BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPIM_SUM_LTD|CPIM_MAX_LTD|     CPIM_AVG_LTD|CPIM_COUNT_LTD|       BJP_TOP_DONOR|NCP_TOP_DONOR|CPI_TOP_DONOR|       INC_TOP_DONOR|  CPIM_TOP_DONOR|
+--------------------+--------+---------------+--------+------

In [21]:
#SUM OF DONATIONS PER YEAR
# For every (Party, fin_year) group, add a "<fin_year>_<Party>_SUM" column
# holding the group's total on its own rows and "0" on every other row.
x = df.groupBy('Party', 'fin_year').sum('Amount')

# Collect the grouped totals once and reuse them, instead of running the same
# aggregation a second time.
for group in x.collect():
    party = group["Party"]
    fin_year = group["fin_year"]
    group_total = group["sum(Amount)"]

    # A null key cannot be concatenated into a column name and can never
    # equal any row's value, so such a group is skipped rather than crashing.
    if party is None or fin_year is None:
        continue

    # Equality rather than LIKE so key values are never read as patterns.
    in_group = (df.Party == party) & (df.fin_year == fin_year)

    # The earlier lit(...) column was overwritten immediately, so a single
    # withColumn per group gives the same result with half the projections.
    df = df.withColumn(
        fin_year + "_" + party + "_SUM",
        when(in_group, group_total).otherwise("0"),
    )
df.show(50)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+------------+------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+----------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+-----

In [22]:
# Number of donations per payment mode.
df1= df.groupBy("mode_of_payment").count()
df1.show()

list_modes=['CHEQUE','BANK','CASH','OTHER']

# Look counts up by mode name. The row order of a groupBy result is not
# guaranteed, so pairing rows with list_modes by position can attach a count
# to the wrong mode. Collecting once also avoids re-running the aggregation
# on every loop iteration.
mode_counts = {row["mode_of_payment"]: row["count"] for row in df1.collect()}

for mode in list_modes:
    # A mode absent from the data gets 0; no row can match it anyway.
    mode_total = mode_counts.get(mode, 0)
    df = df.withColumn(
        mode + "_COUNT_LTD",
        when(df.mode_of_payment == mode, mode_total).otherwise("0"),
    )

df.show(50)

+---------------+-----+
|mode_of_payment|count|
+---------------+-----+
|         CHEQUE| 2686|
|           BANK| 6844|
|           CASH|   29|
|          OTHER| 4014|
+---------------+-----+

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+------------+------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+----------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+-------

In [28]:
# Persist the enriched frame as Parquet, appending to any existing output.
# NOTE(review): the recorded run failed with "HADOOP_HOME and hadoop.home.dir
# are unset" (see the linked Hadoop WindowsProblems page); the environment
# needs that configured before this cell can succeed.
df.write.parquet("Political_parties_output", mode="append")

Py4JJavaError: An error occurred while calling o3430.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:209)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:343)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:344)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 22 more
