In [1]:
!pip install pyyaml
!pip install psycopg2
!pip install sqlalchemy



Installing required packages.

In [2]:
import yaml
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
import matplotlib.pyplot as plt
import numpy as np

Importing required modules.

In [3]:
def credentials():
    """
    Load the database credentials from "credentials.yaml" in the working directory.

    Returns:
    ----------
    The object produced by yaml.safe_load (the rest of this notebook indexes it
    like a dictionary), or None if the file does not contain valid YAML.

    Raises:
    ----------
    OSError
        If "credentials.yaml" cannot be opened (for example, it does not exist).
    """
    with open("credentials.yaml", "r") as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as error:
            # Print the caught exception (message and position of the problem);
            # printing the exception class itself says nothing about what failed.
            print(error)
            return None

Defining "credentials" function which returns the credentials from the "credentials.yaml" file.

In [4]:
class RDSDatabaseConnector:
    """
    Extracts the remote database to a csv on the local machine.

    Parameters:
    ----------
    cred: dictionary
        These are the database credentials obtained from the credentials.yaml file.
        The keys RDS_USER, RDS_PASSWORD, RDS_HOST, RDS_PORT and RDS_DATABASE are read.

    Attributes:
    ----------
    cred: dictionary
        Check Parameters section.

    Methods:
    ----------
    SQLAlchemy(cred=None)
        Builds a SQLAlchemy engine for the RDS database from the credentials. Returns "engine".
    extract_data(engine)
        Reads the "loan_payments" table into a pandas data frame. Returns the data frame.
    save_csv(loan_payments)
        Saves the previously obtained data frame to the current directory as a csv file.
    """

    # Class constructor
    def __init__(self, cred):
        self.cred = cred

    # Methods
    def SQLAlchemy(self, cred=None):
        """
        Create the engine used to talk to the PostgreSQL database.

        Parameters:
        ----------
        cred: dictionary, optional
            Credentials to connect with. When omitted, the credentials given to
            the constructor (self.cred) are used.

        Returns:
        ----------
        The object returned by sqlalchemy.create_engine for the assembled URL.
        """
        # Imported here so the method does not depend on a change to the import cell.
        from urllib.parse import quote

        if cred is None:
            cred = self.cred

        # Percent-encode user name and password: characters such as "@", ":" or
        # "/" would otherwise be read as URL delimiters and corrupt the address.
        user = quote(str(cred["RDS_USER"]), safe="")
        password = quote(str(cred["RDS_PASSWORD"]), safe="")

        engine = create_engine(
            f"postgresql+psycopg2://{user}:{password}"
            f"@{cred['RDS_HOST']}:{cred['RDS_PORT']}/{cred['RDS_DATABASE']}"
        )
        return engine

    def extract_data(self, engine):
        """
        Read the whole "loan_payments" table through the given engine.

        Parameters:
        ----------
        engine: engine
            Engine obtained from the SQLAlchemy method.

        Returns:
        ----------
        pd.DataFrame holding the contents of the table.
        """
        # pd.read_sql_table already returns a DataFrame, so no extra wrapping is needed.
        loan_payments = pd.read_sql_table("loan_payments", engine)
        return loan_payments

    def save_csv(self, loan_payments):
        """
        Write the data frame to "loan_payments.csv" in the current directory.

        Parameters:
        ----------
        loan_payments: pd.DataFrame
            Data frame obtained from the extract_data method.

        Returns:
        ----------
        The return value of DataFrame.to_csv (None when a path is given).
        """
        # NOTE: to_csv writes the index by default, so the file starts with an
        # unnamed index column that pd.read_csv will load as "Unnamed: 0".
        return loan_payments.to_csv("loan_payments.csv")

Defines the class "RDSDatabaseConnector", which extracts the data from an RDS database to a csv file on the local machine. The class contains three methods.\
"SQLAlchemy" - creates an engine which connects to the remote RDS database using the credentials from the YAML file.\
"extract_data" - extracts the required data from the database\
"save_csv" - saves the previously extracted data as a csv file on the local machine

In [5]:
def csv_to_df():
    """
    Read "loan_payments.csv" from the current directory.

    Returns:
    ----------
    pd.DataFrame with the contents of the csv file.
    """
    with open("loan_payments.csv", "r") as csv_file:
        loaded_frame = pd.read_csv(csv_file)
    return loaded_frame

This function imports the contents of the previously saved csv file as a pandas data frame.

In [6]:
class DataTransform:
    """
    Transforms the data types of the columns in the dataframe.

    Parameters:
    ----------
    df: pandas dataframe
        This is the dataframe to be modified by the class, obtained from the csv file of loan payments.

    Attributes:
    ----------
    df: pandas dataframe
        Check Parameters section. Used by the methods whenever they are called without a dataframe.

    Methods:
    ----------
    change_data_type_category(df=None)
        This function changes the data types of the following columns to category:
        term, grade, sub_grade, employment_length, home_ownership, verification_status, loan_status, payment_plan, purpose, application_type.
    change_data_type_datetime(df=None)
        This function changes the data types of the following columns to datetime64:
        issue_date, earliest_credit_line, last_payment_date, next_payment_date, last_credit_pull_date.
    """

    # Class constructor
    def __init__(self, df):
        self.df = df

    # Methods
    def change_data_type_category(self, df=None):
        """
        Convert the categorical columns to the category data type.

        Parameters:
        ----------
        df: pandas dataframe, optional
            Dataframe to convert. When omitted, self.df is used.

        Returns:
        ----------
        The same dataframe object that was converted; its columns are replaced in place.

        Raises:
        ----------
        KeyError
            If one of the expected columns is missing from the dataframe.
        """
        if df is None:
            df = self.df
        for column in ["term", "grade", "sub_grade", "employment_length", "home_ownership", "verification_status", "loan_status", "payment_plan", "purpose", "application_type"]:
            df[column] = df[column].astype("category")
        return df

    def change_data_type_datetime(self, df=None):
        """
        Convert the date columns to datetime64, parsing them as day/month/year.

        Parameters:
        ----------
        df: pandas dataframe, optional
            Dataframe to convert. When omitted, self.df is used.

        Returns:
        ----------
        The same dataframe object that was converted; its columns are replaced in place.

        Raises:
        ----------
        KeyError
            If one of the expected columns is missing from the dataframe.
        ValueError
            If a value does not match the "%d/%m/%Y" format.
        """
        if df is None:
            df = self.df
        for column in ["issue_date", "earliest_credit_line", "last_payment_date", "next_payment_date", "last_credit_pull_date"]:
            df[column] = pd.to_datetime(df[column], format="%d/%m/%Y")
        return df

Defines the class "DataTransform", which transforms the data types of the columns in the previously imported pandas data frame. This class contains two methods.\
"change_data_type_category" - this changes the data types of the identified columns to the category data type.\
"change_data_type_datetime" - this changes the data types of the columns containing dates to datetime.

In [7]:
# Read the csv once and run both conversions on that single frame; calling
# csv_to_df() for the constructor and again for the method parsed the file twice.
payments_df = csv_to_df()
transformer = DataTransform(payments_df)
payments_df = transformer.change_data_type_category(payments_df)
payments_df = transformer.change_data_type_datetime(payments_df)

The above applies the methods in the DataTransform class to the previously created pandas DataFrame.