In [1]:
!python3 -m venv venv
!source venv/bin/activate

In [None]:
!pip3 install "google-cloud-bigquery>=3.17"
!pip3 install "google-cloud-aiplatform>=1.38"
!pip3 install "pandas>=2.2.0"

In [4]:
import vertexai
from vertexai.preview import generative_models
from vertexai.preview.generative_models import GenerativeModel
from google.cloud import bigquery
import pandas as pd
import random
import time

project_id = "ai-sandbox-sw"
dataset_id = "mstudy"

raw_target_table = "target2"
target_table = "target2_ordered"

raw_source_tables = ["source-uipetmis","source-uispet"]
raw_source_tables_wildcard = 'source*'
source_table = "source_ordered"

maximum_fields_per_request = 12
min_group_size = 4
jobs_per_target_row = 10
job_scheduler_bucket_name = "ai-sandbox-sw-job-scheduler-bucket-test"
queued_jobs_bucket_name = "ai-sandbox-sw-queued-jobs-bucket-test"

# Initialise BQ client
client = bigquery.Client(project=project_id)

In [None]:
import re

def validate_and_extract(string):
    pattern = r"^jobs_per_target_row-(\d{2})-min_group_size-(\d{2})-maximum_fields_per_request-(\d{2})$"
    match = re.match(pattern, string)
    if match:
        jobs_per_target_row = int(match.group(1))
        min_group_size = int(match.group(2))
        maximum_fields_per_request = int(match.group(3))
        return jobs_per_target_row, min_group_size, maximum_fields_per_request
    else:
        return None  # Or you can choose to raise an exception


# Example usage
my_string = "jobs_per_target_row-12-min_group_size-05-maximum_fields_per_request-87"
result = validate_and_extract(my_string)

if result:
    jobs_per_target_row, min_group_size, maximum_fields_per_request = result
    print(jobs_per_target_row)  # Output: 12
    print(min_group_size)       # Output: 5
    print(maximum_fields_per_request)  # Output: 87
else:
    print("The string does not match the format.")


In [5]:
def add_unique_ref_and_create_new_table(project_id, dataset_id, raw_table, new_table, new_col):
    """Adds 'Source_Unique_Ref' column if missing, then creates a new BigQuery table.

    Args:
        project_id: BigQuery project ID.
        dataset_id: BigQuery dataset ID.
        raw_tables: Dict containing raw table names
        new_table: Desired name of finalised table
        new_col: the name of the reference column for the table
    """

    raw_query = f"""
        SELECT *  
        FROM `{project_id}.{dataset_id}.{raw_table}`
    """
    raw_df = client.query(raw_query).to_dataframe()
    raw_df[new_col] = range(1, len(raw_df) + 1)  
    new_table_id = f"{project_id}.{dataset_id}.{new_table}"

    job_config = bigquery.LoadJobConfig()  
    job = client.load_table_from_dataframe(raw_df, new_table_id, job_config=job_config)
    job.result()

    return job.result()

In [6]:
# Source table setup
source_table_ref = client.dataset(dataset_id).table(source_table) 
try:
    client.get_table(source_table_ref)  # Will raise NotFound if the table doesn't exist
    print("Source table '{}' exists.".format(source_table))
except:
    print(f"Source table {source_table} does not exist. Creating Source table...")
    new_source_col = 'Source_Unique_Ref'
    add_unique_ref_and_create_new_table(project_id, client, dataset_id, raw_source_tables_wildcard, source_table, new_source_col)

source_query = f"SELECT * FROM `{project_id}.{dataset_id}.{source_table}`"
source_df = client.query(source_query).to_dataframe()
print(f"source_df length is {source_df.shape[0]}")


# Target table setup
target_table_ref = client.dataset(dataset_id).table(target_table) 
try:
    client.get_table(target_table_ref)  # Will raise NotFound if the table doesn't exist
    print(f"Target table {target_table} exists.")
