Skip to content

Commit

Permalink
Merge 0fbf746 into 5e8cc06
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarst committed Feb 1, 2022
2 parents 5e8cc06 + 0fbf746 commit 7e2481a
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 7 deletions.
Empty file added newsfragments/3868.minor
Empty file.
100 changes: 99 additions & 1 deletion src/allmydata/storage_client.py
Expand Up @@ -40,7 +40,7 @@
from six import ensure_text

import re, time, hashlib

from os import urandom
# On Python 2 this will be the backport.
from configparser import NoSectionError

Expand Down Expand Up @@ -75,6 +75,7 @@
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import StorageClient, StorageClientImmutables


# who is responsible for de-duplication?
Expand Down Expand Up @@ -1024,3 +1025,100 @@ def advise_corrupt_share(
shnum,
reason,
).addErrback(log.err, "Error from remote call to advise_corrupt_share")



@attr.s
class _FakeRemoteReference(object):
"""
Emulate a Foolscap RemoteReference, calling a local object instead.
"""
local_object = attr.ib(type=object)

def callRemote(self, action, *args, **kwargs):
return getattr(self.local_object, action)(*args, **kwargs)


@attr.s
class _HTTPBucketWriter(object):
"""
Emulate a ``RIBucketWriter``.
"""
client = attr.ib(type=StorageClientImmutables)
storage_index = attr.ib(type=bytes)
share_number = attr.ib(type=int)
upload_secret = attr.ib(type=bytes)
finished = attr.ib(type=bool, default=False)

def abort(self):
pass # TODO in later ticket

@defer.inlineCallbacks
def write(self, offset, data):
result = yield self.client.write_share_chunk(
self.storage_index, self.share_number, self.upload_secret, offset, data
)
if result.finished:
self.finished = True
defer.returnValue(None)

def close(self):
# A no-op in HTTP protocol.
if not self.finished:
return defer.fail(RuntimeError("You didn't finish writing?!"))
return defer.succeed(None)


# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@implementer(IStorageServer) # type: ignore
@attr.s
class _HTTPStorageServer(object):
"""
Talk to remote storage server over HTTP.
"""
_http_client = attr.ib(type=StorageClient)

@staticmethod
def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer
"""
Create an ``IStorageServer`` from a HTTP ``StorageClient``.
"""
return _HTTPStorageServer(http_client=http_client)

def get_version(self):
return self._http_client.get_version()

@defer.inlineCallbacks
def allocate_buckets(
self,
storage_index,
renew_secret,
cancel_secret,
sharenums,
allocated_size,
canary,
):
upload_secret = urandom(20)
immutable_client = StorageClientImmutables(self._http_client)
result = immutable_client.create(
storage_index, sharenums, allocated_size, upload_secret, renew_secret,
cancel_secret
)
result = yield result
defer.returnValue(
(result.already_have, {
share_num: _FakeRemoteReference(_HTTPBucketWriter(
client=immutable_client,
storage_index=storage_index,
share_number=share_num,
upload_secret=upload_secret
))
for share_num in result.allocated
})
)

def get_buckets(
self,
storage_index,
):
pass
114 changes: 108 additions & 6 deletions src/allmydata/test/test_istorageserver.py
@@ -1,6 +1,8 @@
"""
Tests for the ``IStorageServer`` interface.
Keep in mind that ``IStorageServer`` is actually the storage _client_ interface.
Note that for performance, in the future we might want the same node to be
reused across tests, so each test should be careful to generate unique storage
indexes.
Expand All @@ -17,18 +19,31 @@
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
from typing import Set

from random import Random
from unittest import SkipTest

from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock
from twisted.internet import reactor
from twisted.web.server import Site
from twisted.web.client import HTTPConnectionPool
from hyperlink import DecodedURL
from treq.api import set_global_pool as set_treq_pool

from foolscap.api import Referenceable, RemoteException

from allmydata.interfaces import IStorageServer # really, IStorageClient
from .common_system import SystemTestMixin
from .common import AsyncTestCase
from allmydata.storage.server import StorageServer # not a IStorageServer!!
from allmydata.storage.http_server import HTTPServer
from allmydata.storage.http_client import StorageClient
from allmydata.util.iputil import allocate_tcp_port
from allmydata.storage_client import _HTTPStorageServer


# Use random generator with known seed, so results are reproducible if tests
# are run in the same order.
Expand Down Expand Up @@ -998,20 +1013,25 @@ def test_add_new_lease(self):
self.assertEqual(lease2.get_expiration_time() - initial_expiration_time, 167)


class _FoolscapMixin(SystemTestMixin):
"""Run tests on Foolscap version of ``IStorageServer."""
class _SharedMixin(SystemTestMixin):
"""Base class for Foolscap and HTTP mixins."""

def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
SKIP_TESTS = set() # type: Set[str]

def _get_istorage_server(self):
raise NotImplementedError("implement in subclass")

@inlineCallbacks
def setUp(self):
if self._testMethodName in self.SKIP_TESTS:
raise SkipTest(
"Test {} is still not supported".format(self._testMethodName)
)

AsyncTestCase.setUp(self)
self.basedir = "test_istorageserver/" + self.id()
yield SystemTestMixin.setUp(self)
yield self.set_up_nodes(1)
self.storage_client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(self.storage_client))
self.server = None
for s in self.clients[0].services:
if isinstance(s, StorageServer):
Expand All @@ -1021,6 +1041,7 @@ def setUp(self):
self._clock = Clock()
self._clock.advance(123456)
self.server._clock = self._clock
self.storage_client = self._get_istorage_server()

def fake_time(self):
"""Return the current fake, test-controlled, time."""
Expand All @@ -1035,6 +1056,25 @@ def tearDown(self):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)

@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
raise NotImplementedError("implement in subclass")


class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""

def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))

def _get_istorage_server(self):
client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return client

@inlineCallbacks
def disconnect(self):
"""
Expand All @@ -1046,18 +1086,80 @@ def disconnect(self):
assert self.storage_client is not current


class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""

def setUp(self):
if PY2:
self.skipTest("Not going to bother supporting Python 2")
return _SharedMixin.setUp(self)

def _get_istorage_server(self):
set_treq_pool(HTTPConnectionPool(reactor, persistent=False))
swissnum = b"1234"
self._http_storage_server = HTTPServer(self.server, swissnum)
self._port_number = allocate_tcp_port()
self._listening_port = reactor.listenTCP(
self._port_number,
Site(self._http_storage_server.get_resource()),
interface="127.0.0.1",
)
return _HTTPStorageServer.from_http_client(
StorageClient(
DecodedURL.from_text("http://127.0.0.1:{}".format(self._port_number)),
swissnum,
)
)
# Eventually should also:
# self.assertTrue(IStorageServer.providedBy(client))

@inlineCallbacks
def tearDown(self):
yield _SharedMixin.tearDown(self)
yield self._listening_port.stopListening()


class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for shared ``IStorageServer`` APIs."""


class HTTPSharedAPIsTests(
_HTTPMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for shared ``IStorageServer`` APIs."""


class FoolscapImmutableAPIsTests(
_FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""


class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs."""

# These will start passing in future PRs as HTTP protocol is implemented.
SKIP_TESTS = {
"test_abort",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_allocate_buckets_repeat",
"test_bucket_advise_corrupt_share",
"test_disconnection",
"test_get_buckets_skips_unfinished_buckets",
"test_matching_overlapping_writes",
"test_non_matching_overlapping_writes",
"test_read_bucket_at_offset",
"test_written_shares_are_readable",
"test_written_shares_are_allocated",
}


class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
Expand Down

0 comments on commit 7e2481a

Please sign in to comment.