
Commit

Merge da4deab into be2c6b0
itamarst committed May 16, 2022
2 parents be2c6b0 + da4deab commit 325e141
Showing 13 changed files with 150 additions and 55 deletions.
6 changes: 4 additions & 2 deletions docs/proposed/http-storage-node-protocol.rst
@@ -350,8 +350,10 @@ Because of the simple types used throughout
and the equivalence described in `RFC 7049`_
these examples should be representative regardless of which of these two encodings is chosen.

The one exception is sets.
For CBOR messages, any sequence that is semantically a set (i.e. no repeated values allowed, order doesn't matter, and elements are hashable in Python) should be sent as a set.
Tag 6.258 is used to indicate sets in CBOR; see `the CBOR registry <https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml>`_ for more details.
Sets will be represented as JSON lists in examples because JSON doesn't support sets.
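For illustration, a minimal sketch of this encoding in Python, assuming the ``cbor2`` library (an assumption, not mandated by this document), which encodes Python sets with tag 258 and decodes tag 258 back into a set::

    import cbor2

    share_numbers = {1, 5}
    # Encoded as CBOR tag 258 wrapping an array of unsigned integers.
    encoded = cbor2.dumps(share_numbers)
    # Decoding restores a set rather than a list.
    assert cbor2.loads(encoded) == {1, 5}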

HTTP Design
~~~~~~~~~~~
@@ -738,8 +740,8 @@ Reading
``GET /v1/mutable/:storage_index/shares``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

Retrieve a list indicating all shares available for the indicated storage index.
For example::
Retrieve a set indicating all shares available for the indicated storage index.
For example (this is shown as a list, since it will be a list in JSON, but a set in CBOR)::

[1, 5]

10 changes: 7 additions & 3 deletions integration/test_tor.py
@@ -40,8 +40,11 @@

@pytest_twisted.inlineCallbacks
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
util.await_client_ready(carol, minimum_number_of_servers=2)
util.await_client_ready(dave, minimum_number_of_servers=2)

# ensure both nodes are connected to "a grid" by uploading
# something via carol, and retrieve it using dave.
gold_path = join(temp_dir, "gold")
@@ -143,5 +146,6 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
f.write(node_config)

print("running")
yield util._run_node(reactor, node_dir.path, request, None)
result = yield util._run_node(reactor, node_dir.path, request, None)
print("okay, launched")
return result
9 changes: 5 additions & 4 deletions integration/util.py
@@ -482,14 +482,15 @@ def web_post(tahoe, uri_fragment, **kwargs):
return resp.content


def await_client_ready(tahoe, timeout=10, liveness=60*2):
def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_servers=1):
"""
Uses the status API to wait for a client-type node (in `tahoe`, a
`TahoeProcess` instance usually from a fixture e.g. `alice`) to be
'ready'. A client is deemed ready if:
- it answers `http://<node_url>/statistics/?t=json/`
- there is at least one storage-server connected
- there is at least one storage-server connected (configurable via
``minimum_number_of_servers``)
- every storage-server has a "last_received_data" and it is
within the last `liveness` seconds
@@ -506,8 +507,8 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2):
time.sleep(1)
continue

if len(js['servers']) == 0:
print("waiting because no servers at all")
if len(js['servers']) < minimum_number_of_servers:
print("waiting because insufficient servers")
time.sleep(1)
continue
server_times = [
Empty file added newsfragments/3891.minor
Empty file.
Empty file added newsfragments/3895.minor
Empty file.
2 changes: 1 addition & 1 deletion setup.py
@@ -114,7 +114,7 @@ def read_version_py(infname):
"attrs >= 18.2.0",

# WebSocket library for twisted and asyncio
"autobahn >= 19.5.2",
"autobahn < 22.4.1", # remove this when 22.4.3 is released

# Support for Python 3 transition
"future >= 0.18.2",
38 changes: 28 additions & 10 deletions src/allmydata/storage/http_client.py
@@ -106,6 +106,11 @@ def __init__(self, code, *additional_args):
share_number = uint
"""
),
"mutable_list_shares": Schema(
"""
response = #6.258([* uint])
"""
),
}


@@ -375,16 +380,14 @@ def read_share_chunk(
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed downloads
should be transparently retried and redownloaded by the implementation a
few times so that if a failure percolates up, the caller can assume the
failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
NOTE: the underlying HTTP protocol is somewhat more flexible than this API,
insofar as it doesn't always require a range. In practice a range is
always provided by the current callers.
"""
url = client.relative_url(
"/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
@@ -712,11 +715,26 @@ def read_share_chunk(
share_number: int,
offset: int,
length: int,
) -> bytes:
) -> Deferred[bytes]:
"""
Download a chunk of data from a share.
"""
# TODO unit test all the things
return read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)

@async_to_deferred
async def list_shares(self, storage_index: bytes) -> set[int]:
"""
List the share numbers for a given storage index.
"""
# TODO unit test all the things
url = self._client.relative_url(
"/v1/mutable/{}/shares".format(_encode_si(storage_index))
)
response = await self._client.request("GET", url)
if response.code == http.OK:
return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"])
else:
raise ClientException(response.code)
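
A hypothetical usage sketch for the new ``list_shares`` method (the ``storage_client`` and ``storage_index`` arguments are illustrative assumptions; the method returns a ``Deferred`` because of the ``@async_to_deferred`` decorator):

    from twisted.internet.defer import inlineCallbacks

    from allmydata.storage.http_client import StorageClientMutables

    @inlineCallbacks
    def print_mutable_shares(storage_client, storage_index):
        # Wrap the low-level HTTP StorageClient, as storage_client.py does.
        mutables = StorageClientMutables(storage_client)
        share_numbers = yield mutables.list_shares(storage_index)
        print("mutable shares:", sorted(share_numbers))
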
44 changes: 31 additions & 13 deletions src/allmydata/storage/http_server.py
@@ -46,6 +46,7 @@
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
from allmydata.interfaces import BadWriteEnablerError


class ClientSecretsException(Exception):
@@ -587,19 +588,25 @@ def mutable_read_test_write(self, request, authorization, storage_index):
authorization[Secrets.LEASE_RENEW],
authorization[Secrets.LEASE_CANCEL],
)
success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
{
k: (
[(d["offset"], d["size"], b"eq", d["specimen"]) for d in v["test"]],
[(d["offset"], d["data"]) for d in v["write"]],
v["new-length"],
)
for (k, v) in rtw_request["test-write-vectors"].items()
},
[(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
)
try:
success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
{
k: (
[
(d["offset"], d["size"], b"eq", d["specimen"])
for d in v["test"]
],
[(d["offset"], d["data"]) for d in v["write"]],
v["new-length"],
)
for (k, v) in rtw_request["test-write-vectors"].items()
},
[(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
)
except BadWriteEnablerError:
raise _HTTPError(http.UNAUTHORIZED)
return self._send_encoded(request, {"success": success, "data": read_data})

@_authorized_route(
@@ -645,6 +652,17 @@ def read_mutable_chunk(self, request, authorization, storage_index, share_number
)
return data

@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/shares",
methods=["GET"],
)
def enumerate_mutable_shares(self, request, authorization, storage_index):
"""List mutable shares for a storage index."""
shares = self._storage_server.enumerate_mutable_shares(storage_index)
return self._send_encoded(request, shares)


@implementer(IStreamServerEndpoint)
@attr.s
30 changes: 18 additions & 12 deletions src/allmydata/storage/server.py
@@ -1,18 +1,9 @@
"""
Ported to Python 3.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from future.utils import bytes_to_native_str, PY2
if PY2:
# Omit open() to get native behavior where open("w") always accepts native
# strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
from typing import Dict, Tuple
from __future__ import annotations
from future.utils import bytes_to_native_str
from typing import Dict, Tuple

import os, re

@@ -699,6 +690,21 @@ def _allocate_slot_share(self, bucketdir, secrets, sharenum,
self)
return share

def enumerate_mutable_shares(self, storage_index: bytes) -> set[int]:
"""Return all share numbers for the given mutable."""
si_dir = storage_index_to_dir(storage_index)
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
return set()
result = set()
for sharenum_s in os.listdir(bucketdir):
try:
result.add(int(sharenum_s))
except ValueError:
continue
return result

def slot_readv(self, storage_index, shares, readv):
start = self._clock.seconds()
self.count("readv")
21 changes: 14 additions & 7 deletions src/allmydata/storage_client.py
@@ -50,6 +50,7 @@
Interface,
implementer,
)
from twisted.web import http
from twisted.internet import defer
from twisted.application import service
from twisted.plugin import (
@@ -78,7 +79,7 @@
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
)


@@ -1196,9 +1197,10 @@ def slot_readv(self, storage_index, shares, readv):
mutable_client = StorageClientMutables(self._http_client)
pending_reads = {}
reads = {}
# TODO if shares list is empty, that means list all shares, so we need
# If shares list is empty, that means list all shares, so we need
# to do a query to get that.
assert shares # TODO replace with call to list shares if and only if it's empty
if not shares:
shares = yield mutable_client.list_shares(storage_index)

# Start all the queries in parallel:
for share_number in shares:
@@ -1246,8 +1248,13 @@ def slot_testv_and_readv_and_writev(
ReadVector(offset=offset, size=size)
for (offset, size) in r_vector
]
client_result = yield mutable_client.read_test_write_chunks(
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
client_read_vectors,
)
try:
client_result = yield mutable_client.read_test_write_chunks(
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
client_read_vectors,
)
except ClientException as e:
if e.code == http.UNAUTHORIZED:
raise RemoteException("Unauthorized write, possibly you passed the wrong write enabler?")
raise
return (client_result.success, client_result.reads)
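
A hypothetical sketch of the caller-facing effect of the empty-shares change above (names are assumptions; ``storage_server`` stands for the HTTP-based ``IStorageServer`` wrapper whose ``slot_readv`` is shown above):

    from twisted.internet.defer import inlineCallbacks, returnValue

    @inlineCallbacks
    def read_all_mutable_shares(storage_server, storage_index):
        # An empty shares list now means "every share": the wrapper first
        # calls list_shares() to discover share numbers, then reads each one.
        reads = yield storage_server.slot_readv(storage_index, shares=[], readv=[(0, 7)])
        returnValue(reads)  # dict mapping share number -> list of read chunks
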
18 changes: 16 additions & 2 deletions src/allmydata/test/test_istorageserver.py
@@ -854,6 +854,22 @@ def test_slot_readv_no_shares(self):
{0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]},
)

@inlineCallbacks
def test_slot_readv_unknown_storage_index(self):
"""
With an unknown storage index, ``IStorageServer.slot_readv()`` returns empty reads.
"""
storage_index = new_storage_index()
reads = yield self.storage_client.slot_readv(
storage_index,
shares=[],
readv=[(0, 7)],
)
self.assertEqual(
reads,
{},
)

@inlineCallbacks
def create_slot(self):
"""Create a slot with sharenum 0."""
@@ -1150,9 +1166,7 @@ class HTTPMutableAPIsTests(

# TODO will be implemented in later tickets
SKIP_TESTS = {
"test_STARAW_write_enabler_must_match",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_slot_readv_no_shares",
}
25 changes: 25 additions & 0 deletions src/allmydata/test/test_storage.py
@@ -1315,6 +1315,31 @@ def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
self.failUnless(isinstance(readv_data, dict))
self.failUnlessEqual(len(readv_data), 0)

def test_enumerate_mutable_shares(self):
"""
``StorageServer.enumerate_mutable_shares()`` returns a set of share
numbers for the given storage index, or an empty set if it does not
exist at all.
"""
ss = self.create("test_enumerate_mutable_shares")

# Initially, nothing exists:
empty = ss.enumerate_mutable_shares(b"si1")

self.allocate(ss, b"si1", b"we1", b"le1", [0, 1, 4, 2], 12)
shares0_1_2_4 = ss.enumerate_mutable_shares(b"si1")

# Remove share 2, by setting size to 0:
secrets = (self.write_enabler(b"we1"),
self.renew_secret(b"le1"),
self.cancel_secret(b"le1"))
ss.slot_testv_and_readv_and_writev(b"si1", secrets, {2: ([], [], 0)}, [])
shares0_1_4 = ss.enumerate_mutable_shares(b"si1")
self.assertEqual(
(empty, shares0_1_2_4, shares0_1_4),
(set(), {0, 1, 2, 4}, {0, 1, 4})
)

def test_bad_magic(self):
ss = self.create("test_bad_magic")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0]), 10)
2 changes: 1 addition & 1 deletion tox.ini
@@ -97,7 +97,7 @@ setenv =
COVERAGE_PROCESS_START=.coveragerc
commands =
# NOTE: 'run with "py.test --keep-tempdir -s -v integration/" to debug failures'
py.test --timeout=1800 --coverage -v {posargs:integration}
py.test --timeout=1800 --coverage -s -v {posargs:integration}
coverage combine
coverage report

