In [None]:
%pip install snowflake-snowpark-python
%pip install boto3

In [1]:
import json
import os
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc
import snowflake.snowpark
from snowflake.snowpark.types import IntegerType
import boto3
import logging
import sys
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.types import IntegerType

In [2]:
# Load the Snowflake connection parameters and create the session.
with open("./secrets/GZ45853.json", "r") as secrets_file:
    connection_parameters = json.load(secrets_file)
test_session = (
    Session.builder
    .configs(connection_parameters)
    .create()
)

In [3]:
# Load the Greenhouse API key from the local secrets file.
with open("./secrets/greenhouse.json", "r") as secrets_file:
    greenhouse_secrets = json.load(secrets_file)
greenhouse_apikey = greenhouse_secrets["api_key"]

In [4]:
# Deployment settings used by the cells below.
bucket = "airbyte.alex"  # S3 bucket backing the `mystage` stage
connector = "source-pokeapi"  # connector name; also used as the S3 key prefix
aws_role_arn = "arn:aws:iam::168714685353:role/snowflake-api-gateway-test"
api_gateway_url = "https://w72cfwmned.execute-api.us-west-1.amazonaws.com/stage"
# Dashes are swapped for underscores so the name can be used as a SQL identifier.
api_integration_name = connector.replace("-", "_") + "_api_integration"

In [5]:
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

In [6]:
# Create a stage to store the code
create_stage_result = test_session.sql(f"create or REPLACE stage mystage url = 's3://{bucket}'").collect()

INFO:snowflake.connector.cursor:query: [create or REPLACE stage mystage url = 's3://airbyte.alex']
INFO:snowflake.connector.cursor:query execution done


In [7]:
# Create the integration
# WARNING: The gateway's trust relationship must be updated if the api integration is recreated.
def create_api_integration(api_integration_name, aws_role_arn, api_gateway_url):
    """Create or replace an enabled AWS API Gateway integration.

    Args:
        api_integration_name: Name of the integration (used as an identifier).
        aws_role_arn: Value for ``api_aws_role_arn``.
        api_gateway_url: The single entry of ``api_allowed_prefixes``.

    Returns:
        The rows collected from running the statement on ``test_session``.
    """
    statement = f"""
    create or replace api integration {api_integration_name}
      api_provider = aws_api_gateway
      api_aws_role_arn = '{aws_role_arn}'
      api_allowed_prefixes = ('{api_gateway_url}')
      enabled = true;
    """
    integration_result = test_session.sql(statement)
    return integration_result.collect()
def describe_api_integration(api_integration_name):
    """Return the rows of ``describe integration`` for the given integration name."""
    statement = f"describe integration {api_integration_name}"
    return test_session.sql(statement).collect()


#create_api_integration(api_integration_name, aws_role_arn, api_gateway_url)
describe_api_integration(api_integration_name)

INFO:snowflake.connector.cursor:query: [describe integration source_pokeapi_api_integration]
INFO:snowflake.connector.cursor:query execution done


