In [1]:
from datetime import timedelta

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.core import Root, CreateMode
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage
from snowflake.core.table import Table, TableColumn, PrimaryKey
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from snowflake.core.warehouse import Warehouse #abcabc


In [2]:
# Settings handed to Session.builder.configs(); "connection_name" selects a named
# connection profile (assumed to be defined in the local Snowflake connection config — confirm).
connection_params = dict(connection_name="default")

In [3]:
# Open the Snowpark session that every later cell uses.
session = (
    Session.builder
    .configs(connection_params)
    .create()
)

In [4]:
# Echo the connection parameters for inspection (display only).
connection_params

{'connection_name': 'default'}

In [5]:
# Entry point to the Snowflake Python API resource hierarchy, bound to the Snowpark session.
root = Root(session)

In [6]:
# Create the tutorial database; CreateMode.or_replace replaces any existing
# database with the same name.
database = root.databases.create(
    Database(name="PYTHON_API_DB"),
    mode=CreateMode.or_replace,
)

In [7]:
# Create the tutorial schema inside PYTHON_API_DB, replacing any existing one.
schema = database.schemas.create(
    Schema(name="PYTHON_API_SCHEMA"),
    mode=CreateMode.or_replace,
)

In [8]:
# Create PYTHON_API_TABLE with a NOT NULL integer TEMPERATURE column and a
# string LOCATION column (nullable by default), replacing any existing table.
table = schema.tables.create(
    Table(
        name="PYTHON_API_TABLE",
        columns=[
            TableColumn(name="TEMPERATURE", datatype="int", nullable=False),
            TableColumn(name="LOCATION", datatype="string"),
        ],
    ),
    mode=CreateMode.or_replace,
)

In [9]:
# Fetch the table's current definition as a model object; it is edited in a
# later cell and pushed back with create_or_alter.
table_details = table.fetch()
table_details.to_dict()

{'name': 'PYTHON_API_TABLE',
 'kind': 'PERMANENT',
 'enable_schema_evolution': False,
 'change_tracking': False,
 'data_retention_time_in_days': 1,
 'max_data_extension_time_in_days': 14,
 'default_ddl_collation': '',
 'columns': [{'name': 'TEMPERATURE',
   'datatype': 'NUMBER(38,0)',
   'nullable': False},
  {'name': 'LOCATION', 'datatype': 'VARCHAR(16777216)', 'nullable': True}],
 'created_on': datetime.datetime(2024, 10, 30, 0, 9, 24, 65000, tzinfo=TzInfo(UTC)),
 'database_name': 'PYTHON_API_DB',
 'schema_name': 'PYTHON_API_SCHEMA',
 'rows': 0,
 'bytes': 0,
 'owner': 'ACCOUNTADMIN',
 'automatic_clustering': False,
 'search_optimization': False,
 'owner_role_type': 'ROLE',
 'table_type': 'NORMAL'}

In [10]:
# Add a NOT NULL ELEVATION column with a primary-key constraint to the fetched
# table model. The guard keeps the cell idempotent: without it, re-running the
# cell appends a duplicate column definition that create_or_alter would likely reject.
# NOTE(review): adding a NOT NULL column works here because the table is empty
# (rows == 0 in the fetch output above) — confirm before reusing on populated tables.
if not any(column.name.upper() == "ELEVATION" for column in table_details.columns):
    table_details.columns.append(
        TableColumn(
            name="ELEVATION",
            datatype="int",
            nullable=False,
            constraints=[PrimaryKey()],
        )
    )

In [11]:
# Push the edited model back so the existing table is altered to match table_details.
table.create_or_alter(table_details)

In [12]:
# Re-fetch to confirm the new ELEVATION column and its PRIMARY KEY constraint.
table.fetch().to_dict()

{'name': 'PYTHON_API_TABLE',
 'kind': 'PERMANENT',
 'enable_schema_evolution': False,
 'change_tracking': False,
 'data_retention_time_in_days': 1,
 'max_data_extension_time_in_days': 14,
 'default_ddl_collation': '',
 'columns': [{'name': 'TEMPERATURE',
   'datatype': 'NUMBER(38,0)',
   'nullable': False},
  {'name': 'LOCATION', 'datatype': 'VARCHAR(16777216)', 'nullable': True},
  {'name': 'ELEVATION', 'datatype': 'NUMBER(38,0)', 'nullable': False}],
 'constraints': [{'name': 'ELEVATION',
   'column_names': ['ELEVATION'],
   'constraint_type': 'PRIMARY KEY'}],
 'created_on': datetime.datetime(2024, 10, 30, 0, 9, 24, 65000, tzinfo=TzInfo(UTC)),
 'database_name': 'PYTHON_API_DB',
 'schema_name': 'PYTHON_API_SCHEMA',
 'rows': 0,
 'bytes': 0,
 'owner': 'ACCOUNTADMIN',
 'automatic_clustering': False,
 'search_optimization': False,
 'owner_role_type': 'ROLE',
 'table_type': 'NORMAL'}

In [13]:
# Collection handle for the account's warehouses.
warehouses = root.warehouses


