## Task 1: Transaction File Unzipping and GBQ Data Upload 

In [1]:
# Import Dependencies 

import os
import re
import datetime 
import zipfile
import logging

from google.cloud import bigquery
from google.oauth2 import service_account
from google.oauth2 import credentials
from google.api_core import exceptions
from google.cloud.exceptions import BadRequest


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import janitor 

In [2]:
# Define the target GBQ project/dataset and establish the client connection

# Project and dataset identifiers reused by the upload and audit cells below.
gbq_proj_id = "dow-wedge-transactions"
dataset_id = "transactions"  

# No explicit credentials are passed, so the client resolves them on its own
# (presumably Application Default Credentials -- confirm for your environment).
client = bigquery.Client(project=gbq_proj_id)

In [None]:
# Print the full ID of every dataset in the GBQ project, or a notice when there are none:

datasets = list(client.list_datasets())
if not datasets:
    print("Project empty, no datasets present")
for item in datasets:
    print(item.full_dataset_id)


In [None]:
# Check to see if transaction tables exist in datasets (commented out as files have been uploaded)

# Regex pattern to match file naming structure
#trans_pattern = re.compile(r"transArchive_201\d+")

#tables = client.list_tables(dataset_id)

# Flag to track if any deletions were made
#deleted_any = False

#for table in tables:
    # Check if the table ID matches the pattern (starting with 'transArchive_201')
 #   if trans_pattern.search(table.table_id):
        # Construct the full table name using the new project ID
  #      full_name = ".".join([gbq_proj_id, dataset_id, table.table_id])
        
        # Delete the table from the specified project
   #     client.delete_table(full_name, not_found_ok=True)
    #    print(f"Deleted {full_name}.")

        # Set flag to True since a table was deleted
     #   deleted_any = True    

# If no tables were deleted, print a message indicating no changes were made
#if not deleted_any:
 #   print("No tables matched the pattern, no changes made.")

In [None]:
# Unzip the transaction archive, check which extracted files already have a table in GBQ, and upload the ones that are absent

# Set up status/error logging (timestamp - level - message, INFO and above)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define paths
# Archive holding the transaction files
ZIP_FILE_PATH = r'E:\files\wedge-clean-files.zip'
# Folder the archive is extracted into and later walked for .csv files
DESTINATION_FOLDER = r'E:\files\wedge-transactions'

# Helper functions used by process_transaction_files

def unzip_transaction_files(zip_file_path, destination_folder):
    """Extract every member of the archive at ``zip_file_path`` into ``destination_folder``.

    Failures are reported through ``logging.error`` instead of being raised:
    an invalid archive gets its own message, and any other ``Exception`` is
    logged with its text. The function always returns ``None``.
    """
    try:
        with zipfile.ZipFile(zip_file_path, 'r') as archive:
            archive.extractall(destination_folder)
    except zipfile.BadZipFile:
        logging.error(f"Error: {zip_file_path} is not a valid zip file.")
    except Exception as e:
        logging.error(f"Error unzipping transaction files: {str(e)}")
    else:
        # Only reached when extraction finished without an exception.
        logging.info(f"Transaction files unzipped to {destination_folder}")

def transaction_file_exists_in_bigquery(file_name, dataset_id):
    """Return True when ``dataset_id`` holds a table named exactly after ``file_name``.

    The table name is the part of ``file_name`` before its first ``.``.
    The comparison is an exact match against each table ID: a substring
    search would report ``foo`` as uploaded whenever ``foo_clean`` (or any
    other table containing ``foo``) exists, and the file would be skipped.

    Args:
        file_name: File or table name; anything after the first ``.`` is ignored.
        dataset_id: Dataset to inspect, passed straight to ``client.list_tables``.

    Returns:
        True if a table with that exact ID is listed, otherwise False.
        False is also returned (after logging the error) when the dataset
        is not found or the listing fails for any other reason.
    """
    table_name = file_name.split('.')[0]
    try:
        # The generator is consumed inside the try so that errors raised while
        # paging through the listing are handled here as well.
        return any(table.table_id == table_name for table in client.list_tables(dataset_id))
    except exceptions.NotFound:
        logging.error(f"Dataset {dataset_id} not found.")
        return False
    except Exception as e:
        logging.error(f"Error checking transaction file existence in BigQuery: {str(e)}")
        return False

