In [1]:
import os

from google.cloud import bigquery
from google.oauth2.service_account import Credentials

In [4]:
# Location of the service-account key file used to authenticate against BigQuery.
# It can be overridden with the BIGQUERY_CREDENTIALS_FILE_PATH environment variable
# so the notebook can run against a different key without editing the code; when
# the variable is unset the original relative path is used.
BIGQUERY_CREDENTIALS_FILE_PATH = os.environ.get(
    "BIGQUERY_CREDENTIALS_FILE_PATH",
    "bigquery-user.json",
)
CREDENTIALS = Credentials.from_service_account_file(BIGQUERY_CREDENTIALS_FILE_PATH)

# Shared client; the functions in this notebook use it as their default `client`.
BQ_CLIENT = bigquery.Client(
    credentials=CREDENTIALS,
)

In [8]:
def init_products(client: bigquery.Client = BQ_CLIENT) -> None:
    """Create the ``demo.Products`` table if it does not exist yet.

    The table has four columns: ``ProductID`` (INT64), ``ProductName`` (STRING),
    ``Category`` (STRING) and ``Price`` (NUMERIC).

    Args:
        client: BigQuery client used to submit the DDL statement.

    Note:
        The confirmation message is printed whether or not the table already
        existed, because the statement uses ``IF NOT EXISTS``.
    """
    create_products_ddl = """
        CREATE TABLE IF NOT EXISTS demo.Products (
            ProductID INT64
            , ProductName STRING
            , Category STRING
            , Price NUMERIC
        )
        """

    # Submit the statement and wait for the job before reporting success.
    client.query(create_products_ddl).result()

    print("demo.Products created.")


def init_sales(client: bigquery.Client = BQ_CLIENT) -> None:
    """Create the ``demo.Sales`` table if it does not exist yet.

    The table has four columns: ``TransactionID`` (INT64), ``ProductID`` (INT64),
    ``Quantity`` (INT64) and ``TransactionDate`` (DATE).

    Args:
        client: BigQuery client used to submit the DDL statement.

    Note:
        The confirmation message is printed whether or not the table already
        existed, because the statement uses ``IF NOT EXISTS``.
    """
    create_sales_ddl = """
        CREATE TABLE IF NOT EXISTS demo.Sales (
            TransactionID INT64
            , ProductID INT64
            , Quantity INT64
            , TransactionDate DATE
        )
        """

    # Submit the statement and wait for the job before reporting success.
    client.query(create_sales_ddl).result()

    print("demo.Sales created.")

def etl_products_src_to_bq(client: bigquery.Client = BQ_CLIENT) -> None:
    """Upsert the sample product rows into ``demo.Products``.

    The load is written as a ``MERGE`` keyed on ``ProductID`` rather than a
    plain ``INSERT`` so that re-running the cell is idempotent: an existing
    product is overwritten with the sample values and a missing one is
    inserted. A plain ``INSERT`` would append three more rows on every run,
    which in turn multiplies rows in any join against this table.

    Args:
        client: BigQuery client used to submit the DML statement.
    """
    query_job = client.query(
        """
        MERGE demo.Products AS tgt
        USING (
            SELECT
                1 AS ProductID
                , 'Laptop' AS ProductName
                , 'Electronics' AS Category
                , NUMERIC '1200' AS Price
            UNION ALL SELECT 2, 'Headphones', 'Electronics', NUMERIC '150'
            UNION ALL SELECT 3, 'Coffee Mug', 'Kitchenware', NUMERIC '20'
        ) AS src
        ON tgt.ProductID = src.ProductID
        WHEN MATCHED THEN
            UPDATE SET
                ProductName = src.ProductName
                , Category = src.Category
                , Price = src.Price
        WHEN NOT MATCHED THEN
            INSERT (ProductID, ProductName, Category, Price)
            VALUES (src.ProductID, src.ProductName, src.Category, src.Price)
        """
    )

    # Wait for the DML job to finish before reporting success.
    query_job.result()

    print("demo.Products updated.")

def etl_sales_src_to_bq(client: bigquery.Client = BQ_CLIENT) -> None:
    """Upsert the sample sales transactions into ``demo.Sales``.

    The load is written as a ``MERGE`` keyed on ``TransactionID`` rather than
    a plain ``INSERT`` so that re-running the cell is idempotent: an existing
    transaction is overwritten with the sample values and a missing one is
    inserted. A plain ``INSERT`` would append three more rows on every run.

    Args:
        client: BigQuery client used to submit the DML statement.
    """
    query_job = client.query(
        """
        MERGE demo.Sales AS tgt
        USING (
            SELECT
                101 AS TransactionID
                , 1 AS ProductID
                , 1 AS Quantity
                , DATE '2024-05-01' AS TransactionDate
            UNION ALL SELECT 102, 2, 2, DATE '2024-05-02'
            UNION ALL SELECT 103, 3, 4, DATE '2024-05-03'
        ) AS src
        ON tgt.TransactionID = src.TransactionID
        WHEN MATCHED THEN
            UPDATE SET
                ProductID = src.ProductID
                , Quantity = src.Quantity
                , TransactionDate = src.TransactionDate
        WHEN NOT MATCHED THEN
            INSERT (TransactionID, ProductID, Quantity, TransactionDate)
            VALUES (src.TransactionID, src.ProductID, src.Quantity, src.TransactionDate)
        """
    )

    # Wait for the DML job to finish before reporting success.
    query_job.result()

    print("demo.Sales updated.")

def update_sales_detail(client: bigquery.Client = BQ_CLIENT) -> None:
    """Rebuild ``demo.SalesDetail`` from ``demo.Sales`` joined to ``demo.Products``.

    The table is replaced wholesale on every call (``CREATE OR REPLACE``).
    Rows are matched on ``ProductID`` with an inner join, so a sale whose
    product is absent from ``demo.Products`` does not appear in the result.

    Args:
        client: BigQuery client used to submit the statement.
    """
    rebuild_sales_detail_sql = """
        CREATE OR REPLACE TABLE demo.SalesDetail AS (
            SELECT
                s.TransactionID,
                s.ProductID,
                p.ProductName,
                p.Category,
                p.Price,
                s.Quantity,
                s.TransactionDate
            FROM
                demo.Sales s
            JOIN
                demo.Products p ON s.ProductID = p.ProductID
        )
        """

    # Submit the statement and wait for the job before reporting success.
    client.query(rebuild_sales_detail_sql).result()

    print("demo.SalesDetail updated.")


In [9]:
if __name__ == "__main__":
    # Set to False to skip the table-creation steps and only run the loads.
    init = True

    setup_steps = (init_products, init_sales) if init else ()
    # Both source tables are loaded before SalesDetail, which is built from them.
    load_steps = (etl_products_src_to_bq, etl_sales_src_to_bq, update_sales_detail)

    for run_step in (*setup_steps, *load_steps):
        run_step()

demo.Products created.
demo.Sales created.
demo.Products updated.
demo.Sales updated.
demo.SalesDetail updated.
