In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import functions as func
from functools import reduce


# Build (or reuse) the SparkSession used by every cell below.
# The shuffle-partition setting must use the exact key
# "spark.sql.shuffle.partitions"; a misspelled key is not recognised by
# Spark SQL and leaves the default partition count in place.
spark = (
    SparkSession.builder
    .appName("moving_average")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)


In [2]:
  # Read the rejected-loan CSV into a DataFrame: the first line is the header,
  # fields are comma separated, and column types are inferred from the data.
  df = (
      spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .option("delimiter", ",")
      .csv("/home/spark/Downloads/archive/rejected_2007_to_2018q4.csv")
  )

The history saving thread hit an unexpected error (OperationalError('database is locked',)).History will not be written to the database.


In [3]:
## Print the schema right after loading the file. The raw header names still
## contain spaces (e.g. "Amount Requested"), and Risk_Score is inferred as a string.
df.printSchema()

root
 |-- Amount Requested: double (nullable = true)
 |-- Application Date: string (nullable = true)
 |-- Loan Title: string (nullable = true)
 |-- Risk_Score: string (nullable = true)
 |-- Debt-To-Income Ratio: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Employment Length: string (nullable = true)
 |-- Policy Code: string (nullable = true)



In [4]:
## Total number of rows in the loaded file (all years)
df.count()


27648741

In [5]:

# Original column names, as a plain Python list.
current_columns = df.columns
print(current_columns)

# Normalise every name: spaces become underscores and the result is upper-cased.
# Hyphens are left as they are, so "Debt-To-Income Ratio" becomes
# "DEBT-TO-INCOME_RATIO".
new_columns = [name.replace(" ", "_").upper() for name in current_columns]
print(new_columns)

# toDF renames all columns positionally in one step, replacing a chain of one
# withColumnRenamed call per column.
df_ma = df.toDF(*new_columns)

df_ma.printSchema()



['Amount Requested', 'Application Date', 'Loan Title', 'Risk_Score', 'Debt-To-Income Ratio', 'Zip Code', 'State', 'Employment Length', 'Policy Code']
['AMOUNT_REQUESTED', 'APPLICATION_DATE', 'LOAN_TITLE', 'RISK_SCORE', 'DEBT-TO-INCOME_RATIO', 'ZIP_CODE', 'STATE', 'EMPLOYMENT_LENGTH', 'POLICY_CODE']
root
 |-- AMOUNT_REQUESTED: double (nullable = true)
 |-- APPLICATION_DATE: string (nullable = true)
 |-- LOAN_TITLE: string (nullable = true)
 |-- RISK_SCORE: string (nullable = true)
 |-- DEBT-TO-INCOME_RATIO: string (nullable = true)
 |-- ZIP_CODE: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- EMPLOYMENT_LENGTH: string (nullable = true)
 |-- POLICY_CODE: string (nullable = true)



In [6]:
## Keep only the applications whose APPLICATION_DATE falls in 2007
applied_in_2007 = func.year(func.col("APPLICATION_DATE")) == func.lit("2007")
df_ma = df_ma.where(applied_in_2007)

## Row count of the 2007 subset
df_ma.count()

5274

In [14]:
## Trailing 50-day moving average of RISK_SCORE.
## The timestamp is ordered by its value in seconds (cast to long) so that
## Window.rangeBetween can express the look-back period as a number of seconds.
def days(i):
    """Return the number of seconds in ``i`` days (86400 seconds per day)."""
    return i * 86400


# Cast the date string to a timestamp once; a second identical cast is a no-op.
df_ma = df_ma.withColumn('APPLICATION_DATE', func.col('APPLICATION_DATE').cast('timestamp'))

# The frame covers every row whose timestamp lies between 50 days before the
# current row and the current row, both bounds inclusive.
# There is no partitionBy, so the window is evaluated over the whole DataFrame.
# NOTE(review): a fixed 86400-second day assumes the session time zone has no
# DST shift inside the data range - confirm against the cluster configuration.
windowSpec = Window.orderBy(func.col("APPLICATION_DATE").cast('long')).rangeBetween(-days(50), 0)

# RISK_SCORE is a string column in the schema; cast it explicitly so the
# average does not depend on an implicit string-to-number conversion.
df2 = df_ma.withColumn(
    'RiskScoreMA50',
    func.avg(func.col("RISK_SCORE").cast('double')).over(windowSpec),
)

## Show only the date, the raw score and its moving average (first 100 rows, untruncated)
df2.select(
    func.to_date('APPLICATION_DATE').alias('APPLICATION_DATE'),
    'RISK_SCORE',
    'RiskScoreMA50',
).show(100, False)

+----------------+----------+-----------------+
|APPLICATION_DATE|RISK_SCORE|RiskScoreMA50    |
+----------------+----------+-----------------+
|2007-05-26      |693.0     |698.0            |
|2007-05-26      |703.0     |698.0            |
|2007-05-27      |715.0     |669.5            |
|2007-05-27      |698.0     |669.5            |
|2007-05-27      |509.0     |669.5            |
|2007-05-27      |645.0     |669.5            |
|2007-05-27      |693.0     |669.5            |
|2007-05-27      |700.0     |669.5            |
|2007-05-28      |694.0     |675.4375         |
|2007-05-28      |573.0     |675.4375         |
|2007-05-28      |710.0     |675.4375         |
|2007-05-28      |680.0     |675.4375         |
|2007-05-28      |688.0     |675.4375         |
|2007-05-28      |704.0     |675.4375         |
|2007-05-28      |694.0     |675.4375         |
|2007-05-28      |708.0     |675.4375         |
|2007-05-29      |685.0     |683.9285714285714|
|2007-05-29      |698.0     |683.9285714

In [15]:
## Append the 2007 rows, including RiskScoreMA50, to the table honest.rejected_loan_2007.
## NOTE(review): mode "append" adds the same rows again each time this cell is
## run - confirm that re-runs are meant to accumulate data.
table_writer = df2.write.mode("append")
table_writer.saveAsTable("honest.rejected_loan_2007")

In [16]:
## Register the DataFrame as a temporary view so it can be queried with SQL
df2.createOrReplaceTempView("rejected_loan_2007")

## Sanity check: count the rows whose year is not 2007 (expected result: 0)
non_2007_sql = "select count(*) from rejected_loan_2007 where year(APPLICATION_DATE) <> '2007'"
non_2007_rows = spark.sql(non_2007_sql)
non_2007_rows.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [17]:
## Select every column from the temporary view; show() with no arguments
## prints the first 20 rows and truncates long values.
spark.sql("select * from rejected_loan_2007").show()

+----------------+-------------------+--------------------+----------+--------------------+--------+-----+-----------------+-----------+-----------------+
|AMOUNT_REQUESTED|   APPLICATION_DATE|          LOAN_TITLE|RISK_SCORE|DEBT-TO-INCOME_RATIO|ZIP_CODE|STATE|EMPLOYMENT_LENGTH|POLICY_CODE|    RiskScoreMA50|
+----------------+-------------------+--------------------+----------+--------------------+--------+-----+-----------------+-----------+-----------------+
|          1000.0|2007-05-26 00:00:00|Wedding Covered b...|     693.0|                 10%|   481xx|   NM|          4 years|        0.0|            698.0|
|          1000.0|2007-05-26 00:00:00|  Consolidating Debt|     703.0|                 10%|   010xx|   MA|         < 1 year|        0.0|            698.0|
|         11000.0|2007-05-27 00:00:00|Want to consolida...|     715.0|                 10%|   212xx|   MD|           1 year|        0.0|            669.5|
|          6000.0|2007-05-27 00:00:00|             waksman|     698.0|