In [3]:
!pip install python-magic

from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from google.cloud import bigquery

import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
import pandas as pd
import uuid
import pathlib
import magic
import re
import io
import json
import base64



In [4]:
class OpenpyxlSheetImages:
    """
    Index the images embedded in a worksheet by the column and data row they
    are anchored to.

    Class assumes:
      Sheets have column headers in the first row (no offset)
      The first column is used for data (no offset)
      Public methods take 1-indexed column numbers
      Per-column results are lists with one slot per data row (header
      excluded), so slot 0 is the first row below the header
      This class is for reading only (xlsx workbooks opened with openpyxl)
    """

    # Number of header rows above the data. Image anchors count rows from the
    # top of the sheet (0-indexed), so this offset converts an anchor row into
    # a data-row slot.
    HEADER_ROWS = 1

    def __init__(self, sheet: Worksheet):
        """
        Args:
            sheet: worksheet to scan; its images are read from ``sheet._images``.
        """
        self._images = [OpenpyxlSheetImage(image) for image in sheet._images]
        self.num_rows = max(sheet.max_row - self.HEADER_ROWS, 0)     # discount header
        self.num_columns = sheet.max_column
        self.columns_with_images = self._set_columns_with_images() # 1-indexed

    def _data_row(self, image):
        """Return the data-row slot for ``image``, or None if it is anchored outside the data rows."""
        slot = image.row - self.HEADER_ROWS
        if 0 <= slot < self.num_rows:
            return slot
        return None

    def _set_columns_with_images(self):
        """Return the ascending, 1-indexed columns holding at least one image on a data row."""
        columns = set()
        for image in self._images:
            column = image.column + 1  # maintain 1-index
            if 1 <= column <= self.num_columns and self._data_row(image) is not None:
                columns.add(column)

        return sorted(columns)

    def get_images(self):
        """Return every image found on the sheet, in the order they were read."""
        return self._images

    def get_columns_with_images(self):
        """Return the 1-indexed columns that contain images (computed at construction)."""
        return self.columns_with_images

    def get_images_by_column(self, column: int, as_b64=False):
        """
        Return one entry per data row for the given column.

        Args:
            column: 1-indexed column number.
            as_b64: when True, entries are the base64-encoded image bytes
                instead of the image objects.

        Returns:
            A list of length ``num_rows``; rows without an image hold None.
            If several images share a cell, the one read last wins.
        """
        return_array = [None] * self.num_rows

        for image in self._images:
            if image.column + 1 != column:
                continue
            slot = self._data_row(image)
            if slot is None:
                # Anchored on the header row or below the last data row.
                continue
            return_array[slot] = base64.b64encode(image.image_bytes) if as_b64 else image

        return return_array


    def get_unique_image_names_by_column(self, column: int):
        """Return one entry per data row: the image's unique file name, or None where there is no image."""
        return [
            image.unique_file_name if image is not None else None
            for image in self.get_images_by_column(column)
        ]


class OpenpyxlSheetImage:
    """Snapshot of one embedded worksheet image: its bytes, anchor cell and file names."""

    def __init__(self, image):
        """
        Args:
            image: an image object taken from a worksheet; must expose
                ``_data()``, ``format``, ``anchor._from`` and ``path``.
        """
        raw_bytes = image._data()            # bytes are pulled eagerly (original note: sidesteps an io problem)
        image_format = image.format          # e.g. png
        anchor_cell = image.anchor._from     # top-left anchor; row/col are 0-indexed
        source_path = image.path

        self.image_bytes = raw_bytes
        self.image_type = image_format
        self.row = anchor_cell.row
        self.column = anchor_cell.col
        self.path = source_path
        # MIME type is sniffed from the content itself.
        self.mime_type = magic.from_buffer(raw_bytes, mime=True)
        self.file_name = pathlib.Path(source_path).name
        self.unique_file_name = self._set_unique_filename()

    def _set_unique_filename(self, delimiter: str='-'):
        """Build ``<stem><delimiter><random hex>.<image type>`` from the image's file name."""
        stem = pathlib.Path(self.file_name).stem
        random_token = uuid.uuid4().hex
        return ''.join([stem, delimiter, random_token, '.', self.image_type])

In [6]:
def get_safe_table_name(blobname: str):
    """
    Derive a table name from a blob name.

    The file stem (name without directory or extension) has spaces replaced
    by underscores, every character outside ``[a-zA-Z0-9_]`` removed, and is
    lower-cased. If the result does not start with a letter - including when
    nothing is left after sanitizing - it is prefixed with ``table_``.

    Args:
        blobname: object name, e.g. ``"My Export.xlsx"``.

    Returns:
        The sanitized table name, e.g. ``"my_export"``.
    """
    file_stem = pathlib.Path(blobname).stem
    table_name = file_stem.replace(" ", "_")
    table_name = re.sub(r'[^a-zA-Z0-9_]', '', table_name)
    table_name = table_name.lower()
    # Check for emptiness first: a stem made only of stripped characters
    # leaves no first character to inspect.
    if not table_name or not table_name[0].isalpha():
        table_name = "table_" + table_name

    return table_name

