In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# BigQuery Streaming Reverse ETL into Pub/Sub



| Author |
| --- |
| [Nick Orlove](https://github.com/norlove) |

## Overview

This notebook provides a hands-on example of BigQuery's capability to perform **Streaming Reverse ETL into Pub/Sub**. You'll learn how BigQuery can continuously query data from a source table and stream the results to a Pub/Sub topic in near real-time using a [BigQuery continuous query](https://cloud.google.com/bigquery/docs/continuous-queries-introduction).

Traditionally, ETL (Extract, Transform, Load) moves data into a data warehouse. Reverse ETL, on the other hand, moves data *out* of the data warehouse to operational systems or applications.

This notebook demonstrates how BigQuery's continuous query feature, combined with Pub/Sub, enables a powerful Streaming Reverse ETL pattern for real-time event-based handling of data. This allows for immediate consumption of transformed or filtered data by downstream services, enabling real-time analytics, personalized experiences, or operational alerts based on the freshest data in BigQuery.

### Objectives

You will learn to:

*   Set up a BigQuery continuous query and CONTINUOUS slot reservation.
*   Stream data from BigQuery to a Pub/Sub topic in near real-time.
*   Understand the concept of Streaming Reverse ETL and its applications.
*   Verify the streamed data is successfully arriving in Pub/Sub.

### Services and Costs

This tutorial uses the following billable components of Google Cloud:

* **BigQuery**: [Pricing](https://cloud.google.com/bigquery/pricing)

* **Pub/Sub**: [Pricing](https://cloud.google.com/pubsub/pricing)

You can use the [Pricing Calculator](https://cloud.google.com/products/calculator) to generate a cost estimate based on your projected usage.

---

## Before you begin

### Set up your Google Cloud project
**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

3. [Enable the BigQuery and Pub/Sub APIs](https://console.cloud.google.com/flows/enableapi?apiid=bigquery.googleapis.com,pubsub.googleapis.com).

4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk).

Install and upgrade the `bigquery-magics` and `bigframes` libraries, and install the Pub/Sub client library (`google-cloud-pubsub`). `bigquery-magics` provides convenient IPython magic commands for BigQuery, while `bigframes` enables scalable data analysis with a DataFrame API directly on BigQuery data.


In [3]:
%pip install --upgrade bigquery-magics
%pip install --upgrade bigframes
%pip install google-cloud-pubsub



### Set your project ID

In [4]:
PROJECT_ID = "my-project"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


### Authenticate to your Google Cloud account

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

**1. Colab Enterprise in BigQuery Studio or Vertex AI**
* Do nothing as you are already authenticated.

**2. Colab Consumer - uncomment and run the following:**

In [None]:
# from google.colab import auth
# auth.authenticate_user()

**3. Local JupyterLab instance, uncomment and run the following:**



In [None]:
# ! gcloud auth login

### Create a Service Account and set permissions for it

In order to run a continuous query with exports to Pub/Sub, you must use a service account. For more information, see [continuous queries documentation](https://cloud.google.com/bigquery/docs/continuous-queries#choose_an_account_type).

Keep in mind that to submit a job that runs using a service account, your user account must have the [Service Account User (roles/iam.serviceAccountUser)](https://cloud.google.com/iam/docs/service-account-permissions#user-role) role.

In [5]:
import time

# Define a unique name for the service account
SERVICE_ACCOUNT_ID = "bq-continuous-query-demo-sa"

# Construct the full email address for the service account
SERVICE_ACCOUNT_EMAIL = f"{SERVICE_ACCOUNT_ID}@{PROJECT_ID}.iam.gserviceaccount.com"

print(f"Creating service account: {SERVICE_ACCOUNT_ID}...")

# Create the service account using gcloud
! gcloud iam service-accounts create {SERVICE_ACCOUNT_ID} \
    --display-name="BigQuery continuous query to Pub/Sub Export Demo" \
    --description="Service account for the BigQuery continuous query to Pub/Sub tutorial" > /dev/null 2>&1

print(f"\nAssigning necessary IAM roles to {SERVICE_ACCOUNT_EMAIL}...")

# Assign the BigQuery User role
! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member="serviceAccount:{SERVICE_ACCOUNT_EMAIL}" \
    --role="roles/bigquery.user" \
    --condition=None \
    --quiet > /dev/null 2>&1

# Assign the BigQuery Data Editor role
! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member="serviceAccount:{SERVICE_ACCOUNT_EMAIL}" \
    --role="roles/bigquery.dataEditor" \
    --condition=None \
    --quiet > /dev/null 2>&1

# Assign the Pub/Sub Publisher role
! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member="serviceAccount:{SERVICE_ACCOUNT_EMAIL}" \
    --role="roles/pubsub.publisher" \
    --condition=None \
    --quiet > /dev/null 2>&1

# Assign the Pub/Sub Viewer role
! gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member="serviceAccount:{SERVICE_ACCOUNT_EMAIL}" \
    --role="roles/pubsub.viewer" \
    --condition=None \
    --quiet > /dev/null 2>&1

print(f"\n✅ Successfully created service account and assigned permissions.")

# Wait ~60 seconds, to give IAM updates time to propagate. Otherwise, subsequent cells may fail.
time.sleep(60)

Creating service account: bq-continuous-query-demo-sa...

Assigning necessary IAM roles to bq-continuous-query-demo-sa@my-project.iam.gserviceaccount.com...

✅ Successfully created service account and assigned permissions.


### Create a Pub/Sub topic and subscription for the continuous query

Next, we need a destination for our continuous query's output. We'll create a Pub/Sub topic, which is where the query will stream data to.

To verify that data is arriving correctly, we'll also create a Pub/Sub subscription. A subscription allows us to listen to the messages sent to the topic. Later, we'll use this subscription to pull messages and confirm our end-to-end pipeline is working.

In [6]:
# Define unique names for your Pub/Sub topic and subscription.
TOPIC_ID = "cq_results_topic"
SUBSCRIPTION_ID = "cq_results_viewer_sub"

print(f"Creating Pub/Sub topic: {TOPIC_ID}...")

# Create the Pub/Sub topic using the gcloud CLI
! gcloud pubsub topics create {TOPIC_ID}

print(f"\nCreating Pub/Sub subscription: {SUBSCRIPTION_ID}...")

# Create a subscription attached to the topic
! gcloud pubsub subscriptions create {SUBSCRIPTION_ID} --topic={TOPIC_ID}

print(f"\n✅ Successfully created topic '{TOPIC_ID}' and subscription '{SUBSCRIPTION_ID}'.")

Creating Pub/Sub topic: cq_results_topic...
Created topic [projects/my-project/topics/cq_results_topic].

Creating Pub/Sub subscription: cq_results_viewer_sub...
Created subscription [projects/my-project/subscriptions/cq_results_viewer_sub].

✅ Successfully created topic 'cq_results_topic' and subscription 'cq_results_viewer_sub'.


### Create a Source BigQuery Dataset and Table

Now that we have a destination for our data (the Pub/Sub topic), we need a source. A continuous query reads data from a BigQuery table as new rows are added.

In [7]:
%%bigquery --project {PROJECT_ID}

-- This statement creates a new dataset to hold our source table.
CREATE SCHEMA IF NOT EXISTS `cq_source_dataset`
OPTIONS (
  location = 'US',
  description = 'Dataset for the continuous query source table.'
);

-- This statement creates the source table with a defined schema.
CREATE OR REPLACE TABLE `cq_source_dataset.user_clicks`
(
  event_timestamp TIMESTAMP NOT NULL OPTIONS(description="The exact time of the user event."),
  user_id STRING NOT NULL OPTIONS(description="The unique identifier for the user."),
  product_id STRING OPTIONS(description="The identifier for the product clicked."),
  value FLOAT64 OPTIONS(description="The value of the product the user clicked on.")
);


### Create a BigQuery CONTINUOUS Slot Reservation

Continuous queries must run in a BigQuery reservation that is assigned to your project with the CONTINUOUS job type. You can't run them using the on-demand billing model.

A reservation is a dedicated pool of BigQuery processing power (slots). For this demo, we'll create a reservation that uses autoscaling. It will have 0 baseline slots and will automatically scale up to a maximum of 50 slots as needed to run our query. An assignment links this reservation to our project, telling BigQuery to use this specific slot pool for any jobs of type CONTINUOUS.

**Note**: After a continuous query starts running, it actively listens for incoming data, which consumes slot resources. While a reservation with a running continuous query does not scale down to zero slots, an idle continuous query that is primarily listening for incoming data is expected to consume a minimal amount of slots, typically around 1 slot.

In [8]:
%%bigquery --project {PROJECT_ID} --pyformat

-- This statement creates a new reservation with 0 baseline slots
-- and the ability to autoscale up to 50 slots.
CREATE RESERVATION `region-US.cq-demo-reservation`
OPTIONS(
  edition = 'ENTERPRISE',
  slot_capacity = 0, -- Baseline slots
  autoscale_max_slots = 50
);

-- This statement assigns the reservation to the current project specifically for
-- continuous query jobs. The job_type MUST be 'CONTINUOUS'.
CREATE ASSIGNMENT `region-US.cq-demo-reservation.cq-assignment`
OPTIONS(
  assignee = 'projects/{PROJECT_ID}',
  job_type = 'CONTINUOUS'
);

Executing query with job ID: a8230881-5b38-470d-8746-84f0fa557b2d
Query executing: 0.25s

Wait about 180 seconds to give the BigQuery reservation and assignment time to propagate. Otherwise, subsequent cells may fail.


In [9]:
import time

time.sleep(180)

### Start inserting some random data into the source table

The following cell streams synthetic data into BigQuery. For each of 100 records, it:

1. Generates a random user ID, product ID, and value.

2. Captures the current UTC timestamp.

It then inserts all 100 rows into the `user_clicks` table in a single streaming-insert call (`insert_rows_json`).



In [10]:
import random
from datetime import datetime, timezone
from google.cloud import bigquery

# --- Configuration ---
# Ensure your PROJECT_ID variable is set correctly in your notebook environment
# PROJECT_ID = "your-gcp-project-id"
TABLE_ID = f"{PROJECT_ID}.cq_source_dataset.user_clicks"
NUM_RECORDS = 100

# --- Client and Data Setup ---
client = bigquery.Client(project=PROJECT_ID)
sample_user_ids = [f"user_{i}" for i in range(100, 120)]
sample_product_ids = [f"product_{i}" for i in range(2000, 2050)]
rows_to_insert = []

print(f"Generating {NUM_RECORDS} synthetic records in memory...")

# 1. Create NUM_RECORDS records and add them to a list
for _ in range(NUM_RECORDS):
    row = {
        "user_id": random.choice(sample_user_ids),
        "product_id": random.choice(sample_product_ids),
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
        "value": round(random.uniform(0, 100), 2),
    }
    rows_to_insert.append(row)

print("Generation complete. Inserting rows into BigQuery as a single batch...")

# 2. Insert all records in one API call
errors = client.insert_rows_json(TABLE_ID, rows_to_insert)

# 3. Report the final result
if not errors:
    print(f"✅ Successfully inserted {len(rows_to_insert)} rows into the table.")
else:
    print(f"❌ Encountered errors while inserting rows: {errors}")

Generating 100 synthetic records in memory...
Generation complete. Inserting rows into BigQuery as a single batch...
✅ Successfully inserted 100 rows into the table.
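If you want to rerun the generator with reproducible output, the generation step can be pulled out into its own function. The sketch below is a hypothetical refactoring of the cell above (`make_click_rows` is not part of any library): it accepts an optional seeded `random.Random` and returns rows in the same shape the cell passes to `insert_rows_json`, without calling BigQuery.

```python
import random
from datetime import datetime, timezone


def make_click_rows(num_records, rng=None):
    """Build synthetic rows matching the user_clicks schema.

    Hypothetical helper that mirrors the generation loop in the cell above,
    but accepts a seeded random.Random so IDs and values are reproducible.
    """
    if rng is None:
        rng = random.Random()
    user_ids = [f"user_{i}" for i in range(100, 120)]
    product_ids = [f"product_{i}" for i in range(2000, 2050)]
    rows = []
    for _ in range(num_records):
        rows.append({
            "user_id": rng.choice(user_ids),
            "product_id": rng.choice(product_ids),
            # Timezone-aware UTC timestamp in ISO 8601 form (ends in +00:00)
            "event_timestamp": datetime.now(timezone.utc).isoformat(),
            "value": round(rng.uniform(0, 100), 2),
        })
    return rows


rows = make_click_rows(5, random.Random(42))
print(len(rows), sorted(rows[0]))
```

In the notebook you could then call `client.insert_rows_json(TABLE_ID, make_click_rows(NUM_RECORDS))`. Seeding fixes the IDs and values, but the timestamps still come from the current clock.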


Run a basic query against our table to verify it is receiving data.

In [11]:
%%bigquery --project {PROJECT_ID}

SELECT
  *
FROM
  `cq_source_dataset.user_clicks`
ORDER BY event_timestamp DESC
LIMIT 10;


| | event_timestamp | user_id | product_id | value |
| --- | --- | --- | --- | --- |
| 0 | 2025-08-28 19:47:14.899146+00:00 | user_108 | product_2027 | 14.59 |
| 1 | 2025-08-28 19:47:14.899142+00:00 | user_113 | product_2046 | 43.96 |
| 2 | 2025-08-28 19:47:14.899137+00:00 | user_105 | product_2046 | 11.69 |
| 3 | 2025-08-28 19:47:14.899133+00:00 | user_118 | product_2043 | 88.59 |
| 4 | 2025-08-28 19:47:14.899129+00:00 | user_115 | product_2025 | 53.32 |
| 5 | 2025-08-28 19:47:14.899125+00:00 | user_103 | product_2031 | 39.7 |
| 6 | 2025-08-28 19:47:14.899120+00:00 | user_113 | product_2005 | 0.91 |
| 7 | 2025-08-28 19:47:14.899116+00:00 | user_102 | product_2031 | 78.77 |
| 8 | 2025-08-28 19:47:14.899112+00:00 | user_103 | product_2000 | 10.07 |
| 9 | 2025-08-28 19:47:14.899107+00:00 | user_107 | product_2007 | 53.02 |


### Create and Start the Continuous Query

A continuous query is a special type of BigQuery job: you define it once, and it then runs continuously in the background, processing new rows as they are added to the source table.

This cell starts a continuous query job using the BigQuery DataFrames library. It uses the variables defined in previous cells (`PROJECT_ID`, `SERVICE_ACCOUNT_EMAIL`, and `TOPIC_ID`) to create a live streaming job that keeps only rows with a `value` greater than 50, packs each row into a JSON string, and publishes it to the Pub/Sub topic.

In [12]:
import bigframes
import bigframes.pandas as bpd
import bigframes.streaming as bst

# Point BigQuery DataFrames at your project (public option instead of a private attribute)
bpd.options.bigquery.project = PROJECT_ID
job_id_prefix = "bq_cq_notebook_"  # Set the job prefix for your Job ID so that it is easy to find

# Create the StreamingDataFrame from a BigQuery table, then select certain columns and filter rows
sdf = bst.read_gbq_table("cq_source_dataset.user_clicks")

sdf = sdf[["event_timestamp","user_id", "product_id", "value"]]
sdf = sdf[sdf["value"] > 50]

# %E6S is BigQuery's format element for seconds with 6 fractional digits
iso_format_string = '%Y-%m-%dT%H:%M:%E6S%z'
event_timestamp_str = sdf["event_timestamp"].dt.strftime(iso_format_string)
user_id_str = sdf["user_id"].astype(str)
product_id_str = sdf["product_id"].astype(str)
value_str = sdf["value"].astype(str)

sdf["data"] = (
    '{"timestamp": "'
    + event_timestamp_str
    + '", "user_id": "'
    + user_id_str
    + '", "product_id": "'
    + product_id_str
    + '", "value": "'
    + value_str
    + '"}'
)

# Select only the new "data" column to send to Pub/Sub.
sdf = sdf[["data"]]

# Start the continuous query that publishes each "data" value to the topic
job = sdf.to_pubsub(
    topic=TOPIC_ID,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    job_id=None,
    job_id_prefix=job_id_prefix,
)

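To see what the continuous query emits without touching BigQuery, here is a plain-Python sketch of the same filter and message construction applied to one row. It is only an illustration: `to_message` is a hypothetical helper, and it uses `json.dumps` instead of string concatenation, so quotes or backslashes inside a field would be escaped (the concatenated expression in the cell above would pass them through unescaped). The `strftime` pattern approximates BigQuery's `%E6S` output, and Python's `str()` of a float may not match BigQuery's FLOAT64-to-STRING conversion for every value.

```python
import json
from datetime import datetime, timezone


def to_message(row):
    """Return the Pub/Sub payload for `row`, or None if the filter drops it.

    Hypothetical local mirror of the StreamingDataFrame logic:
    keep rows with value > 50 and serialize four fields as strings.
    """
    if row["value"] <= 50:
        return None  # dropped, like sdf[sdf["value"] > 50]
    return json.dumps({
        # %f gives six fractional digits; %z renders UTC as +0000
        "timestamp": row["event_timestamp"].strftime("%Y-%m-%dT%H:%M:%S.%f%z"),
        "user_id": str(row["user_id"]),
        "product_id": str(row["product_id"]),
        "value": str(row["value"]),
    })


row = {
    "event_timestamp": datetime(2025, 8, 28, 19, 47, 14, 899022, tzinfo=timezone.utc),
    "user_id": "user_104",
    "product_id": "product_2048",
    "value": 83.52,
}
print(to_message(row))
# → {"timestamp": "2025-08-28T19:47:14.899022+0000", "user_id": "user_104", "product_id": "product_2048", "value": "83.52"}
```

Building the payload with a JSON serializer is the safer choice if string fields could ever contain characters that need escaping.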


Confirm the continuous query has successfully started.

In [13]:
print(job.running())
print(job.error_result)

# Wait ~180 seconds, to give the BigQuery continuous query time to start up and begin to process incoming data.
# Otherwise, subsequent cells may fail.
time.sleep(180)

True
None
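The fixed `time.sleep(...)` waits in this notebook are simple, but they always block for the full duration. Where there is something you can check, a small polling loop can return as soon as the condition holds and give up at a deadline. This is a generic sketch: `wait_until` is a hypothetical helper, not part of BigFrames or the BigQuery client, and `clock`/`sleep` are injectable only so it can be exercised without real waiting.

```python
import time


def wait_until(condition, timeout_s, interval_s=5.0, clock=time.monotonic, sleep=time.sleep):
    """Poll `condition()` until it returns truthy or `timeout_s` elapses.

    Returns True if the condition was met, False on timeout. Never sleeps
    past the deadline.
    """
    deadline = clock() + timeout_s
    while True:
        if condition():
            return True
        remaining = deadline - clock()
        if remaining <= 0:
            return False
        sleep(min(interval_s, remaining))
```

For example, after `job.cancel()` in the cleanup step you might call `wait_until(job.done, timeout_s=120)`, assuming `done()` refreshes the job state from the API in your version of `google-cloud-bigquery`. Note that `job.running()` returned `True` above before the query was processing data, so it is not a readiness signal on its own.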


### Read from Pub/Sub

Let's manually pull data from our Pub/Sub subscription to verify the continuous query is successfully writing data to it.

**NOTE**: It may take a couple of minutes for the continuous query to fully start up and begin to process incoming data.

In [14]:
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
import json

# --- Configuration ---
# How long to listen for messages
timeout = 30.0
# List to store received messages
received_messages = []
# Maximum number of messages to process before stopping
max_messages_to_process = 10

# Create a SubscriberClient
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

def callback(message):
    """A simple function called for each message received."""
    try:
        # Decode the message data from bytes to a string
        decoded_data = message.data.decode('utf-8')
        print(f"  -> Received raw message: {decoded_data}")
        # Append the raw string to our list
        received_messages.append(decoded_data)
        # Acknowledge the message so it's not sent again
        message.ack()

        # Stop listening if we've collected enough messages
        if len(received_messages) >= max_messages_to_process:
            streaming_pull_future.cancel()

    except Exception as e:
        print(f"Error processing message: {e}")
        message.nack()

print(f"Listening for messages on {subscription_path}...")
print(f"(Will stop after {max_messages_to_process} messages or {timeout} seconds)")

# The subscribe() method creates a background thread to pull messages.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

# Wrap the call in a try/except block to handle the timeout.
with subscriber:
    try:
        # This is a blocking call, waiting for the future to complete.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except Exception as e:
        print(f"An error occurred: {e}")
        streaming_pull_future.cancel()

print("\n" + "-" * 60)
print(f"Finished listening. Collected {len(received_messages)} messages.")
print("-" * 60)

if received_messages:
    # Simply print the raw messages as they came in
    for i, msg_str in enumerate(received_messages):
        print(f"Message {i+1}: {msg_str}")
else:
    print("No messages were collected.")

Listening for messages on projects/my-project/subscriptions/cq_results_viewer_sub...
(Will stop after 10 messages or 30.0 seconds)
  -> Received raw message: {"timestamp": "2025-08-28T19:47:14.899022+0000", "user_id": "user_104", "product_id": "product_2048", "value": "83.52"}
  -> Received raw message: {"timestamp": "2025-08-28T19:47:14.899035+0000", "user_id": "user_108", "product_id": "product_2030", "value": "90.33"}
  -> Received raw message: {"timestamp": "2025-08-28T19:47:14.899044+0000", "user_id": "user_115", "product_id": "product_2044", "value": "50.93"}
  -> Received raw message: {"timestamp": "2025-08-28T19:47:14.899048+0000", "user_id": "user_116", "product_id": "product_2020", "value": "91.88"}
  -> Received raw message: {"timestamp": "2025-08-28T19:47:14.899052+0000", "user_id": "user_105", "product_id": "product_2045", "value": "86.9"}
  -> Received raw message: {"timestamp": "2025-08-28T19:47:14.899057+0000", "user_id": "user_119", "product_id": "product_2019", "value"
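Because each message is a JSON string, you can go one step further than printing: parse the collected messages and check that every one respects the continuous query's `value > 50` filter. A minimal sketch, assuming `received_messages` holds the decoded strings collected by the subscriber callback; `check_messages` is a hypothetical helper. The `value` field arrives as a string because the query casts it with `astype(str)`, so it is converted back to `float` here.

```python
import json


def check_messages(raw_messages, threshold=50.0):
    """Parse JSON payloads and return (parsed, violations).

    `violations` lists payloads whose numeric value is not above `threshold`;
    it should be empty if the continuous query's filter was applied.
    """
    parsed = [json.loads(m) for m in raw_messages]
    violations = [p for p in parsed if float(p["value"]) <= threshold]
    return parsed, violations


# In the notebook, pass the list filled by the callback:
# parsed, violations = check_messages(received_messages)
sample = [
    '{"timestamp": "2025-08-28T19:47:14.899022+0000", "user_id": "user_104", '
    '"product_id": "product_2048", "value": "83.52"}',
]
parsed, violations = check_messages(sample)
print(len(parsed), violations)
```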

Feel free to run the synthetic generator again and pull from Pub/Sub again to confirm that new data is still being processed by the continuous query.

### Recap

In this notebook, you have successfully implemented a **BigQuery Streaming Reverse ETL** pipeline to Pub/Sub. You learned how to:

*   Set up necessary Google Cloud resources, including a service account, Pub/Sub topic and subscription, and a BigQuery source table.
*   Create a dedicated BigQuery CONTINUOUS slot reservation to run the continuous query.
*   Simulate a real-time data stream into the BigQuery source table.
*   Define and start a BigQuery continuous query using the BigFrames library to filter and transform data.
*   Stream the results of the continuous query to a Pub/Sub topic.
*   Verify the data ingestion into Pub/Sub by pulling messages from the subscription.

This demonstrates a powerful pattern for delivering fresh, processed data from BigQuery to downstream operational systems and applications in real time.


---


# Cleaning Up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

In [15]:
import time

# Stop the continuous query if it's running
# Check that 'job' exists in local scope and exposes running(), like the job returned by to_pubsub()
if 'job' in locals() and hasattr(job, 'running') and job.running():
    print("Stopping continuous query...")
    job.cancel()
    print("Continuous query stopped.")

# Delete the BigQuery dataset (which includes the user_clicks table)
print(f"Deleting BigQuery dataset {PROJECT_ID}.cq_source_dataset...")
! bq rm -r -f {PROJECT_ID}:cq_source_dataset

# Delete the Pub/Sub subscription created earlier (SUBSCRIPTION_ID)
print(f"Deleting Pub/Sub subscription {SUBSCRIPTION_ID}...")
! gcloud pubsub subscriptions delete {SUBSCRIPTION_ID} --quiet

# Delete the Pub/Sub topic created earlier (TOPIC_ID)
print(f"Deleting Pub/Sub topic {TOPIC_ID}...")
! gcloud pubsub topics delete {TOPIC_ID} --quiet

# Delete the reservation assignment 'cq-assignment' created with CREATE ASSIGNMENT
print(f"Deleting BigQuery reservation assignment projects/{PROJECT_ID}/locations/US/reservations/cq-demo-reservation/assignments/cq-assignment...")
! bq rm --location=us --project_id={PROJECT_ID} --reservation_assignment cq-demo-reservation.cq-assignment

# Delete the reservation 'cq-demo-reservation' created with CREATE RESERVATION
print(f"Deleting BigQuery reservation projects/{PROJECT_ID}/locations/US/reservations/cq-demo-reservation...")
! bq rm --location=us --project_id={PROJECT_ID} --reservation cq-demo-reservation

# Delete the service account (SERVICE_ACCOUNT_EMAIL) created at the start
print(f"Deleting service account {SERVICE_ACCOUNT_EMAIL}...")
! gcloud iam service-accounts delete {SERVICE_ACCOUNT_EMAIL} --quiet

print("✅ Cleanup complete.")

Stopping continuous query...
Continuous query stopped.
Deleting BigQuery dataset my-project.cq_source_dataset...
Deleting Pub/Sub subscription cq_results_viewer_sub...
Deleted subscription [projects/my-project/subscriptions/cq_results_viewer_sub].
Deleting Pub/Sub topic cq_results_topic...
Deleted topic [projects/my-project/topics/cq_results_topic].
Deleting BigQuery reservation assignment projects/my-project/locations/US/reservations/cq-demo-reservation/assignments/cq-assignment...
BigQuery error in rm operation: Failed to delete reservation assignment 'cq-demo-reservation.cq-assignment': No assignment found for project_id my-project, location US, assignment_id cq-assignment.
Deleting BigQuery reservation projects/my-project/locations/US/reservations/cq-demo-reservation...
Reservation 'cq-demo-reservation' successfully deleted.
Deleting service account bq-continuous-query-demo-sa@my-project.iam.gserviceaccount.com...
deleted service account [bq-continuous-query-demo-sa@my-project.ia