In [5]:
import logging
from typing import Any, Dict, List, Optional

from haystack import Document
from haystack.document_stores.errors import DuplicateDocumentError, MissingDocumentError
from haystack.document_stores.types import DuplicatePolicy

import clickhouse_driver

In [4]:
import logging
from typing import Any, Dict, List, Optional

from haystack.dataclasses import Document
from haystack.document_stores.types.policy import DuplicatePolicy
from clickhouse_driver import Client

logger = logging.getLogger(__name__)


class ClickHouseDocumentStore:
    def __init__(self, host: str = "localhost", port: int = 9000, database: str = "haystack", table_name: str = "documents"):
        self.client = Client(host, port=port)
        self.database = database
        self.table_name = table_name
        self._create_table()

    def _create_table(self):
        self.client.execute(f"CREATE DATABASE IF NOT EXISTS {self.database}")
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.database}.{self.table_name} (
            id String,
            content String,
            meta Nested(key String, value String)
        ) ENGINE = MergeTree()
        ORDER BY id
        """
        self.client.execute(create_table_query)

    def to_dict(self) -> Dict[str, Any]:
        return {'host': self.client.host, 'port': self.client.port, 'database': self.database, 'table_name': self.table_name}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ClickHouseDocumentStore":
        return cls(host=data.get('host', 'localhost'), port=data.get('port', 9000), database=data.get('database', 'haystack'), table_name=data.get('table_name', 'documents'))

    def count_documents(self) -> int:
        query = f"SELECT COUNT(*) FROM {self.database}.{self.table_name}"
        result = self.client.execute(query)
        return result[0][0]

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        # Фильтрация не реализована в этом примере
        # Нужно добавить соответствующую логику согласно вашим требованиям
        pass

    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        written_count = 0
        for doc in documents:
            if policy == DuplicatePolicy.FAIL:
                existing_doc = self._get_document_by_id(doc.id)
                if existing_doc:
                    raise DuplicateError(f"Document with id {doc.id} already exists")

            insert_query = f"INSERT INTO {self.database}.{self.table_name} (id, content, meta) VALUES"
            values = (doc.id, doc.content, [(k, str(v)) for k, v in doc.meta.items()])
            self.client.execute(insert_query, [values])
            written_count += 1

        return written_count

    def delete_documents(self, document_ids: List[str]) -> None:
        for doc_id in document_ids:
            delete_query = f"ALTER TABLE {self.database}.{self.table_name} DELETE WHERE id = '{doc_id}'"
            self.client.execute(delete_query)

    def _get_document_by_id(self, doc_id: str) -> Optional[Document]:
        query = f"SELECT id, content, meta FROM {self.database}.{self.table_name} WHERE id = '{doc_id}'"
        result = self.client.execute(query)
        if result:
            r = result[0]
            return Document(id=r[0], content=r[1], meta=dict(r[2]))
        return None

# Далее идёт использование класса ClickHouseDocumentStore в коде.


In [4]:

logger = logging.getLogger(__name__)


class ClickHouseDocumentStore(BaseDocumentStore):
    def __init__(self, host: str = "localhost", port: int = 9000, database: str = "haystack", table_name: str = "documents"):
        self.client = clickhouse_driver.Client(host, port=port)
        self.database = database
        self.table_name = table_name
        self._create_table()

    def _create_table(self):
        self.client.execute(f"CREATE DATABASE IF NOT EXISTS {self.database}")
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.database}.{self.table_name} (
            id String,
            content String,
            meta Nested(key String, value String)
        ) ENGINE = MergeTree()
        ORDER BY id
        """
        self.client.execute(create_table_query)

    def count_documents(self) -> int:
        count_query = f"SELECT COUNT(*) FROM {self.database}.{self.table_name}"
        result = self.client.execute(count_query)
        return result[0][0]

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        if not filters:
            return self.get_all_documents()

        # Примерная реализация фильтрации
        # Это должно быть адаптировано к вашим требованиям и структуре данных
        filter_clauses = []
        for key, value in filters.items():
            filter_clauses.append(f"{key} = '{value}'")
        filter_query = " AND ".join(filter_clauses)

        query = f"SELECT id, content, meta FROM {self.database}.{self.table_name} WHERE {filter_query}"
        results = self.client.execute(query)
        return [Document(id=r[0], content=r[1], meta=dict(r[2])) for r in results]

    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> int:
        for doc in documents:
            if policy == DuplicatePolicy.FAIL:
                existing_doc = self.get_document_by_id(doc.id)
                if existing_doc:
                    raise DuplicateDocumentError(f"Duplicate document with id {doc.id}")

            values = (doc.id, doc.content, [(k, str(v)) for k, v in doc.meta.items()])
            insert_query = f"INSERT INTO {self.database}.{self.table_name} (id, content, meta) VALUES"
            self.client.execute(insert_query, [values])

        return len(documents)

    def delete_documents(self, document_ids: List[str]) -> None:
        for doc_id in document_ids:
            delete_query = f"ALTER TABLE {self.database}.{self.table_name} DELETE WHERE id = '{doc_id}'"
            self.client.execute(delete_query)

    def to_dict(self) -> Dict[str, Any]:
        return {
            'host': self.client.host,
            'port': self.client.port,
            'database': self.database,
            'table_name': self.table_name
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ClickHouseDocumentStore":
        return cls(host=data.get('host', 'localhost'), port=data.get('port', 9000), database=data.get('database', 'haystack'), table_name=data.get('table_name', 'documents'))

    def get_all_documents(self) -> List[Document]:
        select_query = f"SELECT id, content, meta FROM {self.database}.{self.table_name}"
        results = self.client.execute(select_query)
        return [Document(id=r[0], content=r[1], meta=dict(r[2])) for r in results]

    def get_document_by_id(self, doc_id: str) -> Optional[Document]:
        query = f"SELECT id, content, meta FROM {self.database}.{self.table_name} WHERE id = '{doc_id}'"
        result = self.client.execute(query)
        if result:
            r = result[0]
            return Document(id=r[0], content=r[1], meta=dict(r[2]))
        return None

# Далее идёт использование вашего класса ClickHouseDocumentStore в коде.


ModuleNotFoundError: No module named 'haystack.document_stores.base'