## Bucket Ingest Function

The cell below contains the full script that defines the "ingest_files_from_bucket" function. Run the code cell (Shift+Enter) to load the script into memory:

In [1]:
from collections import Counter
from FDMBuilder.FDM_helpers import *
from google.cloud import bigquery, storage
import numpy as np
import pandas as pd
import re
from tqdm import tqdm

# PROJECT = "yhcr-prd-phm-bia-core"
# CLIENT = bigquery.Client(project=PROJECT)

def list_files_in_bucket(bucket_name):
    bucket = storage.Client().bucket(bucket_name)
    return [blob.name for blob in bucket.list_blobs()]


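def convert_str_camel_to_snake(string):
    # the parameter is called "string" so it doesn't shadow the builtin "str"
    output = re.sub('([^A-Z])([A-Z][a-z]+)', r'\1_\2', string)
    output = re.sub('([a-z0-9])([A-Z])', r'\1_\2', output)
    output = re.sub('([A-Z]+)([A-Z])([^_0-9A-Z])', r'\1_\2\3', output)
    # collapse any runs of underscores into a single underscore
    output = re.sub('_+', r'_', output)
    return output.lower()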


def convert_column_names_camel_to_snake(df):
    new_names_dict = {col: convert_str_camel_to_snake(col) 
                      for col in df.columns}
    return df.rename(new_names_dict, axis=1)


def remove_illegal_chars_from_str(string):
    return re.sub(r'[^a-zA-Z0-9_]', '', string)


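def remove_num_chars_from_str_start(string):
    # Move any leading digits to the end of the name, keeping their order.
    # Empty or all-digit names are returned unchanged: rotating them one
    # character at a time would raise an IndexError or never terminate.
    n_leading = 0
    while n_leading < len(string) and string[n_leading].isnumeric():
        n_leading += 1
    if n_leading == len(string):
        return string
    return string[n_leading:] + string[:n_leading]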
            

def sanitise_column_name(col_name):
    output = remove_illegal_chars_from_str(col_name)
    output = remove_num_chars_from_str_start(output)
    return output


def sanitise_column_names(df):
    corrected_names_dict = {col: sanitise_column_name(col)
                            for col in df.columns}
    return df.rename(corrected_names_dict, axis=1)


def find_prefix_suffix(names, prop):
    return_chars = []
    loop_idx = 0
    continue_search = True
    is_prefix = None
    while continue_search:
        char_counts = Counter([name[loop_idx]  
                               for name in names 
                               if len(name) > abs(loop_idx)])
        if not char_counts:
            # no name is long enough to have a character at this position
            break
        most_common_char, n_appears = char_counts.most_common(1)[0]
        prop_same = n_appears / sum(char_counts.values())
        if prop_same > prop: 
            if loop_idx < 0:
                return_chars.insert(0, most_common_char)
                is_prefix = False 
            else:
                return_chars.append(most_common_char)
                is_prefix = True 
            if loop_idx < 0:
                loop_idx -= 1
            else:
                loop_idx += 1
        elif loop_idx == 0:
            loop_idx = -1
        else:
            continue_search = False
    return "".join(return_chars), is_prefix


def remove_colname_prefix_suffix(df):
    prefix, is_prefix = find_prefix_suffix(df.columns, prop=0.95)
    if is_prefix is None:
        return df
    elif is_prefix:
        rename_dict = {col: col[len(prefix):] for col in df.columns  
                       if col[:len(prefix)] == prefix}
    else:
        rename_dict = {col: col[:-len(prefix)] for col in df.columns  
                       if col[-len(prefix):] == prefix}
    return df.rename(rename_dict, axis=1)
    
    
def percent_object_cols(df):
    object_cols_count = sum([df[col].dtype == "object" 
                             for col in df.columns])
    object_cols_pct = object_cols_count / df.shape[1] * 100
    return object_cols_pct


def convert_object_cols_to_str(df):
    for col in df.columns:
        if df[col].dtype == "object":
            df[col] = df[col].astype("str")
            df[col] = df[col].replace(to_replace="nan", value=None)
    return df


def convert_date_cols(df):
    """
    Attempt to parse every object-dtype column of df as datetimes.
    Columns that cannot be parsed are left unchanged.
    input:   df: a pandas DataFrame.
    """
    for col in df.columns:
        if df[col].dtype == "object":
            try:
                df[col] = pd.to_datetime(df[col])
            except Exception:
                # not a date column - leave it as it is
                continue
    return df


def ingest_files_from_bucket(bucket_id, file_names, destination_dataset, 
                             separators=None, convert_colnames=False,
                             encoding="utf_8", skiprows=None,
                             skipfooter=0):
    
    for i, filename in enumerate(file_names):
        print(f"Uploading file {i+1} of {len(file_names)}: {filename}")
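        # Each option is either a single value used for every file, or a dict
        # keyed by filename. Resolve them into per-file variables so that the
        # function arguments are not overwritten between loop iterations.
        file_params = []
        hyperparam_options = [separators, encoding, skiprows, skipfooter]
        for option in hyperparam_options:
            if isinstance(option, dict):
                file_params.append(option[filename])
            else: 
                file_params.append(option)
                
        file_sep, file_encoding, file_skiprows, file_skipfooter = file_params

        try:
            table_df = pd.read_csv(f"gcs://{bucket_id}/{filename}",  
                                   engine="python",
                                   encoding=file_encoding,
                                   sep=file_sep,
                                   skiprows=file_skiprows,
                                   skipfooter=file_skipfooter)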
        except Exception as e:
            print(f"\tWarning: Skipping {filename} - attempting to read csv resulted in "
                  f"the following error:\n\t  {e}")
            continue
        if table_df.shape[0] == 0:
            print(f"\tWarning: Skipping {filename} - csv is an empty table")
            continue
        elif table_df.shape[1] == 1:
            print(f"\tWarning: Skipping {filename} - the table parsed with only one "
                  "column")
            continue
        elif percent_object_cols(table_df) > 90:
            print(f"\tWarning: Looks like there may be an issue with {filename}:\n"
                  "> 90% of the columns were parsed as STRING")
            
        table_df = (table_df
                    .pipe(convert_date_cols) 
                    .pipe(sanitise_column_names)
                    .pipe(convert_object_cols_to_str))
        
        if convert_colnames:
            table_df = (table_df
                        .pipe(convert_column_names_camel_to_snake)
                        .pipe(remove_colname_prefix_suffix))
            
        if "." in filename:
            name = remove_illegal_chars_from_str(filename.split(".")[0])
        else:
            name = remove_illegal_chars_from_str(filename)
        table_df.to_gbq(f"{PROJECT}.{destination_dataset}.{name}",
                  project_id=PROJECT,
                  if_exists="fail",
                  progress_bar=False)

In [2]:
%%bigquery
SELECT * FROM `yhcr-prd-phm-bia-core.CY_FDM_ChildrensSocialCare.CPP` LIMIT 1000


ERROR:
 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/yhcr-prd-phm-bia-core/jobs?prettyPrint=false: Access Denied: Project yhcr-prd-phm-bia-core: User does not have bigquery.jobs.create permission in project yhcr-prd-phm-bia-core.

Location: None
Job ID: 1005e5f6-658a-4796-b2f1-1f2eaac2efcd



## Usage:

The "ingest_files_from_bucket" function takes the following arguments:

* **bucket_id**: a string naming the bucket where the files are currently stored. This can include a directory inside the bucket if needed, e.g. "bucket-name/directory-name"
* **file_names**: a list of the names of the files within the bucket/directory that are to be uploaded. Lists are defined by square brackets, e.g. file_names = ["file1.csv", "file2.csv", "file3.csv"]
* **destination_dataset**: a string naming the id of the dataset into which the files are to be ingested
* **convert_colnames**: True/False (default=False). An optional feature that converts column names *FromCamelCase* *to_snake_case* and then strips any prefix or suffix shared by more than 95% of the column names

\*note: regardless of this setting, all column names will have any characters other than letters, digits and underscores removed, and any column names that begin with digits will have those digits moved from the start to the end of the name. This ensures the column names agree with the format accepted by BigQuery, which would otherwise throw an error.
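
As a rough illustration of the note above, the sketch below restates the two clean-up steps in condensed form and applies them to a few made-up column names. The helper name `sanitise` and the example column names are assumptions for illustration only; the notebook's own functions are the ones defined in the code cell at the top.

```python
import re


def sanitise(col_name):
    """Condensed, illustrative version of the column name clean-up."""
    # 1. drop everything except letters, digits and underscores
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "", col_name)
    # 2. move any leading digits to the end, keeping their order
    n_leading = len(cleaned) - len(cleaned.lstrip("0123456789"))
    if n_leading == len(cleaned):
        return cleaned  # empty or all-digit names are left alone in this sketch
    return cleaned[n_leading:] + cleaned[:n_leading]


# hypothetical column names, not taken from any real file
examples = ["Person ID", "2020Total (GBP)", "date_of_birth"]
sanitised = {name: sanitise(name) for name in examples}
for original, new in sanitised.items():
    print(f"{original!r} -> {new!r}")
```

Names that already contain only letters, digits and underscores, and that don't start with a digit, pass through unchanged.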
    
* **separators**: the separator used in the file, e.g. a standard csv would have separators="," **Note:** a pipe is a special character and must be escaped: "\\|". Likewise, tab is a special character and is written "\t"
* **skiprows**: a list containing the row numbers of any rows in the file to be skipped. **Note:** Python is 0-indexed! The first row is row zero.
* **encoding**: the file's character encoding - this tends to be either "utf_8" or "utf_16".
* **skipfooter**: an integer number of rows to skip at the end of the file. Note that, unlike "skiprows", this is not index-based, so it can only be used to remove a chunk of rows at the end of a file. Removing rows at specific indices must be done with "skiprows"

\*note: the above four arguments can be specified as individual values that will be used for every file, e.g. separators = ",", or as dictionaries with the filenames as keys and the argument for each file as values, e.g. separators={"file1.csv":",", "file2.csv": "\\|", ...}
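
The single-value-or-dictionary behaviour described in the note can be sketched as below. `resolve_option` is a hypothetical helper written for this illustration (the function performs the equivalent lookup inline), and the file names are made up.

```python
def resolve_option(option, filename):
    """Return the value of an option for one file.

    A dict is treated as a per-file lookup keyed by filename;
    anything else is used as-is for every file.
    """
    if isinstance(option, dict):
        return option[filename]  # raises KeyError if the file has no entry
    return option


file_names = ["file1.csv", "file2.csv"]
separators = {"file1.csv": ",", "file2.csv": "\\|"}  # one separator per file
encoding = "utf_8"                                   # same encoding for every file

resolved = {name: (resolve_option(separators, name),
                   resolve_option(encoding, name))
            for name in file_names}
for name, (sep, enc) in resolved.items():
    print(f"{name}: sep={sep!r}, encoding={enc!r}")
```

As the sketch suggests, a dictionary needs an entry for every file in file_names; a missing filename raises a KeyError rather than falling back to a default.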

## Disclaimer

**CAREFULLY READ THE CONSOLE OUTPUT WHEN THE FUNCTION IS RUNNING**

Given the range of potential file formats, and even the subset of those I've already seen, it simply isn't possible to automatically account for every possible situation. The function is designed to print any errors to the console and move on to the next file if a file can't be uploaded for any reason. It's also possible that odd edge cases will throw a Python error and stop execution altogether.

Really, the only viable solution to having a simple "fire and forget" upload function is to ensure that all the upload files are in the same format **before** being uploaded.

The following cell contains an example usage of the ingest function as a reference:

In [2]:
bucket_id = "yhcr-prd-phm-bia-core-data-landing-bradford-tmp"
file_names = ["attendance.csv"]
destination_dataset = "CY_CLASS_ACT"
ingest_files_from_bucket(bucket_id=bucket_id, 
                         file_names=file_names, 
                         destination_dataset=destination_dataset,
                         encoding="utf_8",
                         convert_colnames=False)

Uploading file 1 of 1: attendance.csv


In [50]:
table_df.to_gbq(f"{PROJECT}.CY_CLASS_ACT.attendance",  
                project_id=PROJECT,  
                if_exists="fail",  
                progress_bar=False)