# Import required libraries

In [1]:
pip install google-cloud-storage


Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

# Set the path to your service account key file
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"C:\Users\Sponges-TM\Untitled Folder\service_account_key.json"


In [3]:
import logging  # For logging warnings and messages
import os  # To interact with environment variables
import traceback  # To capture and display error tracebacks
import re  # For working with regular expressions

In [4]:
!pip install --upgrade google-cloud-bigquery




# Import Google Cloud libraries

In [46]:
from google.cloud import bigquery  # For interacting with Google BigQuery
from google.cloud import storage  # For accessing Google Cloud Storage

In [47]:
import yaml  # For reading YAML files

In [48]:
# Load schema configuration from a YAML file (e.g., schemas.yaml)
with open(r"C:\Users\Sponges-TM\Untitled Folder\schemas.yaml") as schema_file:
     config = yaml.load(schema_file, Loader=yaml.Loader)  # Load YAML content into a Python object


# Global variables


In [49]:
PROJECT_ID = os.getenv('cloudquicklab')  # GCP Project ID (fetched from environment variable)
BQ_DATASET = 'staging'  # Name of the BigQuery dataset to store data
CS = storage.Client()  # Initialize Google Cloud Storage client
BQ = bigquery.Client()  # Initialize BigQuery client
job_config = bigquery.LoadJobConfig()  # Initialize BigQuery job configuration

In [50]:
def streaming(data):
    """
    Main function to process incoming data, check table existence, and load data into BigQuery.
    Args:
        data (dict): Dictionary containing information about the uploaded file.
                     Keys: 'bucket', 'name', 'timeCreated'
    """


  # Extract bucket name, filename, and timeCreated from input data


In [51]:
# Sample input data simulating a GCS file event
data = {
    "bucket": "gcs-bucket-name",  # Replace with your GCS bucket name
    "name": "file.json",  # Replace with the name of your file
    "timeCreated": "2024-06-14T00:00:00Z"  # Replace with the actual creation timestamp
}

# Call the main streaming function with the input data
streaming(data)


In [52]:
 bucketname = data['bucket'] 
print("Bucket name:", bucketname)
filename = data['name']   
print("File name:", filename)  
timeCreated = data['timeCreated']
print("Time Created:", timeCreated) 


Bucket name: gcs-bucket-name
File name: file.json
Time Created: 2024-06-14T00:00:00Z


In [53]:
try:
    # Code to attempt
    for table in config:
        tableName = table.get("name")  # Indented code block
except Exception as e:
    print(f"Error occurred: {e}")


##  # Check if the filename matches the table name using regular expressions


In [54]:
            if re.search(tableName.replace("_", "-"), filename) or re.search(tableName, filename):
                tableSchema = table.get("schema")  # Get the table schema from YAML
                _check_if_table_exists(tableName, tableSchema)  # Ensure the table exists in BigQuery
                tableFormat = table.get("format")  # Get the expected file format (e.g., JSON)



 # If the file format matches 'NEWLINE_DELIMITED_JSON', load it into BigQuery


In [69]:
try:
    # Loop through the configuration list to process each table
    for table in config:
        tableName = table.get('name')  # Get the table name
        # Check if the table name matches any part of the filename (allowing underscores or hyphens)
        if re.search(tableName.replace('_', '-'), filename) or re.search(tableName, filename):
            tableSchema = table.get('schema')  # Get the schema for the table
            _check_if_table_exists(tableName, tableSchema)  # Check if the table exists in the schema
            tableFormat = table.get('format')  # Get the table format

            # If the table format is 'NEWLINE_DELIMITED_JSON', process the table accordingly
            if tableFormat == 'NEWLINE_DELIMITED_JSON':
                _load_table_from_uri(data['bucket'], data['name'], tableSchema, tableName)  # Load the table data from the URI

# Catch and handle any exceptions that occur during the file processing
except Exception:
    # Print the error message and include the exception details
    print('Error streaming file. Cause: %s' % (traceback.format_exc()))


In [56]:
def _check_if_table_exists(tableName, tableSchema):
    """
    Checks if the specified table exists in BigQuery. If not, creates the table.
    Args:
        tableName (str): Name of the table to check.
        tableSchema (dict): Table schema definition from YAML file.
    """
    # Construct a BigQuery table ID
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    try:
        # Attempt to fetch the table to verify its existence
        BQ.get_table(table_id)
    except Exception:
        # If the table doesn't exist, create it
        logging.warning("Creating table: %s" % (tableName))
        schema = create_schema_from_yaml(tableSchema)  # Convert YAML schema to BigQuery schema
        table = bigquery.Table(table_id, schema=schema)  # Create a BigQuery table object
        table = BQ.create_table(table)  # Create the table in BigQuery
        print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


In [57]:
def _load_table_from_uri(bucket_name, file_name, tableSchema, tableName):
    """
    Loads a file from Google Cloud Storage into a BigQuery table.
    Args:
        bucket_name (str): Name of the GCS bucket.
        file_name (str): Name of the file in the bucket.
        tableSchema (dict): Table schema definition from YAML file.
        tableName (str): Name of the BigQuery table to load data into.
    """

# Construct the GCS file URI

In [58]:
 uri = "gs://%s/%s" % (bucketname, filename)
table_id = BQ.dataset(BQ_DATASET).table(tableName)  # Construct the BigQuery table ID


 ### Create BigQuery schema from YAML

In [68]:
def create_schema_from_yaml(tableSchema):
    """
    Converts a table schema definition from YAML format to a BigQuery-compatible schema.
    Args:
        table_schema (list): List of column definitions in YAML format. Each column is defined as:
                             - name: The field name (str).
                             - type: The field type (e.g., STRING, INTEGER, RECORD).
                             - mode: The mode of the field (e.g., NULLABLE, REQUIRED, REPEATED).
                             - fields (optional): For RECORD type, a nested list of field definitions.
    Returns:
        list: A list of BigQuery SchemaField objects.
    """
    schema = []
    # Iterate through each column definition in the YAML schema
    for column in tableSchema:
        # Create a SchemaField object for each column
        schemaField = bigquery.SchemaField(
            name=column["name"],  # Field name
            field_type=column["type"],  # Field type (e.g., STRING, INTEGER)
            mode=column.get("mode", "NULLABLE")  # Field mode, defaulting to NULLABLE if not specified
        )
        # If the column is a RECORD type, process its nested fields recursively
        if column["type"] == "RECORD" and "fields" in column:
            schemaField._fields = create_schema_from_yaml(column["fields"])
        # Add the SchemaField to the schema list
        schema.append(schemaField)
    return schema
print(tableSchema)


NameError: name 'tableSchema' is not defined

In [67]:
schema = create_schema_from_yaml(tableSchema)
print("Schema being used:", schema)


NameError: name 'tableSchema' is not defined