### Import Libraries

In [2]:
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION
# Misc
import json
import sys
import logging 
# Restrict the "snowflake.snowpark.session" logger to ERROR-level records and above.
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

In [3]:
# Build the Snowpark session from a fixed set of connection settings
# (the "externalbrowser" authenticator is used, so no password is stored here).
connection_parameters = dict(
    account="snowhouse",
    user="sapatra",
    authenticator="externalbrowser",
    role="solution_architect",
    warehouse="snowadhoc",
    database="temp",
    schema="util",
)
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

# One round trip fetches both the connected user and the server version.
snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

# Current environment details: each entry pairs a line template with the
# values that fill it; the entries are printed in the order listed.
environment_report = [
    ('User                        : {}', (snowflake_environment[0][0],)),
    ('Role                        : {}', (session.get_current_role(),)),
    ('Warehouse                   : {}', (session.get_current_warehouse(),)),
    ('Snowflake version           : {}', (snowflake_environment[0][1],)),
    ('Snowpark for Python version : {}.{}.{}',
     (snowpark_version[0], snowpark_version[1], snowpark_version[2])),
    ('Database                    : {}', (session.get_current_database(),)),
    ('Schema                      : {}', (session.get_current_schema(),)),
]
for line_template, line_values in environment_report:
    print(line_template.format(*line_values))

User                        : SAPATRA
Role                        : "SOLUTION_ARCHITECT"
Warehouse                   : "SNOWADHOC"
Snowflake version           : 7.11.6
Snowpark for Python version : 1.0.0
Database                    : "TEMP"
Schema                      : "UTIL"


In [128]:
def account_usage_view_deploy(session: Session, deployment_name: str, replication_group_value: str, schema_name: str) -> str:
    """Create every view listed in ``temp.util.account_usage_view_mapping``.

    Each row of the mapping table supplies a ``VIEW_NAME`` and a
    ``VIEW_DEFINITION`` template. For every row the template is rewritten by
    plain text substitution and then executed through ``session.sql``:

    * every occurrence of the view name becomes ``<schema_name>.<view name>``
    * the literal text ``deployment_name`` becomes ``deployment_name``
    * the literal text ``replication_group_value`` becomes
      ``replication_group_value``

    Args:
        session: Snowpark session used to read the mapping table and run the
            generated statements.
        deployment_name: Text substituted for the ``deployment_name``
            placeholder.
        replication_group_value: Text substituted for the
            ``replication_group_value`` placeholder.
        schema_name: Schema prefix prepended to each view name.

    Returns:
        The statement executed for the last row of the mapping table, or an
        empty string when the mapping table has no rows.

    Raises:
        RuntimeError: If rewriting or executing a view definition fails. The
            message names the view and the original exception is chained as
            ``__cause__``. Processing stops at the first failure.
    """
    # NOTE(review): the arguments are spliced into the statement text as-is
    # (no quoting or escaping); callers must pass trusted values only.
    mapping_df = session.table('temp.util.account_usage_view_mapping').toPandas()

    # Stays "" when the mapping table is empty, so the return value is always
    # a string instead of hitting an unbound local.
    last_deployed_def = ""
    for vw_name, vw_def in zip(mapping_df['VIEW_NAME'], mapping_df['VIEW_DEFINITION']):
        vw_name_schema = f"{schema_name}.{vw_name}"
        try:
            # Substitution order is significant: the view name is qualified
            # first, then the two placeholders are filled in.
            new_vw_def = vw_def.replace(vw_name, vw_name_schema)
            new_vw_def = new_vw_def.replace('deployment_name', deployment_name)
            new_vw_def = new_vw_def.replace('replication_group_value', replication_group_value)
            session.sql(new_vw_def).collect()
        except Exception as exc:
            # Surface the failure to the caller instead of printing and
            # returning None, which made a failed deployment look successful.
            raise RuntimeError(
                f"error executing view definition for {vw_name_schema}: {exc}"
            ) from exc
        last_deployed_def = new_vw_def
    return last_deployed_def
        
