In [1]:
import subprocess
import json
import pandas as pd

def get_projects():
    """List every project visible to ``uctl`` as a DataFrame.

    Runs ``uctl get project -o json`` and flattens each project record into one row.

    Returns:
        A DataFrame with columns ``id``, ``name``, ``domains``, ``description`` and
        ``labels``. ``domains`` and ``labels`` hold the ``str()`` of a Python list
        (e.g. ``"['development', 'staging']"``) so callers can recover them with
        ``ast.literal_eval``. Missing ``name``/``description`` become ``""``.
        Returns ``None`` (after printing the error) if the command fails or its
        output is not valid JSON.
    """
    command = ["uctl", "get", "project", "-o", "json"]

    try:
        projects_raw = subprocess.run(command, capture_output=True, text=True, check=True)
        projects_json = json.loads(projects_raw.stdout)
    except (subprocess.SubprocessError, json.JSONDecodeError) as e:
        print("Error:", e)
        return None

    # Like the other uctl getters in this notebook, handle a single result arriving as
    # a bare object; iterating a dict directly would walk its keys instead of projects.
    if isinstance(projects_json, dict):
        projects_json = [projects_json]

    out_dict = {
        "id": [],
        "name": [],
        "domains": [],
        "description": [],
        "labels": [],
    }

    for this_project in projects_json:
        out_dict["id"].append(this_project["id"])
        out_dict["name"].append(this_project.get("name", ""))

        domains_list = [domain["id"] for domain in this_project.get("domains") or []]
        out_dict["domains"].append(str(domains_list))

        out_dict["description"].append(this_project.get("description", ""))

        # Labels are nested as {"labels": {"values": {key: value}}}; either level may be absent.
        label_values = (this_project.get("labels") or {}).get("values") or {}
        labels_list = [str(key) + ":" + str(value) for key, value in label_values.items()]
        out_dict["labels"].append(str(labels_list))

    return pd.DataFrame(out_dict)
        
# projects_df = get_projects()
# projects_df

In [2]:
import subprocess
import json
import pandas as pd

def get_workflows(project: str, domain: str):
    """List the workflows registered in one project/domain.

    Runs ``uctl get workflows -p <project> -d <domain> -o json``.

    Args:
        project: Project id passed to ``-p``.
        domain: Domain id passed to ``-d`` (e.g. ``"development"``).

    Returns:
        A DataFrame with columns ``project``, ``domain`` and ``workflow_name``, one row
        per entry returned by uctl, or ``None`` (after printing the error) if the
        command fails or its output is not valid JSON.
    """
    command = ["uctl", "get", "workflows", "-p", project, "-d", domain, "-o", "json"]

    try:
        workflows_raw = subprocess.run(command, capture_output=True, text=True, check=True)
        # Parse inside the try: an empty/garbled stdout must not abort a multi-project crawl.
        workflows_json = json.loads(workflows_raw.stdout)
    except (subprocess.SubprocessError, json.JSONDecodeError) as e:
        print("Error:", e)
        return None

    # Normalize the single-result case (a bare object) to a one-element list.
    if not isinstance(workflows_json, list):
        workflows_json = [workflows_json]

    out_dict = {
        "project": [],
        "domain": [],
        "workflow_name": [],
    }
    for this_workflow in workflows_json:
        workflow_id = this_workflow["id"]
        out_dict["project"].append(workflow_id["project"])
        out_dict["domain"].append(workflow_id["domain"])
        out_dict["workflow_name"].append(workflow_id["name"])

    return pd.DataFrame(out_dict)
        
# workflow_names_df = get_workflows("my-new-project", "development")
# workflow_names_df

In [3]:
import subprocess
import json
import pandas as pd

def get_workflow_versions(workflow_names_df: pd.DataFrame):
    """Fetch every registered version of each workflow listed in ``workflow_names_df``.

    For each row, runs ``uctl get workflow -p <project> -d <domain> <name> -o json``.
    A call that fails or returns unparsable output is printed and skipped, so the
    remaining workflows are still collected.

    Args:
        workflow_names_df: DataFrame with ``project``, ``domain`` and ``workflow_name``
            columns. Any index is accepted; filtered or de-duplicated frames do not
            need to be re-indexed first.

    Returns:
        DataFrame with columns ``project``, ``domain``, ``workflow_name``,
        ``workflow_version`` and ``created_at`` (the ``closure.createdAt`` string
        reported by uctl), one row per version.
    """
    out_dict = {
        "project": [],
        "domain": [],
        "workflow_name": [],
        "workflow_version": [],
        "created_at": [],
    }

    # Iterate by position rather than `.at[i, ...]`, which assumed a 0..n-1 RangeIndex.
    name_rows = workflow_names_df[["project", "domain", "workflow_name"]].itertuples(index=False, name=None)
    for this_project, this_domain, this_workflow_name in name_rows:
        command = ["uctl", "get", "workflow", "-p", this_project, "-d", this_domain, this_workflow_name, "-o", "json"]

        try:
            workflow_versions_raw = subprocess.run(command, capture_output=True, text=True, check=True)
            workflow_versions_json = json.loads(workflow_versions_raw.stdout)
        except (subprocess.SubprocessError, json.JSONDecodeError) as e:
            print("Error:", e)
            continue

        # A workflow with a single version comes back as a bare object, not a list.
        if not isinstance(workflow_versions_json, list):
            workflow_versions_json = [workflow_versions_json]

        for this_workflow in workflow_versions_json:
            workflow_id = this_workflow["id"]
            out_dict["project"].append(workflow_id["project"])
            out_dict["domain"].append(workflow_id["domain"])
            out_dict["workflow_name"].append(workflow_id["name"])
            out_dict["workflow_version"].append(workflow_id["version"])
            out_dict["created_at"].append(this_workflow["closure"]["createdAt"])

    return pd.DataFrame(out_dict)
        
