In [None]:
# Ridership Open Lakehouse Demo

This notebook demonstrates a strategy for implementing an open lakehouse on GCP, using Apache Iceberg as an open-source standard for managing data while still leveraging GCP-native capabilities. The demo uses BigQuery managed Iceberg tables, Managed Service for Apache Kafka and Kafka Connect to ingest streaming data, Vertex AI for generative AI queries on top of the data, and Dataplex to govern the tables.

This notebook loads the data generated in the previous notebook into BigQuery and sets up the streaming resources.

## Setup the environment

In [None]:
!pip install google-cloud-bigquery google-cloud-aiplatform google-cloud-storage --upgrade --quiet

In [None]:
PROJECT_ID = "your project ID here" # @param {type:"string"}
LOCATION = "us-central1" # @param {type:"string"}

# in-case someone didn't update the project manually, assume current project is the right one
if PROJECT_ID == "your project ID here":
    PROJECT_ID = !gcloud config get-value project
    PROJECT_ID = PROJECT_ID[0]

BUCKET = f"{PROJECT_ID}-ridership-lakehouse" # bucket will be created in a subsequant step
SOURCE_DATA_BUCKET = f"{PROJECT_ID}-ridership-lakehouse"
USER_AGENT = "cloud-solutions/data-to-ai-nb-v3"
BQ_DATASET = "ridership_lakehouse"
PROJECT_ID

## Create Clients

In [None]:
from google.cloud import storage, bigquery
from google.api_core import exceptions
from google.api_core.client_info import ClientInfo
from google.cloud.exceptions import NotFound

# One API client per service, both reporting the notebook's USER_AGENT.
# Each client is handed its own ClientInfo object rather than sharing one.
bigquery_client = bigquery.Client(
    client_info=ClientInfo(user_agent=USER_AGENT),
    project=PROJECT_ID,
    location=LOCATION,
)
storage_client = storage.Client(
    client_info=ClientInfo(user_agent=USER_AGENT),
    project=PROJECT_ID,
)


## TODO: Move the next 5 cells to Terraform

In [None]:
# Create the lakehouse bucket, or reuse it if it already exists.
try:
    bucket = storage_client.create_bucket(BUCKET, location=LOCATION)
    print(f"Bucket {BUCKET} created")
except exceptions.Conflict:
    # Bucket names are globally unique, so a Conflict can also mean another
    # project owns this name; get_bucket() raises (e.g. Forbidden) if we
    # cannot access it, instead of handing back an unusable reference.
    bucket = storage_client.get_bucket(BUCKET)
    print(f"Bucket {BUCKET} already exists")
except Exception as e:
    print(f"Error creating bucket {BUCKET}: {e}")
    # Every later cell needs this bucket; stop here rather than fail obscurely later.
    raise

In [None]:
# Build the dataset reference explicitly (Client.dataset() is deprecated).
dataset_ref = bigquery.DatasetReference(PROJECT_ID, BQ_DATASET)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = LOCATION

# Create the dataset only if it does not exist yet, so the cell can be re-run.
try:
    bigquery_client.get_dataset(dataset_ref)
    print("dataset exists")
except NotFound:
    bigquery_client.create_dataset(dataset, timeout=30)
    print(f"dataset {dataset_ref.dataset_id} created")

In [None]:
# Create a CLOUD_RESOURCE BigQuery connection whose ID is the dataset name; the
# Iceberg tables below reference it as `{PROJECT_ID}.{LOCATION}.{BQ_DATASET}`.
# NOTE(review): not idempotent — on re-run `bq mk` will presumably report that the
# connection already exists; confirm that is safe to ignore.
!bq mk \
--connection \
--location={LOCATION} \
--project_id={PROJECT_ID} \
--connection_type=CLOUD_RESOURCE \
 {BQ_DATASET}

In [None]:
import json
connection_details_json_str = !bq show --format json --connection {PROJECT_ID}.{LOCATION}.{BQ_DATASET}
connection_details_dict = json.loads(connection_details_json_str[0])
CONNECTION_SA_ID = connection_details_dict["cloudResource"]["serviceAccountId"]
if not CONNECTION_SA_ID:
    # it's possible that this command failed, when ran immediately after the previous command
    # this is due to the time it takes the API to be consistent due to async actions on GCP
    # we will wait 10 seconds, and try again
    # if this still fails, we'll throw an exception
    import time
    time.sleep(10)
    connection_details_json_str = !bq show --format json --connection {PROJECT_ID}.{LOCATION}.{BQ_DATASET}
    connection_details_dict = json.loads(connection_details_json_str[0])
    CONNECTION_SA_ID = connection_details_dict["cloudResource"]["serviceAccountId"]
