# PubSub
In this notebook we create a PubSub topic and a BigQuery (BQ) subscription, and show how messages published to the topic appear as new rows appended to a BQ table.

A video illustrating a run through of this notebook can be found [here](https://www.youtube.com/watch?v=4aVo_OsFxvM).


We do some setup and create the BigQuery artifacts: a dataset called `pubsub` containing a sample table called `table1`.
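As a sketch, the same table schema can also be written in the JSON schema-file format accepted by the `bq` command-line tool (for example by `bq mk --table`). The `STRUCT` column becomes a `RECORD` with nested `fields`, and every column is `NULLABLE` because the DDL declares no `NOT NULL` constraints. The function name `table_schema` is hypothetical.

```python
import json


def table_schema():
    """Schema of the demo table (s, i, b, t, r) as a bq JSON schema file."""
    # All columns are NULLABLE: the CREATE TABLE statement has no NOT NULL.
    return [
        {"name": "s", "type": "STRING", "mode": "NULLABLE"},
        {"name": "i", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "b", "type": "BOOLEAN", "mode": "NULLABLE"},
        {"name": "t", "type": "TIMESTAMP", "mode": "NULLABLE"},
        {
            # STRUCT<A STRING, B INTEGER> is expressed as a RECORD.
            "name": "r",
            "type": "RECORD",
            "mode": "NULLABLE",
            "fields": [
                {"name": "A", "type": "STRING", "mode": "NULLABLE"},
                {"name": "B", "type": "INTEGER", "mode": "NULLABLE"},
            ],
        },
    ]


if __name__ == "__main__":
    # Print the schema; saved as schema.json it can be passed to `bq mk`.
    print(json.dumps(table_schema(), indent=2))
```

Saved as `schema.json`, a command such as `bq mk --table {PROJECT}:{DATASET}.{TABLE} schema.json` would create an equivalent table; this notebook itself uses the SQL DDL.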


In [None]:
# Change the following for your environment
PROJECT="test1-305123"
PROJECT_NUMBER="604474120566"
DATASET="pubsub"
TABLE="table1"
TOPIC="bq-test"
SUBSCRIPTION="bq-sub"

TOPIC_DLQ=f"{TOPIC}-dlq"
SUBSCRIPTION_DLQ=f"{SUBSCRIPTION}-dlq"
PUBSUB_SERVICE_ACCOUNT=f"service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

from google.cloud import bigquery
import datetime

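def run_job(statement, project=PROJECT):
  """Run a BigQuery SQL statement (or multi-statement script) and wait for it."""
  # Disable the query cache so every run reads the table's current contents.
  job_config = bigquery.QueryJobConfig(use_query_cache=False)
  client = bigquery.Client(project=project, default_query_job_config=job_config)
  job = client.query(statement)
  job.result()  # block until the job finishes; raises if the SQL fails
  return job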



In [None]:
sql=f"""
CREATE SCHEMA IF NOT EXISTS {DATASET};
CREATE TABLE IF NOT EXISTS {DATASET}.{TABLE}
(
  s String,
  i Integer,
  b Boolean,
  t Timestamp,
  r Struct<A String, B Integer>
);
"""
print(sql)
run_job(sql)

When a message is published to the topic, the BigQuery subscription writes it to the table as a new row. This means that the PubSub service account must be granted permission to insert into the table. When a subscription pointing to BigQuery is created, the service account it uses can be identified; the default is the PubSub service agent, `service-<PROJECT_NUMBER>@gcp-sa-pubsub.iam.gserviceaccount.com`. We set the IAM policy on the table to grant that account the BigQuery Data Editor role (`roles/bigquery.dataEditor`) so PubSub can write into it.
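The service-agent address follows a fixed pattern based on the project number, and the policy file is plain JSON. A minimal sketch, assuming the default service agent is used, that builds the same `permissions.json` content with Python's `json` module instead of shell `echo` escaping; `pubsub_service_agent` and `table_write_policy` are hypothetical helper names.

```python
import json


def pubsub_service_agent(project_number: str) -> str:
    """Default PubSub service agent e-mail for a project number."""
    return f"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"


def table_write_policy(project_number: str) -> dict:
    """IAM policy granting the PubSub service agent BigQuery Data Editor."""
    member = f"serviceAccount:{pubsub_service_agent(project_number)}"
    return {
        "bindings": [
            {"members": [member], "role": "roles/bigquery.dataEditor"}
        ]
    }


if __name__ == "__main__":
    # Uses the notebook's example project number; change it for your project.
    with open("permissions.json", "w") as f:
        json.dump(table_write_policy("604474120566"), f, indent=2)
```

`bq set-iam-policy` sets the table's entire policy from this file, so any table-level bindings not listed in it are dropped; for a freshly created demo table that is harmless.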

In [None]:
policy=fr"""
{{
  \"bindings\": [
    {{
      \"members\": [
        \"serviceAccount:{PUBSUB_SERVICE_ACCOUNT}\"
      ],
      \"role\": \"roles/bigquery.dataEditor\"
    }}
  ]
}}
"""
!echo "{policy}" > permissions.json
!bq set-iam-policy {PROJECT}:{DATASET}.{TABLE} permissions.json

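We can now create our PubSub artifacts: the topic; a dead-letter topic; a BigQuery subscription on the topic that writes to our table using the table's schema and sends messages it cannot write to the dead-letter topic; and an ordinary pull subscription on the dead-letter topic so we can inspect those messages. Dead-lettering also requires the PubSub service account to be able to publish to the dead-letter topic and subscribe to the main subscription, so we grant those roles as well.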
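The dead-letter behaviour is easiest to see in a toy model. The class below is a hypothetical, in-memory simulation, not PubSub code: a message the subscription cannot write is redelivered until `max_delivery_attempts` is reached, then forwarded to the dead-letter topic. The value 5 mirrors the gcloud default when `--max-delivery-attempts` is not given, and `json.loads` stands in for "write a row", loosely mirroring a non-JSON message that a BigQuery subscription cannot write.

```python
import json
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyBigQuerySubscription:
    """Toy, in-memory model of dead-lettering on a BigQuery subscription.

    NOT how PubSub is implemented; it only illustrates the rule that a
    message that cannot be written is redelivered and, once
    max_delivery_attempts is reached, goes to the dead-letter topic.
    """

    max_delivery_attempts: int = 5  # gcloud default with a dead-letter topic
    table: list = field(default_factory=list)  # rows "written to BigQuery"
    dead_letter_topic: list = field(default_factory=list)  # forwarded messages

    def deliver(self, data: str, write_row: Callable[[str], dict]) -> int:
        """Try to write `data`; return how many delivery attempts were made."""
        for attempt in range(1, self.max_delivery_attempts + 1):
            try:
                self.table.append(write_row(data))
                return attempt  # write succeeded: message acknowledged
            except ValueError:
                pass  # write failed: message is redelivered
        # Every attempt failed: forward the message to the dead-letter topic.
        self.dead_letter_topic.append(data)
        return self.max_delivery_attempts


if __name__ == "__main__":
    sub = ToyBigQuerySubscription()
    print(sub.deliver('{"s": "Hello World!", "i": 1234}', json.loads))  # 1
    print(sub.deliver("This is junk", json.loads))  # 5
    print(len(sub.table), len(sub.dead_letter_topic))  # 1 1
```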

In [None]:
!gcloud pubsub topics create {TOPIC} --project={PROJECT}
!gcloud pubsub topics create {TOPIC_DLQ} --project={PROJECT}
!gcloud pubsub subscriptions create {SUBSCRIPTION} \
  --topic={TOPIC} \
  --dead-letter-topic={TOPIC_DLQ} \
  --bigquery-table={PROJECT}:{DATASET}.{TABLE} \
  --use-table-schema \
  --project={PROJECT}
!gcloud pubsub subscriptions create {SUBSCRIPTION_DLQ} \
  --topic={TOPIC_DLQ} \
  --project={PROJECT}
!gcloud pubsub topics add-iam-policy-binding {TOPIC_DLQ} \
  --member=serviceAccount:{PUBSUB_SERVICE_ACCOUNT} \
  --role=roles/pubsub.publisher \
  --project={PROJECT}
!gcloud pubsub subscriptions add-iam-policy-binding {SUBSCRIPTION} \
  --member=serviceAccount:{PUBSUB_SERVICE_ACCOUNT} \
  --role=roles/pubsub.subscriber \
  --project={PROJECT}

With the topic and subscription in place, we can publish a message. Our message contains the following JSON, whose fields match the table's columns:

```
{
  "s": "Hello World!",
  "i": 1234,
  "b": true,
  "r": {
    "A": "abc",
    "B": 9876
  }
}
```
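Because the subscription uses the table's schema, each message has to be a JSON object whose fields line up with the table's columns; by default, a message with fields the table lacks is not written unless the subscription is created with `--drop-unknown-fields`. The check below is a hypothetical, deliberately simplified approximation of that requirement (`COLUMNS`, `check_fields` and `check_row` are our own names, and `TIMESTAMP` is treated as a plain string); the exact type-compatibility rules are in the BigQuery subscriptions documentation.

```python
import json

# Simplified, hypothetical mapping of the demo table's columns to the JSON
# value types accepted for them. TIMESTAMP is treated as an ISO 8601 string.
COLUMNS = {
    "s": str,
    "i": int,
    "b": bool,
    "t": str,
    "r": {"A": str, "B": int},  # STRUCT<A STRING, B INTEGER>
}


def check_fields(row, columns) -> list:
    """Return a list of problems with a decoded row; empty means it looks OK."""
    if not isinstance(row, dict):
        return ["expected a JSON object"]
    problems = []
    for name, value in row.items():
        expected = columns.get(name)
        if expected is None:
            problems.append(f"unknown field {name!r}")
        elif value is None:
            continue  # every column in the demo table is NULLABLE
        elif isinstance(expected, dict):
            # Nested STRUCT: check its fields recursively.
            problems += [f"{name}: {p}" for p in check_fields(value, expected)]
        elif type(value) is not expected:  # `is` so True is not taken as int
            problems.append(f"field {name!r} should be {expected.__name__}")
    return problems


def check_row(data: str) -> list:
    """Decode a message payload and check it against COLUMNS."""
    try:
        row = json.loads(data)
    except ValueError:
        return ["message is not JSON"]
    return check_fields(row, COLUMNS)
```

`check_row` returns an empty list for the message above and `["message is not JSON"]` for a plain-text payload, which is the kind of message the subscription cannot write.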

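Now we can publish the message, setting the `t` field to the current time as we build it. The subscription writes to BigQuery asynchronously, so the new row may take a few seconds to show up when we query the table.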


In [None]:
import json
message = json.dumps({"s": "Hello World!", "i": 1234, "b": True,
                      "t": datetime.datetime.now().isoformat(),
                      "r": {"A": "abc", "B": 9876}})
print(message)
!gcloud pubsub topics publish {TOPIC} \
  --message='{message}' \
  --project={PROJECT}
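The publish cells in this notebook wrap the message in single quotes (`--message='{message}'`), which breaks if a message itself contains a `'`. A minimal sketch of a more robust alternative, assuming `gcloud` is on the `PATH` and authenticated: build the arguments as a list and run them with `subprocess.run`, which does not go through a shell. `publish_args` and `publish` are hypothetical helper names.

```python
import shlex
import subprocess


def publish_args(topic: str, project: str, message: str) -> list:
    """Argument list for `gcloud pubsub topics publish`; no quoting needed."""
    return [
        "gcloud", "pubsub", "topics", "publish", topic,
        f"--message={message}",
        f"--project={project}",
    ]


def publish(topic: str, project: str, message: str) -> None:
    """Run the publish command (needs gcloud installed and credentials)."""
    # A list argument bypasses the shell, so quotes in `message` are harmless.
    subprocess.run(publish_args(topic, project, message), check=True)


if __name__ == "__main__":
    # Show the equivalent, correctly quoted shell command without running it.
    print(shlex.join(publish_args("bq-test", "my-project", "It's junk")))
```

In the notebook, `publish(TOPIC, PROJECT, message)` could replace a `!gcloud pubsub topics publish` line.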

In [None]:
import pandas as pd
pd.set_option('display.width', 200)

sql=f"""
SELECT * FROM {DATASET}.{TABLE}
"""
print(sql)
job = run_job(sql)
print(job.result().to_dataframe())

Next we publish a message that is not JSON. Because the subscription writes using the table's schema, this message cannot be written to the table. After the maximum number of delivery attempts (5 by default), PubSub forwards it to the dead-letter topic, and we can then pull it from the dead-letter subscription. If the pull returns nothing, wait a little and run it again.

In [None]:
message=f'This is junk - {datetime.datetime.now().isoformat()}'
print(message)
!gcloud pubsub topics publish {TOPIC} \
  --message='{message}' \
  --project={PROJECT}

In [None]:
!gcloud pubsub subscriptions pull {SUBSCRIPTION_DLQ} --auto-ack --project={PROJECT}

### Clean up
When we are done, we can delete the resources we created.

In [None]:
!gcloud pubsub subscriptions delete {SUBSCRIPTION} --project={PROJECT}
!gcloud pubsub subscriptions delete {SUBSCRIPTION_DLQ} --project={PROJECT}
!gcloud pubsub topics delete {TOPIC} --project={PROJECT}
!gcloud pubsub topics delete {TOPIC_DLQ} --project={PROJECT}
sql=f"""
DROP TABLE IF EXISTS {DATASET}.{TABLE};
DROP SCHEMA IF EXISTS {DATASET};
"""
run_job(sql)


## References
Here are some useful references on PubSub delivery to BigQuery tables:

* [BigQuery subscriptions](https://cloud.google.com/pubsub/docs/bigquery) - Cloud Pub/Sub documentation
* [Subscription properties](https://cloud.google.com/pubsub/docs/subscription-properties) - Cloud Pub/Sub documentation
* [Stream Your Data Directly From Cloud Pub/Sub to BigQuery via BigQuery Subscription](https://medium.com/@piyush.d005/stream-your-data-directly-from-cloud-pub-sub-to-bigquery-via-bigquery-subscription-62c4b899399c) - Piyush Dhasmana, Medium, 2023-08
* [Streaming from Google Cloud Pub/Sub to Bigquery without the Middlemen](https://medium.com/google-cloud/streaming-from-google-cloud-pub-sub-to-bigquery-without-the-middlemen-327ef24f4d15) - Ravish Garg, Medium, 2022-08
* [PubSub BigQuery Subscription](https://www.youtube.com/watch?v=1JODJO6rLLA) - YouTube