In [1]:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Dissecting the Data Mesh technical platform: `polyexpose` end-to-end notebook demo

## Overview
This demo walks through an end-to-end example of running `polyexpose` to expose a dataset through several interfaces (polyglot data):
* SQL
* GQL
* Events
* Plain text file (CSV)

### Set up your local development environment

If you are using Google Cloud Notebooks, your environment already meets all the requirements to run this notebook, and you can skip this step.

Otherwise, make sure your environment meets this notebook's requirements. You need the following:

* The Google Cloud SDK
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to Setting up a Python development environment and the Jupyter installation guide provide detailed instructions for meeting these requirements. 


### Set up `polyexpose` infrastructure on GCP

Before executing this notebook, some infrastructure needs to be deployed on GCP, in particular:

* A SPARK cluster (dataproc)
* hasura GQL engine running on GKE, plus a Cloud SQL database for its metadata
* GCS buckets
* Service accounts

The easiest way to deploy the infrastructure is to run the bootstrap script from a terminal in Google Cloud Notebooks.

In order to deploy the infra:

* Generate a `config.yaml` file with the following structure, or just edit the supplied template:
```yaml
bigquery:
  #BigQuery dataset name for hasura staging
  dataset: <TO_DO_DEVELOPER>
  #BigQuery dataset location (e.g. US) for hasura staging
  dataset_location: <TO_DO_DEVELOPER>
gcp:
  #Google Cloud Project ID
  gcp_project_id: <TO_DO_DEVELOPER>
  #Google Project number
  gcp_project_number: <TO_DO_DEVELOPER>
  #Google default region
  gcp_region: <TO_DO_DEVELOPER>
  #Google default zone
  gcp_zone: <TO_DO_DEVELOPER>
  #General Service Account name, it needs enough privileges to create/edit resources (e.g. Project Owner)
  sa_name : <TO_DO_DEVELOPER>
  #General Service Account name key location where the JSON file will be generated
  sa_key: <TO_DO_DEVELOPER>
  #Terraform Service Account name, it needs enough privileges to create/edit resources
  sa_terraform_name : <TO_DO_DEVELOPER>
  #Terraform Service Account name key location where the JSON file will be generated
  sa_terraform_key : <TO_DO_DEVELOPER>
gke:
   #GKE cluster name where graphQL engine will be deployed
   gke_cluster_name : <TO_DO_DEVELOPER>
   #hasura needs a metadata database, this will be the prefix name of a Postgres 9.6 database
   dbname_prefix: <TO_DO_DEVELOPER>
gcs:
  #GCS bucket location
  bucket_location: <TO_DO_DEVELOPER>
  #GCS bucket name
  bucket_name: <TO_DO_DEVELOPER>
  #GCS path where sample data will be deployed (e.g. /stg_parquet/ )
  sample_data_folder: <TO_DO_DEVELOPER>
  #GCS path where the SPARK templates will be deployed (e.g. /scripts/ )
  scripts_folder: <TO_DO_DEVELOPER>
  #GCS path Iceberg SPARK catalog (e.g. /warehouse/)
  #TODO (velascoluis) : Add support for metastore service
  warehouse_dir_folder: <TO_DO_DEVELOPER>
hasura:
  #Leave this field unchanged for now
  url: <UNKNOWN>
pubsublite:
  #PubSubLite Topic region, the name is autogenerated from the base table
  pubsublite_gcp_region: <TO_DO_DEVELOPER>
  #Time in seconds (e.g. 3600) that the stream service will be running. During this time the dataproc SPARK cluster will be up listening for changes on the base Iceberg table
  stream_time: <TO_DO_DEVELOPER>
  #Offset time in milliseconds from which to look for table changes. It can be 0 to listen for new changes only, or negative (e.g. -1000000000) to start generating events from that point in the past
  timestamp_offset_mili: <TO_DO_DEVELOPER>
spark:
  #SPARK cluster name (single node cluster for now)
  dataproc_cluster_name: <TO_DO_DEVELOPER>
  #SPARK dataproc GCP region
  dataproc_gcp_region: <TO_DO_DEVELOPER>
  spark_templates:
  #Do not edit these fields
  - iceberg_show_catalog: show_catalog.py
  - iceberg_exec_sql: exec_sql.py
  - iceberg_export_csv: export_csv.py
  - iceberg_export_bigquery: export_bigquery.py
  - iceberg_stream_pubsub: stream_pubsub.py
  - create_sample_table: create_sample_table.py
```

* Start a shell terminal and execute the `bootstrap_polyexpose_infra.sh config.yaml` script

If you need more control over the infrastructure (e.g. the number of nodes in the dataproc/GKE clusters), feel free to edit the terraform script `main.tf`. The default values are cost conscious.
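
Before running the bootstrap script, it can help to check that no placeholder was left in the config. The following is a minimal sketch, not part of `polyexpose`: the helper name `find_unfilled` is hypothetical, and it only assumes the `<TO_DO_DEVELOPER>` marker used in the template above.

```python
from typing import Any, List

# Marker used by the supplied config template for values the developer must fill in.
PLACEHOLDER = "<TO_DO_DEVELOPER>"


def find_unfilled(node: Any, path: str = "") -> List[str]:
    """Return the dotted paths of every value still equal to PLACEHOLDER."""
    if isinstance(node, dict):
        found = []
        for key, value in node.items():
            child = f"{path}.{key}" if path else str(key)
            found.extend(find_unfilled(value, child))
        return found
    if isinstance(node, list):
        found = []
        for index, value in enumerate(node):
            found.extend(find_unfilled(value, f"{path}[{index}]"))
        return found
    return [path] if node == PLACEHOLDER else []


# Small in-memory stand-in for a parsed config.yaml (hypothetical values).
sample = {
    "gcp": {"gcp_project_id": "my-gcp-project", "gcp_region": PLACEHOLDER},
    "gcs": {"bucket_name": PLACEHOLDER},
}
print(find_unfilled(sample))
```

With the real file, the same function could be applied to the result of `yaml.safe_load` on `config.yaml`; an empty list would mean every placeholder has been replaced.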


### Install `polyexpose` package

In [2]:
!pip3 install polyexpose==0.0.5
!pip3 install pyyaml



### Generate configuration for `polyexpose`

Let's create the basic config file for `polyexpose`. We can reuse the config file from the bootstrapping step and only need to adjust two fields:
```yaml
gcp:
  sa_key: infra_deploy/sa-hasura.json
hasura:
  url: http://34.94.177.87/
```
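
One way to apply just these two changes on top of the bootstrap config is a small recursive merge. This is a sketch under assumptions: `deep_update` is a hypothetical helper, not a `polyexpose` function, and the dictionaries stand in for the parsed YAML files.

```python
import copy


def deep_update(base: dict, overrides: dict) -> dict:
    """Return a copy of base with overrides merged in, recursing into nested dicts."""
    merged = copy.deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_update(merged[key], value)
        else:
            merged[key] = value
    return merged


# Stand-in for the config used at bootstrap time (only a few keys shown).
bootstrap_config = {
    "gcp": {"gcp_region": "us-west2", "sa_key": "<TO_DO_DEVELOPER>"},
    "hasura": {"url": "<UNKNOWN>"},
}

# The two fields that need adjusting for polyexpose.
overrides = {
    "gcp": {"sa_key": "infra_deploy/sa-hasura.json"},
    "hasura": {"url": "http://34.94.177.87/"},
}

polyexpose_config = deep_update(bootstrap_config, overrides)
print(polyexpose_config)
```

Merging into a copy keeps the bootstrap config untouched, so it can still be reused to redeploy the infrastructure.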

To get the hasura URL, execute:

```bash
jupyter@python-polyexpose:~/infra_deploy$ kubectl get service
NAME         TYPE           CLUSTER-IP     EXTERNAL-IP    PORT(S)        AGE
hasura       LoadBalancer   10.3.249.109   34.94.177.87   80:30453/TCP   2m11s
kubernetes   ClusterIP      10.3.240.1     <none>         443/TCP        16m
```
Copy the value of the EXTERNAL-IP field for the `hasura` service into `hasura.url`.
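
If you prefer not to copy the address by hand, the table printed by `kubectl get service` can be parsed. The sketch below is only illustrative: `external_ip` is a hypothetical helper, and it assumes the whitespace-separated column layout shown above.

```python
from typing import Optional


def external_ip(kubectl_output: str, service: str) -> Optional[str]:
    """Return the EXTERNAL-IP column for the named service, or None if absent."""
    lines = [line for line in kubectl_output.strip().splitlines() if line.strip()]
    header = lines[0].split()
    name_col = header.index("NAME")
    ip_col = header.index("EXTERNAL-IP")
    for line in lines[1:]:
        fields = line.split()
        if fields[name_col] == service:
            return fields[ip_col]
    return None


# Output copied from the kubectl call shown above.
sample_output = """
NAME         TYPE           CLUSTER-IP     EXTERNAL-IP    PORT(S)        AGE
hasura       LoadBalancer   10.3.249.109   34.94.177.87   80:30453/TCP   2m11s
kubernetes   ClusterIP      10.3.240.1     <none>         443/TCP        16m
"""

hasura_url = f"http://{external_ip(sample_output, 'hasura')}/"
print(hasura_url)
```

Splitting on whitespace is fragile if a column ever contains spaces or is empty, so treat this as a convenience for the output format shown here rather than a general parser.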

In [3]:
"""
Creates the basic config file for polyexpose in YAML format
"""
import yaml
CONFIG_FILE_NAME='config.yaml'
config_contents = yaml.safe_load("""
bigquery:
  dataset: polyexpose
  dataset_location: US
gcp:
  gcp_project_id: my-gcp-project
  gcp_project_number: '1063524325524'
  gcp_region: us-west2
  gcp_zone: us-west2-a
  sa_name : sa-hasura
  sa_key: infra_deploy/sa-hasura.json
  sa_terraform_name : sa-tf-deployer 
  sa_terraform_key : infra_deploy/sa-tf-deployer.json
gke:
   gke_cluster_name : gke-hasura-cluster
   dbname_prefix: sql-hasura-database
gcs:
  bucket_location: US
  bucket_name: gcs-my-project-dev-sandbox
  sample_data_folder: /stg_parquet/
  scripts_folder: /scripts/
  warehouse_dir_folder: /warehouse/
hasura:
  url: http://34.94.177.87/
pubsublite:
  pubsublite_gcp_region: us-west2
  stream_time: '60'
  timestamp_offset_mili: '-1000000000'
spark:
  dataproc_cluster_name: spark-cluster
  dataproc_gcp_region: us-west2
  spark_templates:
  - iceberg_show_catalog: show_catalog.py
  - iceberg_exec_sql: exec_sql.py
  - iceberg_export_csv: export_csv.py
  - iceberg_export_bigquery: export_bigquery.py
  - iceberg_stream_pubsub: stream_pubsub.py
  - create_sample_table: create_sample_table.py
""")
with open(CONFIG_FILE_NAME, 'w') as file:
    # yaml.dump returns None when given a stream, so there is nothing to assign
    yaml.dump(config_contents, file)

### Basic functionality

Generating a client and loading a base table.
The basic abstraction in `polyexpose` is the table. To work with one, we need to read the internal catalog and create a reference using the `get_table` function.
Creating tables and registering them in the catalog is the responsibility of other components in the data product, but we provide a simple `create_sample_table` function for testing purposes.

In [4]:
"""Uncomment for enabling package INFO messages"""
#import logging
#logging.basicConfig()
#logging.getLogger().setLevel(logging.INFO)

'Uncomment for enabling package INFO messages'

In [5]:
from polyexpose import polyexpose

In [6]:
client = polyexpose.PolyExpose(yaml_config=CONFIG_FILE_NAME)

In [7]:
client.init()

Package initialized!


In [8]:
client.create_sample_table('polytable03')

22/07/10 13:41:06 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/07/10 13:41:06 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/07/10 13:41:06 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/07/10 13:41:06 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/07/10 13:41:06 INFO org.sparkproject.jetty.util.log: Logging initialized @4092ms to org.sparkproject.jetty.util.log.Slf4jLog
22/07/10 13:41:06 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_332-b09
22/07/10 13:41:06 INFO org.sparkproject.jetty.server.Server: Started @4221ms
22/07/10 13:41:06 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@2f76deac{HTTP/1.1, (http/1.1)}{0.0.0.0:44591}
22/07/10 13:41:07 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.168.0.42:8032
22/07/

In [9]:
# force_reload=True bypasses the local cache
client.load_catalog(force_reload=True)
client.show_tables()

Tables avaliable in catalog:['default.polytable03']


In [10]:
table_name = 'default.polytable03'
table = client.get_table(table_name)

### Expose mode: SQL

For each expose mode, we first need to enable it using the `expose_as` function. In the SQL case, we just need to run `expose_as("SQL")` on the desired table. This unlocks the `query_table` function, which takes two parameters: (1) a list of column names to project and (2) a list of filters.
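
To build intuition for how a projection list and a filter list relate to a SQL statement, here is a minimal sketch. It is not the `polyexpose` implementation: `build_select` is a hypothetical helper, and combining the filters with `AND` is an assumption.

```python
from typing import List


def build_select(table: str, projections: List[str], filters: List[str]) -> str:
    """Assemble a SELECT statement from column names and filter expressions."""
    columns = ", ".join(projections) if projections else "*"
    statement = f"SELECT {columns} FROM {table}"
    if filters:
        # Assumption: multiple filters are combined conjunctively.
        statement += " WHERE " + " AND ".join(f"({f})" for f in filters)
    return statement


statement = build_select(
    "default.polytable03",
    ["id", "first_name", "salary", "country"],
    ["salary>50000"],
)
print(statement)
# SELECT id, first_name, salary, country FROM default.polytable03 WHERE (salary>50000)
```

Plain string assembly like this is only suitable for trusted input; filters coming from end users would need validation before being placed in a statement.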

In [11]:
table.expose_as("SQL")

In [12]:
table.show_schema()

22/07/10 13:42:54 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/07/10 13:42:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/07/10 13:42:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/07/10 13:42:54 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/07/10 13:42:54 INFO org.sparkproject.jetty.util.log: Logging initialized @4026ms to org.sparkproject.jetty.util.log.Slf4jLog
22/07/10 13:42:54 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_332-b09
22/07/10 13:42:54 INFO org.sparkproject.jetty.server.Server: Started @4159ms
22/07/10 13:42:54 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@7b3210e5{HTTP/1.1, (http/1.1)}{0.0.0.0:33739}
22/07/10 13:42:55 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.168.0.42:8032
22/07/

In [13]:
table.query_table(['id','first_name','salary','country'], ['salary>50000'])

22/07/10 13:43:28 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/07/10 13:43:28 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/07/10 13:43:28 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/07/10 13:43:28 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/07/10 13:43:28 INFO org.sparkproject.jetty.util.log: Logging initialized @4143ms to org.sparkproject.jetty.util.log.Slf4jLog
22/07/10 13:43:28 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_332-b09
22/07/10 13:43:28 INFO org.sparkproject.jetty.server.Server: Started @4265ms
22/07/10 13:43:28 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@564d22c4{HTTP/1.1, (http/1.1)}{0.0.0.0:43105}
22/07/10 13:43:29 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.168.0.42:8032
22/07/

### Expose mode: CSV

This expose mode exports the data in CSV format. We just need to run `expose_as("CSV")` on the desired table.

In [14]:
table.expose_as("CSV")

22/07/10 13:44:17 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/07/10 13:44:17 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/07/10 13:44:17 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/07/10 13:44:17 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/07/10 13:44:17 INFO org.sparkproject.jetty.util.log: Logging initialized @4332ms to org.sparkproject.jetty.util.log.Slf4jLog
22/07/10 13:44:17 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_332-b09
22/07/10 13:44:17 INFO org.sparkproject.jetty.server.Server: Started @4457ms
22/07/10 13:44:17 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@61c69bf1{HTTP/1.1, (http/1.1)}{0.0.0.0:37473}
22/07/10 13:44:18 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.168.0.42:8032
22/07/

Let's inspect the first bytes of the exported data in GCS:

In [15]:
!gsutil cat -r 0-512 gs://gcs-my-project-dev-sandbox/default.polytable03/*csv

2016-02-03T07:55:29.000Z,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116,Indonesia,3/8/1971,49756.53,Internal Auditor,1E+02
2016-02-03T17:04:03.000Z,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,"",Canada,1/16/1968,150280.17,Accountant IV,""
2016-02-03T01:09:31.000Z,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597,Russia,2/1/1960,144972.51,Structural Engineer,""
2016-02-03T00:36:21.000Z,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,35760315989656
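
The export shown above has no header row, so a consumer has to supply column names. The sketch below parses the first three rows with the standard `csv` module. Only `id`, `first_name`, `salary` and `country` are names used elsewhere in this notebook; the remaining names and the overall column order are assumptions inferred from the sample rows.

```python
import csv
import io

# Assumed column order; names other than id, first_name, country and salary
# are hypothetical labels chosen for readability.
COLUMNS = [
    "timestamp", "id", "first_name", "last_name", "email", "gender",
    "ip_address", "cc", "country", "birthdate", "salary", "title", "comments",
]

# First three complete rows from the gsutil output above.
sample = (
    '2016-02-03T07:55:29.000Z,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,'
    '6759521864920116,Indonesia,3/8/1971,49756.53,Internal Auditor,1E+02\n'
    '2016-02-03T17:04:03.000Z,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,'
    '"",Canada,1/16/1968,150280.17,Accountant IV,""\n'
    '2016-02-03T01:09:31.000Z,3,Evelyn,Morgan,emorgan2@altervista.org,Female,'
    '7.161.136.94,6767119071901597,Russia,2/1/1960,144972.51,Structural Engineer,""\n'
)

rows = [dict(zip(COLUMNS, record)) for record in csv.reader(io.StringIO(sample))]

# Same filter as the SQL example: salary above 50000.
high_paid = [row["first_name"] for row in rows if float(row["salary"]) > 50000]
print(high_paid)
```

Empty quoted fields (`""`) come back as empty strings, so any numeric conversion on optional columns should guard against them.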

### Expose mode: GQL

This expose mode generates an API endpoint where arbitrary GQL queries can be issued against the table. First we need to enable it using `expose_as("GQL")`, and then we can send GQL statements via the `exec_gql(gql_query)` method.
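
Since the endpoint is served by hasura, it can in principle also be queried over plain HTTP. The sketch below only builds the request and does not send it. It assumes hasura's standard `/v1/graphql` path and the `hasura.url` value from the config; `build_gql_request` is a hypothetical helper, and whether `exec_gql` works this way internally is not confirmed here.

```python
import json
import urllib.parse
import urllib.request


def build_gql_request(base_url: str, query: str) -> urllib.request.Request:
    """Build a POST request carrying a GraphQL query as a JSON body."""
    endpoint = urllib.parse.urljoin(base_url, "v1/graphql")
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


query = "query { polyexpose_default_polytable03(limit: 10) { id country } }"
request = build_gql_request("http://34.94.177.87/", query)
print(request.full_url)

# To actually send it (requires network access to the hasura service):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

If the hasura deployment is protected with an admin secret, the request would also need an `x-hasura-admin-secret` header.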

In [16]:
table.expose_as("GQL")

22/07/10 13:45:03 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/07/10 13:45:03 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/07/10 13:45:03 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/07/10 13:45:03 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/07/10 13:45:03 INFO org.sparkproject.jetty.util.log: Logging initialized @4171ms to org.sparkproject.jetty.util.log.Slf4jLog
22/07/10 13:45:03 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_332-b09
22/07/10 13:45:03 INFO org.sparkproject.jetty.server.Server: Started @4299ms
22/07/10 13:45:03 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@1d01f8d0{HTTP/1.1, (http/1.1)}{0.0.0.0:41147}
22/07/10 13:45:04 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.168.0.42:8032
22/07/

In [17]:
gql_query = "query  { \
polyexpose_default_polytable03(limit: 10, where: {salary: {_gt: 50000}}) { \
    id \
    country \
    first_name \
    salary \
  } \
}"
table.exec_gql(gql_query)

{"data":{"polyexpose_default_polytable03":[{"id":"157","country":"France","first_name":"","salary":"155562.05"},{"id":"282","country":"China","first_name":"","salary":"83888.12"},{"id":"404","country":"Japan","first_name":"","salary":"227502.55"},{"id":"545","country":"Japan","first_name":"","salary":"102158.99"},{"id":"653","country":"China","first_name":"","salary":"127912.54"},{"id":"760","country":"China","first_name":"","salary":"259094.47"},{"id":"865","country":"Indonesia","first_name":"","salary":"92768.07"},{"id":"983","country":"China","first_name":"","salary":"104324.94"},{"id":"195","country":"Indonesia","first_name":"Joe","salary":"239690.34"},{"id":"618","country":"Mexico","first_name":"Joe","salary":"273837.25"}]}}


### Expose mode: EVENTs

This expose mode creates a pubsublite topic (KAFKA compatible) that a client subscription can read in real time. It uses SPARK Structured Streaming. It follows the same convention as the other modes, so it is activated by running `expose_as("EVENT")`.

There are two key elements to configure in `config.yaml`: `stream_time` and `timestamp_offset_mili`. The first one controls how long the stream service runs, in seconds (e.g. 3600); during this time the dataproc SPARK cluster is up listening for changes on the base Iceberg table. The second one is the offset, in milliseconds, from which to look for table changes: it can be 0 to listen for new changes only, or negative (e.g. -1000000000) to start generating events from that point in the past.

All the data in the table is read and compacted into the `data` field of the [pubsublite message](https://cloud.google.com/pubsub/lite/docs/reference/rpc/google.cloud.pubsublite.v1#pubsubmessage).
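
To make the two settings concrete, the sketch below converts them into wall-clock times. It assumes the offset is applied relative to the moment the stream starts, which is an interpretation of the description above rather than confirmed behaviour; `stream_window` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def stream_window(
    now: datetime, stream_time_s: int, timestamp_offset_ms: int
) -> Tuple[datetime, datetime]:
    """Return (earliest change considered, time the stream stops)."""
    start = now + timedelta(milliseconds=timestamp_offset_ms)
    stop = now + timedelta(seconds=stream_time_s)
    return start, stop


# Values from the config used in this notebook (stored there as strings).
now = datetime(2022, 7, 10, 13, 46, 27, tzinfo=timezone.utc)
start, stop = stream_window(now, int("60"), int("-1000000000"))

print(start.isoformat())  # 2022-06-28T23:59:47+00:00
print(stop.isoformat())   # 2022-07-10T13:47:27+00:00
```

An offset of -1000000000 ms is about 11.6 days, so with these settings the stream would replay changes from roughly the previous week and a half and then stop after one minute.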

In [18]:
table.expose_as("EVENT")

22/07/10 13:46:27 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/07/10 13:46:27 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/07/10 13:46:27 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/07/10 13:46:27 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
22/07/10 13:46:27 INFO org.sparkproject.jetty.util.log: Logging initialized @4318ms to org.sparkproject.jetty.util.log.Slf4jLog
22/07/10 13:46:27 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_332-b09
22/07/10 13:46:27 INFO org.sparkproject.jetty.server.Server: Started @4437ms
22/07/10 13:46:27 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@5b3cdc9{HTTP/1.1, (http/1.1)}{0.0.0.0:43187}
22/07/10 13:46:28 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.168.0.42:8032
22/07/1

Now the client code, which reads the events through a subscription:

In [20]:
# Use the public import path rather than the private _base module
from concurrent.futures import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)



project_number = 1063524325524
cloud_region = "us-west2"


subscription_id = "subscription_icebergtable"
timeout = 90
location = CloudRegion(cloud_region)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    # A tuple is required to catch both; `A or B` evaluates to just A
    except (TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

Listening for messages on projects/1063524325524/locations/us-west2/subscriptions/subscription_icebergtable...