# workflow_versions_df = get_workflow_versions(workflow_names_df)
# workflow_versions_df

In [4]:
import subprocess
import json
import pandas as pd

def get_tasks(project: str, domain: str):
    """List the tasks registered in one project/domain.

    Runs ``uctl get tasks -p <project> -d <domain> -o json``.

    Args:
        project: Project id passed to ``-p``.
        domain: Domain id passed to ``-d``.

    Returns:
        A DataFrame with columns ``project``, ``domain``, ``task_name`` and
        ``task_version``. It has one row per entry returned by uctl, so a task name
        can appear more than once. Returns ``None`` (after printing the error) if the
        command fails or its output is not valid JSON.
    """
    command = ["uctl", "get", "tasks", "-p", project, "-d", domain, "-o", "json"]

    try:
        tasks_raw = subprocess.run(command, capture_output=True, text=True, check=True)
        # Parse inside the try: an empty/garbled stdout must not abort a multi-project crawl.
        tasks_json = json.loads(tasks_raw.stdout)
    except (subprocess.SubprocessError, json.JSONDecodeError) as e:
        print("Error:", e)
        return None

    # Normalize the single-result case (a bare object) to a one-element list.
    if not isinstance(tasks_json, list):
        tasks_json = [tasks_json]

    out_dict = {
        "project": [],
        "domain": [],
        "task_name": [],
        "task_version": [],
    }
    for this_task in tasks_json:
        task_id = this_task["id"]
        out_dict["project"].append(task_id["project"])
        out_dict["domain"].append(task_id["domain"])
        out_dict["task_name"].append(task_id["name"])
        out_dict["task_version"].append(task_id["version"])

    return pd.DataFrame(out_dict)
        
# tasks_and_versions_df = get_tasks("my-new-project", "development")
# task_names_df = tasks_and_versions_df[["project", "domain", "task_name"]].drop_duplicates().reset_index(drop=True)
# task_names_df

In [5]:
import subprocess
import json
import pandas as pd

def get_task_versions(task_names_df: pd.DataFrame):
    """Fetch every registered version of each task listed in ``task_names_df``.

    For each row, runs ``uctl get task -p <project> -d <domain> <name> -o json``.
    A call that fails or returns unparsable output is printed and skipped, so the
    remaining tasks are still collected.

    Args:
        task_names_df: DataFrame with ``project``, ``domain`` and ``task_name``
            columns. Any index is accepted; filtered or de-duplicated frames do not
            need to be re-indexed first.

    Returns:
        DataFrame with columns ``project``, ``domain``, ``task_name``,
        ``task_version`` and ``created_at`` (the ``closure.createdAt`` string reported
        by uctl), one row per version.
    """
    out_dict = {
        "project": [],
        "domain": [],
        "task_name": [],
        "task_version": [],
        "created_at": [],
    }

    # Iterate by position rather than `.at[i, ...]`, which assumed a 0..n-1 RangeIndex.
    name_rows = task_names_df[["project", "domain", "task_name"]].itertuples(index=False, name=None)
    for this_project, this_domain, this_task_name in name_rows:
        command = ["uctl", "get", "task", "-p", this_project, "-d", this_domain, this_task_name, "-o", "json"]

        try:
            task_versions_raw = subprocess.run(command, capture_output=True, text=True, check=True)
            task_versions_json = json.loads(task_versions_raw.stdout)
        except (subprocess.SubprocessError, json.JSONDecodeError) as e:
            print("Error:", e)
            continue

        # A task with a single version comes back as a bare object, not a list.
        if not isinstance(task_versions_json, list):
            task_versions_json = [task_versions_json]

        for this_task in task_versions_json:
            task_id = this_task["id"]
            out_dict["project"].append(task_id["project"])
            out_dict["domain"].append(task_id["domain"])
            out_dict["task_name"].append(task_id["name"])
            out_dict["task_version"].append(task_id["version"])
            out_dict["created_at"].append(this_task["closure"]["createdAt"])

    return pd.DataFrame(out_dict)
        
# task_versions_df = get_task_versions(task_names_df)
# task_versions_df

In [6]:
import subprocess
import json
import pandas as pd

