# **Project Overview:**

---

This project builds an ETL pipeline in Python, within Google Colab, that processes real estate loan data from FRED. After the data has been transformed, it is loaded into BigQuery for further analysis.

In [None]:
# Import all the necessary libraries
import pandas as pd
import requests

# Build the pipeline

---



**1.	Data Ingestion:**

In [None]:
# Constants
# Reference: https://fred.stlouisfed.org/docs/api/fred/series_observations.html
# Base endpoint that get_data() sends its observation requests to.
Base_url = 'https://api.stlouisfed.org/fred/series/observations'

In [None]:
# Get data from the API, name the dataset after its units, drop any observations whose value is ".", and return a dataset dictionary
def get_data(api_key, series_id, units, params):
    """Fetch one FRED series in the requested units and drop '.' observations.

    Args:
        api_key: FRED API key.
        series_id: ID of the FRED series to download.
        units: Units code passed to the API (e.g. 'lin', 'pch'); it is also
            used to build the dataset name.
        params: Extra query parameters (e.g. limit, sort_order). The mapping
            is read only; it is never modified.

    Returns:
        A dict with the keys 'dataset_name', 'units' and 'data' (the list of
        observation dicts whose 'value' is not '.'), or None when the API
        answers with a status code other than 200.
    """
    # Build the query on a fresh dict so the caller's params stay untouched.
    # 'units' is written last so the explicit argument always wins.
    query = {
        'series_id': series_id,
        'api_key': api_key,
        'file_type': 'json',
        **params,
        'units': units,
    }

    # Let requests encode the query string, and bound the wait so a stalled
    # connection cannot hang the pipeline forever.
    response = requests.get(Base_url, params=query, timeout=30)

    if response.status_code != 200:
        print(f'Error! Request for units={units!r} returned HTTP status {response.status_code}')
        return None

    payload = response.json()

    # '.' is the placeholder value that must not reach the numeric conversion
    # later on (presumably FRED's marker for a missing observation).
    filtered_data = [
        observation
        for observation in payload.get('observations', [])
        if observation['value'] != '.'
    ]

    return {
        'dataset_name': f'Dataset_{units}',
        'units': units,
        'data': filtered_data,
    }

In [None]:
# Get a list of datasets from the API
def get_datasets_list(api_key, series_id, unit_list, params):
    """Download the series once for every unit code.

    Returns a list holding one get_data() result per entry of unit_list, in
    the same order. An entry is None when get_data() returned None for it.
    """
    return [get_data(api_key, series_id, code, params) for code in unit_list]

**2.	Data Transformation:**


*First dataframe:*

In [None]:
# Create a dataframe and extract only the date and value columns from each dataset dictionary
def create_dataframe(datasets):
    """Build one two-column dataframe per dataset dictionary.

    Only the 'date' and 'value' fields of each observation are kept, and the
    value column is renamed to 'value_<units>' so the frames can be told
    apart after merging.
    """

    def _to_frame(entry):
        # Restrict to the two fields of interest, then tag the value column
        # with the units code of this dataset.
        frame = pd.DataFrame(entry['data'], columns=['date', 'value'])
        return frame.rename(columns={'value': f'value_{entry["units"]}'})

    return [_to_frame(entry) for entry in datasets]

In [None]:
# Merge multiple dataframes using inner join
def merge_dataframes(dataframes):
    """Inner-join every dataframe in the list on the shared 'date' column.

    The first frame is the starting point and each following frame is joined
    onto the running result, so only dates present in all frames survive.
    """
    merged = dataframes[0]

    for right in dataframes[1:]:
        merged = pd.merge(merged, right, on='date', how='inner')

    return merged

