In [1]:
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

### Custom workflows

#### Define workflows `my_workflow` and `my_unused_workflow`
Set up the pool of custom workflows that are available to a pipeline.
The idea is that you can maintain a pool of workflows and reuse them in any number of
your pipelines.

In [2]:
import pandas as pd
# Tiny in-memory stand-in for a real input table: two rows, two integer columns.
dataset = pd.DataFrame({"col1": [2, 5], "col2": [4, 10]})



In [3]:
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
from graphrag.index.workflows import WorkflowDefinitions
  
def _derive_product_step(target_column):
    """Build one `derive` step combining col1 and col2 with the "*" operator.

    Args:
        target_column: name of the column the derived values are written to.

    Returns:
        A step dict with the `verb` and `args` keys.
    """
    return {
        "verb": "derive",
        "args": {
            "column1": "col1",  # first input column, read from the dataset
            "column2": "col2",  # second input column, read from the dataset
            "to": target_column,
            "operator": "*",
        },
    }


def _my_workflow(config):
    """Workflow whose output column name can be overridden by the caller.

    Reads `derive_output_column` from `config`, falling back to
    "output_column" when the key is absent.
    """
    target_column = config.get("derive_output_column", "output_column")
    return [_derive_product_step(target_column)]


def _my_unused_workflow(_config):
    """Workflow with a fixed output column; the supplied config is ignored."""
    return [_derive_product_step("unused_output_column")]


# Pool of workflow factories, keyed by the name a pipeline uses to reference them.
custom_workflows: WorkflowDefinitions = {
    "my_workflow": _my_workflow,
    "my_unused_workflow": _my_unused_workflow,
}


#### create pipeline workflows file

In [4]:


from pathlib import Path

# Minimal pipeline definition: reference only `my_workflow` and override the
# name of its derived column through the `derive_output_column` config key.
PIPELINE_YAML = """
workflows:
  - name: my_workflow
    config:
      derive_output_column: "col_1_multiplied" 

"""
# `cwd` is a classmethod, so call it on the class instead of building a
# throwaway `Path()` instance first.
pipeline_file = Path.cwd() / "pipeline.yaml"
# An explicit encoding keeps the written bytes independent of the platform locale.
with pipeline_file.open("w", encoding="utf-8") as file:
    file.write(PIPELINE_YAML)
    

In [5]:
# The pipeline definition has no `input` section (the dataset is handed to the
# runner directly), so all that is needed here is the YAML file's path as a string.
config_path = str(pipeline_file)
config_path  # bare expression so the notebook echoes the resolved path

'/media/gpt4-pdf-chatbot-langchain/graphrag/examples_notebooks/4_custom_set_of_available_workflows/pipeline.yaml'

#### Grab the last result from the pipeline, which should be the output of `my_workflow`

In [6]:
from graphrag.index import run_pipeline_with_config

tables = []
async for table in run_pipeline_with_config(
    config_or_path=config_path,
    dataset=dataset,
    additional_workflows=custom_workflows,
):
    tables.append(table)
pipeline_result = tables[-1]

if pipeline_result.result is not None:
    # Should look something like this:
    #    col1  col2  col_1_multiplied
    # 0     2     4                 8
    # 1     5    10                50
    print(pipeline_result.result)
else:
    print("No results!")

   col1  col2  col_1_multiplied
0     2     4                 8
1     5    10                50


#### Use the Python API to run `my_workflow` from the custom workflow definitions

In [7]:

from graphrag.index.config import PipelineWorkflowReference

# Run a pipeline using the Python API.
#
# The workflows to run are declared here in Python rather than in a YAML config
# file; the content mirrors the `pipeline.yaml` definition written earlier.
workflows: list[PipelineWorkflowReference] = [
    # Only "my_workflow" is referenced; "my_unused_workflow" stays in the pool
    # of custom workflows without being run.
    PipelineWorkflowReference(
        name="my_workflow",  # must match a key in the custom_workflows dict above
        config={
            # Handed to the workflow definition above, which uses it as the
            # name of the derived output column.
            "derive_output_column": "col_1_multiplied"
        },
    ),
]


##### Run the pipeline

In [8]:
from graphrag.index import run_pipeline


# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline(
    workflows, dataset=dataset, additional_workflows=custom_workflows
):
    tables.append(table)
pipeline_result = tables[-1]

if pipeline_result.result is not None:
    # Should look something like this:
    #    col1  col2  col_1_multiplied
    # 0     2     4                 8
    # 1     5    10                50
    print(pipeline_result.result)
else:
    print("No results!")


   col1  col2  col_1_multiplied
0     2     4                 8
1     5    10                50