In [14]:
# Define a SMALL warehouse with an auto-suspend idle time of 500 (Snowflake
# specifies AUTO_SUSPEND in seconds), then create it, replacing any existing PYTHON_API_WH.
python_api_wh = Warehouse(name="PYTHON_API_WH", warehouse_size="SMALL", auto_suspend=500)

warehouse = warehouses.create(python_api_wh, mode=CreateMode.or_replace)

In [15]:
# Fetch and display the warehouse's full definition and runtime properties.
warehouse_details = warehouse.fetch()
warehouse_details.to_dict()

{'name': 'PYTHON_API_WH',
 'warehouse_type': 'STANDARD',
 'warehouse_size': 'Small',
 'max_cluster_count': 1,
 'min_cluster_count': 1,
 'scaling_policy': 'STANDARD',
 'auto_suspend': 500,
 'auto_resume': 'true',
 'enable_query_acceleration': 'false',
 'query_acceleration_max_scale_factor': 8,
 'max_concurrency_level': 8,
 'statement_queued_timeout_in_seconds': 0,
 'statement_timeout_in_seconds': 172800,
 'type': 'STANDARD',
 'size': 'Small',
 'state': 'STARTED',
 'started_clusters': 1,
 'running': 0,
 'queued': 0,
 'is_default': False,
 'is_current': True,
 'available': ' 100',
 'provisioning': '0',
 'quiescing': '0',
 'other': '0',
 'created_on': datetime.datetime(1970, 1, 21, 0, 37, 26, 965000, tzinfo=TzInfo(UTC)),
 'resumed_on': datetime.datetime(1970, 1, 21, 0, 37, 26, 965000, tzinfo=TzInfo(UTC)),
 'updated_on': datetime.datetime(1970, 1, 21, 0, 37, 26, 965000, tzinfo=TzInfo(UTC)),
 'owner': 'ACCOUNTADMIN',
 'owner_role_type': 'ROLE'}

In [16]:
# Look the warehouse up by LIKE pattern and show the first match.
# Note: `_` is a single-character wildcard in LIKE patterns, so similarly named
# warehouses could also match. `warehouse_list` is an iterator, not a list.
warehouse_list = warehouses.iter(like="PYTHON_API_WH")
# next(..., None) turns an empty result into a clear assertion instead of a bare StopIteration.
result = next(warehouse_list, None)
assert result is not None, "No warehouse matching 'PYTHON_API_WH' was found"
result.to_dict()

{'name': 'PYTHON_API_WH',
 'warehouse_type': 'STANDARD',
 'warehouse_size': 'Small',
 'max_cluster_count': 1,
 'min_cluster_count': 1,
 'scaling_policy': 'STANDARD',
 'auto_suspend': 500,
 'auto_resume': 'true',
 'enable_query_acceleration': 'false',
 'query_acceleration_max_scale_factor': 8,
 'max_concurrency_level': 8,
 'statement_queued_timeout_in_seconds': 0,
 'statement_timeout_in_seconds': 172800,
 'type': 'STANDARD',
 'size': 'Small',
 'state': 'STARTED',
 'started_clusters': 1,
 'running': 0,
 'queued': 0,
 'is_default': False,
 'is_current': True,
 'available': ' 100',
 'provisioning': '0',
 'quiescing': '0',
 'other': '0',
 'created_on': datetime.datetime(1970, 1, 21, 0, 37, 26, 965000, tzinfo=TzInfo(UTC)),
 'resumed_on': datetime.datetime(1970, 1, 21, 0, 37, 26, 965000, tzinfo=TzInfo(UTC)),
 'updated_on': datetime.datetime(1970, 1, 21, 0, 37, 26, 965000, tzinfo=TzInfo(UTC)),
 'owner': 'ACCOUNTADMIN',
 'owner_role_type': 'ROLE'}

In [17]:
# Read a single attribute (size) from a fresh fetch.
warehouse.fetch().size

'Small'

In [18]:
# Optional cleanup: uncomment to drop the PYTHON_API_WH warehouse created above.
#warehouse.drop()

In [19]:
# Stage that holds the uploaded stored-procedure code for the tasks below.
# `schema.stages` is the same collection as
# root.databases[database.name].schemas[schema.name].stages.
# CreateMode.if_not_exists keeps the cell re-runnable: the default create mode
# errors when TASKS_STAGE already exists, and or_replace would discard staged files.
stages = schema.stages
stages.create(Stage(name="TASKS_STAGE"), mode=CreateMode.if_not_exists)

<snowflake.core.stage._stage.StageResource at 0x1394f8ac0>

In [20]:
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
  """Copy up to ``count`` rows of ``from_table`` into ``to_table``.

  Intended to run as a stored procedure (see the task definitions below).
  ``limit`` has no ORDER BY, so which rows are copied is not deterministic.

  ``mode="overwrite"`` replaces ``to_table`` when it already exists. With
  Snowpark's default save mode ("errorifexists"), every run after the first
  (e.g. the one-minute schedule) would fail.

  Returns a status message.
  """
  (
    session
    .table(from_table)
    .limit(count)
    .write.save_as_table(to_table, mode="overwrite")
  )
  return "Truncated table successfully created!"

