# BigQuery Functions

Helper functions that generate SQL for querying and transforming BigQuery tables.

Import the required modules.

In [None]:
from google.cloud import bigquery
import re

In [None]:
# NOTE(review): unpinned install — consider `%pip install datalab==<version>` so re-runs are reproducible.
!pip install datalab --upgrade
# Load the Datalab kernel extension.
%load_ext google.datalab.kernel
# Datalab BigQuery API. NOTE: this rebinds the name `bigquery` imported from google.cloud in the previous cell.
from google.datalab import bigquery
import google.datalab.bigquery as bq
# NOTE(review): set_datalab_project_id is not defined in this notebook; presumably it is provided by the
# extension loaded above and sets the default project for Datalab queries — verify.
set_datalab_project_id('project_id') # enter your project id here
# (Re)load the google.cloud.bigquery IPython extension.
%reload_ext google.cloud.bigquery

In [None]:
import google.datalab as dl
import google.datalab.bigquery as bq
import google.datalab.storage as storage

#Import python pandas for data wrangling
import pandas as pd

## Pivot Table Function

Builds a BigQuery pivot-table query from a source table: each distinct value of the pivot field becomes its own output column, aggregated for every combination of the index columns.

In [None]:
# Enter the database name, table name, index column(s), the field to pivot and the value field (by default 1)
def bq_custom_pivot(db_name,  # input dataset, e.g. "project_id.dataset"
                    table_name,  # input table name
                    index_cols,  # index columns as a list: ['col1', 'col2'] (a single name string also works)
                    col_def,  # field whose distinct values become output columns
                    val_def=1,  # value field/expression to aggregate (default 1 = one-hot count)
                    val_agg='SUM',  # aggregation function applied to each pivot column
                    col_values=None):  # optional explicit list of pivot values; skips the lookup query
    """Build a BigQuery SQL pivot query over ``db_name.table_name``.

    One output column ``Col_<value>`` is produced per pivot value of ``col_def``.
    It holds ``val_agg`` of ``val_def`` over the rows where ``col_def`` equals
    that value (0 on other rows), grouped by ``index_cols``.

    Args:
        db_name: Dataset prefix, e.g. ``"project_id.dataset"``.
        table_name: Source table name.
        index_cols: Grouping column names (a list, or a single string).
        col_def: Column whose values become output columns.
        val_def: Column name or SQL expression aggregated per pivot value.
            The default ``1`` counts rows (one-hot encoding). Non-string
            values are converted with ``str()``.
        val_agg: SQL aggregate function name, e.g. ``'SUM'`` or ``'MAX'``.
        col_values: Pivot values to use. When ``None`` (default), the distinct
            non-NULL values of ``col_def`` are fetched with a BigQuery query
            through the module-level ``bq`` (google.datalab.bigquery).

    Returns:
        The SQL query string. It is not executed.

    Raises:
        ValueError: If ``index_cols`` is empty or there are no pivot values.

    Note:
        ``db_name``, ``table_name``, ``index_cols``, ``col_def``, ``val_def``
        and ``val_agg`` are inserted into the SQL verbatim, so only pass trusted
        identifiers or expressions. String pivot values are escaped.
    """
    if isinstance(index_cols, str):
        index_cols = [index_cols]
    index_cols = [str(col) for col in index_cols]
    if not index_cols:
        raise ValueError("index_cols must contain at least one column")

    source = "`" + db_name + "." + table_name + "`"

    if col_values is None:
        # ORDER BY makes the generated column order deterministic between runs.
        col_sql = ("SELECT DISTINCT " + col_def + " FROM " + source
                   + " WHERE " + col_def + " IS NOT NULL ORDER BY 1")
        col_df = bq.Query(col_sql).execute(output_options=bq.QueryOutput.dataframe()).result()
        col_values = col_df.iloc[:, 0]
    col_values = list(col_values)
    if not col_values:
        raise ValueError("no pivot values found for column " + str(col_def))

    # str() so the int default 1 can be concatenated into the SQL text.
    val_sql = str(val_def)
    case_parts = []
    agg_parts = []
    used_aliases = set()
    for value in col_values:
        # BigQuery column names are case-insensitive, and stripping punctuation can
        # map different values to the same name, so add a numeric suffix on clashes.
        base_alias = "Col_" + re.sub('[^A-Za-z0-9]+', '', str(value))
        alias = base_alias
        suffix = 2
        while alias.lower() in used_aliases:
            alias = base_alias + "_" + str(suffix)
            suffix += 1
        used_aliases.add(alias.lower())

        if isinstance(value, str):
            # Escape so values containing quotes, backslashes or newlines stay valid literals.
            escaped = (value.replace("\\", "\\\\").replace('"', '\\"')
                       .replace("\n", "\\n").replace("\r", "\\r"))
            literal = '"' + escaped + '"'
        else:
            literal = str(value)

        case_parts.append("(CASE WHEN " + col_def + " = " + literal + " THEN "
                          + val_sql + " ELSE 0 END) AS " + alias)
        agg_parts.append(val_agg + "(" + alias + ") AS " + alias)

    idx_sql = ", ".join(index_cols)
    # GROUP BY by ordinal position: the index columns come first in the SELECT list.
    grp_sql = ", ".join(str(pos) for pos in range(1, len(index_cols) + 1))

    qry_1 = "WITH Q1 AS (SELECT " + idx_sql + ", " + ", ".join(case_parts) + " FROM " + source + ")"
    qry_2 = " SELECT " + idx_sql + ", " + ", ".join(agg_parts) + " FROM Q1 GROUP BY " + grp_sql
    return qry_1 + qry_2