except:
    print(f"Target table {target_table} does not exist. Creating Target table...")
    new_target_col = 'Target_Unique_Ref'   
    add_unique_ref_and_create_new_table(project_id, dataset_id, raw_target_table, target_table, new_target_col)

target_query = f" SELECT * FROM `{project_id}.{dataset_id}.{target_table}`"
target_df = client.query(target_query).to_dataframe()
print(f"target_df length is {target_df.shape[0]}")

Source table 'source_ordered' exists.
source_df length is 5411
Target table target2_ordered exists.
target_df length is 997


In [9]:
target_df.head()

Unnamed: 0,Original_Unique_Ref,Target_Tranche,Target_Level_1,Target_Level_2,Target_Level_3,Target_Level_4,Target_Complex_Type,Target_Attribute,Target_Description,Target_Mandatory__,Target_Data_Type,Target_Accepted_Values,Target_Validation,Target_Drop_Down_Metaval,Target_Unique_Ref
0,1,CLIENT,Configuration,,,,configuration,dataImportOperatorName,This is the logon name for the data import ope...,Mandatory,string,Reference Data: operator.loginName,Must be Strata Operator ID. ‘-11’,,1
1,2,CLIENT,Configuration,,,,configuration,requestUUID,The is a unique identifier for the XML message,Optional,string (8),,,,2
2,3,CLIENT,Configuration,,,,configuration,noUpdateMode,Setting to true will mean the XML message will...,Optional,boolean,,,,3
3,4,CLIENT,Configuration,,,,configuration,overrideStrataReferences,Setting to true will mean the classicOffsetRef...,Optional,integer,,,,4
4,5,CLIENT,Configuration,,,,configuration,useDvlaLookup,Specifies whether the DVLA service will be cal...,Optional,boolean,,,,5


In [10]:
source_df.head()

Unnamed: 0,Source_SchemaName,Source_TableName,Source_Column_Name,Source_Data_type,Source_Max_Length,Source_precision,Source_scale,Source_is_nullable,Source_Unique_Ref
0,Aggregator,coinsurance_override,force_coinsurance,bit,1,1,0,0,1
1,dbo,bad_quotes_adu_20160722,confirm_excluded_breeds,bit,1,1,0,0,2
2,dbo,blog,show,bit,1,1,0,0,3
3,dbo,charity,active,bit,1,1,0,0,4
4,dbo,discount,staff,bit,1,1,0,0,5


In [7]:
def create_df_groups(df, grouping_levels):
    """Groups a DataFrame by nested schema paths up to a specified level.

    Args:
        df: The DataFrame to group.

    Returns:
        A dictionary of DataFrames, where keys are the nested paths, and
        values are DataFrames containing fields sharing that path. 
    """

    grouped_dfs = {}

    for _, row in df.iterrows():
        path = '.'.join(row[col] for col in grouping_levels)
        if path not in grouped_dfs:
            grouped_dfs[path] = pd.DataFrame(columns=df.columns)  
        grouped_dfs[path] = pd.concat([grouped_dfs[path], row.to_frame().T], ignore_index=True)

    return grouped_dfs

def dataframe_to_string(df):
    """Converts a DataFrame to a string with column names and row values.

    Args:
        df: The pandas DataFrame to convert.

    Returns:
        A string representation of the DataFrame.
    """

    output = f"Column Names: {', '.join(df.columns)}\n"  # Header with column names

    for _, row in df.iterrows():
        row_string = ', '.join(str(value) for value in row)
        output += f"Row: {row_string}\n"

    return output

