<a href="https://colab.research.google.com/github/shrisha-rao/zembo-demo/blob/main/Populate_Bigquery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import zipfile
import io
from google.cloud import storage

def unzip_and_upload(bucket_name, temp_folder="temp_extracted"):
    """Unzips all nested zip files in a GCS bucket and uploads CSV/TXT files."""

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blobs = bucket.list_blobs()  # List all blobs

    zip_blobs = [blob for blob in blobs if blob.name.endswith(".zip")]

    def process_zip(zip_blob, current_path=""):
        """Recursively processes a zip file and its nested zips."""
        try:
            zip_bytes = zip_blob.download_as_bytes()

            with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
                for filename in z.namelist():
                    if filename.endswith(".zip"):
                        # Nested zip file
                        with z.open(filename) as nested_zip_file:
                            nested_zip_bytes = nested_zip_file.read()
                            nested_zip_blob_name = os.path.join(current_path, zip_blob.name.replace(".zip", ""), filename)
                            nested_zip_blob = type('obj', (object,), {'name': nested_zip_blob_name, 'download_as_bytes': lambda: nested_zip_bytes}) #create a blob like object
                            process_zip(nested_zip_blob, os.path.join(current_path, zip_blob.name.replace(".zip", ""))) #recursive call
                    elif filename.endswith((".csv", ".txt")):
                        with z.open(filename) as f:
                            data = f.read()

                            # Construct the temporary blob name, preserving folder structure
                            temp_blob_name = os.path.join(temp_folder, current_path, zip_blob.name.replace(".zip", ""), filename)

                            temp_blob = bucket.blob(temp_blob_name)
                            temp_blob.upload_from_string(data)
                            print(f"Uploaded {filename} from {zip_blob.name} to gs://{bucket_name}/{temp_blob_name}")
        except zipfile.BadZipFile:
            print(f"Warning: {zip_blob.name} is not a valid zip file.")
        except Exception as e:
            print(f"Error processing {zip_blob.name}: {e}")

    for zip_blob in zip_blobs:
        process_zip(zip_blob)

In [3]:
# GCP project and bucket details
project_id = "zembo_demo"
bucket_name = "zembo-data" # zembo-data/data
temp_folder = "temp_extracted"  # Temporary folder in your bucket
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/zembo-demo-2ebb06d8deb6.json"
#
unzip_and_upload(bucket_name, temp_folder)

Uploaded BGU9007.csv from data/battery-data-Apr-24/04-Apr-24/BGU9007_to_BGU9009.zip to gs://zembo-data/temp_extracted/data/battery-data-Apr-24/data/battery-data-Apr-24/04-Apr-24/BGU9007_to_BGU9009/BGU9007.csv
Uploaded BGU9008.csv from data/battery-data-Apr-24/04-Apr-24/BGU9007_to_BGU9009.zip to gs://zembo-data/temp_extracted/data/battery-data-Apr-24/data/battery-data-Apr-24/04-Apr-24/BGU9007_to_BGU9009/BGU9008.csv
Uploaded BGU9009.csv from data/battery-data-Apr-24/04-Apr-24/BGU9007_to_BGU9009.zip to gs://zembo-data/temp_extracted/data/battery-data-Apr-24/data/battery-data-Apr-24/04-Apr-24/BGU9007_to_BGU9009/BGU9009.csv
Uploaded BGU9067.csv from data/battery-data-Apr-24/04-Apr-24/BGU9067_to_BGU9069.zip to gs://zembo-data/temp_extracted/data/battery-data-Apr-24/data/battery-data-Apr-24/04-Apr-24/BGU9067_to_BGU9069/BGU9067.csv
Uploaded BGU9068.csv from data/battery-data-Apr-24/04-Apr-24/BGU9067_to_BGU9069.zip to gs://zembo-data/temp_extracted/data/battery-data-Apr-24/data/battery-data-Apr

Load CSVs from Temporary Location to BigQuery

In [17]:
from google.cloud import bigquery
from google.cloud import storage
import os

def create_dataset(project_id, dataset_id):
    """Creates a BigQuery dataset."""

    client = bigquery.Client(project=project_id)
    dataset_id = f"{project_id}.{dataset_id}"
    dataset = bigquery.Dataset(dataset_id)

    try:
        dataset = client.create_dataset(dataset, timeout=30)
        print(f"Created dataset {client.project}.{dataset.dataset_id}")
    except Exception as e:
        print(f"Error creating dataset: {e}")

def populate_bigquery(project_id, dataset_id, bucket_name, temp_folder="temp_extracted"):
    """Populates BigQuery tables from CSV and TXT files, ignoring corrupted CSVs."""

    storage_client = storage.Client(project=project_id)
    bucket = storage_client.bucket(bucket_name)
    bigquery_client = bigquery.Client(project=project_id)

    dataset_ref = bigquery_client.dataset(dataset_id)

    csv_blobs = [blob for blob in bucket.list_blobs(prefix=temp_folder) if blob.name.endswith(".csv")]
    txt_blobs = [blob for blob in bucket.list_blobs(prefix=temp_folder) if blob.name.endswith(".txt")]

    # 1. Populate all CSV files to one table, ignoring corrupted ones
    if csv_blobs:
        csv_uris = []
        for blob in csv_blobs:
            csv_uris.append(f"gs://{bucket_name}/{blob.name}")

        csv_table_id = f"{project_id}.{dataset_id}.battery-data"

        job_config_csv = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            autodetect=True,
            skip_leading_rows=1,  # Corrected: Skip header row
            max_bad_records=1000000
        )

        try:
            load_job_csv = bigquery_client.load_table_from_uri(csv_uris, csv_table_id, job_config=job_config_csv)
            load_job_csv.result()
            print(f"Loaded CSV files to {csv_table_id}")
        except Exception as e:
            print(f"Error loading CSV files: {e}")

    else:
        print("No CSV files found in the temporary folder and its subfolders.")

    # 2. Populate TXT files to different tables
    for txt_blob in txt_blobs:
        table_name = os.path.splitext(txt_blob.name.split('/')[-1])[0] + 'tst'
        table_id = f"{project_id}.{dataset_id}.{table_name}"

        job_config_txt = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            autodetect=True,
            skip_leading_rows=1, #adjust if your txt files have header.
        )

        try:
            load_job_txt = bigquery_client.load_table_from_uri(
                f"gs://{bucket_name}/{txt_blob.name}", table_id, job_config=job_config_txt
            )
            load_job_txt.result()
            print(f"Loaded {txt_blob.name} to {table_id}")
        except Exception as e:
            print(f"Error loading {txt_blob.name}: {e}")

In [36]:
dataset_id = "zembo_data"
project_id = "zembo-demo"
temp_folder = "temp_extracted"
print(f"Project ID: {project_id}")
print(f"Dataset ID: {dataset_id}")


create_dataset(project_id, dataset_id) # create dataset

populate_bigquery(project_id, dataset_id, bucket_name, temp_folder)

Project ID: zembo-demo
Dataset ID: zembo_data
Created dataset zembo-demo.zembo_data
Loaded CSV files to zembo-demo.zembo_data.battery-data
