In [0]:
%pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
%pip install --upgrade snowflake-connector-python

In [0]:
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import os.path
import io
import requests
import pandas as pd
import pickle

# OAuth scopes requested when Google API credentials are created (Drive and Sheets).
SCOPES = ['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/spreadsheets']
# Default location of the OAuth client-secrets JSON read by createCreds.
CREDS_JSON_PTH = '/dbfs/FileStore/finance/client_secret.json'
# Location of the pickled credentials; passed to getUsableCreds by the Drive/Sheets helpers below.
CREDS_PICKLE_PTH = '/dbfs/FileStore/finance/token.pickle'

def createCreds(CREDS_JSON_PTH=CREDS_JSON_PTH, SCOPES=SCOPES, token_pth=CREDS_PICKLE_PTH):
    '''
    Runs the OAuth installed-app flow and returns fresh Google API credentials

    The new credentials are also pickled to ``token_pth`` so that a later
    checkForPickledCreds call on the same path can reuse them.
    
        Parameters:
            CREDS_JSON_PTH (string): Path to the Google API client-secrets JSON
            SCOPES (list): List of Google API scopes that needs to be used
            token_pth (string): File the pickled credentials are written to (defaults to CREDS_PICKLE_PTH)
        Returns:
            creds (object): A google API credential object need to work with the API 
    '''
    flow = InstalledAppFlow.from_client_secrets_file(CREDS_JSON_PTH, SCOPES)
    creds = flow.run_local_server(port=0)
    # Write to the path the loaders read from (CREDS_PICKLE_PTH by default) rather
    # than a relative file in the working directory, which would never be picked up.
    with open(token_pth, 'wb') as token:
        pickle.dump(creds, token)
    return creds


def checkForPickledCreds(creds_pth):
    '''
    Loads previously pickled Google API credentials from storage, when present
    
        Parameters:
            creds_pth (string): Location of the pickled credentials file
        Returns:
            creds (object): The unpickled credentials object, or None when no file exists at creds_pth
    '''
    # Nothing cached yet: let the caller decide how to obtain credentials.
    if not os.path.exists(creds_pth):
        return None

    # NOTE: pickle.load can execute code embedded in the file; only use a trusted path.
    with open(creds_pth, 'rb') as pickled_file:
        return pickle.load(pickled_file)


def getUsableCreds(creds_pth):
    '''
    Produces Google API credentials that are ready to use

    Tries, in order: the pickled credentials at creds_pth as they are, the same
    credentials after a token refresh, and finally a brand-new set from createCreds.
    
        Parameters:
            creds_pth (string): Path to the pickled Google API credentials
        Returns:
            creds (object): A google API credential object need to work with the API 
    '''
    cached = checkForPickledCreds(creds_pth)

    # Cached credentials that are still valid can be handed back untouched.
    if cached and cached.valid:
        return cached

    # Expired but refreshable: renew the token in place.
    if cached and cached.expired and cached.refresh_token:
        cached.refresh(Request())
        return cached

    # Nothing usable on disk: create new credentials.
    return createCreds()


def getFileIds(service, folder_id):
    '''
    Returns a name -> file id mapping for every non-folder file in a Google Drive folder

    Keeps requesting result pages while the response carries a ``nextPageToken``,
    so folders holding more files than fit in one page are listed completely.
    
        Parameters:
            service (object): A Google API object that refers to the API service being used
            folder_id (str): ID of the Google Drive folder being checked
        Returns:
            file_ids (dict): Maps each file name to its file id. When several files
                share a name, the one listed last is kept
    '''
    folder_query_str = "parents in '{}' and mimeType != 'application/vnd.google-apps.folder'".format(folder_id)

    file_ids = {}
    page_token = None
    while True:
        file_search_results = service.files().list(q=folder_query_str,
                                                   spaces='drive',
                                                   includeItemsFromAllDrives=False,
                                                   pageToken=page_token).execute()
        for file in file_search_results.get('files', []):
            file_ids[file['name']] = file['id']

        # An absent token means the last page has been read.
        page_token = file_search_results.get('nextPageToken')
        if not page_token:
            break
    return file_ids


