In [None]:
from datetime import timedelta

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.core import Root, CreateMode
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage
from snowflake.core.table import Table, TableColumn, PrimaryKey
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from snowflake.core.warehouse import Warehouse

In [None]:
connection_params = {
    "connection_name": "default"
}

session = Session.builder.configs(connection_params).create()

In [None]:
root = Root(session)
database = root.databases['ascend_database']
schema = database.schemas['tar']

stages = root.databases[database.name].schemas[schema.name].stages
stages.create(Stage(name="TASKS_STAGE"))

In [None]:
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
  (
    session
    .table(from_table)
    .limit(count)
    .write.save_as_table(to_table)
  )
  return "Truncated table successfully created!"

def filter_by_city(session: Session, city: str) -> str:
  (
    session
    .table('ascend_database.tar."emp_basic"')
    .filter(col("city") == city)
    .limit(10)
    .write.save_as_table("filter_table")
  )
  return "Filter table successfully created!"

In [None]:
tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"

task1 = Task(
    name="task_python_api_trunc",
    definition=StoredProcedureCall(
      func=trunc,
      stage_location=f"@{tasks_stage}",
      packages=["snowflake-snowpark-python"],
    ),
    warehouse="ASCEND_WAREHOUSE",
    schedule=timedelta(minutes=1)
)

task2 = Task(
    name="task_python_api_filter",
    definition=StoredProcedureCall(
      func=filter_by_city,
      stage_location=f"@{tasks_stage}",
      packages=["snowflake-snowpark-python"],
    ),
    warehouse="ASCEND_WAREHOUSE"
)

In [None]:
# create the task in the Snowflake database
tasks = schema.tasks
trunc_task = tasks.create(task1, mode=CreateMode.or_replace)

task2.predecessors = [trunc_task.name]
filter_task = tasks.create(task2, mode=CreateMode.or_replace)

In [None]:
taskiter = tasks.iter()
for t in taskiter:
    print(t.name)

In [None]:
trunc_task.resume()

In [None]:
taskiter = tasks.iter()
for t in taskiter:
    print("Name: ", t.name, "| State: ", t.state)

In [None]:
trunc_task.suspend()

In [None]:
trunc_task.drop()
filter_task.drop()

In [None]:
dag_name = "python_api_dag"
dag = DAG(name=dag_name, schedule=timedelta(days=1))
with dag:
    dag_task1 = DAGTask(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
            func=trunc,
            stage_location=f"@{tasks_stage}",
            packages=["snowflake-snowpark-python"]),
        warehouse="ASCEND_WAREHOUSE",
    )
    dag_task2 = DAGTask(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
            func=filter_by_city,
            stage_location=f"@{tasks_stage}",
            packages=["snowflake-snowpark-python"]),
        warehouse="ASCEND_WAREHOUSE",
    )
    dag_task1 >> dag_task2
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)

In [None]:
taskiter = tasks.iter()
for t in taskiter:
    print("Name: ", t.name, "| State: ", t.state)

In [None]:
dag_op.run(dag)

In [None]:
# dag_op.drop(dag)