<a href="https://colab.research.google.com/gist/mpuig/d4a69eb1f08f24a7f5f36748bc300edb/chatbot-summit-berlin-2024-workshop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

The processing and analysis of customer service interactions are essential for understanding customer needs and improving service delivery. This project aims to use LLMs to analyze customer call transcripts effectively.

## Use Case: Insights from Call Transcripts

The goal is to analyze call transcripts to understand the nuances of customer interactions. By doing so, we aim to:

- **Objective**: Analyze customer conversations to uncover underlying needs, concerns, and perceptions.
- **Benefit**: Provide insights for data-driven decisions and strategy refinement.
- **Application**: Employ LLMs to efficiently process, analyze, and interpret the vast amounts of data contained in call transcripts. This will enable us to identify key trends, gauge sentiment, and detect compliance or training gaps.

## User Story: Predictive Analytics for Risk Management

Consider the perspective of a Data Scientist working in the Risk Management Department, tasked with a critical challenge:

```
As a Data Scientist in the Risk Management Department,
I need to apply predictive analytics to customer call transcripts,
To predict client compliance with credit obligations.
```

Success in this area means better-targeted interventions and risk management strategies, leading to more personalized communication and efficient resource allocation.

## Key Concepts

A brief overview of relevant terms:

- **EMI (Equated Monthly Installment)**: Regular payments made to repay a loan over a designated period.
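To make the term concrete, here is a minimal sketch of the standard fixed-rate amortization formula commonly used to compute an EMI. The function name `emi` and the sample figures are illustrative assumptions and do not come from the workshop data.

```python
def emi(principal: float, annual_rate: float, months: int) -> float:
    """Equated monthly installment for a fixed-rate loan (standard amortization formula)."""
    if months <= 0:
        raise ValueError("months must be positive")
    r = annual_rate / 12  # monthly interest rate
    if r == 0:
        # No interest: the principal is simply split evenly
        return principal / months
    growth = (1 + r) ** months
    return principal * r * growth / (growth - 1)


# Hypothetical loan: 1000 borrowed at 12% per year, repaid over 12 months
print(round(emi(1000, 0.12, 12), 2))  # 88.85
```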



# Environment setup

- Install the necessary Python packages
- Get some useful secrets from Google Colab userdata
- Set the environment variables to use OpenAI LLMs

In [1]:
!pip3 install langchain==0.1.11 langchain-core==0.1.29 langchain-community==0.0.25 langchain-openai==0.0.8 --quiet
!pip3 install simplejson pydantic --quiet


In [2]:
import os
import simplejson as json

from google.colab import userdata
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Language models to use
GPT4_0125 = "gpt-4-0125-preview"
GPT35TURBO0125 = "gpt-3.5-turbo-0125"

# OpenAI SECRETS
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
OPENAI_API_ORG = userdata.get('OPENAI_API_ORG')

print(f"OpenAI API Key: sk-...{OPENAI_API_KEY[-5:]}")
print(f"OpenAI API Org: org-...{OPENAI_API_ORG[-5:]}")

OpenAI API Key: sk-...3kTNI
OpenAI API Org: org-...TrCdV


# Sample Data

Below is a sample call transcript from a financial services call center interaction between an agent and a client. This transcript will serve as our primary example throughout the workshop.

In [3]:
CALL_TRANSCRIPT = """\
Agent: I'm calling from Global United Bank regarding your two-wheeler loan.
Customer: Who is speaking?
Agent: My name is Emily Roberts, calling from Global United Bank.
Customer: Yes.
Agent: To confirm, I'm speaking with John Smith?
Customer: Yes, yes.
Agent: This call is being recorded for quality and training purposes. It appears you have an outstanding two-wheeler loan payment.
Customer: Yes, yes.
Agent: The last four digits of your loan account are 6059. Your EMI was due on the 3rd, and you have yet to make a payment of 120€.
Customer: I understand. I've been out of town, but my son will return in two days to make the payment.
Agent: It's important to make the payment as soon as possible. Have you considered using Phone Pay or Google Pay for an immediate payment?
Customer: I will discuss this with my son as soon as I return home.
Agent: Please, let's try to resolve this now. Can we initiate the payment during this call?
Customer: I will speak with my son and ensure it's done.
Agent: I urge you to make the payment now for immediate resolution. Shall we proceed with setting up the payment?
Customer: I will handle it, thank you.
"""

In this example the customer does not explicitly demonstrate financial inability, express dissatisfaction with the bank or the product, or refuse to make a payment.

Instead, the customer acknowledges the payment due and mentions a plan to make the payment ("*my son will return in two days to make the payment*"). This response suggests an intention to pay but indicates a delay due to personal circumstances, possibly indicating forgetfulness or a temporary inability to make the payment immediately due to being out of town.

# 1st Approach - Brute Force with GPT-4

The initial strategy is to use brute force analysis with GPT-4, since it is (apparently) super smart.


## Prompts

In [4]:
SYSTEM_PROMPT_TEMPLATE = """\
You are a top expert in analyzing and understanding customers' call responses
to bank agents with regards to the customer not yet paying their loan dues.
You are also an expert in accurately identifying a customer's promise to pay
and the corresponding details of the promise.
"""

USER_PROMPT_TEMPLATE = """\
Examine the following call transcript between a bank agent and a customer (delimited by <ctx></ctx>).

<ctx>
{transcript}
</ctx>

The goal is to categorize the call strictly into one of the following list of categories:
- Inability to pay: the customer demonstrated financial inability to pay the outstanding loan dues
- Discipline to pay: the customer demonstrated discipline to pay outstanding loan dues.
- Unwilling to pay: the customer demonstrated unwillingness to pay outstanding loan dues.

Just return the category, without any additional details. For example: Inability to pay
"""

## Chains

For the following examples we'll use LCEL (LangChain Expression Language), which makes it easy to build complex chains from basic components.

More info here: https://python.langchain.com/docs/expression_language

In [5]:
prompt = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_PROMPT_TEMPLATE),
    ("user", USER_PROMPT_TEMPLATE)
])

# This cell runs on GPT-3.5 Turbo; set model=GPT4_0125 to run the same chain on GPT-4.
llm = ChatOpenAI(
    model=GPT35TURBO0125,
    openai_api_key=OPENAI_API_KEY,
    openai_organization=OPENAI_API_ORG,
    temperature=0,
    model_kwargs={"seed": 42},
)

# The output of a ChatModel (and therefore, of this chain) is a message.
# However, it's often much more convenient to work with strings.
# Let's add a simple output parser to convert the chat message to a string.
output_parser = StrOutputParser()

# We can now combine these into a simple LLM chain:
chain = prompt | llm | output_parser

# In this chain the user input is passed to the prompt template, then the prompt template output is passed to
# the model, then the model output is passed to the output parser.
result = chain.invoke({"transcript": CALL_TRANSCRIPT})

print(f"Response -> {result}")

Response -> Discipline to pay
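The prompt asks the model to return only the category name, but nothing in the chain enforces that. A minimal sketch of a guard that normalizes the raw string and rejects anything outside the three categories; `parse_category` is a hypothetical helper, not part of LangChain.

```python
CATEGORIES = ("Inability to pay", "Discipline to pay", "Unwilling to pay")


def parse_category(raw: str) -> str:
    """Map a raw model reply onto one of the known categories, or raise."""
    # Tolerate surrounding whitespace, quotes, a trailing period, and case differences
    cleaned = raw.strip().strip(".\"'").casefold()
    for category in CATEGORIES:
        if cleaned == category.casefold():
            return category
    raise ValueError(f"Unexpected category: {raw!r}")


print(parse_category(" discipline to pay.\n"))  # Discipline to pay
```

Failing loudly on an unexpected reply is a design choice here: silently mapping unknown text to a default category would hide prompt regressions.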


## Function calls

OpenAI function calling lets the model return its answer as JSON arguments that match a schema we define, which makes the results easier to parse and validate than free text. More info: https://platform.openai.com/docs/guides/function-calling

To capture customer commitments regarding loan payments, we define a structured function, `FUNCTION_REASON_NO_PAY`. It asks the model whether the customer made any form of commitment or promise to pay their outstanding loan dues, and to summarize that promise.


In [6]:
FUNCTION_REASON_NO_PAY = {
    "name": "reason_no_pay",
    "description": "Determine whether the customer made some form of a promise to pay the loan dues",
    "parameters": {
        "type": "object",
        "properties": {
            "answer": {
                "type": "string",
                "enum": ["yes", "no"],
                "description": "Answer Yes or No as to whether a promise to pay the outstanding loan dues was made",
            },
            "details": {
                "type": "string",
                "description": "Medium-form detailed summary of the customer's promise to pay outstanding loan dues this month",
            }
        },
        "required": ["answer", "details"]
    }
}

To apply the function `reason_no_pay` as part of a sequence of operations or "Runnables" we can use `Runnable.bind()`. It invokes a Runnable within a Runnable sequence with constant arguments that are not part of the output of the preceding Runnable in the sequence, and which are not part of the user input.

In [7]:
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser

openai_functions = [FUNCTION_REASON_NO_PAY]

bind = llm.bind(
    function_call={"name": "reason_no_pay"},
    functions=openai_functions,
)

chain = prompt | bind | JsonOutputFunctionsParser()

result = chain.invoke({"transcript": CALL_TRANSCRIPT})

print(json.dumps(result, indent=4, sort_keys=True))

{
    "answer": "no",
    "details": "The customer mentioned that their son will return in two days to make the payment and they will handle it."
}
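The parsed arguments are a plain dictionary, so it can be useful to check them against the function definition before relying on them. The sketch below is a deliberately small, standard-library-only check (required fields, string type, enum membership); `check_arguments` and `EXAMPLE_FUNCTION` are hypothetical names, and a full JSON Schema validator would cover far more cases.

```python
# Trimmed copy of a function definition in the OpenAI function-call format
EXAMPLE_FUNCTION = {
    "name": "reason_no_pay",
    "parameters": {
        "type": "object",
        "properties": {
            "answer": {"type": "string", "enum": ["yes", "no"]},
            "details": {"type": "string"},
        },
        "required": ["answer", "details"],
    },
}


def check_arguments(function_def: dict, arguments: dict) -> list:
    """Return a list of problems; an empty list means the arguments fit the schema."""
    params = function_def["parameters"]
    problems = []
    for key in params.get("required", []):
        if key not in arguments:
            problems.append(f"missing required field: {key}")
    for key, value in arguments.items():
        spec = params["properties"].get(key)
        if spec is None:
            problems.append(f"unexpected field: {key}")
        elif spec.get("type") == "string" and not isinstance(value, str):
            problems.append(f"{key} should be a string")
        elif "enum" in spec and value not in spec["enum"]:
            problems.append(f"{key} must be one of {spec['enum']}")
    return problems


print(check_arguments(EXAMPLE_FUNCTION, {"answer": "no", "details": "Son will pay in two days."}))  # []
print(check_arguments(EXAMPLE_FUNCTION, {"answer": "maybe"}))
```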


## Function Calls based on Pydantic Models

Since we're in an OpenAI Context, we can use Pydantic models to define a function call and also to validate the obtained results.

The `pydantic` model serves as a structured way to validate and document the expected input and output for such operations, ensuring that data passed to and from the API meets the application's requirements.

In [8]:
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import Literal


class QuestionReasonNoPay(BaseModel):
    """Identify and summarize reason for the customer not yet paying the loan dues for this month"""

    answer: Literal["yes", "no"] = Field(
        description="Whether a reason was provided by the customer for them not yet paying the loan dues"
    )
    details: str = Field(
        description="Medium-form detailed summary of the customer's reason for not yet paying loan dues"
    )

In [9]:
from langchain_core.utils.function_calling import convert_to_openai_function

openai_functions = [convert_to_openai_function(QuestionReasonNoPay)]

bind = llm.bind(
    function_call={"name": "QuestionReasonNoPay"},
    functions=openai_functions
)

chain = prompt | bind | JsonOutputFunctionsParser()

result = chain.invoke({"transcript": CALL_TRANSCRIPT})

QuestionReasonNoPay(**result)

# Check that we obtain the same function call object as in the previous step
# print(json.dumps(openai_functions, indent=4, sort_keys=True))

QuestionReasonNoPay(answer='no', details='The customer did not provide a reason for not yet paying the loan dues.')

## Extending the scope

We want to add more questions to the initial one:
 - Whether the customer demonstrated discipline to pay outstanding loan dues
 - Whether the customer demonstrated financial inability to pay outstanding loan dues
 - Whether the customer demonstrated unwillingness to pay outstanding loan dues

The first attempt implements them all in a single function call, although we can already guess that this won't be the final design.

In [10]:
class QuestionReasonNoPayExtended(BaseModel):
    """Identify and summarize reason for the customer not yet paying the loan dues for this month"""

    # Original question, renamed
    reason_no_pay: Literal["yes", "no"] = Field(
        description="Whether a reason was provided by the customer for them not yet paying the loan dues"
    )
    reason_no_pay_details: str = Field(
        description="Mid-form detailed summary of the customer's reason for not yet paying loan dues"
    )

    # New questions here:

    # Inability to pay
    inability_pay: Literal["yes", "no"] = Field(
        description="Whether customer demonstrated inability to pay outstanding loan dues"
    )
    inability_pay_details: str = Field(
        description="Explanation for the inability_pay answer"
    )

    # Discipline to pay
    discipline_pay: Literal["yes", "no"] = Field(
        description="Whether customer demonstrated discipline to pay outstanding loan dues"
    )
    discipline_pay_details: str = Field(
        description="Explanation for the discipline_pay answer"
    )

    # Unwilling to Pay
    unwilling_pay: Literal["yes", "no"] = Field(
        description="Whether customer demonstrated unwillingness to pay outstanding loan dues"
    )
    unwilling_pay_details: str = Field(
        description="Explanation for the unwilling_pay answer"
    )

In [11]:
openai_functions = [convert_to_openai_function(QuestionReasonNoPayExtended)]

bind = llm.bind(
    function_call={"name": "QuestionReasonNoPayExtended"},
    functions=openai_functions
)

chain = prompt | bind | JsonOutputFunctionsParser()

result = chain.invoke({"transcript": CALL_TRANSCRIPT})

QuestionReasonNoPayExtended(**result)

QuestionReasonNoPayExtended(reason_no_pay='yes', reason_no_pay_details='Customer mentioned being out of town but son will return in two days to make the payment.', inability_pay='no', inability_pay_details='Customer indicated son will make the payment in two days.', discipline_pay='no', discipline_pay_details='Customer did not show immediate willingness to make the payment during the call.', unwilling_pay='no', unwilling_pay_details='Customer expressed intention to handle the payment later.')
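In the output above, the three category flags all come back as "no", so the call ends up in none of the original categories. A small sketch of how the flags could be collected to spot that situation, working on a plain dictionary with the same field names (details shortened for brevity); `flagged_categories` is a hypothetical helper.

```python
def flagged_categories(result: dict) -> list:
    """Names of the yes/no fields answered 'yes', ignoring the *_details fields."""
    return [
        name
        for name, value in result.items()
        if not name.endswith("_details") and value == "yes"
    ]


# Same answers as the output above, with the details abbreviated
result = {
    "reason_no_pay": "yes",
    "reason_no_pay_details": "Out of town; son will pay in two days.",
    "inability_pay": "no",
    "inability_pay_details": "Son will make the payment.",
    "discipline_pay": "no",
    "discipline_pay_details": "No immediate payment during the call.",
    "unwilling_pay": "no",
    "unwilling_pay_details": "Intends to handle the payment later.",
}

print(flagged_categories(result))  # ['reason_no_pay']
```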

## Outcome

- High computational demands leading to slow response times.
- Difficulty in extracting specific insights due to the broad analysis scope.

# 2nd Approach - Divide and Conquer

However, a one-size-fits-all approach to questioning can lead to inefficiencies and may not capture the nuanced perspectives of different customers. To address this, we propose introducing a more dynamic questioning strategy, focusing on directing follow-up questions that are relevant based on the customer's initial responses. By tailoring our inquiries to align with the customer's specific concerns—as indicated by their previous "YES" answers—we can gather more targeted and meaningful insights.

## Defining Follow-up Questions

We define a new user prompt, more generic than the previous one, and that accepts a `question` as a parameter.

In [12]:
USER_PROMPT_TEMPLATE_WITH_QUESTION = """\
Examine the following call transcript between a bank agent and a customer (delimited by <ctx></ctx>).

<ctx>
{transcript}
</ctx>

Think step-by-step in accurately answering the following question, and answer yes or no:

```
{question}
```

Do not ever give or use any information not mentioned in the transcript in your answers.
"""

prompt_with_question = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_PROMPT_TEMPLATE),
    ("user", USER_PROMPT_TEMPLATE_WITH_QUESTION)
])

The `Question` class provides a blueprint for constructing questions and allows for a uniform approach to generating follow-up questions.

In [13]:
class Question(BaseModel):
    name: str
    description: str
    answer_description: str
    details_description: str

The `BinaryAnswer` class is a Pydantic model designed to capture and structure the outcomes of our function calls:


In [14]:
class BinaryAnswer(BaseModel):
    name: str = Field(description="The identifier of the question being addressed")
    answer: Literal["yes", "no"] = Field(description="A binary response to the question, restricted to yes or no.")
    details: str = Field(description="Further elaboration on the answer provided.")

A list, `QUESTIONS`, contains instances of the `Question` class, each representing a specific follow-up question that may be relevant based on the initial response.

In [15]:
QUESTIONS = [
    Question(
        name="inability_to_pay",
        description="Determine whether customer demonstrated financial inability to pay the outstanding loan dues",
        answer_description="Whether customer demonstrated financial inability to pay outstanding loan dues",
        details_description="Explanation for the answer"
    ),
    Question(
        name="discipline_to_pay",
        description="Determine whether customer demonstrated discipline to pay the outstanding loan dues",
        answer_description="Whether customer demonstrated discipline to pay outstanding loan dues",
        details_description="Explanation for the answer",
    ),
    Question(
        name="unwilling_to_pay",
        description="Determine whether customer demonstrated unwillingness to pay the outstanding loan dues",
        answer_description="Whether customer demonstrated unwillingness to pay outstanding loan dues",
        details_description="Explanation for the answer",
    )
]

The function `generate_function` takes a `Question` object as its input and produces a structured dictionary that corresponds to the OpenAI function call format:

In [16]:
def generate_function(question: Question) -> dict:
    return {
        "name": question.name,
        "description": question.description,
        "parameters": {
            "type": "object",
            "properties": {
                "answer": {
                    "type": "string",
                    "enum": ["yes", "no"],
                    "description": question.answer_description,
                },
                "details": {
                    "type": "string",
                    "description": question.details_description,
                }
            },
            "required": ["answer", "details"]
        }
    }

To obtain structured insights from the customer call transcript when the initial analysis yields a "yes" response, we integrate the previously defined `QUESTIONS` list and the `generate_function` function into a practical workflow. This workflow dynamically generates and executes OpenAI function calls for the follow-up questions and processes the results.



## Processing Workflow

The workflow proceeds as follows, assuming that an initial question (`question1`) has been asked and answered with "yes":

1. **Iterate Through Follow-up Questions**: For each question in our predefined list (`QUESTIONS`), we dynamically construct a function call using the `generate_function` utility.

2. **Binding and Execution**:
   - For each question, we bind its corresponding OpenAI function call (including name and structured parameters) using `llm.bind`.
   - We then set up a chain of operations starting with a prompt that includes the question, followed by the bound function call, and finally, a parser (`JsonOutputFunctionsParser`) to interpret the JSON output.

3. **Invoke and Collect Results**:
   - The chain is invoked with the current transcript and the specific question being addressed.
   - The results are then structured according to the `BinaryAnswer` model, capturing the name of the question and the details of the response.


In [17]:
# We go back to the QuestionReasonNoPay, with just one yes/no question
openai_functions = [convert_to_openai_function(QuestionReasonNoPay)]

bind = llm.bind(
    function_call={"name": "QuestionReasonNoPay"},
    functions=openai_functions
)

chain = prompt | bind | JsonOutputFunctionsParser()

result = chain.invoke({"transcript": CALL_TRANSCRIPT})

question1 = QuestionReasonNoPay(**result)

results = []
if question1.answer == "yes":
    for question in QUESTIONS:
        bind = llm.bind(
            function_call={"name": question.name},
            functions=[generate_function(question)],
        )
        chain = prompt_with_question | bind | JsonOutputFunctionsParser()
        # Pass the question text (not the Question object) into the prompt template
        result = chain.invoke({"transcript": CALL_TRANSCRIPT, "question": question.description})
        results.append(BinaryAnswer(name=question.name, **result))


# Print the initial question's response with context
print(f"Initial Question Response:\n{question1}\n")

# Iterate through each result in the results list, printing with added clarity
for result in results:
    print(f"Question: {result.name}")
    print(f"Answer: {result.answer}")
    # Optionally, check if 'details' has meaningful content before printing
    if result.details.strip():
        print(f"Details: {result.details}")
    print("-" * 40)  # Add a separator for readability

Initial Question Response:
answer='no' details='The customer did not provide a reason for not yet paying the loan dues.'
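Because the initial question came back as "no", none of the follow-up questions ran in this execution. To exercise the branching without spending API calls, the chain can be replaced by a stub. The sketch below assumes a callable `ask(name)` that returns "yes" or "no"; `run_follow_ups` and the canned answers are hypothetical and only illustrate the control flow.

```python
from typing import Callable, Dict, List


def run_follow_ups(ask: Callable[[str], str], gate: str, follow_ups: List[str]) -> Dict[str, str]:
    """Ask the gate question first; ask the follow-ups only if the gate answer is 'yes'."""
    answers = {gate: ask(gate)}
    if answers[gate] == "yes":
        for name in follow_ups:
            answers[name] = ask(name)
    return answers


FOLLOW_UPS = ["inability_to_pay", "discipline_to_pay", "unwilling_to_pay"]

# Canned answers standing in for the LLM (illustrative values only)
canned = {
    "reason_no_pay": "yes",
    "inability_to_pay": "no",
    "discipline_to_pay": "yes",
    "unwilling_to_pay": "no",
}

print(run_follow_ups(canned.__getitem__, "reason_no_pay", FOLLOW_UPS))
print(run_follow_ups(lambda name: "no", "reason_no_pay", FOLLOW_UPS))  # {'reason_no_pay': 'no'}
```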



## Outcome

- Marginal improvements in manageability but increased operational complexity and costs.
- Still slow and expensive.
- Segmented analysis complicated the synthesis of comprehensive insights.

# 3rd Approach - Logic Flow

Building on the previous steps, we can simplify the process and make it more manageable and interpretable by structuring the queries into a logical flow of yes/no questions, which are also easier to evaluate.

With a logic flow, we can route some questions to a more affordable LLM such as GPT-3.5. This reduces operational costs without significantly compromising analytical capabilities, making it a better choice for large(ish)-scale analysis.




## The Pipeline Configuration

As an example we'll create the following (simple) pipeline:

```
#
#        ┌--Y--> [inability_to_pay, discipline_to_pay, unwilling_to_pay]
#        |
# --> reason_no_pay
#        |
#        └--N--> None
#
```

A more complex pipeline could also:

- Add extraction modules that provide data processing capabilities per step
- Specify different prompt templates per step
- Use more complex branching strategies
- Generate questions dynamically from different sources (e.g. CSV)
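As an illustration of generating questions from an external source, the sketch below reads step definitions from CSV text using only the standard library. The column names (`name`, `llm`, `question`) and the helper `steps_from_csv` are assumptions chosen to mirror the step fields used in this workshop.

```python
import csv
import io

# Hypothetical CSV content; in practice this would come from a file
CSV_TEXT = """\
name,llm,question
inability_to_pay,gpt35,Did the customer demonstrate inability to pay?
unwilling_to_pay,gpt35,Did the customer demonstrate unwillingness to pay?
"""


def steps_from_csv(text: str) -> list:
    """Turn CSV rows into step dictionaries with name, llm and question keys."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {"name": row["name"], "llm": row["llm"], "question": row["question"]}
        for row in reader
    ]


for step in steps_from_csv(CSV_TEXT):
    print(step["name"], "->", step["question"])
```

Questions that contain commas would need to be quoted in the CSV, which `csv.DictReader` handles.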

Let's use YAML to define the pipeline:

In [18]:
YAML_PIPELINE = """\
name: dummy-pipeline
initial_step: reason_no_pay
steps:
  - name: reason_no_pay
    llm: gpt4
    question: Did the customer provide any reason for not paying the loan dues?
    next_steps:
      - if_llm_answer:
          has_value: "yes"
          go_to: [inability_to_pay, discipline_to_pay, unwilling_to_pay]
      - if_llm_answer:
          has_value: "no"
          go_to: []

  - name: inability_to_pay
    llm: gpt35
    question: Did the customer demonstrate inability to pay outstanding loan dues?

  - name: discipline_to_pay
    llm: gpt35
    question: Did the customer demonstrate discipline to pay outstanding loan dues?

  - name: unwilling_to_pay
    llm: gpt35
    question: Did the customer demonstrate unwillingness to pay outstanding loan dues?
"""

The objective is to create a pipeline that adapts its path based on the responses from the LLM.

Let's use a structured approach to construct decision-driven workflows using `Pydantic`:

- `PipelineStepConfig` & `PipelineConfig`: Specify the structure for each step in the pipeline and the overall pipeline configuration, respectively.
- `NextStepsIfLLMAnswerHasValue`: Encapsulates the conditional logic for moving between steps, leveraging the LLM's responses.
- `IfAnswerHasValueGoTo`: Defines conditions for navigating to the next steps in the pipeline, based on the LLM's answers.


In [19]:
from typing import List, Optional, Union
from pydantic import BaseModel, Field


# Defines the structure for conditional navigation based on LLM's answer
class IfAnswerHasValueGoTo(BaseModel):
    has_value: Optional[str] = Field(default=None, description="Expected answer value to determine next step")
    go_to: List[str] = Field(default=[], description="List of next steps to go to if condition is met")


# Represents the conditional logic for moving between steps based on LLM's answers
class NextStepsIfLLMAnswerHasValue(BaseModel):
    if_llm_answer: IfAnswerHasValueGoTo


# Configuration for each step in the pipeline
class PipelineStepConfig(BaseModel):
    name: str = Field(description="The name of the step")
    llm: str = Field(description="The LLM to use for this step")
    question: str = Field(description="Question to be processed")
    next_steps: Optional[Union[str, List[Union[str, NextStepsIfLLMAnswerHasValue]]]] = Field(
        default=None, description="Conditional next steps based on LLM's answers")


# Overall pipeline configuration
class PipelineConfig(BaseModel):
    name: str = Field(description="Name of the pipeline")
    initial_step: str = Field(description="Identifier of the initial step")
    steps: List[PipelineStepConfig] = Field(description="Configuration for all steps in the pipeline")


To efficiently manage our pipeline's configuration, we introduce two critical processes: **compiling** the configuration from YAML and **loading** it for use in our application.

**NOTE**:

**These steps keep our pipeline flexible and easy to maintain. In this example the content of the JSON file is minimal, but in a real case it can contain user and system templates, function calls, extended params, and all kinds of information needed to run the pipeline.**

### Compiling the Pipeline Configuration

The `build_config` function takes a YAML representation of our pipeline configuration and the target filename as inputs. It performs the following actions:

1. Parses the YAML string to a Python dictionary using `yaml.safe_load`.
2. Converts the dictionary to a `PipelineConfig` object, ensuring all data is valid according to our predefined model.
3. Saves this configuration object as a JSON file, making it easily accessible and modifiable for future use.

In [20]:
import yaml

def build_config(yaml_pipeline: str, filename: str) -> PipelineConfig:
    """Parses YAML pipeline configuration and saves it as a JSON file."""
    pipeline_config = PipelineConfig(**yaml.safe_load(yaml_pipeline))
    with open(filename, "w") as fp:
        json.dump(pipeline_config.model_dump(), fp, indent=4)
    print(f"{filename} file generated successfully!")
    return pipeline_config

### Loading the Pipeline Configuration

The `load_config` function reverses the process, reading the JSON file and converting it back into a `PipelineConfig` object. This enables us to work with a strongly typed configuration in our Python code, providing better error checking and editor support.

In [21]:
def load_config(filename: str = "pipeline.json") -> PipelineConfig:
    """Reads a JSON file and converts it into a PipelineConfig object."""
    with open(filename, "r") as fp:
        json_data = json.load(fp)
    # model_validate is the Pydantic v2 replacement for the deprecated parse_obj
    return PipelineConfig.model_validate(json_data)


To demonstrate these processes, we first compile our YAML pipeline configuration into a JSON file and then load this file to obtain a `PipelineConfig` object ready for use:

In [22]:
# Save the pipeline configuration as a JSON file
build_config(yaml_pipeline=YAML_PIPELINE, filename="pipeline.json")

# Load the configuration for use
pipeline = load_config(filename="pipeline.json")

pipeline.json file generated successfully!
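Since `go_to` entries are plain strings, a typo in a step name would only surface while the pipeline is running. A minimal sketch of a check over a JSON-shaped dictionary, assuming the same structure as the configuration defined above; `undefined_targets` and the sample `CONFIG` are hypothetical.

```python
def undefined_targets(config: dict) -> set:
    """Step names that are referenced (initial_step or go_to) but never defined."""
    defined = {step["name"] for step in config["steps"]}
    referenced = {config["initial_step"]}
    for step in config["steps"]:
        next_steps = step.get("next_steps") or []
        if isinstance(next_steps, str):
            next_steps = [next_steps]
        for item in next_steps:
            if isinstance(item, str):
                referenced.add(item)
            else:
                referenced.update(item["if_llm_answer"]["go_to"])
    return referenced - defined


# Sample configuration with a deliberate typo in one go_to target
CONFIG = {
    "name": "dummy-pipeline",
    "initial_step": "reason_no_pay",
    "steps": [
        {
            "name": "reason_no_pay",
            "next_steps": [
                {"if_llm_answer": {"has_value": "yes", "go_to": ["inability_to_pay", "unwiling_to_pay"]}},
                {"if_llm_answer": {"has_value": "no", "go_to": []}},
            ],
        },
        {"name": "inability_to_pay", "next_steps": None},
        {"name": "unwilling_to_pay", "next_steps": None},
    ],
}

print(undefined_targets(CONFIG))  # {'unwiling_to_pay'}
```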


## Running the Pipeline

### Step Execution

The `execute_step` function represents a crucial phase in our pipeline's operation: it dispatches a question to the LLM and processes the response. For illustrative purposes, the current implementation runs very basic logic.

A production implementation would involve richer interactions with LLMs, including data processing and the application of business logic. Ideally, this function would be instantiated dynamically through a factory pattern, so that different modules can be plugged in based on the pipeline configuration.

In [23]:
def execute_step(step_name: str, question: str, llm: str, transcript: str, callbacks=None) -> BinaryAnswer:
    q = Question(
        name=step_name,
        description="Answer and explanation to the question relating to the customer's non-payment of loan dues",
        answer_description=f"Yes/No answer to the question: {question}",
        details_description="Short explanation for the answer provided",
    )

    # Map the short LLM alias used in the pipeline config to the model name
    model = {
        "gpt4": GPT4_0125,
        "gpt35": GPT35TURBO0125,
    }[llm]

    _llm = ChatOpenAI(
        model=model,
        openai_api_key=OPENAI_API_KEY,
        openai_organization=OPENAI_API_ORG,
        temperature=0,
        model_kwargs={"seed": 42},
        streaming=True,
    )

    bind = _llm.bind(
        function_call={"name": q.name},
        functions=[generate_function(q)],
    )

    chain = prompt_with_question | bind | JsonOutputFunctionsParser()

    # Use the transcript passed in by the caller instead of the global constant
    result = chain.invoke(
        {"transcript": transcript, "question": question},
        {"callbacks": callbacks or []}
    )

    return BinaryAnswer(name=step_name, **result)

### Determining Next Steps

Based on the response from executing a step, `get_next_steps` decides which steps should follow. It examines the current step's configuration for next steps, checking conditions against the LLM's response to select the appropriate path forward.

In [24]:
def get_next_steps(step: PipelineStepConfig, response: BinaryAnswer) -> List[str]:
    next_steps_result = []
    next_steps = step.next_steps

    if isinstance(next_steps, str):
        # A single unconditional next step
        next_steps_result.append(next_steps)
    elif isinstance(next_steps, list):
        for next_step_condition in next_steps:
            if isinstance(next_step_condition, str):
                # Unconditional next step listed by name
                next_steps_result.append(next_step_condition)
            elif next_step_condition.if_llm_answer.has_value == response.answer:
                next_steps_result.extend(next_step_condition.if_llm_answer.go_to)
    return next_steps_result

### Running a Pipeline Step

The `run_pipeline_step` function integrates executing a step and determining its subsequent steps into a cohesive operation, returning the response and the next steps to be executed.

In [25]:
def run_pipeline_step(step: PipelineStepConfig, transcript: str, callbacks=None):
    response = execute_step(step.name, step.question, step.llm, transcript, callbacks)
    next_steps = get_next_steps(step, response)
    return response, next_steps

### Orchestrating the Pipeline

Finally, `run_pipeline` orchestrates the entire pipeline execution process, managing the sequence of steps to be executed based on the initial configuration and adapting the flow dynamically in response to each step's outcome.

In [26]:
def run_pipeline(transcript: str, pipeline: PipelineConfig, callbacks=None):
    steps_to_execute: List[str] = [pipeline.initial_step]
    responses = []

    while steps_to_execute:
        current_step_name = steps_to_execute.pop(0)
        # Find the step configuration by name
        current_step_config = next(filter(lambda s: s.name == current_step_name, pipeline.steps), None)
        if current_step_config is None:
            print(f"Step '{current_step_name}' not found in pipeline configuration.")
            continue

        response, next_steps = run_pipeline_step(current_step_config, transcript, callbacks)
        responses.append(response)
        steps_to_execute.extend(next_steps)

    return responses
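The queue-based loop above will keep running if a configuration routes back to an earlier step. The sketch below shows the same breadth-first traversal on plain dictionaries, with canned answers instead of LLM calls and a guard against revisiting steps. It assumes only conditional `if_llm_answer` rules; `dry_run` and the sample `CONFIG` are hypothetical.

```python
from collections import deque


def dry_run(config: dict, answers: dict) -> list:
    """Return the order in which steps would run, given canned yes/no answers."""
    steps = {step["name"]: step for step in config["steps"]}
    queue = deque([config["initial_step"]])
    visited = []
    while queue:
        name = queue.popleft()
        if name in visited or name not in steps:
            continue  # skip repeats (guards against cycles) and unknown names
        visited.append(name)
        answer = answers.get(name, "no")  # unanswered steps default to "no"
        for rule in steps[name].get("next_steps") or []:
            condition = rule["if_llm_answer"]
            if condition["has_value"] == answer:
                queue.extend(condition["go_to"])
    return visited


CONFIG = {
    "initial_step": "reason_no_pay",
    "steps": [
        {
            "name": "reason_no_pay",
            "next_steps": [
                {"if_llm_answer": {"has_value": "yes",
                                   "go_to": ["inability_to_pay", "discipline_to_pay", "unwilling_to_pay"]}},
                {"if_llm_answer": {"has_value": "no", "go_to": []}},
            ],
        },
        {"name": "inability_to_pay"},
        {"name": "discipline_to_pay"},
        {"name": "unwilling_to_pay"},
    ],
}

print(dry_run(CONFIG, {"reason_no_pay": "yes"}))
print(dry_run(CONFIG, {"reason_no_pay": "no"}))  # ['reason_no_pay']
```

A `deque` is used here because `popleft()` is constant time, whereas `list.pop(0)` shifts every remaining element.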

## Final Execution of the Pipeline

After defining our pipeline's structure and the logic to execute it, we proceed to the final step: running the pipeline with an actual transcript. This process involves loading our previously compiled pipeline configuration from a JSON file and then executing the pipeline with a given transcript. The outcome is a list of responses based on the decisions made at each step in the pipeline.

Here's how you can perform this final execution:

In [27]:
# Load the compiled pipeline configuration
pipeline_config = load_config(filename="pipeline.json")

# Assuming 'CALL_TRANSCRIPT' contains the transcript to be processed
# Execute the pipeline with the loaded configuration and the transcript
responses = run_pipeline(transcript=CALL_TRANSCRIPT, pipeline=pipeline_config)

# Iterate through each result in the results list, printing with added clarity
for result in responses:
    print(f"Question: {result.name}")
    print(f"Answer: {result.answer}")
    # Optionally, check if 'details' has meaningful content before printing
    if result.details.strip():
        print(f"Details: {result.details}")
    print("-" * 40)  # Add a separator for readability

Question: reason_no_pay
Answer: yes
Details: The customer mentioned being out of town as the reason for not making the loan payment.
----------------------------------------
Question: inability_to_pay
Answer: no
Details: The customer did not demonstrate inability to pay outstanding loan dues. Instead, the customer mentioned that their son will return in two days to make the payment and also considered using Phone Pay or Google Pay for an immediate payment.
----------------------------------------
Question: discipline_to_pay
Answer: no
Details: The customer did not demonstrate immediate action or commitment to pay the outstanding loan dues during the call. Although the customer mentioned that their son will make the payment in two days, there was no clear promise or commitment to pay immediately or set up the payment during the call.
----------------------------------------
Question: unwilling_to_pay
Answer: no
Details: The customer did not demonstrate unwillingness to pay outstanding l
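
For the predictive-analytics user story, the yes/no answers can be turned into numeric features for a downstream model. The sketch below is one possible approach, not part of the notebook: `responses_to_features` is a hypothetical helper, and the sample objects only mimic the `name`, `answer`, and `details` attributes used in the printing loop.

```python
from types import SimpleNamespace
from typing import Dict, Iterable


def responses_to_features(responses: Iterable) -> Dict[str, int]:
    """Map each step name to 1 for a "yes" answer and 0 for anything else."""
    return {r.name: int(r.answer.strip().lower() == "yes") for r in responses}


# Hypothetical stand-ins mirroring the fields of the pipeline responses
sample_responses = [
    SimpleNamespace(name="reason_no_pay", answer="yes", details=""),
    SimpleNamespace(name="inability_to_pay", answer="no", details=""),
    SimpleNamespace(name="discipline_to_pay", answer="no", details=""),
    SimpleNamespace(name="unwilling_to_pay", answer="no", details=""),
]
features = responses_to_features(sample_responses)
print(features)
```

Steps that were never executed on a given branch are simply absent from the dictionary, so a real feature table would need an explicit default for missing keys.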

# Conclusions and Future Directions

The development of this dynamic pipeline represents a significant step towards creating adaptable and scalable data processing workflows. By leveraging the capabilities of LLMs, we've established a flexible foundation that can dynamically adjust its processing path based on real-time data analysis and decision-making.

## Enhancements and Extensions

To further increase the utility and efficiency of our pipeline, we propose several avenues for future development:

- **Expand Pipeline Configurations**: Introduce additional branching logic, data loading mechanisms, and prompt templates to cover a wider range of use cases and increase the pipeline's flexibility.
- **Incorporate Data Processing Modules**: By adding specialized data extraction and processing modules, each step of the pipeline can be enhanced to perform more complex operations, enriching the data as it flows through the pipeline.
- **Asynchronous LLM Calls**: Switching to asynchronous LLM calls lets independent requests overlap instead of running one after another, which can reduce total run time when scaling up to larger volumes of data.
- **Package Modularization**: Packaging the entire pipeline framework as a Python package will facilitate reuse, distribution, and deployment, making it easier to run at scale.
- **Evaluation Workflows**: Implementing a comprehensive evaluation workflow within the pipeline will allow for continuous monitoring and improvement of the decision-making logic based on performance metrics.
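
As a rough illustration of the asynchronous direction, the sketch below processes several transcripts concurrently while capping the number of in-flight requests. `fake_llm_call` is a hypothetical placeholder that only sleeps; in a real pipeline it would be replaced by an awaitable LLM request (LangChain runnables expose `ainvoke` for this purpose). The function names and the concurrency limit are assumptions made for the example.

```python
import asyncio
from typing import List


async def fake_llm_call(transcript: str) -> str:
    """Hypothetical stand-in for an asynchronous LLM request."""
    await asyncio.sleep(0.01)  # simulates network latency
    return f"processed:{transcript}"


async def process_transcripts(transcripts: List[str], max_concurrency: int = 5) -> List[str]:
    semaphore = asyncio.Semaphore(max_concurrency)  # caps the number of in-flight requests

    async def bounded(transcript: str) -> str:
        async with semaphore:
            return await fake_llm_call(transcript)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(bounded(t) for t in transcripts))


results = asyncio.run(process_transcripts(["call-1", "call-2", "call-3"]))
print(results)
```

Inside a notebook, where an event loop is already running, `asyncio.run` raises an error; use `results = await process_transcripts([...])` in the cell instead.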

## Integration Example: Airflow DAG

To demonstrate the practical application and integration of our pipeline into larger data processing ecosystems, consider the following example of using it within an Airflow Directed Acyclic Graph (DAG):

```python
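import asyncio
from typing import List

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from deepgenai import load_config, run_pipeline

with DAG(...) as dag:
  ...
  @task(task_id="transform_files")
  def transform_files(pipeline_config: str, transcript_batch: List[str]):
    ...
    async def process_batch():
      # Assumes the packaged run_pipeline is a coroutine function
      tasks = [
        run_pipeline(transcript, pipeline_config)
        for transcript in transcript_batch
      ]
      return await asyncio.gather(*tasks)

    asyncio.run(process_batch())

  # Operators must be created inside the DAG context to be attached to it
  start = EmptyOperator(task_id="start")
  end = EmptyOperator(task_id="end")

  # get_file_batches and get_pipeline_config are upstream tasks defined in the elided section
  batches = get_file_batches()
  pipeline_config = get_pipeline_config()
  # Dynamic task mapping: one mapped task instance per batch of transcripts
  transform_task = transform_files.partial(pipeline_config=pipeline_config).expand(transcript_batch=batches)

  start >> [batches, pipeline_config] >> transform_task >> end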
```

# Extra Bonus - Real-Time Feedback

Sometimes we want results while the pipeline is still running rather than waiting for the whole run to finish. This is useful when feedback must be delivered as soon as it is available, for example in real-time processing tasks.

The following code shows how to do this with [LangChain Callbacks](https://python.langchain.com/docs/modules/callbacks/): it calls the same `run_pipeline` function as before and simply passes a list of callbacks.

In [28]:
from typing import Any, Dict

from langchain.callbacks.base import BaseCallbackHandler

class MyCustomHandler(BaseCallbackHandler):
    """Prints a chain's output as soon as the chain finishes."""
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        # Only report chain outputs that contain an "answer" key
        if isinstance(outputs, dict) and "answer" in outputs:
            print("\n\033[1m> Finished chain.\033[0m", str(outputs))

callbacks = [
    MyCustomHandler(),
]

responses = run_pipeline(transcript=CALL_TRANSCRIPT, pipeline=pipeline_config, callbacks=callbacks)


> Finished chain. {'answer': 'yes', 'details': 'The customer mentioned being out of town as the reason for not making the loan payment.'}

> Finished chain. {'answer': 'yes', 'details': 'The customer mentioned being out of town as the reason for not making the loan payment.'}

> Finished chain. {'answer': 'no', 'details': 'The customer did not demonstrate inability to pay outstanding loan dues. Instead, the customer mentioned that their son will return in two days to make the payment and expressed intentions to handle the payment later.'}

> Finished chain. {'answer': 'no', 'details': 'The customer did not demonstrate inability to pay outstanding loan dues. Instead, the customer mentioned that their son will return in two days to make the payment and expressed intentions to handle the payment later.'}

> Finished chain. {'answer': 'no', 'details': 'The customer did not demonstrate discipline to pay outstanding loan dues. Although the customer ac