Skip to content

Commit

Permalink
Merge pull request #1198 from tahoe-lafs/3893-mutable-http-protocol-p…
Browse files Browse the repository at this point in the history
…art-3

Mutable http protocol, part 3

Fixes ticket:3893
  • Loading branch information
itamarst committed May 20, 2022
2 parents 8da7f95 + 12927d5 commit f0635d5
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 84 deletions.
Empty file added newsfragments/3893.minor
Empty file.
101 changes: 62 additions & 39 deletions src/allmydata/storage/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,31 @@ def get_version(self):
decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
returnValue(decoded_response)

@inlineCallbacks
def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
) -> Deferred[None]:
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
url = self._client.relative_url(
"/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
lease_cancel_secret=cancel_secret,
)

if response.code == http.NO_CONTENT:
return
else:
raise ClientException(response.code)


@define
class UploadProgress(object):
Expand Down Expand Up @@ -406,6 +431,30 @@ def read_share_chunk(
raise ClientException(response.code)


@async_to_deferred
async def advise_corrupt_share(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
reason: str,
):
assert isinstance(reason, str)
url = client.relative_url(
"/v1/{}/{}/{}/corrupt".format(
share_type, _encode_si(storage_index), share_number
)
)
message = {"reason": reason}
response = await client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)


@define
class StorageClientImmutables(object):
"""
Expand Down Expand Up @@ -554,53 +603,16 @@ def list_shares(self, storage_index): # type: (bytes,) -> Deferred[set[int]]
else:
raise ClientException(response.code)

@inlineCallbacks
def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
):
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
url = self._client.relative_url(
"/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
lease_cancel_secret=cancel_secret,
)

if response.code == http.NO_CONTENT:
return
else:
raise ClientException(response.code)

@inlineCallbacks
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
assert isinstance(reason, str)
url = self._client.relative_url(
"/v1/immutable/{}/{}/corrupt".format(
_encode_si(storage_index), share_number
)
return advise_corrupt_share(
self._client, "immutable", storage_index, share_number, reason
)
message = {"reason": reason}
response = yield self._client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)


@frozen
Expand Down Expand Up @@ -738,3 +750,14 @@ async def list_shares(self, storage_index: bytes) -> set[int]:
return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"])
else:
raise ClientException(response.code)

def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "mutable", storage_index, share_number, reason
)
26 changes: 24 additions & 2 deletions src/allmydata/storage/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ def read_share_chunk(self, request, authorization, storage_index, share_number):
methods=["PUT"],
)
def add_or_renew_lease(self, request, authorization, storage_index):
"""Update the lease for an immutable share."""
if not self._storage_server.get_buckets(storage_index):
"""Update the lease for an immutable or mutable share."""
if not list(self._storage_server.get_shares(storage_index)):
raise _HTTPError(http.NOT_FOUND)

# Checking of the renewal secret is done by the backend.
Expand Down Expand Up @@ -663,6 +663,28 @@ def enumerate_mutable_shares(self, request, authorization, storage_index):
shares = self._storage_server.enumerate_mutable_shares(storage_index)
return self._send_encoded(request, shares)

@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
# TODO unit test all the paths
if share_number not in {
shnum for (shnum, _) in self._storage_server.get_shares(storage_index)
}:
raise _HTTPError(http.NOT_FOUND)

info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
)
return b""


@implementer(IStreamServerEndpoint)
@attr.s
Expand Down
25 changes: 15 additions & 10 deletions src/allmydata/storage/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from __future__ import annotations
from future.utils import bytes_to_native_str
from typing import Dict, Tuple
from typing import Dict, Tuple, Iterable

import os, re

Expand Down Expand Up @@ -321,7 +321,7 @@ def allocate_buckets(self, storage_index,
# they asked about: this will save them a lot of work. Add or update
# leases for all of them: if they want us to hold shares for this
# file, they'll want us to hold leases for this file.
for (shnum, fn) in self._get_bucket_shares(storage_index):
for (shnum, fn) in self.get_shares(storage_index):
alreadygot[shnum] = ShareFile(fn)
if renew_leases:
self._add_or_renew_leases(alreadygot.values(), lease_info)
Expand Down Expand Up @@ -363,7 +363,7 @@ def allocate_buckets(self, storage_index,
return set(alreadygot), bucketwriters

def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index):
for shnum, filename in self.get_shares(storage_index):
with open(filename, 'rb') as f:
header = f.read(32)
if MutableShareFile.is_valid_header(header):
Expand Down Expand Up @@ -416,10 +416,12 @@ def register_bucket_writer_close_handler(self, handler):
"""
self._call_on_bucket_writer_close.append(handler)

