diff --git a/dspy/__init__.py b/dspy/__init__.py index d79a51d5db..aa555ad32d 100644 --- a/dspy/__init__.py +++ b/dspy/__init__.py @@ -18,8 +18,8 @@ configure_dspy_loggers(__name__) -from dspy.dsp.modules.colbertv2 import ColBERTv2 -# from dspy.dsp.modules.you import You +from dspy.dsp.colbertv2 import ColBERTv2 +# from dspy.dsp.you import You configure = settings.configure context = settings.context diff --git a/dspy/dsp/modules/cache_utils.py b/dspy/dsp/cache_utils.py similarity index 100% rename from dspy/dsp/modules/cache_utils.py rename to dspy/dsp/cache_utils.py diff --git a/dspy/dsp/modules/colbertv2.py b/dspy/dsp/colbertv2.py similarity index 99% rename from dspy/dsp/modules/colbertv2.py rename to dspy/dsp/colbertv2.py index ba0ccbea15..6b1fd8cb3e 100644 --- a/dspy/dsp/modules/colbertv2.py +++ b/dspy/dsp/colbertv2.py @@ -3,7 +3,7 @@ import requests -from dspy.dsp.modules.cache_utils import CacheMemory, NotebookCacheMemory +from dspy.dsp.cache_utils import CacheMemory, NotebookCacheMemory from dspy.dsp.utils import dotdict # TODO: Ideally, this takes the name of the index and looks up its port. diff --git a/dspy/retrieve/faiss_rm.py b/dspy/retrieve/faiss_rm.py index a66140f939..5ac4de971a 100755 --- a/dspy/retrieve/faiss_rm.py +++ b/dspy/retrieve/faiss_rm.py @@ -1,156 +1,156 @@ -"""Retriever model for faiss: https://github.com/facebookresearch/faiss. -Author: Jagane Sundar: https://github.com/jagane. -""" - -import logging -from typing import Optional, Union - -import numpy as np - -import dspy -from dspy.dsp.modules.sentence_vectorizer import SentenceTransformersVectorizer -from dspy.dsp.utils import dotdict - -try: - import faiss -except ImportError: - faiss = None - -if faiss is None: - raise ImportError( - """ - The faiss package is required. Install it using `pip install dspy-ai[faiss-cpu]` - """, - ) - - -logger = logging.getLogger(__name__) -class FaissRM(dspy.Retrieve): - """A retrieval module that uses an in-memory Faiss to return the top passages for a given query. - - Args: - document_chunks: the input text chunks - vectorizer: an object that is a subclass of BaseSentenceVectorizer - k (int, optional): The number of top passages to retrieve. Defaults to 3. - - Returns: - dspy.Prediction: An object containing the retrieved passages. - - Examples: - Below is a code snippet that shows how to use this as the default retriver: - ```python - import dspy - from dspy.retrieve import faiss_rm - - document_chunks = [ - "The superbowl this year was played between the San Francisco 49ers and the Kanasas City Chiefs", - "Pop corn is often served in a bowl", - "The Rice Bowl is a Chinese Restaurant located in the city of Tucson, Arizona", - "Mars is the fourth planet in the Solar System", - "An aquarium is a place where children can learn about marine life", - "The capital of the United States is Washington, D.C", - "Rock and Roll musicians are honored by being inducted in the Rock and Roll Hall of Fame", - "Music albums were published on Long Play Records in the 70s and 80s", - "Sichuan cuisine is a spicy cuisine from central China", - "The interest rates for mortgages is considered to be very high in 2024", - ] - - frm = faiss_rm.FaissRM(document_chunks) - turbo = dspy.OpenAI(model="gpt-3.5-turbo") - dspy.settings.configure(lm=turbo, rm=frm) - print(frm(["I am in the mood for Chinese food"])) - ``` - - Below is a code snippet that shows how to use this in the forward() function of a module - ```python - self.retrieve = FaissRM(k=num_passages) - ``` - """ - - def __init__(self, document_chunks, vectorizer=None, k: int = 3): - """Inits the faiss retriever. - - Args: - document_chunks: a list of input strings. - vectorizer: an object that is a subclass of BaseTransformersVectorizer. - k: number of matches to return. - """ - if vectorizer: - self._vectorizer = vectorizer - else: - self._vectorizer = SentenceTransformersVectorizer() - embeddings = self._vectorizer(document_chunks) - xb = np.array(embeddings) - d = len(xb[0]) - logger.info(f"FaissRM: embedding size={d}") - if len(xb) < 100: - self._faiss_index = faiss.IndexFlatL2(d) - self._faiss_index.add(xb) - else: - # if we have at least 100 vectors, we use Voronoi cells - nlist = 100 - quantizer = faiss.IndexFlatL2(d) - self._faiss_index = faiss.IndexIVFFlat(quantizer, d, nlist) - self._faiss_index.train(xb) - self._faiss_index.add(xb) - - logger.info(f"{self._faiss_index.ntotal} vectors in faiss index") - self._document_chunks = document_chunks # save the input document chunks - - super().__init__(k=k) - - def _dump_raw_results(self, queries, index_list, distance_list) -> None: - for i in range(len(queries)): - indices = index_list[i] - distances = distance_list[i] - logger.debug(f"Query: {queries[i]}") - for j in range(len(indices)): - logger.debug(f" Hit {j} = {indices[j]}/{distances[j]}: {self._document_chunks[indices[j]]}") - return - - def forward(self, query_or_queries: Union[str, list[str]], k: Optional[int] = None, **kwargs) -> dspy.Prediction: - """Search the faiss index for k or self.k top passages for query. - - Args: - query_or_queries (Union[str, List[str]]): The query or queries to search for. - - Returns: - dspy.Prediction: An object containing the retrieved passages. - """ - queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries - queries = [q for q in queries if q] # Filter empty queries - embeddings = self._vectorizer(queries) - emb_npa = np.array(embeddings) - # For single query, just look up the top k passages - if len(queries) == 1: - distance_list, index_list = self._faiss_index.search(emb_npa, k or self.k) - # self._dump_raw_results(queries, index_list, distance_list) - passages = [(self._document_chunks[ind], ind) for ind in index_list[0]] - return [dotdict({"long_text": passage[0], "index": passage[1]}) for passage in passages] - - distance_list, index_list = self._faiss_index.search(emb_npa, (k or self.k) * 3, **kwargs) - # self._dump_raw_results(queries, index_list, distance_list) - passage_scores = {} - for emb in range(len(embeddings)): - indices = index_list[emb] # indices of neighbors for embeddings[emb] - this is an array of k*3 integers - distances = distance_list[ - emb - ] # distances of neighbors for embeddings[emb] - this is an array of k*3 floating point numbers - for res in range((k or self.k) * 3): - neighbor = indices[res] - distance = distances[res] - if neighbor in passage_scores: - passage_scores[neighbor].append(distance) - else: - passage_scores[neighbor] = [distance] - # Note re. sorting: - # first degree sort: number of queries that got a hit with any particular document chunk. More - # is a better match. This is len(queries)-len(x[1]) - # second degree sort: sum of the distances of each hit returned by faiss. Smaller distance is a better match - sorted_passages = sorted(passage_scores.items(), key=lambda x: (len(queries) - len(x[1]), sum(x[1])))[ - : k or self.k - ] - return [ - dotdict({"long_text": self._document_chunks[passage_index], "index": passage_index}) - for passage_index, _ in sorted_passages - ] +# """Retriever model for faiss: https://github.com/facebookresearch/faiss. +# Author: Jagane Sundar: https://github.com/jagane. +# """ + +# import logging +# from typing import Optional, Union + +# import numpy as np + +# import dspy +# from dspy.dsp.modules.sentence_vectorizer import SentenceTransformersVectorizer +# from dspy.dsp.utils import dotdict + +# try: +# import faiss +# except ImportError: +# faiss = None + +# if faiss is None: +# raise ImportError( +# """ +# The faiss package is required. Install it using `pip install dspy-ai[faiss-cpu]` +# """, +# ) + + +# logger = logging.getLogger(__name__) +# class FaissRM(dspy.Retrieve): +# """A retrieval module that uses an in-memory Faiss to return the top passages for a given query. + +# Args: +# document_chunks: the input text chunks +# vectorizer: an object that is a subclass of BaseSentenceVectorizer +# k (int, optional): The number of top passages to retrieve. Defaults to 3. + +# Returns: +# dspy.Prediction: An object containing the retrieved passages. + +# Examples: +# Below is a code snippet that shows how to use this as the default retriver: +# ```python +# import dspy +# from dspy.retrieve import faiss_rm + +# document_chunks = [ +# "The superbowl this year was played between the San Francisco 49ers and the Kanasas City Chiefs", +# "Pop corn is often served in a bowl", +# "The Rice Bowl is a Chinese Restaurant located in the city of Tucson, Arizona", +# "Mars is the fourth planet in the Solar System", +# "An aquarium is a place where children can learn about marine life", +# "The capital of the United States is Washington, D.C", +# "Rock and Roll musicians are honored by being inducted in the Rock and Roll Hall of Fame", +# "Music albums were published on Long Play Records in the 70s and 80s", +# "Sichuan cuisine is a spicy cuisine from central China", +# "The interest rates for mortgages is considered to be very high in 2024", +# ] + +# frm = faiss_rm.FaissRM(document_chunks) +# turbo = dspy.OpenAI(model="gpt-3.5-turbo") +# dspy.settings.configure(lm=turbo, rm=frm) +# print(frm(["I am in the mood for Chinese food"])) +# ``` + +# Below is a code snippet that shows how to use this in the forward() function of a module +# ```python +# self.retrieve = FaissRM(k=num_passages) +# ``` +# """ + +# def __init__(self, document_chunks, vectorizer=None, k: int = 3): +# """Inits the faiss retriever. + +# Args: +# document_chunks: a list of input strings. +# vectorizer: an object that is a subclass of BaseTransformersVectorizer. +# k: number of matches to return. +# """ +# if vectorizer: +# self._vectorizer = vectorizer +# else: +# self._vectorizer = SentenceTransformersVectorizer() +# embeddings = self._vectorizer(document_chunks) +# xb = np.array(embeddings) +# d = len(xb[0]) +# logger.info(f"FaissRM: embedding size={d}") +# if len(xb) < 100: +# self._faiss_index = faiss.IndexFlatL2(d) +# self._faiss_index.add(xb) +# else: +# # if we have at least 100 vectors, we use Voronoi cells +# nlist = 100 +# quantizer = faiss.IndexFlatL2(d) +# self._faiss_index = faiss.IndexIVFFlat(quantizer, d, nlist) +# self._faiss_index.train(xb) +# self._faiss_index.add(xb) + +# logger.info(f"{self._faiss_index.ntotal} vectors in faiss index") +# self._document_chunks = document_chunks # save the input document chunks + +# super().__init__(k=k) + +# def _dump_raw_results(self, queries, index_list, distance_list) -> None: +# for i in range(len(queries)): +# indices = index_list[i] +# distances = distance_list[i] +# logger.debug(f"Query: {queries[i]}") +# for j in range(len(indices)): +# logger.debug(f" Hit {j} = {indices[j]}/{distances[j]}: {self._document_chunks[indices[j]]}") +# return + +# def forward(self, query_or_queries: Union[str, list[str]], k: Optional[int] = None, **kwargs) -> dspy.Prediction: +# """Search the faiss index for k or self.k top passages for query. + +# Args: +# query_or_queries (Union[str, List[str]]): The query or queries to search for. + +# Returns: +# dspy.Prediction: An object containing the retrieved passages. +# """ +# queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries +# queries = [q for q in queries if q] # Filter empty queries +# embeddings = self._vectorizer(queries) +# emb_npa = np.array(embeddings) +# # For single query, just look up the top k passages +# if len(queries) == 1: +# distance_list, index_list = self._faiss_index.search(emb_npa, k or self.k) +# # self._dump_raw_results(queries, index_list, distance_list) +# passages = [(self._document_chunks[ind], ind) for ind in index_list[0]] +# return [dotdict({"long_text": passage[0], "index": passage[1]}) for passage in passages] + +# distance_list, index_list = self._faiss_index.search(emb_npa, (k or self.k) * 3, **kwargs) +# # self._dump_raw_results(queries, index_list, distance_list) +# passage_scores = {} +# for emb in range(len(embeddings)): +# indices = index_list[emb] # indices of neighbors for embeddings[emb] - this is an array of k*3 integers +# distances = distance_list[ +# emb +# ] # distances of neighbors for embeddings[emb] - this is an array of k*3 floating point numbers +# for res in range((k or self.k) * 3): +# neighbor = indices[res] +# distance = distances[res] +# if neighbor in passage_scores: +# passage_scores[neighbor].append(distance) +# else: +# passage_scores[neighbor] = [distance] +# # Note re. sorting: +# # first degree sort: number of queries that got a hit with any particular document chunk. More +# # is a better match. This is len(queries)-len(x[1]) +# # second degree sort: sum of the distances of each hit returned by faiss. Smaller distance is a better match +# sorted_passages = sorted(passage_scores.items(), key=lambda x: (len(queries) - len(x[1]), sum(x[1])))[ +# : k or self.k +# ] +# return [ +# dotdict({"long_text": self._document_chunks[passage_index], "index": passage_index}) +# for passage_index, _ in sorted_passages +# ] diff --git a/dspy/retrieve/my_scale_rm.py b/dspy/retrieve/my_scale_rm.py index 2b4a489792..8d61d00287 100644 --- a/dspy/retrieve/my_scale_rm.py +++ b/dspy/retrieve/my_scale_rm.py @@ -1,231 +1,231 @@ -import functools -import os -from typing import List, Optional - -import openai - -import dspy -from dspy.dsp.modules.cache_utils import NotebookCacheMemory, cache_turn_on -from dspy.dsp.utils import dotdict - -# Check for necessary libraries and suggest installation if not found. -try: - import clickhouse_connect -except ImportError: - raise ImportError( - "The 'myscale' extra is required to use MyScaleRM. Install it with `pip install dspy-ai[myscale]`", - ) - -# Verify the compatibility of the OpenAI library version installed. -try: - major, minor, _ = map(int, openai.__version__.split(".")) - OPENAI_VERSION_COMPATIBLE = major >= 1 and minor >= 16 -except Exception: - OPENAI_VERSION_COMPATIBLE = False - -if not OPENAI_VERSION_COMPATIBLE: - raise ImportError( - "An incompatible OpenAI library version is installed. Ensure you have version 1.16.1 or later.", - ) - -# Attempt to handle specific OpenAI errors; fallback to general ones if necessary. -try: - import openai.error - - ERRORS = (openai.error.RateLimitError, openai.error.ServiceUnavailableError, openai.error.APIError) -except Exception: - ERRORS = (openai.RateLimitError, openai.APIError) - - -class MyScaleRM(dspy.Retrieve): - """ - A retrieval module that uses MyScaleDB to return the top passages for a given query. - - MyScaleDB is a fork of ClickHouse that focuses on vector similarity search and full - text search. MyScaleRM is designed to facilitate easy retrieval of information from - MyScaleDB using embeddings. It supports embedding generation through either a local - model or the OpenAI API. This class abstracts away the complexities of connecting to - MyScaleDB, managing API keys, and processing queries to return semantically - relevant results. - - Assumes that a table named `database.table` exists in MyScaleDB, and that the - table has column named `vector_column` that stores vector data and a vector index has - been created on this column. Other metadata are stored in `metadata_columns`. - - Args: - client (clickhouse_connect.driver.client.Client): A client connection to the MyScaleDB. - table (str): Name of the table within the database to perform queries against. - database (str, optional): Name of the database to query within MyScaleDB. - metadata_columns(List[str], optional): A list of columns to include in the results. - vector_column (str, optional): The name of the column in the table that stores vector data. - k (int, optional): The number of closest matches to retrieve for a given query. - openai_api_key (str, optional): The API key for accessing OpenAI's services. - model (str, optional): Specifies the particular OpenAI model to use for embedding generation. - use_local_model (bool): Flag indicating whether a local model is used for embeddings. - - """ - - def __init__( - self, - client: clickhouse_connect.driver.client.Client, - table: str, - database: str = "default", - metadata_columns: List[str] = ["text"], - vector_column: str = "vector", - k: int = 3, - openai_api_key: Optional[str] = None, - openai_model: Optional[str] = None, - local_embed_model: Optional[str] = None, - ): - self.client = client - self.database = database - self.table = table - if not metadata_columns: - raise ValueError("metadata_columns is required") - self.metadata_columns = metadata_columns - self.vector_column = vector_column - self.k = k - self.openai_api_key = openai_api_key - self.model = openai_model - self.use_local_model = False - - if local_embed_model: - self.setup_local_model(local_embed_model) - elif openai_api_key: - os.environ["OPENAI_API_KEY"] = self.openai_api_key - - def setup_local_model(self, model_name: str): - """ - Configures a local model for embedding generation, including model and tokenizer loading. - - Args: - model_name: The name or path to the pre-trained model to load. - - Raises: - ModuleNotFoundError: If necessary libraries (torch or transformers) are not installed. - """ - try: - import torch - from transformers import AutoModel, AutoTokenizer - except ImportError as exc: - raise ModuleNotFoundError( - """You need to install PyTorch and Hugging Face's transformers library to use a local embedding model. - Install the pytorch using `pip install torch` and transformers using `pip install transformers` """, - ) from exc - - try: - self._local_embed_model = AutoModel.from_pretrained(model_name) - self._local_tokenizer = AutoTokenizer.from_pretrained(model_name) - self.use_local_model = True - except Exception as e: - raise ValueError(f"Failed to load model or tokenizer. Error: {str(e)}") - - if torch.cuda.is_available(): - self.device = torch.device("cuda:0") - elif torch.backends.mps.is_available(): - self.device = torch.device("mps") - else: - self.device = torch.device("cpu") - - self._local_embed_model.to(self.device) - - @functools.lru_cache(maxsize=None if cache_turn_on else 0) - @NotebookCacheMemory.cache - def get_embeddings(self, query: str) -> List[float]: - """ - Determines the appropriate source (OpenAI or local model) for embedding generation based on class configuration, - and retrieves embeddings for the provided queries. - - Args: - query: A query to generate embeddings for. - - Returns: - A list of embeddings corresponding to the query in the input list. - - Raises: - ValueError: If neither an OpenAI API key nor a local model has been configured. - """ - if self.openai_api_key and self.model: - return self._get_embeddings_from_openai(query) - elif self.use_local_model: - return self._get_embedding_from_local_model(query) - else: - raise ValueError("No valid method for obtaining embeddings is configured.") - - # TO DO Add this method as Util method outside MyScaleRM - def _get_embeddings_from_openai(self, query: str) -> List[float]: - """ - Uses the OpenAI API to generate embeddings for the given query. - - Args: - query: A string for which to generate embeddings. - - Returns: - A list containing the embedding of a query. - """ - response = openai.embeddings.create(model=self.model, input=query) - return response.data[0].embedding - - # TO DO Add this method as Util method outside MyScaleRM - def _get_embedding_from_local_model(self, query: str) -> List[float]: - """ - Generates embeddings for a single query using the configured local model. - - Args: - query: The text query to generate an embedding for. - - Returns: - A list representing the query's embedding. - """ - import torch - - self._local_embed_model.eval() # Ensure the model is in evaluation mode - - inputs = self._local_tokenizer(query, return_tensors="pt", padding=True, truncation=True).to(self.device) - with torch.no_grad(): - output = self._local_embed_model(**inputs) - - return output.last_hidden_state.mean(dim=1).squeeze().numpy().tolist() - - def forward(self, user_query: str, k: Optional[int] = None) -> List[dotdict]: - """ - Executes a retrieval operation based on a user's query and returns the top k relevant results. - - Args: - user_query: The query text to search for. - k: Optional; The number of top matches to return. Defaults to the class's configured k value. - - Returns: - A list of dotdict objects containing the formatted retrieval results. - - Raises: - ValueError: If the user_query is None. - """ - if user_query is None: - raise ValueError("Query is required") - k = k or self.k - embeddings = self.get_embeddings(user_query) - columns_string = ", ".join(self.metadata_columns) - result = self.client.query(f""" - SELECT {columns_string}, - distance({self.vector_column}, {embeddings}) as dist FROM {self.database}.{self.table} ORDER BY dist LIMIT {k} - """) - result = self.client.query(f""" - SELECT {columns_string}, distance({self.vector_column}, {embeddings}) AS dist - FROM {self.database}.{self.table} - ORDER BY dist - LIMIT {k} - """) - - # Convert the metadata into strings to pass to dspy.Prediction - results = [] - for row in result.named_results(): - if len(self.metadata_columns) == 1: - results.append(row[self.metadata_columns[0]]) - else: - row_strings = [f"{column}: {row[column]}" for column in self.metadata_columns] # Format row data - row_string = "\n".join(row_strings) # Combine formatted data - results.append(row_string) # Append to results - - # Return list of dotdict - return [dotdict({"long_text": passage}) for passage in results] +# import functools +# import os +# from typing import List, Optional + +# import openai + +# import dspy +# from dspy.dsp.modules.cache_utils import NotebookCacheMemory, cache_turn_on +# from dspy.dsp.utils import dotdict + +# # Check for necessary libraries and suggest installation if not found. +# try: +# import clickhouse_connect +# except ImportError: +# raise ImportError( +# "The 'myscale' extra is required to use MyScaleRM. Install it with `pip install dspy-ai[myscale]`", +# ) + +# # Verify the compatibility of the OpenAI library version installed. +# try: +# major, minor, _ = map(int, openai.__version__.split(".")) +# OPENAI_VERSION_COMPATIBLE = major >= 1 and minor >= 16 +# except Exception: +# OPENAI_VERSION_COMPATIBLE = False + +# if not OPENAI_VERSION_COMPATIBLE: +# raise ImportError( +# "An incompatible OpenAI library version is installed. Ensure you have version 1.16.1 or later.", +# ) + +# # Attempt to handle specific OpenAI errors; fallback to general ones if necessary. +# try: +# import openai.error + +# ERRORS = (openai.error.RateLimitError, openai.error.ServiceUnavailableError, openai.error.APIError) +# except Exception: +# ERRORS = (openai.RateLimitError, openai.APIError) + + +# class MyScaleRM(dspy.Retrieve): +# """ +# A retrieval module that uses MyScaleDB to return the top passages for a given query. + +# MyScaleDB is a fork of ClickHouse that focuses on vector similarity search and full +# text search. MyScaleRM is designed to facilitate easy retrieval of information from +# MyScaleDB using embeddings. It supports embedding generation through either a local +# model or the OpenAI API. This class abstracts away the complexities of connecting to +# MyScaleDB, managing API keys, and processing queries to return semantically +# relevant results. + +# Assumes that a table named `database.table` exists in MyScaleDB, and that the +# table has column named `vector_column` that stores vector data and a vector index has +# been created on this column. Other metadata are stored in `metadata_columns`. + +# Args: +# client (clickhouse_connect.driver.client.Client): A client connection to the MyScaleDB. +# table (str): Name of the table within the database to perform queries against. +# database (str, optional): Name of the database to query within MyScaleDB. +# metadata_columns(List[str], optional): A list of columns to include in the results. +# vector_column (str, optional): The name of the column in the table that stores vector data. +# k (int, optional): The number of closest matches to retrieve for a given query. +# openai_api_key (str, optional): The API key for accessing OpenAI's services. +# model (str, optional): Specifies the particular OpenAI model to use for embedding generation. +# use_local_model (bool): Flag indicating whether a local model is used for embeddings. + +# """ + +# def __init__( +# self, +# client: clickhouse_connect.driver.client.Client, +# table: str, +# database: str = "default", +# metadata_columns: List[str] = ["text"], +# vector_column: str = "vector", +# k: int = 3, +# openai_api_key: Optional[str] = None, +# openai_model: Optional[str] = None, +# local_embed_model: Optional[str] = None, +# ): +# self.client = client +# self.database = database +# self.table = table +# if not metadata_columns: +# raise ValueError("metadata_columns is required") +# self.metadata_columns = metadata_columns +# self.vector_column = vector_column +# self.k = k +# self.openai_api_key = openai_api_key +# self.model = openai_model +# self.use_local_model = False + +# if local_embed_model: +# self.setup_local_model(local_embed_model) +# elif openai_api_key: +# os.environ["OPENAI_API_KEY"] = self.openai_api_key + +# def setup_local_model(self, model_name: str): +# """ +# Configures a local model for embedding generation, including model and tokenizer loading. + +# Args: +# model_name: The name or path to the pre-trained model to load. + +# Raises: +# ModuleNotFoundError: If necessary libraries (torch or transformers) are not installed. +# """ +# try: +# import torch +# from transformers import AutoModel, AutoTokenizer +# except ImportError as exc: +# raise ModuleNotFoundError( +# """You need to install PyTorch and Hugging Face's transformers library to use a local embedding model. +# Install the pytorch using `pip install torch` and transformers using `pip install transformers` """, +# ) from exc + +# try: +# self._local_embed_model = AutoModel.from_pretrained(model_name) +# self._local_tokenizer = AutoTokenizer.from_pretrained(model_name) +# self.use_local_model = True +# except Exception as e: +# raise ValueError(f"Failed to load model or tokenizer. Error: {str(e)}") + +# if torch.cuda.is_available(): +# self.device = torch.device("cuda:0") +# elif torch.backends.mps.is_available(): +# self.device = torch.device("mps") +# else: +# self.device = torch.device("cpu") + +# self._local_embed_model.to(self.device) + +# @functools.lru_cache(maxsize=None if cache_turn_on else 0) +# @NotebookCacheMemory.cache +# def get_embeddings(self, query: str) -> List[float]: +# """ +# Determines the appropriate source (OpenAI or local model) for embedding generation based on class configuration, +# and retrieves embeddings for the provided queries. + +# Args: +# query: A query to generate embeddings for. + +# Returns: +# A list of embeddings corresponding to the query in the input list. + +# Raises: +# ValueError: If neither an OpenAI API key nor a local model has been configured. +# """ +# if self.openai_api_key and self.model: +# return self._get_embeddings_from_openai(query) +# elif self.use_local_model: +# return self._get_embedding_from_local_model(query) +# else: +# raise ValueError("No valid method for obtaining embeddings is configured.") + +# # TO DO Add this method as Util method outside MyScaleRM +# def _get_embeddings_from_openai(self, query: str) -> List[float]: +# """ +# Uses the OpenAI API to generate embeddings for the given query. + +# Args: +# query: A string for which to generate embeddings. + +# Returns: +# A list containing the embedding of a query. +# """ +# response = openai.embeddings.create(model=self.model, input=query) +# return response.data[0].embedding + +# # TO DO Add this method as Util method outside MyScaleRM +# def _get_embedding_from_local_model(self, query: str) -> List[float]: +# """ +# Generates embeddings for a single query using the configured local model. + +# Args: +# query: The text query to generate an embedding for. + +# Returns: +# A list representing the query's embedding. +# """ +# import torch + +# self._local_embed_model.eval() # Ensure the model is in evaluation mode + +# inputs = self._local_tokenizer(query, return_tensors="pt", padding=True, truncation=True).to(self.device) +# with torch.no_grad(): +# output = self._local_embed_model(**inputs) + +# return output.last_hidden_state.mean(dim=1).squeeze().numpy().tolist() + +# def forward(self, user_query: str, k: Optional[int] = None) -> List[dotdict]: +# """ +# Executes a retrieval operation based on a user's query and returns the top k relevant results. + +# Args: +# user_query: The query text to search for. +# k: Optional; The number of top matches to return. Defaults to the class's configured k value. + +# Returns: +# A list of dotdict objects containing the formatted retrieval results. + +# Raises: +# ValueError: If the user_query is None. +# """ +# if user_query is None: +# raise ValueError("Query is required") +# k = k or self.k +# embeddings = self.get_embeddings(user_query) +# columns_string = ", ".join(self.metadata_columns) +# result = self.client.query(f""" +# SELECT {columns_string}, +# distance({self.vector_column}, {embeddings}) as dist FROM {self.database}.{self.table} ORDER BY dist LIMIT {k} +# """) +# result = self.client.query(f""" +# SELECT {columns_string}, distance({self.vector_column}, {embeddings}) AS dist +# FROM {self.database}.{self.table} +# ORDER BY dist +# LIMIT {k} +# """) + +# # Convert the metadata into strings to pass to dspy.Prediction +# results = [] +# for row in result.named_results(): +# if len(self.metadata_columns) == 1: +# results.append(row[self.metadata_columns[0]]) +# else: +# row_strings = [f"{column}: {row[column]}" for column in self.metadata_columns] # Format row data +# row_string = "\n".join(row_strings) # Combine formatted data +# results.append(row_string) # Append to results + +# # Return list of dotdict +# return [dotdict({"long_text": passage}) for passage in results] diff --git a/dspy/retrieve/qdrant_rm.py b/dspy/retrieve/qdrant_rm.py index 01af9fccba..956adeab44 100644 --- a/dspy/retrieve/qdrant_rm.py +++ b/dspy/retrieve/qdrant_rm.py @@ -1,116 +1,116 @@ -from collections import defaultdict -from typing import Optional, Union - -import dspy -from dspy.dsp.modules.sentence_vectorizer import BaseSentenceVectorizer, FastEmbedVectorizer -from dspy.dsp.utils import dotdict - -try: - from qdrant_client import QdrantClient, models -except ImportError as e: - raise ImportError( - "The 'qdrant' extra is required to use QdrantRM. Install it with `pip install dspy-ai[qdrant]`", - ) from e - - -class QdrantRM(dspy.Retrieve): - """A retrieval module that uses Qdrant to return the top passages for a given query. - - Args: - qdrant_collection_name (str): The name of the Qdrant collection. - qdrant_client (QdrantClient): An instance of `qdrant_client.QdrantClient`. - k (int, optional): The default number of top passages to retrieve. Default: 3. - document_field (str, optional): The key in the Qdrant payload with the content. Default: `"document"`. - vectorizer (BaseSentenceVectorizer, optional): An implementation `BaseSentenceVectorizer`. - Default: `FastEmbedVectorizer`. - vector_name (str, optional): Name of the vector in the collection. Default: The first available vector name. - - Examples: - Below is a code snippet that shows how to use Qdrant as the default retriver: - ```python - from qdrant_client import QdrantClient - - llm = dspy.OpenAI(model="gpt-3.5-turbo") - qdrant_client = QdrantClient() - retriever_model = QdrantRM("my_collection_name", qdrant_client=qdrant_client) - dspy.settings.configure(lm=llm, rm=retriever_model) - ``` - - Below is a code snippet that shows how to use Qdrant in the forward() function of a module - ```python - self.retrieve = QdrantRM(question, k=num_passages, filter=filter) - ``` - """ - - def __init__( - self, - qdrant_collection_name: str, - qdrant_client: QdrantClient, - k: int = 3, - document_field: str = "document", - vectorizer: Optional[BaseSentenceVectorizer] = None, - vector_name: Optional[str] = None, - ): - self._collection_name = qdrant_collection_name - self._client = qdrant_client - - self._vectorizer = vectorizer or FastEmbedVectorizer(self._client.embedding_model_name) - - self._document_field = document_field - - self._vector_name = vector_name or self._get_first_vector_name() - - super().__init__(k=k) - - def forward(self, query_or_queries: Union[str, list[str]], k: Optional[int] = None, filter: Optional[models.Filter]=None) -> dspy.Prediction: - """Search with Qdrant for self.k top passages for query. - - Args: - query_or_queries (Union[str, List[str]]): The query or queries to search for. - k (Optional[int]): The number of top passages to retrieve. Defaults to self.k. - filter (Optional["Filter"]): "Look only for points which satisfies this conditions". Default: None. - - Returns: - dspy.Prediction: An object containing the retrieved passages. - """ - queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries - queries = [q for q in queries if q] # Filter empty queries - - vectors = self._vectorizer(queries) - - search_requests = [ - models.QueryRequest( - query=vector, - using=self._vector_name, - limit=k or self.k, - with_payload=[self._document_field], - filter=filter, - ) - for vector in vectors - ] - batch_results = self._client.query_batch_points(self._collection_name, requests=search_requests) - - passages_scores = defaultdict(float) - for batch in batch_results: - for result in batch.points: - # If a passage is returned multiple times, the score is accumulated. - document = result.payload.get(self._document_field) - passages_scores[document] += result.score - - # Sort passages by their accumulated scores in descending order - sorted_passages = sorted(passages_scores.items(), key=lambda x: x[1], reverse=True)[:k] - - # Wrap each sorted passage in a dotdict with 'long_text' - return [dotdict({"long_text": passage}) for passage, _ in sorted_passages] - - def _get_first_vector_name(self) -> Optional[str]: - vectors = self._client.get_collection(self._collection_name).config.params.vectors - - if not isinstance(vectors, dict): - # The collection only has the default, unnamed vector - return None - - first_vector_name = list(vectors.keys())[0] - - # The collection has multiple vectors. Could also include the falsy unnamed vector - Empty string("") - return first_vector_name or None +# from collections import defaultdict +# from typing import Optional, Union + +# import dspy +# from dspy.dsp.modules.sentence_vectorizer import BaseSentenceVectorizer, FastEmbedVectorizer +# from dspy.dsp.utils import dotdict + +# try: +# from qdrant_client import QdrantClient, models +# except ImportError as e: +# raise ImportError( +# "The 'qdrant' extra is required to use QdrantRM. Install it with `pip install dspy-ai[qdrant]`", +# ) from e + + +# class QdrantRM(dspy.Retrieve): +# """A retrieval module that uses Qdrant to return the top passages for a given query. + +# Args: +# qdrant_collection_name (str): The name of the Qdrant collection. +# qdrant_client (QdrantClient): An instance of `qdrant_client.QdrantClient`. +# k (int, optional): The default number of top passages to retrieve. Default: 3. +# document_field (str, optional): The key in the Qdrant payload with the content. Default: `"document"`. +# vectorizer (BaseSentenceVectorizer, optional): An implementation `BaseSentenceVectorizer`. +# Default: `FastEmbedVectorizer`. +# vector_name (str, optional): Name of the vector in the collection. Default: The first available vector name. + +# Examples: +# Below is a code snippet that shows how to use Qdrant as the default retriver: +# ```python +# from qdrant_client import QdrantClient + +# llm = dspy.OpenAI(model="gpt-3.5-turbo") +# qdrant_client = QdrantClient() +# retriever_model = QdrantRM("my_collection_name", qdrant_client=qdrant_client) +# dspy.settings.configure(lm=llm, rm=retriever_model) +# ``` + +# Below is a code snippet that shows how to use Qdrant in the forward() function of a module +# ```python +# self.retrieve = QdrantRM(question, k=num_passages, filter=filter) +# ``` +# """ + +# def __init__( +# self, +# qdrant_collection_name: str, +# qdrant_client: QdrantClient, +# k: int = 3, +# document_field: str = "document", +# vectorizer: Optional[BaseSentenceVectorizer] = None, +# vector_name: Optional[str] = None, +# ): +# self._collection_name = qdrant_collection_name +# self._client = qdrant_client + +# self._vectorizer = vectorizer or FastEmbedVectorizer(self._client.embedding_model_name) + +# self._document_field = document_field + +# self._vector_name = vector_name or self._get_first_vector_name() + +# super().__init__(k=k) + +# def forward(self, query_or_queries: Union[str, list[str]], k: Optional[int] = None, filter: Optional[models.Filter]=None) -> dspy.Prediction: +# """Search with Qdrant for self.k top passages for query. + +# Args: +# query_or_queries (Union[str, List[str]]): The query or queries to search for. +# k (Optional[int]): The number of top passages to retrieve. Defaults to self.k. +# filter (Optional["Filter"]): "Look only for points which satisfies this conditions". Default: None. + +# Returns: +# dspy.Prediction: An object containing the retrieved passages. +# """ +# queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries +# queries = [q for q in queries if q] # Filter empty queries + +# vectors = self._vectorizer(queries) + +# search_requests = [ +# models.QueryRequest( +# query=vector, +# using=self._vector_name, +# limit=k or self.k, +# with_payload=[self._document_field], +# filter=filter, +# ) +# for vector in vectors +# ] +# batch_results = self._client.query_batch_points(self._collection_name, requests=search_requests) + +# passages_scores = defaultdict(float) +# for batch in batch_results: +# for result in batch.points: +# # If a passage is returned multiple times, the score is accumulated. +# document = result.payload.get(self._document_field) +# passages_scores[document] += result.score + +# # Sort passages by their accumulated scores in descending order +# sorted_passages = sorted(passages_scores.items(), key=lambda x: x[1], reverse=True)[:k] + +# # Wrap each sorted passage in a dotdict with 'long_text' +# return [dotdict({"long_text": passage}) for passage, _ in sorted_passages] + +# def _get_first_vector_name(self) -> Optional[str]: +# vectors = self._client.get_collection(self._collection_name).config.params.vectors + +# if not isinstance(vectors, dict): +# # The collection only has the default, unnamed vector +# return None + +# first_vector_name = list(vectors.keys())[0] + +# # The collection has multiple vectors. Could also include the falsy unnamed vector - Empty string("") +# return first_vector_name or None diff --git a/setup.py b/setup.py index 2e074d33f7..dace8c814e 100644 --- a/setup.py +++ b/setup.py @@ -16,11 +16,11 @@ description="DSPy", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/stanfordnlp/dsp", + url="https://github.com/stanfordnlp/dspy", author="Omar Khattab", author_email="okhattab@stanford.edu", license="MIT License", - packages=find_packages(include=["dsp.*", "dspy.*", "dsp", "dspy"]), + packages=find_packages(include=["dspy.*", "dspy"]), python_requires=">=3.9", install_requires=requirements,