In [4]:
# from pyspark.sql import SparkSession
# import getpass 
# username=getpass.getuser()
# spark=SparkSession. \
#     builder. \
#     config('spark.ui.port','0'). \
#     config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
#     config('spark.shuffle.useOldFetchProtocol', 'true'). \
#     enableHiveSupport(). \
#     master('yarn'). \
#     getOrCreate()

# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("Local Spark Example") \
#     .config("spark.master", "local") \
#     .config("spark.sql.warehouse.dir", "./warehouse") \
#     .getOrCreate()


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Create a Spark session
spark = SparkSession.builder \
    .appName("Lending Club Data Analysis") \
    .getOrCreate()

#### 1. create a dataframe with proper datatypes and names

In [5]:
loans_schema = 'loan_id string, member_id string, loan_amount float, funded_amount float, loan_term_months string, interest_rate float, monthly_installment float, issue_date string, loan_status string, loan_purpose string, loan_title string'

In [6]:
loans_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(loans_schema) \
.load("F:/SparkProject/lendingclubproject_datasets/lendingclubproject/raw/loans_data_csv/part-00000-30d34559-cada-4e5a-91c1-381f533f4c78-c000.csv")

In [7]:
loans_raw_df.show()

+--------+--------------------+-----------+-------------+----------------+-------------+-------------------+----------+-----------+------------------+--------------------+
| loan_id|           member_id|loan_amount|funded_amount|loan_term_months|interest_rate|monthly_installment|issue_date|loan_status|      loan_purpose|          loan_title|
+--------+--------------------+-----------+-------------+----------------+-------------+-------------------+----------+-----------+------------------+--------------------+
|56633077|b59d80da191f5b573...|     3000.0|       3000.0|       36 months|         7.89|              93.86|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|
|55927518|202d9f56ecb7c3bc9...|    15600.0|      15600.0|       36 months|         7.89|             488.06|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|
|56473345|e5a140c0922b554b9...|    20000.0|      20000.0|       36 months|         9.17|             637.58|  Aug-2015| Fully Paid|debt_cons