def get_launchplans(project: str, domain: str):
    """List the launch plans registered in one project/domain.

    Runs ``uctl get launchplan -p <project> -d <domain> -o json``.

    Args:
        project: Project id passed to ``-p``.
        domain: Domain id passed to ``-d``.

    Returns:
        A DataFrame with columns ``project``, ``domain``, ``launchplan_name`` and
        ``launchplan_version``. It has one row per entry returned by uctl, so a launch
        plan name can appear more than once. Returns ``None`` (after printing the error)
        if the command fails or its output is not valid JSON.
    """
    command = ["uctl", "get", "launchplan", "-p", project, "-d", domain, "-o", "json"]

    try:
        launchplans_raw = subprocess.run(command, capture_output=True, text=True, check=True)
        # Parse inside the try: an empty/garbled stdout must not abort a multi-project crawl.
        launchplans_json = json.loads(launchplans_raw.stdout)
    except (subprocess.SubprocessError, json.JSONDecodeError) as e:
        print("Error:", e)
        return None

    # Normalize the single-result case (a bare object) to a one-element list.
    if not isinstance(launchplans_json, list):
        launchplans_json = [launchplans_json]

    out_dict = {
        "project": [],
        "domain": [],
        "launchplan_name": [],
        "launchplan_version": [],
    }
    for this_launchplan in launchplans_json:
        launchplan_id = this_launchplan["id"]
        out_dict["project"].append(launchplan_id["project"])
        out_dict["domain"].append(launchplan_id["domain"])
        out_dict["launchplan_name"].append(launchplan_id["name"])
        out_dict["launchplan_version"].append(launchplan_id["version"])

    return pd.DataFrame(out_dict)
        
# launchplans_and_versions_df = get_launchplans("flytesnacks", "development")
# launchplan_names_df = launchplans_and_versions_df[["project", "domain", "launchplan_name"]].drop_duplicates().reset_index(drop=True)
# launchplan_names_df

In [7]:
import subprocess
import json
import pandas as pd

def get_launchplan_versions(launchplan_names_df: pd.DataFrame):
    """Fetch every registered version of each launch plan listed in ``launchplan_names_df``.

    For each row, runs ``uctl get launchplan -p <project> -d <domain> <name> -o json``.
    A call that fails or returns unparsable output is printed and skipped, so the
    remaining launch plans are still collected.

    Args:
        launchplan_names_df: DataFrame with ``project``, ``domain`` and
            ``launchplan_name`` columns. Any index is accepted; filtered or
            de-duplicated frames do not need to be re-indexed first.

    Returns:
        DataFrame with columns ``project``, ``domain``, ``launchplan_name``,
        ``launchplan_version`` and ``created_at`` (the ``closure.createdAt`` string
        reported by uctl), one row per version.
    """
    out_dict = {
        "project": [],
        "domain": [],
        "launchplan_name": [],
        "launchplan_version": [],
        "created_at": [],
    }

    # Iterate by position rather than `.at[i, ...]`, which assumed a 0..n-1 RangeIndex.
    name_rows = launchplan_names_df[["project", "domain", "launchplan_name"]].itertuples(index=False, name=None)
    for this_project, this_domain, this_launchplan_name in name_rows:
        command = ["uctl", "get", "launchplan", "-p", this_project, "-d", this_domain, this_launchplan_name, "-o", "json"]

        try:
            launchplan_versions_raw = subprocess.run(command, capture_output=True, text=True, check=True)
            launchplan_versions_json = json.loads(launchplan_versions_raw.stdout)
        except (subprocess.SubprocessError, json.JSONDecodeError) as e:
            print("Error:", e)
            continue

        # A launch plan with a single version comes back as a bare object, not a list.
        if not isinstance(launchplan_versions_json, list):
            launchplan_versions_json = [launchplan_versions_json]

        for this_launchplan in launchplan_versions_json:
            launchplan_id = this_launchplan["id"]
            out_dict["project"].append(launchplan_id["project"])
            out_dict["domain"].append(launchplan_id["domain"])
            out_dict["launchplan_name"].append(launchplan_id["name"])
            out_dict["launchplan_version"].append(launchplan_id["version"])
            out_dict["created_at"].append(this_launchplan["closure"]["createdAt"])

    return pd.DataFrame(out_dict)
        
# launchplan_versions_df = get_launchplan_versions(launchplan_names_df)
# launchplan_versions_df

In [9]:
import ast

def _concat_or_empty(frames, columns):
    """Concatenate ``frames`` with ``pd.concat``, or return an empty frame with ``columns``.

    ``pd.concat`` raises ``ValueError`` on an empty list. Returning an empty frame keeps
    one entity kind with no successful listings from aborting the whole crawl.
    """
    if not frames:
        return pd.DataFrame(columns=columns)
    return pd.concat(frames)