def openFileToDF(file_id, access_token):
    '''
    Download a Google Drive file and load its content as CSV into a Pandas DataFrame 
    
        Parameters:
            file_id (string): File ID of the CSV file in Google Drive
            access_token (string): OAuth access token, sent as the Bearer credential
        Returns:
            df (Pandas DataFrame): CSV loaded as a Pandas DataFrame
        Raises:
            requests.HTTPError: If the download answers with a 4xx/5xx status
    '''
    url = "https://www.googleapis.com/drive/v3/files/" + file_id + "?alt=media"
    res = requests.get(url, headers={"Authorization": "Bearer " + access_token})
    # Fail loudly on an error response instead of parsing its body as if it were CSV.
    res.raise_for_status()
    df = pd.read_csv(io.StringIO(res.text))
    return df


def openOutstandingRequests(FOLDER_ID):
    '''
    Create a list of dictionary which contains data points of all open requests such as file name,
    their associated Google Drive ID and file loaded as a Pandas DataFrame  

    A file that cannot be downloaded or parsed is reported and left out of the
    result; the remaining files are still returned.
    
        Parameters:
            FOLDER_ID (str): ID of the Google Drive folder to scan for files
        Returns:
            file_dicts (List): One dict per readable file with the keys
                'filename', 'id' and 'dataframe'
    '''
    creds = getUsableCreds(CREDS_PICKLE_PTH)
    service = build('drive', 'v3', credentials=creds)

    file_ids_dict = getFileIds(service, FOLDER_ID)
    file_dicts = []

    for name, file_id in file_ids_dict.items():
        try:
            df = openFileToDF(file_id, creds.token)
        except Exception as exc:
            # Best effort: one unreadable file must not block the others, but say why it was skipped.
            print(f"Skipping file {name}: {exc!r}")
            continue

        file_dicts.append({'filename': name, 'id': file_id, 'dataframe': df})

    return file_dicts

def openGoogleSheets(sheet_id, sheet_range):
    '''
    Reads the cell values of a Google Sheet range
    
        Parameters:
            sheet_id (string): ID of the Google sheet to be opened
            sheet_range (string): range of the Google sheet tab to be read
        Returns:
            values (list): The 'values' entry of the API response (one list per
                row), or an empty list when the response carries no values
    '''
    creds = getUsableCreds(CREDS_PICKLE_PTH)
    service = build('sheets', 'v4', credentials=creds)
    
    sheet = service.spreadsheets()
    result = sheet.values().get(spreadsheetId=sheet_id,
                                range=sheet_range).execute()
    values = result.get('values', [])

    # Report an empty range based on the values themselves; the response dict can
    # be non-empty even when it carries no 'values'.
    if not values:
        print('No data found.')

    # Always hand back a list so callers get one consistent return type.
    return values

def readSheetAsDF(sheet):
    '''
    Returns a Pandas DataFrame read from Google Sheet

    The first row supplies the column names. Data rows shorter than the header
    are padded with None, so a sheet whose trailing cells are blank in every row
    still loads instead of failing on a column-count mismatch.
    
        Parameters:
            sheet (list): Rows of cell values, header row first
        Returns:
            df (Pandas DataFrame): Google Sheet in a Pandas Data Frame
        Raises:
            IndexError: If sheet is an empty list (there is no header row)
    '''
    header = sheet[0]
    width = len(header)
    # A negative pad count yields an empty list, so rows longer than the header
    # are passed through unchanged and still rejected by pandas.
    padded_rows = [list(row) + [None] * (width - len(row)) for row in sheet[1:]]
    df = pd.DataFrame(padded_rows, columns=header)
    return df

def moveFileToNewFolder(new_folder, file_id):
    '''
    Re-parents a Google Drive file so that it sits in a different folder
    
        Parameters:
            new_folder (str): ID of the Google Drive folder that becomes the file's parent
            file_id (str): ID of the file to be moved
        Returns:
            (boolean): True once the update request has been executed
    '''
    drive = build('drive', 'v3', credentials=getUsableCreds(CREDS_PICKLE_PTH))

    # Fetch the current parents so they can be detached in the same update call.
    current = drive.files().get(fileId=file_id, fields='parents').execute()
    parents_csv = ",".join(current.get('parents'))

    move_request = drive.files().update(
        fileId=file_id,
        addParents=new_folder,
        removeParents=parents_csv,
        fields='id, parents',
    )
    move_request.execute()
    return True

In [0]:
%python

