The purpose of this little notebook is to test my ideas on piping as implemented by langchain.

In [6]:
class CompoundPipeable:
    """Two pipeline stages glued together; the first stage feeds the second."""

    def __init__(self, first, second):
        self.first, self.second = first, second

    def __or__(self, other):
        # `compound | other` nests this compound as the left-hand stage.
        return CompoundPipeable(self, other)

    def invoke(self, value):
        """Run `first` on `value`, then run `second` on that result."""
        intermediate = self.first.invoke(value)
        return self.second.invoke(intermediate)

    def __str__(self):
        return 'CompoundPipeable({}, {})'.format(self.first, self.second)
    
class Pipeable:
    """A single pipeline stage that appends its own label to the piped value."""

    def __init__(self, value):
        self.value = value

    def __or__(self, other):
        # `stage | other` starts (or extends) a chain with this stage on the left.
        return CompoundPipeable(self, other)

    def invoke(self, value):
        """Return the incoming value followed by ' | ' and this stage's label."""
        return '{} | {}'.format(value, self.value)

    def __str__(self) -> str:
        return 'Pipeable({})'.format(self.value)
    
# Build a four-stage pipe; `|` is left-associative, so the compounds nest to the left.
testpipe = (
    Pipeable('a')
    | Pipeable('b')
    | Pipeable('pipe')
    | Pipeable('end')
)

# First line: the value after flowing through every stage.
# Second line: the nested structure of the pipe itself.
print(testpipe.invoke('test'), testpipe, sep='\n')

test | a | b | pipe | end
CompoundPipeable(CompoundPipeable(CompoundPipeable(Pipeable(a), Pipeable(b)), Pipeable(pipe)), Pipeable(end))


That worked well. Let's extend the example by making it interruptible.

In [14]:
class PipeException(Exception):
    """Raised by a stage to interrupt the pipe it is part of."""

    def __init__(self, message):
        super().__init__(message)


class CompoundPipeable:
    """Two pipeline stages chained together, with interruption support.

    When any stage raises ``PipeException``, the remaining stages are skipped
    and the outermost ``invoke`` returns the exception message instead of a
    piped value.
    """

    def __init__(self, first, second):
        self.first = first
        self.second = second

    def __or__(self, other):
        if not isinstance(other, (Pipeable, CompoundPipeable)):
            raise TypeError('Operands must be of type Pipeable or CompoundPipeable')
        return CompoundPipeable(self, other)

    def _run(self, value):
        """Run both stages in order, letting ``PipeException`` propagate."""
        for stage in (self.first, self.second):
            # A nested compound must not convert the interruption into a normal
            # return value, otherwise the message would be fed to later stages.
            if isinstance(stage, CompoundPipeable):
                value = stage._run(value)
            else:
                value = stage.invoke(value)
        return value

    def invoke(self, value):
        """Run the whole pipe on ``value``.

        Returns the final piped value, or the ``PipeException`` message if any
        stage (at any nesting depth, first or last) interrupted execution.
        """
        try:
            return self._run(value)
        except PipeException as e:
            return f'{e}'
    
class Pipeable:
    """A single pipeline stage that can optionally interrupt the pipe.

    The stage interrupts when `stop_on` is truthy and equal to the stage's own
    `value`; otherwise it appends its label to the piped value.
    """

    def __init__(self, value, stop_on = None):
        self.value, self.stop_on = value, stop_on

    def __or__(self, other):
        return CompoundPipeable(self, other)

    def invoke(self, value):
        """Return the piped value, or raise PipeException if this stage stops."""
        should_stop = self.stop_on and self.value == self.stop_on
        if not should_stop:
            return f'{value} | {self.value}'
        raise PipeException(f'Execution interrupted at {self.value}')
    
# The third stage is configured to stop on its own label, so the pipe is
# interrupted before the 'end' stage runs.
testpipe = (
    Pipeable('a')
    | Pipeable('b')
    | Pipeable('pipe', stop_on = 'pipe')
    | Pipeable('end')
)

# Left as a bare expression so the notebook displays the result.
testpipe.invoke('test')

