From abfdb1c1dff11d23b891d4389eb10e95fb08f6dd Mon Sep 17 00:00:00 2001 From: Lalleh Rafeei Date: Tue, 19 Dec 2023 16:32:05 -0800 Subject: [PATCH] Modified mock server for OpenAI within LangChain Co-authored-by: Hannah Stepanek --- ...ver.py => _mock_external_openai_server.py} | 126 ++++++++------ tests/mlmodel_langchain/conftest.py | 158 +++++++++--------- tests/mlmodel_langchain/test_vectorstore.py | 70 +------- tox.ini | 2 +- 4 files changed, 165 insertions(+), 191 deletions(-) rename tests/mlmodel_langchain/{_mock_external_langchain_server.py => _mock_external_openai_server.py} (86%) diff --git a/tests/mlmodel_langchain/_mock_external_langchain_server.py b/tests/mlmodel_langchain/_mock_external_openai_server.py similarity index 86% rename from tests/mlmodel_langchain/_mock_external_langchain_server.py rename to tests/mlmodel_langchain/_mock_external_openai_server.py index e0ab34ada..b4dba7ebf 100644 --- a/tests/mlmodel_langchain/_mock_external_langchain_server.py +++ b/tests/mlmodel_langchain/_mock_external_openai_server.py @@ -14,10 +14,13 @@ import json +import pytest from testing_support.mock_external_http_server import MockExternalHTTPServer +from newrelic.common.package_version_utils import get_package_version_tuple + # This defines an external server test apps can make requests to instead of -# the real LangChain backend. This provides 3 features: +# the real OpenAI backend. This provides 3 features: # # 1) This removes dependencies on external websites. # 2) Provides a better mechanism for making an external call in a test app than @@ -27,13 +30,12 @@ # created by an external call. # 3) This app runs on a separate thread meaning it won't block the test app. - -RESPONSES = { +RESPONSES_V1 = { "9906": [ { - "Content-Type": "application/json", + "content-type": "application/json", "openai-organization": "new-relic-nkmd8b", - "openai-processing-ms": "24", + "openai-processing-ms": "23", "openai-version": "2020-10-01", "x-ratelimit-limit-requests": "3000", "x-ratelimit-limit-tokens": "1000000", @@ -43,6 +45,7 @@ "x-ratelimit-reset-tokens": "0s", "x-request-id": "058b2dd82590aa4145e97c2e59681f62", }, + 200, { "object": "list", "data": [ @@ -58,9 +61,9 @@ ], "12833": [ { - "Content-Type": "application/json", + "content-type": "application/json", "openai-organization": "new-relic-nkmd8b", - "openai-processing-ms": "16", + "openai-processing-ms": "26", "openai-version": "2020-10-01", "x-ratelimit-limit-requests": "3000", "x-ratelimit-limit-tokens": "1000000", @@ -70,6 +73,7 @@ "x-ratelimit-reset-tokens": "0s", "x-request-id": "d5d71019880e25a94de58b927045a202", }, + 200, { "object": "list", "data": [ @@ -86,61 +90,87 @@ } -def simple_get(self): - content_len = int(self.headers.get("content-length")) - content = json.loads(self.rfile.read(content_len).decode("utf-8")) +@pytest.fixture(scope="session") +def simple_get(openai_version, extract_shortened_prompt): + def _simple_get(self): + content_len = int(self.headers.get("content-length")) + content = json.loads(self.rfile.read(content_len).decode("utf-8")) - prompt = extract_shortened_prompt(content) - if not prompt: - self.send_response(500) - self.end_headers() - self.wfile.write("Could not parse prompt.".encode("utf-8")) - return + prompt = extract_shortened_prompt(content) + if not prompt: + self.send_response(500) + self.end_headers() + self.wfile.write("Could not parse prompt.".encode("utf-8")) + return + + headers, response = ({}, "") + + mocked_responses = RESPONSES_V1 + + for k, v in mocked_responses.items(): + if 
prompt.startswith(k): + headers, status_code, response = v + break + else: # If no matches found + self.send_response(500) + self.end_headers() + self.wfile.write(("Unknown Prompt:\n%s" % prompt).encode("utf-8")) + return - headers, response = ({}, "") - for k, v in RESPONSES.items(): - if prompt.startswith(k): - headers, response = v - break - else: # If no matches found - self.send_response(500) + # Send response code + self.send_response(status_code) + + # Send headers + for k, v in headers.items(): + self.send_header(k, v) self.end_headers() - self.wfile.write(("Unknown Prompt:\n%s" % prompt).encode("utf-8")) + + # Send response body + self.wfile.write(json.dumps(response).encode("utf-8")) return - # Send response code - self.send_response(200) + return _simple_get + + +@pytest.fixture(scope="session") +def MockExternalOpenAIServer(simple_get): + class _MockExternalOpenAIServer(MockExternalHTTPServer): + # To use this class in a test one needs to start and stop this server + # before and after making requests to the test app that makes the external + # calls. + + def __init__(self, handler=simple_get, port=None, *args, **kwargs): + super(_MockExternalOpenAIServer, self).__init__(handler=handler, port=port, *args, **kwargs) + + return _MockExternalOpenAIServer + - # Send headers - for k, v in headers.items(): - self.send_header(k, v) - self.end_headers() +@pytest.fixture(scope="session") +def extract_shortened_prompt(openai_version): + def _extract_shortened_prompt(content): + _input = content.get("input", None) + prompt = (_input and str(_input[0][0])) or content.get("messages")[0]["content"] + return prompt - # Send response body - self.wfile.write(json.dumps(response).encode("utf-8")) - return + return _extract_shortened_prompt -def extract_shortened_prompt(content): - prompt = ( - content.get("prompt", None) - or "\n".join(str(m) for m in content.get("input")[0]) - or "\n".join(m["content"] for m in content.get("messages")) - ) - return prompt.lstrip().split("\n")[0] +def get_openai_version(): + # Import OpenAI so that get package version can catpure the version from the + # system module. OpenAI does not have a package version in v0. + import openai # noqa: F401; pylint: disable=W0611 + return get_package_version_tuple("openai") -class MockExternalLangChainServer(MockExternalHTTPServer): - # To use this class in a test one needs to start and stop this server - # before and after making requests to the test app that makes the external - # calls. 
- def __init__(self, handler=simple_get, port=None, *args, **kwargs): - super(MockExternalLangChainServer, self).__init__(handler=handler, port=port, *args, **kwargs) +@pytest.fixture(scope="session") +def openai_version(): + return get_openai_version() if __name__ == "__main__": - with MockExternalLangChainServer() as server: - print("MockExternalLangChainServer serving on port %s" % str(server.port)) + _MockExternalOpenAIServer = MockExternalOpenAIServer() + with MockExternalOpenAIServer() as server: + print("MockExternalOpenAIServer serving on port %s" % str(server.port)) while True: pass # Serve forever diff --git a/tests/mlmodel_langchain/conftest.py b/tests/mlmodel_langchain/conftest.py index d152e21eb..32b4370b3 100644 --- a/tests/mlmodel_langchain/conftest.py +++ b/tests/mlmodel_langchain/conftest.py @@ -16,16 +16,22 @@ import os import pytest -from _mock_external_langchain_server import ( - MockExternalLangChainServer, +from _mock_external_openai_server import ( # noqa: F401; pylint: disable=W0611 + MockExternalOpenAIServer, extract_shortened_prompt, + get_openai_version, + openai_version, + simple_get, +) +from langchain_community.embeddings.openai import OpenAIEmbeddings +from testing_support.fixture.event_loop import ( # noqa: F401; pylint: disable=W0611 + event_loop as loop, ) from testing_support.fixtures import ( # noqa: F401, pylint: disable=W0611 collector_agent_registration_fixture, collector_available_fixture, ) -from newrelic.api.time_trace import current_trace from newrelic.api.transaction import current_transaction from newrelic.common.object_wrapper import wrap_function_wrapper @@ -44,12 +50,40 @@ linked_applications=["Python Agent Test (mlmodel_langchain)"], ) -LANGCHAIN_AUDIT_LOG_FILE = os.path.join(os.path.realpath(os.path.dirname(__file__)), "langchain_audit.log") -LANGCHAIN_AUDIT_LOG_CONTENTS = {} + +OPENAI_AUDIT_LOG_FILE = os.path.join(os.path.realpath(os.path.dirname(__file__)), "openai_audit.log") +OPENAI_AUDIT_LOG_CONTENTS = {} +# Intercept outgoing requests and log to file for mocking +RECORDED_HEADERS = set(["x-request-id", "content-type"]) + + +@pytest.fixture(scope="session") +def openai_clients(openai_version, MockExternalOpenAIServer): # noqa: F811 + """ + This configures the openai client and returns it for openai v1 and only configures + openai for v0 since there is no client. 
+ """ + from newrelic.core.config import _environ_as_bool + + if not _environ_as_bool("NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES", False): + with MockExternalOpenAIServer() as server: + yield OpenAIEmbeddings( + openai_api_key="NOT-A-REAL-SECRET", openai_api_base="http://localhost:%d" % server.port + ) + else: + openai_api_key = os.environ.get("OPENAI_API_KEY") + if not openai_api_key: + raise RuntimeError("OPENAI_API_KEY environment variable required.") + + yield OpenAIEmbeddings(openai_api_key=openai_api_key) + + +@pytest.fixture(scope="session") +def embeding_openai_client(openai_clients): + embedding_client = openai_clients + return embedding_client -# In practice this changes with each run, so to account for this -# in testing, we will set it ourselves @pytest.fixture def set_trace_info(): def set_info(): @@ -57,99 +91,67 @@ def set_info(): if txn: txn.guid = "transaction-id" txn._trace_id = "trace-id" - trace = current_trace() - if trace: - trace.guid = "span-id" return set_info @pytest.fixture(autouse=True, scope="session") -def langchain_server(): +def openai_server( + openai_version, # noqa: F811 + openai_clients, + wrap_httpx_client_send, +): """ This fixture will either create a mocked backend for testing purposes, or will set up an audit log file to log responses of the real OpenAI backend to a file. - The behavior can be controlled by setting NEW_RELIC_TESTING_RECORD_LANGCHAIN_RESPONSES=1 as + The behavior can be controlled by setting NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES=1 as an environment variable to run using the real OpenAI backend. (Default: mocking) """ - from newrelic.core.config import _environ_as_bool - if not _environ_as_bool("NEW_RELIC_TESTING_RECORD_LANGCHAIN_RESPONSES", False): - # Use mocked OpenAI backend and prerecorded responses - with MockExternalLangChainServer() as server: - os.environ["OPENAI_API_BASE"] = "http://localhost:%d" % server.port - os.environ["OPENAI_API_KEY"] = "NOT-A-REAL-SECRET" - yield - else: - # Use real OpenAI backend and record responses - openai_api_key = os.environ.get("OPENAI_API_KEY", "") - if not openai_api_key: - raise RuntimeError("OPENAI_API_KEY environment variable required.") - - # Apply function wrappers to record data - wrap_function_wrapper("openai.api_requestor", "APIRequestor.request", wrap_openai_api_requestor_request) - wrap_function_wrapper( - "openai.api_requestor", "APIRequestor._interpret_response", wrap_openai_api_requestor_interpret_response - ) + if _environ_as_bool("NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES", False): + wrap_function_wrapper("httpx._client", "Client.send", wrap_httpx_client_send) yield # Run tests - # Write responses to audit log - with open(LANGCHAIN_AUDIT_LOG_FILE, "w") as audit_log_fp: - json.dump(LANGCHAIN_AUDIT_LOG_CONTENTS, fp=audit_log_fp, indent=4) - + with open(OPENAI_AUDIT_LOG_FILE, "w") as audit_log_fp: + json.dump(OPENAI_AUDIT_LOG_CONTENTS, fp=audit_log_fp, indent=4) + else: + # We are mocking openai responses so we don't need to do anything in this case. 
+ yield -# Intercept outgoing requests and log to file for mocking -RECORDED_HEADERS = set(["x-request-id", "content-type"]) +def bind_send_params(request, *, stream=False, **kwargs): + return request -def wrap_openai_api_requestor_interpret_response(wrapped, instance, args, kwargs): - rbody, rcode, rheaders = bind_request_interpret_response_params(*args, **kwargs) - headers = dict( - filter( - lambda k: k[0].lower() in RECORDED_HEADERS - or k[0].lower().startswith("openai") - or k[0].lower().startswith("x-ratelimit"), - rheaders.items(), - ) - ) - if rcode >= 400 or rcode < 200: - rbody = json.loads(rbody) - LANGCHAIN_AUDIT_LOG_CONTENTS["error"] = headers, rcode, rbody # Append response data to audit log - return wrapped(*args, **kwargs) +@pytest.fixture(scope="session") +def wrap_httpx_client_send(extract_shortened_prompt): # noqa: F811 + def _wrap_httpx_client_send(wrapped, instance, args, kwargs): + request = bind_send_params(*args, **kwargs) + if not request: + return wrapped(*args, **kwargs) + params = json.loads(request.content.decode("utf-8")) + prompt = extract_shortened_prompt(params) -def wrap_openai_api_requestor_request(wrapped, instance, args, kwargs): - params = bind_request_params(*args, **kwargs) - if not params: - return wrapped(*args, **kwargs) + # Send request + response = wrapped(*args, **kwargs) - prompt = extract_shortened_prompt(params) + if response.status_code >= 400 or response.status_code < 200: + prompt = "error" - # Send request - result = wrapped(*args, **kwargs) + rheaders = getattr(response, "headers") - # Clean up data - data = result[0].data - headers = result[0]._headers - headers = dict( - filter( - lambda k: k[0].lower() in RECORDED_HEADERS - or k[0].lower().startswith("openai") - or k[0].lower().startswith("x-ratelimit"), - headers.items(), + headers = dict( + filter( + lambda k: k[0].lower() in RECORDED_HEADERS + or k[0].lower().startswith("openai") + or k[0].lower().startswith("x-ratelimit"), + rheaders.items(), + ) ) - ) - - # Log response - LANGCHAIN_AUDIT_LOG_CONTENTS[prompt] = headers, data # Append response data to audit log - return result - - -def bind_request_params(method, url, params=None, *args, **kwargs): - return params - + body = json.loads(response.content.decode("utf-8")) + OPENAI_AUDIT_LOG_CONTENTS[prompt] = headers, response.status_code, body # Append response data to log + return response -def bind_request_interpret_response_params(result, stream): - return result.content.decode("utf-8"), result.headers + return _wrap_httpx_client_send diff --git a/tests/mlmodel_langchain/test_vectorstore.py b/tests/mlmodel_langchain/test_vectorstore.py index 6e027a3ee..8886a01bc 100644 --- a/tests/mlmodel_langchain/test_vectorstore.py +++ b/tests/mlmodel_langchain/test_vectorstore.py @@ -15,7 +15,6 @@ import os from langchain.document_loaders import PyPDFLoader -from langchain.embeddings.openai import OpenAIEmbeddings from langchain_community.vectorstores.faiss import FAISS from testing_support.fixtures import ( reset_core_stats_engine, @@ -32,64 +31,6 @@ LANGCHAIN_VERSION = get_package_version("langchain") vectorstore_recorded_events = [ - ( - {"type": "LlmEmbedding", "timestamp": 1702052394446}, - { - "id": None, # UUID that changes with each run - "appName": "Python Agent Test (mlmodel_langchain)", - "span_id": None, - "trace_id": "trace-id", - "request_id": "058b2dd82590aa4145e97c2e59681f62", - "transaction_id": "transaction-id", - "input": "[[9906, 1917, 4999, 16]]", - "api_key_last_four_digits": "sk-CRET", - "duration": None, # Changes with 
every run - "request.model": "text-embedding-ada-002", - "response.model": "text-embedding-ada-002-v2", - "response.organization": "new-relic-nkmd8b", - "response.api_type": "None", - "response.usage.total_tokens": 4, - "response.usage.prompt_tokens": 4, - "response.headers.llmVersion": "2020-10-01", - "response.headers.ratelimitLimitRequests": 3000, - "response.headers.ratelimitLimitTokens": 1000000, - "response.headers.ratelimitResetTokens": "0s", - "response.headers.ratelimitResetRequests": "20ms", - "response.headers.ratelimitRemainingTokens": 999996, - "response.headers.ratelimitRemainingRequests": 2999, - "vendor": "openAI", - "ingest_source": "Python", - }, - ), - ( - {"type": "LlmEmbedding", "timestamp": 1702052394882}, - { - "id": None, # UUID that changes with each run - "appName": "Python Agent Test (mlmodel_langchain)", - "span_id": None, - "trace_id": "trace-id", - "request_id": "d5d71019880e25a94de58b927045a202", - "transaction_id": "transaction-id", - "input": "[[12833, 420, 11914, 25, 22691]]", - "api_key_last_four_digits": "sk-CRET", - "duration": None, # Changes with every run - "request.model": "text-embedding-ada-002", - "response.model": "text-embedding-ada-002-v2", - "response.organization": "new-relic-nkmd8b", - "response.api_type": "None", - "response.usage.total_tokens": 5, - "response.usage.prompt_tokens": 5, - "response.headers.llmVersion": "2020-10-01", - "response.headers.ratelimitLimitRequests": 3000, - "response.headers.ratelimitLimitTokens": 1000000, - "response.headers.ratelimitResetTokens": "0s", - "response.headers.ratelimitResetRequests": "20ms", - "response.headers.ratelimitRemainingTokens": 999994, - "response.headers.ratelimitRemainingRequests": 2999, - "vendor": "openAI", - "ingest_source": "Python", - }, - ), ( {"type": "LlmVectorSearch", "timestamp": 1702052394890}, { @@ -119,7 +60,8 @@ "vendor": "langchain", "ingest_source": "Python", "appName": "Python Agent Test (mlmodel_langchain)", - "metadata.source": "/__w/newrelic-python-agent/newrelic-python-agent/tests/mlmodel_langchain/hello.pdf", + # "metadata.source": "/__w/newrelic-python-agent/newrelic-python-agent/tests/mlmodel_langchain/hello.pdf", + "metadata.source": "/Users/lrafeei/repo/newrelic-python-agent/tests/mlmodel_langchain/hello.pdf", "metadata.page": 0, }, ), @@ -165,7 +107,7 @@ def test_vectorstore_modules_instrumented(): background_task=True, ) @background_task() -def test_pdf_pagesplitter_vectorstore_in_txn(set_trace_info): +def test_pdf_pagesplitter_vectorstore_in_txn(set_trace_info, embeding_openai_client): set_trace_info() script_dir = os.path.dirname(__file__) @@ -174,14 +116,14 @@ def test_pdf_pagesplitter_vectorstore_in_txn(set_trace_info): assert "page" in docs[0].metadata assert "source" in docs[0].metadata - faiss_index = FAISS.from_documents(docs, OpenAIEmbeddings()) + faiss_index = FAISS.from_documents(docs, embeding_openai_client) docs = faiss_index.similarity_search("Complete this sentence: Hello", k=1) assert "Hello world" in docs[0].page_content @reset_core_stats_engine() @validate_custom_event_count(count=0) -def test_pdf_pagesplitter_vectorstore_outside_txn(set_trace_info): +def test_pdf_pagesplitter_vectorstore_outside_txn(set_trace_info, embeding_openai_client): set_trace_info() script_dir = os.path.dirname(__file__) @@ -190,6 +132,6 @@ def test_pdf_pagesplitter_vectorstore_outside_txn(set_trace_info): assert "page" in docs[0].metadata assert "source" in docs[0].metadata - faiss_index = FAISS.from_documents(docs, OpenAIEmbeddings()) + faiss_index = 
FAISS.from_documents(docs, embeding_openai_client) docs = faiss_index.similarity_search("Complete this sentence: Hello", k=1) assert "Hello world" in docs[0].page_content diff --git a/tox.ini b/tox.ini index 7f61610fe..659384ab8 100644 --- a/tox.ini +++ b/tox.ini @@ -352,7 +352,7 @@ deps = mlmodel_openai: protobuf mlmodel_langchain: langchain mlmodel_langchain: langchain-community - mlmodel_langchain: openai[datalib]<1.0 + mlmodel_langchain: openai[datalib] ; Required for testing mlmodel_langchain: pypdf mlmodel_langchain: tiktoken
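
For reference, the canned entries in RESPONSES_V1 are looked up by the shortened prompt that _extract_shortened_prompt derives from the request body: for an embeddings call this is the string form of the first token ID of the first input, and the handler returns the first entry whose key the prompt starts with. A minimal illustrative sketch of that lookup, using a payload shaped like the "9906" entry recorded above (this snippet is not part of the patch):

    import json

    # Payload shaped like the embeddings request that produced the "9906" entry.
    payload = json.loads('{"input": [[9906, 1917, 4999, 16]], "model": "text-embedding-ada-002"}')

    # Same logic as _extract_shortened_prompt: embeddings requests carry token
    # IDs under "input"; chat requests carry text under "messages".
    _input = payload.get("input", None)
    prompt = (_input and str(_input[0][0])) or payload.get("messages")[0]["content"]

    assert prompt == "9906"  # matched against RESPONSES_V1 keys via prompt.startswith(key)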
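
A hypothetical test module under tests/mlmodel_langchain/ would then compose the session fixtures roughly as follows (sketch only; the test name and input text are illustrative, and the assertion assumes the prerecorded embedding response parses cleanly):

    from newrelic.api.background_task import background_task


    @background_task()
    def test_embeddings_against_mock_backend(set_trace_info, embeding_openai_client):
        set_trace_info()

        # embeding_openai_client targets MockExternalOpenAIServer by default.
        # Setting NEW_RELIC_TESTING_RECORD_OPENAI_RESPONSES=1 points it at the
        # real OpenAI backend instead, and wrap_httpx_client_send records each
        # response (headers, status code, body) into openai_audit.log so the
        # mock data can be refreshed.
        vectors = embeding_openai_client.embed_documents(["Hello world!"])
        assert len(vectors) == 1

Since the recorded input for the hello.pdf document starts with token 9906, an input beginning with that same token resolves to the prerecorded "9906" response in mock mode.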