In [1]:
import typing
import pandas as pd
import pytz
from datetime import datetime

from typing_extensions import Annotated

from flytekit.remote.remote import FlyteRemote
from flytekit import task, workflow, LaunchPlan
from flytekit.configuration import Config
from flytekit.core.artifact import Artifact
from flytekit.extend import TypeEngine

from flytekit.core.context_manager import FlyteContextManager
from flytekit.types.structured.structured_dataset import StructuredDataset

In [8]:
r = FlyteRemote(
    Config.auto(config_file="/Users/ytong/.flyte/config-sandbox.yaml"),
    default_project="flytesnacks",
    default_domain="development",
)

### demo_ml.py

#### Getting an artifact (via a uri)

First run the `artifact_ux.demo_ml.run_gather_data` workflow. This workflow just runs one simple task three times, each producing a dataframe. Note the `RideCountData` annotation on the output of the `gather_data` task.

NB: The binding syntax for specifying that the partitions depend on an input will be changing. We don't like exposing a string-based template, as it feels too error-prone.

The artifacts produced by the task can be searched for with a query message that we've mapped to a URI.

In [9]:
a = r.get_artifact("flyte://av0.1/flytesnacks/development/ride_count_data?region=SEA&ds=2023-09-05")
a

Artifact: project=flytesnacks, domain=development, name=ride_count_data, version=ahvdf9gb9q4zfkzq8gpq/n0/0/o0
  name=ride_count_data
  partitions=<flytekit.core.artifact.Partitions object at 0x1376541d0>
  tags=None
  literal_type=<FlyteLiteral structured_dataset_type { }>, literal=<FlyteLiteral scalar { structured_dataset { uri: "s3://my-s3-bucket/data/8c/ahvdf9gb9q4zfkzq8gpq-n0-0/c4ef745ce7872e7ad6389e7ba4968196" metadata { structured_dataset_type { format: "parquet" } } } }>)
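
The URI passed to `get_artifact` appears to follow a regular shape: a `flyte://av0.1` prefix, then project, domain, and artifact name, followed by either a `?key=value` list of partitions or a `:tag` suffix. The sketch below assembles such a string. It is inferred only from the two URIs used in this notebook; `artifact_uri` is a hypothetical helper, not part of flytekit, and whether a tag and partitions can be combined is not something this notebook shows.

```python
from typing import Dict, Optional
from urllib.parse import urlencode


def artifact_uri(
    project: str,
    domain: str,
    name: str,
    partitions: Optional[Dict[str, str]] = None,
    tag: Optional[str] = None,
    prefix: str = "flyte://av0.1",  # assumption: prefix copied from the notebook's URIs
) -> str:
    """Build an artifact URI string in the shape used in this notebook."""
    uri = f"{prefix}/{project}/{domain}/{name}"
    if tag is not None:
        # Tag form, as in ".../my-model:SEA".
        uri += f":{tag}"
    if partitions:
        # Partition form, as in "...?region=SEA&ds=2023-09-05".
        uri += "?" + urlencode(partitions)
    return uri


ride_count_uri = artifact_uri(
    "flytesnacks",
    "development",
    "ride_count_data",
    partitions={"region": "SEA", "ds": "2023-09-05"},
)
print(ride_count_uri)
```

Building the string from parts keeps the partition keys in one dictionary, which is easier to vary across regions or dates than editing the URI by hand.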

This is standard flytekit code: it pulls down the value and converts it from a Flyte literal to a Python value.

In [4]:
ctx = FlyteContextManager.current_context()
v = TypeEngine.to_python_value(ctx, a.literal, pd.DataFrame)

In [5]:
v

Unnamed: 0,sectors,rides
0,SEA,731
1,SAE,358
2,ESA,877
3,EAS,199
4,ASE,643
5,AES,722


#### Using an artifact to launch

Kick off a new execution with the fetched artifact to confirm that it can be used and that querying doesn't fail.
Then kick it off again without any artifact at all.

In [4]:
wf_version = "sb3"

In [5]:
run_train_model_wf = r.fetch_workflow(
    "flytesnacks", "development", "artifact_ux.demo_ml.run_train_model", wf_version
)

This shows how to use an output generated by an unrelated workflow/task in a separate workflow. Here we pass the artifact variable `a` to the `data` input.

In [6]:
dd = datetime(2023, 9, 5)
dd = dd.astimezone(pytz.UTC).replace(tzinfo=None)
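
Note that calling `astimezone` on a naive `datetime` interprets it in the machine's local timezone, so the value produced by the cell above depends on where the notebook runs. The sketch below is one way to make that choice explicit using only the standard library. `to_naive_utc` and its `assume_tz` parameter are hypothetical names, and treating naive inputs as UTC is an assumption that differs from the cell above; that the `kickoff_time` input wants a naive UTC value is inferred from that cell, not confirmed elsewhere.

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc(dt: datetime, assume_tz: timezone = timezone.utc) -> datetime:
    """Return `dt` expressed in UTC with its tzinfo stripped."""
    if dt.tzinfo is None:
        # Hypothetical policy: treat naive inputs as `assume_tz`, not local time.
        dt = dt.replace(tzinfo=assume_tz)
    # Convert to UTC, then drop tzinfo to get a naive value.
    return dt.astimezone(timezone.utc).replace(tzinfo=None)


# A naive midnight stays at midnight under the UTC assumption.
naive_result = to_naive_utc(datetime(2023, 9, 5))

# An aware value is shifted to UTC before the timezone is dropped.
utc_minus_7 = timezone(timedelta(hours=-7))
aware_result = to_naive_utc(datetime(2023, 9, 5, tzinfo=utc_minus_7))

print(naive_result, aware_result)
```

With this helper the result no longer changes with the host's timezone setting, which makes it easier to match the `ds` partition value across machines.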

In [10]:
r.execute(run_train_model_wf, inputs={"region": "SEA", "kickoff_time": dd, "data": a})

<FlyteLiteral id { project: "flytesnacks" domain: "development" name: "f9b4888af79b34ad987d" } spec { launch_plan { resource_type: LAUNCH_PLAN project: "flytesnacks" domain: "development" name: "artifact_ux.demo_ml.run_train_model" version: "sb3" } metadata { system_metadata { } } notifications { } labels { } annotations { } auth_role { } } closure { started_at { } duration { } created_at { seconds: 1698323792 nanos: 886465000 } updated_at { seconds: 1698323792 nanos: 886465000 } }>

Execute without specifying the data. Note that the artifact picked up by the query at kickoff should match the one fetched above.

In [7]:
r.execute(run_train_model_wf, inputs={"region": "SEA", "kickoff_time": dd})

In [13]:
model_artifact = r.get_artifact("flyte://av0.1/flytesnacks/development/my-model:SEA")

In [14]:
model_artifact

Artifact: project=flytesnacks, domain=development, name=my-model, version=f74ac86748e244352829/end-node/o0
  name=my-model
  partitions=<flytekit.core.artifact.Partitions object at 0x13acca290>
  tags=['SEA']
  literal_type=<FlyteLiteral blob { }>, literal=<FlyteLiteral scalar { blob { metadata { type { } } uri: "s3://my-s3-bucket/data/w5/f74ac86748e244352829-n0-0/e41b589d7c8f17b3e59037abdd0f3473/demo_ml.py" } }>)

### demo_timeseries.py

This second example shows an ETL-centric use case and demonstrates reactive workflows. The components are:
* A `create_upstream_directory` workflow that creates an Artifact named `upstream_data`. In this dummy example, the task in this workflow just returns a folder with the files `1.txt` ... `100.txt`, with 8 of them removed at random. This provides a folder whose contents mostly stay the same from day to day, with some small changes.
* A `run_update_hashes` workflow that takes as input the upstream data above and a file that contains the filename and hash for each of the 92 files from the prior step. Each day it runs, the earlier hash summary file is read, the new upstream data is read, hashes for newly seen files are added, and hashes for files that were deleted are removed.

If the prior day's hash file is not present, a new one is created (which is why the `hashes_file` input for the workflow is `Optional`).
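
The two components can be sketched locally with the standard library. This is a minimal illustration of the logic described above, not the code in `demo_timeseries.py`: the function names, the use of SHA-256, the file contents, and the in-memory dictionary standing in for the hashes file are all assumptions. It also assumes a file that keeps its name keeps its contents, so earlier hashes are reused rather than recomputed.

```python
import hashlib
import random
import tempfile
from pathlib import Path
from typing import Dict, Optional


def make_upstream_dir(root: Path, seed: int, total: int = 100, removed: int = 8) -> Path:
    """Write 1.txt..total.txt into `root`, skipping `removed` names chosen at random."""
    rng = random.Random(seed)
    skipped = set(rng.sample(range(1, total + 1), removed))
    root.mkdir(parents=True, exist_ok=True)
    for i in range(1, total + 1):
        if i not in skipped:
            (root / f"{i}.txt").write_text(f"contents of file {i}\n")
    return root


def update_hashes(upstream: Path, previous: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Return a filename -> hash mapping for the files currently in `upstream`."""
    previous = previous or {}  # first run: no prior hashes file
    current: Dict[str, str] = {}
    for path in sorted(upstream.glob("*.txt")):
        if path.name in previous:
            # Already seen on an earlier day: carry the stored hash forward.
            current[path.name] = previous[path.name]
        else:
            # Newly seen file: compute and add its hash.
            current[path.name] = hashlib.sha256(path.read_bytes()).hexdigest()
    # Files absent from `upstream` are never copied over, so their hashes are dropped.
    return current


with tempfile.TemporaryDirectory() as tmp:
    day1 = make_upstream_dir(Path(tmp) / "2023-09-01", seed=1)
    day2 = make_upstream_dir(Path(tmp) / "2023-09-02", seed=2)
    hashes_day1 = update_hashes(day1)               # no previous hashes
    hashes_day2 = update_hashes(day2, hashes_day1)  # reuses the first day's result
    print(len(hashes_day1), len(hashes_day2))
```

Passing `None` for `previous` mirrors the `Optional` `hashes_file` input: the first run builds the mapping from scratch, and later runs only touch the entries that changed.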

Run the `artifact_ux.demo_timeseries.create_upstream_directory` workflow with `ds` equal to `09/01/2023 00:00:00`.

It should produce a `FlyteDirectory` object with an S3 path that holds the offloaded data. Once that finishes, the `run_update_hashes` workflow should kick off, because it is decorated with the Trigger variable `trigger_hashes`. If you inspect the `upstream_data` input on the `artifact_ux.demo_timeseries.run_update_hashes` execution that gets kicked off, you should see that it matches the output from the `create_upstream_directory` workflow.

Because this is the first run, the `hashes_file` input should be empty. If you run the `create_upstream_directory` workflow again, this time with `9/2/2023 12:00:00 AM UTC`, you should see a second execution of the downstream workflow, this time with the `hashes_file` input set to the output of the first run.