# Register account_usage_view_deploy as a permanent stored procedure,
# replacing any existing procedure of the same name.
# execute_as is intentionally not passed, so the library default applies.
sproc_options = dict(
    func=account_usage_view_deploy,
    name="account_usage_view_deploy",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="@account_usage_sproc",
    replace=True,
)
session.sproc.register(**sproc_options)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7ff028e5dc70>

In [111]:
# Preview only: render each mapped view definition for schema 'spatra' and
# deployment 'ie', then print it. No statement is executed here, and the
# 'replication_group_value' placeholder is left unsubstituted.
pandas_df = session.table('temp.util.account_usage_view_mapping').toPandas()
schema = 'spatra'
for vw_name, vw_def in zip(pandas_df['VIEW_NAME'], pandas_df['VIEW_DEFINITION']):
    vw_name_schema = f"{schema}.{vw_name}"
    # Fill in the deployment first, then qualify the view name with the schema.
    new_vw_def = vw_def.replace('deployment_name', 'ie').replace(vw_name, vw_name_schema)
    print(new_vw_def)

create or replace  view spatra.WAREHOUSE_EVENTS_HISTORY(
	TIMESTAMP,
	WAREHOUSE_ID,
	WAREHOUSE_NAME,
	CLUSTER_NUMBER,
	EVENT_NAME,
	EVENT_REASON,
	EVENT_STATE,
	USER_NAME,
	ROLE_NAME,
	QUERY_ID,
    ACCOUNT_ID,
    ACCOUNT_NAME
) as
with cust_accounts as (select id from snowhouse_import.ie.account_etl_v where replication_group = 'replication_group_value')
    , event_types AS
    (
        SELECT index::int AS id,
          value::string AS name
        FROM table(flatten(input => SELECT parse_json(system$dump_enum('WarehouseLogEventType'))))
    ),
    event_reasons AS
    (
        SELECT index::int AS id,
             value::string AS name
        FROM table(flatten(input => SELECT parse_json(system$dump_enum('WarehouseOperationReasons'))))
    )
     SELECT 
       (warehouselog.dpo:"WarehouseLogDPO:primary".timestamp::int/1000)::timestamp_ltz AS timestamp,
       SNOWHOUSE.PS_TAM.extract_local_id(warehouselog.dpo:"WarehouseLogDPO:primary".warehouseId::int) AS warehouse_id,
       

In [129]:
# Show which schema the session is currently using.
current_schema = session.get_current_schema()
print('Schema                      : {}'.format(current_schema))

Schema                      : "UTIL"


In [4]:
# Invoke the registered procedure with deployment 'ie', the replication group
# id, and target schema 'spatra'; the collected rows are the cell's output.
deploy_call = "call account_usage_view_deploy('ie','1015_5bd69c04-5f11-49fa-844b-55fb5cebd9a0','spatra')"
session.sql(deploy_call).collect()

[Row(ACCOUNT_USAGE_VIEW_DEPLOY='\ncreate or replace  view spatra.LOGIN_HISTORY(\n\tEVENT_ID,\n\tEVENT_TIMESTAMP,\n\tEVENT_TYPE,\n\tUSER_NAME,\n\tCLIENT_IP,\n\tREPORTED_CLIENT_TYPE,\n\tREPORTED_CLIENT_VERSION,\n\tFIRST_AUTHENTICATION_FACTOR,\n\tSECOND_AUTHENTICATION_FACTOR,\n\tIS_SUCCESS,\n\tERROR_CODE,\n\tERROR_MESSAGE,\n\tRELATED_EVENT_ID,\n\tCONNECTION,\n    ACCOUNT_ID,\n    ACCOUNT_NAME\n) as\n    with \ncust_accounts as (select id,name from snowhouse_import.ie.account_etl_v where replication_group = \'1015_5bd69c04-5f11-49fa-844b-55fb5cebd9a0\'), \n    error_code as (\n        select key::int as id\n          , value::string as error_code\n        from table(flatten(input => select parse_json(system$dump_enum(\'ErrorCode\')) as error_code))\n      )\n    , client_type as (\n        select key::int as id\n          , value::string as client_type\n        from table(flatten(input => select parse_json(system$dump_enum(\'ClientType\')) as client_type))\n      )\n    , authn_factor_type