# Intake-Postgres Plugin: Joins Demo

This notebook demonstrates join functionality using the _Intake-Postgres_ plugin. It showcases several scenarios in which an _Intake_ user may want to query PostgreSQL-based relational datasets.

Joins are executed in the following scenarios:

- One database, several tables (joined in SQL by PostgreSQL)
- Two databases, several tables (joined client-side with pandas)


## Setup
1. Download the PostgreSQL/PostGIS Docker image and start five database containers. With [Docker installed](https://www.docker.com/community-edition), execute:
    ```
    for db_inst in $(seq 0 4); do
        docker run -d -p $(expr 5432 + $db_inst):5432 --name intake-postgres-$db_inst mdillon/postgis:9.6-alpine;
    done
    ```
    The first `docker run` pulls the image; every subsequent `docker run` starts a container from the same image, so the five instances listen on host ports 5432 through 5436.

1. In the same conda environment as this notebook, install `pandas`, `sqlalchemy`, `psycopg2`, `shapely`, and (optionally) `postgresql`:
    ```
    conda install pandas sqlalchemy psycopg2 shapely postgresql
    ```
    The `postgresql` package is needed only for the `psql` command-line client, which we use to verify (independently of our programs) that results were written to the database.

1. Finally, install the _intake-postgres_ plugin:
    ```
    conda install -c intake intake-postgres
    ```
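
The containers started in step 1 map host ports 5432 through 5436 to each instance's internal port 5432. As a minimal sketch, the connection URIs used throughout the rest of this notebook can be derived from the instance index. The helper name `instance_uri` and the constants below are hypothetical and are not part of Intake or the plugin:

```python
N_INSTANCES = 5   # matches `seq 0 4` in the Docker loop
BASE_PORT = 5432  # host port of the first container


def instance_uri(db_inst, base_port=BASE_PORT):
    """Build the connection URI for the db_inst-th container (hypothetical helper)."""
    if not 0 <= db_inst < N_INSTANCES:
        raise ValueError('db_inst must be in [0, {})'.format(N_INSTANCES))
    return 'postgresql://postgres@localhost:{}/postgres'.format(base_port + db_inst)


uris = [instance_uri(i) for i in range(N_INSTANCES)]
print(uris[0])
print(uris[-1])
```

The same URI pattern appears later in both the `create_engine` calls and the catalog file.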


## Loading the data

Because _Intake_ only supports reading the data, we need to insert the data into our databases by another means. The general approach below relies on partitioning a pre-downloaded CSV file and inserting its partitions into each table. This can be thought of as a rudimentary form of application-level "sharding".
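
To make the partitioning idea concrete, here is a small sketch on synthetic data. It assumes date-quantile partitioning with `pd.qcut`, which is what the notebook uses further down; the column names `issue_date`, `partition`, and `port` are made up for illustration:

```python
import pandas as pd

N_PARTITIONS = 5

# Ten synthetic monthly dates stand in for the loan issue dates (toy data).
toy = pd.DataFrame({'issue_date': pd.date_range('2007-06-01', periods=10, freq='MS')})

# Assign each row to one of N_PARTITIONS date-quantile bins, labeled 0..N-1.
toy['partition'] = pd.qcut(toy['issue_date'], N_PARTITIONS,
                           labels=list(range(N_PARTITIONS))).astype(int)

# Each partition would go to its own instance: partition k -> port 5432 + k.
toy['port'] = 5432 + toy['partition']

partition_sizes = toy['partition'].value_counts().sort_index().tolist()
print(partition_sizes)
```

Quantile bins keep the partitions roughly equal in size, at the cost of uneven date ranges per partition.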

The code (below) begins by importing the necessary modules:

In [136]:
from __future__ import print_function, absolute_import

## For downloading the test data
import os
import requests
import urllib.parse
import zipfile

## For inserting test data
import pandas as pd
from sqlalchemy import create_engine

## For using Intake
from intake.catalog import Catalog

## Global variables
N_PARTITIONS = 5

Here we download the data, if it doesn't already exist:

In [137]:
%%time

# Define download sources and destinations.
# For secure extraction, 'fpath' must be how the zip archive refers to the file.
loan_data = {'url': 'https://resources.lendingclub.com/LoanStats3a.csv.zip',
             'fpath': 'LoanStats3a.csv',
             'table': 'loan_stats',
             'date_col': 'issue_d',
             'normalize': ['term', 'home_ownership', 'verification_status', 'loan_status', 'addr_state', 'application_type', 'disbursement_method']}
decl_loan_data = {'url': 'https://resources.lendingclub.com/RejectStatsA.csv.zip',
                  'fpath': 'RejectStatsA.csv',
                  'table': 'reject_stats',
                  'date_col': 'Application Date',
                  'normalize': ['State']}

# Do the data downloading and extraction
for data in [loan_data, decl_loan_data]:
    url, fpath = data['url'], data['fpath']
    
    if os.path.isfile(fpath):
        print('{!r} already exists: skipping download.\n'.format(fpath))
        continue

    try:
        dl_fpath = os.path.basename(urllib.parse.urlsplit(url).path)
        print('Downloading data from {!r}...'.format(url))
        response = requests.get(url)
        response.raise_for_status()  # Treat HTTP error statuses as failures
    except requests.RequestException as err:
        raise ValueError('Download error. Check internet connection and URL.') from err

    # Write the downloaded archive to disk
    try:
        with open(dl_fpath, 'wb') as fp:
            print('Writing data to {!r}...'.format(dl_fpath))
            fp.write(response.content)
    except OSError as err:
        raise ValueError('File write error. Check destination file path and permissions.') from err

    # Extract only the expected member, then remove the archive
    try:
        print('Extracting data...')
        with zipfile.ZipFile(dl_fpath, 'r') as zip_ref:
            zip_ref.extract(fpath)
        if os.path.isfile(dl_fpath) and dl_fpath.endswith('.zip'):
            os.remove(dl_fpath)
    except (zipfile.BadZipFile, KeyError) as err:
        raise ValueError('File extraction error. Is the downloaded file a zip archive?') from err

    print('Success: {!r}\n'.format(fpath))

'LoanStats3a.csv' already exists: skipping download.

'RejectStatsA.csv' already exists: skipping download.

CPU times: user 450 µs, sys: 861 µs, total: 1.31 ms
Wall time: 1.29 ms
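
The extraction step relies on `fpath` matching the member name stored inside the archive, because `ZipFile.extract` looks members up by that name. A minimal, self-contained sketch of that behavior, using an in-memory archive with made-up file names rather than the real downloads:

```python
import io
import os
import tempfile
import zipfile

# Build a small archive in memory with two members (hypothetical names).
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('data.csv', 'a,b\n1,2\n')
    zf.writestr('notes.txt', 'not needed')

with tempfile.TemporaryDirectory() as tmpdir:
    buf.seek(0)
    with zipfile.ZipFile(buf, 'r') as zf:
        member_names = zf.namelist()
        # Extract only the member we asked for, by its archive name.
        extracted_path = zf.extract('data.csv', path=tmpdir)
    extracted_files = sorted(os.listdir(tmpdir))

print(member_names, extracted_files)
```

Extracting a single named member, instead of calling `extractall`, means unrelated files in the archive are never written to disk.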


Next, we partition the data into `N_PARTITIONS` groups and persist each partition to a separate database instance. There are many ways to partition the dataset; here we partition by the date each loan was issued (or, for rejected applications, the date of application):

In [174]:
%time
for data in [loan_data, decl_loan_data]:
    fpath, date_col, table = data['fpath'], data['date_col'], data['table']
    norm_cols = data['normalize']
    pcol = '_' + date_col  # Used for partitioning the data
    
    df = pd.read_csv(fpath, skiprows=1)
    print('# {}: {}'.format(table, len(df)))
    print('# {} valued at N/A: {}'.format(table, len(df[df[date_col].isna()])))
    df.dropna(axis=0, subset=[date_col], inplace=True)
    
    df[pcol] = pd.to_datetime(df[date_col]) # , format='%b-%Y')

    # Cast strs with '%' into floats, so we can do analysis more easily
    if 'int_rate' in df.columns:
        df['int_rate'] = df['int_rate'].str.rstrip('%').astype(float)

    df.sort_values(pcol, inplace=True)
    grouped = df.groupby(pd.qcut(df[pcol],
                                 N_PARTITIONS,
                                 labels=list(range(N_PARTITIONS))))
    
    # Normalize what we can, store into first db instance
    engine = create_engine('postgresql://postgres@localhost:{}/postgres'.format(5432))
    for norm_col in norm_cols:
        norm_col_cats = df[norm_col].astype('category')
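        # Build the lookup table without `pd.np`, which newer pandas releases no longer provide
        norm_df = pd.DataFrame({'id': list(range(len(norm_col_cats.cat.categories))),
                                norm_col: norm_col_cats.cat.categories.values})
        df[norm_col] = norm_col_cats.cat.codes  # Replace labels with integer codes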
        print('Persisting normalized column, {!r}...'.format(norm_col+'_codes'))
        norm_df.to_sql(norm_col+'_codes', engine, if_exists='replace')
    
    for group_id, group_df in grouped:
        print('\n###', group_id)
        start = group_df[pcol].min().strftime('%b-%Y')
        end = group_df[pcol].max().strftime('%b-%Y')

        # Save each partition to a different database
        print('Persisting {} {} from {} to {}...'.format(len(group_df), table, start, end))
        engine = create_engine('postgresql://postgres@localhost:{}/postgres'.format(5432+group_id))
        try:
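            # if_exists='fail' raises ValueError when the table already exists;
            # switch to 'replace' to overwrite it instead.
            group_df.drop(columns=pcol).to_sql(table, engine, if_exists='fail')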
        except ValueError:
            pass  # Table already exists, so do nothing.
        
    print()

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.25 µs



# loan_stats: 42538
# loan_stats valued at N/A: 3
Persisting normalized column, 'term_codes'...
Persisting normalized column, 'home_ownership_codes'...
Persisting normalized column, 'verification_status_codes'...
Persisting normalized column, 'loan_status_codes'...
Persisting normalized column, 'addr_state_codes'...
Persisting normalized column, 'application_type_codes'...
Persisting normalized column, 'disbursement_method_codes'...

### 0
Persisting 8939 loan_stats from Jun-2007 to Jan-2010...

### 1
Persisting 8084 loan_stats from Feb-2010 to Sep-2010...

### 2
Persisting 9480 loan_stats from Oct-2010 to Apr-2011...

### 3
Persisting 9415 loan_stats from May-2011 to Sep-2011...

### 4
Persisting 6617 loan_stats from Oct-2011 to Dec-2011...

# reject_stats: 755491
# reject_stats valued at N/A: 0
Persisting normalized column, 'State_codes'...

### 0
Persisting 151450 reject_stats from May-2007 to Aug-2010...

### 1
Persisting 151108 reject_stats from Aug-2010 to Jul-2011...

### 2
Pers
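
The normalization step in the cell above replaces each categorical string column with integer codes and stores the code-to-label mapping in a separate `<column>_codes` table. A small sketch of the same idea on a toy column (the three rows below are illustrative, not read from the CSV):

```python
import pandas as pd

toy = pd.DataFrame({'term': ['36 months', '60 months', '36 months']})

cats = toy['term'].astype('category')

# Lookup table: one row per distinct label, keyed by its integer code.
term_codes = pd.DataFrame({'id': list(range(len(cats.cat.categories))),
                           'term': cats.cat.categories.values})

# The main table keeps only the integer code.
toy['term'] = cats.cat.codes

codes = toy['term'].tolist()
labels = term_codes['term'].tolist()
print(codes, labels)
```

Joining the main table back to the lookup table on `id` recovers the original labels, which is what the joins later in this notebook do.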

Verify that the data was written by connecting to the databases directly with the `psql` command-line tool:

In [175]:
# Save each query from the `psql` command as HTML
!for db_inst in $(seq 0 4); do \
    psql -h localhost -p $(expr 5432 + $db_inst) -U postgres -q -H \
        -c 'select loan_amnt, term, int_rate, issue_d from loan_stats limit 5;' \
      > db${db_inst}.html; \
done

# Display the HTML files
from IPython.display import display, HTML
for db_inst in range(N_PARTITIONS):
    display(HTML('db{}.html'.format(db_inst)))

loan_amnt,term,int_rate,issue_d
5000,0,7.75%,Jun-2007
7500,0,13.75%,Jun-2007
6000,0,10.59%,Jun-2007
4400,0,9.64%,Jun-2007
1200,0,9.01%,Jun-2007


loan_amnt,term,int_rate,issue_d
4200,0,11.36%,Feb-2010
10000,0,10.25%,Feb-2010
5000,0,10.99%,Feb-2010
11600,0,13.48%,Feb-2010
16000,0,14.22%,Feb-2010


loan_amnt,term,int_rate,issue_d
15000,1,18.17%,Oct-2010
3500,0,9.62%,Oct-2010
3000,0,5.79%,Oct-2010
4000,0,13.35%,Oct-2010
12000,0,6.91%,Oct-2010


loan_amnt,term,int_rate,issue_d
11000,0,8.49%,May-2011
22000,1,13.49%,May-2011
9000,1,13.99%,May-2011
4100,0,11.49%,May-2011
3750,0,5.42%,May-2011


loan_amnt,term,int_rate,issue_d
3950,0,10.65%,Oct-2011
4000,0,12.69%,Oct-2011
11000,0,6.62%,Oct-2011
10200,0,12.42%,Oct-2011
10400,1,19.03%,Oct-2011


## Reading the data (with Intake-Postgres)

Write out a __joins\_catalog.yml__ file with the appropriate schema:

In [176]:
%%writefile joins_catalog.yml
plugins:
  source:
    - module: intake_postgres

sources:
  # Normalized columns
  - name: term_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, term from term_codes'

  - name: home_ownership_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, home_ownership from home_ownership_codes'

  - name: verification_status_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, verification_status from verification_status_codes'

  - name: loan_status_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, loan_status from loan_status_codes'

  - name: addr_state_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, addr_state from addr_state_codes'

  - name: application_type_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, application_type from application_type_codes'

  - name: disbursement_method_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, disbursement_method from disbursement_method_codes'
        
  - name: State_codes
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select id, "State" from "State_codes"'


  # loan_stats data
  - name: loans_1
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select issue_d, term, application_type, disbursement_method, home_ownership, verification_status, loan_status, loan_amnt, int_rate from loan_stats'

  - name: loans_5
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5436/postgres'
      sql_expr: 'select issue_d, term, application_type, disbursement_method, home_ownership, verification_status, loan_status, loan_amnt, int_rate from loan_stats'
        

  # reject_stats data
  - name: rejects_1
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: 'select "Application Date", "State", "Amount Requested" from reject_stats'

  - name: rejects_5
    driver: postgres
    args:
      uri: 'postgresql://postgres@localhost:5436/postgres'
      sql_expr: 'select "Application Date", "State", "Amount Requested" from reject_stats'


  # Joins
  - name: join_db_1_to_1
    driver: postgres
    parameters:
        - name: interest_lowbound
          description: "Lower-bound for interest rate in query"
          type: float
          default: 0.0
          min: 0.0
    args:
      uri: 'postgresql://postgres@localhost:5432/postgres'
      sql_expr: !template "
        select issue_d,
               term_codes.term,
               application_type_codes.application_type,
               disbursement_method_codes.disbursement_method,
               home_ownership_codes.home_ownership,
               verification_status_codes.verification_status,
               loan_status_codes.loan_status,
               loan_amnt,
               int_rate
        from loan_stats
        inner join term_codes on loan_stats.term = term_codes.id
        inner join application_type_codes on loan_stats.application_type = application_type_codes.id
        inner join disbursement_method_codes on loan_stats.disbursement_method = disbursement_method_codes.id
        inner join home_ownership_codes on loan_stats.home_ownership = home_ownership_codes.id
        inner join verification_status_codes on loan_stats.verification_status = verification_status_codes.id
        inner join loan_status_codes on loan_stats.loan_status = loan_status_codes.id
        where int_rate > {{ interest_lowbound }}"

Overwriting joins_catalog.yml
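
The `join_db_1_to_1` entry above declares an `interest_lowbound` parameter and references it in the SQL as `{{ interest_lowbound }}`. As a rough stand-in for what that substitution amounts to, the sketch below fills the placeholder with a regular expression. The `render` function is hypothetical and is not the plugin's actual templating code; it only illustrates how the final query text depends on the parameter:

```python
import re


def render(template, **params):
    """Replace each `{{ name }}` placeholder with the matching keyword value (illustrative only)."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda match: str(params[match.group(1)]),
                  template)


sql_template = 'select * from loan_stats where int_rate > {{ interest_lowbound }}'

default_sql = render(sql_template, interest_lowbound=0.0)   # catalog default
custom_sql = render(sql_template, interest_lowbound=15.0)   # user-supplied value
print(default_sql)
print(custom_sql)
```

Plain string substitution like this offers no protection against SQL injection, so parameter values should only come from trusted input.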


Access the catalog with Intake:

In [201]:
%time
catalog = Catalog('joins_catalog.yml')
catalog

CPU times: user 1e+03 ns, sys: 1 µs, total: 2 µs
Wall time: 5.01 µs


<intake.catalog.base.Catalog at 0x11a150c88>

Optionally, inspect the metadata of a few sources:

In [191]:
catalog.loans_1.discover()

{'datashape': None,
 'dtype': [('issue_d', dtype('O')),
  ('term', dtype('int64')),
  ('application_type', dtype('int64')),
  ('disbursement_method', dtype('int64')),
  ('home_ownership', dtype('int64')),
  ('verification_status', dtype('int64')),
  ('loan_status', dtype('int64')),
  ('loan_amnt', dtype('float64')),
  ('int_rate', dtype('float64'))],
 'metadata': {},
 'npartitions': 1,
 'shape': (None, 9)}

In [192]:
catalog.application_type_codes.discover()

{'datashape': None,
 'dtype': [('id', dtype('int64')), ('application_type', dtype('O'))],
 'metadata': {},
 'npartitions': 1,
 'shape': (None, 2)}

In [193]:
catalog.join_db_1_to_1.discover()

{'datashape': None,
 'dtype': [('issue_d', dtype('O')),
  ('term', dtype('O')),
  ('application_type', dtype('O')),
  ('disbursement_method', dtype('O')),
  ('home_ownership', dtype('O')),
  ('verification_status', dtype('O')),
  ('loan_status', dtype('O')),
  ('loan_amnt', dtype('float64')),
  ('int_rate', dtype('float64'))],
 'metadata': {},
 'npartitions': 1,
 'shape': (None, 9)}

Read the data from the sources:

In [202]:
%%time
catalog.loans_1.read().tail()

CPU times: user 10.9 ms, sys: 3.8 ms, total: 14.6 ms
Wall time: 44.4 ms


Unnamed: 0,issue_d,term,application_type,disbursement_method,home_ownership,verification_status,loan_status,loan_amnt,int_rate
8934,Jan-2010,0,0,0,0,0,2,14400.0,16.7
8935,Jan-2010,0,0,0,4,0,2,4000.0,18.78
8936,Jan-2010,0,0,0,0,0,2,18000.0,16.7
8937,Jan-2010,0,0,0,4,0,3,3500.0,13.57
8938,Jan-2010,0,0,0,4,2,3,10000.0,8.94


In [212]:
%%time
catalog.loans_5.read().tail()

CPU times: user 1.13 ms, sys: 232 µs, total: 1.36 ms
Wall time: 1.13 ms


Unnamed: 0,issue_d,term,application_type,disbursement_method,home_ownership,verification_status,loan_status,loan_amnt,int_rate
6612,Dec-2011,0,0,0,4,1,3,12000.0,11.71
6613,Dec-2011,1,0,0,4,2,0,26000.0,12.69
6614,Dec-2011,1,0,0,0,2,3,31300.0,20.3
6615,Dec-2011,0,0,0,3,1,3,5500.0,7.9
6616,Dec-2011,0,0,0,0,1,3,6000.0,12.69


## _JOIN_ with one database

Here is our **JOIN**, with default parameters (`interest_lowbound == 0.0`):

In [207]:
%%time
catalog.join_db_1_to_1.read().tail()

CPU times: user 14.6 ms, sys: 4.67 ms, total: 19.3 ms
Wall time: 57.7 ms


Unnamed: 0,issue_d,term,application_type,disbursement_method,home_ownership,verification_status,loan_status,loan_amnt,int_rate
6899,Jan-2010,36 months,Individual,Cash,OWN,Not Verified,Does not meet the credit policy. Status:Fully ...,6500.0,14.96
6900,Jan-2010,36 months,Individual,Cash,MORTGAGE,Not Verified,Does not meet the credit policy. Status:Fully ...,14400.0,16.7
6901,Jan-2010,36 months,Individual,Cash,RENT,Not Verified,Does not meet the credit policy. Status:Fully ...,4000.0,18.78
6902,Jan-2010,36 months,Individual,Cash,MORTGAGE,Not Verified,Does not meet the credit policy. Status:Fully ...,18000.0,16.7
6903,Jan-2010,36 months,Individual,Cash,RENT,Not Verified,Fully Paid,3500.0,13.57


Next, with our own parameter value(s):

In [209]:
%%time
catalog.join_db_1_to_1(interest_lowbound=15.0).read().tail()

CPU times: user 5.49 ms, sys: 1.64 ms, total: 7.12 ms
Wall time: 25.9 ms


Unnamed: 0,issue_d,term,application_type,disbursement_method,home_ownership,verification_status,loan_status,loan_amnt,int_rate
1328,Jan-2010,36 months,Individual,Cash,RENT,Not Verified,Does not meet the credit policy. Status:Charge...,2000.0,15.65
1329,Jan-2010,36 months,Individual,Cash,MORTGAGE,Not Verified,Fully Paid,25000.0,18.09
1330,Jan-2010,36 months,Individual,Cash,MORTGAGE,Not Verified,Does not meet the credit policy. Status:Fully ...,14400.0,16.7
1331,Jan-2010,36 months,Individual,Cash,RENT,Not Verified,Does not meet the credit policy. Status:Fully ...,4000.0,18.78
1332,Jan-2010,36 months,Individual,Cash,MORTGAGE,Not Verified,Does not meet the credit policy. Status:Fully ...,18000.0,16.7


## _JOIN_ with two databases

PostgreSQL cannot join tables that live in two separate database instances within a single query here, so for a **JOIN** across two databases we first read each table of interest into a pandas DataFrame. We then **JOIN** them client-side with `pd.merge()`:

In [197]:
%%time
loans_5_df = catalog.loans_5.read()
term_df = catalog.term_codes.read()
application_type_df = catalog.application_type_codes.read()
disbursement_method_df = catalog.disbursement_method_codes.read()
home_ownership_df = catalog.home_ownership_codes.read()
verification_status_df = catalog.verification_status_codes.read()
loan_status_df = catalog.loan_status_codes.read()

CPU times: user 14.7 ms, sys: 5.42 ms, total: 20.1 ms
Wall time: 65.8 ms


In [198]:
term_df

Unnamed: 0,id,term
0,0,36 months
1,1,60 months


In [199]:
loans_5_df.tail()

Unnamed: 0,issue_d,term,application_type,disbursement_method,home_ownership,verification_status,loan_status,loan_amnt,int_rate
6612,Dec-2011,0,0,0,4,1,3,12000.0,11.71
6613,Dec-2011,1,0,0,4,2,0,26000.0,12.69
6614,Dec-2011,1,0,0,0,2,3,31300.0,20.3
6615,Dec-2011,0,0,0,3,1,3,5500.0,7.9
6616,Dec-2011,0,0,0,0,1,3,6000.0,12.69


In [200]:
# Replace each integer code column with its label from the matching lookup table
for col, lookup_df in [('term', term_df),
                       ('application_type', application_type_df),
                       ('disbursement_method', disbursement_method_df),
                       ('home_ownership', home_ownership_df),
                       ('verification_status', verification_status_df),
                       ('loan_status', loan_status_df)]:
    # The left (code) column is renamed to col+'_'; the right (label) column keeps its name
    loans_5_df = pd.merge(loans_5_df, lookup_df,
                          how='left',
                          left_on=col, right_on='id',
                          suffixes=('_', ''))
    # Drop the integer code column and the lookup key, keeping only the label
    loans_5_df.drop(columns=[col+'_', 'id'], inplace=True)
loans_5_df.tail()

Unnamed: 0,issue_d,loan_amnt,int_rate,term,application_type,disbursement_method,home_ownership,verification_status,loan_status
6612,Dec-2011,12000.0,11.71,36 months,Individual,Cash,RENT,Source Verified,Fully Paid
6613,Dec-2011,26000.0,12.69,60 months,Individual,Cash,RENT,Verified,Charged Off
6614,Dec-2011,31300.0,20.3,60 months,Individual,Cash,MORTGAGE,Verified,Fully Paid
6615,Dec-2011,5500.0,7.9,36 months,Individual,Cash,OWN,Source Verified,Fully Paid
6616,Dec-2011,6000.0,12.69,36 months,Individual,Cash,MORTGAGE,Source Verified,Fully Paid
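
The merge loop above can be reproduced on toy data to see exactly what one iteration does. The frames below are made up for illustration and only mimic the shape of `loans_5_df` and `term_df`:

```python
import pandas as pd

# Toy stand-ins: a fact table holding integer codes, and its lookup table.
loans = pd.DataFrame({'term': [0, 1, 0],
                      'loan_amnt': [12000.0, 26000.0, 5500.0]})
term_lookup = pd.DataFrame({'id': [0, 1],
                            'term': ['36 months', '60 months']})

# Left join on code == id. Both frames have a 'term' column, so the left one
# is renamed to 'term_code' while the right (label) column keeps its name.
merged = pd.merge(loans, term_lookup,
                  how='left',
                  left_on='term', right_on='id',
                  suffixes=('_code', ''))

# Keep only the human-readable label.
merged = merged.drop(columns=['term_code', 'id'])

result_columns = merged.columns.tolist()
result_terms = merged['term'].tolist()
print(merged)
```

A left join keeps every loan row even if a code were missing from the lookup table, in which case the label would come back as `NaN`.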