def filter_by_shipmode(session: Session, mode: str) -> str:
  """Write 10 TPC-H lineitem rows whose L_SHIPMODE equals ``mode`` to ``filter_table``.

  Intended to run as a stored procedure (see the task definitions below).
  ``mode="overwrite"`` on the write replaces ``filter_table`` when it already
  exists. With Snowpark's default save mode ("errorifexists"), repeated runs
  would fail.

  Returns a status message.
  """
  (
    session
    .table("snowflake_sample_data.tpch_sf100.lineitem")
    .filter(col("L_SHIPMODE") == mode)
    .limit(10)
    .write.save_as_table("filter_table", mode="overwrite")
  )
  return "Filter table successfully created!"

In [21]:
# Disabled. Note that session.sql(...) is lazy and only executes on .collect(),
# so this line would have no effect even if uncommented.
#session.sql('use warehouse compute_wh');

In [22]:
# Fully qualified name of the stage that receives the stored procedures' code.
tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"


def _sproc_call(func):
    """Build a StoredProcedureCall for ``func``, staged at ``tasks_stage``."""
    return StoredProcedureCall(
        func=func,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    )


# NOTE(review): trunc and filter_by_shipmode take arguments, but no `args` are
# passed to StoredProcedureCall — confirm the generated CALL supplies them.

# Root task, scheduled every minute.
task1 = Task(
    name="task_python_api_trunc",
    definition=_sproc_call(trunc),
    warehouse="COMPUTE_WH",
    schedule=timedelta(minutes=1),
)

# Unscheduled task; its predecessor is set in the cell that creates the tasks.
task2 = Task(
    name="task_python_api_filter",
    definition=_sproc_call(filter_by_shipmode),
    warehouse="COMPUTE_WH",
)

In [23]:
# Create the tasks in the Snowflake schema.
# session.use_warehouse runs USE WAREHOUSE immediately. A bare
# session.sql('use warehouse ...') only builds a lazy DataFrame and does nothing
# without .collect().
session.use_warehouse("compute_wh")

tasks = schema.tasks
trunc_task = tasks.create(task1, mode=CreateMode.or_replace)

# Chain the filter task after the trunc task before creating it.
task2.predecessors = [trunc_task.name]
filter_task = tasks.create(task2, mode=CreateMode.or_replace)

The version of package 'snowflake-snowpark-python' in the local environment is 1.24.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.
The version of package 'snowflake-snowpark-python' in the local environment is 1.24.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.


In [26]:
# Print the name of every task in the schema.
for task in tasks.iter():
    print(task.name)

TASK_PYTHON_API_FILTER
TASK_PYTHON_API_TRUNC


In [27]:
# Resume the root task so its one-minute schedule starts running.
trunc_task.resume()

In [28]:
# Print each task's name together with its current state.
for task in tasks.iter():
    print("Name: ", task.name, "| State: ", task.state)

Name:  TASK_PYTHON_API_FILTER | State:  suspended
Name:  TASK_PYTHON_API_TRUNC | State:  started


In [29]:
# Stop the root task's schedule before dropping it.
trunc_task.suspend()

In [30]:
# Clean up the standalone tasks before building the same pipeline as a DAG.
trunc_task.drop()
filter_task.drop()

In [31]:
# Build the same two-step pipeline as a task graph (DAG) that runs daily.
dag_name = "python_api_dag"
dag = DAG(name=dag_name, schedule=timedelta(days=1))


def _dag_sproc_call(func):
    """Build a StoredProcedureCall for a DAG task, staged at ``tasks_stage``."""
    return StoredProcedureCall(
        func=func,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    )


# DAGTasks constructed inside `with dag:` belong to that DAG.
with dag:
    dag_task1 = DAGTask(
        name="task_python_api_trunc",
        definition=_dag_sproc_call(trunc),
        warehouse="COMPUTE_WH",
    )
    dag_task2 = DAGTask(
        name="task_python_api_filter",
        definition=_dag_sproc_call(filter_by_shipmode),
        warehouse="COMPUTE_WH",
    )
    # Dependency edge: trunc runs before filter.
    dag_task1 >> dag_task2

# Deploy the graph into the schema, replacing any existing DAG with the same name.
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)

The version of package 'snowflake-snowpark-python' in the local environment is 1.24.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.
The version of package 'snowflake-snowpark-python' in the local environment is 1.24.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.


In [32]:
# Show the deployed DAG tasks (prefixed with the DAG name) and their states.
for task in tasks.iter():
    print("Name: ", task.name, "| State: ", task.state)

Name:  PYTHON_API_DAG | State:  started
Name:  PYTHON_API_DAG$TASK_PYTHON_API_FILTER | State:  started
Name:  PYTHON_API_DAG$TASK_PYTHON_API_TRUNC | State:  started


In [33]:
# Trigger a run of the deployed DAG now, independent of its daily schedule.
dag_op.run(dag)

In [34]:
# Drop the deployed DAG.
dag_op.drop(dag)

In [None]:
# Optional cleanup: uncomment to drop the PYTHON_API_DB database created at the start.
#database.drop()