In [None]:
# Map unit abbreviations to column names
def get_column_names(unit_list):
    """Translate unit codes into descriptive column names.

    Args:
        unit_list: Iterable of unit codes ('lin', 'chg', 'ch1', ...).

    Returns:
        A list of column names, one per code, in the order of unit_list.

    Raises:
        ValueError: If a code has no column name assigned. An unknown code
            used to silently repeat the previous column name, or fail with
            UnboundLocalError when it was the first entry.
    """
    names_by_unit = {
        'lin': 'Loan_Amount_in_Billions',
        'chg': 'Loan_Amount_Change_from_Previous_Week_in_Billions',
        'ch1': 'Loan_Amount_Change_from_Previous_Year_in_Billions',
        'pch': 'Weekly_Percent_Change',
        'pc1': 'Yearly_Percent_Change',
        'pca': 'Compounded_Annual_Rate_of_Change',
        'cch': 'Continuously_Compounded_Rate_of_Change',
        'cca': 'Continuously_Compounded_Annual_Rate_of_Change',
        # NOTE(review): the FRED docs describe 'log' as the natural log, so
        # 'Index' looks like a misnomer. It is kept unchanged because tables
        # that were already loaded use this column name.
        'log': 'Index',
    }

    column_names = []
    for unit in unit_list:
        try:
            column_names.append(names_by_unit[unit])
        except KeyError:
            raise ValueError(
                f'Unknown unit {unit!r}; expected one of {sorted(names_by_unit)}'
            ) from None

    return column_names

In [None]:
# Rename columns
def rename_columns(dataframe, unit_list):
    """Give the merged dataframe its final, descriptive column headers.

    The first column becomes 'Date'; the remaining ones take the names that
    get_column_names() returns for unit_list, in order. The dataframe is
    relabelled in place and also returned.
    """
    dataframe.columns = ['Date', *get_column_names(unit_list)]
    return dataframe

In [None]:
# Change the data types (Date = datetime, other values = float64)
def set_data_types(dataframe):
    """Convert 'Date' to datetime and every other column to float64.

    The conversion is written back into the given dataframe, which is also
    returned.
    """
    for name in dataframe.columns:
        if name != 'Date':
            dataframe[name] = dataframe[name].astype('float64')
            continue
        dataframe[name] = pd.to_datetime(dataframe['Date'])

    return dataframe

*Second Dataframe:*

In [None]:
# Create a dataframe in which a selected column (one of the units) can be aggregated by years
def transform_data(dataframe, column_name):
    """Aggregate one column per calendar year.

    The year is taken from the 'Date' column of a copy of the input, so the
    input is not modified. The result has one row per year with the sum,
    mean, max and min of column_name. The output headers always mention the
    loan amount, whichever column is chosen.
    """
    stat_labels = {
        'sum': 'Total_Loan_Amount_in_billions',
        'mean': 'Average_Loan_Amount_in_billions',
        'max': 'Max_Loan_Amount_in_billions',
        'min': 'Min_Loan_Amount_in_billions',
    }

    with_year = dataframe.assign(Year=dataframe['Date'].dt.year)
    yearly_stats = with_year.groupby('Year')[column_name].agg(['sum', 'mean', 'max', 'min'])

    return yearly_stats.reset_index().rename(columns=stat_labels)

**3.	Data Loading:**

---

In [None]:
# Import all the necessary libraries to connect to BigQuery
from google.cloud import bigquery
from pandas_gbq import to_gbq, read_gbq

In [None]:
# Prepare schema
def prepare_bigquery_schema(dataframe):
    """Derive a BigQuery table schema from the dataframe's column dtypes.

    Args:
        dataframe: The dataframe that is about to be loaded.

    Returns:
        A list of {'name': <column>, 'type': <BigQuery type>} dicts in column
        order. Float columns map to FLOAT, integer columns to INTEGER,
        datetime columns to DATE and everything else to STRING.
    """
    dtype_checks = pd.api.types
    columns_and_types = []

    for col, dtype in dataframe.dtypes.items():
        # Test the dtype family instead of one exact dtype string, so int32
        # (what .dt.year can return), float32 and datetimes with another
        # resolution or a timezone no longer fall through to STRING.
        if dtype_checks.is_float_dtype(dtype):
            bq_type = 'FLOAT'
        elif dtype_checks.is_integer_dtype(dtype):
            bq_type = 'INTEGER'
        elif dtype_checks.is_datetime64_any_dtype(dtype):
            bq_type = 'DATE'
        else:
            bq_type = 'STRING'
        columns_and_types.append({'name': col, 'type': bq_type})

    return columns_and_types