def chop_source_df_groups(source_df_groups, max_rows_per_group):
    """Chops source dataframe groups into smaller groups with a specified max number of rows.

    Args:
        source_df_groups: The dictionary of source dataframe groups.
        max_rows_per_group: The maximum number of rows allowed in each group.

    Returns:
        A modified dictionary of source dataframe groups with smaller groups.
    """
    chopped_source_df_groups = {}

    for path, source_group_df in source_df_groups.items():
        # Check if the group needs to be chopped
        if len(source_group_df) <= max_rows_per_group:
            chopped_source_df_groups[path] = source_group_df
        else:
            # Split the group into smaller groups with a maximum of max_rows_per_group rows
            num_subgroups = len(source_group_df) // max_rows_per_group
            remainder = len(source_group_df) % max_rows_per_group

            for i in range(num_subgroups):
                start_idx = i * max_rows_per_group
                end_idx = (i + 1) * max_rows_per_group
                sub_df = source_group_df.iloc[start_idx:end_idx]
                chopped_source_df_groups[f"{path}_subgroup_{i+1}"] = sub_df

            # Add the remainder as a separate subgroup
            if remainder > 0:
                sub_df = source_group_df.iloc[-remainder:]
                chopped_source_df_groups[f"{path}_subgroup_{num_subgroups+1}"] = sub_df

    return chopped_source_df_groups


def merge_source_df_groups(source_df_groups, max_rows_per_group):
    """Chops source dataframe groups and combines smaller groups.

    Args:
        source_df_groups: The dictionary of source dataframe groups.
        max_rows_per_group: The maximum number of rows allowed in each group.

    Returns:
        A modified dictionary of source dataframe groups with optimized sizing.
    """
    chopped_source_df_groups = {}
    group_paths = list(source_df_groups.keys())  # Get a list of group paths for iteration

    i = 0
    while i < len(group_paths):
        current_path = group_paths[i]
        current_group_df = source_df_groups[current_path]
            
        # Combine with subsequent groups while possible
        while i + 1 < len(group_paths) and len(current_group_df) + len(source_df_groups[group_paths[i + 1]]) <= max_rows_per_group:
            next_path = group_paths[i + 1]
            next_group_df = source_df_groups[next_path]
            current_group_df = pd.concat([current_group_df, next_group_df], ignore_index=True)
            del source_df_groups[next_path]  # Remove the merged group
            group_paths.pop(i + 1)  # Update the list of group paths

        # Add the combined (or original) group
        chopped_source_df_groups[current_path] = current_group_df
        i += 1

    return chopped_source_df_groups

def merge_small_groups(chopped_source_df_groups, min_group_size):
    """Merges small groups (length < min_group_size) with their preceding groups.

    Args:
        chopped_source_df_groups: The dictionary of chopped dataframe groups.
        min_group_size: min group size allowed (smaller than this is merged with preceding group)

    Returns:
        A modified dictionary of dataframe groups with fewer small groups.
    """
    group_paths = list(chopped_source_df_groups.keys())
    i = 1  # Start from the second group
    while i < len(group_paths):
        current_path = group_paths[i]
        current_group_df = chopped_source_df_groups[current_path]

        if len(current_group_df) < min_group_size:
            prev_path = group_paths[i - 1]
            prev_group_df = chopped_source_df_groups[prev_path]

            # Merge with the previous group
            chopped_source_df_groups[prev_path] = pd.concat([prev_group_df, current_group_df], ignore_index=True)

            # Remove the current group
            del chopped_source_df_groups[current_path]
            group_paths.pop(i) 
        else:
            i += 1

    return chopped_source_df_groups


### Prepare the subdivided groups for the source and destination schemas

This is required so we can come in below the maximum token size for Gemini
(32K input, 2K output) https://ai.google.dev/models/gemini#model_variations

In [37]:
source_grouping_levels = ['Source_SchemaName', 'Source_TableName']
source_df_groups = create_df_groups(source_df, source_grouping_levels)

source_string_groups = []
for path, source_group_df in source_df_groups.items():
    source_group_sting = dataframe_to_string(source_group_df)
    source_string_groups.append(source_group_sting)
    
print(f"Number of source schema dataframe groupings: {len(source_df_groups)}")
print(f"Number of source schema string groupings: {len(source_string_groups)}\n")

