Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify kb and shard creation #2052

Merged
merged 4 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 8 additions & 26 deletions nucliadb/nucliadb/common/cluster/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from typing import Any, Awaitable, Callable, Optional

import backoff
from nucliadb_protos.knowledgebox_pb2 import SemanticModelMetadata # type: ignore
from nucliadb_protos.nodewriter_pb2 import IndexMessage, IndexMessageSource, TypeMessage

from nucliadb.common import datamanagers
Expand All @@ -43,7 +42,6 @@
nodereader_pb2,
noderesources_pb2,
nodewriter_pb2,
utils_pb2,
writer_pb2,
)
from nucliadb_telemetry import errors
Expand Down Expand Up @@ -183,8 +181,6 @@ async def create_shard_by_kbid(
self,
txn: Transaction,
kbid: str,
semantic_model: SemanticModelMetadata,
release_channel: utils_pb2.ReleaseChannel.ValueType,
) -> writer_pb2.ShardObject:
try:
check_enough_nodes()
Expand All @@ -197,18 +193,12 @@ async def create_shard_by_kbid(

kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
if kb_shards is None:
# First logic shard on the index
kb_shards = writer_pb2.Shards()
kb_shards.kbid = kbid
# B/c with Shards.actual
kb_shards.actual = -1
kb_shards.similarity = semantic_model.similarity_function
kb_shards.model.CopyFrom(semantic_model)
else:
# New logic shard on an existing index
pass
msg = (
"Attempting to create a shard for a KB when it has no stored shards in maindb",
)
logger.error(msg, extra={"kbid": kbid})
raise ShardsNotFound(msg)

kb_shards.release_channel = release_channel
existing_kb_nodes = [
replica.node for shard in kb_shards.shards for replica in shard.replicas
]
Expand Down Expand Up @@ -259,8 +249,8 @@ async def create_shard_by_kbid(

# Append the created shard and make `actual` point to it.
kb_shards.shards.append(shard)
# B/c with Shards.actual
kb_shards.actual += 1
# B/c with Shards.actual - we only use last created shard
kb_shards.actual = len(kb_shards.shards) - 1

await datamanagers.cluster.update_kb_shards(txn, kbid=kbid, shards=kb_shards)

Expand Down Expand Up @@ -371,21 +361,14 @@ async def maybe_create_new_shard(
self,
kbid: str,
num_paragraphs: int,
release_channel: utils_pb2.ReleaseChannel.ValueType = utils_pb2.ReleaseChannel.STABLE,
):
if not self.should_create_new_shard(num_paragraphs):
return

logger.warning({"message": "Adding shard", "kbid": kbid})

async with datamanagers.with_transaction() as txn:
model = await datamanagers.kb.get_model_metadata(txn, kbid=kbid)
await self.create_shard_by_kbid(
txn,
kbid,
semantic_model=model,
release_channel=release_channel,
)
await self.create_shard_by_kbid(txn, kbid)
await txn.commit()


Expand Down Expand Up @@ -417,7 +400,6 @@ async def _resource_change_event(
await self.maybe_create_new_shard(
kbid,
shard_info.paragraphs,
shard_info.metadata.release_channel,
)
await index_node.writer.GC(noderesources_pb2.ShardId(id=shard_id)) # type: ignore

Expand Down
8 changes: 1 addition & 7 deletions nucliadb/nucliadb/common/cluster/rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,8 @@ async def maybe_add_shard(kbid: str) -> None:
):
# create new shard
async with datamanagers.with_transaction() as txn:
model = await datamanagers.kb.get_model_metadata(txn, kbid=kbid)
sm = get_shard_manager()
await sm.create_shard_by_kbid(
txn,
kbid,
semantic_model=model,
release_channel=kb_shards.release_channel,
)
await sm.create_shard_by_kbid(txn, kbid)
await txn.commit()


Expand Down
14 changes: 5 additions & 9 deletions nucliadb/nucliadb/ingest/consumer/shard_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import uuid
from functools import partial

from nucliadb.common import datamanagers, locking
from nucliadb.common import locking
from nucliadb.common.cluster.manager import choose_node
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.maindb.driver import Driver
Expand Down Expand Up @@ -91,12 +91,12 @@ async def handle_message(self, raw_data) -> None:
async def process_kb(self, kbid: str) -> None:
logger.info({"message": "Processing notification for kbid", "kbid": kbid})
async with self.driver.transaction(read_only=True) as txn:
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
current_shard = await self.shard_manager.get_current_active_shard(txn, kbid)

if kb_shards is None or current_shard is None:
if current_shard is None:
logger.error(
"Processing a notification for a nonexistent", extra={"kbid": kbid}
"Processing a notification for KB with no current shard",
extra={"kbid": kbid},
)
return

Expand All @@ -109,8 +109,4 @@ async def process_kb(self, kbid: str) -> None:
shard: nodereader_pb2.Shard = await node.reader.GetShard(
nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore
)
await self.shard_manager.maybe_create_new_shard(
kbid,
shard.paragraphs,
kb_shards.release_channel,
)
await self.shard_manager.maybe_create_new_shard(kbid, shard.paragraphs)
20 changes: 14 additions & 6 deletions nucliadb/nucliadb/ingest/orm/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,22 @@ async def create(
failed = True

if failed is False:
kb_shards = writer_pb2.Shards()
kb_shards.kbid = uuid
# B/c with Shards.actual
kb_shards.actual = -1
# B/c with `Shards.similarity`, replaced by `model`
kb_shards.similarity = semantic_model.similarity_function
kb_shards.model.CopyFrom(semantic_model)
kb_shards.release_channel = release_channel

await datamanagers.cluster.update_kb_shards(
txn, kbid=uuid, shards=kb_shards
)

shard_manager = get_shard_manager()
try:
await shard_manager.create_shard_by_kbid(
txn,
uuid,
semantic_model=semantic_model,
release_channel=release_channel,
)
await shard_manager.create_shard_by_kbid(txn, uuid)
except Exception as e:
await storage.delete_kb(uuid)
raise e
Expand Down
14 changes: 1 addition & 13 deletions nucliadb/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,19 +398,7 @@ async def index_resource(
shard = await self.shard_manager.get_current_active_shard(txn, kbid)
if shard is None:
# no shard available, create a new one
model = await datamanagers.kb.get_model_metadata(txn, kbid=kbid)
config = await kb.get_config()
if config is not None:
release_channel = config.release_channel
else:
release_channel = utils_pb2.ReleaseChannel.STABLE

shard = await self.shard_manager.create_shard_by_kbid(
txn,
kbid,
semantic_model=model,
release_channel=release_channel,
)
shard = await self.shard_manager.create_shard_by_kbid(txn, kbid)
await datamanagers.resources.set_resource_shard_id(
txn, kbid=kbid, rid=uuid, shard=shard.shard
)
Expand Down
7 changes: 1 addition & 6 deletions nucliadb/nucliadb/ingest/service/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ async def _create_kb_onprem(self, request: KnowledgeBoxNew) -> str:
"""
kbid = request.forceuuid or str(uuid.uuid4())
release_channel = get_release_channel(request)
request.config.release_channel = release_channel
lconfig = await learning_proxy.get_configuration(kbid)
lconfig_created = False
if lconfig is None:
Expand Down Expand Up @@ -235,7 +234,6 @@ async def _create_kb_hosted(self, request: KnowledgeBoxNew) -> str:
"""
kbid = request.forceuuid or str(uuid.uuid4())
release_channel = get_release_channel(request)
request.config.release_channel = release_channel
await self.proc.create_kb(
request.slug,
request.config,
Expand Down Expand Up @@ -777,11 +775,8 @@ async def ReIndex(self, request: IndexResource, context=None) -> IndexStatus: #
)
if shard is None:
# no shard currently exists, create one
model = await datamanagers.kb.get_model_metadata(
txn, kbid=request.kbid
)
shard = await self.shards_manager.create_shard_by_kbid(
txn, request.kbid, semantic_model=model
txn, request.kbid
)

await datamanagers.resources.set_resource_shard_id(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,4 @@ async def test_create_knowledgebox_release_channel(
async with driver.transaction() as txn:
shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
assert shards is not None
config = await get_kb_config(txn, kbid)
assert shards.release_channel == config.release_channel == release_channel
assert shards.release_channel == release_channel
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def shard_manager(reader):
), patch(
"nucliadb.ingest.consumer.shard_creator.choose_node",
return_value=(node, "shard_id"),
), patch(
"nucliadb.ingest.consumer.shard_creator.datamanagers.cluster.get_kb_shards",
AsyncMock(return_value=shards),
), patch(
"nucliadb.ingest.consumer.shard_creator.locking.distributed_lock",
return_value=AsyncMock(),
Expand Down Expand Up @@ -103,7 +100,7 @@ async def test_handle_message_create_new_shard(
await shard_creator_handler.handle_message(notif.SerializeToString())
await asyncio.sleep(0.06)
shard_manager.maybe_create_new_shard.assert_called_with(
"kbid", settings.max_shard_paragraphs + 1, 0
"kbid", settings.max_shard_paragraphs + 1
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,15 @@ def node_new_shard():
yield mocked


@pytest.mark.parametrize("release_channel", ("EXPERIMENTAL", "STABLE"))
async def test_create_shard_by_kbid_attempts_on_all_nodes(
shards, maindb_driver, fake_kbid, node_new_shard, release_channel
shards, maindb_driver, fake_kbid, node_new_shard
):
shard_manager = manager.KBShardManager()
async with maindb_driver.transaction() as txn:
with pytest.raises(ExhaustedNodesError):
await shard_manager.create_shard_by_kbid(
txn,
fake_kbid,
semantic_model=mock.MagicMock(),
release_channel=release_channel,
)

assert node_new_shard.await_count == len(manager.get_index_nodes())
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from nucliadb.common.cluster import manager
from nucliadb.common.cluster.settings import settings
from nucliadb.common.maindb.driver import Transaction
from nucliadb_protos import knowledgebox_pb2, utils_pb2, writer_pb2
from nucliadb_protos import writer_pb2


def test_should_create_new_shard():
Expand Down Expand Up @@ -89,15 +89,20 @@ async def test_shard_creation(fake_index_nodes: list[str], txn: Transaction):
"""
index_nodes = set(fake_index_nodes)
kbid = f"kbid:{test_shard_creation.__name__}"
semantic_model = knowledgebox_pb2.SemanticModelMetadata()
release_channel = utils_pb2.ReleaseChannel.STABLE
sm = manager.KBShardManager()

# Fake KB shards instead of creating a KB to generate it
shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
assert shards is None
await datamanagers.cluster.update_kb_shards(
txn,
kbid=kbid,
shards=writer_pb2.Shards(
kbid=kbid,
),
)

# create first shard
await sm.create_shard_by_kbid(txn, kbid, semantic_model, release_channel)
await sm.create_shard_by_kbid(txn, kbid)

shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
assert shards is not None
Expand All @@ -108,7 +113,7 @@ async def test_shard_creation(fake_index_nodes: list[str], txn: Transaction):
assert set((replica.node for replica in shards.shards[0].replicas)) == index_nodes

# adding a second shard will mark the first as read only
await sm.create_shard_by_kbid(txn, kbid, semantic_model, release_channel)
await sm.create_shard_by_kbid(txn, kbid)

shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
assert shards is not None
Expand All @@ -120,7 +125,7 @@ async def test_shard_creation(fake_index_nodes: list[str], txn: Transaction):
assert set((replica.node for replica in shards.shards[1].replicas)) == index_nodes

# adding a third one will be equivalent
await sm.create_shard_by_kbid(txn, kbid, semantic_model, release_channel)
await sm.create_shard_by_kbid(txn, kbid)

shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
assert shards is not None
Expand Down
3 changes: 2 additions & 1 deletion nucliadb_protos/knowledgebox.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ message KnowledgeBoxConfig {
string slug = 5;
bool disable_vectors = 6 [deprecated = true];
int64 migration_version = 7;
utils.ReleaseChannel release_channel = 8;
// DEPRECATED: duplicated field also stored in `writer.proto Shards`
utils.ReleaseChannel release_channel = 8 [deprecated = true];
}

// NEW
Expand Down
4 changes: 2 additions & 2 deletions nucliadb_protos/python/nucliadb_protos/audit_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class AuditField(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _FieldActionEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[AuditField._FieldAction.ValueType], builtins.type):
class _FieldActionEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[AuditField._FieldAction.ValueType], builtins.type): # noqa: F821
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
ADDED: AuditField._FieldAction.ValueType # 0
MODIFIED: AuditField._FieldAction.ValueType # 1
Expand Down Expand Up @@ -167,7 +167,7 @@ class AuditRequest(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _AuditTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[AuditRequest._AuditType.ValueType], builtins.type):
class _AuditTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[AuditRequest._AuditType.ValueType], builtins.type): # noqa: F821
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
VISITED: AuditRequest._AuditType.ValueType # 0
MODIFIED: AuditRequest._AuditType.ValueType # 1
Expand Down
Loading
Loading