In [None]:
# - Load data into BigQuery (https://pandas-gbq.readthedocs.io/en/latest/api.html#pandas_gbq.to_gbq)
# - Check to see if the dataset is successfully loaded to BigQuery
def load_data_to_bigquery(dataframe, project_id, dataset_id, table_name, if_exists='replace'):
    """Write the dataframe to a BigQuery table and print a preview of it.

    Args:
        dataframe: Data to upload.
        project_id: Google Cloud project that owns the dataset.
        dataset_id: BigQuery dataset that holds the table.
        table_name: Name of the destination table.
        if_exists: Forwarded to to_gbq(); decides what happens when the
            table already exists. Defaults to 'replace'.

    Returns:
        None. The first rows read back from the table are printed.
    """
    full_table_name = f'{project_id}.{dataset_id}.{table_name}'

    to_gbq(
        dataframe,
        full_table_name,
        project_id=project_id,
        if_exists=if_exists,
        table_schema=prepare_bigquery_schema(dataframe),
    )

    # Check to see if the data is loaded successfully. Only a preview is
    # printed, so ask for five rows instead of downloading the whole table.
    # The backticks keep ids that contain dashes valid in the query.
    query = f'SELECT * FROM `{full_table_name}` LIMIT 5'
    loaded_data = read_gbq(query, project_id=project_id)

    print(f'First few rows of loaded data from {dataset_id}.{table_name}:')
    print(f'{loaded_data.head()}\n \n')

# Run the pipeline

---



You can customize the data by adjusting parameters such as the number of data entries (limit), the start and end dates of the observations, the sort order (ascending or descending), and the units. Please refer to the documentation on GitHub for more information about the parameters.

In [None]:
# Feel free to customize the variables below
import os

# The API key is taken from the FRED_API_KEY environment variable when it is
# set; the literal is only the fallback. Replace it with your own key and
# avoid committing real credentials to the notebook.
api_key = os.environ.get('FRED_API_KEY', 'af7cc74c15dc754573d689fbb7533dbe')
series_id = 'CREACBW027SBOG'
# Extra query parameters sent with every request
Default_params = {
    'limit': 100000,
    'sort_order': 'asc',
    'observation_start': '1774-01-04',
    'observation_end': '9999-12-31'
    }
# One request is made per unit code; each code becomes one value column
unit_list = ['lin','chg','ch1','pch','pc1','pca','cch','cca','log']
agg_column_name = 'Loan_Amount_in_Billions' # pick which column to aggregate by year

# My BigQuery info
project_id = 'numeric-datum-406721' # replace with your project_id
dataset_id = 'ETLproject'
table_name1 = 'real_estate_loan'
table_name2 = 'real_estate_loan_by_years'

In [None]:
# Call the function to get a list of datasets, create dataframes, merge them into a dataframe, and check the data types
# Steps: one API request per unit code -> one two-column dataframe per dataset
# -> a single dataframe, inner-joined on the date column.
datasets_list = get_datasets_list(api_key, series_id, unit_list, Default_params)
dataframes = create_dataframe(datasets_list)
final_dataframe = merge_dataframes(dataframes)

# Apply the descriptive column names, then print the dtypes and shape before any type conversion
final_dataframe = rename_columns(final_dataframe, unit_list)
print(f'Current Data Type:\n{final_dataframe.dtypes}\n {final_dataframe.shape}')

Current Data Type:
Date                                                 object
Loan_Amount_in_Billions                              object
Loan_Amount_Change_from_Previous_Week_in_Billions    object
Loan_Amount_Change_from_Previous_Year_in_Billions    object
Weekly_Percent_Change                                object
Yearly_Percent_Change                                object
Compounded_Annual_Rate_of_Change                     object
Continuously_Compounded_Rate_of_Change               object
Continuously_Compounded_Annual_Rate_of_Change        object
Index                                                object
dtype: object
 (965, 10)


In [None]:
# Call the function to change the data types
# set_data_types() writes the converted columns back into the dataframe it receives and returns
# that same object, so real_estate_loan_df and final_dataframe refer to one dataframe afterwards.
real_estate_loan_df = set_data_types(final_dataframe)
print(f'Updated dtypes:\n{real_estate_loan_df.dtypes}\n{real_estate_loan_df.head()}')