#Further split up the source_df_groups to make sure there is no group larger than maximum_fields_per_request variable. This prevents LLM innacuacies when the number of requested field mappings is too high.
chopped_source_df_groups = chop_source_df_groups(source_df_groups, maximum_fields_per_request)
print(f"Number of chopped source schema dataframe groupings: {len(chopped_source_df_groups)}")
chopped_length_counts = {}
for group_df in chopped_source_df_groups.values():
    group_length = len(group_df)
    if group_length in chopped_length_counts:
        chopped_length_counts[group_length] += 1
    else:
        chopped_length_counts[group_length] = 1
print(f"Distribution of chopped lengths:")
for length, count in chopped_length_counts.items():
    print(f"{count} x groups with length {length}")


#Merge groups in cases where adjascent groups could be merged together and still fit below the maximum_fields_per_request variable.
merged_source_df_groups = merge_source_df_groups(chopped_source_df_groups, maximum_fields_per_request)
print(f"\nNumber of merged source schema dataframe groupings: {len(merged_source_df_groups)}")
merged_length_counts = {}
for group_df in merged_source_df_groups.values():
    group_length = len(group_df)
    if group_length in merged_length_counts:
        merged_length_counts[group_length] += 1
    else:
        merged_length_counts[group_length] = 1
print(f"Distribution of merged lengths:")
for length, count in merged_length_counts.items():
    print(f"{count} x groups with length {length}")



#To remove small groups, merge together groups when the group adjascent group is equal or less than , reglardless of maximum_fields_per_request variable.
combined_source_df_groups = merge_small_groups(merged_source_df_groups, min_group_size)
print(f"\nNumber of combined source schema dataframe groupings: {len(combined_source_df_groups)}")

combined_length_counts = {}
for group_df in combined_source_df_groups.values():
    group_length = len(group_df)
    if group_length in combined_length_counts:
        combined_length_counts[group_length] += 1
    else:
        combined_length_counts[group_length] = 1
print(f"Distribution of combined lengths:")
for length, count in combined_length_counts.items():
    print(f"{count} x groups with length {length}")

Number of source schema dataframe groupings: 528
Number of source schema string groupings: 528

Number of chopped source schema dataframe groupings: 752
Distribution of chopped lengths:
58 x groups with length 3
241 x groups with length 12
49 x groups with length 8
48 x groups with length 7
63 x groups with length 5
44 x groups with length 6
98 x groups with length 2
13 x groups with length 11
76 x groups with length 4
13 x groups with length 10
27 x groups with length 9
22 x groups with length 1

Number of merged source schema dataframe groupings: 541
Distribution of merged lengths:
7 x groups with length 3
279 x groups with length 12
51 x groups with length 8
46 x groups with length 11
11 x groups with length 2
29 x groups with length 7
33 x groups with length 10
13 x groups with length 4
13 x groups with length 6
42 x groups with length 9
5 x groups with length 1
12 x groups with length 5

Number of combined source schema dataframe groupings: 519
Distribution of combined lengths:
1 

In [27]:
len(combined_source_df_groups)
count = 0
for group_df in combined_source_df_groups.values():
    if count < 1:
        print(group_df)
        print(type(group_df))
        count = count + 1

  Source_SchemaName      Source_TableName Source_Column_Name Source_Data_type  \
0        Aggregator  coinsurance_override  force_coinsurance              bit   
1        Aggregator  coinsurance_override      aggregator_id              int   
2        Aggregator  coinsurance_override           affinity          varchar   

  Source_Max_Length Source_precision Source_scale Source_is_nullable  \
0                 1                1            0                  0   
1                 4               10            0                  0   
2                 3                0            0                  0   

  Source_Unique_Ref  
0                 1  
1                39  
2               538  
<class 'pandas.core.frame.DataFrame'>


In [41]:
def dataframe_to_custom_source_string(df):

    output = ""  # Header with column names

    for _, row in df.iterrows():
        row_string = f"source_field: {row['Source_SchemaName']}.{row['Source_TableName']}.{row['Source_Column_Name']}; data_type: {row['Source_Data_type']}; source_field_unique_ref: {row['Source_Unique_Ref']}"
        output += f"{row_string}\n"

    return output

