In [None]:
from google.cloud import bigquery
from google.api_core.exceptions import Conflict
from google.cloud.exceptions import NotFound
import random

DEFAULT_NULLABLE_THRESHOLD = 0.9
DEFAULT_UNIQUE_THRESHOLD = 0.9

In [None]:
def ensure_dataset_exists(client, project_id, dataset_name):
    """Create the dataset ``project_id.dataset_name`` unless it already exists.

    Args:
        client: BigQuery client used to send the create request.
        project_id: Project that should own the dataset.
        dataset_name: ID of the dataset to create.

    A ``Conflict`` response is treated as "already exists" and reported on
    stdout; any other error propagates to the caller.
    """
    # Build the reference from the explicit project so the dataset lands in
    # the project the caller named rather than the client's default project.
    dataset_ref = bigquery.DatasetReference(project_id, dataset_name)
    try:
        client.create_dataset(bigquery.Dataset(dataset_ref))
    except Conflict:
        print(f"Dataset '{dataset_name}' already exists.")
    else:
        print(f'Dataset {dataset_name} created.')


def get_partitioning_info(client, project_id, dataset_name, table_name):
    """Describe how ``project_id.dataset_name.table_name`` is partitioned.

    Args:
        client: BigQuery client used to fetch the table metadata.
        project_id: Project that owns the table.
        dataset_name: Dataset that contains the table.
        table_name: ID of the table to inspect.

    Returns:
        A dict with three keys:

        * ``"is_partitioned"`` - ``True`` when the table reports time or
          range partitioning, ``False`` otherwise.
        * ``"field"`` - the partitioning field reported by the table metadata
          (``None`` when not partitioned or when the metadata has no field).
        * ``"type"`` - the time partitioning type (e.g. ``"DAY"``),
          ``"RANGE"`` for range partitioning, or ``None``.
    """
    # Use the fully qualified name, like the other helpers in this file, so
    # the lookup honours project_id instead of the client's default project.
    table = client.get_table(f"{project_id}.{dataset_name}.{table_name}")  # API call

    time_partitioning = table.time_partitioning
    if time_partitioning:
        return {
            "is_partitioned": True,
            "field": time_partitioning.field,
            # Keep only the last dot-separated component, so both "DAY" and
            # an enum-style "X.DAY" representation become "DAY".
            "type": str(time_partitioning.type_).split('.')[-1],
        }

    range_partitioning = table.range_partitioning
    if range_partitioning:
        return {
            "is_partitioned": True,
            "field": range_partitioning.field,
            "type": "RANGE",
        }

    return {"is_partitioned": False, "field": None, "type": None}


def get_all_tables_in_dataset(client, project_id, dataset_name):
    """Return the table IDs of every table in ``project_id.dataset_name``.

    Args:
        client: BigQuery client used to list the tables.
        project_id: Project that owns the dataset.
        dataset_name: ID of the dataset to list.

    Returns:
        A list of table ID strings, in the order the API returns them.
    """
    # Qualify the dataset with the project so the listing does not silently
    # fall back to the client's default project.
    dataset_path = f"{project_id}.{dataset_name}"
    return [table.table_id for table in client.list_tables(dataset_path)]


def get_table_row_count(client, project_id, dataset_name, table_name):
    """Return the ``num_rows`` value reported for a table.

    The table is looked up by its ``project.dataset.table`` path via
    ``client.get_table``.
    """
    qualified_name = "{}.{}.{}".format(project_id, dataset_name, table_name)
    return client.get_table(qualified_name).num_rows


def table_exists(client, project_id, dataset_name, table_name):
    """Report whether ``project_id.dataset_name.table_name`` can be fetched.

    Returns ``True`` when ``client.get_table`` succeeds and ``False`` when it
    raises ``NotFound``; other exceptions propagate.
    """
    qualified_name = "{}.{}.{}".format(project_id, dataset_name, table_name)
    try:
        client.get_table(qualified_name)
    except NotFound:
        return False
    else:
        return True

