In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Environment Setup

Install the following Python packages to set up the environment.

In [1]:
! pip install -U google-cloud-datacatalog
! pip install -U google-cloud-storage
! pip install -U google-cloud-bigquery
! pip install -U numpy

Collecting google-cloud-datacatalog
  Downloading google_cloud_datacatalog-3.18.1-py2.py3-none-any.whl.metadata (5.3 kB)
Downloading google_cloud_datacatalog-3.18.1-py2.py3-none-any.whl (332 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m333.0/333.0 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: google-cloud-datacatalog
Successfully installed google-cloud-datacatalog-3.18.1
Collecting google-cloud-bigquery
  Downloading google_cloud_bigquery-3.17.2-py2.py3-none-any.whl.metadata (8.8 kB)
Downloading google_cloud_bigquery-3.17.2-py2.py3-none-any.whl (230 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m230.3/230.3 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hInstalling collected packages: google-cloud-bigquery
  Attempting uninstall: google-cloud-bigquery
    Found existing installation: google-cloud-bigquery 3.17.1
    Uninstalling google-cloud-bigquery-3.17.1:
      Success

The next cell pins NumPy to version 1.25.0. After it, specify your project ID (`PROJECT_ID`) in the configuration cell.

In [2]:
! pip install numpy==1.25.0

Collecting numpy==1.25.0
  Downloading numpy-1.25.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Downloading numpy-1.25.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.6/17.6 MB[0m [31m46.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.26.4
    Uninstalling numpy-1.26.4:
      Successfully uninstalled numpy-1.26.4
Successfully installed numpy-1.25.0


In [10]:
# --- Notebook configuration ---------------------------------------------
PROJECT_ID = "do-data-hub"  # Change to your project ID
LOCATION = "us-central1"
DATASET_ID = "cdp_dataset"

# Tag template: short ID plus the full resource path assembled from the
# project and location above.
TAG_TEMPLATE_ID = "llmcdptemplate"
TAG_TEMPLATE_PATH = "/".join(
    ["projects", PROJECT_ID, "locations", LOCATION, "tagTemplates", TAG_TEMPLATE_ID]
)

# Set the gcloud CLI's project property (core/project) to PROJECT_ID.
# IPython substitutes {PROJECT_ID} with the Python variable's value before
# handing the line to the shell.
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


### BigQuery: Create dataset

Create a BigQuery dataset to upload the CDP data.

In [11]:
# Create the BigQuery dataset named by DATASET_ID in your project
from google.cloud import bigquery
from google.cloud import datacatalog_v1

# Clients used by later cells: the BigQuery client is bound to PROJECT_ID,
# the Data Catalog client is built with its defaults.
bq_client = bigquery.Client(project=PROJECT_ID)
datacatalog_client = datacatalog_v1.DataCatalogClient()

# Describe the dataset to create: "<project>.<dataset>", located in "US".
dataset_id = f"{bq_client.project}.{DATASET_ID}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"

# Best-effort creation: a failure is printed rather than raised, so the
# notebook keeps running.
try:
    dataset = bq_client.create_dataset(dataset, timeout=30)
except Exception as err:
    print(err)
else:
    print(f'Dataset {DATASET_ID} create successfully.')

Dataset cdp_dataset create successfully.


### BigQuery: Create tables and populate with data

The next cell generates synthetic data for the tables and loads it into BigQuery.

> This process will take approximately 1 minute and 30 seconds.

If this process fails, try to recreate the dataset with the cell above and regenerate the data.

In [12]:
from aux_data import data_gen

# Per the notebook text, this generates the synthetic table data and loads it
# into BigQuery, using the project and dataset configured earlier.
data_gen.generate_and_populate_dataset(PROJECT_ID=PROJECT_ID, DATASET_ID=DATASET_ID)

Creating tables ...
Generating and populating METADATA table ...
Generating and populating CUSTOMERS table ...
Generating and populating EVENTS table ...
Generating and populating TRANSACTIONS table ...


### Setup Data Catalog

The cell below will execute the following steps:

1) Specify a query to retrieve the metadata from the tables you just uploaded;
2) Create a TagTemplate on Google Dataplex that specifies how the table will be tagged with metadata;
3) Tag all the tables you created on BigQuery.

In [13]:
from aux_data import bq_tag_generation

# Arguments are positional: project, dataset, tag template ID, then location.
bq_tag_generation.create_template_and_tag_bq(PROJECT_ID, DATASET_ID, TAG_TEMPLATE_ID, LOCATION)

Tag created
tag created/updated for event_id
tag created/updated for event_date
tag created/updated for event_type
tag created/updated for customer_id
tag created/updated for email
tag created/updated for city
tag created/updated for state
tag created/updated for channel
tag created/updated for total_purchases
tag created/updated for total_value
tag created/updated for total_emails
tag created/updated for loyalty_score
tag created/updated for is_media_follower
tag created/updated for last_sign_up_date
tag created/updated for last_purchase_date
tag created/updated for last_activity_date
tag created/updated for cart_total
tag created/updated for transaction_id
tag created/updated for transaction_quantity
tag created/updated for transaction_value
tag created/updated for transaction_type
tag created/updated for transaction_date
tag created/updated for product_name
tag created/updated for customer_id
tag created/updated for customer_id
tag created/updated for product_id


# Quick test

Test the integration by retrieving the metadata from BigQuery tables.

In [14]:
# Lists the dataset's tables from INFORMATION_SCHEMA.TABLES, leaving out any
# table whose name contains "metadata".
QUERY = (
    f"SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.TABLES` "
    'WHERE table_name NOT LIKE "%metadata%"'
)

In [15]:
def get_tags_from_table(table_id):
    """Render the tags attached to one BigQuery table as text.

    Looks up the Data Catalog entry linked to ``table_id`` (inside the
    module-level ``PROJECT_ID`` / ``DATASET_ID``), lists the tags on that
    entry and keeps only those whose template equals ``TAG_TEMPLATE_PATH``.

    Args:
        table_id: Name of the table inside ``DATASET_ID``.

    Returns:
        A string with one newline-terminated line per matching tag, giving the
        table, column, data type, primary/foreign-key flags and description.
        An empty string when no tag matches.
    """
    linked_resource = (
        "//bigquery.googleapis.com"
        f"/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{table_id}"
    )
    entry = datacatalog_client.lookup_entry(
        request={"linked_resource": linked_resource}
    )

    lines = []
    for tag in datacatalog_client.list_tags(parent=entry.name):
        # Tags created from any other template are skipped.
        if tag.template != TAG_TEMPLATE_PATH:
            continue

        fields = tag.fields
        description = fields["description"].string_value
        column_type = fields["data_type"].string_value
        is_pk = fields["is_primary_key"].bool_value
        is_fk = fields["is_foreign_key"].bool_value
        lines.append(
            f"Table: {table_id} "
            f"- Column: {tag.column} "
            f"- Data Type: {column_type} "
            f"- Primary Key: {is_pk} "
            f"- Foreing Key: {is_fk} "
            f"- Description: {description}\n"
        )

    return "".join(lines)

In [16]:
def get_metadata_from_dataset(query: str):
    """Run ``query`` on BigQuery and pair each row's DDL with its tag summary.

    Args:
        query: SQL whose result rows expose ``ddl`` and ``table_name``
            attributes.

    Returns:
        A list with one dict per result row: ``'ddl'`` holds the row's ``ddl``
        value and ``'description'`` holds whatever ``get_tags_from_table``
        returns for the row's ``table_name``.
    """
    result_rows = bq_client.query(query).result()  # API request
    return [
        {
            'ddl': row.ddl,
            'description': get_tags_from_table(row.table_name),
        }
        for row in result_rows
    ]

In [18]:
# Fetch the metadata for every table matched by QUERY, then print each
# table's tag summary.
tags = get_metadata_from_dataset(QUERY)
for table_metadata in tags:
    print(table_metadata['description'])

Table: transactions - Column: customer_id - Data Type: INT64 - Primary Key: False - Foreing Key: True - Description: A unique identifier of the customer.
Table: transactions - Column: product_id - Data Type: INT64 - Primary Key: False - Foreing Key: True - Description: The code of the inventory item that was purchased.
Table: transactions - Column: product_name - Data Type: STRING - Primary Key: False - Foreing Key: False - Description: The name of the product that was purchased.
Table: transactions - Column: transaction_date - Data Type: DATE - Primary Key: False - Foreing Key: False - Description: The date the transaction was made.
Table: transactions - Column: transaction_id - Data Type: INT64 - Primary Key: True - Foreing Key: False - Description: A unique identifier for the transaction.
Table: transactions - Column: transaction_quantity - Data Type: INT64 - Primary Key: False - Foreing Key: False - Description: The quantity of items purchased in the transaction.
Table: transaction