# Home Credit - Preprocessing


# Denormalize tables using joins
This program is part of step-2 as described in the above diagram.
It joins multiple table into a single table with case_id as the primary key

In [None]:
pip install --upgrade google-cloud-bigquery

In [1]:
import numpy as np
import pandas as pd
import csv
import os
from collections import defaultdict
from google.cloud import bigquery

In [2]:
gc_project_id      = 'kagglehomecredit'
bq_dataset_source  = 'homecredit_stage_2'
bq_dataset_sink    = 'homecredit_stage_3'
bq_table_sink      = 'homecredit'

table_list = [
 'train_base'
,'train_static_0'
,'train_static_cb_0'
,'train_applprev_1'
,'train_credit_bureau_a_1'
,'train_credit_bureau_b_1'
,'train_debitcard_1'
,'train_deposit_1'
,'train_other_1'
,'train_person_1'
,'train_tax_registry_a_1'
,'train_tax_registry_b_1'
,'train_tax_registry_c_1'
]


In [3]:
from_clause = f""" 
from kagglehomecredit.homecredit_stage_1.train_base   t_b
left join  {gc_project_id}.{bq_dataset_source}.train_static_0          t_s0
on t_b.case_id = t_s0.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_static_cb_0       t_scb0
on t_b.case_id = t_scb0.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_applprev_1        t_ap1 
on t_b.case_id = t_ap1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_credit_bureau_a_1 t_cba1
on t_b.case_id = t_cba1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_credit_bureau_b_1 t_cbb1
on t_b.case_id = t_cbb1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_debitcard_1       t_dc1
on t_b.case_id = t_dc1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_deposit_1         t_dp1
on t_b.case_id = t_dp1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_other_1           t_ot1
on t_b.case_id = t_ot1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_person_1          t_pr1
on t_b.case_id = t_pr1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_tax_registry_a_1  t_txa1
on t_b.case_id = t_txa1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_tax_registry_b_1  t_txb1
on t_b.case_id = t_txb1.case_id

left join  {gc_project_id}.{bq_dataset_source}.train_tax_registry_c_1  t_txc1
on t_b.case_id = t_txc1.case_id

"""

In [15]:
def create_bq_client(gc_project_id):
    bq_client = bigquery.Client(project=gc_project_id)
    return bq_client
    
    
def create_bq_dataset(bq_client, bq_dataset):
    bq_client.create_dataset(bq_dataset, exists_ok=True)
    return


def get_table_column(gc_project_id, bq_dataset):
    table_column = defaultdict(list)
    
    sql_query = f"""SELECT * FROM `{gc_project_id}.{bq_dataset_source}.INFORMATION_SCHEMA.COLUMNS`; """
    results   = bq_client.query_and_wait(sql_query)
    
    for result in results:
        table_column[result["table_name"]].append(result["column_name"])
    return table_column


def get_caseid_bucket_minmax():
    caseid_bucket_minmax = defaultdict(list)
    sql_query=f"""
                with histogram_data as
                (
                  select case_id,
                  ntile(4000) over (order by case_id) as bucket,
                  count(*) as counts
                  from `kagglehomecredit.homecredit_stage_2.train_base`
                  group by case_id
                )
                select 
                bucket,
                min(case_id) as min_case_id,
                max(case_id) as max_case_id,
                count(*) as counts
                from histogram_data
                group by bucket
                order by bucket
                """
    results   = bq_client.query_and_wait(sql_query)
    for result in results:
        caseid_bucket_minmax[result["bucket"]].append(result["min_case_id"])
        caseid_bucket_minmax[result["bucket"]].append(result["max_case_id"])
    
    return caseid_bucket_minmax


