From fab1fbdfe952bdc1e0015ea058d440e58c7ed999 Mon Sep 17 00:00:00 2001 From: XinyaoWa Date: Tue, 20 Aug 2024 22:09:04 +0800 Subject: [PATCH] Add logging for unified debug (#521) Signed-off-by: Xinyao Wang --- comps/__init__.py | 3 + comps/agent/langchain/agent.py | 13 +- comps/asr/asr.py | 12 +- comps/chathistory/mongo/chathistory_mongo.py | 24 ++- comps/cores/mega/logger.py | 2 +- comps/dataprep/milvus/prepare_doc_milvus.py | 132 ++++++++++----- .../langchain/prepare_doc_pgvector.py | 88 +++++++--- .../dataprep/pinecone/prepare_doc_pinecone.py | 87 ++++++---- comps/dataprep/qdrant/prepare_doc_qdrant.py | 40 +++-- .../redis/langchain/prepare_doc_redis.py | 153 ++++++++++++------ .../langchain_ray/prepare_doc_redis_on_ray.py | 65 ++++++-- .../redis/llama_index/prepare_doc_redis.py | 49 ++++-- .../langchain-mosec/embedding_mosec.py | 10 +- comps/embeddings/langchain/embedding_tei.py | 11 +- comps/embeddings/langchain/local_embedding.py | 19 ++- comps/embeddings/llama_index/embedding_tei.py | 11 +- .../embeddings/llama_index/local_embedding.py | 11 +- .../guardrails/llama_guard/guardrails_tgi.py | 14 +- .../guardrails/pii_detection/pii_detection.py | 51 ++++-- .../langchain/knowledge_graph.py | 10 +- comps/llms/faq-generation/tgi/llm.py | 9 +- comps/llms/summarization/tgi/llm.py | 12 +- comps/llms/text-generation/native/llm.py | 9 +- comps/llms/text-generation/ollama/llm.py | 15 +- comps/llms/text-generation/tgi/llm.py | 28 +++- comps/llms/text-generation/vllm-ray/llm.py | 12 +- comps/llms/text-generation/vllm-xft/llm.py | 15 +- .../vllm/launch_microservice.sh | 1 + comps/llms/text-generation/vllm/llm.py | 20 ++- comps/llms/utils/lm-eval/self_hosted_hf.py | 13 +- comps/lvms/lvm.py | 13 +- comps/lvms/lvm_tgi.py | 16 +- comps/prompt_registry/mongo/prompt.py | 25 ++- comps/ragas/tgi/llm.py | 21 ++- comps/reranks/fastrag/local_reranking.py | 10 ++ .../langchain-mosec/reranking_mosec_xeon.py | 11 +- comps/reranks/tei/local_reranking.py | 19 ++- comps/reranks/tei/reranking_tei.py | 19 ++- .../haystack/qdrant/retriever_qdrant.py | 11 +- .../langchain/milvus/retriever_milvus.py | 8 + .../langchain/pgvector/retriever_pgvector.py | 8 + .../langchain/pinecone/retriever_pinecone.py | 13 +- .../langchain/redis/retriever_redis.py | 9 +- .../retrievers/llamaindex/retriever_redis.py | 9 +- comps/tts/tts.py | 13 +- .../langchain/chroma/retriever_chroma.py | 17 +- 46 files changed, 891 insertions(+), 270 deletions(-) diff --git a/comps/__init__.py b/comps/__init__.py index 15fb64e7b..873a7697a 100644 --- a/comps/__init__.py +++ b/comps/__init__.py @@ -46,3 +46,6 @@ # Statistics from comps.cores.mega.base_statistics import statistics_dict, register_statistics + +# Logger +from comps.cores.mega.logger import CustomLogger diff --git a/comps/agent/langchain/agent.py b/comps/agent/langchain/agent.py index fffdc8765..b0fb1b81b 100644 --- a/comps/agent/langchain/agent.py +++ b/comps/agent/langchain/agent.py @@ -12,10 +12,13 @@ comps_path = os.path.join(cur_path, "../../../") sys.path.append(comps_path) -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice +from comps import CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice from comps.agent.langchain.src.agent import instantiate_agent from comps.agent.langchain.src.utils import get_args +logger = CustomLogger("comps-react-agent") +logflag = os.getenv("LOGFLAG", False) + args, _ = get_args() @@ -28,12 +31,16 @@ input_datatype=LLMParamsDoc, ) async def llm_generate(input: 
LLMParamsDoc): + if logflag: + logger.info(input) # 1. initialize the agent - print("args: ", args) + if logflag: + logger.info("args: ", args) input.streaming = args.streaming config = {"recursion_limit": args.recursion_limit} agent_inst = instantiate_agent(args, args.strategy) - print(type(agent_inst)) + if logflag: + logger.info(type(agent_inst)) # 2. prepare the input for the agent if input.streaming: diff --git a/comps/asr/asr.py b/comps/asr/asr.py index 1f5cf2df4..f687169b0 100644 --- a/comps/asr/asr.py +++ b/comps/asr/asr.py @@ -8,6 +8,11 @@ import numpy as np import requests +from comps import CustomLogger + +logger = CustomLogger("asr") +logflag = os.getenv("LOGFLAG", False) + from comps import ( Base64ByteStrDoc, LLMParamsDoc, @@ -33,14 +38,17 @@ async def audio_to_text(audio: Base64ByteStrDoc): start = time.time() byte_str = audio.byte_str inputs = {"audio": byte_str} + if logflag: + logger.info(inputs) response = requests.post(url=f"{asr_endpoint}/v1/asr", data=json.dumps(inputs), proxies={"http": None}) - + if logflag: + logger.info(response) statistics_dict["opea_service@asr"].append_latency(time.time() - start, None) return LLMParamsDoc(query=response.json()["asr_result"]) if __name__ == "__main__": asr_endpoint = os.getenv("ASR_ENDPOINT", "http://localhost:7066") - print("[asr - router] ASR initialized.") + logger.info("[asr - router] ASR initialized.") opea_microservices["opea_service@asr"].start() diff --git a/comps/chathistory/mongo/chathistory_mongo.py b/comps/chathistory/mongo/chathistory_mongo.py index 1993503da..29f5d41cb 100644 --- a/comps/chathistory/mongo/chathistory_mongo.py +++ b/comps/chathistory/mongo/chathistory_mongo.py @@ -1,14 +1,19 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import os from typing import Optional from fastapi import HTTPException from mongo_store import DocumentStore from pydantic import BaseModel +from comps import CustomLogger from comps.cores.mega.micro_service import opea_microservices, register_microservice from comps.cores.proto.api_protocol import ChatCompletionRequest +logger = CustomLogger("chathistory_mongo") +logflag = os.getenv("LOGFLAG", False) + class ChatMessage(BaseModel): data: ChatCompletionRequest @@ -50,7 +55,8 @@ async def create_documents(document: ChatMessage): Returns: The result of the operation if successful, None otherwise. """ - + if logflag: + logger.info(document) try: if document.data.user is None: raise HTTPException(status_code=500, detail="Please provide the user information") @@ -62,10 +68,12 @@ async def create_documents(document: ChatMessage): res = await store.update_document(document.id, document.data, document.first_query) else: res = await store.save_document(document) + if logflag: + logger.info(res) return res except Exception as e: # Handle the exception here - print(f"An error occurred: {str(e)}") + logger.info(f"An error occurred: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @@ -85,6 +93,8 @@ async def get_documents(document: ChatId): Returns: The retrieved documents if successful, None otherwise. 
""" + if logflag: + logger.info(document) try: store = DocumentStore(document.user) store.initialize_storage() @@ -92,10 +102,12 @@ async def get_documents(document: ChatId): res = await store.get_all_documents_of_user() else: res = await store.get_user_documents_by_id(document.id) + if logflag: + logger.info(res) return res except Exception as e: # Handle the exception here - print(f"An error occurred: {str(e)}") + logger.info(f"An error occurred: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @@ -115,6 +127,8 @@ async def delete_documents(document: ChatId): Returns: The result of the deletion if successful, None otherwise. """ + if logflag: + logger.info(document) try: store = DocumentStore(document.user) store.initialize_storage() @@ -122,10 +136,12 @@ async def delete_documents(document: ChatId): raise Exception("Document id is required.") else: res = await store.delete_document(document.id) + if logflag: + logger.info(res) return res except Exception as e: # Handle the exception here - print(f"An error occurred: {str(e)}") + logger.info(f"An error occurred: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/comps/cores/mega/logger.py b/comps/cores/mega/logger.py index 8cbe59dc3..b556a640f 100644 --- a/comps/cores/mega/logger.py +++ b/comps/cores/mega/logger.py @@ -35,7 +35,7 @@ def __init__(self, name: str = None): self.__dict__[key.lower()] = functools.partial(self.log_message, level) # Set up log format and handler - self.format = logging.Formatter(fmt="[%(asctime)-15s] [%(levelname)8s] - %(message)s") + self.format = logging.Formatter(fmt="[%(asctime)-15s] [%(levelname)8s] - %(name)s - %(message)s") self.handler = logging.StreamHandler() self.handler.setFormatter(self.format) diff --git a/comps/dataprep/milvus/prepare_doc_milvus.py b/comps/dataprep/milvus/prepare_doc_milvus.py index a293963c4..25640dab9 100644 --- a/comps/dataprep/milvus/prepare_doc_milvus.py +++ b/comps/dataprep/milvus/prepare_doc_milvus.py @@ -24,7 +24,7 @@ from langchain_text_splitters import HTMLHeaderTextSplitter from pyspark import SparkConf, SparkContext -from comps import DocPath, opea_microservices, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, register_microservice from comps.dataprep.utils import ( create_upload_folder, document_loader, @@ -37,6 +37,9 @@ save_content_to_local_disk, ) +logger = CustomLogger("prepare_doc_milvus") +logflag = os.getenv("LOGFLAG", False) + # workaround notes: cp comps/dataprep/utils.py ./milvus/utils.py # from utils import document_loader, get_tables_result, parse_html index_params = {"index_type": "FLAT", "metric_type": "IP", "params": {}} @@ -73,7 +76,8 @@ def ingest_data_to_milvus(doc_path: DocPath): """Ingest document to Milvus.""" path = doc_path.path file_name = path.split("/")[-1] - print(f"[ ingest data ] Parsing document {path}, file name: {file_name}.") + if logflag: + logger.info(f"[ ingest data ] Parsing document {path}, file name: {file_name}.") if path.endswith(".html"): headers_to_split_on = [ @@ -92,22 +96,26 @@ def ingest_data_to_milvus(doc_path: DocPath): if doc_path.process_table and path.endswith(".pdf"): table_chunks = get_tables_result(path, doc_path.table_strategy) chunks = chunks + table_chunks - print("[ ingest data ] Done preprocessing. Created ", len(chunks), " chunks of the original pdf") + if logflag: + logger.info("[ ingest data ] Done preprocessing. 
Created ", len(chunks), " chunks of the original pdf") # Create vectorstore if MOSEC_EMBEDDING_ENDPOINT: # create embeddings using MOSEC endpoint service - print( - f"[ ingest data ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}" - ) + if logflag: + logger.info( + f"[ ingest data ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT}, MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}" + ) embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL) elif TEI_EMBEDDING_ENDPOINT: # create embeddings using TEI endpoint service - print(f"[ ingest data ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") + if logflag: + logger.info(f"[ ingest data ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT) else: # create embeddings using local embedding model - print(f"[ ingest data ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") + if logflag: + logger.info(f"[ ingest data ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL) # insert documents to Milvus @@ -124,10 +132,12 @@ def ingest_data_to_milvus(doc_path: DocPath): partition_key_field=partition_field_name, ) except Exception as e: - print(f"[ ingest data ] fail to ingest data into Milvus. error: {e}") + if logflag: + logger.info(f"[ ingest data ] fail to ingest data into Milvus. error: {e}") return False - print(f"[ ingest data ] Docs ingested from {path} to Milvus collection {COLLECTION_NAME}.") + if logflag: + logger.info(f"[ ingest data ] Docs ingested from {path} to Milvus collection {COLLECTION_NAME}.") return True @@ -136,23 +146,30 @@ async def ingest_link_to_milvus(link_list: List[str]): # Create vectorstore if MOSEC_EMBEDDING_ENDPOINT: # create embeddings using MOSEC endpoint service - print(f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}") + if logflag: + logger.info( + f"MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}" + ) embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL) elif TEI_EMBEDDING_ENDPOINT: # create embeddings using TEI endpoint service - print(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") + if logflag: + logger.info(f"TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT) else: # create embeddings using local embedding model - print(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") + if logflag: + logger.info(f"Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL) for link in link_list: content = parse_html([link])[0][0] - print(f"[ ingest link ] link: {link} content: {content}") + if logflag: + logger.info(f"[ ingest link ] link: {link} content: {content}") encoded_link = encode_filename(link) save_path = upload_folder + encoded_link + ".txt" - print(f"[ ingest link ] save_path: {save_path}") + if logflag: + logger.info(f"[ ingest link ] save_path: {save_path}") await save_content_to_local_disk(save_path, content) document = Document(page_content=content, metadata={partition_field_name: encoded_link + ".txt"}) @@ -174,8 +191,9 @@ async def ingest_documents( process_table: bool = Form(False), table_strategy: str = Form("fast"), ): - print(f"files:{files}") - print(f"link_list:{link_list}") + if logflag: + logger.info(f"files:{files}") + logger.info(f"link_list:{link_list}") if files and link_list: raise 
HTTPException(status_code=400, detail="Provide either a file or a string list, not both.") @@ -187,7 +205,8 @@ async def ingest_documents( save_path = upload_folder + file.filename await save_content_to_local_disk(save_path, file) uploaded_files.append(save_path) - print(f"Successfully saved file {save_path}") + if logflag: + logger.info(f"Successfully saved file {save_path}") def process_files_wrapper(files): if not isinstance(files, list): @@ -218,7 +237,10 @@ def process_files_wrapper(files): except: # Stop the SparkContext sc.stop() - return {"status": 200, "message": "Data preparation succeeded"} + results = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(results) + return results if link_list: try: @@ -226,8 +248,12 @@ def process_files_wrapper(files): if not isinstance(link_list, list): raise HTTPException(status_code=400, detail="link_list should be a list.") await ingest_link_to_milvus(link_list) - print(f"Successfully saved link list {link_list}") - return {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(f"Successfully saved link list {link_list}") + results = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(results) + return results except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") @@ -238,30 +264,39 @@ def process_files_wrapper(files): name="opea_service@prepare_doc_milvus_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6011 ) async def rag_get_file_structure(): - print("[ dataprep - get file ] start to get file structure") + if logflag: + logger.info("[ dataprep - get file ] start to get file structure") if not Path(upload_folder).exists(): - print("No file uploaded, return empty list.") + if logflag: + logger.info("No file uploaded, return empty list.") return [] file_content = get_file_structure(upload_folder) + if logflag: + logger.info(file_content) return file_content def delete_all_data(my_milvus): - print("[ delete ] deleting all data in milvus") + if logflag: + logger.info("[ delete ] deleting all data in milvus") my_milvus.delete(expr="pk >= 0") my_milvus.col.flush() - print("[ delete ] delete success: all data") + if logflag: + logger.info("[ delete ] delete success: all data") def delete_by_partition_field(my_milvus, partition_field): - print(f"[ delete ] deleting {partition_field_name} {partition_field}") + if logflag: + logger.info(f"[ delete ] deleting {partition_field_name} {partition_field}") pks = my_milvus.get_pks(f'{partition_field_name} == "{partition_field}"') - print(f"[ delete ] target pks: {pks}") + if logflag: + logger.info(f"[ delete ] target pks: {pks}") res = my_milvus.delete(pks) my_milvus.col.flush() - print(f"[ delete ] delete success: {res}") + if logflag: + logger.info(f"[ delete ] delete success: {res}") @register_microservice( @@ -274,20 +309,25 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): - file/link path (e.g. 
/path/to/file.txt) - "all": delete all files uploaded """ + if logflag: + logger.info(file_path) # create embedder obj if MOSEC_EMBEDDING_ENDPOINT: # create embeddings using MOSEC endpoint service - print( - f"[ dataprep - del ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}" - ) + if logflag: + logger.info( + f"[ dataprep - del ] MOSEC_EMBEDDING_ENDPOINT:{MOSEC_EMBEDDING_ENDPOINT},MOSEC_EMBEDDING_MODEL:{MOSEC_EMBEDDING_MODEL}" + ) embedder = MosecEmbeddings(model=MOSEC_EMBEDDING_MODEL) elif TEI_EMBEDDING_ENDPOINT: # create embeddings using TEI endpoint service - print(f"[ dataprep - del ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") + if logflag: + logger.info(f"[ dataprep - del ] TEI_EMBEDDING_ENDPOINT:{TEI_EMBEDDING_ENDPOINT}") embedder = HuggingFaceHubEmbeddings(model=TEI_EMBEDDING_ENDPOINT) else: # create embeddings using local embedding model - print(f"[ dataprep - del ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") + if logflag: + logger.info(f"[ dataprep - del ] Local TEI_EMBEDDING_MODEL:{TEI_EMBEDDING_MODEL}") embedder = HuggingFaceBgeEmbeddings(model_name=TEI_EMBEDDING_MODEL) # define Milvus obj @@ -301,33 +341,45 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): # delete all uploaded files if file_path == "all": - print("[ dataprep - del ] deleting all files") + if logflag: + logger.info("[ dataprep - del ] deleting all files") delete_all_data(my_milvus) remove_folder_with_ignore(upload_folder) - print("[ dataprep - del ] successfully delete all files.") + if logflag: + logger.info("[ dataprep - del ] successfully delete all files.") create_upload_folder(upload_folder) + if logflag: + logger.info({"status": True}) return {"status": True} encode_file_name = encode_filename(file_path) delete_path = Path(upload_folder + "/" + encode_file_name) - print(f"[dataprep - del] delete_path: {delete_path}") + if logflag: + logger.info(f"[dataprep - del] delete_path: {delete_path}") # partially delete files if delete_path.exists(): # file if delete_path.is_file(): - print(f"[dataprep - del] deleting file {encode_file_name}") + if logflag: + logger.info(f"[dataprep - del] deleting file {encode_file_name}") try: delete_by_partition_field(my_milvus, encode_file_name) delete_path.unlink() - print(f"[dataprep - del] file {encode_file_name} deleted") + if logflag: + logger.info(f"[dataprep - del] file {encode_file_name} deleted") + logger.info({"status": True}) return {"status": True} except Exception as e: - print(f"[dataprep - del] fail to delete file {delete_path}: {e}") + if logflag: + logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}") + logger.info({"status": False}) return {"status": False} # folder else: - print("[dataprep - del] delete folder is not supported for now.") + if logflag: + logger.info("[dataprep - del] delete folder is not supported for now.") + logger.info({"status": False}) return {"status": False} else: raise HTTPException(status_code=404, detail="File/folder not found. 
Please check del_path.") diff --git a/comps/dataprep/pgvector/langchain/prepare_doc_pgvector.py b/comps/dataprep/pgvector/langchain/prepare_doc_pgvector.py index f46e466ba..7a02c6792 100644 --- a/comps/dataprep/pgvector/langchain/prepare_doc_pgvector.py +++ b/comps/dataprep/pgvector/langchain/prepare_doc_pgvector.py @@ -14,7 +14,7 @@ from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings from langchain_community.vectorstores import PGVector -from comps import DocPath, opea_microservices, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, register_microservice from comps.dataprep.utils import ( create_upload_folder, document_loader, @@ -26,6 +26,9 @@ save_content_to_local_disk, ) +logger = CustomLogger("prepare_doc_pgvector") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_ENDPOINT") upload_folder = "./uploaded_files/" @@ -37,7 +40,8 @@ async def save_file_to_local_disk(save_path: str, file): content = await file.read() fout.write(content) except Exception as e: - print(f"Write file failed. Exception: {e}") + if logflag: + logger.info(f"Write file failed. Exception: {e}") raise HTTPException(status_code=500, detail=f"Write file {save_path} failed. Exception: {e}") @@ -54,7 +58,9 @@ def delete_embeddings(doc_name): connection = psycopg2.connect(database=database, user=username, password=password, host=hostname, port=port) # Create a cursor object to execute SQL queries - print(f"Deleting {doc_name} from vectorstore") + + if logflag: + logger.info(f"Deleting {doc_name} from vectorstore") cur = connection.cursor() if doc_name == "all": @@ -75,26 +81,30 @@ def delete_embeddings(doc_name): return True except psycopg2.Error as e: - print(f"Error deleting document from vectorstore: {e}") + if logflag: + logger.info(f"Error deleting document from vectorstore: {e}") return False except Exception as e: - print(f"An unexpected error occurred: {e}") + if logflag: + logger.info(f"An unexpected error occurred: {e}") return False def ingest_doc_to_pgvector(doc_path: DocPath): """Ingest document to PGVector.""" doc_path = doc_path.path - print(f"Parsing document {doc_path}.") + if logflag: + logger.info(f"Parsing document {doc_path}.") text_splitter = RecursiveCharacterTextSplitter( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, add_start_index=True, separators=get_separators() ) content = document_loader(doc_path) chunks = text_splitter.split_text(content) - print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") - print("PG Connection", PG_CONNECTION_STRING) + if logflag: + logger.info("Done preprocessing. 
Created ", len(chunks), " chunks of the original pdf") + logger.info("PG Connection", PG_CONNECTION_STRING) metadata = [dict({"doc_name": str(doc_path)})] # Create vectorstore @@ -119,7 +129,8 @@ def ingest_doc_to_pgvector(doc_path: DocPath): collection_name=INDEX_NAME, connection_string=PG_CONNECTION_STRING, ) - print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + if logflag: + logger.info(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") return True @@ -139,11 +150,13 @@ async def ingest_link_to_pgvector(link_list: List[str]): for link in link_list: texts = [] content = parse_html([link])[0][0] - print(f"[ ingest link ] link: {link} content: {content}") + if logflag: + logger.info(f"[ ingest link ] link: {link} content: {content}") encoded_link = encode_filename(link) save_path = upload_folder + encoded_link + ".txt" doc_path = upload_folder + link + ".txt" - print(f"[ ingest link ] save_path: {save_path}") + if logflag: + logger.info(f"[ ingest link ] save_path: {save_path}") await save_content_to_local_disk(save_path, content) metadata = [dict({"doc_name": str(doc_path)})] @@ -162,7 +175,8 @@ async def ingest_link_to_pgvector(link_list: List[str]): collection_name=INDEX_NAME, connection_string=PG_CONNECTION_STRING, ) - print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + if logflag: + logger.info(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") return True @@ -176,8 +190,9 @@ async def ingest_link_to_pgvector(link_list: List[str]): async def ingest_documents( files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), link_list: Optional[str] = Form(None) ): - print(f"files:{files}") - print(f"link_list:{link_list}") + if logflag: + logger.info(f"files:{files}") + logger.info(f"link_list:{link_list}") if files and link_list: raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.") @@ -192,8 +207,12 @@ async def ingest_documents( await save_file_to_local_disk(save_path, file) ingest_doc_to_pgvector(DocPath(path=save_path)) - print(f"Successfully saved file {save_path}") - return {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(f"Successfully saved file {save_path}") + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(result) + return result if link_list: try: @@ -201,8 +220,12 @@ async def ingest_documents( if not isinstance(link_list, list): raise HTTPException(status_code=400, detail="link_list should be a list.") await ingest_link_to_pgvector(link_list) - print(f"Successfully saved link list {link_list}") - return {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(f"Successfully saved link list {link_list}") + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(result) + return result except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") @@ -213,13 +236,17 @@ async def ingest_documents( name="opea_service@prepare_doc_pgvector", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6007 ) async def rag_get_file_structure(): - print("[ dataprep - get file ] start to get file structure") + if logflag: + logger.info("[ dataprep - get file ] start to get file structure") if not Path(upload_folder).exists(): - print("No file uploaded, return empty list.") + if logflag: + logger.info("No file uploaded, return empty list.") return [] 
file_content = get_file_structure(upload_folder) + if logflag: + logger.info(file_content) return file_content @@ -235,16 +262,21 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): - "all": delete all files uploaded """ if file_path == "all": - print("[dataprep - del] delete all files") + if logflag: + logger.info("[dataprep - del] delete all files") remove_folder_with_ignore(upload_folder) assert delete_embeddings(file_path) - print("[dataprep - del] successfully delete all files.") + if logflag: + logger.info("[dataprep - del] successfully delete all files.") create_upload_folder(upload_folder) + if logflag: + logger.info({"status": True}) return {"status": True} delete_path = Path(upload_folder + "/" + encode_filename(file_path)) doc_path = upload_folder + file_path - print(f"[dataprep - del] delete_path: {delete_path}") + if logflag: + logger.info(f"[dataprep - del] delete_path: {delete_path}") # partially delete files/folders if delete_path.exists(): @@ -254,12 +286,18 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): assert delete_embeddings(doc_path) delete_path.unlink() except Exception as e: - print(f"[dataprep - del] fail to delete file {delete_path}: {e}") + if logflag: + logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}") + logger.info({"status": False}) return {"status": False} # delete folder else: - print("[dataprep - del] delete folder is not supported for now.") + if logflag: + logger.info("[dataprep - del] delete folder is not supported for now.") + logger.info({"status": False}) return {"status": False} + if logflag: + logger.info({"status": True}) return {"status": True} else: raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.") diff --git a/comps/dataprep/pinecone/prepare_doc_pinecone.py b/comps/dataprep/pinecone/prepare_doc_pinecone.py index cbee8cd94..73f3e94af 100644 --- a/comps/dataprep/pinecone/prepare_doc_pinecone.py +++ b/comps/dataprep/pinecone/prepare_doc_pinecone.py @@ -16,7 +16,7 @@ from langchain_text_splitters import HTMLHeaderTextSplitter from pinecone import Pinecone, ServerlessSpec -from comps import DocPath, opea_microservices, opea_telemetry, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, opea_telemetry, register_microservice from comps.dataprep.utils import ( create_upload_folder, document_loader, @@ -29,23 +29,29 @@ save_content_to_local_disk, ) +logger = CustomLogger("prepare_doc_pinecone") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT") upload_folder = "./uploaded_files/" def check_index_existance(): - print(f"[ check index existence ] checking {PINECONE_INDEX_NAME}") + if logflag: + logger.info(f"[ check index existence ] checking {PINECONE_INDEX_NAME}") pc = Pinecone(api_key=PINECONE_API_KEY) existing_indexes = [index_info["name"] for index_info in pc.list_indexes()] if PINECONE_INDEX_NAME not in existing_indexes: - print("[ check index existence ] index does not exist") + if logflag: + logger.info("[ check index existence ] index does not exist") return None else: return True def create_index(client): - print(f"[ create index ] creating index {PINECONE_INDEX_NAME}") + if logflag: + logger.info(f"[ create index ] creating index {PINECONE_INDEX_NAME}") try: client.create_index( name=PINECONE_INDEX_NAME, @@ -53,21 +59,26 @@ def create_index(client): metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1"), ) - print(f"[ create index ] index 
{PINECONE_INDEX_NAME} successfully created") + if logflag: + logger.info(f"[ create index ] index {PINECONE_INDEX_NAME} successfully created") except Exception as e: - print(f"[ create index ] fail to create index {PINECONE_INDEX_NAME}: {e}") + if logflag: + logger.info(f"[ create index ] fail to create index {PINECONE_INDEX_NAME}: {e}") return False return True def drop_index(index_name): - print(f"[ drop index ] dropping index {index_name}") + if logflag: + logger.info(f"[ drop index ] dropping index {index_name}") pc = Pinecone(api_key=PINECONE_API_KEY) try: pc.delete_index(index_name) - print(f"[ drop index ] index {index_name} deleted") + if logflag: + logger.info(f"[ drop index ] index {index_name} deleted") except Exception as e: - print(f"[ drop index ] index {index_name} delete failed: {e}") + if logflag: + logger.info(f"[ drop index ] index {index_name} delete failed: {e}") return False return True @@ -75,7 +86,8 @@ def drop_index(index_name): def ingest_data_to_pinecone(doc_path: DocPath): """Ingest document to Pinecone.""" path = doc_path.path - print(f"Parsing document {path}.") + if logflag: + logger.info(f"Parsing document {path}.") if path.endswith(".html"): headers_to_split_on = [ @@ -97,7 +109,8 @@ def ingest_data_to_pinecone(doc_path: DocPath): if doc_path.process_table and path.endswith(".pdf"): table_chunks = get_tables_result(path, doc_path.table_strategy) chunks = chunks + table_chunks - print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") + if logflag: + logger.info("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") # Create vectorstore if tei_embedding_endpoint: @@ -113,7 +126,8 @@ def ingest_data_to_pinecone(doc_path: DocPath): if not check_index_existance(): # Creating the index create_index(pc) - print("Successfully created the index", PINECONE_INDEX_NAME) + if logflag: + logger.info("Successfully created the index", PINECONE_INDEX_NAME) # Batch size batch_size = 32 @@ -129,7 +143,8 @@ def ingest_data_to_pinecone(doc_path: DocPath): embedding=embedder, index_name=PINECONE_INDEX_NAME, ) - print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + if logflag: + logger.info(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") # store file_ids into index file-keys pc = Pinecone(api_key=PINECONE_API_KEY) @@ -150,15 +165,18 @@ async def ingest_link_to_pinecone(link_list: List[str]): if not check_index_existance(): # Creating the index create_index(pc) - print("Successfully created the index", PINECONE_INDEX_NAME) + if logflag: + logger.info("Successfully created the index", PINECONE_INDEX_NAME) # save link contents and doc_ids one by one for link in link_list: content = parse_html([link])[0][0] - print(f"[ ingest link ] link: {link} content: {content}") + if logflag: + logger.info(f"[ ingest link ] link: {link} content: {content}") encoded_link = encode_filename(link) save_path = upload_folder + encoded_link + ".txt" - print(f"[ ingest link ] save_path: {save_path}") + if logflag: + logger.info(f"[ ingest link ] save_path: {save_path}") await save_content_to_local_disk(save_path, content) vectorstore = PineconeVectorStore.from_texts( @@ -179,8 +197,9 @@ async def ingest_documents( process_table: bool = Form(False), table_strategy: str = Form("fast"), ): - print(f"files:{files}") - print(f"link_list:{link_list}") + if logflag: + logger.info(f"files:{files}") + logger.info(f"link_list:{link_list}") if files: if not isinstance(files, list): @@ -200,9 +219,12 @@ async def 
ingest_documents( ) ) uploaded_files.append(save_path) - print(f"Successfully saved file {save_path}") - - return {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(f"Successfully saved file {save_path}") + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(result) + return result if link_list: try: @@ -210,8 +232,11 @@ async def ingest_documents( if not isinstance(link_list, list): raise HTTPException(status_code=400, detail="link_list should be a list.") await ingest_link_to_pinecone(link_list) - print(f"Successfully saved link list {link_list}") - return {"status": 200, "message": "Data preparation succeeded"} + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(f"Successfully saved link list {link_list}") + logger.info(result) + return result except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") @@ -222,13 +247,17 @@ async def ingest_documents( name="opea_service@prepare_doc_pinecone_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6008 ) async def rag_get_file_structure(): - print("[ dataprep - get file ] start to get file structure") + if logflag: + logger.info("[ dataprep - get file ] start to get file structure") if not Path(upload_folder).exists(): - print("No file uploaded, return empty list.") + if logflag: + logger.info("No file uploaded, return empty list.") return [] file_content = get_file_structure(upload_folder) + if logflag: + logger.info(file_content) return file_content @@ -243,11 +272,15 @@ async def delete_all(file_path: str = Body(..., embed=True)): """ # delete all uploaded files if file_path == "all": - print("[dataprep - del] delete all files") + if logflag: + logger.info("[dataprep - del] delete all files") remove_folder_with_ignore(upload_folder) assert drop_index(index_name=PINECONE_INDEX_NAME) - print("[dataprep - del] successfully delete all files.") + if logflag: + logger.info("[dataprep - del] successfully delete all files.") create_upload_folder(upload_folder) + if logflag: + logger.info({"status": True}) return {"status": True} else: raise HTTPException(status_code=404, detail="Single file deletion is not implemented yet") diff --git a/comps/dataprep/qdrant/prepare_doc_qdrant.py b/comps/dataprep/qdrant/prepare_doc_qdrant.py index fb8d66571..8fe0399e2 100644 --- a/comps/dataprep/qdrant/prepare_doc_qdrant.py +++ b/comps/dataprep/qdrant/prepare_doc_qdrant.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import json +import os from typing import List, Optional, Union from config import COLLECTION_NAME, EMBED_MODEL, QDRANT_HOST, QDRANT_PORT, TEI_EMBEDDING_ENDPOINT @@ -12,7 +13,7 @@ from langchain_huggingface import HuggingFaceEndpointEmbeddings from langchain_text_splitters import HTMLHeaderTextSplitter -from comps import DocPath, opea_microservices, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, register_microservice from comps.dataprep.utils import ( document_loader, encode_filename, @@ -22,13 +23,17 @@ save_content_to_local_disk, ) +logger = CustomLogger("prepare_doc_qdrant") +logflag = os.getenv("LOGFLAG", False) + upload_folder = "./uploaded_files/" def ingest_data_to_qdrant(doc_path: DocPath): """Ingest document to Qdrant.""" path = doc_path.path - print(f"Parsing document {path}.") + if logflag: + logger.info(f"Parsing document {path}.") if path.endswith(".html"): headers_to_split_on = [ @@ -51,7 +56,8 @@ def 
ingest_data_to_qdrant(doc_path: DocPath): if doc_path.process_table and path.endswith(".pdf"): table_chunks = get_tables_result(path, doc_path.table_strategy) chunks = chunks + table_chunks - print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") + if logflag: + logger.info("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") # Create vectorstore if TEI_EMBEDDING_ENDPOINT: @@ -61,7 +67,8 @@ def ingest_data_to_qdrant(doc_path: DocPath): # create embeddings using local embedding model embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL) - print("embedder created.") + if logflag: + logger.info("embedder created.") # Batch size batch_size = 32 @@ -77,7 +84,8 @@ def ingest_data_to_qdrant(doc_path: DocPath): host=QDRANT_HOST, port=QDRANT_PORT, ) - print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + if logflag: + logger.info(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") return True @@ -98,8 +106,9 @@ async def ingest_documents( process_table: bool = Form(False), table_strategy: str = Form("fast"), ): - print(f"files:{files}") - print(f"link_list:{link_list}") + if logflag: + logger.info(f"files:{files}") + logger.info(f"link_list:{link_list}") if files: if not isinstance(files, list): @@ -119,9 +128,12 @@ async def ingest_documents( ) ) uploaded_files.append(save_path) - print(f"Successfully saved file {save_path}") - - return {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(f"Successfully saved file {save_path}") + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(result) + return result if link_list: link_list = json.loads(link_list) # Parse JSON string to list @@ -145,9 +157,13 @@ async def ingest_documents( except json.JSONDecodeError: raise HTTPException(status_code=500, detail="Fail to ingest data into qdrant.") - print(f"Successfully saved link {link}") + if logflag: + logger.info(f"Successfully saved link {link}") - return {"status": 200, "message": "Data preparation succeeded"} + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(result) + return result raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") diff --git a/comps/dataprep/redis/langchain/prepare_doc_redis.py b/comps/dataprep/redis/langchain/prepare_doc_redis.py index a749cd557..571859d47 100644 --- a/comps/dataprep/redis/langchain/prepare_doc_redis.py +++ b/comps/dataprep/redis/langchain/prepare_doc_redis.py @@ -20,7 +20,7 @@ from redis.commands.search.field import TextField from redis.commands.search.indexDefinition import IndexDefinition, IndexType -from comps import DocPath, opea_microservices, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, register_microservice from comps.dataprep.utils import ( create_upload_folder, document_loader, @@ -33,63 +33,81 @@ save_content_to_local_disk, ) +logger = CustomLogger("prepare_doc_redis") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_ENDPOINT") upload_folder = "./uploaded_files/" redis_pool = redis.ConnectionPool.from_url(REDIS_URL) def check_index_existance(client): - print(f"[ check index existence ] checking {client}") + if logflag: + logger.info(f"[ check index existence ] checking {client}") try: results = client.search("*") - print(f"[ check index existence ] index of client exists: {client}") + if logflag: + logger.info(f"[ check index 
existence ] index of client exists: {client}") return results except Exception as e: - print(f"[ check index existence ] index does not exist: {e}") + if logflag: + logger.info(f"[ check index existence ] index does not exist: {e}") return None def create_index(client, index_name: str = KEY_INDEX_NAME): - print(f"[ create index ] creating index {index_name}") + if logflag: + logger.info(f"[ create index ] creating index {index_name}") try: definition = IndexDefinition(index_type=IndexType.HASH, prefix=["file:"]) client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition) - print(f"[ create index ] index {index_name} successfully created") + if logflag: + logger.info(f"[ create index ] index {index_name} successfully created") except Exception as e: - print(f"[ create index ] fail to create index {index_name}: {e}") + if logflag: + logger.info(f"[ create index ] fail to create index {index_name}: {e}") return False return True def store_by_id(client, key, value): - print(f"[ store by id ] storing ids of {key}") + if logflag: + logger.info(f"[ store by id ] storing ids of {key}") try: client.add_document(doc_id="file:" + key, file_name=key, key_ids=value) - print(f"[ store by id ] store document success. id: file:{key}") + if logflag: + logger.info(f"[ store by id ] store document success. id: file:{key}") except Exception as e: - print(f"[ store by id ] fail to store document file:{key}: {e}") + if logflag: + logger.info(f"[ store by id ] fail to store document file:{key}: {e}") return False return True def search_by_id(client, doc_id): - print(f"[ search by id ] searching docs of {doc_id}") + if logflag: + logger.info(f"[ search by id ] searching docs of {doc_id}") try: results = client.load_document(doc_id) - print(f"[ search by id ] search success of {doc_id}: {results}") + if logflag: + logger.info(f"[ search by id ] search success of {doc_id}: {results}") return results except Exception as e: - print(f"[ search by id ] fail to search docs of {doc_id}: {e}") + if logflag: + logger.info(f"[ search by id ] fail to search docs of {doc_id}: {e}") return None def drop_index(index_name, redis_url=REDIS_URL): - print(f"[ drop index ] dropping index {index_name}") + if logflag: + logger.info(f"[ drop index ] dropping index {index_name}") try: assert Redis.drop_index(index_name=index_name, delete_documents=True, redis_url=redis_url) - print(f"[ drop index ] index {index_name} deleted") + if logflag: + logger.info(f"[ drop index ] index {index_name} deleted") except Exception as e: - print(f"[ drop index ] index {index_name} delete failed: {e}") + if logflag: + logger.info(f"[ drop index ] index {index_name} delete failed: {e}") return False return True @@ -97,15 +115,18 @@ def drop_index(index_name, redis_url=REDIS_URL): def delete_by_id(client, id): try: assert client.delete_document(id) - print(f"[ delete by id ] delete id success: {id}") + if logflag: + logger.info(f"[ delete by id ] delete id success: {id}") except Exception as e: - print(f"[ delete by id ] fail to delete ids {id}: {e}") + if logflag: + logger.info(f"[ delete by id ] fail to delete ids {id}: {e}") return False return True def ingest_chunks_to_redis(file_name: str, chunks: List): - print(f"[ ingest chunks ] file name: {file_name}") + if logflag: + logger.info(f"[ ingest chunks ] file name: {file_name}") # Create vectorstore if tei_embedding_endpoint: # create embeddings using TEI endpoint service @@ -120,7 +141,8 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): file_ids = [] for i in 
range(0, num_chunks, batch_size): - print(f"[ ingest chunks ] Current batch: {i}") + if logflag: + logger.info(f"[ ingest chunks ] Current batch: {i}") batch_chunks = chunks[i : i + batch_size] batch_texts = batch_chunks @@ -130,9 +152,11 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): index_name=INDEX_NAME, redis_url=REDIS_URL, ) - print(f"[ ingest chunks ] keys: {keys}") + if logflag: + logger.info(f"[ ingest chunks ] keys: {keys}") file_ids.extend(keys) - print(f"[ ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + if logflag: + logger.info(f"[ ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") # store file_ids into index file-keys r = redis.Redis(connection_pool=redis_pool) @@ -143,7 +167,8 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): try: assert store_by_id(client, key=file_name, value="#".join(file_ids)) except Exception as e: - print(f"[ ingest chunks ] {e}. Fail to store chunks of file {file_name}.") + if logflag: + logger.info(f"[ ingest chunks ] {e}. Fail to store chunks of file {file_name}.") raise HTTPException(status_code=500, detail=f"Fail to store chunks of file {file_name}.") return True @@ -151,7 +176,8 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): def ingest_data_to_redis(doc_path: DocPath): """Ingest document to Redis.""" path = doc_path.path - print(f"Parsing document {path}.") + if logflag: + logger.info(f"Parsing document {path}.") if path.endswith(".html"): headers_to_split_on = [ @@ -174,7 +200,8 @@ def ingest_data_to_redis(doc_path: DocPath): if doc_path.process_table and path.endswith(".pdf"): table_chunks = get_tables_result(path, doc_path.table_strategy) chunks = chunks + table_chunks - print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") + if logflag: + logger.info("Done preprocessing. Created ", len(chunks), " chunks of the original pdf") file_name = doc_path.path.split("/")[-1] return ingest_chunks_to_redis(file_name, chunks) @@ -189,8 +216,9 @@ async def ingest_documents( process_table: bool = Form(False), table_strategy: str = Form("fast"), ): - print(f"files:{files}") - print(f"link_list:{link_list}") + if logflag: + logger.info(f"files:{files}") + logger.info(f"link_list:{link_list}") r = redis.Redis(connection_pool=redis_pool) client = r.ft(KEY_INDEX_NAME) @@ -208,9 +236,10 @@ async def ingest_documents( key_ids = None try: key_ids = search_by_id(client, doc_id).key_ids - print(f"[ upload file ] File {file.filename} already exists.") + if logflag: + logger.info(f"[ upload file ] File {file.filename} already exists.") except Exception as e: - print(f"[ upload file ] File {file.filename} does not exist.") + logger.info(f"[ upload file ] File {file.filename} does not exist.") if key_ids: raise HTTPException( status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change file name." 
@@ -228,7 +257,8 @@ async def ingest_documents( ) ) uploaded_files.append(save_path) - print(f"Successfully saved file {save_path}") + if logflag: + logger.info(f"Successfully saved file {save_path}") # def process_files_wrapper(files): # if not isinstance(files, list): @@ -251,8 +281,10 @@ async def ingest_documents( # except: # # Stop the SparkContext # sc.stop() - - return {"status": 200, "message": "Data preparation succeeded"} + result = {"status": 200, "message": "Data preparation succeeded"} + if logflag: + logger.info(result) + return result if link_list: link_list = json.loads(link_list) # Parse JSON string to list @@ -266,9 +298,10 @@ async def ingest_documents( key_ids = None try: key_ids = search_by_id(client, doc_id).key_ids - print(f"[ upload file ] Link {link} already exists.") + if logflag: + logger.info(f"[ upload file ] Link {link} already exists.") except Exception as e: - print(f"[ upload file ] Link {link} does not exist. Keep storing.") + logger.info(f"[ upload file ] Link {link} does not exist. Keep storing.") if key_ids: raise HTTPException( status_code=400, detail=f"Uploaded link {link} already exists. Please change another link." @@ -286,7 +319,9 @@ async def ingest_documents( table_strategy=table_strategy, ) ) - print(f"Successfully saved link list {link_list}") + if logflag: + logger.info(f"Successfully saved link list {link_list}") + logger.info({"status": 200, "message": "Data preparation succeeded"}) return {"status": 200, "message": "Data preparation succeeded"} raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") @@ -296,7 +331,8 @@ async def ingest_documents( name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6007 ) async def rag_get_file_structure(): - print("[ dataprep - get file ] start to get file structure") + if logflag: + logger.info("[ dataprep - get file ] start to get file structure") # define redis client r = redis.Redis(connection_pool=redis_pool) @@ -312,6 +348,8 @@ async def rag_get_file_structure(): # last batch if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE: break + if logflag: + logger.info(file_list) return file_list @@ -333,41 +371,50 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): # delete all uploaded files if file_path == "all": - print("[dataprep - del] delete all files") + if logflag: + logger.info("[dataprep - del] delete all files") # drop index KEY_INDEX_NAME if check_index_existance(client): try: assert drop_index(index_name=KEY_INDEX_NAME) except Exception as e: - print(f"[dataprep - del] {e}. Fail to drop index {KEY_INDEX_NAME}.") + if logflag: + logger.info(f"[dataprep - del] {e}. Fail to drop index {KEY_INDEX_NAME}.") raise HTTPException(status_code=500, detail=f"Fail to drop index {KEY_INDEX_NAME}.") else: - print(f"[dataprep - del] Index {KEY_INDEX_NAME} does not exits.") + logger.info(f"[dataprep - del] Index {KEY_INDEX_NAME} does not exits.") # drop index INDEX_NAME if check_index_existance(client2): try: assert drop_index(index_name=INDEX_NAME) except Exception as e: - print(f"[dataprep - del] {e}. Fail to drop index {INDEX_NAME}.") + if logflag: + logger.info(f"[dataprep - del] {e}. 
Fail to drop index {INDEX_NAME}.") raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.") else: - print(f"[dataprep - del] Index {INDEX_NAME} does not exits.") + if logflag: + logger.info(f"[dataprep - del] Index {INDEX_NAME} does not exits.") # delete files on local disk try: remove_folder_with_ignore(upload_folder) except Exception as e: - print(f"[dataprep - del] {e}. Fail to delete {upload_folder}.") + if logflag: + logger.info(f"[dataprep - del] {e}. Fail to delete {upload_folder}.") raise HTTPException(status_code=500, detail=f"Fail to delete {upload_folder}.") - print("[dataprep - del] successfully delete all files.") + if logflag: + logger.info("[dataprep - del] successfully delete all files.") create_upload_folder(upload_folder) + if logflag: + logger.info({"status": True}) return {"status": True} delete_path = Path(upload_folder + "/" + encode_filename(file_path)) - print(f"[dataprep - del] delete_path: {delete_path}") + if logflag: + logger.info(f"[dataprep - del] delete_path: {delete_path}") # partially delete files if delete_path.exists(): @@ -377,7 +424,8 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): try: key_ids = search_by_id(client, doc_id).key_ids except Exception as e: - print(f"[dataprep - del] {e}, File {file_path} does not exists.") + if logflag: + logger.info(f"[dataprep - del] {e}, File {file_path} does not exists.") raise HTTPException( status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path." ) @@ -389,7 +437,8 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): try: assert delete_by_id(client, doc_id) except Exception as e: - print(f"[dataprep - del] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.") + if logflag: + logger.info(f"[dataprep - del] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.") raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.") # delete file content in db INDEX_NAME @@ -398,7 +447,8 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): try: content = search_by_id(client2, file_id).content except Exception as e: - print(f"[dataprep - del] {e}. File {file_path} does not exists.") + if logflag: + logger.info(f"[dataprep - del] {e}. File {file_path} does not exists.") raise HTTPException( status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path." ) @@ -407,17 +457,20 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): try: assert delete_by_id(client2, file_id) except Exception as e: - print(f"[dataprep - del] {e}. File {file_path} delete failed for db {INDEX_NAME}") + if logflag: + logger.info(f"[dataprep - del] {e}. File {file_path} delete failed for db {INDEX_NAME}") raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.") # delete file on local disk delete_path.unlink() - + if logflag: + logger.info({"status": True}) return {"status": True} # delete folder else: - print(f"[dataprep - del] Delete folder {file_path} is not supported for now.") + if logflag: + logger.info(f"[dataprep - del] Delete folder {file_path} is not supported for now.") raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.") else: raise HTTPException(status_code=404, detail=f"File {file_path} not found. 
Please check file_path.") diff --git a/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py b/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py index 07ad81da8..af5095f30 100644 --- a/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py +++ b/comps/dataprep/redis/langchain_ray/prepare_doc_redis_on_ray.py @@ -40,7 +40,7 @@ from ray.data.datasource import FileBasedDatasource from tqdm import tqdm -from comps import DocPath, opea_microservices, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, register_microservice from comps.dataprep.utils import ( Timer, create_upload_folder, @@ -54,6 +54,9 @@ timeout, ) +logger = CustomLogger("prepare_doc_redis") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_ENDPOINT") debug = False upload_folder = "./uploaded_files/" @@ -73,7 +76,8 @@ def prepare_env(enable_ray=False, pip_requirements=None): def generate_log_name(file_list): file_set = f"{sorted(file_list)}" - # print(f"file_set: {file_set}") + # if logflag: + # logger.info(f"file_set: {file_set}") md5_str = hashlib.md5(file_set.encode(), usedforsecurity=False).hexdigest() return f"status/status_{md5_str}.log" @@ -196,7 +200,8 @@ def data_to_redis(data): index_name=INDEX_NAME, redis_url=REDIS_URL, ) - # print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") + # if logflag: + # logger.info(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") return num_chunks @@ -257,8 +262,8 @@ def _parse_html(link): for link in tqdm(link_list, total=len(link_list)): with Timer(f"read document {link}."): data = _parse_html(link) - if debug: - print("content is: ", data) + if logflag: + logger.info("content is: ", data) with Timer(f"ingest document {link} to Redis."): data_to_redis(data) return True @@ -266,6 +271,9 @@ def _parse_html(link): @register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007) async def ingest_documents(files: List[UploadFile] = File(None), link_list: str = Form(None)): + if logflag: + logger.info(files) + logger.info(link_list) if files and link_list: raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.") @@ -292,9 +300,13 @@ async def ingest_documents(files: List[UploadFile] = File(None), link_list: str enable_ray = True prepare_env(enable_ray=enable_ray) num_cpus = get_max_cpus(len(saved_path_list)) - print(f"per task num_cpus: {num_cpus}") + if logflag: + logger.info(f"per task num_cpus: {num_cpus}") ret = ingest_data_to_redis(saved_path_list, enable_ray=enable_ray, num_cpus=num_cpus) - return {"status": 200, "message": f"Data preparation succeeded. ret msg is {ret}"} + result = {"status": 200, "message": f"Data preparation succeeded. ret msg is {ret}"} + if logflag: + logger.info(result) + return result except Exception as e: raise HTTPException(status_code=400, detail=f"An error occurred: {e}") @@ -309,9 +321,13 @@ async def ingest_documents(files: List[UploadFile] = File(None), link_list: str enable_ray = True prepare_env(enable_ray=enable_ray) num_cpus = get_max_cpus(len(link_list)) - print(f"per task num_cpus: {num_cpus}") + if logflag: + logger.info(f"per task num_cpus: {num_cpus}") ret = ingest_link_to_redis(link_list, enable_ray=enable_ray, num_cpus=num_cpus) - return {"status": 200, "message": f"Data preparation succeeded, ret msg is {ret}"} + result = {"status": 200, "message": f"Data preparation succeeded. 
ret msg is {ret}"} + if logflag: + logger.info(result) + return result except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") except Exception as e: @@ -322,13 +338,17 @@ async def ingest_documents(files: List[UploadFile] = File(None), link_list: str name="opea_service@prepare_doc_redis_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6008 ) async def rag_get_file_structure(): - print("[ get_file_structure] ") + if logflag: + logger.info("[ get_file_structure] ") if not Path(upload_folder).exists(): - print("No file uploaded, return empty list.") + if logflag: + logger.info("No file uploaded, return empty list.") return [] file_content = get_file_structure(upload_folder) + if logflag: + logger.info(file_content) return file_content @@ -343,16 +363,23 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): - folder path (e.g. /path/to/folder) - "all": delete all files uploaded """ + if logflag: + logger.info(file_path) # delete all uploaded files if file_path == "all": - print("[dataprep - del] delete all files") + if logflag: + logger.info("[dataprep - del] delete all files") remove_folder_with_ignore(upload_folder) - print("[dataprep - del] successfully delete all files.") + if logflag: + logger.info("[dataprep - del] successfully delete all files.") create_upload_folder(upload_folder) + if logflag: + logger.info({"status": True}) return {"status": True} delete_path = Path(upload_folder + "/" + encode_filename(file_path)) - print(f"[dataprep - del] delete_path: {delete_path}") + if logflag: + logger.info(f"[dataprep - del] delete_path: {delete_path}") # partially delete files/folders if delete_path.exists(): @@ -361,15 +388,21 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): try: delete_path.unlink() except Exception as e: - print(f"[dataprep - del] fail to delete file {delete_path}: {e}") + if logflag: + logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}") + logger.info({"status": False}) return {"status": False} # delete folder else: try: shutil.rmtree(delete_path) except Exception as e: - print(f"[dataprep - del] fail to delete folder {delete_path}: {e}") + if logflag: + logger.info(f"[dataprep - del] fail to delete folder {delete_path}: {e}") + logger.info({"status": False}) return {"status": False} + if logflag: + logger.info({"status": True}) return {"status": True} else: raise HTTPException(status_code=404, detail="File/folder not found. 
Please check del_path.") diff --git a/comps/dataprep/redis/llama_index/prepare_doc_redis.py b/comps/dataprep/redis/llama_index/prepare_doc_redis.py index ae4d00461..fc93ebaad 100644 --- a/comps/dataprep/redis/llama_index/prepare_doc_redis.py +++ b/comps/dataprep/redis/llama_index/prepare_doc_redis.py @@ -16,7 +16,10 @@ from redisvl.schema import IndexSchema from utils import * -from comps import DocPath, opea_microservices, register_microservice +from comps import CustomLogger, DocPath, opea_microservices, register_microservice + +logger = CustomLogger("prepare_doc_redis") +logflag = os.getenv("LOGFLAG", False) upload_folder = "./uploaded_files/" @@ -49,14 +52,16 @@ async def ingest_data_to_redis(doc_path: DocPath): vector_store = RedisVectorStore(redis_client=redis_client, schema=schema) storage_context = StorageContext.from_defaults(vector_store=vector_store) _ = VectorStoreIndex.from_documents(content, storage_context=storage_context) - print("[ ingest data ] data ingested into Redis DB.") + if logflag: + logger.info("[ ingest data ] data ingested into Redis DB.") return True @register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007) # llama index only support upload files now async def ingest_documents(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): - print(f"files:{files}") + if logflag: + logger.info(f"files:{files}") if not files: raise HTTPException(status_code=400, detail="Please provide at least one file.") @@ -69,10 +74,13 @@ async def ingest_documents(files: Optional[Union[UploadFile, List[UploadFile]]] save_path = upload_folder + file.filename await save_content_to_local_disk(save_path, file) await ingest_data_to_redis(DocPath(path=save_path)) - print(f"Successfully saved file {save_path}") + if logflag: + logger.info(f"Successfully saved file {save_path}") + logger.info({"status": 200, "message": "Data preparation succeeded"}) return {"status": 200, "message": "Data preparation succeeded"} except Exception as e: - print(f"Data preparation failed. Exception: {e}") + if logflag: + logger.info(f"Data preparation failed. Exception: {e}") raise HTTPException(status_code=500, detail=f"Data preparation failed. Exception: {e}") @@ -80,13 +88,17 @@ async def ingest_documents(files: Optional[Union[UploadFile, List[UploadFile]]] name="opea_service@prepare_doc_redis_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6008 ) async def rag_get_file_structure(): - print("[ get_file_structure] ") + if logflag: + logger.info("[ get_file_structure] ") if not Path(upload_folder).exists(): - print("No file uploaded, return empty list.") + if logflag: + logger.info("No file uploaded, return empty list.") return [] file_content = get_file_structure(upload_folder) + if logflag: + logger.info(file_content) return file_content @@ -101,16 +113,23 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): - folder path (e.g. 
/path/to/folder) - "all": delete all files uploaded """ + if logflag: + logger.info(file_path) # delete all uploaded files if file_path == "all": - print("[dataprep - del] delete all files") + if logflag: + logger.info("[dataprep - del] delete all files") remove_folder_with_ignore(upload_folder) - print("[dataprep - del] successfully delete all files.") + if logflag: + logger.info("[dataprep - del] successfully delete all files.") create_upload_folder(upload_folder) + if logflag: + logger.info({"status": True}) return {"status": True} delete_path = Path(upload_folder + "/" + encode_filename(file_path)) - print(f"[dataprep - del] delete_path: {delete_path}") + if logflag: + logger.info(f"[dataprep - del] delete_path: {delete_path}") # partially delete files/folders if delete_path.exists(): @@ -119,15 +138,21 @@ async def delete_single_file(file_path: str = Body(..., embed=True)): try: delete_path.unlink() except Exception as e: - print(f"[dataprep - del] fail to delete file {delete_path}: {e}") + if logflag: + logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}") + logger.info({"status": False}) return {"status": False} # delete folder else: try: shutil.rmtree(delete_path) except Exception as e: - print(f"[dataprep - del] fail to delete folder {delete_path}: {e}") + if logflag: + logger.info(f"[dataprep - del] fail to delete folder {delete_path}: {e}") + logger.info({"status": False}) return {"status": False} + if logflag: + logger.info({"status": True}) return {"status": True} else: raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.") diff --git a/comps/embeddings/langchain-mosec/embedding_mosec.py b/comps/embeddings/langchain-mosec/embedding_mosec.py index 702b1937c..61a3db7f2 100644 --- a/comps/embeddings/langchain-mosec/embedding_mosec.py +++ b/comps/embeddings/langchain-mosec/embedding_mosec.py @@ -8,6 +8,7 @@ from langchain_community.embeddings import OpenAIEmbeddings from comps import ( + CustomLogger, EmbedDoc, ServiceType, TextDoc, @@ -17,6 +18,9 @@ statistics_dict, ) +logger = CustomLogger("embedding_mosec") +logflag = os.getenv("LOGFLAG", False) + class MosecEmbeddings(OpenAIEmbeddings): def _get_len_safe_embeddings( @@ -54,10 +58,14 @@ def empty_embedding() -> List[float]: ) @register_statistics(names=["opea_service@embedding_mosec"]) def embedding(input: TextDoc) -> EmbedDoc: + if logflag: + logger.info(input) start = time.time() embed_vector = embeddings.embed_query(input.text) res = EmbedDoc(text=input.text, embedding=embed_vector) statistics_dict["opea_service@embedding_mosec"].append_latency(time.time() - start, None) + if logflag: + logger.info(res) return res @@ -67,5 +75,5 @@ def embedding(input: TextDoc) -> EmbedDoc: os.environ["OPENAI_API_KEY"] = "Dummy key" MODEL_ID = "/home/user/bge-large-zh-v1.5" embeddings = MosecEmbeddings(model=MODEL_ID) - print("Mosec Embedding initialized.") + logger.info("Mosec Embedding initialized.") opea_microservices["opea_service@embedding_mosec"].start() diff --git a/comps/embeddings/langchain/embedding_tei.py b/comps/embeddings/langchain/embedding_tei.py index a318ff0bf..0ddefb49a 100644 --- a/comps/embeddings/langchain/embedding_tei.py +++ b/comps/embeddings/langchain/embedding_tei.py @@ -8,6 +8,7 @@ from langchain_huggingface import HuggingFaceEndpointEmbeddings from comps import ( + CustomLogger, EmbedDoc, ServiceType, TextDoc, @@ -23,6 +24,9 @@ EmbeddingResponseData, ) +logger = CustomLogger("embedding_tei_langchain") +logflag = os.getenv("LOGFLAG", False) + 
@register_microservice( name="opea_service@embedding_tei_langchain", @@ -36,7 +40,8 @@ def embedding( input: Union[TextDoc, EmbeddingRequest, ChatCompletionRequest] ) -> Union[EmbedDoc, EmbeddingResponse, ChatCompletionRequest]: start = time.time() - + if logflag: + logger.info(input) if isinstance(input, TextDoc): embed_vector = embeddings.embed_query(input.text) res = EmbedDoc(text=input.text, embedding=embed_vector) @@ -54,11 +59,13 @@ def embedding( res = EmbeddingResponse(data=[EmbeddingResponseData(index=0, embedding=embed_vector)]) statistics_dict["opea_service@embedding_tei_langchain"].append_latency(time.time() - start, None) + if logflag: + logger.info(res) return res if __name__ == "__main__": tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT", "http://localhost:8080") embeddings = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint) - print("TEI Gaudi Embedding initialized.") + logger.info("TEI Gaudi Embedding initialized.") opea_microservices["opea_service@embedding_tei_langchain"].start() diff --git a/comps/embeddings/langchain/local_embedding.py b/comps/embeddings/langchain/local_embedding.py index 1a3825c40..32f8944a9 100644 --- a/comps/embeddings/langchain/local_embedding.py +++ b/comps/embeddings/langchain/local_embedding.py @@ -1,9 +1,22 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import os + from langchain_huggingface import HuggingFaceEmbeddings -from comps import EmbedDoc, ServiceType, TextDoc, opea_microservices, opea_telemetry, register_microservice +from comps import ( + CustomLogger, + EmbedDoc, + ServiceType, + TextDoc, + opea_microservices, + opea_telemetry, + register_microservice, +) + +logger = CustomLogger("local_embedding") +logflag = os.getenv("LOGFLAG", False) @register_microservice( @@ -17,8 +30,12 @@ ) @opea_telemetry def embedding(input: TextDoc) -> EmbedDoc: + if logflag: + logger.info(input) embed_vector = embeddings.embed_query(input.text) res = EmbedDoc(text=input.text, embedding=embed_vector) + if logflag: + logger.info(res) return res diff --git a/comps/embeddings/llama_index/embedding_tei.py b/comps/embeddings/llama_index/embedding_tei.py index 9042a61a7..cf14f7790 100644 --- a/comps/embeddings/llama_index/embedding_tei.py +++ b/comps/embeddings/llama_index/embedding_tei.py @@ -5,7 +5,10 @@ from llama_index.embeddings.text_embeddings_inference import TextEmbeddingsInference -from comps import EmbedDoc, ServiceType, TextDoc, opea_microservices, register_microservice +from comps import CustomLogger, EmbedDoc, ServiceType, TextDoc, opea_microservices, register_microservice + +logger = CustomLogger("embedding_tei_llamaindex") +logflag = os.getenv("LOGFLAG", False) @register_microservice( @@ -18,8 +21,12 @@ output_datatype=EmbedDoc, ) def embedding(input: TextDoc) -> EmbedDoc: + if logflag: + logger.info(input) embed_vector = embeddings._get_query_embedding(input.text) res = EmbedDoc(text=input.text, embedding=embed_vector) + if logflag: + logger.info(res) return res @@ -27,5 +34,5 @@ def embedding(input: TextDoc) -> EmbedDoc: tei_embedding_model_name = os.getenv("TEI_EMBEDDING_MODEL_NAME", "BAAI/bge-large-en-v1.5") tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT", "http://localhost:8090") embeddings = TextEmbeddingsInference(model_name=tei_embedding_model_name, base_url=tei_embedding_endpoint) - print("TEI Gaudi Embedding initialized.") + logger.info("TEI Gaudi Embedding initialized.") opea_microservices["opea_service@embedding_tei_llamaindex"].start() diff --git 
a/comps/embeddings/llama_index/local_embedding.py b/comps/embeddings/llama_index/local_embedding.py
index 53cc30e15..143d7bb07 100644
--- a/comps/embeddings/llama_index/local_embedding.py
+++ b/comps/embeddings/llama_index/local_embedding.py
@@ -1,9 +1,14 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
+import os
+
 from llama_index.embeddings.huggingface_api import HuggingFaceInferenceAPIEmbedding
 
-from comps import EmbedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
+from comps import CustomLogger, EmbedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
+
+logger = CustomLogger("local_embedding")
+logflag = os.getenv("LOGFLAG", False)
 
 
 @register_microservice(
@@ -16,8 +21,12 @@
     output_datatype=EmbedDoc,
 )
 def embedding(input: TextDoc) -> EmbedDoc:
+    if logflag:
+        logger.info(input)
     embed_vector = embeddings.get_text_embedding(input.text)
     res = EmbedDoc(text=input.text, embedding=embed_vector)
+    if logflag:
+        logger.info(res)
     return res
 

diff --git a/comps/guardrails/llama_guard/guardrails_tgi.py b/comps/guardrails/llama_guard/guardrails_tgi.py
index 93b046e97..ecbcb7778 100644
--- a/comps/guardrails/llama_guard/guardrails_tgi.py
+++ b/comps/guardrails/llama_guard/guardrails_tgi.py
@@ -8,7 +8,10 @@
 from langchain_huggingface import ChatHuggingFace
 from langchain_huggingface.llms import HuggingFaceEndpoint
 
-from comps import GeneratedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
+from comps import CustomLogger, GeneratedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
+
+logger = CustomLogger("guardrails_tgi")
+logflag = os.getenv("LOGFLAG", False)
 
 DEFAULT_MODEL = "meta-llama/LlamaGuard-7b"
 
@@ -63,6 +66,8 @@ def get_tgi_service_model_id(endpoint_url, default=DEFAULT_MODEL):
     output_datatype=TextDoc,
 )
 def safety_guard(input: Union[GeneratedDoc, TextDoc]) -> TextDoc:
+    if logflag:
+        logger.info(input)
     if isinstance(input, GeneratedDoc):
         messages = [{"role": "user", "content": input.prompt}, {"role": "assistant", "content": input.text}]
     else:
@@ -73,12 +78,15 @@ def safety_guard(input: Union[GeneratedDoc, TextDoc]) -> TextDoc:
         unsafe_dict = get_unsafe_dict(llm_engine_hf.model_id)
         policy_violation_level = response_input_guard.split("\n")[1].strip()
         policy_violations = unsafe_dict[policy_violation_level]
-        print(f"Violated policies: {policy_violations}")
+        if logflag:
+            logger.info(f"Violated policies: {policy_violations}")
         res = TextDoc(
             text=f"Violated policies: {policy_violations}, please check your input.", downstream_black_list=[".*"]
         )
     else:
         res = TextDoc(text=input.text)
+    if logflag:
+        logger.info(res)
     return res
 
 
@@ -96,5 +104,5 @@ def safety_guard(input: Union[GeneratedDoc, TextDoc]) -> TextDoc:
     )
     # chat engine for server-side prompt templating
     llm_engine_hf = ChatHuggingFace(llm=llm_guard, model_id=safety_guard_model)
-    print("guardrails - router] LLM initialized.")
+    logger.info("[guardrails - router] LLM initialized.")
     opea_microservices["opea_service@guardrails_tgi"].start()


diff --git a/comps/guardrails/pii_detection/pii_detection.py b/comps/guardrails/pii_detection/pii_detection.py
index 1ae0dddae..8de9de726 100644
--- a/comps/guardrails/pii_detection/pii_detection.py
+++ b/comps/guardrails/pii_detection/pii_detection.py
@@ -17,7 +17,7 @@
 from tqdm import tqdm
 
-from comps import DocPath, opea_microservices, register_microservice
+from comps import CustomLogger, DocPath, opea_microservices, register_microservice
 from comps.guardrails.pii_detection.data_utils import
document_loader, parse_html
 from comps.guardrails.pii_detection.pii.pii_utils import PIIDetector, PIIDetectorWithML, PIIDetectorWithNER
 from comps.guardrails.pii_detection.ray_utils import ray_execute, ray_runner_initialization, rayds_initialization
@@ -29,13 +29,18 @@
     save_file_to_local_disk,
 )
 
+logger = CustomLogger("guardrails-pii-detection")
+logflag = os.getenv("LOGFLAG", False)
+
 
 def get_pii_detection_inst(strategy="dummy", settings=None):
     if strategy == "ner":
-        print("invoking NER detector.......")
+        if logflag:
+            logger.info("invoking NER detector.......")
         return PIIDetectorWithNER()
     elif strategy == "ml":
-        print("invoking ML detector.......")
+        if logflag:
+            logger.info("invoking ML detector.......")
         return PIIDetectorWithML()
     else:
         raise ValueError(f"Invalid strategy: {strategy}")
@@ -48,7 +53,8 @@ def file_based_pii_detect(file_list: List[DocPath], strategy, enable_ray=False,
 
     if enable_ray:
         num_cpus = get_max_cpus(len(file_list))
-        print(f"per task num_cpus: {num_cpus}")
+        if logflag:
+            logger.info(f"per task num_cpus: {num_cpus}")
 
         log_name = generate_log_name(file_list)
         ds = rayds_initialization(file_list, document_loader, lazy_mode=True, num_cpus=num_cpus)
@@ -75,7 +81,8 @@ def _parse_html(link):
 
     if enable_ray:
         num_cpus = get_max_cpus(len(link_list))
-        print(f"per task num_cpus: {num_cpus}")
+        if logflag:
+            logger.info(f"per task num_cpus: {num_cpus}")
 
         log_name = generate_log_name(link_list)
         ds = rayds_initialization(link_list, _parse_html, lazy_mode=True, num_cpus=num_cpus)
@@ -86,8 +93,8 @@ def _parse_html(link):
         for link in tqdm(link_list, total=len(link_list)):
             with Timer(f"read document {link}."):
                 data = _parse_html(link)
-                if debug:
-                    print("content is: ", data)
+                if debug or logflag:
+                    logger.info(f"content is: {data}")
             with Timer(f"detect pii on document {link}"):
                 ret.append(pii_detector.detect_pii(data))
     return ret
@@ -99,7 +106,8 @@ def text_based_pii_detect(text_list: List[str], strategy, enable_ray=False, debu
 
     if enable_ray:
         num_cpus = get_max_cpus(len(text_list))
-        print(f"per task num_cpus: {num_cpus}")
+        if logflag:
+            logger.info(f"per task num_cpus: {num_cpus}")
 
         log_name = generate_log_name(text_list)
         ds = rayds_initialization(text_list, None, lazy_mode=True, num_cpus=num_cpus)
@@ -108,8 +116,8 @@ def text_based_pii_detect(text_list: List[str], strategy, enable_ray=False, debu
     else:
         ret = []
         for data in tqdm(text_list, total=len(text_list)):
-            if debug:
-                print("content is: ", data)
+            if debug or logflag:
+                logger.info(f"content is: {data}")
             with Timer(f"detect pii on document {data[:50]}"):
                 ret.append(pii_detector.detect_pii(data))
     return ret
@@ -124,13 +132,19 @@ async def pii_detection(
     text_list: str = Form(None),
     strategy: str = Form(None),
 ):
+    if logflag:
+        logger.info(files)
+        logger.info(link_list)
+        logger.info(text_list)
+        logger.info(strategy)
     if not files and not link_list and not text_list:
         raise HTTPException(status_code=400, detail="Either files, link_list, or text_list must be provided.")
 
     if strategy is None:
         strategy = "ner"
 
-    print("PII detection using strategy: ", strategy)
+    if logflag:
+        logger.info(f"PII detection using strategy: {strategy}")
 
     pip_requirement = ["detect-secrets", "phonenumbers", "gibberish-detector"]
 
@@ -153,7 +167,10 @@ async def pii_detection(
         if enable_ray:
             prepare_env(enable_ray=enable_ray, pip_requirements=pip_requirement, comps_path=comps_path)
         ret = file_based_pii_detect(saved_path_list, strategy, enable_ray=enable_ray)
-        return {"status": 200, "message": json.dumps(ret)}
+        result = {"status": 200, "message":
json.dumps(ret)} + if logflag: + logger.info(result) + return result except Exception as e: raise HTTPException(status_code=400, detail=f"An error occurred: {e}") @@ -166,7 +183,10 @@ async def pii_detection( if enable_ray: prepare_env(enable_ray=enable_ray, pip_requirements=pip_requirement, comps_path=comps_path) ret = text_based_pii_detect(text_list, strategy, enable_ray=enable_ray) - return {"status": 200, "message": json.dumps(ret)} + result = {"status": 200, "message": json.dumps(ret)} + if logflag: + logger.info(result) + return result except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") except Exception as e: @@ -181,7 +201,10 @@ async def pii_detection( if enable_ray: prepare_env(enable_ray=enable_ray, pip_requirements=pip_requirement, comps_path=comps_path) ret = link_based_pii_detect(link_list, strategy, enable_ray=enable_ray) - return {"status": 200, "message": json.dumps(ret)} + result = {"status": 200, "message": json.dumps(ret)} + if logflag: + logger.info(result) + return result except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") except Exception as e: diff --git a/comps/knowledgegraphs/langchain/knowledge_graph.py b/comps/knowledgegraphs/langchain/knowledge_graph.py index 01d1a5a5a..3b5b2882e 100755 --- a/comps/knowledgegraphs/langchain/knowledge_graph.py +++ b/comps/knowledgegraphs/langchain/knowledge_graph.py @@ -23,7 +23,10 @@ from langchain_community.llms import HuggingFaceEndpoint from langchain_community.vectorstores.neo4j_vector import Neo4jVector -from comps import GeneratedDoc, GraphDoc, ServiceType, opea_microservices, register_microservice +from comps import CustomLogger, GeneratedDoc, GraphDoc, ServiceType, opea_microservices, register_microservice + +logger = CustomLogger("knowledge_graph") +logflag = os.getenv("LOGFLAG", False) def get_retriever(input, neo4j_endpoint, neo4j_username, neo4j_password, llm): @@ -105,7 +108,8 @@ def get_agent(vector_qa, cypher_chain, llm_repo_id): port=8060, ) def graph_query(input: GraphDoc) -> GeneratedDoc: - print(input) + if logflag: + logger.info(input) ## Connect to Neo4j neo4j_endpoint = os.getenv("NEO4J_ENDPOINT", "neo4j://localhost:7687") @@ -154,6 +158,8 @@ def graph_query(input: GraphDoc) -> GeneratedDoc: result = agent_executor.invoke({"input": input.text})["output"] else: result = "Please specify strtype as one of cypher, rag, query." 
+    if logflag:
+        logger.info(result)
     return GeneratedDoc(text=result, prompt=input.text)
 

diff --git a/comps/llms/faq-generation/tgi/llm.py b/comps/llms/faq-generation/tgi/llm.py
index beaa5700b..0b4d70e85 100644
--- a/comps/llms/faq-generation/tgi/llm.py
+++ b/comps/llms/faq-generation/tgi/llm.py
@@ -10,7 +10,10 @@
 from langchain.text_splitter import CharacterTextSplitter
 from langchain_community.llms import HuggingFaceEndpoint
 
-from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice
+from comps import CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice
+
+logger = CustomLogger("llm_faqgen")
+logflag = os.getenv("LOGFLAG", False)
 
 
 def post_process_text(text: str):
@@ -32,6 +35,8 @@ def post_process_text(text: str):
     port=9000,
 )
 def llm_generate(input: LLMParamsDoc):
+    if logflag:
+        logger.info(input)
     llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
     llm = HuggingFaceEndpoint(
         endpoint_url=llm_endpoint,
@@ -71,6 +76,8 @@ async def stream_generator():
     else:
         response = llm_chain.invoke(input.query)
         response = response["result"].split("</s>")[0].split("\n")[0]
+        if logflag:
+            logger.info(response)
         return GeneratedDoc(text=response, prompt=input.query)
 

diff --git a/comps/llms/summarization/tgi/llm.py b/comps/llms/summarization/tgi/llm.py
index 43a583a96..80c5d3924 100644
--- a/comps/llms/summarization/tgi/llm.py
+++ b/comps/llms/summarization/tgi/llm.py
@@ -9,7 +9,10 @@
 from langchain.text_splitter import CharacterTextSplitter
 from langchain_huggingface import HuggingFaceEndpoint
 
-from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice
+from comps import CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice
+
+logger = CustomLogger("llm_docsum")
+logflag = os.getenv("LOGFLAG", False)
 
 
 def post_process_text(text: str):
@@ -31,6 +34,8 @@ def post_process_text(text: str):
     port=9000,
 )
 def llm_generate(input: LLMParamsDoc):
+    if logflag:
+        logger.info(input)
     llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
     llm = HuggingFaceEndpoint(
         endpoint_url=llm_endpoint,
@@ -57,7 +62,8 @@ async def stream_generator():
             _serializer = WellKnownLCSerializer()
             async for chunk in llm_chain.astream_log(docs):
                 data = _serializer.dumps({"ops": chunk.ops}).decode("utf-8")
-                print(f"[docsum - text_summarize] data: {data}")
+                if logflag:
+                    logger.info(f"[docsum - text_summarize] data: {data}")
                 yield f"data: {data}\n\n"
             yield "data: [DONE]\n\n"
 
@@ -65,6 +71,8 @@ async def stream_generator():
     else:
         response = llm_chain.invoke(input.query)
         response = response["result"].split("</s>")[0].split("\n")[0]
+        if logflag:
+            logger.info(response)
         return GeneratedDoc(text=response, prompt=input.query)
 

diff --git a/comps/llms/text-generation/native/llm.py b/comps/llms/text-generation/native/llm.py
index 43348670d..6008a91b8 100644
--- a/comps/llms/text-generation/native/llm.py
+++ b/comps/llms/text-generation/native/llm.py
@@ -16,6 +16,7 @@
 sys.path.append("/test/GenAIComps/")
 
 import logging
+import os
 import threading
 import time
 
@@ -33,6 +34,8 @@
     register_statistics,
 )
 
+logflag = os.getenv("LOGFLAG", False)
+
 logging.basicConfig(
     format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
     datefmt="%m/%d/%Y %H:%M:%S",
@@ -140,7 +143,8 @@ def initialize():
 @register_statistics(names=["opea_service@llm_native"])
 def llm_generate(input: LLMParamsDoc):
     initialize()
-
+    if logflag:
+        logger.info(input)
     prompt = input.query
prompt_template = None if input.chat_template: @@ -158,7 +162,8 @@ def llm_generate(input: LLMParamsDoc): prompt = ChatTemplate.generate_rag_prompt(input.query, input.documents) res = generate([prompt]) - logger.info(f"[llm - native] inference result: {res}") + if logflag: + logger.info(f"[llm - native] inference result: {res}") return GeneratedDoc(text=res[0], prompt=input.query) diff --git a/comps/llms/text-generation/ollama/llm.py b/comps/llms/text-generation/ollama/llm.py index 1f6d330c8..06d02461c 100644 --- a/comps/llms/text-generation/ollama/llm.py +++ b/comps/llms/text-generation/ollama/llm.py @@ -6,7 +6,10 @@ from fastapi.responses import StreamingResponse from langchain_community.llms import Ollama -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice +from comps import CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice + +logger = CustomLogger("llm_ollama") +logflag = os.getenv("LOGFLAG", False) @register_microservice( @@ -17,6 +20,8 @@ port=9000, ) def llm_generate(input: LLMParamsDoc): + if logflag: + logger.info(input) ollama = Ollama( base_url=ollama_endpoint, model=input.model if input.model else model_name, @@ -34,14 +39,18 @@ async def stream_generator(): async for text in ollama.astream(input.query): chat_response += text chunk_repr = repr(text.encode("utf-8")) - print(f"[llm - chat_stream] chunk:{chunk_repr}") + if logflag: + logger.info(f"[llm - chat_stream] chunk:{chunk_repr}") yield f"data: {chunk_repr}\n\n" - print(f"[llm - chat_stream] stream response: {chat_response}") + if logflag: + logger.info(f"[llm - chat_stream] stream response: {chat_response}") yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: response = ollama.invoke(input.query) + if logflag: + logger.info(response) return GeneratedDoc(text=response, prompt=input.query) diff --git a/comps/llms/text-generation/tgi/llm.py b/comps/llms/text-generation/tgi/llm.py index 6597d5b57..dd4d93e32 100644 --- a/comps/llms/text-generation/tgi/llm.py +++ b/comps/llms/text-generation/tgi/llm.py @@ -12,6 +12,7 @@ from template import ChatTemplate from comps import ( + CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, @@ -22,6 +23,9 @@ ) from comps.cores.proto.api_protocol import ChatCompletionRequest, ChatCompletionResponse, ChatCompletionStreamResponse +logger = CustomLogger("llm_tgi") +logflag = os.getenv("LOGFLAG", False) + llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080") llm = AsyncInferenceClient( model=llm_endpoint, @@ -38,7 +42,8 @@ ) @register_statistics(names=["opea_service@llm_tgi"]) async def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest]): - + if logflag: + logger.info(input) prompt_template = None if input.chat_template: prompt_template = PromptTemplate.from_template(input.chat_template) @@ -55,7 +60,7 @@ async def llm_generate(input: Union[LLMParamsDoc, ChatCompletionRequest]): elif input_variables == ["question"]: prompt = prompt_template.format(question=input.query) else: - print(f"{prompt_template} not used, we only support 2 input variables ['question', 'context']") + logger.info(f"{prompt_template} not used, we only support 2 input variables ['question', 'context']") else: if input.documents: # use rag default template @@ -78,15 +83,19 @@ async def stream_generator(): stream_gen_time.append(time.time() - start) chat_response += text chunk_repr = repr(text.encode("utf-8")) - print(f"[llm - chat_stream] 
chunk:{chunk_repr}") + if logflag: + logger.info(f"[llm - chat_stream] chunk:{chunk_repr}") yield f"data: {chunk_repr}\n\n" - print(f"[llm - chat_stream] stream response: {chat_response}") + if logflag: + logger.info(f"[llm - chat_stream] stream response: {chat_response}") statistics_dict["opea_service@llm_tgi"].append_latency(stream_gen_time[-1], stream_gen_time[0]) yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: statistics_dict["opea_service@llm_tgi"].append_latency(time.time() - start, None) + if logflag: + logger.info(text_generation) return GeneratedDoc(text=text_generation, prompt=input.query) else: @@ -103,7 +112,9 @@ async def stream_generator(): elif input_variables == ["question"]: prompt = prompt_template.format(question=input.messages) else: - print(f"{prompt_template} not used, we only support 2 input variables ['question', 'context']") + logger.info( + f"{prompt_template} not used, we only support 2 input variables ['question', 'context']" + ) else: if input.documents: # use rag default template @@ -141,7 +152,7 @@ async def stream_generator(): if input_variables == ["context"]: system_prompt = prompt_template.format(context="\n".join(input.documents)) else: - print(f"{prompt_template} not used, only support 1 input variables ['context']") + logger.info(f"{prompt_template} not used, only support 1 input variables ['context']") input.messages.insert(0, {"role": "system", "content": system_prompt}) @@ -173,12 +184,15 @@ async def stream_generator(): def stream_generator(): for c in chat_completion: - print(c) + if logflag: + logger.info(c) yield f"data: {c.model_dump_json()}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: + if logflag: + logger.info(chat_completion) return chat_completion diff --git a/comps/llms/text-generation/vllm-ray/llm.py b/comps/llms/text-generation/vllm-ray/llm.py index d3de026e0..e7efe6527 100644 --- a/comps/llms/text-generation/vllm-ray/llm.py +++ b/comps/llms/text-generation/vllm-ray/llm.py @@ -17,7 +17,10 @@ from fastapi.responses import StreamingResponse from langchain_openai import ChatOpenAI -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice +from comps import CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice + +logger = CustomLogger("llm_vllm_ray") +logflag = os.getenv("LOGFLAG", False) @register_microservice( @@ -28,6 +31,8 @@ port=9000, ) def llm_generate(input: LLMParamsDoc): + if logflag: + logger.info(input) llm_endpoint = os.getenv("vLLM_RAY_ENDPOINT", "http://localhost:8006") llm_model = os.getenv("LLM_MODEL", "meta-llama/Llama-2-7b-chat-hf") llm = ChatOpenAI( @@ -49,13 +54,16 @@ def stream_generator(): chat_response += text chunk_repr = repr(text.encode("utf-8")) yield f"data: {chunk_repr}\n\n" - print(f"[llm - chat_stream] stream response: {chat_response}") + if logflag: + logger.info(f"[llm - chat_stream] stream response: {chat_response}") yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: response = llm.invoke(input.query) response = response.content + if logflag: + logger.info(response) return GeneratedDoc(text=response, prompt=input.query) diff --git a/comps/llms/text-generation/vllm-xft/llm.py b/comps/llms/text-generation/vllm-xft/llm.py index 2c479d90b..07d892bde 100644 --- a/comps/llms/text-generation/vllm-xft/llm.py +++ 
b/comps/llms/text-generation/vllm-xft/llm.py @@ -6,7 +6,10 @@ from fastapi.responses import StreamingResponse from langchain_community.llms import VLLMOpenAI -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice +from comps import CustomLogger, GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, register_microservice + +logger = CustomLogger("llm_vllm_xft") +logflag = os.getenv("LOGFLAG", False) @register_microservice( @@ -17,6 +20,8 @@ port=9000, ) def llm_generate(input: LLMParamsDoc): + if logflag: + logger.info(input) llm_endpoint = os.getenv("vLLM_LLM_ENDPOINT", "http://localhost:18688") llm = VLLMOpenAI( openai_api_key="EMPTY", @@ -36,14 +41,18 @@ def stream_generator(): for text in llm.stream(input.query): chat_response += text chunk_repr = repr(text.encode("utf-8")) - print(f"[llm - chat_stream] chunk:{chunk_repr}") + if logflag: + logger.info(f"[llm - chat_stream] chunk:{chunk_repr}") yield f"data: {chunk_repr}\n\n" - print(f"[llm - chat_stream] stream response: {chat_response}") + if logflag: + logger.info(f"[llm - chat_stream] stream response: {chat_response}") yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: response = llm.invoke(input.query) + if logflag: + logger.info(response) return GeneratedDoc(text=response, prompt=input.query) diff --git a/comps/llms/text-generation/vllm/launch_microservice.sh b/comps/llms/text-generation/vllm/launch_microservice.sh index 6e8246601..01bd0f6f5 100644 --- a/comps/llms/text-generation/vllm/launch_microservice.sh +++ b/comps/llms/text-generation/vllm/launch_microservice.sh @@ -10,4 +10,5 @@ docker run -d --rm \ -e vLLM_ENDPOINT=$vLLM_ENDPOINT \ -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN \ -e LLM_MODEL=$LLM_MODEL \ + -e LOGFLAG=$LOGFLAG \ opea/llm-vllm:latest diff --git a/comps/llms/text-generation/vllm/llm.py b/comps/llms/text-generation/vllm/llm.py index 61bebbe27..c730dd66b 100644 --- a/comps/llms/text-generation/vllm/llm.py +++ b/comps/llms/text-generation/vllm/llm.py @@ -6,7 +6,18 @@ from fastapi.responses import StreamingResponse from langchain_community.llms import VLLMOpenAI -from comps import GeneratedDoc, LLMParamsDoc, ServiceType, opea_microservices, opea_telemetry, register_microservice +from comps import ( + CustomLogger, + GeneratedDoc, + LLMParamsDoc, + ServiceType, + opea_microservices, + opea_telemetry, + register_microservice, +) + +logger = CustomLogger("llm_vllm") +logflag = os.getenv("LOGFLAG", False) @opea_telemetry @@ -29,6 +40,8 @@ def post_process_text(text: str): port=9000, ) def llm_generate(input: LLMParamsDoc): + if logflag: + logger.info(input) llm_endpoint = os.getenv("vLLM_ENDPOINT", "http://localhost:8008") model_name = os.getenv("LLM_MODEL", "meta-llama/Meta-Llama-3-8B-Instruct") llm = VLLMOpenAI( @@ -49,12 +62,15 @@ def stream_generator(): chat_response += text chunk_repr = repr(text.encode("utf-8")) yield f"data: {chunk_repr}\n\n" - print(f"[llm - chat_stream] stream response: {chat_response}") + if logflag: + logger.info(f"[llm - chat_stream] stream response: {chat_response}") yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: response = llm.invoke(input.query) + if logflag: + logger.info(response) return GeneratedDoc(text=response, prompt=input.query) diff --git a/comps/llms/utils/lm-eval/self_hosted_hf.py b/comps/llms/utils/lm-eval/self_hosted_hf.py index b5eebaa2b..441605be0 100644 --- 
a/comps/llms/utils/lm-eval/self_hosted_hf.py +++ b/comps/llms/utils/lm-eval/self_hosted_hf.py @@ -10,7 +10,10 @@ from docarray import BaseDoc from evals.evaluation.lm_evaluation_harness.lm_eval.models.huggingface import HFLM, GaudiHFModelAdapter -from comps import ServiceType, opea_microservices, opea_telemetry, register_microservice +from comps import CustomLogger, ServiceType, opea_microservices, opea_telemetry, register_microservice + +logger = CustomLogger("self_hosted_hf") +logflag = os.getenv("LOGFLAG", False) lm_eval.api.registry.MODEL_REGISTRY["hf"] = HFLM lm_eval.api.registry.MODEL_REGISTRY["gaudi-hf"] = GaudiHFModelAdapter @@ -46,6 +49,8 @@ class LLMCompletionDoc(BaseDoc): ) @opea_telemetry def llm_generate(input: LLMCompletionDoc): + if logflag: + logger.info(input) global llm batched_inputs = torch.tensor(input.batched_inputs, dtype=torch.long, device=llm.device) with torch.no_grad(): @@ -56,12 +61,14 @@ def llm_generate(input: LLMCompletionDoc): # Check if per-token argmax is exactly equal to continuation greedy_tokens = logits.argmax(dim=-1) logprobs = torch.gather(logits, 2, batched_inputs[:, 1:].unsqueeze(-1)).squeeze(-1) - - return { + result = { "greedy_tokens": greedy_tokens.detach().cpu().tolist(), "logprobs": logprobs.detach().cpu().tolist(), "batched_inputs": input.batched_inputs, } + if logflag: + logger.info(result) + return result if __name__ == "__main__": diff --git a/comps/lvms/lvm.py b/comps/lvms/lvm.py index a60f6813f..4ae900aae 100644 --- a/comps/lvms/lvm.py +++ b/comps/lvms/lvm.py @@ -9,6 +9,7 @@ import requests from comps import ( + CustomLogger, LVMDoc, ServiceType, TextDoc, @@ -18,6 +19,9 @@ statistics_dict, ) +logger = CustomLogger("lvm") +logflag = os.getenv("LOGFLAG", False) + @register_microservice( name="opea_service@lvm", @@ -30,6 +34,8 @@ ) @register_statistics(names=["opea_service@lvm"]) async def lvm(request: LVMDoc): + if logflag: + logger.info(request) start = time.time() img_b64_str = request.image prompt = request.prompt @@ -41,11 +47,14 @@ async def lvm(request: LVMDoc): response = requests.post(url=f"{lvm_endpoint}/generate", data=json.dumps(inputs), proxies={"http": None}) statistics_dict["opea_service@lvm"].append_latency(time.time() - start, None) - return TextDoc(text=response.json()["text"]) + result = response.json()["text"] + if logflag: + logger.info(result) + return TextDoc(text=result) if __name__ == "__main__": lvm_endpoint = os.getenv("LVM_ENDPOINT", "http://localhost:8399") - print("[LVM] LVM initialized.") + logger.info("[LVM] LVM initialized.") opea_microservices["opea_service@lvm"].start() diff --git a/comps/lvms/lvm_tgi.py b/comps/lvms/lvm_tgi.py index b2eddf9f1..9492b4eaf 100644 --- a/comps/lvms/lvm_tgi.py +++ b/comps/lvms/lvm_tgi.py @@ -8,6 +8,7 @@ from huggingface_hub import AsyncInferenceClient from comps import ( + CustomLogger, LVMDoc, ServiceType, TextDoc, @@ -17,6 +18,9 @@ statistics_dict, ) +logger = CustomLogger("lvm_tgi") +logflag = os.getenv("LOGFLAG", False) + @register_microservice( name="opea_service@lvm_tgi", @@ -29,6 +33,8 @@ ) @register_statistics(names=["opea_service@lvm_tgi"]) async def lvm(request: LVMDoc): + if logflag: + logger.info(request) start = time.time() stream_gen_time = [] img_b64_str = request.image @@ -60,9 +66,11 @@ async def stream_generator(): stream_gen_time.append(time.time() - start) chat_response += text chunk_repr = repr(text.encode("utf-8")) - print(f"[llm - chat_stream] chunk:{chunk_repr}") + if logflag: + logger.info(f"[llm - chat_stream] chunk:{chunk_repr}") yield f"data: 
{chunk_repr}\n\n" - print(f"[llm - chat_stream] stream response: {chat_response}") + if logflag: + logger.info(f"[llm - chat_stream] stream response: {chat_response}") statistics_dict["opea_service@lvm_tgi"].append_latency(stream_gen_time[-1], stream_gen_time[0]) yield "data: [DONE]\n\n" @@ -77,11 +85,13 @@ async def stream_generator(): top_p=top_p, ) statistics_dict["opea_service@lvm_tgi"].append_latency(time.time() - start, None) + if logflag: + logger.info(generated_str) return TextDoc(text=generated_str) if __name__ == "__main__": lvm_endpoint = os.getenv("LVM_ENDPOINT", "http://localhost:8399") lvm_client = AsyncInferenceClient(lvm_endpoint) - print("[LVM] LVM initialized.") + logger.info("[LVM] LVM initialized.") opea_microservices["opea_service@lvm_tgi"].start() diff --git a/comps/prompt_registry/mongo/prompt.py b/comps/prompt_registry/mongo/prompt.py index e8d7d285e..fa54ea0d3 100644 --- a/comps/prompt_registry/mongo/prompt.py +++ b/comps/prompt_registry/mongo/prompt.py @@ -1,12 +1,17 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import os from typing import Optional from mongo_store import PromptStore from pydantic import BaseModel +from comps import CustomLogger from comps.cores.mega.micro_service import opea_microservices, register_microservice +logger = CustomLogger("prompt_mongo") +logflag = os.getenv("LOGFLAG", False) + class PromptCreate(BaseModel): """This class represents the data model for creating and storing a new prompt in the database. @@ -49,15 +54,18 @@ async def create_prompt(prompt: PromptCreate): Returns: JSON (PromptResponse): PromptResponse class object, None otherwise. """ + if logflag: + logger.info(prompt) try: prompt_store = PromptStore(prompt.user) prompt_store.initialize_storage() response = await prompt_store.save_prompt(prompt) - + if logflag: + logger.info(response) return response except Exception as e: - print(f"An error occurred: {str(e)}") + logger.info(f"An error occurred: {str(e)}") return None @@ -77,6 +85,8 @@ async def get_prompt(prompt: PromptId): Returns: JSON: Retrieved prompt data if successful, None otherwise. """ + if logflag: + logger.info(prompt) try: prompt_store = PromptStore(prompt.user) prompt_store.initialize_storage() @@ -86,11 +96,12 @@ async def get_prompt(prompt: PromptId): response = await prompt_store.prompt_search(prompt.prompt_text) else: response = await prompt_store.get_all_prompt_of_user() - + if logflag: + logger.info(response) return response except Exception as e: - print(f"An error occurred: {str(e)}") + logger.info(f"An error occurred: {str(e)}") return None @@ -110,6 +121,8 @@ async def delete_prompt(prompt: PromptId): Returns: Result of deletion if successful, None otherwise. 
""" + if logflag: + logger.info(prompt) try: prompt_store = PromptStore(prompt.user) prompt_store.initialize_storage() @@ -117,10 +130,12 @@ async def delete_prompt(prompt: PromptId): raise Exception("Prompt id is required.") else: response = await prompt_store.delete_prompt(prompt.prompt_id) + if logflag: + logger.info(response) return response except Exception as e: - print(f"An error occurred: {str(e)}") + logger.info(f"An error occurred: {str(e)}") return None diff --git a/comps/ragas/tgi/llm.py b/comps/ragas/tgi/llm.py index 03c214d30..0b67164a4 100644 --- a/comps/ragas/tgi/llm.py +++ b/comps/ragas/tgi/llm.py @@ -14,7 +14,18 @@ from ragas import evaluate from ragas.metrics import answer_relevancy, context_precision, context_recall, faithfulness -from comps import GeneratedDoc, RAGASParams, RAGASScores, ServiceType, opea_microservices, register_microservice +from comps import ( + CustomLogger, + GeneratedDoc, + RAGASParams, + RAGASScores, + ServiceType, + opea_microservices, + register_microservice, +) + +logger = CustomLogger("ragas_tgi_llm") +logflag = os.getenv("LOGFLAG", False) tei_embedding_endpoint = os.getenv("TEI_ENDPOINT") EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5") @@ -30,6 +41,8 @@ output_datatype=RAGASScores, ) def llm_generate(input: RAGASParams): + if logflag: + logger.info(input) llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080") # Create vectorstore @@ -71,13 +84,15 @@ def llm_generate(input: RAGASParams): faithfulness_average = df["faithfulness"][:].mean() context_recall_average = df["context_recall"][:].mean() context_precision_average = df["context_precision"][:].mean() - - return RAGASScores( + result = RAGASScores( answer_relevancy=answer_relevancy_average, faithfulness=faithfulness_average, context_recallL=context_recall_average, context_precision=context_precision_average, ) + if logflag: + logger.info(result) + return result if __name__ == "__main__": diff --git a/comps/reranks/fastrag/local_reranking.py b/comps/reranks/fastrag/local_reranking.py index 4548ef9a7..d6f33193c 100644 --- a/comps/reranks/fastrag/local_reranking.py +++ b/comps/reranks/fastrag/local_reranking.py @@ -1,13 +1,19 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import os + from config import RANKER_MODEL from fastrag.rankers import IPEXBiEncoderSimilarityRanker from haystack import Document +from comps import CustomLogger from comps.cores.mega.micro_service import ServiceType, opea_microservices, register_microservice from comps.cores.proto.docarray import RerankedDoc, SearchedDoc, TextDoc +logger = CustomLogger("local_reranking") +logflag = os.getenv("LOGFLAG", False) + @register_microservice( name="opea_service@local_reranking", @@ -19,12 +25,16 @@ output_datatype=RerankedDoc, ) def reranking(input: SearchedDoc) -> RerankedDoc: + if logflag: + logger.info(input) documents = [] for i, d in enumerate(input.retrieved_docs): documents.append(Document(content=d.text, id=(i + 1))) sorted_documents = reranker_model.run(input.initial_query, documents)["documents"] ranked_documents = [TextDoc(id=doc.id, text=doc.content) for doc in sorted_documents] res = RerankedDoc(initial_query=input.initial_query, reranked_docs=ranked_documents) + if logflag: + logger.info(res) return res diff --git a/comps/reranks/langchain-mosec/reranking_mosec_xeon.py b/comps/reranks/langchain-mosec/reranking_mosec_xeon.py index da3d7854a..1f222beb3 100644 --- a/comps/reranks/langchain-mosec/reranking_mosec_xeon.py +++ 
b/comps/reranks/langchain-mosec/reranking_mosec_xeon.py
@@ -24,6 +24,7 @@
 from langchain_core.prompts import ChatPromptTemplate
 
 from comps import (
+    CustomLogger,
     LLMParamsDoc,
     SearchedDoc,
     ServiceType,
@@ -33,6 +34,9 @@
     statistics_dict,
 )
 
+logger = CustomLogger("reranking_mosec_xeon")
+logflag = os.getenv("LOGFLAG", False)
+
 
 @register_microservice(
     name="opea_service@reranking_mosec_xeon",
@@ -45,7 +49,8 @@
 )
 @register_statistics(names=["opea_service@reranking_mosec_xeon"])
 def reranking(input: SearchedDoc) -> LLMParamsDoc:
-    print("reranking input: ", input)
+    if logflag:
+        logger.info(f"reranking input: {input}")
     start = time.time()
     if input.retrieved_docs:
         docs = [doc.text for doc in input.retrieved_docs]
@@ -67,8 +72,12 @@ def reranking(input: SearchedDoc) -> LLMParamsDoc:
         prompt = ChatPromptTemplate.from_template(template)
         final_prompt = prompt.format(context=doc.text, question=input.initial_query)
         statistics_dict["opea_service@reranking_mosec_xeon"].append_latency(time.time() - start, None)
+        if logflag:
+            logger.info(final_prompt.strip())
         return LLMParamsDoc(query=final_prompt.strip())
     else:
+        if logflag:
+            logger.info(input.initial_query)
         return LLMParamsDoc(query=input.initial_query)
 

diff --git a/comps/reranks/tei/local_reranking.py b/comps/reranks/tei/local_reranking.py
index f02a95823..284cca7e6 100644
--- a/comps/reranks/tei/local_reranking.py
+++ b/comps/reranks/tei/local_reranking.py
@@ -1,9 +1,22 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
+import os
+
 from sentence_transformers import CrossEncoder
 
-from comps import RerankedDoc, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice
+from comps import (
+    CustomLogger,
+    RerankedDoc,
+    SearchedDoc,
+    ServiceType,
+    TextDoc,
+    opea_microservices,
+    register_microservice,
+)
+
+logger = CustomLogger("local_reranking")
+logflag = os.getenv("LOGFLAG", False)
 
 
 @register_microservice(
@@ -16,10 +29,14 @@
     output_datatype=RerankedDoc,
 )
 def reranking(input: SearchedDoc) -> RerankedDoc:
+    if logflag:
+        logger.info(input)
     query_and_docs = [(input.initial_query, doc.text) for doc in input.retrieved_docs]
     scores = reranker_model.predict(query_and_docs)
     first_passage = sorted(list(zip(input.retrieved_docs, scores)), key=lambda x: x[1], reverse=True)[0][0]
     res = RerankedDoc(initial_query=input.initial_query, reranked_docs=[first_passage])
+    if logflag:
+        logger.info(res)
     return res
 

diff --git a/comps/reranks/tei/reranking_tei.py b/comps/reranks/tei/reranking_tei.py
index 5575aa88f..cb423cf83 100644
--- a/comps/reranks/tei/reranking_tei.py
+++ b/comps/reranks/tei/reranking_tei.py
@@ -11,6 +11,7 @@
 import requests
 
 from comps import (
+    CustomLogger,
     LLMParamsDoc,
     SearchedDoc,
     ServiceType,
@@ -26,6 +27,9 @@
     RerankingResponseData,
 )
 
+logger = CustomLogger("reranking_tgi_gaudi")
+logflag = os.getenv("LOGFLAG", False)
+
 
 @register_microservice(
     name="opea_service@reranking_tgi_gaudi",
@@ -40,7 +44,8 @@
 def reranking(
     input: Union[SearchedDoc, RerankingRequest, ChatCompletionRequest]
 ) -> Union[LLMParamsDoc, RerankingResponse, ChatCompletionRequest]:
-
+    if logflag:
+        logger.info(input)
     start = time.time()
     reranking_results = []
     if input.retrieved_docs:
@@ -63,17 +68,25 @@ def reranking(
     statistics_dict["opea_service@reranking_tgi_gaudi"].append_latency(time.time() - start, None)
 
     if isinstance(input, SearchedDoc):
-        return LLMParamsDoc(query=input.initial_query, documents=[doc["text"] for doc in reranking_results])
+        result = [doc["text"] for doc in reranking_results]
+        if logflag:
+
logger.info(result) + return LLMParamsDoc(query=input.initial_query, documents=result) else: reranking_docs = [] for doc in reranking_results: reranking_docs.append(RerankingResponseData(text=doc["text"], score=doc["score"])) if isinstance(input, RerankingRequest): - return RerankingResponse(reranked_docs=reranking_docs) + result = RerankingResponse(reranked_docs=reranking_docs) + if logflag: + logger.info(result) + return result if isinstance(input, ChatCompletionRequest): input.reranked_docs = reranking_docs input.documents = [doc["text"] for doc in reranking_results] + if logflag: + logger.info(input) return input diff --git a/comps/retrievers/haystack/qdrant/retriever_qdrant.py b/comps/retrievers/haystack/qdrant/retriever_qdrant.py index 30d1bd72d..aee2e6fe1 100644 --- a/comps/retrievers/haystack/qdrant/retriever_qdrant.py +++ b/comps/retrievers/haystack/qdrant/retriever_qdrant.py @@ -1,12 +1,17 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import os + from haystack.components.embedders import HuggingFaceTEITextEmbedder, SentenceTransformersTextEmbedder from haystack_integrations.components.retrievers.qdrant import QdrantEmbeddingRetriever from haystack_integrations.document_stores.qdrant import QdrantDocumentStore from qdrant_config import EMBED_DIMENSION, EMBED_ENDPOINT, EMBED_MODEL, INDEX_NAME, QDRANT_HOST, QDRANT_PORT -from comps import EmbedDoc, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice +from comps import CustomLogger, EmbedDoc, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice + +logger = CustomLogger("retriever_qdrant") +logflag = os.getenv("LOGFLAG", False) # Create a pipeline for querying a Qdrant document store @@ -28,9 +33,13 @@ def initialize_qdrant_retriever() -> QdrantEmbeddingRetriever: port=7000, ) def retrieve(input: EmbedDoc) -> SearchedDoc: + if logflag: + logger.info(input) search_res = retriever.run(query_embedding=input.embedding)["documents"] searched_docs = [TextDoc(text=r.content) for r in search_res if r.content] result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) + if logflag: + logger.info(result) return result diff --git a/comps/retrievers/langchain/milvus/retriever_milvus.py b/comps/retrievers/langchain/milvus/retriever_milvus.py index 0c81e76ce..fb8fb64b2 100644 --- a/comps/retrievers/langchain/milvus/retriever_milvus.py +++ b/comps/retrievers/langchain/milvus/retriever_milvus.py @@ -19,6 +19,7 @@ from langchain_milvus.vectorstores import Milvus from comps import ( + CustomLogger, EmbedDoc, SearchedDoc, ServiceType, @@ -29,6 +30,9 @@ statistics_dict, ) +logger = CustomLogger("retriever_milvus") +logflag = os.getenv("LOGFLAG", False) + class MosecEmbeddings(OpenAIEmbeddings): def _get_len_safe_embeddings( @@ -64,6 +68,8 @@ def empty_embedding() -> List[float]: ) @register_statistics(names=["opea_service@retriever_milvus"]) def retrieve(input: EmbedDoc) -> SearchedDoc: + if logflag: + logger.info(input) vector_db = Milvus( embeddings, connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT}, @@ -92,6 +98,8 @@ def retrieve(input: EmbedDoc) -> SearchedDoc: searched_docs.append(TextDoc(text=r.page_content)) result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) statistics_dict["opea_service@retriever_milvus"].append_latency(time.time() - start, None) + if logflag: + logger.info(result) return result diff --git a/comps/retrievers/langchain/pgvector/retriever_pgvector.py 
b/comps/retrievers/langchain/pgvector/retriever_pgvector.py index 014a616a5..d33a9f197 100644 --- a/comps/retrievers/langchain/pgvector/retriever_pgvector.py +++ b/comps/retrievers/langchain/pgvector/retriever_pgvector.py @@ -9,6 +9,7 @@ from langchain_community.vectorstores import PGVector from comps import ( + CustomLogger, EmbedDoc, SearchedDoc, ServiceType, @@ -19,6 +20,9 @@ statistics_dict, ) +logger = CustomLogger("retriever_pgvector") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT") @@ -31,6 +35,8 @@ ) @register_statistics(names=["opea_service@retriever_pgvector"]) def retrieve(input: EmbedDoc) -> SearchedDoc: + if logflag: + logger.info(input) start = time.time() search_res = vector_db.similarity_search_by_vector(embedding=input.embedding) searched_docs = [] @@ -38,6 +44,8 @@ def retrieve(input: EmbedDoc) -> SearchedDoc: searched_docs.append(TextDoc(text=r.page_content)) result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) statistics_dict["opea_service@retriever_pgvector"].append_latency(time.time() - start, None) + if logflag: + logger.info(result) return result diff --git a/comps/retrievers/langchain/pinecone/retriever_pinecone.py b/comps/retrievers/langchain/pinecone/retriever_pinecone.py index 73e77d111..9bc2da893 100644 --- a/comps/retrievers/langchain/pinecone/retriever_pinecone.py +++ b/comps/retrievers/langchain/pinecone/retriever_pinecone.py @@ -10,6 +10,7 @@ from pinecone import Pinecone, ServerlessSpec from comps import ( + CustomLogger, EmbedDoc, SearchedDoc, ServiceType, @@ -20,6 +21,9 @@ statistics_dict, ) +logger = CustomLogger("retriever_pinecone") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT") @@ -32,16 +36,21 @@ ) @register_statistics(names=["opea_service@retriever_pinecone"]) def retrieve(input: EmbedDoc) -> SearchedDoc: + if logflag: + logger.info(input) start = time.time() pc = Pinecone(api_key=PINECONE_API_KEY) index = pc.Index(PINECONE_INDEX_NAME) - print(index.describe_index_stats()["total_vector_count"]) + if logflag: + logger.info(index.describe_index_stats()["total_vector_count"]) # check if the Pinecone index has data if index.describe_index_stats()["total_vector_count"] == 0: result = SearchedDoc(retrieved_docs=[], initial_query=input.text) statistics_dict["opea_service@retriever_pinecone"].append_latency(time.time() - start, None) + if logflag: + logger.info(result) return result search_res = vector_db.max_marginal_relevance_search(query=input.text, k=input.k, fetch_k=input.fetch_k) @@ -66,6 +75,8 @@ def retrieve(input: EmbedDoc) -> SearchedDoc: searched_docs.append(TextDoc(text=r.page_content)) result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) statistics_dict["opea_service@retriever_pinecone"].append_latency(time.time() - start, None) + if logflag: + logger.info(result) return result diff --git a/comps/retrievers/langchain/redis/retriever_redis.py b/comps/retrievers/langchain/redis/retriever_redis.py index a4ab5dc4e..b4c901cb3 100644 --- a/comps/retrievers/langchain/redis/retriever_redis.py +++ b/comps/retrievers/langchain/redis/retriever_redis.py @@ -10,6 +10,7 @@ from redis_config import EMBED_MODEL, INDEX_NAME, REDIS_URL from comps import ( + CustomLogger, EmbedDoc, SearchedDoc, ServiceType, @@ -26,6 +27,9 @@ RetrievalResponseData, ) +logger = CustomLogger("retriever_redis") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT") @@ -40,7 +44,8 @@ 
def retrieve( input: Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest] ) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]: - + if logflag: + logger.info(input) start = time.time() # check if the Redis index has data if vector_db.client.keys() == []: @@ -89,6 +94,8 @@ def retrieve( result = input statistics_dict["opea_service@retriever_redis"].append_latency(time.time() - start, None) + if logflag: + logger.info(result) return result diff --git a/comps/retrievers/llamaindex/retriever_redis.py b/comps/retrievers/llamaindex/retriever_redis.py index 3c387010e..8c20e36c9 100644 --- a/comps/retrievers/llamaindex/retriever_redis.py +++ b/comps/retrievers/llamaindex/retriever_redis.py @@ -7,7 +7,10 @@ from llama_index.vector_stores.redis import RedisVectorStore from redis_config import INDEX_NAME, REDIS_URL -from comps import EmbedDoc, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice +from comps import CustomLogger, EmbedDoc, SearchedDoc, ServiceType, TextDoc, opea_microservices, register_microservice + +logger = CustomLogger("retriever_redis") +logflag = os.getenv("LOGFLAG", False) tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT") @@ -20,12 +23,16 @@ port=7000, ) def retrieve(input: EmbedDoc) -> SearchedDoc: + if logflag: + logger.info(input) vector_store_query = VectorStoreQuery(query_embedding=input.embedding) search_res = vector_store.query(query=vector_store_query) searched_docs = [] for node, id, similarity in zip(search_res.nodes, search_res.ids, search_res.similarities): searched_docs.append(TextDoc(text=node.get_content())) result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) + if logflag: + logger.info(result) return result diff --git a/comps/tts/tts.py b/comps/tts/tts.py index 6c6bad232..050a1bbd5 100644 --- a/comps/tts/tts.py +++ b/comps/tts/tts.py @@ -9,6 +9,7 @@ from comps import ( Base64ByteStrDoc, + CustomLogger, ServiceType, TextDoc, opea_microservices, @@ -17,6 +18,9 @@ statistics_dict, ) +logger = CustomLogger("tts") +logflag = os.getenv("LOGFLAG", False) + @register_microservice( name="opea_service@tts", @@ -29,16 +33,21 @@ ) @register_statistics(names=["opea_service@tts"]) async def text_to_audio(input: TextDoc): + if logflag: + logger.info(input) start = time.time() text = input.text inputs = {"text": text} response = requests.post(url=f"{tts_endpoint}/v1/tts", data=json.dumps(inputs), proxies={"http": None}) statistics_dict["opea_service@tts"].append_latency(time.time() - start, None) - return Base64ByteStrDoc(byte_str=response.json()["tts_result"]) + result = Base64ByteStrDoc(byte_str=response.json()["tts_result"]) + if logflag: + logger.info(result) + return result if __name__ == "__main__": tts_endpoint = os.getenv("TTS_ENDPOINT", "http://localhost:7055") - print("[tts - router] TTS initialized.") + logger.info("[tts - router] TTS initialized.") opea_microservices["opea_service@tts"].start() diff --git a/comps/web_retrievers/langchain/chroma/retriever_chroma.py b/comps/web_retrievers/langchain/chroma/retriever_chroma.py index 3fbd1b755..53d9f8c36 100644 --- a/comps/web_retrievers/langchain/chroma/retriever_chroma.py +++ b/comps/web_retrievers/langchain/chroma/retriever_chroma.py @@ -12,6 +12,7 @@ from langchain_huggingface import HuggingFaceEndpointEmbeddings from comps import ( + CustomLogger, EmbedDoc, SearchedDoc, ServiceType, @@ -22,6 +23,9 @@ statistics_dict, ) +logger = CustomLogger("web_retriever_chroma") +logflag = os.getenv("LOGFLAG", False) + tei_embedding_endpoint = 
os.getenv("TEI_EMBEDDING_ENDPOINT") @@ -37,7 +41,8 @@ def retrieve_htmls(all_urls): def parse_htmls(docs): - print("Indexing new urls...") + if logflag: + logger.info("Indexing new urls...") html2text = Html2TextTransformer() docs = list(html2text.transform_documents(docs)) @@ -59,6 +64,8 @@ def dump_docs(docs): ) @register_statistics(names=["opea_service@web_retriever_chroma", "opea_service@search"]) def web_retrieve(input: EmbedDoc) -> SearchedDoc: + if logflag: + logger.info(input) start = time.time() query = input.text embedding = input.embedding @@ -70,10 +77,12 @@ def web_retrieve(input: EmbedDoc) -> SearchedDoc: if res.get("link", None): urls_to_look.append(res["link"]) urls = list(set(urls_to_look)) - print(f"urls: {urls}") + if logflag: + logger.info(f"urls: {urls}") docs = retrieve_htmls(urls) docs = parse_htmls(docs) - print(docs) + if logflag: + logger.info(docs) # Remove duplicated docs unique_documents_dict = {(doc.page_content, tuple(sorted(doc.metadata.items()))): doc for doc in docs} unique_documents = list(unique_documents_dict.values()) @@ -101,6 +110,8 @@ def web_retrieve(input: EmbedDoc) -> SearchedDoc: # For Now history is banned if vector_db.get()["ids"]: vector_db.delete(vector_db.get()["ids"]) + if logflag: + logger.info(result) return result