def load_dataframe_to_bq(
    df: pd.DataFrame,
    full_table_id: str,
    write_disp: str=bigquery.enums.WriteDisposition.WRITE_APPEND):
    """
    Load a dataframe into a BigQuery table and print a one-line summary.

    Args:
        df: rows to load.
        full_table_id: destination table id passed to the BigQuery client.
        write_disp: write disposition for the load job; appends by default.
    """
    bq_client = bigquery.Client()
    load_config = bigquery.LoadJobConfig(write_disposition=write_disp)

    # Submit the load job (API request), then block until it completes.
    load_job = bq_client.load_table_from_dataframe(
        df, full_table_id, job_config=load_config
    )
    load_job.result()

    # Re-read the destination table so the summary reports its column count.
    destination = bq_client.get_table(full_table_id)
    row_count = df.shape[0]
    column_count = len(destination.schema)
    summary = "Loaded {} rows and {} columns to {}".format(
        row_count, column_count, full_table_id
    )
    print(summary)

def delete_images(bucket: Bucket, image_names_list: list=None):
    """
    Delete the named blobs from ``bucket``, continuing past individual failures.

    Each blob is attempted independently so that one failed delete does not
    leave the remaining blobs untouched; this function is used to roll back
    uploads, where removing as much as possible matters.

    Args:
        bucket: bucket to delete from; must provide ``delete_blob`` and ``name``.
        image_names_list: blob names to delete. ``None`` (the default) is
            treated as an empty list.

    Returns:
        ``(successful_deletes, had_failure)``: the names that were deleted, in
        input order, and whether any delete raised.
    """
    successful_deletes = []
    had_failure = False

    for image_name in (image_names_list or []):
        try:
            bucket.delete_blob(image_name)
        except Exception as e:
            # Best effort: report the error and keep going with the next blob.
            had_failure = True
            print(e)
        else:
            successful_deletes.append(image_name)

    print('Successfully deleted from '+ bucket.name + ' ' + json.dumps(successful_deletes))
    return successful_deletes, had_failure


def store_images(bucket: Bucket, image_handler: OpenpyxlSheetImages):
    """
    Upload every image held by ``image_handler`` to ``bucket``.

    Uploading stops at the first error; the error is printed rather than
    raised so the caller can decide how to roll back.

    Args:
        bucket: destination bucket; must provide ``blob`` and ``name``.
        image_handler: source of the images, via ``get_images()``.

    Returns:
        ``(successful_uploads, had_failure)``: the blob names uploaded before
        any error, and whether an error occurred.
    """
    stored_names = []
    upload_failed = False

    pending_images = image_handler.get_images()
    try:
        for img in pending_images:
            blob_name = img.unique_file_name
            target_blob = bucket.blob(blob_name)
            target_blob.upload_from_string(
                data=img.image_bytes,
                content_type=img.mime_type,
                num_retries=3,
            )
            stored_names.append(blob_name)
    except Exception as err:
        upload_failed = True
        print(err)

    summary = 'Successfully stored to '+ bucket.name + ' ' + json.dumps(stored_names)
    print(summary)
    return stored_names, upload_failed

