In [11]:
from datetime import timedelta

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.core import Root, CreateMode
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage
from snowflake.core.table import Table, TableColumn, PrimaryKey
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from snowflake.core.warehouse import Warehouse

# Open a Snowpark session using the connection named "default".
session = (
    Session.builder
    .configs({"connection_name": "default"})
    .create()
)

# Root wraps the session; the resource collections used in this notebook
# (databases, warehouses) are reached through it.
root = Root(session)

# Request the demo database in if_not_exists mode, so re-running this cell
# does not replace a PYTHON_API_DB that is already there.
database = root.databases.create(
    Database(name="PYTHON_API_DB"),
    mode=CreateMode.if_not_exists,
)


In [12]:
# Create the schema inside the database handle from the previous cell
# (if_not_exists: an existing schema is kept).
schema = database.schemas.create(
    Schema(name="PYTHON_API_SCHEMA"),
    mode=CreateMode.if_not_exists,
)

# Two-column demo table: a non-nullable integer TEMPERATURE and a LOCATION
# string that keeps the default nullability.
table = schema.tables.create(
    Table(
        name="PYTHON_API_TABLE",
        columns=[
            TableColumn(name="TEMPERATURE", datatype="int", nullable=False),
            TableColumn(name="LOCATION", datatype="string"),
        ],
    ),
    mode=CreateMode.if_not_exists,
)



In [13]:
# Fetch the table's current definition and show it in two forms: the model's
# own repr, and a plain dictionary.
table_details = table.fetch()
print("table_details: ", table_details)

# Only a cell's last expression is displayed automatically, so a bare
# `table_details.to_dict()` in the middle of the cell would be computed and
# thrown away. Convert once, inside the print call.
print("table_details as dict: ", table_details.to_dict())

table_details:  name='PYTHON_API_TABLE' kind='PERMANENT' cluster_by=None enable_schema_evolution=False change_tracking=False data_retention_time_in_days=1 max_data_extension_time_in_days=14 default_ddl_collation='' columns=[TableColumn(name='TEMPERATURE', datatype='NUMBER(38,0)', nullable=False, collate=None, default=None, autoincrement=None, autoincrement_start=None, autoincrement_increment=None, constraints=None, comment=None), TableColumn(name='LOCATION', datatype='VARCHAR(16777216)', nullable=True, collate=None, default=None, autoincrement=None, autoincrement_start=None, autoincrement_increment=None, constraints=None, comment=None)] constraints=None comment=None created_on=datetime.datetime(2026, 1, 26, 7, 12, 40, 609000, tzinfo=TzInfo(0)) database_name='PYTHON_API_DB' schema_name='PYTHON_API_SCHEMA' rows=0 bytes=0 owner='ACCOUNTADMIN' dropped_on=None automatic_clustering=False search_optimization=False search_optimization_progress=None search_optimization_bytes=None owner_role_typ

In [14]:
# try:
#     table_details.columns.append(
#         TableColumn(
#             name="elevation",
#             datatype="int",
#             nullable=False,
#            constraints=[PrimaryKey()],
#         )
#     )
#     table.create_or_alter(table_details)
# except Exception as e:
#     print("An error occurred:", e)

In [15]:
# table_details = table.fetch()
# existing = {c.name.lower() for c in table_details.columns}

# if "elevation" not in existing:
#     table_details.columns.append(
#         TableColumn(
#             name="ELEVATION",
#             datatype="int",
#             nullable=True,   # safer if table might already have rows
#         )
#     )
#     table.create_or_alter(table_details)
# else:
#     print("elevation already exists, skipping")


In [16]:

# Re-fetch the table definition and display it as a dictionary; this is the
# cell's last expression, so the notebook renders the result below.
table.fetch().to_dict()

{'name': 'PYTHON_API_TABLE',
 'kind': 'PERMANENT',
 'enable_schema_evolution': False,
 'change_tracking': False,
 'data_retention_time_in_days': 1,
 'max_data_extension_time_in_days': 14,
 'default_ddl_collation': '',
 'columns': [{'name': 'TEMPERATURE',
   'datatype': 'NUMBER(38,0)',
   'nullable': False},
  {'name': 'LOCATION', 'datatype': 'VARCHAR(16777216)', 'nullable': True}],
 'created_on': datetime.datetime(2026, 1, 26, 7, 12, 40, 609000, tzinfo=TzInfo(0)),
 'database_name': 'PYTHON_API_DB',
 'schema_name': 'PYTHON_API_SCHEMA',
 'rows': 0,
 'bytes': 0,
 'owner': 'ACCOUNTADMIN',
 'automatic_clustering': False,
 'search_optimization': False,
 'owner_role_type': 'ROLE',
 'table_type': 'NORMAL'}

In [None]:
# Create (or replace) a demo warehouse, inspect it, then drop it again.
warehouses = root.warehouses

python_api_wh = Warehouse(
    name="PYTHON_API_WH",
    warehouse_size="SMALL",
    auto_suspend=500,
)

warehouse = warehouses.create(python_api_wh, mode=CreateMode.or_replace)
print("warehouse: ", warehouse)

# Print the fetched details explicitly: a bare `warehouse_details.to_dict()`
# here is not the cell's last expression, so its value would be discarded
# without ever being shown.
warehouse_details = warehouse.fetch()
print("warehouse_details as dict: ", warehouse_details.to_dict())

# Remove the demo warehouse once it has been inspected.
warehouse.drop()


warehouse:  <WarehouseResource: 'PYTHON_API_WH'>


: 

In [17]:
# Re-create the database and schema for the task examples that follow.
# NOTE(review): or_replace presumably rebuilds PYTHON_API_DB from scratch,
# which would discard the PYTHON_API_TABLE created earlier in this notebook
# -- confirm that is intended.
database = root.databases.create(
    Database(name="PYTHON_API_DB"),
    mode=CreateMode.or_replace,
)