# Snowflake login, read from the 'snowflake' secret scope so it is not hardcoded here.
# Both values are consumed by connect_to_snowflake.
# NOTE(review): dbutils is not defined in this notebook; presumably it is provided by the Databricks runtime.
sf_id = dbutils.secrets.get(scope = 'snowflake', key = 'id')
sf_pw = dbutils.secrets.get(scope = 'snowflake', key = 'pw')


In [0]:
import snowflake.connector
from snowflake.connector import pandas_tools
import numpy as np
import datetime

def connect_to_snowflake():
    '''
    Opens a Snowflake connection with the notebook-level sf_id / sf_pw login

        Returns:
            ctx (object): The connection object returned by snowflake.connector.connect
    '''
    connection_settings = {
        'user': sf_id,
        'password': sf_pw,
        'account': 'deliverr.us-east-1',
        'database': 'prod',
        'schema': 'FINANCE',
    }
    return snowflake.connector.connect(**connection_settings)

def create_fc_rate_df(df):
    '''
    Extracts and transforms input data, returns a transformed Pandas DataFrame that match the target data table structure

    Every required column must be present in the input; column order and any
    additional columns are ignored because values are looked up by name.
    
        Parameter:
            df (Pandas DataFrame): Input data in a DataFrame
        Returns:
            df_to_copy (Pandas DataFrame): Transformed DataFrame that can be copied to the target data table,
                or None when one or more required columns are missing
    '''
    column_header = ['payee_id', 'location_id', 'account_number', 'account_name', 'unit_price_base_tier1'
                     ,'volume_cap_base_tier1', 'unit_price_tier2', 'volume_cap_tier2', 'unit_price_tier3', 'volume_cap_tier3', 'min_charge', 'updated_at', 'signoff']
    text_columns = {'payee_id', 'location_id', 'account_number', 'account_name', 'signoff'}
    date_column = 'updated_at'

    # Validate by presence so the message below always names the actual problem.
    present_columns = set(df.columns.to_list())
    missing_column = [x for x in column_header if x not in present_columns]
    if missing_column:
        print(f'Check column headers, missing columns: {missing_column}')
        return None

    print('Headers matched')
    converted = {}
    for name in column_header:
        if name in text_columns:
            converted[name.upper()] = df[name].astype(str)
        elif name == date_column:
            # Dates are expected as month/day/year; anything else raises here.
            converted[name.upper()] = pd.to_datetime(df[name], format='%m/%d/%Y').dt.date
        else:
            # Prices, caps and the minimum charge: unparseable cells become NaN.
            converted[name.upper()] = pd.to_numeric(df[name], errors='coerce')

    # Output columns are the upper-cased required names, in column_header order.
    df_to_copy = pd.DataFrame(converted, columns=[name.upper() for name in column_header])
    return df_to_copy
      

def write_fc_rate_to_snowflake(df):
    '''
    Clears the existing FC_RATE_NEW table, reloads it from the given DataFrame, then returns the status

    Nothing is deleted when there is no data to load, so a failed or empty
    transformation cannot leave the table wiped.
    
        Parameters:
            df (Pandas DataFrame): A DataFrame generated by the Transformation function
        Returns:
            success (boolean): True if the DataFrame was copied to Snowflake successfully,
                False if the copy failed or df was None / empty
    '''
    if df is None or df.empty:
        print('No rows to copy, FINANCE.FC_RATE_NEW left unchanged')
        return False

    query = '''
    DELETE FROM FINANCE.FC_RATE_NEW;
    '''
    ctx = connect_to_snowflake()
    try:
        cursor = ctx.cursor()
        try:
            cursor.execute(query)
        finally:
            cursor.close()
        # NOTE(review): the DELETE and the load are separate statements; if the load
        # raises, the table stays empty until the next successful run.
        success, nchunks, nrows, _ = pandas_tools.write_pandas(conn = ctx, df = df, table_name = 'FC_RATE_NEW', schema = 'FINANCE')
    finally:
        # Release the connection whether or not the load succeeded.
        ctx.close()

    if success:
        print('Copied to Snowflake')
    else:
        print('Copying to Snowflake failed')
    return success


