In [34]:
# imports
import typing
from prefect import task, flow
from prefect_gcp import GcpCredentials
from google.cloud import bigquery

print("Setup Complete")

Setup Complete


In [35]:
# Deployment 2
@task(log_prints=True, name="get-gcp-creds")
# Define a function to get GCP Credentials
def get_bigquery_client():
    gcp_creds_block = GcpCredentials.load(
        "prefect-gcs-2023-creds"
    )  # Change this credentials to yours
    gcp_creds = gcp_creds_block.get_credentials_from_service_account()
    client = bigquery.Client(credentials=gcp_creds)
    return client


 `@task(name='my_unique_name', ...)`


In [36]:
# @flow()
# def create_dataset(dataset_name: str):
#     # Construct a BigQuery client object.
#     client = get_bigquery_client()

#     # TODO(developer): Set dataset_id to the ID of the dataset to create.
#     dataset_id = f"{client.project}.{dataset_name}"

#     # Construct a full Dataset object to send to the API.
#     dataset = bigquery.Dataset(dataset_id)

#     # TODO(developer): Specify the geographic location where the dataset should reside.
#     dataset.location = "asia-southeast1"

#     # Send the dataset to the API for creation, with an explicit timeout.
#     # Raises google.api_core.exceptions.Conflict if the Dataset already
#     # exists within the project.
#     dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
#     print(f"Created dataset {client.project}.{dataset.dataset_id}")
#     return

# create_dataset(dataset_name ="prod_bandcamp")

In [37]:
# Upload data from GCS to BigQuery
@flow(log_prints=True, name="etl-gcs-to-bq")
def etl_gcs_to_bq(file_num: int):
    client = get_bigquery_client()
    table_id = f"dtc-de-2023.stg_bandcamp.albums-full-info-{file_num}"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        schema=[
            bigquery.SchemaField("_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("numTracks", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("keywords", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("datePublished", "DATETIME", mode="NULLABLE"),
            bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("dateModified", "DATETIME", mode="NULLABLE"),
            bigquery.SchemaField("comment", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("inAlbum", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("offers", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("duration_secs", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("url", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("duration", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("isrcCode", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist_image", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist_genre", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist_@type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist_sameAs", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist_name", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist_description", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("track_itemListElement", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("track_@type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("track_numberOfItems", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("track", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("inAlbum_name", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("inAlbum_@id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("inAlbum_@type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("offers_availability", "STRING", mode="NULLABLE"),
            bigquery.SchemaField(
                "offers_priceSpecification_minPrice", "FLOAT", mode="NULLABLE"
            ),
            bigquery.SchemaField("offers_price", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("offers_@type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("offers_priceCurrency", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("offers_url", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("recordingOf_@type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("recordingOf_lyrics_text", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("recordingOf_lyrics_@type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("byArtist", "FLOAT", mode="NULLABLE"),
        ],
    )
    uri = f"gs://prefect-gcs-bucket-bandcamp/albums-full-info-{file_num}"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)
    print(f"Loaded {destination_table.num_rows} rows.")



 `@flow(name='my_unique_name', ...)`


In [38]:
@flow(log_prints=True, name="deduplicate data")
# Define a function to remove duplicate
def deduplicate_data(num: int):
    client = get_bigquery_client()

    query_dedup = f"CREATE OR REPLACE TABLE \
                        `dtc-de-2023.stg_bandcamp.albums-full-info-{num}`  AS ( \
                            SELECT DISTINCT * \
                            FROM `dtc-de-2023.stg_bandcamp.albums-full-info-{num}` \
                            )"

    # limit query to 10GB
    safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**10)
    # priority=bigquery.QueryPriority.BATCH
    # query
    query_job = client.query(query_dedup, job_config=safe_config)

    # Check progress
    query_job = typing.cast(
        "bigquery.QueryJob",
        client.get_job(
            query_job.job_id, location=query_job.location
        ),  # Make an API request.
    )
    print("Complete removing duplicates")
    print(f"Job {query_job.job_id} is currently in state {query_job.state}")



 `@flow(name='my_unique_name', ...)`


In [39]:
# TABLE_LIST = [f"albums-full-info-{file_num}" for file_num in range(1,12)] 
# TABLE_LIST

In [40]:
# Union all tables
def union_all():
    # Parameters
    client = get_bigquery_client()
    project_id = "dtc-de-2023"
    dataset_id = "stg_bandcamp"
    table_list = [f"albums-full-info-{file_num}" for file_num in range(1,12)]
    # Query
    query_template = "SELECT * FROM `{project_id}.{dataset_id}.{table_name}`"
    union_table_name = "bandcamp_items_all"
    destination_table_id = f"{project_id}.{dataset_id}.{union_table_name}"
    
    union_query = "\nUNION ALL\n".join([query_template.format(project_id=project_id, dataset_id=dataset_id, table_name=table) for table in table_list])
    
    job_config = bigquery.QueryJobConfig(destination=destination_table_id, write_disposition="WRITE_TRUNCATE")
    job = client.query(query=union_query, job_config=job_config)
    job.result
    return


In [41]:
# Parent flow ETL
@flow(log_prints=True, name="etl-parent-to-bq")
def etl_parent_bq_flow(file_num_list: list[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]):
    for file_num in file_num_list:
        etl_gcs_to_bq(file_num)
    # Union all tables
    union_all()


 `@flow(name='my_unique_name', ...)`


In [42]:
# Run main
if __name__ == "__main__":
    file_num_list = [10,11]

    etl_parent_bq_flow(file_num_list)


 `@task(name='my_unique_name', ...)`


BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/dtc-de-2023/jobs?prettyPrint=false: Invalid field name "byArtist_@type". Fields must contain the allowed characters, and be at most 300 characters long. For allowed characters, please refer to https://cloud.google.com/bigquery/docs/schemas#column_names