From 4bb8fa39aa23cf899d5110f16eb7353bb810cb49 Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 12:19:59 +0100 Subject: [PATCH 1/8] openai streaming support --- langfuse/openai.py | 80 +++++++++++++++++++++++++++++++++++++---- tests/test_openai.py | 86 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 158 insertions(+), 8 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 6a1e6bc58..377315554 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -1,5 +1,7 @@ +import logging import threading from datetime import datetime +import types from typing import Optional @@ -10,6 +12,8 @@ import openai from wrapt import wrap_function_wrapper +logging = logging.getLogger("langfuse") + class OpenAiDefinition: module: str @@ -101,6 +105,8 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus if metadata is not None and not isinstance(metadata, dict): raise TypeError("metadata must be a dictionary") + model = kwargs.get("model", None) + prompt = None if resource.type == "completion": prompt = kwargs.get("prompt", None) @@ -123,10 +129,59 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus "presence_penalty": kwargs.get("presence_penalty", 0), } - return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters) + return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters, model=model) + + +def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response: openai.Stream, generation: InitialGeneration, langfuse: Langfuse): + final_response = [] if resource.type == "chat" else "" + model = None + for i in response: + logging.warning(f"response, {i}") + + if _is_openai_v1(): + i = i.__dict__ + + model = i.get("model", None) if model is None else model + + logging.info(f"choices, {i.get('choices')}") + choices = i.get("choices", []) + + for choice in choices: + if _is_openai_v1(): + choice = choice.__dict__ + if resource.type == "chat": + delta = choice.get("delta", None) + if _is_openai_v1(): + delta = delta.__dict__ -def _get_langfuse_data_from_response(resource: OpenAiDefinition, response): + if delta.get("role", None) is not None: + final_response.append({"role": delta.get("role", None), "function_call": None, "tool_calls": None, "content": None}) + + elif delta.get("content", None) is not None: + final_response[-1]["content"] = delta.get("content", None) if final_response[-1]["content"] is None else final_response[-1]["content"] + delta.get("content", None) + + elif delta.get("function_call", None) is not None: + final_response[-1]["function_call"] = ( + delta.get("function_call", None) if final_response[-1]["function_call"] is None else final_response[-1]["function_call"] + delta.get("function_call", None) + ) + elif delta.get("tools_call", None) is not None: + final_response[-1]["tool_calls"] = delta.get("tools_call", None) if final_response[-1]["tool_calls"] is None else final_response[-1]["tool_calls"] + delta.get("tools_call", None) + if resource.type == "completion": + final_response += choice.get("text", None) + + print("final_response", final_response) + yield i + + print(final_response) + new_generation = generation.copy(update={"end_time": datetime.now(), "completion": {"choices": final_response} if resource.type == "chat" else final_response}) + if model is not None: + new_generation = new_generation.copy(update={"model": model}) + logging.warning(f"new_generation, {new_generation}") + langfuse.generation(new_generation) + + +def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): model = response.get("model", None) completion = None @@ -151,6 +206,10 @@ def _is_openai_v1(): return StrictVersion(openai.__version__) >= StrictVersion("1.0.0") +def _is_streaming_response(response): + return isinstance(response, types.GeneratorType) or (_is_openai_v1() and isinstance(response, openai.Stream)) + + @_with_tracer_wrapper def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wrapped, instance, args, kwargs): new_langfuse = initialize() @@ -161,11 +220,18 @@ def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wr generation = _get_langfuse_data_from_kwargs(open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()) updated_generation = generation try: - result = wrapped(**arg_extractor.get_openai_args()) - model, completion, usage = _get_langfuse_data_from_response(open_ai_resource, result.__dict__ if _is_openai_v1() else result) - updated_generation = generation.copy(update={"model": model, "completion": completion, "end_time": datetime.now(), "usage": usage}) - new_langfuse.generation(updated_generation) - return result + logging.warning(f"wrapped {wrapped}, {kwargs}, {_is_openai_v1()}") + openai_response = wrapped(**arg_extractor.get_openai_args()) + + if _is_streaming_response(openai_response): + logging.warning(f"streaming response {openai_response}") + return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, updated_generation, new_langfuse) + + else: + model, completion, usage = _get_langfuse_data_from_default_response(open_ai_resource, openai_response.__dict__ if _is_openai_v1() else openai_response) + updated_generation = generation.copy(update={"model": model, "completion": completion, "end_time": datetime.now(), "usage": usage}) + new_langfuse.generation(updated_generation) + return openai_response except Exception as ex: model = kwargs.get("model", None) new_langfuse.generation(updated_generation.copy(update={"end_time": datetime.now(), "status_message": str(ex), "level": "ERROR", "model": model})) diff --git a/tests/test_openai.py b/tests/test_openai.py index f889a96d1..56d357e4f 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -2,7 +2,7 @@ import pytest from langfuse.client import Langfuse from langfuse.model import CreateTrace -from langfuse.openai import _is_openai_v1, openai +from langfuse.openai import _is_openai_v1, _is_streaming_response, openai from tests.utils import create_uuid, get_api @@ -51,6 +51,48 @@ def test_openai_chat_completion(): assert generation.data[0].total_tokens is not None +def test_openai_chat_completion_stream(): + api = get_api() + generation_name = create_uuid() + completion = chat_func( + name=generation_name, + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "1 + 1 = "}], + temperature=0, + metadata={"someKey": "someResponse"}, + stream=True, + ) + + assert _is_streaming_response(completion) + for i in completion: + print(i) + + openai.flush_langfuse() + + generation = api.observations.get_many(name=generation_name, type="GENERATION") + + assert len(generation.data) != 0 + assert generation.data[0].name == generation_name + assert generation.data[0].metadata == {"someKey": "someResponse"} + + assert generation.data[0].input == [{"content": "1 + 1 = ", "role": "user"}] + assert generation.data[0].type == "GENERATION" + assert generation.data[0].model == "gpt-3.5-turbo-0613" + assert generation.data[0].start_time is not None + assert generation.data[0].end_time is not None + assert generation.data[0].start_time < generation.data[0].end_time + assert generation.data[0].model_parameters == { + "temperature": 0, + "top_p": 1, + "frequency_penalty": 0, + "maxTokens": "inf", + "presence_penalty": 0, + } + assert generation.data[0].prompt_tokens is not None + assert generation.data[0].completion_tokens is not None + assert generation.data[0].total_tokens is not None + + def test_openai_chat_completion_with_trace(): api = get_api() generation_name = create_uuid() @@ -207,6 +249,48 @@ def test_openai_completion(): assert generation.data[0].total_tokens is not None +def test_openai_completion_stream(): + api = get_api() + generation_name = create_uuid() + completion = completion_func( + name=generation_name, + model="gpt-3.5-turbo-instruct", + prompt="1 + 1 = ", + temperature=0, + metadata={"someKey": "someResponse"}, + stream=True, + ) + + assert _is_streaming_response(completion) + for i in completion: + print(i) + + openai.flush_langfuse() + + generation = api.observations.get_many(name=generation_name, type="GENERATION") + + assert len(generation.data) != 0 + assert generation.data[0].name == generation_name + assert generation.data[0].metadata == {"someKey": "someResponse"} + + assert generation.data[0].input == "1 + 1 = " + assert generation.data[0].type == "GENERATION" + assert generation.data[0].model == "gpt-3.5-turbo-instruct" + assert generation.data[0].start_time is not None + assert generation.data[0].end_time is not None + assert generation.data[0].start_time < generation.data[0].end_time + assert generation.data[0].model_parameters == { + "temperature": 0, + "top_p": 1, + "frequency_penalty": 0, + "maxTokens": "inf", + "presence_penalty": 0, + } + assert generation.data[0].prompt_tokens is not None + assert generation.data[0].completion_tokens is not None + assert generation.data[0].total_tokens is not None + + def test_openai_completion_fail(): api = get_api() generation_name = create_uuid() From f22abd001683ceb971a9a107143c659f06405336 Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 12:21:08 +0100 Subject: [PATCH 2/8] rename things --- langfuse/openai.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 377315554..944c8c73b 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -75,16 +75,14 @@ def get_openai_args(self): return self.kwargs -def _with_tracer_wrapper(func): - """Helper for providing tracer for wrapper functions.""" - - def _with_tracer(open_ai_definitions, langfuse, initialize): +def _langfuse_wrapper(func): + def _with_langfuse(open_ai_definitions, langfuse, initialize): def wrapper(wrapped, instance, args, kwargs): return func(open_ai_definitions, langfuse, initialize, wrapped, instance, args, kwargs) return wrapper - return _with_tracer + return _with_langfuse def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs): @@ -210,7 +208,7 @@ def _is_streaming_response(response): return isinstance(response, types.GeneratorType) or (_is_openai_v1() and isinstance(response, openai.Stream)) -@_with_tracer_wrapper +@_langfuse_wrapper def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wrapped, instance, args, kwargs): new_langfuse = initialize() From fe8fe98b8910bc1ae66178a5bec066244084daa7 Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 13:28:55 +0100 Subject: [PATCH 3/8] changes --- langfuse/openai.py | 26 ++++++------- poetry.lock | 73 ++++++----------------------------- pyproject.toml | 2 +- tests/test_openai.py | 92 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 75 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 944c8c73b..2554c6495 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -1,4 +1,3 @@ -import logging import threading from datetime import datetime import types @@ -12,8 +11,6 @@ import openai from wrapt import wrap_function_wrapper -logging = logging.getLogger("langfuse") - class OpenAiDefinition: module: str @@ -106,6 +103,7 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus model = kwargs.get("model", None) prompt = None + if resource.type == "completion": prompt = kwargs.get("prompt", None) elif resource.type == "chat": @@ -130,18 +128,15 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters, model=model) -def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response: openai.Stream, generation: InitialGeneration, langfuse: Langfuse): +def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: InitialGeneration, langfuse: Langfuse): final_response = [] if resource.type == "chat" else "" model = None for i in response: - logging.warning(f"response, {i}") - if _is_openai_v1(): i = i.__dict__ model = i.get("model", None) if model is None else model - logging.info(f"choices, {i.get('choices')}") choices = i.get("choices", []) for choice in choices: @@ -168,14 +163,21 @@ def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, respo if resource.type == "completion": final_response += choice.get("text", None) - print("final_response", final_response) yield i - print(final_response) - new_generation = generation.copy(update={"end_time": datetime.now(), "completion": {"choices": final_response} if resource.type == "chat" else final_response}) + def get_response_for_chat(): + if len(final_response) > 0: + if final_response[-1].get("content", None) is not None: + return final_response[-1]["content"] + elif final_response[-1].get("function_call", None) is not None: + return final_response[-1]["function_call"] + elif final_response[-1].get("tool_calls", None) is not None: + return final_response[-1]["tool_calls"] + return None + + new_generation = generation.copy(update={"end_time": datetime.now(), "completion": get_response_for_chat() if resource.type == "chat" else final_response}) if model is not None: new_generation = new_generation.copy(update={"model": model}) - logging.warning(f"new_generation, {new_generation}") langfuse.generation(new_generation) @@ -218,11 +220,9 @@ def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wr generation = _get_langfuse_data_from_kwargs(open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()) updated_generation = generation try: - logging.warning(f"wrapped {wrapped}, {kwargs}, {_is_openai_v1()}") openai_response = wrapped(**arg_extractor.get_openai_args()) if _is_streaming_response(openai_response): - logging.warning(f"streaming response {openai_response}") return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, updated_generation, new_langfuse) else: diff --git a/poetry.lock b/poetry.lock index f0ad25088..e81769f21 100644 --- a/poetry.lock +++ b/poetry.lock @@ -533,38 +533,6 @@ files = [ [package.dependencies] numpy = "*" -[[package]] -name = "chromadb" -version = "0.4.14" -description = "Chroma." -optional = false -python-versions = ">=3.7" -files = [ - {file = "chromadb-0.4.14-py3-none-any.whl", hash = "sha256:c1b59bdfb4b35a40bad0b8927c5ed757adf191ff9db2b9a384dc46a76e1ff10f"}, - {file = "chromadb-0.4.14.tar.gz", hash = "sha256:0fcef603bcf9c854305020c3f8d368c09b1545d48bd2bceefd51861090f87dad"}, -] - -[package.dependencies] -bcrypt = ">=4.0.1" -chroma-hnswlib = "0.7.3" -fastapi = ">=0.95.2" -graphlib-backport = {version = ">=1.0.3", markers = "python_version < \"3.9\""} -grpcio = ">=1.58.0" -importlib-resources = "*" -numpy = {version = ">=1.22.5", markers = "python_version >= \"3.8\""} -onnxruntime = ">=1.14.1" -overrides = ">=7.3.1" -posthog = ">=2.4.0" -pulsar-client = ">=3.1.0" -pydantic = ">=1.9" -pypika = ">=0.48.9" -requests = ">=2.28" -tokenizers = ">=0.13.2" -tqdm = ">=4.65.0" -typer = ">=0.9.0" -typing-extensions = ">=4.5.0" -uvicorn = {version = ">=0.18.3", extras = ["standard"]} - [[package]] name = "chromadb" version = "0.4.17" @@ -1821,25 +1789,25 @@ sympy = "*" [[package]] name = "openai" -version = "1.3.0" -description = "The official Python library for the openai API" +version = "0.27.8" +description = "Python client library for the OpenAI API" optional = false python-versions = ">=3.7.1" files = [ - {file = "openai-1.3.0-py3-none-any.whl", hash = "sha256:b4cde12417ab7a9d5e9326ca285f1833dd31c68ac05a68d24f95f93312ef9e82"}, - {file = "openai-1.3.0.tar.gz", hash = "sha256:51d9ccd0611fd8567ff595e8a58685c20a4710763d42f6bd968e1fb630993f25"}, + {file = "openai-0.27.8-py3-none-any.whl", hash = "sha256:e0a7c2f7da26bdbe5354b03c6d4b82a2f34bd4458c7a17ae1a7092c3e397e03c"}, + {file = "openai-0.27.8.tar.gz", hash = "sha256:2483095c7db1eee274cebac79e315a986c4e55207bb4fa7b82d185b3a2ed9536"}, ] [package.dependencies] -anyio = ">=3.5.0,<4" -distro = ">=1.7.0,<2" -httpx = ">=0.23.0,<1" -pydantic = ">=1.9.0,<3" -tqdm = ">4" -typing-extensions = ">=4.5,<5" +aiohttp = "*" +requests = ">=2.20" +tqdm = "*" [package.extras] -datalib = ["numpy (>=1)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)"] +datalib = ["numpy", "openpyxl (>=3.0.7)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)"] +dev = ["black (>=21.6b0,<22.0)", "pytest (==6.*)", "pytest-asyncio", "pytest-mock"] +embeddings = ["matplotlib", "numpy", "openpyxl (>=3.0.7)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "plotly", "scikit-learn (>=1.0.2)", "scipy", "tenacity (>=8.0.1)"] +wandb = ["numpy", "openpyxl (>=3.0.7)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "wandb"] [[package]] name = "opentelemetry-api" @@ -3114,23 +3082,6 @@ brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotl secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] -[[package]] -name = "urllib3" -version = "2.0.7" -description = "HTTP library with thread-safe connection pooling, file post, and more." -optional = false -python-versions = ">=3.7" -files = [ - {file = "urllib3-2.0.7-py3-none-any.whl", hash = "sha256:fdb6d215c776278489906c2f8916e6e7d4f5a9b602ccbcfdf7f016fc8da0596e"}, - {file = "urllib3-2.0.7.tar.gz", hash = "sha256:c97dfde1f7bd43a71c8d2a58e369e9b2bf692d1334ea9f9cae55add7d0dd0f84"}, -] - -[package.extras] -brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] -secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17.1.0)", "urllib3-secure-extra"] -socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] -zstd = ["zstandard (>=0.18.0)"] - [[package]] name = "uvicorn" version = "0.24.0.post1" @@ -3600,4 +3551,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "7423d68fcb347419145ecd1d5ef9846fa146f60152146bf20f3c4f565baa454d" +content-hash = "e73acd47c24dab42d07948e7a83e0098d0c6a5dfecae11c97237574433b4335e" diff --git a/pyproject.toml b/pyproject.toml index e6cd99f21..786412bff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ pydantic = ">=1.10.7, <3.0" pytz = "^2023.3" backoff = "^2.2.1" langchain = ">=0.0.309" -openai = ">=0.27.8" +openai = "0.27.8" wrapt = "1.14" monotonic = "^1.6" diff --git a/tests/test_openai.py b/tests/test_openai.py index 56d357e4f..8cc9ce928 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -49,6 +49,7 @@ def test_openai_chat_completion(): assert generation.data[0].prompt_tokens is not None assert generation.data[0].completion_tokens is not None assert generation.data[0].total_tokens is not None + assert generation.data[0].output == "2" def test_openai_chat_completion_stream(): @@ -91,6 +92,52 @@ def test_openai_chat_completion_stream(): assert generation.data[0].prompt_tokens is not None assert generation.data[0].completion_tokens is not None assert generation.data[0].total_tokens is not None + assert generation.data[0].output == "2" + + +def test_openai_chat_completion_stream_fail(): + api = get_api() + generation_name = create_uuid() + openai.api_key = "" + + with pytest.raises(expected_err, match=expected_err_msg): + chat_func( + name=generation_name, + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "1 + 1 = "}], + temperature=0, + metadata={"someKey": "someResponse"}, + stream=True, + ) + + openai.flush_langfuse() + + generation = api.observations.get_many(name=generation_name, type="GENERATION") + + assert len(generation.data) != 0 + assert generation.data[0].name == generation_name + assert generation.data[0].metadata == {"someKey": "someResponse"} + + assert generation.data[0].input == [{"content": "1 + 1 = ", "role": "user"}] + assert generation.data[0].type == "GENERATION" + assert generation.data[0].model == "gpt-3.5-turbo" + assert generation.data[0].start_time is not None + assert generation.data[0].end_time is not None + assert generation.data[0].start_time < generation.data[0].end_time + assert generation.data[0].model_parameters == { + "temperature": 0, + "top_p": 1, + "frequency_penalty": 0, + "maxTokens": "inf", + "presence_penalty": 0, + } + assert generation.data[0].prompt_tokens is not None + assert generation.data[0].completion_tokens is not None + assert generation.data[0].total_tokens is not None + assert generation.data[0].level == "ERROR" + assert expected_err_msg in generation.data[0].status_message + + openai.api_key = os.environ["OPENAI_API_KEY"] def test_openai_chat_completion_with_trace(): @@ -332,6 +379,51 @@ def test_openai_completion_fail(): openai.api_key = os.environ["OPENAI_API_KEY"] +def test_openai_completion_stream_fail(): + api = get_api() + generation_name = create_uuid() + openai.api_key = "" + + with pytest.raises(expected_err, match=expected_err_msg): + completion_func( + name=generation_name, + model="gpt-3.5-turbo", + prompt="1 + 1 = ", + temperature=0, + metadata={"someKey": "someResponse"}, + stream=True, + ) + + openai.flush_langfuse() + + generation = api.observations.get_many(name=generation_name, type="GENERATION") + + assert len(generation.data) != 0 + assert generation.data[0].name == generation_name + assert generation.data[0].metadata == {"someKey": "someResponse"} + + assert generation.data[0].input == "1 + 1 = " + assert generation.data[0].type == "GENERATION" + assert generation.data[0].model == "gpt-3.5-turbo" + assert generation.data[0].start_time is not None + assert generation.data[0].end_time is not None + assert generation.data[0].start_time < generation.data[0].end_time + assert generation.data[0].model_parameters == { + "temperature": 0, + "top_p": 1, + "frequency_penalty": 0, + "maxTokens": "inf", + "presence_penalty": 0, + } + assert generation.data[0].prompt_tokens is not None + assert generation.data[0].completion_tokens is not None + assert generation.data[0].total_tokens is not None + assert generation.data[0].level == "ERROR" + assert expected_err_msg in generation.data[0].status_message + + openai.api_key = os.environ["OPENAI_API_KEY"] + + def test_fails_wrong_name(): with pytest.raises(TypeError, match="name must be a string"): completion_func( From 8a0468154c183451da5eb70daf19a20c81aba305 Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 13:34:07 +0100 Subject: [PATCH 4/8] improvements --- tests/test_openai.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_openai.py b/tests/test_openai.py index 8cc9ce928..e83dd468e 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -136,6 +136,7 @@ def test_openai_chat_completion_stream_fail(): assert generation.data[0].total_tokens is not None assert generation.data[0].level == "ERROR" assert expected_err_msg in generation.data[0].status_message + assert generation.data[0].output is None openai.api_key = os.environ["OPENAI_API_KEY"] @@ -203,6 +204,7 @@ def test_openai_chat_completion_fail(): "maxTokens": "inf", "presence_penalty": 0, } + assert generation.data[0].output is None openai.api_key = os.environ["OPENAI_API_KEY"] @@ -294,6 +296,7 @@ def test_openai_completion(): assert generation.data[0].prompt_tokens is not None assert generation.data[0].completion_tokens is not None assert generation.data[0].total_tokens is not None + assert generation.data[0].output == "2\n\n1 + 2 = 3\n\n2 + 3 = " def test_openai_completion_stream(): @@ -336,6 +339,7 @@ def test_openai_completion_stream(): assert generation.data[0].prompt_tokens is not None assert generation.data[0].completion_tokens is not None assert generation.data[0].total_tokens is not None + assert generation.data[0].output == "2\n\n1 + 2 = 3\n\n2 + 3 = " def test_openai_completion_fail(): @@ -375,6 +379,7 @@ def test_openai_completion_fail(): "maxTokens": "inf", "presence_penalty": 0, } + assert generation.data[0].output is None openai.api_key = os.environ["OPENAI_API_KEY"] @@ -420,6 +425,7 @@ def test_openai_completion_stream_fail(): assert generation.data[0].total_tokens is not None assert generation.data[0].level == "ERROR" assert expected_err_msg in generation.data[0].status_message + assert generation.data[0].output is None openai.api_key = os.environ["OPENAI_API_KEY"] From a6aa6520d83e383e0bd83c55347e6511fd1d0e9c Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 14:05:16 +0100 Subject: [PATCH 5/8] push --- langfuse/task_manager.py | 1 - poetry.lock | 87 +++++++++++++++++++++++++++++++--------- pyproject.toml | 2 +- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/langfuse/task_manager.py b/langfuse/task_manager.py index d5cad015d..1730cc0a2 100644 --- a/langfuse/task_manager.py +++ b/langfuse/task_manager.py @@ -66,7 +66,6 @@ def _next(self): break try: item = queue.get(block=True, timeout=self._flush_interval - elapsed) - self._log.debug("got item from queue", item) item_size = len(json.dumps(item, cls=DatetimeSerializer).encode()) self._log.debug(f"item size {item_size}") items.append(item) diff --git a/poetry.lock b/poetry.lock index e81769f21..98c16ae64 100644 --- a/poetry.lock +++ b/poetry.lock @@ -312,17 +312,17 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.29.1" +version = "1.29.2" description = "The AWS SDK for Python" optional = false python-versions = ">= 3.7" files = [ - {file = "boto3-1.29.1-py3-none-any.whl", hash = "sha256:192695305fa65012d21f78ee852b91cb56dd571e84d51fb71f756302bf19d23f"}, - {file = "boto3-1.29.1.tar.gz", hash = "sha256:20285ebf4e98b2905a88aeb162b4f77ff908b2e3e31038b3223e593789290aa3"}, + {file = "boto3-1.29.2-py3-none-any.whl", hash = "sha256:6617ac176efb21485ebc3a058a3a97feb1300141421ae3d1809562c4cac1d5f9"}, + {file = "boto3-1.29.2.tar.gz", hash = "sha256:f3024bba9ac980007ba7b5f28a9734d111fb5466e2426ac76c5edbd6dedd8db2"}, ] [package.dependencies] -botocore = ">=1.32.1,<1.33.0" +botocore = ">=1.32.2,<1.33.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.7.0,<0.8.0" @@ -331,13 +331,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.32.1" +version = "1.32.2" description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">= 3.7" files = [ - {file = "botocore-1.32.1-py3-none-any.whl", hash = "sha256:1d9c0ff3eb7828a8bd8c5c7f12cd9d8c05c6fe4c616ef963fdaab538a0da3809"}, - {file = "botocore-1.32.1.tar.gz", hash = "sha256:fcf3cc2913afba8e5f7ebcc15e8f6bfae844ab64bf983bf5a6fe3bb54cce239d"}, + {file = "botocore-1.32.2-py3-none-any.whl", hash = "sha256:a68a33193d8cd59e3b2142bff632e562afc02f9c4417e3dcc81a6e1b1f47148e"}, + {file = "botocore-1.32.2.tar.gz", hash = "sha256:0e231524e9b72169fe0b8d9310f47072c245fb712778e0669f53f264f0e49536"}, ] [package.dependencies] @@ -533,6 +533,38 @@ files = [ [package.dependencies] numpy = "*" +[[package]] +name = "chromadb" +version = "0.4.14" +description = "Chroma." +optional = false +python-versions = ">=3.7" +files = [ + {file = "chromadb-0.4.14-py3-none-any.whl", hash = "sha256:c1b59bdfb4b35a40bad0b8927c5ed757adf191ff9db2b9a384dc46a76e1ff10f"}, + {file = "chromadb-0.4.14.tar.gz", hash = "sha256:0fcef603bcf9c854305020c3f8d368c09b1545d48bd2bceefd51861090f87dad"}, +] + +[package.dependencies] +bcrypt = ">=4.0.1" +chroma-hnswlib = "0.7.3" +fastapi = ">=0.95.2" +graphlib-backport = {version = ">=1.0.3", markers = "python_version < \"3.9\""} +grpcio = ">=1.58.0" +importlib-resources = "*" +numpy = {version = ">=1.22.5", markers = "python_version >= \"3.8\""} +onnxruntime = ">=1.14.1" +overrides = ">=7.3.1" +posthog = ">=2.4.0" +pulsar-client = ">=3.1.0" +pydantic = ">=1.9" +pypika = ">=0.48.9" +requests = ">=2.28" +tokenizers = ">=0.13.2" +tqdm = ">=4.65.0" +typer = ">=0.9.0" +typing-extensions = ">=4.5.0" +uvicorn = {version = ">=0.18.3", extras = ["standard"]} + [[package]] name = "chromadb" version = "0.4.17" @@ -1789,25 +1821,25 @@ sympy = "*" [[package]] name = "openai" -version = "0.27.8" -description = "Python client library for the OpenAI API" +version = "1.3.2" +description = "The official Python library for the openai API" optional = false python-versions = ">=3.7.1" files = [ - {file = "openai-0.27.8-py3-none-any.whl", hash = "sha256:e0a7c2f7da26bdbe5354b03c6d4b82a2f34bd4458c7a17ae1a7092c3e397e03c"}, - {file = "openai-0.27.8.tar.gz", hash = "sha256:2483095c7db1eee274cebac79e315a986c4e55207bb4fa7b82d185b3a2ed9536"}, + {file = "openai-1.3.2-py3-none-any.whl", hash = "sha256:97e2febbedc5f1308444d961df63aafb649efebf900d59dd3676fdede9bcd7b6"}, + {file = "openai-1.3.2.tar.gz", hash = "sha256:7904d8f029339931805a8962d88e955f9223a983a8fbd06e01ae40e14735362b"}, ] [package.dependencies] -aiohttp = "*" -requests = ">=2.20" -tqdm = "*" +anyio = ">=3.5.0,<4" +distro = ">=1.7.0,<2" +httpx = ">=0.23.0,<1" +pydantic = ">=1.9.0,<3" +tqdm = ">4" +typing-extensions = ">=4.5,<5" [package.extras] -datalib = ["numpy", "openpyxl (>=3.0.7)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)"] -dev = ["black (>=21.6b0,<22.0)", "pytest (==6.*)", "pytest-asyncio", "pytest-mock"] -embeddings = ["matplotlib", "numpy", "openpyxl (>=3.0.7)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "plotly", "scikit-learn (>=1.0.2)", "scipy", "tenacity (>=8.0.1)"] -wandb = ["numpy", "openpyxl (>=3.0.7)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)", "wandb"] +datalib = ["numpy (>=1)", "pandas (>=1.2.3)", "pandas-stubs (>=1.1.0.11)"] [[package]] name = "opentelemetry-api" @@ -3082,6 +3114,23 @@ brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotl secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] +[[package]] +name = "urllib3" +version = "2.0.7" +description = "HTTP library with thread-safe connection pooling, file post, and more." +optional = false +python-versions = ">=3.7" +files = [ + {file = "urllib3-2.0.7-py3-none-any.whl", hash = "sha256:fdb6d215c776278489906c2f8916e6e7d4f5a9b602ccbcfdf7f016fc8da0596e"}, + {file = "urllib3-2.0.7.tar.gz", hash = "sha256:c97dfde1f7bd43a71c8d2a58e369e9b2bf692d1334ea9f9cae55add7d0dd0f84"}, +] + +[package.extras] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17.1.0)", "urllib3-secure-extra"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] + [[package]] name = "uvicorn" version = "0.24.0.post1" @@ -3551,4 +3600,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "e73acd47c24dab42d07948e7a83e0098d0c6a5dfecae11c97237574433b4335e" +content-hash = "7423d68fcb347419145ecd1d5ef9846fa146f60152146bf20f3c4f565baa454d" diff --git a/pyproject.toml b/pyproject.toml index 786412bff..e6cd99f21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ pydantic = ">=1.10.7, <3.0" pytz = "^2023.3" backoff = "^2.2.1" langchain = ">=0.0.309" -openai = "0.27.8" +openai = ">=0.27.8" wrapt = "1.14" monotonic = "^1.6" From 7be88c158c52c7c76cb1c505da433925b6796919 Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 17:51:04 +0100 Subject: [PATCH 6/8] stuff --- langfuse/openai.py | 3 +-- tests/test_sdk_setup.py | 13 ++++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 2554c6495..40bac1605 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -255,9 +255,8 @@ def initialize(self): self._langfuse = Langfuse(public_key=openai.langfuse_public_key, secret_key=openai.langfuse_secret_key, host=openai.langfuse_host) return self._langfuse - @classmethod def flush(cls): - cls._instance._langfuse.flush() + cls._langfuse.flush() def register_tracing(self): resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 diff --git a/tests/test_sdk_setup.py b/tests/test_sdk_setup.py index 049bbc2af..d1bc55482 100644 --- a/tests/test_sdk_setup.py +++ b/tests/test_sdk_setup.py @@ -9,6 +9,9 @@ from langfuse.client import Langfuse from tests.test_task_manager import get_host +from langfuse.openai import _is_openai_v1, openai + +chat_func = openai.chat.completions.create if _is_openai_v1() else openai.ChatCompletion.create def test_langfuse_release(): @@ -196,18 +199,21 @@ def test_openai_default(): importlib.reload(langfuse) importlib.reload(langfuse.openai) + chat_func = openai.chat.completions.create if _is_openai_v1() else openai.ChatCompletion.create + public_key, secret_key, host = ( os.environ["LANGFUSE_PUBLIC_KEY"], os.environ["LANGFUSE_SECRET_KEY"], os.environ["LANGFUSE_HOST"], ) - openai.chat.completions.create( + chat_func( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "1 + 1 = "}], temperature=0, metadata={"someKey": "someResponse"}, ) + openai.flush_langfuse() assert modifier._langfuse.client._client_wrapper._username == public_key assert modifier._langfuse.client._client_wrapper._password == secret_key @@ -224,9 +230,10 @@ def test_openai_configured(httpserver: HTTPServer): importlib.reload(langfuse) importlib.reload(langfuse.openai) - from langfuse.openai import modifier, openai + chat_func = openai.chat.completions.create if _is_openai_v1() else openai.ChatCompletion.create + public_key, secret_key, original_host = ( os.environ["LANGFUSE_PUBLIC_KEY"], os.environ["LANGFUSE_SECRET_KEY"], @@ -241,7 +248,7 @@ def test_openai_configured(httpserver: HTTPServer): openai.langfuse_secret_key = "sk-lf-asdfghjkl" openai.langfuse_host = host - openai.chat.completions.create( + chat_func( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "1 + 1 = "}], temperature=0, From dcb7788f9befaec7c1d947084e68ee6c5f3f26d2 Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 18:06:06 +0100 Subject: [PATCH 7/8] push --- langfuse/openai.py | 11 +++++++++-- tests/test_openai.py | 2 ++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 40bac1605..a80414b3d 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -131,7 +131,12 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: InitialGeneration, langfuse: Langfuse): final_response = [] if resource.type == "chat" else "" model = None - for i in response: + completion_start_time = None + for index, i in enumerate(response): + print(index) + if index == 0: + completion_start_time = datetime.now() + if _is_openai_v1(): i = i.__dict__ @@ -175,7 +180,9 @@ def get_response_for_chat(): return final_response[-1]["tool_calls"] return None - new_generation = generation.copy(update={"end_time": datetime.now(), "completion": get_response_for_chat() if resource.type == "chat" else final_response}) + new_generation = generation.copy( + update={"end_time": datetime.now(), "completion": get_response_for_chat() if resource.type == "chat" else final_response, "completion_start_time": completion_start_time} + ) if model is not None: new_generation = new_generation.copy(update={"model": model}) langfuse.generation(new_generation) diff --git a/tests/test_openai.py b/tests/test_openai.py index e83dd468e..fe8d05520 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -93,6 +93,7 @@ def test_openai_chat_completion_stream(): assert generation.data[0].completion_tokens is not None assert generation.data[0].total_tokens is not None assert generation.data[0].output == "2" + assert generation.data[0].completion_start_time is not None def test_openai_chat_completion_stream_fail(): @@ -340,6 +341,7 @@ def test_openai_completion_stream(): assert generation.data[0].completion_tokens is not None assert generation.data[0].total_tokens is not None assert generation.data[0].output == "2\n\n1 + 2 = 3\n\n2 + 3 = " + assert generation.data[0].completion_start_time is not None def test_openai_completion_fail(): From 230eca310b3352d0553de79362ac066a0962148f Mon Sep 17 00:00:00 2001 From: Max Deichmann Date: Fri, 17 Nov 2023 18:15:34 +0100 Subject: [PATCH 8/8] use update --- langfuse/openai.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index a80414b3d..ac16a8397 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -5,12 +5,14 @@ from langfuse import Langfuse -from langfuse.client import InitialGeneration, CreateTrace +from langfuse.client import InitialGeneration, CreateTrace, StatefulGenerationClient from distutils.version import StrictVersion import openai from wrapt import wrap_function_wrapper +from langfuse.model import UpdateGeneration + class OpenAiDefinition: module: str @@ -128,7 +130,7 @@ def _get_langfuse_data_from_kwargs(resource: OpenAiDefinition, langfuse: Langfus return InitialGeneration(name=name, metadata=metadata, trace_id=trace_id, start_time=start_time, prompt=prompt, modelParameters=modelParameters, model=model) -def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: InitialGeneration, langfuse: Langfuse): +def _get_lagnfuse_data_from_streaming_response(resource: OpenAiDefinition, response, generation: StatefulGenerationClient, langfuse: Langfuse): final_response = [] if resource.type == "chat" else "" model = None completion_start_time = None @@ -180,12 +182,10 @@ def get_response_for_chat(): return final_response[-1]["tool_calls"] return None - new_generation = generation.copy( - update={"end_time": datetime.now(), "completion": get_response_for_chat() if resource.type == "chat" else final_response, "completion_start_time": completion_start_time} - ) + update = UpdateGeneration(end_time=datetime.now(), completion=get_response_for_chat() if resource.type == "chat" else final_response, completion_start_time=completion_start_time) if model is not None: - new_generation = new_generation.copy(update={"model": model}) - langfuse.generation(new_generation) + update = update.copy(update={"model": model}) + generation.update(update) def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): @@ -225,21 +225,20 @@ def _wrap(open_ai_resource: OpenAiDefinition, langfuse: Langfuse, initialize, wr arg_extractor = OpenAiArgsExtractor(*args, **kwargs) generation = _get_langfuse_data_from_kwargs(open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()) - updated_generation = generation + generation = new_langfuse.generation(generation) try: openai_response = wrapped(**arg_extractor.get_openai_args()) if _is_streaming_response(openai_response): - return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, updated_generation, new_langfuse) + return _get_lagnfuse_data_from_streaming_response(open_ai_resource, openai_response, generation, new_langfuse) else: model, completion, usage = _get_langfuse_data_from_default_response(open_ai_resource, openai_response.__dict__ if _is_openai_v1() else openai_response) - updated_generation = generation.copy(update={"model": model, "completion": completion, "end_time": datetime.now(), "usage": usage}) - new_langfuse.generation(updated_generation) + generation.update(UpdateGeneration(model=model, completion=completion, end_time=datetime.now(), usage=usage)) return openai_response except Exception as ex: model = kwargs.get("model", None) - new_langfuse.generation(updated_generation.copy(update={"end_time": datetime.now(), "status_message": str(ex), "level": "ERROR", "model": model})) + generation.update(UpdateGeneration(endTime=datetime.now(), statusMessage=str(ex), level="ERROR", model=model)) raise ex