In [None]:
def create_subset_table(client, project_id, dataset_name, table_name, sample_percentage):
    """Materialise a sample of a table as ``sampling.<table_name>_subset``.

    The strategy depends on the partitioning reported for the source table:

    * DAY time partitioning - sample ``sample_percentage`` percent of the
      rows in yesterday's partition.
    * RANGE partitioning - copy the rows whose partitioning field equals one
      partition ID taken from ``INFORMATION_SCHEMA.PARTITIONS``.
    * anything else - sample ``sample_percentage`` percent of the table.

    Args:
        client: BigQuery client used for metadata lookups and queries.
        project_id: Project that owns both the source and the sample table.
        dataset_name: Dataset that contains the source table.
        table_name: ID of the source table.
        sample_percentage: Percentage passed to ``TABLESAMPLE SYSTEM``
            (formatted with two decimals).

    Returns:
        The sample table ID, or ``None`` when a range-partitioned table has
        no usable partition to copy from.
    """
    subset_table_name = f"{table_name}_subset"

    # Probe the table directly: unlike listing the dataset, this does not
    # fail when the 'sampling' dataset has not been created yet.
    if table_exists(client, project_id, "sampling", subset_table_name):
        print(f"Sample table {subset_table_name} already exists. Skipping table creation.")
        return subset_table_name

    # Ensure the 'sampling' dataset exists
    ensure_dataset_exists(client, project_id, "sampling")

    partition_info = get_partitioning_info(client, project_id, dataset_name, table_name)
    source_table = f"`{project_id}.{dataset_name}.{table_name}`"
    target_table = f"`{project_id}.sampling.{subset_table_name}`"

    # If table is partitioned by DAY
    if partition_info["is_partitioned"] and partition_info["type"] == "DAY":
        # Ingestion-time partitioned tables report no field; they are filtered
        # through the _PARTITIONDATE pseudo-column instead.
        if partition_info["field"]:
            partition_filter_column = f"`{partition_info['field']}`"
        else:
            partition_filter_column = "_PARTITIONDATE"
        # TABLESAMPLE belongs to the FROM clause, so it must precede WHERE.
        # NOTE(review): the comparison assumes a DATE-typed partition column;
        # confirm behaviour for TIMESTAMP/DATETIME partition columns.
        subset_query = f"""
        CREATE OR REPLACE TABLE {target_table} AS
        SELECT *
        FROM {source_table} TABLESAMPLE SYSTEM ({sample_percentage:.2f} PERCENT)
        WHERE {partition_filter_column} = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
        """

    # If table is partitioned by RANGE (Integer assumed)
    elif partition_info["is_partitioned"] and partition_info["type"] == "RANGE":
        # Fetch a single partition ID using the INFORMATION_SCHEMA.PARTITIONS.
        # The special __NULL__ / __UNPARTITIONED__ partitions are skipped
        # because their IDs are not valid integer literals for the filter.
        partition_values_query = f"""
        SELECT partition_id
        FROM `{project_id}.{dataset_name}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE table_name = '{table_name}'
          AND partition_id IS NOT NULL
          AND partition_id NOT IN ('__NULL__', '__UNPARTITIONED__')
        LIMIT 1
        """
        partition_rows = list(client.query(partition_values_query).result())

        # If there are no partition values, exit
        if not partition_rows:
            print("No partition values found. Cannot proceed.")
            return None

        chosen_partition = partition_rows[0].partition_id

        # NOTE(review): this equality keeps only rows whose field equals the
        # partition ID itself; presumably that is the start of the range
        # bucket, so the rest of the bucket is not sampled - confirm intent.
        subset_query = f"""
        CREATE OR REPLACE TABLE {target_table} AS
        SELECT *
        FROM {source_table}
        WHERE `{partition_info["field"]}` = {chosen_partition}
        """

    else:
        # If not partitioned or another type of partitioning
        subset_query = f"""
        CREATE OR REPLACE TABLE {target_table} AS
        SELECT *
        FROM {source_table} TABLESAMPLE SYSTEM ({sample_percentage:.2f} PERCENT)
        """

    print(f"Creating sample table: {subset_table_name}")
    client.query(subset_query).result()
    print(f"Sample table {subset_table_name} created successfully.")

    return subset_table_name


