In [None]:
import os
from typing import Union, Sequence, Tuple, Dict, Optional

import datasets
import google.cloud.bigquery as bq
import pandas as pd
from google.cloud.exceptions import NotFound

In [None]:
# Google Cloud project used for every BigQuery call below. Taken from the
# standard GOOGLE_CLOUD_PROJECT environment variable so the notebook can run
# without edits; falls back to "" (fill it in by hand) when the variable is unset.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "")
# BigQuery dataset and table that receive the example rows.
DATASET_NAME = "vertex_ai"
TABLE_NAME = "test_data"

In [None]:
def bq_create_dataset(project_id: str, dataset_name: str, location='EU') -> bq.dataset.Dataset:
    '''Return the BigQuery dataset `dataset_name`, creating it when it is missing.

    Args:
        project_id: Project handed to the BigQuery client.
        dataset_name: ID of the dataset inside the client's project.
        location: Location set on the dataset object before it is created.
            Only relevant for the create path; when the lookup succeeds the
            dataset returned by the service is passed back unchanged.

    Returns:
        The dataset returned by `get_dataset`, or by `create_dataset` when the
        lookup raised `NotFound`.
    '''
    client = bq.Client(project=project_id)
    # Build the reference explicitly; `Client.dataset()` is deprecated. Using
    # `client.project` keeps the same project resolution the old call had.
    dataset = bq.Dataset(bq.DatasetReference(client.project, dataset_name))
    dataset.location = location
    try:
        return client.get_dataset(dataset)
    except NotFound:
        return client.create_dataset(dataset)

def bq_create_table(
    project_id: str,
    dataset_name: str,
    table_name: str,
    schema: list,
    date_partition_field: Optional[str] = None,
    delete_existing: bool = False
) -> bq.table.Table:
    '''Create a BigQuery table, optionally day-partitioned on a TIMESTAMP column.

    Args:
        project_id: Project that owns the table; also handed to the client.
        dataset_name: Dataset the table is created in.
        table_name: Name of the table to create.
        schema: Schema fields for the table (objects exposing `name` and
            `field_type`, e.g. `bq.SchemaField`).
        date_partition_field: Name of the schema column to partition on by
            day. `None` creates an unpartitioned table.
        delete_existing: When True, drop any table with the same id before
            creating the new one. When False, an already existing table makes
            `create_table` fail.

    Returns:
        The table object returned by `client.create_table`.

    Raises:
        ValueError: If `date_partition_field` is not in `schema` or is not a
            TIMESTAMP column.
    '''
    client = bq.Client(project=project_id)
    table_ref = '{}.{}.{}'.format(project_id, dataset_name, table_name)
    table = bq.Table(table_ref, schema=schema)

    if date_partition_field is not None:
        # Explicit checks instead of `assert` inside a bare `except`: asserts
        # vanish under `python -O`, and a bare except hides unrelated errors.
        matching_types = [
            getattr(field, "field_type", None)
            for field in schema
            if getattr(field, "name", None) == date_partition_field
        ]
        if not matching_types:
            raise ValueError(
                "date_partition_field {!r} is not a column of schema".format(date_partition_field)
            )
        if matching_types[0] != "TIMESTAMP":
            raise ValueError("date_partition_field needs to be type=TIMESTAMP")
        table.time_partitioning = bq.TimePartitioning(
            type_=bq.TimePartitioningType.DAY,
            field=date_partition_field,  # name of column to use for partitioning
            expiration_ms=None,  # no partition expiration
        )

    if delete_existing:
        # Actually honour the flag: remove a previous table (if any) so the
        # create below starts from a clean slate.
        client.delete_table(table_ref, not_found_ok=True)

    return client.create_table(table)

def bq_insert_rows(
    rows_to_insert: Union[pd.DataFrame, Sequence[Tuple], Sequence[Dict]],
    project_id: str,
    dataset_name: str,
    table_name: str,
    schema: list,
    date_partition_field: Optional[str] = None,
    try_create: bool = True
) -> Sequence:
    '''Insert rows into a BigQuery table, creating the table first if allowed.

    Args:
        rows_to_insert: A pandas DataFrame, or a sequence of row tuples/dicts.
        project_id: Project that owns the table; also handed to the client.
        dataset_name: Dataset containing the table.
        table_name: Name of the destination table.
        schema: Schema used only when the table has to be created.
        date_partition_field: Forwarded to `bq_create_table` when the table
            has to be created.
        try_create: When True, a missing table is created via
            `bq_create_table`; when False, the `NotFound` error propagates.

    Returns:
        The value returned by the client's insert call, unchanged. Per the
        BigQuery client documentation this is the collection of insert
        errors: one list per chunk for a DataFrame, a flat list otherwise.
        An empty result means every row was accepted.

    Raises:
        google.cloud.exceptions.NotFound: If the table does not exist and
            `try_create` is False.
    '''
    client = bq.Client(project=project_id)
    table_ref = '{}.{}.{}'.format(project_id, dataset_name, table_name)

    try:
        table = client.get_table(table_ref)
    except NotFound:
        if not try_create:
            # Bare raise keeps the original traceback intact.
            raise
        table = bq_create_table(
            project_id,
            dataset_name,
            table_name,
            schema,
            date_partition_field
        )

    if isinstance(rows_to_insert, pd.DataFrame):
        return client.insert_rows_from_dataframe(table, rows_to_insert)
    return client.insert_rows(table, rows_to_insert)

In [None]:
# Load the SST-2 test split and convert it to a pandas DataFrame for upload.
dataset = datasets.load_dataset('sst2', split="test")
# Use the public `Dataset.to_pandas()` instead of reaching into the backing
# Arrow table through `.data`.
df = dataset.to_pandas()
df.head()

In [None]:
# Make sure the target dataset exists before any table is written to it.
bq_create_dataset(project_id=PROJECT_ID, dataset_name=DATASET_NAME)

In [None]:
# Column layout of the destination table: one entry per DataFrame column uploaded.
schema = [
    bq.SchemaField(name="idx", field_type="INTEGER"),
    bq.SchemaField(name="sentence", field_type="STRING"),
    bq.SchemaField(name="label", field_type="INTEGER"),
]

In [None]:
# Upload the DataFrame; the table is created from `schema` if it does not exist yet.
# The cell output is whatever the insert call returns.
bq_insert_rows(
    rows_to_insert=df,
    project_id=PROJECT_ID,
    dataset_name=DATASET_NAME,
    table_name=TABLE_NAME,
    schema=schema,
    try_create=True,
)