def upload_transaction_file_to_bigquery(file_path, table_id):
    """Load the CSV at ``file_path`` into the BigQuery table ``table_id``.

    The load job is configured with schema autodetection, CSV source format
    and the ``WRITE_TRUNCATE`` write disposition. The call blocks until the
    job finishes. Failures are logged via ``logging.error`` rather than
    raised, so the function always returns ``None``.
    """
    load_settings = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    try:
        with open(file_path, "rb") as csv_stream:
            load_job = client.load_table_from_file(csv_stream, table_id, job_config=load_settings)
        load_job.result()  # Block until the load job has completed
    except exceptions.BadRequest as e:
        logging.error(f"Error uploading transaction file {file_path}: {str(e)}")
    except Exception as e:
        logging.error(f"Unexpected error uploading transaction file {file_path}: {str(e)}")
    else:
        logging.info(f"Uploaded transaction file {file_path} to BigQuery table {table_id}")

def process_transaction_files(zip_file_path, destination_folder, project_id, dataset_id):
    """Unzip the archive, then upload each extracted CSV that has no table yet.

    Every ``.csv`` file found anywhere under ``destination_folder`` is mapped
    to a table name (text before the first ``.``, hyphens replaced with
    underscores). Files whose table is reported as existing are skipped with
    an info log; the rest are uploaded to ``project_id.dataset_id.<table>``.
    """
    unzip_transaction_files(zip_file_path, destination_folder)

    for folder, _, names in os.walk(destination_folder):
        for file in names:
            if not file.endswith(".csv"):
                continue

            # Hyphens are swapped for underscores when deriving the table name.
            table_name = file.split('.')[0].replace('-', '_')
            if transaction_file_exists_in_bigquery(table_name, dataset_id):
                logging.info(f"Skipping upload for transaction file {file}, corresponding table already exists in BigQuery.")
                continue

            file_path = os.path.join(folder, file)
            table_id = f"{project_id}.{dataset_id}.{table_name}"
            upload_transaction_file_to_bigquery(file_path, table_id)

# Entry point for this cell: run the unzip-and-upload pipeline with the paths and
# project settings defined above. The guard is true when the code is executed
# directly rather than imported.
if __name__ == "__main__":
    try:
        process_transaction_files(ZIP_FILE_PATH, DESTINATION_FOLDER, gbq_proj_id, dataset_id)
    except Exception as e:
        # Last-resort handler: log anything the helpers did not handle themselves
        # instead of letting it propagate.
        logging.error(f"An error occurred during processing: {str(e)}")

In [None]:
# List the unzipped transaction files that do not yet have a table in GBQ (some uploads failed part-way through)

# Folder whose .csv files are compared against the GBQ tables and used as the source for re-uploads
TRANSACTION_FILES_FOLDER = r'E:\files\wedge-transactions\clean-files'

def get_bigquery_tables():
    """Return the table IDs found in the ``gbq_proj_id.dataset_id`` dataset.

    Progress is printed along the way. If the dataset is not found, or any
    other exception occurs, a message is printed and an empty list is
    returned instead of raising.
    """
    try:
        dataset_ref = f"{gbq_proj_id}.{dataset_id}"
        print(f"Attempting to list tables in dataset: {dataset_ref}")
        table_ids = [entry.table_id for entry in client.list_tables(dataset_ref)]
        print(f"Retrieved {len(table_ids)} tables from BigQuery: {table_ids}")
        return table_ids
    except exceptions.NotFound:
        print(f"Dataset {dataset_id} not found.")
        return []
    except Exception as e:
        print(f"Error retrieving tables from BigQuery: {str(e)}")
        return []

def get_local_transaction_files(folder_path):
    """Return the names of the entries directly inside ``folder_path`` that end in ``.csv``.

    Only the top level of the folder is inspected, and the names are returned
    in the order ``os.listdir`` yields them. The result is also printed.
    """
    entries = os.listdir(folder_path)
    files = [entry for entry in entries if entry.endswith('.csv')]
    print(f"Found {len(files)} local CSV files: {files}")
    return files