def get_all_entities():
    """Collect every workflow, task and launch-plan version across all projects/domains.

    For each project from ``get_projects`` and each of its domains, lists the entities
    with ``get_workflows`` / ``get_tasks`` / ``get_launchplans``. Each distinct name is
    then expanded into its versions with the matching ``get_*_versions`` helper.
    Listings that fail (the lister returned ``None``) are skipped.

    Returns:
        Tuple ``(workflows_df, tasks_df, launchplans_df)`` of version tables, each the
        ``pd.concat`` of the per-project/domain tables (index labels repeat per batch).
        A table is empty, with its expected columns, if nothing could be fetched for it.

    Raises:
        RuntimeError: If the project list itself could not be fetched.
    """
    all_projects_df = get_projects()
    if all_projects_df is None:
        # get_projects has already printed the underlying uctl / JSON error.
        raise RuntimeError("Could not list projects with uctl; see the error printed above.")

    workflow_versions_df_list = []
    task_versions_df_list = []
    launchplan_versions_df_list = []

    for this_id, domains_string_list in all_projects_df[["id", "domains"]].itertuples(index=False, name=None):
        # get_projects stores each project's domains as str(list); parse it back safely.
        domains_list = ast.literal_eval(domains_string_list)
        for this_domain in domains_list:
            print("id: {}; domain: {}".format(this_id, this_domain))

            # A listing can repeat a name once per version (tasks and launch plans carry
            # a version column). Keep one row per name before expanding. Otherwise each
            # name's full version list is fetched and stored once per listed row.
            workflow_names_df = get_workflows(this_id, this_domain)
            if isinstance(workflow_names_df, pd.DataFrame):
                workflow_names_df = (
                    workflow_names_df[["project", "domain", "workflow_name"]]
                    .drop_duplicates()
                    .reset_index(drop=True)
                )
                workflow_versions_df_list.append(get_workflow_versions(workflow_names_df))

            task_names_df = get_tasks(this_id, this_domain)
            if isinstance(task_names_df, pd.DataFrame):
                task_names_df = (
                    task_names_df[["project", "domain", "task_name"]]
                    .drop_duplicates()
                    .reset_index(drop=True)
                )
                task_versions_df_list.append(get_task_versions(task_names_df))

            launchplan_names_df = get_launchplans(this_id, this_domain)
            if isinstance(launchplan_names_df, pd.DataFrame):
                launchplan_names_df = (
                    launchplan_names_df[["project", "domain", "launchplan_name"]]
                    .drop_duplicates()
                    .reset_index(drop=True)
                )
                launchplan_versions_df_list.append(get_launchplan_versions(launchplan_names_df))

    all_workflows_df = _concat_or_empty(
        workflow_versions_df_list,
        ["project", "domain", "workflow_name", "workflow_version", "created_at"],
    )
    all_tasks_df = _concat_or_empty(
        task_versions_df_list,
        ["project", "domain", "task_name", "task_version", "created_at"],
    )
    all_launchplans_df = _concat_or_empty(
        launchplan_versions_df_list,
        ["project", "domain", "launchplan_name", "launchplan_version", "created_at"],
    )

    return all_workflows_df, all_tasks_df, all_launchplans_df


wf, task, lp = get_all_entities()

id: stress-test; domain: development
Error: Command '['uctl', 'get', 'workflows', '-p', 'stress-test', '-d', 'development', '-o', 'json']' returned non-zero exit status 1.
Error: Command '['uctl', 'get', 'tasks', '-p', 'stress-test', '-d', 'development', '-o', 'json']' returned non-zero exit status 1.
Error: Command '['uctl', 'get', 'launchplan', '-p', 'stress-test', '-d', 'development', '-o', 'json']' returned non-zero exit status 1.
id: stress-test; domain: staging
Error: Command '['uctl', 'get', 'workflows', '-p', 'stress-test', '-d', 'staging', '-o', 'json']' returned non-zero exit status 1.
Error: Command '['uctl', 'get', 'tasks', '-p', 'stress-test', '-d', 'staging', '-o', 'json']' returned non-zero exit status 1.
Error: Command '['uctl', 'get', 'launchplan', '-p', 'stress-test', '-d', 'staging', '-o', 'json']' returned non-zero exit status 1.
id: stress-test; domain: production
Error: Command '['uctl', 'get', 'workflows', '-p', 'stress-test', '-d', 'production', '-o', 'json']' r

In [10]:
# Persist the three raw version tables so later sessions can reload them without
# re-querying uctl.
for frame, pickle_path in (
    (wf, 'workflow_versions_df.pkl'),
    (task, 'task_versions_df.pkl'),
    (lp, 'launchplan_versions_df.pkl'),
):
    frame.to_pickle(pickle_path)

In [11]:
# Show the collected workflow versions. The index repeats because pd.concat in
# get_all_entities keeps each per-project/domain frame's own index.
wf

Unnamed: 0,project,domain,workflow_name,workflow_version,created_at
0,my-new-project,development,workflows.example.wf,MCxs3EE5x2S8JsBYpQ-lDg,2024-02-13T23:08:17.690157Z
0,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.no_gpu_acceler...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:33.391902Z
1,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.with_defined_g...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:31.079066Z
2,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.with_defined_g...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:32.540036Z
0,hoover-test-project,development,child_directory.python_file_workflow.classname...,kq00XHtdzIo35caDHWv0fA,2024-02-13T21:06:39.763311Z
...,...,...,...,...,...
493,flytesnacks,development,basics.deck.wf,v0.3.212,2023-07-17T20:52:49.553819Z
494,flytesnacks,development,basics.deck.wf,v0.3.216,2023-08-07T18:22:20.119722Z
495,flytesnacks,development,basics.deck.wf,v0.3.218,2023-08-15T21:47:21.425102Z
496,flytesnacks,development,basics.deck.wf,v0.3.219,2023-08-25T20:18:56.063843Z