'Execution interrupted at pipe'

OK. That is all very good. Let's see if we can now extend this to langchain chains.

In [22]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable
from langchain_core.runnables.utils import Input, Output
from langchain_core.prompt_values import ChatPromptValue
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage

class CompoundRunnable(Runnable[Input, Output]):
    """Chain two runnables and turn a PipeException from the first into a result.

    `first` is invoked with the input; its result is then passed to `second`.
    If `first` raises `PipeException`, `second` is skipped and the exception
    message is returned instead. A `PipeException` raised by `second` itself is
    not caught here.
    """

    def __init__(self, first, second):
        self.first = first
        self.second = second

    def __or__(self, other):
        """Return a new CompoundRunnable for `self | other`.

        Raises:
            TypeError: if `other` is not a Runnable.
        """
        if not isinstance(other, Runnable):
            raise TypeError('Operands must be of type Runnable')
        return CompoundRunnable(self, other)

    def invoke(self, input: Input, config=None) -> Output:
        """Run `first` then `second`, short-circuiting on PipeException.

        `config` defaults to None so the method can be called as
        `invoke(input)`, like any other Runnable.
        """
        try:
            first_result = self.first.invoke(input, config)
        except PipeException as e:
            return f'{e}'

        # Runnables are executed with `invoke`; they have no `run` method.
        return self.second.invoke(first_result, config)
    
class TestLengthRunnable(Runnable[Input, Output]):
    """Runnable that interrupts the chain when the rendered prompt is too long.

    The input is rendered to a role-prefixed string; if that string is longer
    than `length` characters a `PipeException` is raised, otherwise the input
    is passed through unchanged to the next step.
    """

    def __init__(self, length):
        self.length = length

    def __or__(self, other):
        """Return a CompoundRunnable for `self | other`.

        Raises:
            TypeError: if `other` is not a Runnable.
        """
        if not isinstance(other, Runnable):
            raise TypeError('Operands must be of type Runnable')
        return CompoundRunnable(self, other)

    def _messages_to_string(self, _input):
        """Render a ChatPromptValue as one 'Role: content' line per message.

        Any input that is not a ChatPromptValue renders as the empty string.

        Raises:
            ValueError: if a message is not a Human, AI or System message.
        """
        lines = []
        if isinstance(_input, ChatPromptValue):
            for message in _input.messages:
                if isinstance(message, HumanMessage):
                    lines.append(f'Human: {message.content}\n')
                elif isinstance(message, AIMessage):
                    lines.append(f'AI: {message.content}\n')
                elif isinstance(message, SystemMessage):
                    lines.append(f'System: {message.content}\n')
                else:
                    raise ValueError(f'Unexpected message type: {type(message)}')
        return ''.join(lines)

    def invoke(self, input: Input, config=None) -> Output:
        """Check the rendered length of `input` and pass it through.

        Raises:
            PipeException: if the rendered text is longer than `self.length`.
        """
        msg = self._messages_to_string(input)
        print(msg)
        if len(msg) > self.length:
            raise PipeException(f'Message too long: {len(msg)}')
        # `Output` is a TypeVar and cannot be instantiated; hand the input on
        # unchanged so the next step in the chain receives the prompt value.
        return input


In [23]:
# A fixed three-message prompt with no template variables, so it can be
# invoked with an empty dict.
chat_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', 'You are a very nice chatbot '),
        ('human', 'I am a very rough human'),
        ('ai', 'Why are you so rough?'),
    ]
)

# `|` evaluates left to right, so the prompt's own `|` is applied first.
chain = (
    chat_prompt
    | TestLengthRunnable(10)
    | TestLengthRunnable(20)
)

chain.invoke({})


System: You are a very nice chatbot 
Human: I am a very rough human
AI: Why are you so rough?



PipeException: Message too long: 94

OK. So the problem is that piping from the chat prompt produces a RunnableSequence rather than our CompoundRunnable, and a RunnableSequence does not catch our exception.

Let's try to do it using the @chain decorator.

In [25]:
from langchain_core.runnables import chain