In [7]:
class OpenpyxlSheetHeaderHelper:
    """
    Read the header row of a worksheet and expose both the raw column names
    and sanitized ("safe") versions of them.

    Class assumes:
      Column headers are in the first row of the sheet
      All public methods are 1-indexed
      This class is for reading only (xlsx workbooks opened with openpyxl)
    """
    def __init__(self, sheet: Worksheet):
        """
        Args:
            sheet: worksheet whose first row holds the column headers.
        """
        self.column_names = self._set_column_names(sheet)
        self.safe_column_names = self._set_safe_column_names()

    def _set_column_names(self, sheet: Worksheet) -> list:
        """Return the cell values of the first row, left to right (empty cells give None)."""
        return [
            cell.value
            for row in sheet.iter_rows(min_row=1, max_row=1)
            for cell in row
        ]

    def _set_safe_column_names(self):
        """Return the sanitized form of every raw column name, in the same order."""
        return [self.get_safe_column_name(name) for name in self.column_names]

    def get_safe_column_name(self, unsafe_name: str):
        """
        Sanitize a header value into a lower-case identifier.

        Spaces become underscores, characters outside ``[a-zA-Z0-9_]`` are
        removed, and the result is lower-cased. A result that does not start
        with a letter - including an empty one - is prefixed with ``col_``.

        Args:
            unsafe_name: raw header value. Header cells are not always
                strings: ``None`` (empty cell) is treated as an empty name and
                any other non-string value is converted with ``str``.

        Returns:
            The sanitized column name.
        """
        raw_name = "" if unsafe_name is None else str(unsafe_name)
        safe_name = raw_name.replace(" ", "_")
        safe_name = re.sub(r'[^a-zA-Z0-9_]', '', safe_name)
        safe_name = safe_name.lower()
        # Check for emptiness first so an all-symbol or empty header does not
        # raise when its first character is inspected.
        if not safe_name or not safe_name[0].isalpha():
            safe_name = "col_" + safe_name

        return safe_name

    def get_column_names(self, safe_names: bool=False):
        """Return the list of column names, sanitized when ``safe_names`` is True."""
        return self.safe_column_names if safe_names else self.column_names

    def get_column_name_by_index(self, idx: int, safe_names: bool=False):
        """
        Return the name of the column at 1-indexed position ``idx``.

        Raises:
            IndexError: if ``idx`` is below 1 or beyond the last column.
        """
        if idx < 1:
            # Without this guard, 0 would silently wrap to the last column.
            raise IndexError("column index is 1-based, got {}".format(idx))
        return self.get_column_names(safe_names)[idx - 1]

    def get_index_by_column_name(self, column_name: str, safe_names: bool=False):
        """
        Return the 1-indexed position of the first column called ``column_name``.

        Raises:
            ValueError: if no column has that name.
        """
        the_list = self.get_column_names(safe_names)

        return the_list.index(column_name) + 1  # maintain 1-index

    def get_pandas_rename_dict(self):
        """Return a ``{raw name: safe name}`` mapping suitable for ``DataFrame.rename(columns=...)``."""
        return dict(zip(self.column_names, self.safe_column_names))

In [8]:
# Deployment configuration - replace the placeholders before running.
# Project and dataset are combined with a table name derived from the
# ingested file to form the destination table id.
CONST_PROJECT_NAME = '<your project name>'
CONST_BQ_DATASET_NAME = '<your dataset name>'
# Bucket that extracted images are uploaded to.
CONST_IMAGE_PROCESSED_BUCKET_NAME = '<your object bucket name>'


# Sample event payload for running main() by hand. main() reads the "bucket"
# and "name" entries (and "timeCreated"); the remaining fields are carried
# along as-is. Presumably mirrors a Cloud Storage object notification - confirm
# against the real trigger.
TEST_EVENT_DATA = {
    "updated": "2024-10-23T19:06:39.349Z",
    "kind": "storage#object",
    "generation": "1729710399346045",
    "bucket": "ingestion-demo-landing-test-bucket",
    "etag": "CP2qq6+ZpYkDEAE=",
    "contentType": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "md5Hash": "5hwHeX38TuK31NFmTWmXNQ==",
    "size": "4567895",
    "metageneration": "1",
    "timeCreated": "2024-10-23T19:06:39.349Z",
    "timeStorageClassUpdated": "2024-10-23T19:06:39.349Z",
    "crc32c": "yVwtNA==",
    "name": "CDD Excel Export - cmpd registration.xlsx",
    "storageClass": "STANDARD"
}
# Envelope in the shape main() expects: the payload sits under "data".
TEST_CLOUD_EVENT = {
    "data": TEST_EVENT_DATA
}