In [None]:
# Parameters for the liquor-sales pivot example.
user_db = "project_id.mayank"    # dataset holding the source table ("project.dataset")
user_tbl = "R30day_liquor"       # source table name
user_index = ["city", "county"]  # grouping (index) columns, as a list
user_col = "vendor_name"         # field whose distinct values become output columns
user_val = "sale_dollars"        # field to aggregate; pass "1" to count rows instead

In [None]:
# Generate (but do not run) the pivot SQL for the liquor table.
liq_qry = bq_custom_pivot(
    user_db,
    user_tbl,
    index_cols=user_index,
    col_def=user_col,
    val_def=user_val,
)

In [None]:
# Run the pivot SQL and overwrite the mayank.liqtry table with its result.
liq_query = bq.Query(liq_qry)
liquor_job = liq_query.execute(
    output_options=bq.QueryOutput.table(name='mayank.liqtry', mode='overwrite'))

## JSON Column to BigQuery Columns Function

A function that reads the keys of a JSON string column and generates a SQL query that extracts each key into its own column, optionally alongside other selected columns:

In [None]:
# Enter the database name, table name, json column and any additional columns you may want to select

def jsontocol(db_name, table_name, json_col, additional_cols=None, json_keys=None):
    """Build a query that expands a JSON string column into one column per key.

    Each key becomes ``JSON_EXTRACT_SCALAR(<json_col>, <path>) AS <key>``.

    Args:
        db_name: Dataset prefix, e.g. ``"project_id.dataset"``.
        table_name: Source table name.
        json_col: Column holding JSON object strings.
        additional_cols: Extra columns or expressions selected before the JSON
            columns (a list, or a single string). The default ``None`` selects
            no extra columns.
        json_keys: Top-level JSON keys to extract. When ``None`` (default), the
            keys are read from the first non-NULL value of ``json_col`` with a
            BigQuery query through the module-level ``bq``. Keys that appear
            only in other rows are therefore not picked up.

    Returns:
        The SQL query string. It is not executed.

    Raises:
        ValueError: If no non-NULL value is found, the sampled value is not a
            JSON object, or there are no keys to extract.

    Note:
        ``db_name``, ``table_name``, ``json_col`` and ``additional_cols`` are
        inserted verbatim, so only pass trusted identifiers. Keys containing a
        single quote are not supported by the bracket JSONPath used below.
    """
    import json  # local import: the notebook's import cells do not import json

    source = "`" + db_name + "." + table_name + "`"

    if json_keys is None:
        json_sql = ("SELECT " + json_col + " FROM " + source
                    + " WHERE " + json_col + " IS NOT NULL LIMIT 1")
        json_df = bq.Query(json_sql).execute(output_options=bq.QueryOutput.dataframe()).result()
        if len(json_df) == 0:
            raise ValueError("no non-NULL values found in column " + json_col)
        sample = json_df.iloc[0, 0]
        # Parse rather than regex-scan: this tolerates whitespace after commas and
        # ignores keys nested inside values, which JSON_EXTRACT_SCALAR('$.k') would not reach.
        parsed = sample if isinstance(sample, dict) else json.loads(sample)
        if not isinstance(parsed, dict):
            raise ValueError("column " + json_col + " does not hold JSON objects")
        json_keys = list(parsed)
    elif isinstance(json_keys, str):
        json_keys = [json_keys]
    else:
        json_keys = list(json_keys)
    if not json_keys:
        raise ValueError("no JSON keys to extract from column " + json_col)

    # Extra columns first; None previously produced "SELECT , ..." (invalid SQL).
    if additional_cols is None:
        select_parts = []
    elif isinstance(additional_cols, str):
        select_parts = [additional_cols]
    else:
        select_parts = list(additional_cols)

    used_aliases = set()
    for key in json_keys:
        key = str(key)
        if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", key):
            path = "'$." + key + "'"
            base_alias = key
        else:
            # Keys with other characters need bracket JSONPath and a sanitized alias.
            escaped = key.replace("\\", "\\\\").replace('"', '\\"')
            path = "\"$['" + escaped + "']\""
            base_alias = re.sub(r"[^A-Za-z0-9_]", "_", key)
            if not base_alias or base_alias[0].isdigit():
                base_alias = "_" + base_alias
        # BigQuery column names are case-insensitive; suffix duplicates.
        alias = base_alias
        suffix = 2
        while alias.lower() in used_aliases:
            alias = base_alias + "_" + str(suffix)
            suffix += 1
        used_aliases.add(alias.lower())
        select_parts.append("JSON_EXTRACT_SCALAR(" + json_col + "," + path + ") AS " + alias)

    return "SELECT " + ", ".join(select_parts) + " FROM " + source

In [None]:
# Generate the JSON-expansion SQL; replace the placeholder names with real ones.
user_qry = jsontocol(
    "database_name",                   # database (dataset) name
    "table_name",                      # source table name
    "json_dimensions",                 # column holding the JSON string
    additional_cols=["col1", "col2"],  # other columns to select alongside
)

In [None]:
# Run the JSON-expansion SQL and overwrite mayank.json_table_name with its result.
json_write_qry = bq.Query(user_qry)
json_write_job = json_write_qry.execute(
    output_options=bq.QueryOutput.table(name='mayank.json_table_name', mode='overwrite'))