In [13]:
# Show the collected task versions. The index repeats because pd.concat in
# get_all_entities keeps each per-project/domain frame's own index.
task

Unnamed: 0,project,domain,task_name,task_version,created_at
0,my-new-project,development,workflows.example.greeting_length,MCxs3EE5x2S8JsBYpQ-lDg,2024-02-13T23:08:17.525720Z
1,my-new-project,development,workflows.example.say_hello,MCxs3EE5x2S8JsBYpQ-lDg,2024-02-13T23:08:17.386040Z
2,my-new-project,development,workflows.single_task.single_task_demo,q1HylXqel9m7HeH3Xtg_ag,2024-02-13T23:05:10.103033Z
0,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.no_gpu_acceler...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:33.122006Z
1,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.with_defined_g...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:32.101509Z
...,...,...,...,...,...
641,flytesnacks,development,sam.workflow.convert_to_tar_and_upload,_4pugFhiKiYjXnAgyuaZaw,2024-02-27T16:00:32.935891Z
642,flytesnacks,development,sam.workflow.convert_to_tar_and_upload,Z7nl--qTR661w4TRRCyx3Q,2024-02-27T14:27:20.698538Z
643,flytesnacks,development,sam.workflow.convert_to_tar_and_upload,HM3BtzUsG5KO9JWkyLb8nQ,2024-02-27T13:57:28.354485Z
644,flytesnacks,development,sam.workflow.convert_to_tar_and_upload,ZLaXA4vjjTIw9iJ4jZIL3w,2024-02-27T13:52:04.419695Z


In [14]:
# Show the collected launch-plan versions. The index repeats because pd.concat in
# get_all_entities keeps each per-project/domain frame's own index.
lp

Unnamed: 0,project,domain,launchplan_name,launchplan_version,created_at
0,my-new-project,development,workflows.example.wf,MCxs3EE5x2S8JsBYpQ-lDg,2024-02-13T23:08:18.292186Z
1,my-new-project,development,.flytegen.workflows.single_task.single_task_demo,q1HylXqel9m7HeH3Xtg_ag,2024-02-13T23:05:10.594171Z
0,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.no_gpu_acceler...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:34.176303Z
1,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.with_defined_g...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:33.010875Z
2,gpu-accelerator-tests,development,workflows.gpu_accelerator_tests.with_defined_g...,670mOt9zqt3gajGPYtIwUg==,2024-01-19T18:05:31.985061Z
...,...,...,...,...,...
820,flytesnacks,development,src.tasks.lipsync.facerender.animate.animate_f...,ZxHnHqhRpXoJLJScprqqgQ,2024-02-26T16:42:52.566916Z
821,flytesnacks,development,src.tasks.lipsync.facerender.animate.animate_f...,g5xVGaf8RNi1gsbsXnHMuw,2024-02-26T14:04:13.554999Z
822,flytesnacks,development,src.tasks.lipsync.facerender.animate.animate_f...,r4HRErescJxgc4bV3wR_RQ,2024-02-26T13:35:01.284452Z
823,flytesnacks,development,src.tasks.lipsync.facerender.animate.animate_f...,VlRDGo53INjbceHywPiB6Q,2024-02-26T13:07:14.812347Z


In [83]:
# Reduce `wf` to the most recently created version of each (project, domain, workflow).
wf_latest_df = (
    wf.assign(timestamp=pd.to_datetime(wf["created_at"]))
    .sort_values(by=["project", "domain", "workflow_name", "timestamp"])
    # `wf` repeats index labels (it came from pd.concat); make them unique before idxmax.
    .reset_index(drop=True)
)
latest_indices = wf_latest_df.groupby(["project", "domain", "workflow_name"])["timestamp"].idxmax()
wf_latest_df = wf_latest_df.loc[latest_indices].reset_index(drop=True)
wf_latest_df

