# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
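
As a minimal sketch of the two checks named above, the plain-Python snippet below counts missing values per column and fully duplicated rows. The sample rows and the helper names `count_missing` and `count_duplicates` are hypothetical and only illustrate the idea; on the full data set the same counts would be computed in Spark.

```python
from collections import Counter

# Hypothetical sample rows; the real data set has many more columns.
rows = [
    {"Loan_Number": "LOAN00001", "Country": "FRANCE", "Project_ID": None},
    {"Loan_Number": "LOAN00001", "Country": "FRANCE", "Project_ID": None},
    {"Loan_Number": "LOAN00002", "Country": None, "Project_ID": "P000001"},
]


def count_missing(rows):
    """Return the number of missing (None) values per column."""
    missing = Counter()
    for row in rows:
        for column, value in row.items():
            # Adding False (0) still registers the column, so complete columns report 0.
            missing[column] += value is None
    return dict(missing)


def count_duplicates(rows):
    """Return how many rows repeat an earlier, identical row."""
    seen = Counter(tuple(sorted(row.items(), key=lambda kv: kv[0])) for row in rows)
    return sum(n - 1 for n in seen.values())


print(count_missing(rows))
print(count_duplicates(rows))
```

A non-zero count in either report points at a column or a set of rows that needs a cleaning rule.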

#### Cleaning Steps
Document the steps necessary to clean the data.

In [1]:
# Do all imports and installs here
# Load all libraries
import numpy as np
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window
import boto3
import time
from functools import reduce
import pandas as pd
# Change pandas display options to make wide tables easier to read
pd.set_option('max_colwidth', 200)
pd.set_option('display.max_columns', 200)
# Load AWS credentials
config = configparser.ConfigParser()
with open('csp.cfg') as cfg_file:
    config.read_file(cfg_file)
KEY                      = config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET                   = config.get('AWS','AWS_SECRET_ACCESS_KEY')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
# Define a function that creates spark session
start_time = time.time()
def create_spark_session():
    """
    This function is used to create a spark session to work in
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
# Create spark session
spark = create_spark_session()

In [3]:
# Load data from AWS s3
df_spark = spark.read.csv("s3a://john-udacity-s3/loan/ibrd-statement-of-loans-historical-data.csv", header=True)
# Uppercase every string column in the DataFrame
fields = df_spark.schema.fields
stringFields = filter(lambda f: isinstance(f.dataType, StringType), fields)
nonStringFields = map(lambda f: col(f.name), filter(lambda f: not isinstance(f.dataType, StringType), fields))
stringFieldsTransformed = map(lambda f: upper(col(f.name)), stringFields) 
allFields = [*stringFieldsTransformed, *nonStringFields]
df_new = df_spark.select(allFields)
# Rename the columns
oldColumns = df_new.schema.names
newColumns  = ['End_of_Period', 'Loan_Number', 'Region', 'Country_Code', 'Country','Borrower','Guarantor_Country_Code','Guarantor','Loan_Type','Loan_Status','Interest_Rate','Currency_of_Commitment','Project_ID','Project_Name','Original_Principal_Amount','Cancelled_Amount','Undisbursed_Amount','Disbursed_Amount','Repaid_to_IBRD','Due_to_IBRD','Exchange_Adjustment','Borrowers_Obligation','Sold_3rd_Party','Repaid_3rd_Party','Due_3rd_Party','Loans_Held','First_Repayment_Date','Last_Repayment_Date','Agreement_Signing_Date','Board_Approval_Date','Effective_Date_Most_Recent','Closed_Date_Most_Recent','Last_Disbursement_Date']
df = reduce(lambda df_spark, idx: df_spark.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df_new)

In [4]:
# Display the number of null values in each column
#from pyspark.sql.functions import col,sum
#df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).toPandas()

In [5]:
#df.filter('Project_ID = \'P051696\'').distinct().toPandas()
#df.select('Borrower','Loan_Number').groupBy('Borrower','Loan_Number').count().withColumnRenamed('count', 'ccount').groupBy('Loan_Number').count().filter('count>1').toPandas()
#df.filter('Project_ID = \'P051696\'').distinct().toPandas()
#df.filter('Project_Name is not NULL').select('Project_Name','Loan_Number').groupBy('Project_Name','Loan_Number').count().withColumnRenamed('count', 'ccount').groupBy('Loan_Number').count().filter('count>1').toPandas()

# Drop these columns instead of filling in their null values
df = df.drop('Last_Disbursement_Date')
df = df.drop('Currency_of_Commitment')
df = df.drop('Borrower')
df = df.drop('Project_Name')

In [6]:
# Pad Loan_Number to 9 characters by inserting zeros after the 4-character prefix
df_good = df.filter(length(col("Loan_Number")) == 9)
df_6 = df.where(length(col("Loan_Number")) == 6).withColumn("Loan_Number", regexp_replace(col("Loan_Number") ,  "(\\w{4})(\\d{2})" , "$1000$2" ))
df_7 = df.where(length(col("Loan_Number")) == 7).withColumn("Loan_Number", regexp_replace(col("Loan_Number") ,  "(\\w{4})(\\d{3})" , "$100$2" ))
df_8 = df.where(length(col("Loan_Number")) == 8).withColumn("Loan_Number", regexp_replace(col("Loan_Number") ,  "(\\w{4})(\\d{4})" , "$10$2" ))
# Recombine the records; every remaining Loan_Number now has 9 characters
df = df_good.union(df_6).union(df_7).union(df_8)
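
The padding rule in this cell can be sketched in plain Python as follows. The helper name `pad_loan_number` is hypothetical, and unlike the Spark code above it leaves unexpected formats untouched instead of dropping them; that difference is an assumption made to keep the sketch small.

```python
import re


def pad_loan_number(loan_number, width=9):
    """Left-pad the numeric part of a loan number with zeros up to `width` characters."""
    match = re.fullmatch(r"(\w{4})(\d+)", loan_number)
    if match is None or len(loan_number) > width:
        return loan_number  # leave unexpected formats untouched
    prefix, digits = match.groups()
    # The prefix keeps its 4 characters; the digits fill the remaining positions.
    return prefix + digits.zfill(width - len(prefix))


for value in ["IBRD12", "IBRD123", "IBRD1234", "IBRD82580"]:
    print(value, "->", pad_loan_number(value))
```

A single rule that depends on the target width replaces the three separate regular expressions for lengths 6, 7, and 8.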

In [7]:
# If both Guarantor_Country_Code and Guarantor are empty, it is hard to say whether they are supposed to be empty (no guarantor)
# or whether the values are missing, so I just drop those records.
df = df.filter('Guarantor_Country_Code is not NULL or Guarantor is not NULL')

In [8]:
# For each loan number, there should be one country code. Running the code below shows 3 loan numbers that appear with more than one country.
#df.select('Loan_Number','Country_Code').distinct().groupBy('Loan_Number').count().withColumnRenamed('count', 'ccount').filter('count>1').toPandas()
#withColumnRenamed('count', 'ccount').groupBy('Loan_Number').count().filter('count>1').toPandas()
#xx = df.select('Loan_Number','Country').distinct().groupBy('Loan_Number').count().filter('count>1').select('Loan_Number').collect()
#xarr = [str(xx[i].Loan_Number) for i in range(len(xx))]
#for i in xarr:
#    print(i)
#df.where(df.Loan_Number == 'IBRD82610').select('Country').groupBy('Country').count().toPandas()
#df.where(df.Loan_Number == 'IBRD82550').select('Country').groupBy('Country').count().toPandas()
#df.where(df.Loan_Number == 'IBRD82580').select('Country').groupBy('Country').count().toPandas()
df = df.where((df.Loan_Number == 'IBRD82580') & (df.Country != 'CHINA') | (df.Loan_Number != 'IBRD82580'))
df = df.where((df.Loan_Number == 'IBRD82550') & (df.Country != 'CHINA') | (df.Loan_Number != 'IBRD82550'))
df = df.where((df.Loan_Number == 'IBRD82610') & (df.Country != 'INDIA') | (df.Loan_Number != 'IBRD82610'))
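
The consistency check behind this cell (one country per loan number) can be sketched in plain Python. The function name `loans_with_multiple_countries` and the sample records are hypothetical; the loan numbers below are placeholders, not values from the data set.

```python
from collections import defaultdict


def loans_with_multiple_countries(records):
    """Map each Loan_Number that appears with more than one Country to its sorted countries."""
    countries = defaultdict(set)
    for loan_number, country in records:
        countries[loan_number].add(country)
    return {loan: sorted(names) for loan, names in countries.items() if len(names) > 1}


# Hypothetical (Loan_Number, Country) pairs.
records = [
    ("LOAN00001", "CHINA"),
    ("LOAN00001", "INDIA"),
    ("LOAN00002", "FRANCE"),
    ("LOAN00002", "FRANCE"),
]
print(loans_with_multiple_countries(records))
```

An empty result after the filters are applied would indicate that every loan number maps to a single country.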

In [9]:
# Fill missing Project_ID values from other records with the same Loan_Number
# Map each Loan_Number to a known Project_ID
x1 = df.filter('Project_ID is not NULL').select('Loan_Number','Project_ID').distinct().toPandas().set_index('Loan_Number')['Project_ID'].to_dict() 
# Collect the records whose Project_ID is null
pdf = df.filter('Project_ID is NULL').toPandas()
# iterrows() yields copies of each row, so write the value back with pdf.at
for index,row in pdf.iterrows():
    att = row.Loan_Number
    if att in x1:
        pdf.at[index, 'Project_ID'] = x1[att]
# astype(str) turns a remaining None into the string 'None', so filter that out as well
ddd = spark.createDataFrame(pdf.astype(str)).filter('Project_ID is not NULL').filter('Project_ID != \'None\'')
df = df.filter('Project_ID is not NULL').union(ddd)

In [10]:
# Map each Loan_Number to its [Country, Country_Code]
c1 = df.select('Loan_Number','Country','Country_Code').distinct().toPandas().set_index('Loan_Number').T.to_dict('list')
# Loans that list the United Kingdom as guarantor map to the United Kingdom instead
c1b = df.filter('Guarantor == \'UNITED KINGDOM\'').select('Loan_Number').distinct().collect()
for i in c1b:
    c1[i.Loan_Number] = ['UNITED KINGDOM','GB']
pdf = df.filter('Guarantor is NULL or Guarantor_Country_Code is NULL').toPandas()
# iterrows() yields copies of each row, so write the values back with pdf.at
for index,row in pdf.iterrows():
    att = row.Loan_Number
    if att in c1:
        pdf.at[index, 'Guarantor'] = c1[att][0]
        pdf.at[index, 'Guarantor_Country_Code'] = c1[att][1]
ddd = spark.createDataFrame(pdf).filter('Guarantor is not NULL and Guarantor_Country_Code is not NULL')
df = df.filter('Guarantor is not NULL and Guarantor_Country_Code is not NULL').union(ddd)

In [11]:
# Average the known interest rates for each Loan_Number
i1 = df.filter('Interest_Rate is not NULL').filter('Interest_Rate != \'None\'').select('Loan_Number','Interest_Rate').distinct().toPandas()
i1.Interest_Rate = i1.Interest_Rate.astype(np.float16)
i1 = i1.groupby('Loan_Number', as_index=True).agg({"Interest_Rate": "mean"})['Interest_Rate'].to_dict()
pdf = df.filter('Interest_Rate is NULL').toPandas()
# Look up the rate by Loan_Number in i1; iterrows() yields copies, so write back with pdf.at
for index,row in pdf.iterrows():
    att = row.Loan_Number
    if att in i1:
        pdf.at[index, 'Interest_Rate'] = i1[att]
# astype(str) turns a remaining None into the string 'None', so filter that out as well
ddd = spark.createDataFrame(pdf.astype(str)).filter('Interest_Rate is not NULL').filter('Interest_Rate != \'None\'')
df = df.filter('Interest_Rate is not NULL').union(ddd)


In [12]:
def clean(df, column):
    """
    Fill null values of the given column from other records with the same Loan_Number,
    then drop the records that could not be filled
    """
    x1 = df.filter('{} is not NULL'.format(column)).filter('{} != \'None\''.format(column)).select('Loan_Number','{}'.format(column)).distinct().toPandas().set_index('Loan_Number')['{}'.format(column)].to_dict()
    pdf = df.filter('{} is NULL'.format(column)).toPandas()
    # iterrows() yields copies of each row, so write the value back with pdf.at
    for index,row in pdf.iterrows():
        att = row.Loan_Number
        if att in x1:
            pdf.at[index, column] = x1[att]
    ddd = spark.createDataFrame(pdf.astype(str)).filter('{} is not NULL'.format(column)).filter('{} != \'None\''.format(column))
    df = df.filter('{} is not NULL'.format(column)).union(ddd)
    return df
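
The intended logic of `clean` (borrow a missing value from another record of the same loan, then drop what cannot be filled) can be sketched in plain Python. The name `fill_from_same_loan` and the sample rows are hypothetical, and keeping the first known value per loan is an assumption of this sketch.

```python
def fill_from_same_loan(rows, column):
    """Fill missing `column` values using another row that shares the Loan_Number."""
    known = {}
    for row in rows:
        value = row.get(column)
        if value is not None:
            known.setdefault(row["Loan_Number"], value)  # first known value wins
    filled = []
    for row in rows:
        if row.get(column) is None and row["Loan_Number"] in known:
            # Build a new dict so the caller's rows are not mutated.
            row = {**row, column: known[row["Loan_Number"]]}
        filled.append(row)
    # Rows that still have no value are dropped.
    return [row for row in filled if row.get(column) is not None]


# Hypothetical sample rows.
sample = [
    {"Loan_Number": "LOAN00001", "First_Repayment_Date": "1950-01-01"},
    {"Loan_Number": "LOAN00001", "First_Repayment_Date": None},
    {"Loan_Number": "LOAN00002", "First_Repayment_Date": None},
]
print(fill_from_same_loan(sample, "First_Repayment_Date"))
```

Writing the result into a new list avoids the common pitfall of assigning to a row copy and losing the update.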

In [13]:
df = clean(df,'First_Repayment_Date')
df = clean(df,'Agreement_Signing_Date')
df = clean(df,'Closed_Date_Most_Recent')
df = clean(df,'Effective_Date_Most_Recent')
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).toPandas()


Unnamed: 0,End_of_Period,Loan_Number,Region,Country_Code,Country,Guarantor_Country_Code,Guarantor,Loan_Type,Loan_Status,Interest_Rate,Project_ID,Original_Principal_Amount,Cancelled_Amount,Undisbursed_Amount,Disbursed_Amount,Repaid_to_IBRD,Due_to_IBRD,Exchange_Adjustment,Borrowers_Obligation,Sold_3rd_Party,Repaid_3rd_Party,Due_3rd_Party,Loans_Held,First_Repayment_Date,Last_Repayment_Date,Agreement_Signing_Date,Board_Approval_Date,Effective_Date_Most_Recent,Closed_Date_Most_Recent
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [14]:
# Generate country table 
df_country = df.select('Country_Code','Country','Region').distinct()
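# Country names are uppercased everywhere else, so the added row must be uppercase to match in joins
newRow = spark.createDataFrame([('GB','UNITED KINGDOM','EUROPE AND CENTRAL ASIA')])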
df_country = df_country.union(newRow)
df_country = df_country.withColumn('Country_Id',row_number().over(Window.orderBy(monotonically_increasing_id())))
#df_country.limit(2).toPandas()

In [15]:
# Generate time table 
df_time = df.select('Loan_Number','First_Repayment_Date','Last_Repayment_Date','Agreement_Signing_Date','Board_Approval_Date','Effective_Date_Most_Recent','Closed_Date_Most_Recent').distinct()
df_time = df_time.withColumn('Time_Id',row_number().over(Window.orderBy(monotonically_increasing_id())))
#df_time.limit(2).toPandas()

In [16]:
# Generate Amount table 
df_amount = df.select('Loan_Number','Original_Principal_Amount','Cancelled_Amount','Undisbursed_Amount','Disbursed_Amount','Repaid_to_IBRD','Due_to_IBRD','Sold_3rd_Party','Repaid_3rd_Party', 'Due_3rd_Party', 'Loans_Held').distinct()
df_amount = df_amount.withColumn('Amount_Id',row_number().over(Window.orderBy(monotonically_increasing_id())))
#df_amount.limit(2).toPandas()

In [17]:
# Generate Loan_Type table 
df_loan_type = df.select('Loan_Type').distinct()
df_loan_type = df_loan_type.withColumn('Loan_Type_Id',row_number().over(Window.orderBy(monotonically_increasing_id())))
#df_loan_type.limit(2).toPandas()

In [18]:
# Generate Loan_Status table 
df_loan_status = df.select('Loan_Status').distinct()
df_loan_status = df_loan_status.withColumn('Loan_Status_Id',row_number().over(Window.orderBy(monotonically_increasing_id())))
#df_loan_status.limit(2).toPandas()
print("--- %s seconds ---" % (time.time() - start_time))

--- 1355.7922852039337 seconds ---


In [31]:
df_country.createOrReplaceTempView("country")
df_time.createOrReplaceTempView("time")
df_amount.createOrReplaceTempView("amount")
df_loan_type.createOrReplaceTempView("loan_type")
df_loan_status.createOrReplaceTempView("loan_status")
df.createOrReplaceTempView("log")

In [32]:
log_norm = spark.sql("""
    select l.Loan_Number, l.End_of_Period, l.Interest_Rate, l.Project_ID, l.Exchange_Adjustment, l.Borrowers_Obligation,  \
    c.Country_Id, cc.Country_Id as Guarantor_Country_Id, t.Time_Id, a.Amount_Id, lt.Loan_Type_Id, ls.Loan_Status_Id \
    from ((country c \
    join log l on c.Country_Code = l.Country_Code and c.Country = l.Country and c.Region = l.Region) \
    join country cc on cc.Country = l.Guarantor) \
    join time t on t.Loan_Number = l.Loan_Number \
    join amount a on a.Loan_Number = l.Loan_Number \
    join loan_type lt on lt.Loan_Type = l.Loan_Type \
    join loan_status ls on ls.Loan_Status = l.Loan_Status
    """) 

In [None]:
log_norm.limit(1).show()

In [24]:
log_norm.createOrReplaceTempView("log_norm")

In [None]:
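# Assumption: the join columns (Country_Code, Country, Region) exist only on the country view, so it replaces log_norm here
log_norm1 = spark.sql("""
    select l.Loan_Number, c.Country_Id from country c join log l on c.Country_Code = l.Country_Code and c.Country = l.Country and c.Region = l.Region
    """) 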

In [None]:
import collections
print ([item for item, count in collections.Counter(t).items() if count > 1])

In [None]:
# Performing cleaning tasks here

def process_song_data(spark, input_data, output_data):
    """
    This function is used to load songs data from s3 to our data lake and export as parquet file back to my s3 folder.
    """
    # get filepath to song data file
    song_data = input_data + 'song_data/A/A/A/*'
    # uncomment if you want to load all data
    #song_data = input_data + 'song_data/*/*/*/*'
    
    # read song data file, using song_data/A/A/A/* for performance
    df = spark.read.json(song_data)

    # extract columns to create songs table
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')
    
    # convert the data type to proper data type for each column
    fields_1 = {'song_id':'string','title':'string', 'artist_id':'string', 'year':'int', 'duration':'float'}
    exprs_1 = [ "cast ({} as {})".format(key,value) for key, value in fields_1.items()]
    songs_table = songs_table.selectExpr(*exprs_1)
    
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year','artist_id').parquet(output_data + "songs.parquet")

    # extract columns to create artists table
    artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')
    
    # convert the data type to proper data type for each column
    fields_2 = {'artist_id':'string', 'artist_name':'string', 'artist_location':'string', 'artist_latitude':'string', 'artist_longitude':'string'}
    exprs_2 = [ "cast ({} as {})".format(key,value) for key, value in fields_2.items()]
    artists_table = artists_table.selectExpr(*exprs_2)
    
    # write artists table to parquet files
    artists_table.write.parquet(output_data + "artists.parquet")



In [None]:
# Read in the data here
s3 = boto3.resource('s3',
                       region_name="us-east-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )
# List the first few objects under the given prefix in the S3 bucket
sampleDbBucket =  s3.Bucket("john-udacity-s3")
i = 0
for obj in sampleDbBucket.objects.filter(Prefix="movie"):
    print(obj)
    i += 1
    if i > 10:
        break  

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
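
The tables built in Step 2 appear to follow a star-style layout: a fact table (`log_norm`) that references the `country`, `time`, `amount`, `loan_type`, and `loan_status` tables through surrogate ids. The plain-Python sketch below shows that idea for one dimension. The helper names and sample rows are hypothetical, and assigning ids in first-seen order is an assumption of this sketch, not a property of the Spark code.

```python
def build_dimension(rows, column):
    """Map each distinct value of `column` to a 1-based surrogate id, in first-seen order."""
    dimension = {}
    for row in rows:
        dimension.setdefault(row[column], len(dimension) + 1)
    return dimension


def to_fact_rows(rows, column, id_name, dimension):
    """Replace the descriptive `column` in every row with its surrogate id under `id_name`."""
    fact = []
    for row in rows:
        slim = {k: v for k, v in row.items() if k != column}
        slim[id_name] = dimension[row[column]]
        fact.append(slim)
    return fact


# Hypothetical sample rows.
loans = [
    {"Loan_Number": "LOAN00001", "Loan_Status": "REPAID"},
    {"Loan_Number": "LOAN00002", "Loan_Status": "DISBURSED"},
    {"Loan_Number": "LOAN00003", "Loan_Status": "REPAID"},
]
status_dim = build_dimension(loans, "Loan_Status")
fact = to_fact_rows(loans, "Loan_Status", "Loan_Status_Id", status_dim)
print(status_dim)
print(fact)
```

Storing each descriptive value once and referencing it by id keeps the fact table narrow, which is the usual argument for this kind of model.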

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
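
The three kinds of checks listed above can be sketched in plain Python as one helper that reports failures. The function name `run_quality_checks`, its parameters, and the sample rows are hypothetical; in the pipeline the same checks would run against the Spark tables.

```python
def run_quality_checks(rows, key, required, expected_count):
    """Return a list of human-readable failures; an empty list means all checks passed."""
    failures = []
    # Integrity constraint: the key column must be unique.
    keys = [row[key] for row in rows]
    if len(keys) != len(set(keys)):
        failures.append(f"{key} is not unique")
    # Completeness of individual columns: no missing values allowed.
    for column in required:
        if any(row.get(column) is None for row in rows):
            failures.append(f"{column} contains missing values")
    # Source/count check: the loaded table must match the expected row count.
    if len(rows) != expected_count:
        failures.append(f"expected {expected_count} rows, found {len(rows)}")
    return failures


# Hypothetical dimension table with a duplicated key and a missing value.
table = [
    {"Country_Id": 1, "Country": "FRANCE"},
    {"Country_Id": 1, "Country": None},
]
print(run_quality_checks(table, "Country_Id", ["Country"], expected_count=3))
```

Returning a list instead of raising on the first problem lets a single run report every failed check.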
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.