@chain
def test_chain_length(_input):
    """Pass `_input` through unless its rendered prompt exceeds 20 characters.

    A ChatPromptValue is rendered as one 'Role: content' line per message; any
    other input renders as the empty string.

    Raises:
        ValueError: if a message is not a Human, AI or System message.
        PipeException: if the rendered text is longer than 20 characters.
    """
    # Checked in this order for every message.
    role_labels = (
        (HumanMessage, 'Human'),
        (AIMessage, 'AI'),
        (SystemMessage, 'System'),
    )

    rendered = []
    if isinstance(_input, ChatPromptValue):
        for message in _input.messages:
            for message_type, label in role_labels:
                if isinstance(message, message_type):
                    rendered.append(f'{label}: {message.content}\n')
                    break
            else:
                raise ValueError(f'Unexpected message type: {type(message)}')

    messages = ''.join(rendered)
    if len(messages) > 20:
        raise PipeException(f'Message too long: {len(messages)}')
    return _input

# Rebinds `chain` from the decorator imported above to the assembled pipeline.
chain = chat_prompt | test_chain_length

# The exception propagates out of the sequence, so it is handled at the call site.
try:
    chain.invoke({})
except PipeException as interruption:
    print(interruption)


Message too long: 94


Now let's use RunnableLambdas.

In [26]:
from langchain_core.runnables import RunnableLambda

def test_chain_length_lambda(_input):
    """Pass `_input` through unless its rendered prompt exceeds 20 characters.

    A ChatPromptValue is rendered as one 'Role: content' line per message; any
    other input renders as the empty string.

    Raises:
        ValueError: if a message is not a Human, AI or System message.
        PipeException: if the rendered text is longer than 20 characters.
    """
    prompt_messages = _input.messages if isinstance(_input, ChatPromptValue) else ()

    rendered = []
    for entry in prompt_messages:
        if isinstance(entry, HumanMessage):
            speaker = 'Human'
        elif isinstance(entry, AIMessage):
            speaker = 'AI'
        elif isinstance(entry, SystemMessage):
            speaker = 'System'
        else:
            raise ValueError(f'Unexpected message type: {type(entry)}')
        rendered.append(f'{speaker}: {entry.content}\n')

    total_length = len(''.join(rendered))
    if total_length > 20:
        raise PipeException(f'Message too long: {total_length}')
    return _input

# Wrap the plain function explicitly instead of decorating it.
length_check = RunnableLambda(test_chain_length_lambda)
chain = chat_prompt | length_check
del length_check  # keep the notebook namespace as it was

try:
    chain.invoke({})
except PipeException as interruption:
    print(interruption)


Message too long: 94


Great. Now let's check that the lambda can be a callable class.

In [28]:
from typing import Any


class TestLength:
    """Callable length check with a configurable character limit.

    Calling an instance renders a ChatPromptValue as one 'Role: content' line
    per message (any other input renders as the empty string) and returns the
    input unchanged when the rendered text fits within `length`.
    """

    def __init__(self, length):
        self.length = length

    def __call__(self, _input):
        """Return `_input`, or raise PipeException if it renders too long.

        Raises:
            ValueError: if a message is not a Human, AI or System message.
            PipeException: if the rendered text is longer than `self.length`.
        """
        def render(message):
            if isinstance(message, HumanMessage):
                return f'Human: {message.content}\n'
            if isinstance(message, AIMessage):
                return f'AI: {message.content}\n'
            if isinstance(message, SystemMessage):
                return f'System: {message.content}\n'
            raise ValueError(f'Unexpected message type: {type(message)}')

        if isinstance(_input, ChatPromptValue):
            messages = ''.join(map(render, _input.messages))
        else:
            messages = ''

        if len(messages) > self.length:
            raise PipeException(f'Message too long: {len(messages)}')
        return _input
        
# A callable instance works as the wrapped function; the limit here is 10 characters.
chain = chat_prompt | RunnableLambda(TestLength(10))

try:
    chain.invoke({})
except PipeException as interruption:
    print(interruption)


Message too long: 94
