# Basic BigQuery Operations - Loading Binary Classification Data 

<img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">

This notebook contains some semi-advanced code for managing BigQuery tables, including partitions. The sample files are so small that partitioning is not really needed, but the technique might come in handy at another time.



### GCP Locations
https://cloud.google.com/about/locations

https://cloud.google.com/bigquery/docs/locations#supported_locations


---

by Markus Lauber (https://medium.com/@mlxl)

https://yam-united.telekom.com/profile/markus-lauber/

In [None]:
# Imports
from google.cloud import bigquery
from google.colab import auth
import pandas as pd
from pandas_gbq import to_gbq

# Authenticate the current user via the Colab helper
auth.authenticate_user()

# Project configuration
# (alternative project, kept for reference: 'de12345-user-prod-1')
project_id = 'de123456-user-prd-1'
dataset_id = 'xgb_classification_project'
# Enter your own path or load the data in another way. Keep the trailing
# slash: file paths are appended to this string directly further down.
bucket_id = 'gs://mybucket/'

# BigQuery client bound to the project above; reused by the cells below
client = bigquery.Client(project=project_id)



### Rename Columns that have - (hyphen) in their name to _ (underscore)

Standard BigQuery tables do not seem to like special characters in column names, so it is best to replace hyphens with underscores and to avoid a digit at the start of a column name (in this case a `v_` prefix is added).

In [None]:
def rename_columns_underscore_and_number(project_id, dataset_id, table_re_name):
    """Rename columns whose names contain hyphens or start with a digit.

    Every hyphen in a column name is replaced by an underscore, and a name
    that starts with a digit gets a ``v_`` prefix. Each affected column is
    renamed with its own ``ALTER TABLE ... RENAME COLUMN`` statement.

    The statements are executed one after another and each one is awaited
    before the next is submitted. ``client.query`` only *starts* a job, so
    without waiting the function would report completion while renames are
    still running, and a failed rename would go unnoticed.

    Args:
      project_id: ID of the Google Cloud project that owns the table.
      dataset_id: ID of the BigQuery dataset that contains the table.
      table_re_name: Name of the table whose columns are checked.

    Returns:
      None. Progress is reported via ``print``.

    Raises:
      Any exception raised by the BigQuery client while fetching the table
      or while running one of the rename statements is propagated.
    """
    # Initialize a client
    client = bigquery.Client(project=project_id)

    # Fetch the table (and with it the schema) via its fully-qualified ID
    full_table_id = f"{project_id}.{dataset_id}.{table_re_name}"
    table_re = client.get_table(full_table_id)

    # Collect (old, new) pairs for every column that needs a new name
    renames = []
    for field in table_re.schema:
        # Replace hyphens with underscores
        new_name = field.name.replace('-', '_')

        # Prefix names that start with a digit (slice is safe for empty names)
        if new_name[:1].isdigit():
            new_name = f"v_{new_name}"

        if new_name != field.name:
            print("Old: ", field.name, " | New: ", new_name)
            renames.append((field.name, new_name))

    # Run the renames sequentially; .result() blocks until the job is done
    # and raises if the statement failed.
    for old_name, new_name in renames:
        sql = f"ALTER TABLE `{full_table_id}` RENAME COLUMN `{old_name}` TO `{new_name}`;"
        client.query(sql).result()

    print("Column renaming completed.")


The bucket where the Parquet files are stored (matching `bucket_id` above):

gs://mybucket/test_data/train.parquet

In [None]:
# Destination table for the training data, addressed as project.dataset.table
table_name = 'census_train'
table_id = '.'.join([project_id, dataset_id, table_name])

# Load-job settings: the source is a Parquet file, and WRITE_TRUNCATE replaces
# the table's existing data instead of appending to it.
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.PARQUET
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# Location of the Parquet file in Google Cloud Storage.
# The files can be found here: https://github.com/ml-score/knime_meets_python/tree/main/machine_learning/binary
# Original data set: https://archive.ics.uci.edu/ml/datasets/census+income
uri = f"{bucket_id}test_data/train.parquet"

# Start the load job (API request) and block until it has completed
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()

print("Job finished.")


Job finished.


In [None]:
# Example usage: clean up the column names of the freshly loaded table
# (hyphens become underscores, a leading digit gets a v_ prefix).
rename_columns_underscore_and_number(project_id, dataset_id, table_name)

Old:  education-num  | New:  education_num
Old:  marital-status  | New:  marital_status
Old:  capital-gain  | New:  capital_gain
Old:  capital-loss  | New:  capital_loss
Old:  hours-per-week  | New:  hours_per_week
Old:  native-country  | New:  native_country
Column renaming completed.


In [None]:
# Sometimes one go does not seem to be enough to capture all of the columns,
# so just run this a second or third time. Columns whose names are already
# clean are skipped, so repeating the call is harmless.
rename_columns_underscore_and_number(project_id, dataset_id, table_name)

Column renaming completed.


In [None]:
# Fetch the table object (including its schema) from BigQuery
table = client.get_table(table_id)  # table_id must be the full project.dataset.table ID here

```Python
@bpd.remote_function(bpd.DataFrame, str, str)
def add_constant_column(df, column_name, constant_value):
    df[column_name] = constant_value
    return df
```


In [None]:
# Check which class get_table returned
type(table)

google.cloud.bigquery.table.Table

In [None]:
# Render every column of the fetched table as the source text of a
# bigquery.SchemaField(...) call; a missing description becomes "".
schema_code_snippets = [
    'bigquery.SchemaField("{}", "{}", description="{}")'.format(
        field.name, field.field_type, field.description or ''
    )
    for field in table.schema
]

In [None]:
# Build one comma-separated string from the column names of the table
field_names = ", ".join([column.name for column in table.schema])

# Show the column list
print("Fields: ", field_names)

Fields:  age, workclass, fnlwgt, education, education_num, marital_status, occupation, relationship, race, sex, capital_gain, capital_loss, hours_per_week, native_country, Target, row_id


In [None]:
# Wrap the generated SchemaField lines in list brackets: one entry per line,
# indented by four spaces, separated by commas.
schema_code = "".join(["[\n    ", ",\n    ".join(schema_code_snippets), "\n]"])

# Print a schema definition that can be pasted into a code cell
print("# Define the table schema with various data types")
print("schema = ", schema_code)

# Define the table schema with various data types
schema =  [
    bigquery.SchemaField("age", "INTEGER", description=""),
    bigquery.SchemaField("workclass", "STRING", description=""),
    bigquery.SchemaField("fnlwgt", "INTEGER", description=""),
    bigquery.SchemaField("education", "STRING", description=""),
    bigquery.SchemaField("education_num", "INTEGER", description=""),
    bigquery.SchemaField("marital_status", "STRING", description=""),
    bigquery.SchemaField("occupation", "STRING", description=""),
    bigquery.SchemaField("relationship", "STRING", description=""),
    bigquery.SchemaField("race", "STRING", description=""),
    bigquery.SchemaField("sex", "STRING", description=""),
    bigquery.SchemaField("capital_gain", "INTEGER", description=""),
    bigquery.SchemaField("capital_loss", "INTEGER", description=""),
    bigquery.SchemaField("hours_per_week", "INTEGER", description=""),
    bigquery.SchemaField("native_country", "STRING", description=""),
    bigquery.

In [None]:
# Name of the partitioned target table (table name only, the project and
# dataset are added via the table reference below)
table_id = 'census_income'

# Schema of the target table: the columns of the loaded census tables plus
# a "category" column. The column order matters because the INSERT cells
# further down use SELECT * followed by the category value.
schema = [
    bigquery.SchemaField("age", "INTEGER", description=""),
    bigquery.SchemaField("workclass", "STRING", description=""),
    bigquery.SchemaField("fnlwgt", "INTEGER", description=""),
    bigquery.SchemaField("education", "STRING", description=""),
    bigquery.SchemaField("education_num", "INTEGER", description=""),
    bigquery.SchemaField("marital_status", "STRING", description=""),
    bigquery.SchemaField("occupation", "STRING", description=""),
    bigquery.SchemaField("relationship", "STRING", description=""),
    bigquery.SchemaField("race", "STRING", description=""),
    bigquery.SchemaField("sex", "STRING", description=""),
    bigquery.SchemaField("capital_gain", "INTEGER", description=""),
    bigquery.SchemaField("capital_loss", "INTEGER", description=""),
    bigquery.SchemaField("hours_per_week", "INTEGER", description=""),
    bigquery.SchemaField("native_country", "STRING", description=""),
    bigquery.SchemaField("Target", "STRING", description=""),
    bigquery.SchemaField("row_id", "STRING", description=""),
    bigquery.SchemaField("category", "STRING", description="Indicating if Test or Training")
]

# Integer range partitioning on the "education_num" column (could also be a
# year or any other integer column): one partition per value from 1 up to 100.
range_partitioning = bigquery.RangePartitioning(
    field="education_num",
    range_=bigquery.PartitionRange(
        start=1,
        end=100,
        interval=1
    )
)

# Table reference built from an explicit DatasetReference
# (Client.dataset() is deprecated in google-cloud-bigquery)
table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)

# Table definition with schema and partitioning settings
table = bigquery.Table(table_ref, schema=schema)
table.range_partitioning = range_partitioning

# Clustering fields are set directly on the table object
table.clustering_fields = ["workclass", "occupation"]

# Create the table. exists_ok=True keeps this cell re-runnable: if the table
# already exists the existing table is returned instead of raising Conflict.
# NOTE: an existing table is NOT altered to match the schema defined above.
client.create_table(table, exists_ok=True)


Table(TableReference(DatasetReference('de123456-user-prd-1', 'xgb_classification_project'), 'census_income'))

### Import the TEST data also

In [None]:
# Destination table for the test data, addressed as project.dataset.table
table_name = 'census_test'
table_id = '.'.join([project_id, dataset_id, table_name])

# Load-job settings: the source is a Parquet file, and WRITE_TRUNCATE replaces
# the table's existing data instead of appending to it.
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.PARQUET
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# Location of the Parquet file in Google Cloud Storage.
# The files can be found here: https://github.com/ml-score/knime_meets_python/tree/main/machine_learning/binary
# Original data set: https://archive.ics.uci.edu/ml/datasets/census+income
uri = f"{bucket_id}test_data/test.parquet"

# Start the load job (API request) and block until it has completed
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()

print("Job finished.")


Job finished.


In [None]:
# Clean up the column names of the test table as well
# (hyphens become underscores, a leading digit gets a v_ prefix).
rename_columns_underscore_and_number(project_id, dataset_id, table_name)

Old:  education-num  | New:  education_num
Old:  marital-status  | New:  marital_status
Old:  capital-gain  | New:  capital_gain
Old:  capital-loss  | New:  capital_loss
Old:  hours-per-week  | New:  hours_per_week
Old:  native-country  | New:  native_country
Column renaming completed.


In [None]:
# Second pass, in case the first call did not capture every column;
# columns whose names are already clean are skipped.
rename_columns_underscore_and_number(project_id, dataset_id, table_name)

Column renaming completed.


## Insert TRAIN and TEST into the (partitioned) target table

In [None]:
# Table names: the loaded test table is the source, the partitioned
# census_income table is the destination.
source_table = 'census_test'
destination_table = 'census_income'

# Fully-qualified IDs in the form project.dataset.table
source_table_id = '.'.join((project_id, dataset_id, source_table))
destination_table_id = '.'.join((project_id, dataset_id, destination_table))

print("Source: ", source_table_id, " | Target table: ", destination_table_id)

Source:  de123456-user-prd-1.xgb_classification_project.census_test  | Target table:  de123456-user-prd-1.xgb_classification_project.census_income


## Make sure the table is empty (also handy if you have to set up the table again)

In [None]:
# Statement that removes all rows from the destination table
query = "TRUNCATE TABLE `{}`".format(destination_table_id)

# Submit the statement and block until the job has completed
query_job = client.query(query)
query_job.result()

print(f"Table {destination_table_id} has been truncated.")

Table de123456-user-prd-1.xgb_classification_project.census_income has been truncated.


## INSERT TEST data first

In [None]:
# SQL query to append all rows of the source table to the destination table.
# The source table name is added as the extra "category" column. The NOT IN
# filter skips row_ids that are already stored under the same category, so
# running this cell again does not insert the same rows twice.
query = f"""
INSERT INTO `{destination_table_id}`
SELECT *
, '{source_table}' AS category
FROM `{source_table_id}`
WHERE row_id  NOT IN (SELECT DISTINCT row_id FROM `{destination_table_id}` WHERE category = '{source_table}')
"""

# Execute the query
query_job = client.query(query)  # Make an API request.

# Wait for the job to complete
query_job.result()

print("Data has been appended successfully.")


Data has been appended successfully.


## now insert TRAIN data

In [None]:
# Switch the source to the training table; destination_table_id from the
# earlier cell stays unchanged.
source_table = 'census_train'
source_table_id = f"{project_id}.{dataset_id}.{source_table}"

In [None]:
# SQL query to append all rows of the source table to the destination table.
# The source table name is added as the extra "category" column. The NOT IN
# filter skips row_ids that are already stored under the same category, so
# running this cell again does not insert the same rows twice.
query = f"""
INSERT INTO `{destination_table_id}`
SELECT *
, '{source_table}' AS category
FROM `{source_table_id}`
WHERE row_id  NOT IN (SELECT DISTINCT row_id FROM `{destination_table_id}` WHERE category = '{source_table}')
"""
# Execute the query
query_job = client.query(query)  # Make an API request.

# Wait for the job to complete
query_job.result()

print("Data has been appended successfully.")


Data has been appended successfully.


In [None]:
# Sanity check: count the rows per category (i.e. source table) and Target
# value in the destination table. Modify the query to match your table and
# desired aggregation. The result column "Anzahl" is German for "count".
query = f"""
SELECT
  category,
  Target,
  COUNT(*) AS Anzahl
FROM
  `{destination_table_id}` AS t1
  GROUP BY category, Target
  ORDER BY category, Target
"""

# Execute the query and load results into a DataFrame
query_job = client.query(query)  # Run the query
df = query_job.to_dataframe()  # Convert the results into a pandas DataFrame

# Display the DataFrame
print(df)

       category Target  Anzahl
0   census_test      0   11126
1   census_test      1    3527
2  census_train      0   26029
3  census_train      1    8160