In [42]:
print("\nconverting dataframe groupings to string groupings...")
combined_source_string_groups = []
for path, combined_source_df_group in combined_source_df_groups.items():
    combined_source_string_group = dataframe_to_custom_source_string(combined_source_df_group)
    combined_source_string_groups.append(combined_source_string_group)
print(f"...Complete. Number of combined source schema string groupings: {len(combined_source_string_groups)}\n")



converting dataframe groupings to string groupings...
...Complete. Number of combined source schema string groupings: 519



In [43]:
combined_source_string_groups[0]

'source_field: Aggregator.coinsurance_override.force_coinsurance; data_type: bit; source_field_unique_ref: 1\nsource_field: Aggregator.coinsurance_override.aggregator_id; data_type: int; source_field_unique_ref: 39\nsource_field: Aggregator.coinsurance_override.affinity; data_type: varchar; source_field_unique_ref: 538\n'

In [44]:
def split_into_jobs(combined_source_string_groups, number_jobs_per_target_row):
    """Splits a list into jobs and returns a string blob containing all the source string groups for a job.

    Args:
        combined_source_string_groups: The list to be split into jobs.
        number_jobs_per_target_row: The desired number of jobs.

    Returns:
        A list of strings, where each string represents a job.
    """

    list_length = len(combined_source_string_groups)
    items_per_job = list_length // number_jobs_per_target_row
    remainder = list_length % number_jobs_per_target_row

    jobs = []
    for target_row_num in range(target_df.shape[0]):
        start_index = 0
        for job_num in range(number_jobs_per_target_row):
            end_index = start_index + items_per_job
            if job_num < remainder:  # Distribute remainder items across initial jobs
                end_index += 1
                
            job_content = ""
            for index in range(start_index,end_index):
                job_content += f"""{combined_source_string_groups[index]}\n"""
            job_name = f"target-row-{target_row_num}-source-groups-{start_index}-{end_index}"
            jobs.append({'job_name': job_name, 'job_content': job_content})
            start_index = end_index

    return jobs

In [45]:
jobs_list = split_into_jobs(combined_source_string_groups, jobs_per_target_row)

In [47]:
type(jobs_list) #list
jobs_list[0]