def find_missing_files(local_files, bigquery_tables):
    """Return the local files that have no matching table in ``bigquery_tables``.

    A file matches when its derived table name (text before the first ``.``,
    hyphens replaced with underscores) equals one of the given table IDs.

    Args:
        local_files: Iterable of file names to check.
        bigquery_tables: Iterable of existing table IDs. It is copied into a
            set once, so each lookup is constant-time and a one-shot iterator
            is not partially consumed by earlier membership tests.

    Returns:
        The file names from ``local_files`` without a matching table, in
        their original order.
    """
    # ``or ()`` keeps a None/empty argument meaning "no tables".
    known_tables = set(bigquery_tables or ())

    missing_files = []
    for local_file in local_files:
        table_name = local_file.split('.')[0].replace('-', '_')
        print(f"Checking if {local_file} (table name: {table_name}) exists in BigQuery")
        if table_name not in known_tables:
            missing_files.append(local_file)
            print(f"{local_file} not found in BigQuery tables")
        else:
            print(f"{local_file} found in BigQuery tables")
    return missing_files

def main():
    """Report which CSV files in TRANSACTION_FILES_FOLDER have no table in BigQuery.

    Fetches the table list first, then the local file list, compares them and
    prints either the missing files with a total or a message that every
    file is present.
    """
    print("Starting the script to check for missing files")

    # Order matters for the printed output: tables first, then local files.
    bigquery_tables = get_bigquery_tables()
    local_files = get_local_transaction_files(TRANSACTION_FILES_FOLDER)
    missing_files = find_missing_files(local_files, bigquery_tables)

    if not missing_files:
        print("All local transaction files are present in BigQuery.")
    else:
        print("The following transaction files are not yet in BigQuery:")
        for file in missing_files:
            print(f"- {file}")
        print(f"\nTotal missing files: {len(missing_files)}")

    print("Script execution completed")

# Run the missing-file report when this code is executed directly rather than imported.
if __name__ == "__main__":
    main()

In [None]:
# Script to target uploads at failed files to upload (note: file names under missing_files list is hardcoded based on above script)

def upload_to_bigquery(file_path, table_id):
    """Load the CSV at ``file_path`` into the BigQuery table ``table_id``.

    The load job uses schema autodetection, CSV source format and the
    ``WRITE_TRUNCATE`` write disposition, and the call blocks until the job
    finishes.

    Returns:
        True when the load job completed, False when any exception occurred
        (the error is printed, not raised).
    """
    load_settings = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    uploaded = False
    try:
        with open(file_path, "rb") as csv_stream:
            load_job = client.load_table_from_file(csv_stream, table_id, job_config=load_settings)

        load_job.result()  # Block until the load job has completed
        print(f"Uploaded {file_path} to {table_id}")
        uploaded = True
    except BadRequest as e:
        print(f"Error uploading {file_path}: {str(e)}")
    except Exception as e:
        print(f"Unexpected error uploading {file_path}: {str(e)}")
    return uploaded

def main():
    """Retry the upload of each hardcoded missing file and print a success/failure tally."""
    # File names are hardcoded here rather than taken from the missing-file
    # report; one file name is left in as an example.
    missing_files = [
        'transArchive_201210_201212_clean.csv' # this one file failed to upload a few times, unsure why
    ]

    successful_uploads = 0
    failed_uploads = 0

    for file in missing_files:
        # Table name: text before the first '.', with hyphens turned into underscores.
        table_name = file.split('.')[0].replace('-', '_')
        table_id = f"{gbq_proj_id}.{dataset_id}.{table_name}"
        file_path = os.path.join(TRANSACTION_FILES_FOLDER, file)

        print(f"Attempting to upload {file} to {table_id}")
        if not upload_to_bigquery(file_path, table_id):
            failed_uploads += 1
            continue
        successful_uploads += 1

    print(f"\nUpload process completed.")
    print(f"Successful uploads: {successful_uploads}")
    print(f"Failed uploads: {failed_uploads}")

# Run the targeted re-upload when this code is executed directly rather than imported.
if __name__ == "__main__":
    main()