In [1]:
import pandas as pd
import os
from src.secrets import secrets as sc
from gcloud import storage
from oauth2client.service_account import ServiceAccountCredentials
from pathlib import Path

# Download sheets from Google Drive

In [42]:
# sheet tab name and its unique gid within the Google Drive spreadsheet
sheetIDs = [["People", "0"], ["Matches", "1626706262"], ["Formations", "900063785"], ["PlayByPlay", "1273503444"], ["MatchEvents", "567816814"], ["TrainingSessions", "960271477"], ["TrainingBreakdown", "1405062779"], ["TrainingEvents", "928411476"]]

# local output folder; create it if missing, because DataFrame.to_csv
# raises OSError instead of creating a non-existent parent directory
dataDirectory = Path("./data")
dataDirectory.mkdir(parents=True, exist_ok=True)

for name, number in sheetIDs:
    # spreadsheet id is stored in the secrets module; gid picks the tab, format=csv requests a CSV export
    url = f"https://docs.google.com/spreadsheets/d/{sc.SHEET_UID}/export?format=csv&gid={number}"
    # read direct from gdrive
    sheet = pd.read_csv(url)
    # one CSV per tab, named after the tab, dropping the pandas index
    sheet.to_csv(dataDirectory / f"{name}.csv", index=False)

# Store in Google Cloud Storage

Bucket settings:

Location type: single region (Sydney)

Storage class: Standard

Public access prevention: on

Access control: uniform (chosen for simplicity)

Data protection: none

Encryption: Google-managed key

In [43]:
# modified code from below as template
# https://stackoverflow.com/questions/37003862/how-to-upload-a-file-to-google-cloud-storage-on-python-3

# folder holding the CSVs written by the download step
sheetDirectory = Path("./data/")

# credentials from service account key, stored in secrets
credentials_dict = {
    'type': 'service_account',
    'client_id': sc.BUCKET_CLIENT_ID,
    'client_email': sc.BUCKET_CLIENT_EMAIL,
    'private_key_id': sc.BUCKET_PRIVATE_KEY_ID,
    'private_key': sc.BUCKET_PRIVATE_KEY,
}

# set creds
credentials = ServiceAccountCredentials.from_json_keyfile_dict(
    credentials_dict
)

# start client session
client = storage.Client(credentials=credentials, project=sc.BUCKET_PROJECT)

# locate the project bucket
bucket = client.get_bucket(sc.BUCKET_NAME)

# upload every csv in the data folder
for path in sheetDirectory.glob("*.csv"):
    # blob is named after the file only, so objects land at the bucket root
    blob = bucket.blob(path.name)
    # Set the content type explicitly. The ETL Cloud Function only processes
    # objects whose contentType is "text/csv". Otherwise it is guessed from the
    # extension via the platform's mimetypes table, which can differ (e.g. on
    # Windows with Excel installed, .csv may map to application/vnd.ms-excel).
    blob.upload_from_filename(str(path), content_type="text/csv")
    # log each uploaded file
    print(path)

data/MatchEvents.csv
data/TrainingBreakdown.csv
data/PlayByPlay.csv
data/People.csv
data/TrainingSessions.csv
data/TrainingEvents.csv
data/Formations.csv
data/Matches.csv


# Create the Cloud Function

Name: ETLDEEval

Region: Sydney

Trigger: Cloud Storage, event type Finalize/Create (runs whenever an object is created or overwritten in the bucket)

Memory allocated: 256 MB; timeout: 60 s

Runtime: Python 3.9

Ingress settings: allow internal traffic and traffic from Cloud Load Balancing

*A BigQuery Data Transfer Service job or a Dataflow pipeline would likely be a better fit for this.*

*This is inefficient: every run replaces the whole table rather than appending only the new rows.*

In [28]:
def gcs_etl_bq(event, context):
    """Load a CSV object from Cloud Storage into BigQuery (background Cloud Function).

    Triggered by a change to a Cloud Storage bucket. Events whose
    ``contentType`` is not ``text/csv``, or whose object name contains no
    ``.csv``, are logged and skipped. For ``<name>.csv`` the function:

    1. ensures the ``deeval`` dataset exists (created in australia-southeast1),
    2. loads the object into ``deeval.<table>ingest`` with schema autodetection,
       replacing any previous contents of that ingest table,
    3. rebuilds ``deeval.<table>`` as every ingest column plus an
       ``extracted_at`` UTC timestamp column.

    ``<table>`` is the object name up to its last ``.csv`` with all non-word
    characters removed (so ``data/People.csv`` becomes ``dataPeople``).

    Args:
        event (dict): Event payload; the ``contentType``, ``name`` and
            ``bucket`` keys are read.
        context (google.cloud.functions.Context): Metadata for the event (unused).

    Returns:
        None
    """
    import re

    # error gate for filetype; done before any client is created so unrelated
    # uploads are cheap no-ops, and .get() avoids a KeyError when the key is absent
    contentType = event.get('contentType')
    if contentType != "text/csv":
        print(f"Skipping {event.get('name')}: content type {contentType!r} is not text/csv")
        return

    # table name from the object name: everything before the last ".csv",
    # with non-word characters stripped so it is a valid BigQuery table id
    nameMatch = re.match(r".*(?=\.csv)", event['name'])
    tableName = re.sub(r"\W", "", nameMatch.group(0)) if nameMatch else ""
    if not tableName:
        print(f"Skipping {event['name']}: cannot derive a table name from it")
        return

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    bqSession = bigquery.Client()

    # check / setup bigquery dataset
    datasetName = "deeval"
    datasetID = f"{bqSession.project}.{datasetName}"
    try:
        bqSession.get_dataset(datasetID)
    except NotFound:
        dataset = bigquery.Dataset(datasetID)
        dataset.location = "australia-southeast1"
        # create_dataset is synchronous and returns the Dataset itself (there is
        # no job to wait on). exists_ok guards against the race when several
        # uploads trigger this function at once and another instance created it first.
        bqSession.create_dataset(dataset, exists_ok=True, timeout=30)

    # ELT to reduce need for memory -> load to ingest table -> create actual table appending timestamp
    # NOTE(author): with autodetect, some tables load as all-string columns; those tables have a typeFixNum column
    tableID = f"{datasetID}.{tableName}"
    ingestTableID = f"{datasetID}.{tableName}ingest"

    # resource location
    uri = f"gs://{event['bucket']}/{event['name']}"

    # autodetect the schema so one function can be used for all files
    loadConfig = bigquery.job.LoadJobConfig(
        autodetect=True,  # auto detect schema
        skip_leading_rows=1,  # first row is the header
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # replaces table if existing
        source_format=bigquery.SourceFormat.CSV,
    )
    # load data to ingest table and wait for completion
    ingest = bqSession.load_table_from_uri(source_uris=uri,
                                           destination=ingestTableID,
                                           job_config=loadConfig,
                                           timeout=30)
    ingest.result()

    # transform: add the extraction timestamp (left as UTC) and replace the target table.
    # Table paths are backtick-quoted because project ids may contain hyphens.
    transformSQL = (
        f"CREATE OR REPLACE TABLE `{tableID}` AS "
        f"SELECT *, CURRENT_TIMESTAMP() AS extracted_at FROM `{ingestTableID}`"
    )
    transform = bqSession.query(transformSQL)
    # wait for completion
    transform.result()

    # verify completion
    destinationTable = bqSession.get_table(tableID)  # Make an API request.
    print("Loaded {} rows.".format(destinationTable.num_rows))
        