Unnamed: 0,project,domain,workflow_name,workflow_version,created_at,timestamp
0,autodoc,development,docai.generate.generate_cloud,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:48.259912Z,2024-01-11 22:15:48.259912+00:00
1,autodoc,development,docai.generate.generate_local,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:47.521984Z,2024-01-11 22:15:47.521984+00:00
2,flyte-attendant,development,flyte_attendant.workflows.chat_support.ask,zd-GIZj7TpYqRfleJCLnbQ==,2023-03-15T16:04:28.576431Z,2023-03-15 16:04:28.576431+00:00
3,flytesnacks,development,add.wf,9-J01oEmiPE8YsfHxxPn8g,2024-02-12T14:59:01.690505Z,2024-02-12 14:59:01.690505+00:00
4,flytesnacks,development,advanced.wf,zBBExGyE2tFuAxjZerd6cQ,2024-02-27T15:06:57.612396Z,2024-02-27 15:06:57.612396+00:00
...,...,...,...,...,...,...
295,zeryx-demo,development,workflows.mnist_training_example.grid_search,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:57.579404Z,2024-01-02 19:49:57.579404+00:00
296,zeryx-demo,development,workflows.mnist_training_example.mnist_workflo...,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:56.462475Z,2024-01-02 19:49:56.462475+00:00
297,zeryx-demo,development,workflows.neuron.resnet50_infer_wf,Mb5m6j1cc3m9lsGw2me32A==,2023-11-16T19:17:37.636150Z,2023-11-16 19:17:37.636150+00:00
298,zeryx-demo,development,workflows.ray_example.ray_workflow,M_5qOFZQOR2DxbfpYWPdsw==,2023-11-27T17:48:29.811317Z,2023-11-27 17:48:29.811317+00:00


In [84]:
# Reduce `task` to the most recently created version of each (project, domain, task).
task_latest_df = (
    task.assign(timestamp=pd.to_datetime(task["created_at"]))
    .sort_values(by=["project", "domain", "task_name", "timestamp"])
    # `task` repeats index labels (it came from pd.concat); make them unique before idxmax.
    .reset_index(drop=True)
)
latest_indices = task_latest_df.groupby(["project", "domain", "task_name"])["timestamp"].idxmax()
task_latest_df = task_latest_df.loc[latest_indices].reset_index(drop=True)
task_latest_df

Unnamed: 0,project,domain,task_name,task_version,created_at,timestamp
0,autodoc,development,docai.generate.download_and_unzip_github_repo,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:46.789629Z,2024-01-11 22:15:46.789629+00:00
1,autodoc,development,docai.generate.generate_html,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:47.354163Z,2024-01-11 22:15:47.354163+00:00
2,autodoc,development,docai.generate.get_and_set_open_ai_key,6HxwAn2dhJByF1FjMiHj-g==,2024-01-10T23:26:58.800831Z,2024-01-10 23:26:58.800831+00:00
3,autodoc,development,docai.generate.get_github_access_token,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:46.678860Z,2024-01-11 22:15:46.678860+00:00
4,autodoc,development,docai.generate.parse,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:46.891243Z,2024-01-11 22:15:46.891243+00:00
...,...,...,...,...,...,...
384,zeryx-demo,development,workflows.mnist_training_example.mnist_task_gpu,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:56.250244Z,2024-01-02 19:49:56.250244+00:00
385,zeryx-demo,development,workflows.mnist_training_example.train,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:56.011848Z,2024-01-02 19:49:56.011848+00:00
386,zeryx-demo,development,workflows.mnist_training_example.validation_loss,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:56.129522Z,2024-01-02 19:49:56.129522+00:00
387,zeryx-demo,staging,workflows.example.greeting_length,2,2023-12-05T19:06:21.498265Z,2023-12-05 19:06:21.498265+00:00


In [88]:
# Reduce `lp` to the most recently created version of each (project, domain, launch plan).
launchplan_latest_df = (
    lp.assign(timestamp=pd.to_datetime(lp["created_at"]))
    .sort_values(by=["project", "domain", "launchplan_name", "timestamp"])
    # `lp` repeats index labels (it came from pd.concat); make them unique before idxmax.
    .reset_index(drop=True)
)
latest_indices = launchplan_latest_df.groupby(["project", "domain", "launchplan_name"])["timestamp"].idxmax()
launchplan_latest_df = launchplan_latest_df.loc[latest_indices].reset_index(drop=True)
launchplan_latest_df

Unnamed: 0,project,domain,launchplan_name,launchplan_version,created_at,timestamp
0,autodoc,development,.flytegen.docai.generate.get_and_set_open_ai_key,C5oxz70-jD0k6-lmhG0-NQ==,2024-01-08T22:12:12.158655Z,2024-01-08 22:12:12.158655+00:00
1,autodoc,development,docai.generate.generate_cloud,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:48.775143Z,2024-01-11 22:15:48.775143+00:00
2,autodoc,development,docai.generate.generate_local,1L9EtKryxGRLAP0lsKiOwQ==,2024-01-11T22:15:48.061499Z,2024-01-11 22:15:48.061499+00:00
3,flyte-attendant,development,flyte_attendant.workflows.chat_support.ask,zd-GIZj7TpYqRfleJCLnbQ==,2023-03-15T16:04:28.828732Z,2023-03-15 16:04:28.828732+00:00
4,flytesnacks,development,.flytegen.task.t1,Yr1wdryFqm_zHNa5DePIgw,2024-02-27T18:45:04.028195Z,2024-02-27 18:45:04.028195+00:00
...,...,...,...,...,...,...
239,zeryx-demo,development,workflows.example.specific_file_in_dir_wf,CRZjyrzA3upd5lz59Bk3CA==,2023-12-11T18:39:29.063637Z,2023-12-11 18:39:29.063637+00:00
240,zeryx-demo,development,workflows.example.wf,yrE3rfqbKcIXqysZqYwMPg==,2023-12-14T20:51:40.884573Z,2023-12-14 20:51:40.884573+00:00
241,zeryx-demo,development,workflows.mnist_training_example.grid_search,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:58.164018Z,2024-01-02 19:49:58.164018+00:00
242,zeryx-demo,development,workflows.mnist_training_example.mnist_workflo...,TqhZBvoTMeSVz2DXACHWcQ==,2024-01-02T19:49:56.983671Z,2024-01-02 19:49:56.983671+00:00


