Skip to content

Commit

Permalink
🐛 Destination Meilisearch: fix incomplete data indexing (airbytehq#27692
Browse files Browse the repository at this point in the history
)

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
  • Loading branch information
3 people authored and jatinyadav-cc committed Feb 26, 2024
1 parent 383d347 commit b60ba1f
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY destination_meilisearch ./destination_meilisearch
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
#


from logging import Logger
from typing import Any, Iterable, Mapping
from logging import Logger, getLogger
from typing import Any, Dict, Iterable, Mapping

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from destination_meilisearch.writer import MeiliWriter
from meilisearch import Client

logger = getLogger("airbyte")


def get_client(config: Mapping[str, Any]) -> Client:
host = config.get("host")
Expand All @@ -21,36 +23,51 @@ def get_client(config: Mapping[str, Any]) -> Client:
class DestinationMeilisearch(Destination):
primary_key = "_ab_pk"

def _flush_streams(self, streams: Dict[str, MeiliWriter]) -> Iterable[AirbyteMessage]:
for stream in streams:
streams[stream].flush()

def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
client = get_client(config=config)
# Creating Meilisearch writers
writers = {s.stream.name: MeiliWriter(client, s.stream.name, self.primary_key) for s in configured_catalog.streams}

for configured_stream in configured_catalog.streams:
steam_name = configured_stream.stream.name
stream_name = configured_stream.stream.name
# Deleting index in Meilisearch if sync mode is overwrite
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
client.delete_index(steam_name)
client.create_index(steam_name, {"primaryKey": self.primary_key})

writer = MeiliWriter(client, steam_name, self.primary_key)
for message in input_messages:
if message.type == Type.STATE:
writer.flush()
yield message
elif message.type == Type.RECORD:
writer.queue_write_operation(message.record.data)
else:
logger.debug(f"Deleting index: {stream_name}.")
client.delete_index(stream_name)
# Creating index in Meilisearch
client.create_index(stream_name, {"primaryKey": self.primary_key})
logger.debug(f"Creating index: {stream_name}.")

for message in input_messages:
if message.type == Type.STATE:
yield message
elif message.type == Type.RECORD:
data = message.record.data
stream = message.record.stream
# Skip unselected streams
if stream not in writers:
logger.debug(f"Stream {stream} was not present in configured streams, skipping")
continue
writer.flush()
writers[stream].queue_write_operation(data)
else:
logger.info(f"Unhandled message type {message.type}: {message}")

# Flush any leftover messages
self._flush_streams(writers)

def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
try:
client = get_client(config=config)

create_index_job = client.create_index("_airbyte", {"primaryKey": "id"})
client.wait_for_task(create_index_job["taskUid"])
client.create_index("_airbyte", {"primaryKey": "id"})

add_documents_job = client.index("_airbyte").add_documents(
client.index("_airbyte").add_documents(
[
{
"id": 287947,
Expand All @@ -59,9 +76,7 @@ def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionS
}
]
)
client.wait_for_task(add_documents_job.task_uid)

client.index("_airbyte").search("Shazam")
client.delete_index("_airbyte")
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,28 @@


class MeiliWriter:
write_buffer = []
flush_interval = 50000

def __init__(self, client: Client, steam_name: str, primary_key: str):
def __init__(self, client: Client, stream_name: str, primary_key: str):
self.client = client
self.steam_name = steam_name
self.primary_key = primary_key
self.stream_name: str = stream_name
self._write_buffer = []

logger.info(f"Creating MeiliWriter for {self.stream_name}")

def queue_write_operation(self, data: Mapping):
random_key = str(uuid4())
self.write_buffer.append({**data, self.primary_key: random_key})
if len(self.write_buffer) == self.flush_interval:
self._write_buffer.append({**data, self.primary_key: random_key})
if len(self._write_buffer) == self.flush_interval:
logger.debug(f"Reached limit size: flushing records for {self.stream_name}")
self.flush()

def flush(self):
buffer_size = len(self.write_buffer)
buffer_size = len(self._write_buffer)
if buffer_size == 0:
return
logger.info(f"flushing {buffer_size} records")
response = self.client.index(self.steam_name).add_documents(self.write_buffer)
logger.info(f"Flushing {buffer_size} records")
response = self.client.index(self.stream_name).add_documents(self._write_buffer)
self.client.wait_for_task(response.task_uid, 1800000, 1000)
self.write_buffer.clear()
self._write_buffer.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import json
import logging
import time
from typing import Any, Dict, Mapping

import pytest
Expand Down Expand Up @@ -56,12 +55,7 @@ def teardown(config: Mapping):
def client_fixture(config) -> Client:
client = get_client(config=config)
resp = client.create_index("_airbyte", {"primaryKey": "_ab_pk"})
while True:
time.sleep(0.2)
task = client.get_task(resp["taskUid"])
status = task["status"]
if status == "succeeded" or status == "failed":
break
client.wait_for_task(_handle_breaking_wait_for_task(resp))
return client


Expand All @@ -87,6 +81,13 @@ def _record(stream: str, str_value: str, int_value: int) -> AirbyteMessage:
)


def _handle_breaking_wait_for_task(task: Any) -> int:
if type(task) is dict:
return task["taskUid"]
else:
return task.task_uid


def records_count(client: Client) -> int:
    """Return the ``total`` reported when listing documents of the ``_airbyte`` index."""
    return client.index("_airbyte").get_documents().total
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: destination
definitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
dockerImageTag: 1.0.0
dockerImageTag: 1.0.1
dockerRepository: airbyte/destination-meilisearch
githubIssueLabel: destination-meilisearch
icon: meilisearch.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@

@patch("meilisearch.Client")
def test_queue_write_operation(client):
writer = MeiliWriter(client, "steam_name", "primary_key")
writer = MeiliWriter(client, "stream_name", "primary_key")
writer.queue_write_operation({"a": "a"})
assert len(writer.write_buffer) == 1
assert len(writer._write_buffer) == 1
writer.queue_write_operation({"b": "b"})
assert len(writer._write_buffer) == 2
writer2 = MeiliWriter(client, "stream_name2", "primary_key")
writer2.queue_write_operation({"a": "a"})
assert len(writer2._write_buffer) == 1
assert len(writer._write_buffer) == 2


@patch("meilisearch.Client")
def test_flush(client):
writer = MeiliWriter(client, "steam_name", "primary_key")
writer = MeiliWriter(client, "stream_name", "primary_key")
writer.queue_write_operation({"a": "a"})
writer.flush()
client.index.assert_called_once_with("steam_name")
client.index.assert_called_once_with("stream_name")
client.wait_for_task.assert_called_once()
2 changes: 2 additions & 0 deletions docs/integrations/destinations/meilisearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ The setup only requires two fields. First is the `host` which is the address at

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :----------------------------------------------------- |
| 1.0.1 | 2023-12-19 | [27692](https://github.com/airbytehq/airbyte/pull/27692) | Fix incomplete data indexing |
| 1.0.0 | 2022-10-26 | [18036](https://github.com/airbytehq/airbyte/pull/18036) | Migrate MeiliSearch to Python CDK |
| 0.2.13 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors |
| 0.2.12 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.2.11 | 2021-12-28 | [9156](https://github.com/airbytehq/airbyte/pull/9156) | Update connector fields title/description |

0 comments on commit b60ba1f

Please sign in to comment.