Skip to content

Commit

Permalink
rawx: Send "chunk.new" event when linking chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed Oct 1, 2021
1 parent bdb83ff commit 670e350
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
3 changes: 2 additions & 1 deletion oio/blob/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ def chunk_put(self, url, meta, data, **kwargs):
else 'chunk_hash']
writer = ReplicatedMetachunkWriter(
meta, [chunk], FakeChecksum(checksum),
storage_method, quorum=1, perfdata=self.perfdata)
storage_method, quorum=1, perfdata=self.perfdata,
logger=self.logger, **kwargs)
bytes_transferred, chunk_hash, _ = writer.stream(data, None)
return bytes_transferred, chunk_hash

Expand Down
2 changes: 2 additions & 0 deletions rawx/handler_chunk.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// OpenIO SDS Go rawx
// Copyright (C) 2015-2020 OpenIO SAS
// Copyright (C) 2021 OVH SAS
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Affero General Public
Expand Down Expand Up @@ -270,6 +271,7 @@ func (rr *rawxRequest) copyChunk() {
// The link already exists and has an xattr. Commit is a matter of sync.
_ = op.commit()
rr.replyCode(http.StatusCreated)
rr.rawx.notifier.notifyNew(rr.reqid, rr.chunk)
}
}
}
Expand Down
28 changes: 20 additions & 8 deletions tests/functional/blob/test_indexer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -16,14 +17,14 @@
import string
import os
from hashlib import md5
from time import sleep

from oio.rdir.client import RdirClient
from oio.blob.client import BlobClient
from oio.blob.indexer import BlobIndexer
from oio.common.constants import OIO_VERSION
from oio.common.fullpath import encode_fullpath
from oio.common.utils import cid_from_name, paths_gen
from oio.common.utils import cid_from_name, paths_gen, request_id
from oio.event.evob import EventTypes

from tests.utils import BaseTestCase, random_str, random_id
from tests.functional.blob import random_chunk_id, random_buffer, \
Expand Down Expand Up @@ -53,6 +54,7 @@ def setUp(self):
for chunk_file in chunk_files:
os.remove(chunk_file)
self.rdir_client.admin_clear(self.rawx_id, clear_all=True)
self.beanstalkd0.drain_tube('oio-preserved')

def _put_chunk(self):
account = random_str(16)
Expand Down Expand Up @@ -80,16 +82,22 @@ def _put_chunk(self):
'metachunk_hash': md5().hexdigest(),
'metachunk_size': 1024
}
reqid = request_id()
self.blob_client.chunk_put('http://' + self.rawx_id + '/' + chunk_id,
meta, data)
sleep(1) # ensure chunk event have been processed
meta, data, reqid=reqid)
# ensure chunk event have been processed
self.wait_for_event('oio-preserved', reqid=reqid,
types=(EventTypes.CHUNK_NEW, ))
return account, container, cid, content_path, content_version, \
content_id, chunk_id

def _delete_chunk(self, chunk_id):
reqid = request_id()
self.blob_client.chunk_delete(
'http://' + self.rawx_id + '/' + chunk_id)
sleep(1) # ensure chunk event have been processed
'http://' + self.rawx_id + '/' + chunk_id, reqid=reqid)
# ensure chunk event have been processed
self.wait_for_event('oio-preserved', reqid=reqid,
types=(EventTypes.CHUNK_DELETED, ))

def _link_chunk(self, target_chunk_id):
account = random_str(16)
Expand All @@ -100,10 +108,14 @@ def _link_chunk(self, target_chunk_id):
content_id = random_id(32)
fullpath = encode_fullpath(
account, container, content_path, content_version, content_id)
reqid = request_id()
_, link = self.blob_client.chunk_link(
'http://' + self.rawx_id + '/' + target_chunk_id, None, fullpath)
'http://' + self.rawx_id + '/' + target_chunk_id, None, fullpath,
reqid=reqid)
chunk_id = link.split('/')[-1]
sleep(1) # ensure chunk event have been processed
# ensure chunk event have been processed
self.wait_for_event('oio-preserved', reqid=reqid,
types=(EventTypes.CHUNK_NEW, ))
return account, container, cid, content_path, content_version, \
content_id, chunk_id

Expand Down

0 comments on commit 670e350

Please sign in to comment.