In [None]:
# Latest workflows whose name contains the literal ".flytegen" marker.
# regex=False: with the default regex=True the leading "." matches any character.
filtered_df = wf_latest_df[wf_latest_df["workflow_name"].str.contains(".flytegen", regex=False, na=False)]
filtered_df

In [None]:
# Latest tasks whose name contains the literal ".flytegen" marker.
# regex=False: with the default regex=True the leading "." matches any character.
filtered_df = task_latest_df[task_latest_df["task_name"].str.contains(".flytegen", regex=False, na=False)]
filtered_df

In [90]:
# Latest launch plans whose name contains the literal ".flytegen" marker.
# regex=False: with the default regex=True the leading "." matches any character.
filtered_df = launchplan_latest_df[launchplan_latest_df["launchplan_name"].str.contains(".flytegen", regex=False, na=False)]
filtered_df

Unnamed: 0,project,domain,launchplan_name,launchplan_version,created_at,timestamp
0,autodoc,development,.flytegen.docai.generate.get_and_set_open_ai_key,C5oxz70-jD0k6-lmhG0-NQ==,2024-01-08T22:12:12.158655Z,2024-01-08 22:12:12.158655+00:00
4,flytesnacks,development,.flytegen.task.t1,Yr1wdryFqm_zHNa5DePIgw,2024-02-27T18:45:04.028195Z,2024-02-27 18:45:04.028195+00:00
31,flytetester,development,.flytegen.workflows.map_task_repro.dynamic_map...,FkbMR7Uv0GkoVx1MEOZsjg==,2023-06-02T22:02:22.018151Z,2023-06-02 22:02:22.018151+00:00
32,flytetester,development,.flytegen.workflows.registration.registration_wf,wnPeKeZ8wpzAi8kiAqxJtg==,2023-05-29T16:51:46.366598Z,2023-05-29 16:51:46.366598+00:00
57,hoover-test-project,development,.flytegen.workflows.test_workflow.hello_task,K9X8xa5VKB1o2B7uAYB7yQ==,2024-02-08T22:30:57.569881Z,2024-02-08 22:30:57.569881+00:00
70,llm-fine-tuning,development,.flytegen.flyte_llama.workflows.create_dataset,-Fj61EwsOp5Y_A22NtOdYQ==,2023-11-16T20:01:55.036300Z,2023-11-16 20:01:55.036300+00:00
71,llm-fine-tuning,development,.flytegen.flyte_llama.workflows.publish_model,VENx1zSF-Ps4mhqlzRacwA==,2023-10-21T14:22:05.348069Z,2023-10-21 14:22:05.348069+00:00
72,llm-fine-tuning,development,.flytegen.flyte_llama.workflows.train,RsIpgs99a1HWAyex3BKt-w==,2024-01-25T16:09:38.161008Z,2024-01-25 16:09:38.161008+00:00
73,llm-fine-tuning,development,.flytegen.flyte_llama.workflows.tune_batch_size,VjtrtNtYHZsMgl3HAKVNXA==,2023-11-16T15:01:09.335263Z,2023-11-16 15:01:09.335263+00:00
74,llm-fine-tuning,development,.flytegen.flyte_llama.workflows.tune_batch_siz...,VxOmngjs7UNi6yHml5UONQ==,2023-11-16T13:43:27.296126Z,2023-11-16 13:43:27.296126+00:00


In [91]:
import os

import shutil

# Rebuild ./Library from scratch with one placeholder file per latest workflow, task
# and launch plan, laid out as Library/<domain>/<project>/<dotted>/<path>/<name>.<kind>.
current_directory = os.getcwd()
folder_name = "Library"
library_path = os.path.join(current_directory, folder_name)

# Remove the old tree in-process. The previous `os.system(f"rm -rf {library_path}")`
# passed an unquoted path to a shell, so a cwd containing spaces or shell
# metacharacters would delete the wrong directories.
if os.path.isdir(library_path):
    shutil.rmtree(library_path)
elif os.path.exists(library_path):
    os.remove(library_path)

os.makedirs(library_path)