{'job_name': 'target-row-0-source-groups-0-52',
 'job_content': 'source_field: Aggregator.coinsurance_override.force_coinsurance; data_type: bit; source_field_unique_ref: 1\nsource_field: Aggregator.coinsurance_override.aggregator_id; data_type: int; source_field_unique_ref: 39\nsource_field: Aggregator.coinsurance_override.affinity; data_type: varchar; source_field_unique_ref: 538\n\nsource_field: dbo.bad_quotes_adu_20160722.confirm_excluded_breeds; data_type: bit; source_field_unique_ref: 2\nsource_field: dbo.bad_quotes_adu_20160722.agree_contact; data_type: bit; source_field_unique_ref: 773\nsource_field: dbo.bad_quotes_adu_20160722.agree_contact_email; data_type: bit; source_field_unique_ref: 774\nsource_field: dbo.bad_quotes_adu_20160722.agree_contact_letter; data_type: bit; source_field_unique_ref: 775\nsource_field: dbo.bad_quotes_adu_20160722.agree_contact_phone; data_type: bit; source_field_unique_ref: 776\nsource_field: dbo.bad_quotes_adu_20160722.agree_contact_SMS; data_type

In [48]:
import io
from google.cloud import storage

def upload_list_items_to_gcs(jobs_list, bucket_name):
    """Iterates over a list and uploads items as objects to Google Cloud Storage.

    Args:
        data_list: The list of items to upload as object contents.
        bucket_name: The name of the Google Cloud Storage bucket.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    for index, item in enumerate(jobs_list):

        blob = bucket.blob(item['job_name'])

        try:
            # Use an in-memory buffer for smaller data
            blob.upload_from_string(item['job_content'])  
            print(f"Uploaded {item['job_name']} successfully.")
        except Exception as e:
            print(f"Error uploading {item['job_name']}: {e}")

upload_list_items_to_gcs(jobs_list, queued_jobs_bucket_name)

Uploaded target-row-0-source-groups-0-52 successfully.
Uploaded target-row-0-source-groups-52-104 successfully.
Uploaded target-row-0-source-groups-104-156 successfully.
Uploaded target-row-0-source-groups-156-208 successfully.
Uploaded target-row-0-source-groups-208-260 successfully.
Uploaded target-row-0-source-groups-260-312 successfully.
Uploaded target-row-0-source-groups-312-364 successfully.
Uploaded target-row-0-source-groups-364-416 successfully.
Uploaded target-row-0-source-groups-416-468 successfully.
Uploaded target-row-0-source-groups-468-519 successfully.
Uploaded target-row-1-source-groups-0-52 successfully.
Uploaded target-row-1-source-groups-52-104 successfully.
Uploaded target-row-1-source-groups-104-156 successfully.
Uploaded target-row-1-source-groups-156-208 successfully.
Uploaded target-row-1-source-groups-208-260 successfully.
Uploaded target-row-1-source-groups-260-312 successfully.
Uploaded target-row-1-source-groups-312-364 successfully.
Uploaded target-row-1-

In [None]:
print(f"*********************************************************************")
print(f"*Jobs summary*")
print(f"*********************************************************************")
print(f"*Target rows:{target_df.shape[0]}*")
print(f"*Source field groups:{combined_source_string_groups}*")
print(f"*Total requests to Gemini:{target_df.shape[0]*combined_source_string_groups}*")
print(f"")
print(f"Which have been split into jobs:")
print(f"Total jobs: {len(jobs_list)}")
print(f"*Jobs per target row:{jobs_per_target_row}*")
print(f"Av. Source field groups (=Av. requests to Gemini) per job:{(target_df.shape[0]*combined_source_string_groups)/len(jobs_list)}")

print(f"*********************************************************************")

In [None]:
def parse_function_call(function_call):
    """Parses a FunctionCall object, adds a description, and returns a JSON-compatible dictionary.

    Args:
        function_call: The FunctionCall object to parse.

    Returns:
        A dictionary containing the function name, attributes, and description.
    """

    result = {
        "function_name": function_call.name,
        "attributes": {},
    }
    for key, value in function_call.args.items():
        result["attributes"][key] = value

    return result

def convert_dict_to_list_of_dicts(dict):
    """Converts a dictionary of lists and strings to a list of flat dictionaries.

    Args:
        data: The input dictionary containing lists and strings.

    Returns:
        A list of dictionaries, where each dictionary represents a  
        combination of elements from the input lists.
    """
    
    list_of_attribute_dicts = []
    string_keys = []
    list_keys = []

    
    for key, value in dict.items() :
        if isinstance(value, str):
            string_keys.append(key)
        else:
            list_keys.append(key)       
    
    for i in range (len(dict[list_keys[0]])):
        new_dict = {}
        
        for key in list_keys:
            new_dict[key] = dict[key][i]
        for key in string_keys:
            new_dict[key] = dict[key]

        list_of_attribute_dicts.append(new_dict)

    return list_of_attribute_dicts

def create_df_from_target_row_df_and_list_of_dicts(list_of_attribute_dicts, test_target_df_row):
    """
    Appends rows to a DataFrame, combining a base row with data from a list of dictionaries.

    Args:
        list_of_dicts: A list of dictionaries, each representing column values.
        test_target_df_row: A DataFrame row containing base columns.

    Returns:
        The modified DataFrame with the newly appended rows.
    """

    df_list = []
     
    for attribute_dict in list_of_attribute_dicts:
        # Combine the base row with the current dictionary

        df = test_target_df_row.copy()  # Make a copy 

        for key in attribute_dict.keys():
            df[key] = attribute_dict[key]

        df_list.append(df)

    return pd.concat(df_list)