if not CONNECTION_SA_ID:
    raise ValueError("No Service Account detected for BQ Connection")

In [None]:
!gcloud storage buckets add-iam-policy-binding 'gs://{BUCKET_NAME}' \
    --member='serviceAccount:{CONNECTION_SA_ID}' \
    --role=roles/storage.objectUser \
    --quiet

!gcloud storage buckets add-iam-policy-binding 'gs://{BUCKET_NAME}' \
    --member='serviceAccount:{CONNECTION_SA_ID}' \
    --role=roles/storage.legacyBucketReader \
    --quiet

## Create Tables in BigQuery

In [None]:
bus_stops_uri = f"gs://{BUCKET}/iceberg_data/bus_stations/"
bus_lines_uri = f"gs://{BUCKET}/iceberg_data/bus_lines/"
ridership_uri = f"gs://{BUCKET}/iceberg_data/ridership/"

bigquery_client.query(f"DROP TABLE IF EXISTS {BQ_DATASET}.bus_stations;").result()
query = f"""
CREATE TABLE {BQ_DATASET}.bus_stations
(
  bus_stop_id INTEGER,
  address STRING,
  school_zone BOOLEAN,
  seating BOOLEAN,
  latitude FLOAT64,
  longtitude FLOAT64
)
WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{BQ_DATASET}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = '{bus_stops_uri}');
"""
bigquery_client.query(query).result()

In [None]:
bigquery_client.query(f"DROP TABLE IF EXISTS {BQ_DATASET}.bus_lines;").result()
query = f"""
CREATE TABLE {BQ_DATASET}.bus_lines
(
  bus_line_id INTEGER,
  bus_line STRING,
  number_of_stops INTEGER,
  stops ARRAY<INTEGER>
)
WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{BQ_DATASET}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = '{bus_lines_uri}');
"""
bigquery_client.query(query).result()

In [None]:
bigquery_client.query(f"DROP TABLE IF EXISTS {BQ_DATASET}.ridership;").result()
query = f"""
CREATE TABLE {BQ_DATASET}.ridership
(
  transit_timestamp TIMESTAMP,
  station_id INTEGER,
  ridership INTEGER
)
WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{BQ_DATASET}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = '{ridership_uri}');
"""
bigquery_client.query(query).result()

## Load data to Lakehouse tables


In [None]:
table_ref = dataset_ref.table("bus_lines")

# BQ tables for Apache Iceberg do not support load with truncating, so we will truncate manually, and then load
truncate = bigquery_client.query(f"DELETE FROM {BQ_DATASET}.bus_lines WHERE TRUE")
truncate.result()

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

job = bigquery_client.load_table_from_uri(
    f"gs://{SOURCE_DATA_BUCKET}/mta_staging_data/bus_lines.json",
    table_ref,
    job_config=job_config,
)

job.result()

In [None]:
table_ref = dataset_ref.table("bus_stations")

# BQ tables for Apache Iceberg do not support load with truncating, so we will truncate manually, and then load
truncate = bigquery_client.query(f"DELETE FROM {BQ_DATASET}.bus_stations WHERE TRUE")
truncate.result()

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

job = bigquery_client.load_table_from_uri(
    f"gs://{SOURCE_DATA_BUCKET}/mta_staging_data/bus_stations.csv",
    table_ref,
    job_config=job_config,
)

job.result()

In [None]:
table_ref = dataset_ref.table("ridership")

# BQ tables for Apache Iceberg do not support load with truncating, so we will truncate manually, and then load
truncate = bigquery_client.query(f"DELETE FROM {BQ_DATASET}.ridership WHERE TRUE")
truncate.result()

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

job = bigquery_client.load_table_from_uri(
    f"gs://{SOURCE_DATA_BUCKET}/mta_staging_data/ridership/*.csv",
    table_ref,
    job_config=job_config,
)

job.result()

## Basic Analytics
After loading the data into our open data lakehouse, we demonstrate some basic analytics, repeating the same process with several different engines:
- BigQuery
- Spark (serverless?)
- Dataflow