def _write_entity_files(latest_df, name_column, extension, label):
    """Create one placeholder file under ``library_path`` per row of ``latest_df``.

    Each dotted entity name becomes nested folders, and its last component becomes the
    file name with ``extension`` appended. For names containing ``.flytegen``, the
    ``.flytegen.`` marker is stripped and ``-auto_generated`` is appended. Files that
    already exist are left untouched; each new file contains "<label> name: <name>".
    """
    name_rows = latest_df[["domain", "project", name_column]].itertuples(index=False, name=None)
    for domain_name, project_name, entity_name in name_rows:
        if ".flytegen" in entity_name:
            entity_name = entity_name.replace(".flytegen.", "") + "-auto_generated"

        folders = entity_name.split(".")
        file_name = folders.pop() + extension
        folder_path = os.path.join(library_path, domain_name, project_name, *folders)
        file_path = os.path.join(folder_path, file_name)

        os.makedirs(folder_path, exist_ok=True)
        if not os.path.exists(file_path):
            with open(file_path, 'w') as file:
                file.write("{} name: {}".format(label, entity_name))


_write_entity_files(wf_latest_df, "workflow_name", ".workflow", "workflow")
_write_entity_files(task_latest_df, "task_name", ".task", "task")
_write_entity_files(launchplan_latest_df, "launchplan_name", ".launchplan", "launchplan")


In [None]:
import pandas as pd
import os


def _latest_versions(df: pd.DataFrame, name_column: str) -> pd.DataFrame:
    """Return a copy of ``df`` keeping only the newest row per (project, domain, name).

    A ``timestamp`` column parsed from ``created_at`` is added, and the result has a
    fresh 0..n-1 index.
    """
    latest_df = df.copy()
    latest_df["timestamp"] = pd.to_datetime(latest_df["created_at"])
    if latest_df.empty:
        # Nothing to deduplicate; skip groupby on an empty frame.
        return latest_df.reset_index(drop=True)
    keys = ["project", "domain", name_column]
    latest_df = latest_df.sort_values(by=keys + ["timestamp"]).reset_index(drop=True)
    # idxmax gives, for each group, the index label of its greatest timestamp.
    latest_indices = latest_df.groupby(keys)["timestamp"].idxmax()
    return latest_df.loc[latest_indices].reset_index(drop=True)


def _write_placeholder_files(
    latest_df: pd.DataFrame, name_column: str, kind: str, library_path: str
) -> None:
    """Create one ``<leaf>.<kind>`` placeholder file per row of ``latest_df``.

    Layout: ``<library_path>/<domain>/<project>/<dotted>/<folders>/<leaf>.<kind>``,
    where every dot-separated component of the entity name except the last becomes
    a sub-folder. The file content is ``"<kind> name: <entity name>"``. Files that
    already exist are left untouched.
    """
    for domain_name, project_name, entity_name in zip(
        latest_df["domain"], latest_df["project"], latest_df[name_column]
    ):
        # Names containing ".flytegen" lose their ".flytegen." marker and gain an
        # "-auto_generated" suffix (presumably Flyte-generated entities).
        if ".flytegen" in entity_name:
            entity_name = entity_name.replace(".flytegen.", "") + "-auto_generated"

        *folders, leaf = entity_name.split(".")
        folder_path = os.path.join(library_path, domain_name, project_name, *folders)
        file_path = os.path.join(folder_path, f"{leaf}.{kind}")

        os.makedirs(folder_path, exist_ok=True)
        if not os.path.exists(file_path):
            with open(file_path, "w", encoding="utf-8") as file:
                file.write(f"{kind} name: {entity_name}")


def generate_directory(
    wf: pd.DataFrame,
    task: pd.DataFrame,
    lp: pd.DataFrame,
    library_path: "str | None" = None,
):
    """Rebuild a placeholder file tree mirroring the latest workflows, tasks and launch plans.

    Parameters
    ----------
    wf, task, lp : pd.DataFrame
        Listings with ``project``, ``domain``, ``created_at`` columns plus,
        respectively, ``workflow_name`` / ``task_name`` / ``launchplan_name``.
        Only the newest row per (project, domain, name) is used.
    library_path : str, optional
        Root of the generated tree. Defaults to ``<current working dir>/Library``.
        **Whatever already exists at this path (directory, file or symlink) is
        deleted before the tree is rebuilt.**

    Returns
    -------
    None
        Files ``<name>.workflow``, ``<name>.task`` and ``<name>.launchplan`` are
        written under ``<library_path>/<domain>/<project>/...``.
    """
    import shutil

    # Deduplicate first so a malformed frame fails before anything is deleted.
    wf_latest_df = _latest_versions(wf, "workflow_name")
    task_latest_df = _latest_versions(task, "task_name")
    launchplan_latest_df = _latest_versions(lp, "launchplan_name")

    if library_path is None:
        library_path = os.path.join(os.getcwd(), "Library")

    # Delete in-process instead of os.system(f"rm -rf {library_path}"): the unquoted
    # shell command split paths containing spaces/metacharacters and removed the
    # wrong targets while leaving the old tree behind.
    if os.path.isdir(library_path) and not os.path.islink(library_path):
        shutil.rmtree(library_path)
    elif os.path.lexists(library_path):
        os.remove(library_path)
    os.makedirs(library_path)

    _write_placeholder_files(wf_latest_df, "workflow_name", "workflow", library_path)
    _write_placeholder_files(task_latest_df, "task_name", "task", library_path)
    _write_placeholder_files(launchplan_latest_df, "launchplan_name", "launchplan", library_path)
    
    
    