In [11]:
def main(cloud_event):
    """
    Ingest one spreadsheet: extract its embedded images to a bucket and load
    its rows into BigQuery.

    Steps:
      1. Download the workbook named by the event from the landing bucket.
      2. Read the first sheet; collect its images and header names.
      3. Build a dataframe in which each image column holds the uploaded
         image's unique file name, plus a ``<safe column>_base64`` column
         holding the base64-encoded image bytes.
      4. Upload the images, then load the dataframe into BigQuery.

    Images are uploaded before the BigQuery load so that a failed upload
    never leaves appended rows pointing at images that do not exist. If
    either step fails, the images uploaded so far are deleted again.

    Args:
        cloud_event: mapping with a ``'data'`` entry that provides at least
            ``'bucket'`` and ``'name'``.

    Raises:
        FileNotFoundError: if the named object is not in the landing bucket.
        Exception: if any image upload fails (the message lists blobs that
            could not be rolled back, when there are any).
        Any exception raised by the BigQuery load, after the image rollback
        has been attempted.
    """

    # Event data
    event_data = cloud_event['data']
    file_name = event_data['name']

    # Storage Client / Buckets
    storage_client = storage.Client()
    landing_bucket = storage_client.get_bucket(event_data['bucket'])
    image_processed_bucket = storage_client.get_bucket(CONST_IMAGE_PROCESSED_BUCKET_NAME)

    # Get the workbook
    the_file_blob = landing_bucket.get_blob(file_name)
    if the_file_blob is None:
        # get_blob signals a missing object with None rather than raising.
        raise FileNotFoundError(
            'blob {!r} not found in bucket {!r}'.format(file_name, event_data['bucket'])
        )
    buffer = io.BytesIO()
    the_file_blob.download_to_file(buffer)
    buffer.seek(0)  # the download leaves the cursor at the end of the buffer

    # Openpyxl
    wb = openpyxl.load_workbook(buffer)
    default_sheet = wb.sheetnames[0]
    sheet = wb[default_sheet]

    # Special Handlers
    image_handler = OpenpyxlSheetImages(sheet)
    header_helper = OpenpyxlSheetHeaderHelper(sheet)

    ######################################
    # Handle table data - load dataframe
    buffer.seek(0)  # rewind for the second reader of the same bytes
    df = pd.read_excel(buffer)

    # Process Dataframe step 1: patch in unique filenames into columns images were extracted from
    col_idx_with_images = image_handler.get_columns_with_images()

    for col_idx in col_idx_with_images:
        column_name = header_helper.get_column_name_by_index(col_idx)
        b64_column_name = header_helper.get_safe_column_name(column_name) + '_base64'

        df[column_name] = image_handler.get_unique_image_names_by_column(col_idx)
        df[b64_column_name] = image_handler.get_images_by_column(col_idx, as_b64=True)

    # Process Dataframe step 2: rename columns to safe value
    column_rename_dict = header_helper.get_pandas_rename_dict()
    df.rename(columns = column_rename_dict, inplace = True)

    ######################################
    # Attempt upload of images (before touching BigQuery)
    uploaded_image_list, load_had_failure = store_images(
        bucket=image_processed_bucket,
        image_handler=image_handler
    )

    if load_had_failure:
        leftover = _delete_uploaded_images(image_processed_bucket, uploaded_image_list)
        if leftover:
            raise Exception('image uploads failed, manually delete '+json.dumps(leftover))
        # Rollback was clean, but the ingestion still did not happen - do not
        # report success.
        raise Exception('image uploads failed')

    ######################################
    # Load to BQ
    full_table_id = '{}.{}.{}'.format(
        CONST_PROJECT_NAME,
        CONST_BQ_DATASET_NAME,
        get_safe_table_name(file_name),
    )

    try:
        load_dataframe_to_bq(df, full_table_id)
    except Exception:
        # Roll back the images so they are not orphaned, then let the
        # original error propagate unchanged.
        leftover = _delete_uploaded_images(image_processed_bucket, uploaded_image_list)
        if leftover:
            print('BigQuery load failed, manually delete '+json.dumps(leftover))
        raise

    # TODO:
    # Add ingest time to processed file name and move into processed bucket


def _delete_uploaded_images(bucket, uploaded_image_list):
    """Delete the given uploaded blobs; return the names that could not be deleted (in upload order)."""
    deleted_image_list, del_had_failure = delete_images(
        bucket=bucket,
        image_names_list=uploaded_image_list
    )
    if not del_had_failure:
        return []

    deleted = set(deleted_image_list)
    return [name for name in uploaded_image_list if name not in deleted]


# Manual run: feed the sample event through the pipeline. This performs real
# downloads, uploads and a BigQuery load against the configured project.
main(TEST_CLOUD_EVENT)

Loaded 499 rows and 8 columns to project.dataset.table
Successfully stored to object-table-bucket ["image1-4f8a1544e0e3427c87ed8b25cba977e7.png", "image1-187c3d40a8c64d42bc35c2e3b1424deb.png"]


In [12]:
## For the deletion of test images - easier than deleting files from the console
## WARNING: this deletes EVERY object in the processed-image bucket, not only
## the images from the last run. Skip this cell unless that is what you want.
storage_client = storage.Client()
# Named for what it actually is: this is the processed-image bucket, not the
# landing bucket that holds the source spreadsheets.
image_processed_bucket = storage_client.get_bucket(CONST_IMAGE_PROCESSED_BUCKET_NAME)
test_image_names = [blob.name for blob in image_processed_bucket.list_blobs()]

image_processed_bucket.delete_blobs(test_image_names)


# client = bigquery.Client()
#     job_config = bigquery.LoadJobConfig(
#         write_disposition=write_disp,
#     )

#     job = client.load_table_from_dataframe(
#         df, full_table_id, job_config=job_config
#     )  # Make an API request.
#     job.result()  # Wait for the job to complete.

In [None]:
!python --version


Python 3.10.12