In [8]:
loans_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- funded_amount: float (nullable = true)
 |-- loan_term_months: string (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- monthly_installment: float (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- loan_title: string (nullable = true)



In [9]:
from pyspark.sql.functions import current_timestamp

#### 2. insert a new column named as ingestion date(current time)

In [10]:
loans_df_ingestd = loans_raw_df.withColumn("ingest_date", current_timestamp())

In [11]:
loans_df_ingestd.show()

+--------+--------------------+-----------+-------------+----------------+-------------+-------------------+----------+-----------+------------------+--------------------+--------------------+
| loan_id|           member_id|loan_amount|funded_amount|loan_term_months|interest_rate|monthly_installment|issue_date|loan_status|      loan_purpose|          loan_title|         ingest_date|
+--------+--------------------+-----------+-------------+----------------+-------------+-------------------+----------+-----------+------------------+--------------------+--------------------+
|56633077|b59d80da191f5b573...|     3000.0|       3000.0|       36 months|         7.89|              93.86|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|2024-04-19 10:07:...|
|55927518|202d9f56ecb7c3bc9...|    15600.0|      15600.0|       36 months|         7.89|             488.06|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|2024-04-19 10:07:...|
|56473345|e5a140c0922b554b9...|    

In [12]:
loans_df_ingestd.createOrReplaceTempView("loans")

In [13]:
spark.sql("select count(*) from loans")

DataFrame[count(1): bigint]

In [14]:
spark.sql("select * from loans where loan_amount is null")

DataFrame[loan_id: string, member_id: string, loan_amount: float, funded_amount: float, loan_term_months: string, interest_rate: float, monthly_installment: float, issue_date: string, loan_status: string, loan_purpose: string, loan_title: string, ingest_date: timestamp]

#### 3. Dropping the rows which has null values in the mentioned columns

In [15]:
columns_to_check = ["loan_amount", "funded_amount", "loan_term_months", "interest_rate", "monthly_installment", "issue_date", "loan_status", "loan_purpose"]

In [16]:
loans_filtered_df = loans_df_ingestd.na.drop(subset=columns_to_check)

In [17]:
loans_filtered_df.count()

2260667

In [18]:
loans_filtered_df.createOrReplaceTempView("loans")

In [19]:
loans_filtered_df.show()

+--------+--------------------+-----------+-------------+----------------+-------------+-------------------+----------+-----------+------------------+--------------------+--------------------+
| loan_id|           member_id|loan_amount|funded_amount|loan_term_months|interest_rate|monthly_installment|issue_date|loan_status|      loan_purpose|          loan_title|         ingest_date|
+--------+--------------------+-----------+-------------+----------------+-------------+-------------------+----------+-----------+------------------+--------------------+--------------------+
|56633077|b59d80da191f5b573...|     3000.0|       3000.0|       36 months|         7.89|              93.86|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|2024-04-19 10:07:...|
|55927518|202d9f56ecb7c3bc9...|    15600.0|      15600.0|       36 months|         7.89|             488.06|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|2024-04-19 10:07:...|
|56473345|e5a140c0922b554b9...|    

#### 4. convert loan_term_months to integer

In [20]:
from pyspark.sql.functions import regexp_replace, col

In [21]:
loans_term_modified_df = loans_filtered_df.withColumn("loan_term_months", (regexp_replace(col("loan_term_months"), " months", "") \
.cast("int") / 12) \
.cast("int")) \
.withColumnRenamed("loan_term_months","loan_term_years")

In [22]:
loans_term_modified_df.show()

+--------+--------------------+-----------+-------------+---------------+-------------+-------------------+----------+-----------+------------------+--------------------+--------------------+
| loan_id|           member_id|loan_amount|funded_amount|loan_term_years|interest_rate|monthly_installment|issue_date|loan_status|      loan_purpose|          loan_title|         ingest_date|
+--------+--------------------+-----------+-------------+---------------+-------------+-------------------+----------+-----------+------------------+--------------------+--------------------+
|56633077|b59d80da191f5b573...|     3000.0|       3000.0|              3|         7.89|              93.86|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|2024-04-19 10:07:...|
|55927518|202d9f56ecb7c3bc9...|    15600.0|      15600.0|              3|         7.89|             488.06|  Aug-2015| Fully Paid|       credit_card|Credit card refin...|2024-04-19 10:07:...|
|56473345|e5a140c0922b554b9...|    20000

In [23]:
loans_term_modified_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- funded_amount: float (nullable = true)
 |-- loan_term_years: integer (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- monthly_installment: float (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- loan_title: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



#### 5. Clean the loans_purpose column

In [24]:
loans_term_modified_df.createOrReplaceTempView("loans")

In [25]:
spark.sql("select distinct(loan_purpose) from loans").show()


+--------------------+
|        loan_purpose|
+--------------------+
|             wedding|
|         educational|
|               other|
|      small_business|
|  debt_consolidation|
|         credit_card|
|              moving|
|            vacation|
|    renewable_energy|
|               house|
|                 car|
|      major_purchase|
|             medical|
|    home_improvement|
|and also pay off ...|
|and if they are a...|
|never had any tro...|
|<br/><br/>Lending...|
|I became his prim...|
|on one of the bus...|
+--------------------+
only showing top 20 rows



In [26]:
spark.sql("select loan_purpose, count(*) as total from loans group by loan_purpose order by total desc")

DataFrame[loan_purpose: string, total: bigint]

In [27]:
loan_purpose_lookup = ["debt_consolidation", "credit_card", "home_improvement", "other", "major_purchase", "medical", "small_business", "car", "vacation", "moving", "house", "wedding", "renewable_energy", "educational"]

In [28]:
from pyspark.sql.functions import when

In [29]:
loans_purpose_modified = loans_term_modified_df.withColumn("loan_purpose", when(col("loan_purpose").isin(loan_purpose_lookup), col("loan_purpose")).otherwise("other"))

In [30]:
loans_purpose_modified.createOrReplaceTempView("loans")

In [31]:
spark.sql("select loan_purpose, count(*) as total from loans group by loan_purpose order by total desc")

DataFrame[loan_purpose: string, total: bigint]

In [32]:
from pyspark.sql.functions import count

In [33]:
loans_purpose_modified.groupBy("loan_purpose").agg(count("*").alias("total")).orderBy(col("total").desc())

DataFrame[loan_purpose: string, total: bigint]

In [36]:
# loans_purpose_modified.show(2)

In [34]:
# loans_purpose_modified.write \
# .format("parquet") \
# .mode("overwrite") \
# .option("path", "/user/itv006277/lendingclubproject/raw/cleaned/loans_parquet") \
# .save()

Py4JJavaError: An error occurred while calling o134.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [None]:
loans_purpose_modified.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv006277/lendingclubproject/raw/cleaned/loans_csv") \
.save()