In [None]:
# default_exp bigquery

# BigQuery

> BigQuery-related utility functions for building Airflow operators.

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export

from typing import List, Optional

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryUpdateTableSchemaOperator,
    BigQueryUpdateTableOperator,
)
from netdata_airflow_utils.utils import dest_dict


In [None]:
#export

def make_table_desc_op(destination_table: str, description: str,
                       task_id: Optional[str] = None) -> BigQueryUpdateTableOperator:
    """Create a `BigQueryUpdateTableOperator` that sets a table's description.

    Args:
        destination_table: Dot-separated table reference (e.g. `'p.d.t'`);
            its dataset and table ids are extracted with `dest_dict`.
        description: Text placed in the `description` field of the table resource.
        task_id: Airflow task id. When omitted it defaults to
            `description__<last two dot-separated parts of destination_table>`,
            e.g. `'p.d.t'` -> `'description__d.t'`.

    Returns:
        A `BigQueryUpdateTableOperator` whose `table_resource` contains only
        the `description` key.
    """
    if task_id is None:
        # Build the default id from the last two components of the reference.
        task_id = f"description__{'.'.join(destination_table.split('.')[-2:])}"

    # Parse the table reference once and reuse it for both ids.
    dest = dest_dict(destination_table)
    return BigQueryUpdateTableOperator(
        task_id=task_id,
        dataset_id=dest['datasetId'],
        table_id=dest['tableId'],
        table_resource={
            "description": description,
        },
    )

In [None]:
#hide
#tests
desc_op = make_table_desc_op('sample1.sample2.sample3', 'blah', 'random_task_id')
assert isinstance(desc_op, BigQueryUpdateTableOperator)
assert desc_op.task_id == 'random_task_id'
assert desc_op.table_resource == {"description": "blah"}

# Without an explicit task_id, the id comes from the last two parts of the table reference.
default_desc_op = make_table_desc_op('p.d.t', 'blah')
assert default_desc_op.task_id == 'description__d.t'

In [None]:
# export

def make_table_schema_update_op(destination_table: str, schema: List[List[str]],
                                task_id: Optional[str] = None) -> BigQueryUpdateTableSchemaOperator:
    """Create a `BigQueryUpdateTableSchemaOperator` that updates field descriptions.

    Args:
        destination_table: Dot-separated table reference (e.g. `'p.d.t'`);
            its dataset and table ids are extracted with `dest_dict`.
        schema: Rows of `[field_name, field_description]`. Only the first two
            elements of each row are used; any further elements are ignored.
        task_id: Airflow task id. When omitted it defaults to
            `schema__<last two dot-separated parts of destination_table>`,
            e.g. `'p.d.t'` -> `'schema__d.t'`.

    Returns:
        A `BigQueryUpdateTableSchemaOperator` whose `schema_fields_updates` is
        a list of `{'name': ..., 'description': ...}` dicts, one per schema row.
    """
    if task_id is None:
        # Build the default id from the last two components of the reference.
        task_id = f"schema__{'.'.join(destination_table.split('.')[-2:])}"

    # Parse the table reference once and reuse it for both ids.
    dest = dest_dict(destination_table)
    return BigQueryUpdateTableSchemaOperator(
        task_id=task_id,
        dataset_id=dest['datasetId'],
        table_id=dest['tableId'],
        schema_fields_updates=[{'name': row[0], 'description': row[1]} for row in schema],
    )


In [None]:
#tests
#hide
schema_op = make_table_schema_update_op('p.d.t', schema=[['n', 'v']])
assert isinstance(schema_op, BigQueryUpdateTableSchemaOperator)
# Default task id is derived from the last two parts of the table reference.
assert schema_op.task_id == 'schema__d.t'
assert schema_op.schema_fields_updates == [{'name': 'n', 'description': 'v'}]