[Row(property='ENABLED', property_type='Boolean', property_value='true', property_default='false'),
 Row(property='API_KEY', property_type='String', property_value='', property_default=''),
 Row(property='API_PROVIDER', property_type='String', property_value='AWS_API_GATEWAY', property_default=''),
 Row(property='API_AWS_IAM_USER_ARN', property_type='String', property_value='arn:aws:iam::147018273998:user/6o3f-s-ohss5467', property_default=''),
 Row(property='API_AWS_ROLE_ARN', property_type='String', property_value='arn:aws:iam::168714685353:role/snowflake-api-gateway-test', property_default=''),
 Row(property='API_AWS_EXTERNAL_ID', property_type='String', property_value='GZ45853_SFCRole=2_Q9B1fFjAUub69Ifq6flHOJwBZ/8=', property_default=''),
 Row(property='API_ALLOWED_PREFIXES', property_type='List', property_value='https://w72cfwmned.execute-api.us-west-1.amazonaws.com/stage', property_default='[]'),
 Row(property='API_BLOCKED_PREFIXES', property_type='List', property_value='', prope

In [8]:
# Create external function and translators

# Names of the Snowflake objects created in this cell.
request_translator_name = "source_request_translator"
response_translator_name = "source_response_translator"
# The connector name has its dashes replaced before being used as an identifier.
external_function_name = "{}_external_function".format(connector.replace('-', '_'))

# Create request_translator
# The same request translator can be used across multiple connectors
def create_request_translator(request_translator_name):
    """Create or replace the JavaScript request translator function.

    The JavaScript body reads ``EVENT.body.data[0][1]`` and
    ``EVENT.body.data[0][2]`` and returns them as ``body`` and ``urlSuffix``.

    Args:
        request_translator_name: Name of the function to create.

    Returns:
        The rows collected from running the statement on ``test_session``.
    """
    # Doubled braces are f-string escapes for the literal JavaScript braces.
    statement = f"""
    create or replace function {request_translator_name}(event object)
    returns object
    language javascript as
    '
    body = EVENT.body.data[0][1]
    suffixUrl = EVENT.body.data[0][2]
    return {{ "body": body, "urlSuffix": suffixUrl}};
    ';
    """
    return test_session.sql(statement).collect()

  
# Create response translator
# The same response translator can be used across multiple connectors
def create_response_translator(response_translator_name):
    """Create or replace the JavaScript response translator function.

    The JavaScript body wraps the incoming ``EVENT`` as
    ``{"body": {"data": [[0, EVENT]]}}``.

    Args:
        response_translator_name: Name of the function to create.

    Returns:
        The rows collected from running the statement on ``test_session``.
    """
    # Doubled braces are f-string escapes for the literal JavaScript braces.
    statement = f"""
    create or replace function {response_translator_name}(event object)
    returns object
    language javascript as
    '
    return {{ "body": {{ "data" : [[0, EVENT]] }}}};
    ';
    """
    return test_session.sql(statement).collect()

# Create external function
# One external function per connector
def create_external_function(
    external_function_name,
    api_integration_name,
    request_translator_name,
    response_translator_name,
    api_gateway_url,
):
    """Create or replace the external function for a connector.

    The function is declared as ``(body varchar, urlSuffix varchar)`` returning
    ``variant``, bound to the given API integration and translators, and
    pointing at ``api_gateway_url``.

    Returns:
        The rows collected from running the statement on ``test_session``.
    """
    return test_session.sql(f"""
    create or replace external function {external_function_name}(body varchar, urlSuffix varchar)
      returns variant
      api_integration = {api_integration_name}
      request_translator = {request_translator_name}
      response_translator = {response_translator_name}
      as '{api_gateway_url}';
    """).collect()

def describe_external_function(external_function_name):
    """Return the rows of ``describe function`` for the ``(varchar, varchar)`` overload."""
    return test_session.sql(
        f"describe function {external_function_name} (varchar, varchar)"
    ).collect()

print(create_request_translator(request_translator_name))
print(create_response_translator(response_translator_name))
print(create_external_function(external_function_name, api_integration_name, request_translator_name, response_translator_name, api_gateway_url))

describe_external_function(external_function_name)

INFO:snowflake.connector.cursor:query: [create or replace function source_request_translator(event object) returns objec...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Function SOURCE_REQUEST_TRANSLATOR successfully created.')]
INFO:snowflake.connector.cursor:query: [create or replace function source_response_translator(event object) returns obje...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Function SOURCE_RESPONSE_TRANSLATOR successfully created.')]
INFO:snowflake.connector.cursor:query: [create or replace external function source_pokeapi_external_function(body varcha...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Function SOURCE_POKEAPI_EXTERNAL_FUNCTION successfully created.')]
INFO:snowflake.connector.cursor:query: [describe function source_pokeapi_external_function (varchar, varchar)]
INFO:snowflake.connector.cursor:query execution done


[Row(property='signature', value='(BODY VARCHAR, URLSUFFIX VARCHAR)'),
 Row(property='returns', value='VARIANT'),
 Row(property='language', value='EXTERNAL'),
 Row(property='null handling', value='CALLED ON NULL INPUT'),
 Row(property='volatility', value='VOLATILE'),
 Row(property='body', value='https://w72cfwmned.execute-api.us-west-1.amazonaws.com/stage'),
 Row(property='headers', value='null'),
 Row(property='context_headers', value='null'),
 Row(property='max_batch_rows', value='not set'),
 Row(property='request_translator', value='SOURCE_POKEAPI_APP.PUBLIC.SOURCE_REQUEST_TRANSLATOR'),
 Row(property='response_translator', value='SOURCE_POKEAPI_APP.PUBLIC.SOURCE_RESPONSE_TRANSLATOR'),
 Row(property='compression', value='AUTO')]

In [9]:
# Create a database to be used by the application
def create_database(source_name):
    """Create the ``<source_name>_app`` database and its ``app_schema`` schema if missing.

    Both statements use ``if not exists`` and are run on ``test_session``.
    Nothing is returned.
    """
    app_database = f"{source_name}_app"
    ddl_statements = (
        f"""
    CREATE DATABASE if not exists {app_database};
    """,
        f"""
    CREATE SCHEMA if not exists {app_database}.app_schema;
    """,
    )
    # The database must exist before the schema, so order matters here.
    for ddl in ddl_statements:
        test_session.sql(ddl).collect()
create_database("source_pokeapi")

INFO:snowflake.connector.cursor:query: [CREATE DATABASE if not exists source_pokeapi_app;]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE SCHEMA if not exists source_pokeapi_app.app_schema;]
INFO:snowflake.connector.cursor:query execution done


In [10]:
def list_files_to_load(connector):
    """List the staged files to import into the stored procedure.

    Every S3 object in the module-level ``bucket`` whose key starts with
    ``connector`` is returned as an ``@mystage/<key>`` path, except keys that
    contain "pendulum".

    Args:
        connector: Key prefix to list (a single leading "/" is dropped).

    Returns:
        A list of ``@mystage/...`` strings.
    """
    paginator = boto3.client('s3').get_paginator('list_objects_v2')
    separator = '/'

    key_prefix = f"{connector}"
    if key_prefix.startswith(separator):
        key_prefix = key_prefix[1:]
    # When the prefix is a "folder" key, start listing after the folder itself.
    first_key_after = key_prefix if key_prefix.endswith(separator) else ''

    # For now everything under the prefix is loaded, minus pendulum files.
    staged_files = []
    for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix, StartAfter=first_key_after):
        for s3_object in page.get('Contents', ()):
            object_key = s3_object['Key']
            if "pendulum" not in object_key:
                staged_files.append(f"@mystage/{object_key}")
    return staged_files

In [11]:
# Create a stage to store the code
create_stage_result = test_session.sql(f"create or REPLACE stage mystage url = 's3://{bucket}'").collect()

INFO:snowflake.connector.cursor:query: [create or REPLACE stage mystage url = 's3://airbyte.alex']
INFO:snowflake.connector.cursor:query execution done


In [12]:
# Internal stored procedure that will not be exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], imports=list_files_to_load(connector), name="sync_connector_to_table", replace=True, is_permanent=True, stage_location="@mystage")
def compute(session: snowflake.snowpark.Session, to_table: str, config: dict) -> str:
    """Run a full-refresh read of the PokeAPI source and append each record to ``to_table``.

    HTTP calls made through ``requests`` are not sent directly:
    ``requests.sessions.Session.send`` is monkey-patched so every request is
    forwarded to the ``source_pokeapi_external_function`` external function.

    Args:
        session: Snowpark session supplied to the stored procedure.
        to_table: Table receiving one ``parse_json`` row per record. It is
            interpolated into the SQL as an identifier and is NOT escaped.
        config: Connector configuration passed to the source.

    Returns:
        ``str()`` of the last record written, or a short message when the
        source emitted no records.
    """
    import json
    import logging

    from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
    from airbyte_cdk.models import SyncMode, DestinationSyncMode

    from source_pokeapi import SourcePokeapi

    def sql_string_literal(value) -> str:
        """Render ``value`` as a single-quoted SQL string constant.

        Backslashes and single quotes are escaped so arbitrary text (for
        example JSON containing ``\\"`` or ``'``) survives unchanged.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return "'" + escaped + "'"

    logger = logging.getLogger("logger")
    source = SourcePokeapi()

    catalog = source.discover(logger, config)
    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[ConfiguredAirbyteStream(stream=s, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append) for s
                 in catalog.streams])

    # hack: get the base url from the first stream...
    base_url = source.streams(config)[0].url_base

    def patch_send(session):
        import requests

        def convert_request_to_external_function_input(request: requests.PreparedRequest):
            body = request.body
            headers = request.headers
            url = request.url
            return {"body": body, "headers": headers, "url": url}

        def convert_external_function_output_to_response(output) -> requests.Response:
            response = requests.Response()
            response.status_code = 200
            # The external function returns a single column; take its value.
            actual_output = list(output[0].as_dict().items())[0][1]

            response_as_json = json.loads(actual_output)
            body = response_as_json["body"]

            response._content = json.dumps(body).encode("ascii")
            return response

        def new_session_send(self, request, **kwargs):
            # convert to external function arguments
            args = convert_request_to_external_function_input(request)

            # call external function
            if session:
                # FIXME: No error handling...
                path = "/" + args["url"].replace(base_url, "")
                body_literal = sql_string_literal(args["body"])
                path_literal = sql_string_literal(path)
                output_from_external_function = session.sql(
                    f"select source_pokeapi_external_function({body_literal}, {path_literal});"
                ).collect()
                response = convert_external_function_output_to_response(output_from_external_function)
            else:
                # This block is just for testing...
                content = b'{"data": "hello"}'
                response = requests.Response()
                response.status_code = 200
                response._content = content
            return response

        requests.sessions.Session.send = new_session_send
    patch_send(session)

    records_written = 0
    last_record = None
    for message in source.read(logger, config, configured_catalog, {}):
        record = getattr(message, "record", None)
        if record is None:
            # Messages without a record payload have nothing to insert.
            continue
        last_record = record.data
        payload_literal = sql_string_literal(json.dumps(last_record))
        session.sql(f"insert into {to_table} select parse_json({payload_literal})").collect()
        records_written += 1

    if records_written == 0:
        # Previously this path raised UnboundLocalError on the return below.
        return "No records were emitted by the source."
    return str(last_record)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:snowflake.connector.cursor:query: [ls '@mystage']
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01a7030c-0000-bc60-00...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [select package_name, version from information_schema.packages where language='py...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE OR REPLACE PROCEDURE sync_connector_to_table(arg1 STRING,arg2 OBJECT) RET...]
INFO:snowflake.connector.cursor:query execution done


In [13]:
# reset output table
test_session.sql("create table if not exists public.test_pokeapi (data variant)").collect()
test_session.sql("delete from public.test_pokeapi;").collect()

INFO:snowflake.connector.cursor:query: [create table if not exists public.test_pokeapi (data variant)]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [delete from public.test_pokeapi;]
INFO:snowflake.connector.cursor:query execution done


[Row(number of rows deleted=4)]

In [14]:
test_session.call("sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "articuno"})
test_session.call("sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "zapdos"})
test_session.call("sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "moltres"})

INFO:snowflake.connector.cursor:query: [CALL sync_connector_to_table('public.test_pokeapi', parse_json('{"pokemon_name":...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CALL sync_connector_to_table('public.test_pokeapi', parse_json('{"pokemon_name":...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CALL sync_connector_to_table('public.test_pokeapi', parse_json('{"pokemon_name":...]
INFO:snowflake.connector.cursor:query execution done


"{'abilities': [{'ability': {'name': 'pressure', 'url': 'https://pokeapi.co/api/v2/ability/46/'}, 'is_hidden': False, 'slot': 1}, {'ability': {'name': 'flame-body', 'url': 'https://pokeapi.co/api/v2/ability/49/'}, 'is_hidden': True, 'slot': 3}], 'base_experience': 290, 'forms': [{'name': 'moltres', 'url': 'https://pokeapi.co/api/v2/pokemon-form/146/'}], 'game_indices': [{'game_index': 73, 'version': {'name': 'red', 'url': 'https://pokeapi.co/api/v2/version/1/'}}, {'game_index': 73, 'version': {'name': 'blue', 'url': 'https://pokeapi.co/api/v2/version/2/'}}, {'game_index': 73, 'version': {'name': 'yellow', 'url': 'https://pokeapi.co/api/v2/version/3/'}}, {'game_index': 146, 'version': {'name': 'gold', 'url': 'https://pokeapi.co/api/v2/version/4/'}}, {'game_index': 146, 'version': {'name': 'silver', 'url': 'https://pokeapi.co/api/v2/version/5/'}}, {'game_index': 146, 'version': {'name': 'crystal', 'url': 'https://pokeapi.co/api/v2/version/6/'}}, {'game_index': 146, 'version': {'name': 'r

In [15]:
test_session.sql("select * from public.test_pokeapi").collect()

INFO:snowflake.connector.cursor:query: [select * from public.test_pokeapi]
INFO:snowflake.connector.cursor:query execution done


[Row(DATA='{\n  "abilities": [\n    {\n      "ability": {\n        "name": "pressure",\n        "url": "https://pokeapi.co/api/v2/ability/46/"\n      },\n      "is_hidden": false,\n      "slot": 1\n    },\n    {\n      "ability": {\n        "name": "snow-cloak",\n        "url": "https://pokeapi.co/api/v2/ability/81/"\n      },\n      "is_hidden": true,\n      "slot": 3\n    }\n  ],\n  "base_experience": 290,\n  "forms": [\n    {\n      "name": "articuno",\n      "url": "https://pokeapi.co/api/v2/pokemon-form/144/"\n    }\n  ],\n  "game_indices": [\n    {\n      "game_index": 74,\n      "version": {\n        "name": "red",\n        "url": "https://pokeapi.co/api/v2/version/1/"\n      }\n    },\n    {\n      "game_index": 74,\n      "version": {\n        "name": "blue",\n        "url": "https://pokeapi.co/api/v2/version/2/"\n      }\n    },\n    {\n      "game_index": 74,\n      "version": {\n        "name": "yellow",\n        "url": "https://pokeapi.co/api/v2/version/3/"\n      }\n    }

In [20]:
test_session.sql("create table if not exists source_pokeapi_app.app_schema.configs (consumer_id varchar, output_table varchar, config variant);").collect()

INFO:snowflake.connector.cursor:query: [create table if not exists source_pokeapi_app.app_schema.configs (consumer_id va...]
INFO:snowflake.connector.cursor:query execution done


[Row(status='CONFIGS already exists, statement succeeded.')]

In [21]:
# Let's insert and read a config to/from a table
def insert_or_update_config(config_table, consumer_id, output_table, config):
    """Upsert a consumer's connector config into ``config_table``.

    The row is matched on ``consumer_id``. A matching row has both its
    ``config`` and ``output_table`` replaced; otherwise a new row is inserted.

    Args:
        config_table: Fully qualified table name (interpolated as an
            identifier, not escaped).
        consumer_id: Key identifying the consumer.
        output_table: Table name stored alongside the config.
        config: JSON-serialisable configuration stored via ``parse_json``.
    """
    def sql_string_literal(value):
        # Escape backslashes and single quotes so the value is a valid
        # single-quoted SQL string constant.
        return "'" + str(value).replace("\\", "\\\\").replace("'", "''") + "'"

    consumer_literal = sql_string_literal(consumer_id)
    output_table_literal = sql_string_literal(output_table)
    config_literal = sql_string_literal(json.dumps(config))
    test_session.sql(f"""
    merge into {config_table} a using (select {consumer_literal} as consumer_id, {output_table_literal} as output_table, parse_json({config_literal}) as config) as b on a.CONSUMER_ID=b.CONSUMER_ID
      when matched then update set a.config=b.config, a.output_table=b.output_table
      when not matched then insert (consumer_id, output_table, config) values (b.CONSUMER_ID, b.output_table, b.config);
""").collect()
insert_or_update_config("source_pokeapi_app.app_schema.configs", 'id0', "public.test_pokeapi", {"pokemon_name": "abomasnow"})

INFO:snowflake.connector.cursor:query: [merge into source_pokeapi_app.app_schema.configs a using (select 'id0' as consum...]
INFO:snowflake.connector.cursor:query execution done


In [22]:
test_session.sql("select * from source_pokeapi_app.app_schema.configs;").collect()

INFO:snowflake.connector.cursor:query: [select * from source_pokeapi_app.app_schema.configs;]
INFO:snowflake.connector.cursor:query execution done


[Row(CONSUMER_ID='id0', OUTPUT_TABLE='public.test_pokeapi', CONFIG='{\n  "pokemon_name": "abomasnow"\n}')]

In [23]:
# Internal stored procedure that will not be exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], name="sync_consumer_id", replace=True, is_permanent=True, stage_location="@mystage")
def compute_consumer(session: snowflake.snowpark.Session, consumer_id: str) -> str:
    """Look up a consumer's registered config and run the sync for it.

    Args:
        session: Snowpark session supplied to the stored procedure.
        consumer_id: Key into ``source_pokeapi_app.app_schema.configs``.

    Returns:
        Whatever ``sync_connector_to_table`` returns for the stored
        output table and config.

    Raises:
        IndexError: If no config is registered for ``consumer_id``.
    """
    import json

    # Escape backslashes and single quotes so the id is a valid SQL string constant.
    consumer_literal = "'" + str(consumer_id).replace("\\", "\\\\").replace("'", "''") + "'"
    rows = session.sql(
        f"select output_table, config from source_pokeapi_app.app_schema.configs where consumer_id = {consumer_literal};"
    ).collect()
    if not rows:
        raise IndexError(f"No config registered for consumer_id {consumer_id!r}")

    row = rows[0]
    output_table = row["OUTPUT_TABLE"]
    # The variant column comes back as JSON text.
    config = json.loads(row["CONFIG"])
    return session.call("sync_connector_to_table", output_table, config)

INFO:snowflake.connector.cursor:query: [ls '@mystage']
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01a702e9-0000-bc60-00...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [select package_name, version from information_schema.packages where language='py...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE OR REPLACE PROCEDURE sync_consumer_id(arg1 STRING) RETURNS STRING LANGUAG...]
INFO:snowflake.connector.cursor:query execution done


In [24]:
test_session.call("sync_consumer_id", "id0")

INFO:snowflake.connector.cursor:query: [CALL sync_consumer_id('id0')]
INFO:snowflake.connector.cursor:query execution done




In [25]:
test_session.sql("select * from public.test_pokeapi").collect()

INFO:snowflake.connector.cursor:query: [select * from public.test_pokeapi]
INFO:snowflake.connector.cursor:query execution done


[Row(DATA='{\n  "pokemon_name": "articuno"\n}'),
 Row(DATA='{\n  "pokemon_name": "zapdos"\n}'),
 Row(DATA='{\n  "pokemon_name": "moltres"\n}'),
 Row(DATA='{\n  "pokemon_name": "abomasnow"\n}')]

In [26]:
test_session.sql("delete from source_pokeapi_app.app_schema.configs").collect()

INFO:snowflake.connector.cursor:query: [delete from source_pokeapi_app.app_schema.configs]
INFO:snowflake.connector.cursor:query execution done


[Row(number of rows deleted=1)]

In [27]:
# Procedure that WILL BE exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], name="register_config", replace=True, is_permanent=True, stage_location="@mystage")
def register_config(session: snowflake.snowpark.Session, consumer_id: str, config: dict, output_table: str) -> str:
    """Store (or update) a consumer's config in ``source_pokeapi_app.app_schema.configs``.

    The row is matched on ``consumer_id``. A matching row has both its
    ``config`` and ``output_table`` replaced; otherwise a new row is inserted.

    Args:
        session: Snowpark session supplied to the stored procedure.
        consumer_id: Key identifying the consumer.
        config: JSON-serialisable configuration stored via ``parse_json``.
        output_table: Table name stored alongside the config.

    Returns:
        The string ``"REGISTER SUCCESS"``.
    """
    import json

    def sql_string_literal(value):
        # Escape backslashes and single quotes so the value is a valid
        # single-quoted SQL string constant.
        return "'" + str(value).replace("\\", "\\\\").replace("'", "''") + "'"

    def insert_or_update_config(session, config_table, consumer_id, output_table, config):
        consumer_literal = sql_string_literal(consumer_id)
        output_table_literal = sql_string_literal(output_table)
        config_literal = sql_string_literal(json.dumps(config))
        session.sql(f"""
        merge into {config_table} a using (select {consumer_literal} as consumer_id, {output_table_literal} as output_table, parse_json({config_literal}) as config) as b on a.CONSUMER_ID=b.CONSUMER_ID
          when matched then update set a.config=b.config, a.output_table=b.output_table
          when not matched then insert (consumer_id, output_table, config) values (b.CONSUMER_ID, b.output_table, b.config);
    """).collect()
    insert_or_update_config(session, "source_pokeapi_app.app_schema.configs", consumer_id, output_table, config)
    return "REGISTER SUCCESS"

INFO:snowflake.connector.cursor:query: [ls '@mystage']
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01a702eb-0000-bc60-00...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [select package_name, version from information_schema.packages where language='py...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE OR REPLACE PROCEDURE register_config(arg1 STRING,arg2 OBJECT,arg3 STRING)...]
INFO:snowflake.connector.cursor:query execution done


In [28]:
# Procedure that WILL BE exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], name="schedule_job", replace=True, is_permanent=True, stage_location="@mystage")
def schedule_job(session: snowflake.snowpark.Session, consumer_id: str, warehouse_name: str) -> str:
    """Create or replace the ``sync`` task that calls ``sync_consumer_id`` every 5 minutes.

    Args:
        session: Snowpark session supplied to the stored procedure.
        consumer_id: Passed as the argument of ``sync_consumer_id`` in the task body.
        warehouse_name: Warehouse the task runs on (used as a quoted identifier).

    Returns:
        The string ``"REGISTER SUCCESS"``.
    """
    # Double any embedded double quote so the name stays one quoted identifier.
    quoted_warehouse = '"' + str(warehouse_name).replace('"', '""') + '"'
    # Escape backslashes and single quotes so the id is a valid SQL string constant.
    consumer_literal = "'" + str(consumer_id).replace("\\", "\\\\").replace("'", "''") + "'"
    # NOTE(review): the task name is fixed, so scheduling a second consumer
    # replaces the first consumer's task.
    create_task_command = f"""
    CREATE OR REPLACE TASK sync warehouse = {quoted_warehouse}
    SCHEDULE = '5 MINUTE'
    as call sync_consumer_id({consumer_literal})
    """
    session.sql(create_task_command).collect()
    # NOTE(review): message looks copied from register_config; kept unchanged
    # so existing callers see the same return value.
    return "REGISTER SUCCESS"

INFO:snowflake.connector.cursor:query: [ls '@mystage']
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01a702eb-0000-bc60-00...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [select package_name, version from information_schema.packages where language='py...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE OR REPLACE PROCEDURE schedule_job(arg1 STRING,arg2 STRING) RETURNS STRING...]
INFO:snowflake.connector.cursor:query execution done


In [29]:
# Creating DB roles for the Snowflake Native Application
database_role = "source_pokeapi_app.shared_db_role"
print(test_session.sql(f"create or replace database role {database_role}").collect())

INFO:snowflake.connector.cursor:query: [create or replace database role source_pokeapi_app.shared_db_role]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Role SOURCE_POKEAPI_APP.SHARED_DB_ROLE successfully created.')]


In [30]:
# Grant usage
print(test_session.sql(f"grant usage on database source_pokeapi_app to database role {database_role};").collect())
print(test_session.sql(f"grant usage on schema source_pokeapi_app.app_schema to database role {database_role};").collect())
print(test_session.sql(f"grant usage on procedure register_config(string, object, string) to database role {database_role}").collect())

INFO:snowflake.connector.cursor:query: [grant usage on database source_pokeapi_app to database role source_pokeapi_app.s...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on schema source_pokeapi_app.app_schema to database role source_poke...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on procedure register_config(string, object, string) to database rol...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]


In [32]:
# Create hidden db role
hidden_db_role = "source_pokeapi_app.hidden_db_role"
print(test_session.sql(f"create or replace database role {hidden_db_role}").collect())
print(test_session.sql(f"grant usage on database source_pokeapi_app to database role {hidden_db_role}").collect())
print(test_session.sql(f"grant usage on schema source_pokeapi_app.app_schema to database role {hidden_db_role};").collect())
print(test_session.sql(f"grant usage on procedure sync_consumer_id(string) to database role {hidden_db_role};").collect())
print(test_session.sql(f"grant usage on procedure sync_connector_to_table(string, object) to database role {hidden_db_role};").collect())

INFO:snowflake.connector.cursor:query: [create or replace database role source_pokeapi_app.hidden_db_role]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Role SOURCE_POKEAPI_APP.HIDDEN_DB_ROLE successfully created.')]
INFO:snowflake.connector.cursor:query: [grant usage on database source_pokeapi_app to database role source_pokeapi_app.h...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on schema source_pokeapi_app.app_schema to database role source_poke...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on procedure sync_consumer_id(string) to database role source_pokeap...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on procedure sync_connector_to

In [41]:
test_session.call("SOURCE_POKEAPI_APP.APP_SCHEMA.register_config", "id0", {"pokemon_name": "articuno"}, "public.test_pokeapi")

INFO:snowflake.connector.cursor:query: [CALL SOURCE_POKEAPI_APP.APP_SCHEMA.register_config('id0', parse_json('{"pokemon_...]
INFO:snowflake.connector.cursor:query execution done


'REGISTER SUCCESS'

In [44]:
test_session.sql("describe procedure sync_consumer_id(string)").collect()

INFO:snowflake.connector.cursor:query: [describe procedure sync_consumer_id(string)]
INFO:snowflake.connector.cursor:query execution done


[Row(property='signature', value='(ARG1 VARCHAR)'),
 Row(property='returns', value='VARCHAR(16777216)'),
 Row(property='language', value='PYTHON'),
 Row(property='null handling', value='CALLED ON NULL INPUT'),
 Row(property='volatility', value='VOLATILE'),
 Row(property='execute as', value='OWNER'),
 Row(property='body', value="\nimport pickle\n\nfunc = pickle.loads(bytes.fromhex('8005953e030000000000008c17636c6f75647069636b6c652e636c6f75647069636b6c65948c0d5f6275696c74696e5f747970659493948c0a4c616d6264615479706594859452942868028c08436f6465547970659485945294284b024b004b004b054b054b43433e7c00a00064017c019b0064029d03a101a001a100640319007d027c02640419007d037402a0037c0264051900a1017d047c00a00464067c037c04a103530094284e8c5c73656c656374206f75747075745f7461626c652c20636f6e6669672066726f6d20736f757263655f706f6b656170695f6170702e6170705f736368656d612e636f6e6669677320776865726520636f6e73756d65725f6964203d2027948c02273b944b008c0c4f55545055545f5441424c45948c06434f4e464947948c1773796e635f636f6e6e65

In [45]:
test_session.call("schedule_job", "id0", "COMPUTE_WH")

INFO:snowflake.connector.cursor:query: [CALL schedule_job('id0', 'COMPUTE_WH')]
INFO:snowflake.connector.cursor:query execution done


'REGISTER SUCCESS'

In [46]:
test_session.sql("describe task sync").collect()

INFO:snowflake.connector.cursor:query: [describe task sync]
INFO:snowflake.connector.cursor:query execution done


[Row(created_on=datetime.datetime(2022, 9, 16, 10, 46, 59, 691000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), name='SYNC', id='01a7030a-c9fc-6436-0000-00000000000d', database_name='SOURCE_POKEAPI_APP', schema_name='PUBLIC', owner='ACCOUNTADMIN', comment='', warehouse='COMPUTE_WH', schedule='5 MINUTE', predecessors='[]', state='suspended', definition="call sync_consumer_id('id0')", condition=None, allow_overlapping_execution='false', error_integration='null', last_committed_on=None, last_suspended_on=None)]

In [47]:
test_session.sql("ALTER TASK sync resume;").collect()

INFO:snowflake.connector.cursor:query: [ALTER TASK sync resume;]
INFO:snowflake.connector.cursor:query execution done


[Row(status='Statement executed successfully.')]

In [48]:
test_session.sql("describe task sync").collect()

INFO:snowflake.connector.cursor:query: [describe task sync]
INFO:snowflake.connector.cursor:query execution done


[Row(created_on=datetime.datetime(2022, 9, 16, 10, 46, 59, 691000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), name='SYNC', id='01a7030a-c9fc-6436-0000-00000000000d', database_name='SOURCE_POKEAPI_APP', schema_name='PUBLIC', owner='ACCOUNTADMIN', comment='', warehouse='COMPUTE_WH', schedule='5 MINUTE', predecessors='[]', state='started', definition="call sync_consumer_id('id0')", condition=None, allow_overlapping_execution='false', error_integration='null', last_committed_on=datetime.datetime(2022, 9, 16, 10, 47, 0, 268000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), last_suspended_on=None)]

In [49]:
test_session.sql("select * from source_pokeapi_app.app_schema.configs").collect()

INFO:snowflake.connector.cursor:query: [select * from source_pokeapi_app.app_schema.configs]
INFO:snowflake.connector.cursor:query execution done


[Row(CONSUMER_ID='id0', OUTPUT_TABLE='public.test_pokeapi', CONFIG='{\n  "pokemon_name": "articuno"\n}')]

In [50]:
test_session.sql("select * from public.test_pokeapi").collect()

INFO:snowflake.connector.cursor:query: [select * from public.test_pokeapi]
INFO:snowflake.connector.cursor:query execution done


[Row(DATA='{\n  "pokemon_name": "articuno"\n}'),
 Row(DATA='{\n  "pokemon_name": "zapdos"\n}'),
 Row(DATA='{\n  "pokemon_name": "moltres"\n}'),
 Row(DATA='{\n  "pokemon_name": "abomasnow"\n}')]

In [52]:
# Creating an installer script
# FIXME: I think the schema should be specific to the connector...
# DDL for the installer procedure: it creates app_schema and output_table,
# then grants access to the APP_EXPORTER database role.
installer_ddl = """
CREATE OR REPLACE PROCEDURE source_pokeapi_app.app_schema.installer()
RETURNS STRING
LANGUAGE SQL
EXECUTE AS OWNER
AS $$
  begin
    CREATE SCHEMA app_schema;
    CREATE TABLE app_schema.output_table(data variant);
    GRANT USAGE ON schema app_schema TO DATABASE ROLE APP_EXPORTER;
    GRANT SELECT ON TABLE app_schema.output_table TO
    DATABASE ROLE APP_EXPORTER;
    GRANT DATABASE ROLE shared_db_role TO DATABASE ROLE APP_EXPORTER;
    return 'installer script Done';
  end;
$$;
"""
test_session.sql(installer_ddl).collect()

INFO:snowflake.connector.cursor:query: [CREATE OR REPLACE PROCEDURE source_pokeapi_app.app_schema.installer() RETURNS ST...]
INFO:snowflake.connector.cursor:query execution done


[Row(status='Function INSTALLER successfully created.')]

In [56]:
# Setup a share for the application
test_session.sql(f"""
CREATE SHARE IF NOT EXISTS source_pokeapi_share installer = source_pokeapi_app.app_schema.installer();
""").collect()

INFO:snowflake.connector.cursor:query: [CREATE SHARE IF NOT EXISTS source_pokeapi_share installer = source_pokeapi_app.a...]
INFO:snowflake.connector.cursor:query execution done


[Row(status='SOURCE_POKEAPI_SHARE already exists, statement succeeded.')]

In [57]:
# Add objects to the share
print(test_session.sql(f"""grant usage on database source_pokeapi_app to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant usage on schema source_pokeapi_app.app_schema to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant usage on procedure source_pokeapi_app.app_schema.installer() to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant database role source_pokeapi_app.hidden_db_role to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant database role source_pokeapi_app.shared_db_role to share source_pokeapi_share;""").collect())

INFO:snowflake.connector.cursor:query: [grant usage on database source_pokeapi_app to share source_pokeapi_share;]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on schema source_pokeapi_app.app_schema to share source_pokeapi_shar...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant usage on procedure source_pokeapi_app.app_schema.installer() to share sour...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant database role source_pokeapi_app.hidden_db_role to share source_pokeapi_sh...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Statement executed successfully.')]
INFO:snowflake.connector.cursor:query: [grant database role source_pokeapi_app.shared_db_role to share

In [58]:
# Open the customer (consumer-side) session from its own connection file.
# Note: this rebinds `connection_parameters` to the customer's settings.
with open("./secrets/config.json", "r") as secrets_file:
    connection_parameters = json.load(secrets_file)
customer_session = (
    Session.builder
    .configs(connection_parameters)
    .create()
)

INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.12, Python Version: 3.8.9, Platform: macOS-12.2.1-arm64-arm-64bit
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
INFO:snowflake.snowpark.session:Snowpark Session information: 
"version" : 0.9.0,
"python.version" : 3.8.9,
"python.connector.version" : 2.7.12,
"python.connector.session.id" : 774769150677010,
"os.name" : Darwin