def create_select_clause(selected_table):
    select_clause = f""" select t_b.case_id """
    select_clause = select_clause + ", " + selected_table['train_base']
    select_clause = select_clause + ", " + selected_table['train_static_0']
    select_clause = select_clause + ", " + selected_table['train_static_cb_0']
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_ap1.num_group1  as num_group1, "  + selected_table['train_applprev_1']        + " )) as ap1" 
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_pr1.num_group1  as num_group1, "  + selected_table['train_person_1']          + " )) as pr1"
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_cba1.num_group1 as num_group1, "  + selected_table['train_credit_bureau_a_1'] + " )) as cba1"
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_cbb1.num_group1 as num_group1, "  + selected_table['train_credit_bureau_b_1'] + " )) as cbb1"
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_dc1.num_group1  as num_group1, "  + selected_table['train_debitcard_1']       +  ")) as dc1 "
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_dp1.num_group1  as num_group1, "  + selected_table['train_deposit_1']         +  ")) as dp1 "
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_ot1.num_group1  as num_group1, "  + selected_table['train_other_1']           +  ")) as ot1 "
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_txa1.num_group1 as num_group1, "  + selected_table['train_tax_registry_a_1']  +  ")) as txa1 "
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_txb1.num_group1 as num_group1, "  + selected_table['train_tax_registry_b_1']  +  ")) as txb1 "
    select_clause = select_clause + ", " + "ARRAY_AGG(STRUCT( t_txc1.num_group1 as num_group1, "  + selected_table['train_tax_registry_c_1']  +  ")) as txc1 "   
    
    return select_clause


def create_groupby_clause(selected_table):
    groupby_clause = f""" group by t_b.case_id """
    groupby_clause = groupby_clause + ", " + selected_table['train_base']
    groupby_clause = groupby_clause + ", " + selected_table['train_static_0']
    groupby_clause = groupby_clause + ", " + selected_table['train_static_cb_0']
    
    return groupby_clause


def execute_sql(sql_query):
    
#     print("Query execution Started: ")
    bq_client.query_and_wait(sql_query)
#     print("Query execution Completed")  
    
    return

In [5]:
bq_client = create_bq_client(gc_project_id)
create_bq_dataset(bq_client, bq_dataset_sink)

In [6]:
table_column = get_table_column(gc_project_id,bq_dataset_source)

In [7]:
selected_table   = defaultdict(list)
selected_columns = []

for table_name, table_columns in table_column.items():
    
    selected_columns = [s for s in table_columns if s not in ['case_id', 'num_group1', 'num_group2']]
    delimiter = ", "
    selected_columns_string = delimiter.join(selected_columns)
    selected_table[table_name] = selected_columns_string

In [8]:
select_clause    = create_select_clause(selected_table)
groupby_clause   = create_groupby_clause(selected_table) 

In [16]:
caseid_bucket_minmax=get_caseid_bucket_minmax()

for bucket, minmax in caseid_bucket_minmax.items():
    
    print("Executing Bucket--> ", bucket)
    create_clause    = f"create or replace table {gc_project_id}.{bq_dataset_sink}.{bq_table_sink}_{bucket} as " 
    where_clause = f""" where t_b.case_id >= {minmax[0]} and t_b.case_id <={minmax[1]}"""
    sql_query    = create_clause + select_clause + from_clause + where_clause + groupby_clause
#     print(sql_query)
    
    execute_sql(sql_query)
#     break

Executing Bucket-->  1
Executing Bucket-->  2
Executing Bucket-->  3
Executing Bucket-->  4
Executing Bucket-->  5
Executing Bucket-->  6
Executing Bucket-->  7
Executing Bucket-->  8
Executing Bucket-->  9
Executing Bucket-->  10
Executing Bucket-->  11
Executing Bucket-->  12
Executing Bucket-->  13
Executing Bucket-->  14
Executing Bucket-->  15
Executing Bucket-->  16
Executing Bucket-->  17
Executing Bucket-->  18
Executing Bucket-->  19
Executing Bucket-->  20
Executing Bucket-->  21
Executing Bucket-->  22
Executing Bucket-->  23
Executing Bucket-->  24
Executing Bucket-->  25
Executing Bucket-->  26
Executing Bucket-->  27
Executing Bucket-->  28
Executing Bucket-->  29
Executing Bucket-->  30
Executing Bucket-->  31
Executing Bucket-->  32
Executing Bucket-->  33
Executing Bucket-->  34
Executing Bucket-->  35
Executing Bucket-->  36
Executing Bucket-->  37
Executing Bucket-->  38
Executing Bucket-->  39
Executing Bucket-->  40
Executing Bucket-->  41
Executing Bucket-->  42
E

BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/kagglehomecredit/queries?prettyPrint=false: Cannot query rows larger than 100MB limit.