In [1]:
from utils.spark_utils import spark
import pyspark.sql.functions as F
from utils.db_utils import pgDB

In [2]:
SheetLoanLevel = spark(
    config_key="spark.jars.packages",
    config_value="com.crealytics:spark-excel_2.12:0.13.5"
).SparkReadExcels(
    fileFormat="com.crealytics.spark.excel",
    header="true",
    inferSchema="true",
    dataAddress="'Loan Level'!A1",
    locationFile="/home/jovyan/work/DE_Rand_test (2).xlsx"
)

In [3]:
SheetPartnerLevel = spark(
    config_key="spark.jars.packages",
    config_value="com.crealytics:spark-excel_2.12:0.13.5"
).SparkReadExcels(
    fileFormat="com.crealytics.spark.excel",
    header="true",
    inferSchema="true",
    dataAddress="'Partner Level'!A1",
    locationFile="/home/jovyan/work/DE_Rand_test (2).xlsx"
).na.drop(
    subset=[
        "partner",
        "name"
    ]
)

In [4]:
print(SheetLoanLevel.count())
print(SheetPartnerLevel.count())

10000
4


In [5]:
SheetLoanLevel.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- borrower_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- partner: string (nullable = true)
 |-- current_dpd: double (nullable = true)
 |-- max_dpd: double (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- loan_term: double (nullable = true)



In [6]:
SheetPartnerLevel.printSchema()

root
 |-- partner: string (nullable = true)
 |-- name: string (nullable = true)



In [7]:
# 1.A change schema
SheetLoanLevel = SheetLoanLevel \
    .withColumn("loan_amount", SheetLoanLevel.loan_amount.cast('int')) \
    .withColumn("current_dpd", SheetLoanLevel.current_dpd.cast('int')) \
    .withColumn("max_dpd", SheetLoanLevel.max_dpd.cast('int')) \
    .withColumn("loan_term", SheetLoanLevel.loan_term.cast('int'))

In [8]:
SheetLoanLevel.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- borrower_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- partner: string (nullable = true)
 |-- current_dpd: integer (nullable = true)
 |-- max_dpd: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- loan_term: integer (nullable = true)



In [9]:
SheetLoanLevel.select('status').distinct().collect()

[Row(status='CANCELLED'),
 Row(status='LIVE'),
 Row(status='CLOSED'),
 Row(status='PENDING')]

In [10]:
# 1.B The variable ‘status’ in ‘loan’ has a ‘Pending’ value. 
# with Python Code, change it into ‘SUBMITTED’.

SheetLoanLevel = SheetLoanLevel \
    .withColumn('status', F.when(SheetLoanLevel.status=='PENDING', 'SUBMITTED') \
    .otherwise(SheetLoanLevel.status))

In [11]:
SheetLoanLevel.select('status').distinct().collect()

[Row(status='SUBMITTED'),
 Row(status='CANCELLED'),
 Row(status='LIVE'),
 Row(status='CLOSED')]

In [12]:
db_connection = {
    'user': 'user-name',
    'password': 'strong-password',
    'host': '172.28.0.2',
    'port': '5432',
    'database': 'user-name',
}

In [13]:
# 1.C Ingest the data into the Database.

# Insert Loan Level Table
pgDB(
    user=db_connection['user'],
    password=db_connection['password'],
    host=db_connection['host'],
    port=db_connection['port'],
    database=db_connection['database']
).SparkDataFrameToDB(
    TableNameDB='loan',
    SparkDataFrame=SheetLoanLevel,
    if_exists='replace',
    index=False
)

# Insert Partner Level Table
pgDB(
    user=db_connection['user'],
    password=db_connection['password'],
    host=db_connection['host'],
    port=db_connection['port'],
    database=db_connection['database']
).SparkDataFrameToDB(
    TableNameDB='partner',
    SparkDataFrame=SheetPartnerLevel,
    if_exists='replace',
    index=False
)