In [None]:
def build_and_run_profiling(client, project_id, dataset_name, table_name, nullable_threshold, unique_threshold, profiling_dataset, batch_size=10):
    """Profile the supported columns of a table and store the combined result.

    Columns are profiled in chunks of ``batch_size``. Each chunk is written
    to a temporary table in ``profiling_dataset``; the temporary tables are
    then unioned into
    ``<project_id>.<profiling_dataset>.profiling_results_<table_name>`` and
    deleted again.

    Args:
        client: BigQuery client used to run all queries.
        project_id: Project that owns the profiled table and the results.
        dataset_name: Dataset that contains the table to profile.
        table_name: ID of the table to profile.
        nullable_threshold: Threshold forwarded to the chunk query builder.
        unique_threshold: Threshold forwarded to the chunk query builder.
        profiling_dataset: Dataset that receives temp and result tables.
        batch_size: Number of columns profiled per query.

    Returns:
        ``None``. Results are only written to BigQuery.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Step 1: Get all the columns to be profiled
    columns_query = f"""
    SELECT column_name, data_type
    FROM `{project_id}.{dataset_name}.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name = '{table_name}'
    AND data_type IN ('STRING', 'BOOL', 'TIMESTAMP', 'DATE', 'INT64', 'NUMERIC', 'FLOAT64')
    """
    columns = list(client.query(columns_query).result())

    # Without columns there is nothing to union; an empty query would fail.
    if not columns:
        print(f"No profilable columns found in {table_name}. Skipping profiling.")
        return None

    # Step 2: Divide the columns into chunks
    column_chunks = [columns[i:i + batch_size] for i in range(0, len(columns), batch_size)]

    temp_tables = []
    try:
        for index, column_chunk in enumerate(column_chunks):
            query = build_profiling_query_for_chunk(client, project_id, dataset_name, table_name, nullable_threshold, unique_threshold, column_chunk)

            temp_table_name = f"{project_id}.{profiling_dataset}.temp_{table_name}_{index}"
            temp_tables.append(temp_table_name)

            # Overwrite leftovers from an earlier interrupted run instead of
            # failing because the destination table is not empty.
            job_config = bigquery.QueryJobConfig(
                destination=temp_table_name,
                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            )
            client.query(query, job_config=job_config).result()

        # Step 3: Union all the temp tables into the final table.
        # Backticks keep project IDs containing dashes valid in the query.
        union_query = " UNION ALL ".join(
            f"SELECT * FROM `{temp_table}`" for temp_table in temp_tables
        )
        final_table_name = f"{project_id}.{profiling_dataset}.profiling_results_{table_name}"
        job_config = bigquery.QueryJobConfig(
            destination=final_table_name,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        )
        client.query(union_query, job_config=job_config).result()
    finally:
        # Step 4: Clean up - delete the temp tables even when a query failed.
        for temp_table in temp_tables:
            client.delete_table(temp_table, not_found_ok=True)

    return None


In [None]:
def build_profiling_query_for_chunk(client, project_id, dataset_name, table_name, nullable_threshold, unique_threshold, column_chunk):
    """Build one SQL statement that profiles every column in ``column_chunk``.

    For each column the query reports min/max (as strings), null and
    not-null counts, an approximate distinct count, the most common value
    with its count, and the table's partitioning field and type. Two
    suggestion flags are derived from the thresholds.

    Args:
        client: BigQuery client, used only to look up partitioning metadata.
        project_id: Project that owns the table.
        dataset_name: Dataset that contains the table.
        table_name: ID of the table to profile.
        nullable_threshold: Minimum share of non-NULL rows for
            ``suggest_not_nullable`` to be ``'Yes'``.
        unique_threshold: Minimum ratio of distinct values to rows for
            ``suggest_unique`` to be ``'Yes'``.
        column_chunk: Rows exposing ``column_name`` and ``data_type``.

    Returns:
        The SQL text of the profiling query.

    Raises:
        ValueError: If ``column_chunk`` is empty (the query would be invalid).
    """
    if not column_chunk:
        raise ValueError("column_chunk must contain at least one column")

    partition_info = get_partitioning_info(client, project_id, dataset_name, table_name)
    partitioned_by = partition_info["field"] if partition_info["field"] else 'No'
    partition_type = partition_info["type"] if partition_info["type"] else 'No'
    source_table = f"`{project_id}.{dataset_name}.{table_name}`"

    subqueries = []
    for column in column_chunk:
        # Quote the identifier so reserved words and unusual names stay valid.
        col = f"`{column.column_name}`"
        # Both "most common" subqueries share the same tie-break (the column
        # value) so the reported value and count describe the same group.
        subquery = f"""
      SELECT
          '{column.column_name}' AS column_name,
          '{column.data_type}' AS data_type,
          CAST(MIN({col}) AS STRING) AS min_val,
          CAST(MAX({col}) AS STRING) AS max_val,
          COUNTIF({col} IS NULL) AS null_count,
          COUNTIF({col} IS NOT NULL) AS not_null_count,
          APPROX_COUNT_DISTINCT({col}) AS unique_count,
          CAST((SELECT {col} FROM {source_table} GROUP BY {col} ORDER BY COUNT(*) DESC, {col} LIMIT 1) AS STRING) AS most_common_value,
          (SELECT COUNT(*) FROM {source_table} GROUP BY {col} ORDER BY COUNT(*) DESC, {col} LIMIT 1) AS most_common_value_count,
          '{partitioned_by}' AS partitioned_by_column,
          '{partition_type}' AS partition_type
      FROM {source_table}
      """
        subqueries.append(subquery)

    # SAFE_DIVIDE yields NULL for an empty table, so both CASE expressions
    # fall through to 'No' instead of raising a division-by-zero error.
    # A NOT NULL suggestion is based on the share of non-NULL values.
    final_query = f"""
    WITH total AS (
      SELECT COUNT(*) AS total_count
      FROM {source_table}
    ),
    stats AS (
      { ' UNION ALL '.join(subqueries) }
    )

    SELECT
      data_type,
      column_name,
      min_val,
      max_val,
      null_count,
      not_null_count,
      CASE WHEN SAFE_DIVIDE(not_null_count, total_count) >= {nullable_threshold} THEN 'Yes' ELSE 'No' END AS suggest_not_nullable,
      unique_count,
      CASE WHEN SAFE_DIVIDE(unique_count, total_count) >= {unique_threshold} THEN 'Yes' ELSE 'No' END AS suggest_unique,
      most_common_value,
      most_common_value_count,
      partitioned_by_column,
      partition_type
    FROM stats
    CROSS JOIN total
    """

    return final_query


In [None]:
# production ready:

def main():
    """Sample and profile every non-empty table of the configured dataset.

    For each table a sample is created in the ``sampling`` dataset and then
    profiled; ``build_and_run_profiling`` writes the results itself to
    ``profiling_results_<sample table>`` in the profiling dataset. Tables
    whose sample and result tables both exist are skipped.
    """
    project_id = 'project-id'
    dataset_name = 'dataset-id'
    sample_percentage = 1
    profiling_dataset = 'data_profiling'

    client = bigquery.Client(project=project_id)

    # Ensure the profiling dataset exists
    ensure_dataset_exists(client, project_id, profiling_dataset)

    # Get all tables in the dataset
    all_tables = get_all_tables_in_dataset(client, project_id, dataset_name)

    for table_name in all_tables:
        # Skip table if row count is 0
        if get_table_row_count(client, project_id, dataset_name, table_name) == 0:
            print(f"Skipping {table_name} as it has no records.")
            continue

        # Profiling runs against the sample table, so the result table is
        # named after the sample table, not the source table.
        subset_table_name_check = f"{table_name}_subset"
        result_table_name = f"profiling_results_{subset_table_name_check}"

        if table_exists(client, project_id, "sampling", subset_table_name_check) and table_exists(client, project_id, profiling_dataset, result_table_name):
            print(f"Both sample and profile tables for {table_name} already exist. Skipping.")
            continue

        print(f"Processing table: {table_name}")

        subset_table_name = create_subset_table(client, project_id, dataset_name, table_name, sample_percentage)
        if subset_table_name is None:
            # create_subset_table found nothing to sample from.
            print(f"No sample table could be created for {table_name}. Skipping.")
            continue

        # Builds the profiling queries, runs them and stores the results.
        build_and_run_profiling(client, project_id, "sampling", subset_table_name, DEFAULT_NULLABLE_THRESHOLD, DEFAULT_UNIQUE_THRESHOLD, profiling_dataset)

        # Only claim success when the result table is really there.
        if table_exists(client, project_id, profiling_dataset, result_table_name):
            print(f"Profiling results for table {subset_table_name} stored in {result_table_name}.")

if __name__ == "__main__":
    main()


In [None]:
# single table testing:
def main():
    """Sample and profile only the first table of the configured dataset.

    Intended for single-table testing. NOTE: this definition reuses the name
    ``main``, so running it after the all-tables variant replaces that one.
    ``build_and_run_profiling`` writes the results itself to
    ``profiling_results_<sample table>`` in the profiling dataset.
    """
    project_id = 'project-id'
    dataset_name = 'dataset-id'
    sample_percentage = 1
    profiling_dataset = 'data_profiling'

    client = bigquery.Client(project=project_id)

    # Ensure the profiling dataset exists
    ensure_dataset_exists(client, project_id, profiling_dataset)

    # Get all tables in the dataset
    all_tables = get_all_tables_in_dataset(client, project_id, dataset_name)
    if not all_tables:
        print(f"Dataset {dataset_name} contains no tables. Nothing to profile.")
        return

    # Get only the first table for processing
    table_name = all_tables[0]

    # Skip table if row count is 0
    if get_table_row_count(client, project_id, dataset_name, table_name) == 0:
        print(f"Skipping {table_name} as it has no records.")
        return

    # Profiling runs against the sample table, so the result table is named
    # after the sample table, not the source table.
    subset_table_name_check = f"{table_name}_subset"
    result_table_name = f"profiling_results_{subset_table_name_check}"

    if table_exists(client, project_id, "sampling", subset_table_name_check) and table_exists(client, project_id, profiling_dataset, result_table_name):
        print(f"Both sample and profile tables for {table_name} already exist. Skipping.")
        return

    print(f"Processing table: {table_name}")

    subset_table_name = create_subset_table(client, project_id, dataset_name, table_name, sample_percentage)
    if subset_table_name is None:
        # create_subset_table found nothing to sample from.
        print(f"No sample table could be created for {table_name}. Skipping.")
        return

    # Builds the profiling queries, runs them and stores the results.
    build_and_run_profiling(client, project_id, "sampling", subset_table_name, DEFAULT_NULLABLE_THRESHOLD, DEFAULT_UNIQUE_THRESHOLD, profiling_dataset, batch_size=10)

    # Only claim success when the result table is really there.
    if table_exists(client, project_id, profiling_dataset, result_table_name):
        print(f"Profiling results for table {subset_table_name} stored in {result_table_name}.")

if __name__ == "__main__":
    main()