def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold
def get_shares(self, storage_index) -> Iterable[tuple[int, str]]:
"""
Return an iterable of (shnum, pathname) tuples for files that hold
shares for this storage_index. In each tuple, 'shnum' will always be
the integer form of the last component of 'pathname'."""
the integer form of the last component of 'pathname'.
"""
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
try:
for f in os.listdir(storagedir):
Expand All @@ -431,12 +433,15 @@ def _get_bucket_shares(self, storage_index):
pass

def get_buckets(self, storage_index):
"""
Get ``BucketReaders`` for an immutable.
"""
start = self._clock.seconds()
self.count("get")
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %r" % si_s)
bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index):
for shnum, filename in self.get_shares(storage_index):
bucketreaders[shnum] = BucketReader(self, filename,
storage_index, shnum)
self.add_latency("get", self._clock.seconds() - start)
Expand All @@ -453,7 +458,7 @@ def get_leases(self, storage_index):
# since all shares get the same lease data, we just grab the leases
# from the first share
try:
shnum, filename = next(self._get_bucket_shares(storage_index))
shnum, filename = next(self.get_shares(storage_index))
sf = ShareFile(filename)
return sf.get_leases()
except StopIteration:
Expand All @@ -467,7 +472,7 @@ def get_slot_leases(self, storage_index):
:return: An iterable of the leases attached to this slot.
"""
for _, share_filename in self._get_bucket_shares(storage_index):
for _, share_filename in self.get_shares(storage_index):
share = MutableShareFile(share_filename)
return share.get_leases()
return []
Expand Down Expand Up @@ -742,7 +747,7 @@ def _share_exists(self, storage_index, shnum):
:return bool: ``True`` if a share with the given number exists at the
given storage index, ``False`` otherwise.
"""
for existing_sharenum, ignored in self._get_bucket_shares(storage_index):
for existing_sharenum, ignored in self.get_shares(storage_index):
if existing_sharenum == shnum:
return True
return False
Expand Down
40 changes: 21 additions & 19 deletions src/allmydata/storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

# roadmap:
#
Expand All @@ -34,14 +30,10 @@
#
# 6: implement other sorts of IStorageClient classes: S3, etc

from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six import ensure_text

from typing import Union
import re, time, hashlib
from os import urandom
# On Python 2 this will be the backport.
from configparser import NoSectionError

import attr
Expand Down Expand Up @@ -76,6 +68,7 @@
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.util.deferredutil import async_to_deferred
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables,
Expand Down Expand Up @@ -1166,16 +1159,23 @@ def get_buckets(
for share_num in share_numbers
})

def add_lease(
@async_to_deferred
async def add_lease(
self,
storage_index,
renew_secret,
cancel_secret
):
immutable_client = StorageClientImmutables(self._http_client)
return immutable_client.add_or_renew_lease(
storage_index, renew_secret, cancel_secret
)
client = StorageClientGeneral(self._http_client)
try:
await client.add_or_renew_lease(
storage_index, renew_secret, cancel_secret
)
except ClientException as e:
if e.code == http.NOT_FOUND:
# Silently do nothing, as is the case for the Foolscap client
return
raise

def advise_corrupt_share(
self,
Expand All @@ -1185,12 +1185,14 @@ def advise_corrupt_share(
reason: bytes
):
if share_type == b"immutable":
imm_client = StorageClientImmutables(self._http_client)
return imm_client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
client : Union[StorageClientImmutables, StorageClientMutables] = StorageClientImmutables(self._http_client)
elif share_type == b"mutable":
client = StorageClientMutables(self._http_client)
else:
raise NotImplementedError() # future tickets
raise ValueError("Unknown share type")
return client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)

@defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv):
Expand Down
25 changes: 17 additions & 8 deletions src/allmydata/test/test_istorageserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,21 @@ def test_allocate_buckets_creates_lease(self):
lease.get_expiration_time() - self.fake_time() > (31 * 24 * 60 * 60 - 10)
)

@inlineCallbacks
def test_add_lease_non_existent(self):
"""
If the storage index doesn't exist, adding the lease silently does nothing.
"""
storage_index = new_storage_index()
self.assertEqual(list(self.server.get_leases(storage_index)), [])

renew_secret = new_secret()
cancel_secret = new_secret()

# Add a lease:
yield self.storage_client.add_lease(storage_index, renew_secret, cancel_secret)
self.assertEqual(list(self.server.get_leases(storage_index)), [])

@inlineCallbacks
def test_add_lease_renewal(self):
"""
Expand Down Expand Up @@ -857,7 +872,8 @@ def test_slot_readv_no_shares(self):
@inlineCallbacks
def test_slot_readv_unknown_storage_index(self):
"""
With unknown storage index, ``IStorageServer.slot_readv()`` TODO.
With unknown storage index, ``IStorageServer.slot_readv()`` returns
empty dict.
"""
storage_index = new_storage_index()
reads = yield self.storage_client.slot_readv(
Expand Down Expand Up @@ -1163,10 +1179,3 @@ class HTTPMutableAPIsTests(
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for mutable ``IStorageServer`` APIs."""

# TODO will be implemented in later tickets
SKIP_TESTS = {
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
}
2 changes: 1 addition & 1 deletion src/allmydata/test/test_repairer.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ def _then(ign):
ss = self.g.servers_by_number[0]
# we want to delete the share corresponding to the server
# we're making not-respond
share = next(ss._get_bucket_shares(self.c0_filenode.get_storage_index()))[0]
share = next(ss.get_shares(self.c0_filenode.get_storage_index()))[0]
self.delete_shares_numbered(self.uri, [share])
return self.c0_filenode.check_and_repair(Monitor())
d.addCallback(_then)
Expand Down
Loading

0 comments on commit f0635d5

Please sign in to comment.