Updated dtypes:
Date                                                 datetime64[ns]
Loan_Amount_in_Billions                                     float64
Loan_Amount_Change_from_Previous_Week_in_Billions           float64
Loan_Amount_Change_from_Previous_Year_in_Billions           float64
Weekly_Percent_Change                                       float64
Yearly_Percent_Change                                       float64
Compounded_Annual_Rate_of_Change                            float64
Continuously_Compounded_Rate_of_Change                      float64
Continuously_Compounded_Annual_Rate_of_Change               float64
Index                                                       float64
dtype: object
        Date  Loan_Amount_in_Billions  \
0 2005-06-01                1158.3338   
1 2005-06-08                1161.1041   
2 2005-06-15                1164.0374   
3 2005-06-22                1168.6571   
4 2005-06-29                1173.4074   

   Loan_Amount_Change_from_Previous_Week_in

In [None]:
# Call the function to create a dataframe in which a selected column can be aggregated by years
# agg_column_name selects the column that is summarised (sum, mean, max, min) per year.
real_estate_loan_by_years_df = transform_data(real_estate_loan_df, agg_column_name)
# Last expression of the cell: the notebook displays the first rows
real_estate_loan_by_years_df.head()

Unnamed: 0,Year,Total_Loan_Amount_in_billions,Average_Loan_Amount_in_billions,Max_Loan_Amount_in_billions,Min_Loan_Amount_in_billions
0,2005,37701.1084,1216.164787,1275.6285,1158.3338
1,2006,71447.676,1373.993769,1460.7168,1278.795
2,2007,79275.0592,1524.520369,1592.7756,1466.0321
3,2008,88124.7487,1662.731108,1731.4749,1592.0548
4,2009,88130.1255,1694.810106,1723.8507,1634.4332


In [None]:
# Call the function to load the dataset into BigQuery
# if_exists is left at its default ('replace') for both tables. Each call also reads the table
# back and prints its first rows as a check.
load_data_to_bigquery(real_estate_loan_df, project_id, dataset_id, table_name1)
load_data_to_bigquery(real_estate_loan_by_years_df, project_id, dataset_id, table_name2)

100%|██████████| 1/1 [00:00<00:00, 5629.94it/s]


Downloading: 100%|[32m██████████[0m|
First few rows of loaded data from ETLproject.real_estate_loan:
         Date  Loan_Amount_in_Billions  \
0  2015-08-12                1714.8060   
1  2015-06-10                1681.9342   
2  2017-01-18                1981.0945   
3  2020-01-01                2323.4216   
4  2019-04-24                2215.8421   

   Loan_Amount_Change_from_Previous_Week_in_Billions  \
0                                             3.2093   
1                                             1.9234   
2                                             4.0927   
3                                             5.5228   
4                                            -0.0245   

   Loan_Amount_Change_from_Previous_Year_in_Billions  Weekly_Percent_Change  \
0                                           144.5170                0.18750   
1                                           129.4670                0.11449   
2                                           184.0837                0.

100%|██████████| 1/1 [00:00<00:00, 1640.32it/s]


Downloading: 100%|[32m██████████[0m|
First few rows of loaded data from ETLproject.real_estate_loan_by_years:
   Year  Total_Loan_Amount_in_billions  Average_Loan_Amount_in_billions  \
0  2005                     37701.1084                      1216.164787   
1  2006                     71447.6760                      1373.993769   
2  2007                     79275.0592                      1524.520369   
3  2008                     88124.7487                      1662.731108   
4  2009                     88130.1255                      1694.810106   

   Max_Loan_Amount_in_billions  Min_Loan_Amount_in_billions  
0                    1275.6285                    1158.3338  
1                    1460.7168                    1278.7950  
2                    1592.7756                    1466.0321  
3                    1731.4749                    1592.0548  
4                    1723.8507                    1634.4332  
 



---


The dataset has been successfully loaded into Google BigQuery. To delve deeper into analysis and exploration, SQL queries are available within Google BigQuery, and Looker Studio can be used for visualization to extract meaningful insights from the dataset.