schema = database.schemas.create(
    Schema(name="PYTHON_API_SCHEMA"),
    mode=CreateMode.or_replace,
)

In [18]:
# Walk from the root to the stage collection of the demo schema, then create
# the stage that the task definitions below use as their stage_location.
stages = (
    root
    .databases[database.name]
    .schemas[schema.name]
    .stages
)
stages.create(Stage(name="TASKS_STAGE"))

"""
trunc(): Creates a truncated copy of an input table, keeping at most `count` rows.

filter_by_shipmode(): Filters the SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM table by ship mode, limits the results to 10 rows, and writes the results to a new table named `filter_table`.
"""
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
  """Copy at most ``count`` rows of ``from_table`` into ``to_table``.

  No ordering is applied before the limit, so which rows are kept is left to
  the engine.

  Args:
    session: Snowpark session used to read the source and write the target.
    from_table: Name of the table to read from.
    to_table: Name of the table to write; replaced if it already exists.
    count: Maximum number of rows to copy.

  Returns:
    A fixed confirmation message.
  """
  (
    session
    .table(from_table)
    .limit(count)
    # This function is wrapped in a task scheduled every minute in this
    # notebook. With the default save mode the write raises once the target
    # exists, so every run after the first would fail; overwrite keeps
    # repeated runs working.
    .write.save_as_table(to_table, mode="overwrite")
  )
  return "Truncated table successfully created!"

def filter_by_shipmode(session: Session, mode: str) -> str:
  """Write up to 10 LINEITEM rows with the given ship mode to ``filter_table``.

  Reads ``snowflake_sample_data.tpch_sf100.lineitem``, keeps rows whose
  ``L_SHIPMODE`` column equals ``mode``, limits the result to 10 rows and
  saves it as ``filter_table`` (resolved against the session's current
  database/schema, since the name is unqualified).

  Args:
    session: Snowpark session used to read the sample data and write the result.
    mode: Value compared for equality against the ``L_SHIPMODE`` column.

  Returns:
    A fixed confirmation message.
  """
  (
    session
    .table("snowflake_sample_data.tpch_sf100.lineitem")
    .filter(col("L_SHIPMODE") == mode)
    .limit(10)
    # The target name is fixed and this function is wrapped in a task that
    # follows a recurring task, so with the default save mode the second run
    # would raise because the table already exists. Overwrite replaces the
    # derived table on every run instead.
    .write.save_as_table("filter_table", mode="overwrite")
  )
  return "Filter table successfully created!"

In [19]:


# Fully qualified name of the stage passed to both task definitions.
tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"

# First task: wraps trunc() and carries a one-minute schedule.
# NOTE(review): trunc() declares from_table/to_table/count parameters, but no
# arguments are supplied to StoredProcedureCall here -- confirm how they are
# meant to be provided before relying on an actual run.
task1 = Task(
    name="task_python_api_trunc",
    definition=StoredProcedureCall(
        func=trunc,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    ),
    warehouse="COMPUTE_WH",
    schedule=timedelta(minutes=1),
)

# Second task: wraps filter_by_shipmode(); it has no schedule of its own and
# is linked behind the first task further down via `predecessors`.
# NOTE(review): filter_by_shipmode() takes a `mode` parameter that is likewise
# not supplied here -- confirm.
task2 = Task(
    name="task_python_api_filter",
    definition=StoredProcedureCall(
        func=filter_by_shipmode,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    ),
    warehouse="COMPUTE_WH",
)

# Select COMPUTE_WH as the session's warehouse before creating the tasks.
session.sql("USE WAREHOUSE COMPUTE_WH").collect()

# Create the first task in the schema (or_replace: an earlier definition with
# the same name is replaced).
tasks = schema.tasks
trunc_task = tasks.create(task1, mode=CreateMode.or_replace)

# Link the filter task behind the trunc task, then create it as well.
task2.predecessors = [trunc_task.name]
filter_task = tasks.create(task2, mode=CreateMode.or_replace)

# Show which warehouse the session is currently using.
session.sql("SELECT CURRENT_WAREHOUSE()").show()


-------------------------
|"CURRENT_WAREHOUSE()"  |
-------------------------
|COMPUTE_WH             |
-------------------------



In [20]:
# Both tasks were already created (with or_replace) in the preceding cell, so
# this cell only lists what now exists in the schema. Creating them a second
# time here would just replace the two tasks with identical definitions.
tasks = schema.tasks

for t in tasks.iter():
    print(t.name)

TASK_PYTHON_API_FILTER
TASK_PYTHON_API_TRUNC


In [22]:
# List every task's state before anything has been resumed.
states_before = tasks.iter()
print("Tasks haven't started yet")
for task_info in states_before:
    print("Name: ", task_info.name, "| State: ", task_info.state)

# Resume only the trunc task, then list the states again to see the change.
trunc_task.resume()
print("Resumed the trunc_task")
states_after = tasks.iter()
for task_info in states_after:
    print("Name: ", task_info.name, "| State: ", task_info.state)

Tasks haven't started yet
Name:  TASK_PYTHON_API_FILTER | State:  suspended
Name:  TASK_PYTHON_API_TRUNC | State:  suspended
Resumed the trunc_task
Name:  TASK_PYTHON_API_FILTER | State:  suspended
Name:  TASK_PYTHON_API_TRUNC | State:  started


In [23]:
# Suspend the trunc task again; it was resumed in the previous cell and
# carries a one-minute schedule.
trunc_task.suspend()

In [None]:
# Clean up: drop both tasks, the trunc task first and then the filter task.
for finished_task in (trunc_task, filter_task):
    finished_task.drop()

# Both tasks were successfully created and dropped.