def crawl_through_folders():
    '''
    A For Loop that crawls through a defined set of Google Drive folders, finds any outstanding input data, calls the corresponding ETL functions to process them.

    Reads the module-level ``folders_to_crawl`` list, which must be defined before
    this function is called. Each entry is read through the keys 'Type',
    'Unprocessed' and 'Processed'. A file is moved to its 'Processed' folder only
    after its transform and copy steps ran without raising.
    '''
    print(f"Crawling through Google Drive folders for unprocessed files")
    for folder in folders_to_crawl:
        outstanding_requests = openOutstandingRequests(folder['Unprocessed'])
        if len(outstanding_requests) == 0:
            print(f"No {folder['Type']} file to process")
            continue

        for req_dict in outstanding_requests:
            try: # ensure that one bad input file does not stall the loop
                file_name = os.path.splitext((req_dict['filename']))[0]
                print(f"Processing {folder['Type']} file: {file_name}")
                type_of_request = folder['Type']
                df_to_process = req_dict['dataframe']
                processed_df = ET_functions_mapping[type_of_request](df_to_process)
                if processed_df is None:
                    # The transform rejected the input; leave the file where it is.
                    print(f"Skipping {type_of_request} file: {file_name}, transformation returned no data")
                    continue
                Copy_functions_mapping[type_of_request](processed_df)
                moveFileToNewFolder(folder['Processed'], req_dict['id'])
            except Exception as exc:
                # Keep going with the next file, but report what went wrong.
                print(f"Failed to process {folder.get('Type')} file {req_dict.get('filename')}: {exc!r}")
                continue

def crawl_through_google_sheets():
    '''
    A For Loop that crawls through a defined set of Google Sheets, calls the corresponding ETL functions to process them.

    Reads the module-level ``sheets_to_crawl`` list. Each entry is read through
    the keys 'Type', 'sheet_id' and 'range'. A failure on one sheet is reported
    and the loop carries on with the next one.
    '''
    print(f"Crawling through Google Sheets for ETL")
    for sheet in sheets_to_crawl:
        try: # ensure that one bad input file does not stall the loop
            print(f"Processing: {sheet['Type']}")
            type_of_request = sheet['Type']
            gsheet = openGoogleSheets(sheet['sheet_id'], sheet['range'])
            df_to_process = readSheetAsDF(gsheet)
            processed_df = ET_functions_mapping[type_of_request](df_to_process)
            if processed_df is None:
                # The transform rejected the input; do not hand None to the copy step.
                print(f"Skipping {type_of_request}, transformation returned no data")
                continue
            Copy_functions_mapping[type_of_request](processed_df)
        except Exception as exc:
            # Keep going with the next sheet, but report what went wrong.
            print(f"Failed to process sheet {sheet!r}: {exc!r}")
            continue


# Google Sheets processed by crawl_through_google_sheets. Each entry provides:
#   'Type'     - key used to look up the transform / copy functions in the two mappings below
#   'sheet_id' - the spreadsheet ID, available in the URL of the sheet
#   'range'    - the tab and cell range to read
# NOTE(review): crawl_through_folders reads a similar `folders_to_crawl` list that is not defined in this view.

sheets_to_crawl = [
    {'Type': 'FC_Rate', 'sheet_id': '1Pev9KIKLl1uDRsboeKfjIDBnCNEPWzGlxfcwr128z-0', 'range': 'Rate Sheet by Location!A1:M1789'}
]

# Request type -> function that transforms the raw DataFrame for that type.
ET_functions_mapping = {
    'FC_Rate': create_fc_rate_df, 
    }

# Request type -> function that writes the transformed DataFrame to its destination.
Copy_functions_mapping = {
    'FC_Rate': write_fc_rate_to_snowflake,
    }

In [0]:
# Entry point of the notebook: only the Google Sheets crawl is run.
# The Drive-folder crawl is left disabled.
# crawl_through_folders()
crawl_through_google_sheets()

In [0]:
'''
Use cell for testing and debugging
'''
# Re-reads the same sheet ID and range that sheets_to_crawl lists, so running this
# cell issues another Sheets API request. Nothing is written to Snowflake while
# the lines at the bottom stay commented out.
cred = getUsableCreds(CREDS_PICKLE_PTH)
sheet = openGoogleSheets( '1Pev9KIKLl1uDRsboeKfjIDBnCNEPWzGlxfcwr128z-0', 'Rate Sheet by Location!A1:M1789')
df = readSheetAsDF(sheet)
# Uncomment step by step to inspect the transform and then run the load:
#transformed_df = create_fc_rate_df(df)
#transformed_df
#write_fc_rate_to_snowflake(transformed_df)