In [1]:
import json
import os

import psycopg2
from google.cloud import secretmanager

import dynamic_singer as dsinger
from dynamic_singer.extra import postgres

# Show the installed dynamic-singer version (the output below was produced with 0.0.9).
dsinger.__version__

'0.0.9'

In [3]:
# create your own connection
def _postgres_setting(value, env_name):
    """Return ``value`` if it was given, otherwise the ``env_name`` environment variable."""
    if value is not None:
        return value
    try:
        return os.environ[env_name]
    except KeyError:
        raise RuntimeError(
            f'Postgres setting missing: pass it to get_connection() or set ${env_name}.'
        ) from None


def get_connection(user=None, password=None, database=None, host='localhost', port=5432):
    """Open a psycopg2 connection to the source Postgres database.

    Credentials are never hard-coded in the notebook. Any of ``user``,
    ``password`` or ``database`` left as ``None`` is read from the
    ``POSTGRES_USER``, ``POSTGRES_PASSWORD`` or ``POSTGRES_DATABASE``
    environment variable respectively. (``secretmanager`` is imported at the
    top of the notebook if you prefer to fetch them from Google Secret Manager
    and pass them in explicitly.)

    Parameters
    ----------
    user: str, optional
        Postgres user; falls back to $POSTGRES_USER.
    password: str, optional
        Postgres password; falls back to $POSTGRES_PASSWORD.
    database: str, optional
        database name; falls back to $POSTGRES_DATABASE.
    host: str, (default='localhost')
        database host.
    port: int, (default=5432)
        database port.

    Returns
    -------
    psycopg2 connection object.

    Raises
    ------
    RuntimeError
        if a credential is neither passed in nor set in the environment.
    """
    connection = psycopg2.connect(
        user=_postgres_setting(user, 'POSTGRES_USER'),
        password=_postgres_setting(password, 'POSTGRES_PASSWORD'),
        host=host,
        port=port,
        database=_postgres_setting(database, 'POSTGRES_DATABASE'),
    )
    return connection

connection = get_connection()

## Before using `postgres.Tap`

We need to create a persistent class to save and load the last primary-key state.

It must be an object with `pull` and `push` methods.

In this example we simply write to a local file; in deployment, it is better to store the state in GCS, S3, or other persistent storage.

In [4]:
class Persistent:
    """Local-file store for the Tap's last primary-key state.

    Provides the ``pull`` / ``push`` methods that ``postgres.Tap`` requires of
    its ``persistent`` argument. The state is kept in a file named
    ``'{schema}-{table}'`` in the current working directory.
    """

    def __init__(self, schema, table):
        self.filename = f'{schema}-{table}'

    def pull(self):
        """Return the most recently pushed state as a string.

        Raises
        ------
        FileNotFoundError
            if nothing has been pushed yet for this schema/table.
        """
        with open(self.filename, encoding='utf-8') as fopen:
            return fopen.read()

    def push(self, data):
        """Persist ``data`` (a str) as the new state, atomically.

        Writing with ``open(filename, 'w')`` truncates the file first, so a
        crash mid-write would leave an empty/partial checkpoint. Instead the
        state is written to a sibling temp file, flushed to disk, then renamed
        over the real file with ``os.replace``. Readers therefore see either
        the old state or the new one, never a torn write.
        """
        tmp_filename = f'{self.filename}.tmp'
        with open(tmp_filename, 'w', encoding='utf-8') as fopen:
            fopen.write(data)
            fopen.flush()
            os.fsync(fopen.fileno())
        os.replace(tmp_filename, self.filename)

In [5]:
# Source table to replicate -- presumably placeholders: set `schema` and `table` before running.
schema = ''                  # Postgres schema that contains the table
table = ''                   # table to replicate
primary_key = 'updated_at'   # column whose last-seen value is persisted as the Tap's cursor
batch_size = 100             # rows per pull; also reused as the BigQuery target's batch_size

The `Tap` signature is shown below:

```python

class Tap:
    @check_type
    def __init__(
        self,
        schema: str,
        table: str,
        primary_key: str,
        connection,
        persistent,
        batch_size: int = 100,
        rest_time: int = 10,
        filter: str = '',
        debug: bool = True,
    ):

        """
        Postgres Tap using query statement. If prefer logical replication, use original Tap-Postgres from SingerIO.

        Parameters
        ----------
        schema: str
            postgres schema.
        table: str
            table name.
        primary_key: str
            column acted as primary key.
        connection: object
            psycopg2 connection object.
        persistent: object
            a python object that must has `pull` and `push` method to persist primary_key state.
        batch_size: int, (default=100)
            size of rows for each pull from postgres.
        rest_time: int, (default=10)
            rest for rest_time seconds after done pulled.
        filter: str, (default='')
            sql where statement for additional filter. Example, 'price > 0 and discount > 10', depends on table definition.
        debug: bool, (default=True)
            if true, will print important information.

        """
```

Only `schema`, `table`, `primary_key`, `connection` and `persistent` are required; the remaining arguments have defaults.

In [6]:
# Checkpoint store for the last pulled primary key, then the incremental Tap over `table`.
persistent = Persistent(schema, table)
tap = postgres.Tap(
    schema,
    table,
    primary_key,
    connection,
    persistent,
    batch_size=batch_size,
    rest_time=60,  # seconds to rest after pulling is done
)

## Generate the BigQuery schema

If you are not confident in the schema auto-generated by dynamic-singer, use `bigquery_schema` to generate a schema from the Postgres table definition.

In [7]:
# Build the BigQuery schema from the Postgres table definition instead of relying on dynamic-singer's inference.
table_schema = postgres.bigquery_schema(schema, table, connection)

In [8]:
# Settings for the `target-bigquery` CLI. `project_id` is presumably a placeholder
# for your GCP project id. The target reuses the Tap's `batch_size`.
config = dict(
    project_id='',
    dataset_id='dynamicsinger',
    validate_records=False,
    stream_data=False,
    batch_size=batch_size,
)

with open('bigquery-config.json', 'w') as config_file:
    json.dump(config, config_file)

In [9]:
# Wrap the Tap as a dynamic-singer source, handing it the explicit BigQuery schema.
source = dsinger.Source(
    tap,
    tap_name='example',
    tap_key=primary_key,
    tap_schema=table_schema,
)

In [10]:
# Attach the `target-bigquery` CLI as a target, pointing it at the config file written earlier.
source.add('target-bigquery --config bigquery-config.json')

In [11]:
def transformation(row):
    """Add a constant integer column ``extra`` to a row.

    The row is modified in place and also returned, together with a mapping
    of the added column name to its type name (presumably used by
    dynamic-singer to extend the schema for the new column).
    """
    added_column, added_type = 'extra', 'integer'
    row[added_column] = 1
    return row, {added_column: added_type}

In [None]:
# Start the pipeline, applying `transformation` to rows. Presumably long-running:
# the Tap rests and pulls again after each batch is done.
source.start(transformation=transformation, ignore_null=False)