Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add integration testing for Rust connection node
Browse files Browse the repository at this point in the history
Adds the following functionality for capability parity:

- Skip sending messages if the message is expired
- Properly handle legacy messages in a message stream
- Set appropriate flags for a uaid not found in the db
- Always return a timestamp when querying into timestamp messages
- Send messages in the order they're retrieved from the db
- Accept messages from endpoint while waiting for acks
- Don't save TTL:0 messages in the db if the client fails to ack them
- Allow TTL:None from endpoint and treat as TTL:0

Closes #1060
  • Loading branch information
bbangert committed Jan 18, 2018
1 parent 5b4cd09 commit c4351cc
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 46 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -22,8 +22,9 @@ include/*
lib_pypy/*
*.swp
pypy/
src/
./src/
.tox/
.eggs/
autopush_rs/target
autopush_rs/_native*
*.rs.bk
21 changes: 13 additions & 8 deletions autopush/tests/test_integration.py
Expand Up @@ -107,9 +107,10 @@ def hello(self, uaid=None):
else:
chans = []
hello_dict = dict(messageType="hello",
uaid=uaid or self.uaid or "",
use_webpush=True,
channelIDs=chans)
if uaid or self.uaid:
hello_dict["uaid"] = uaid or self.uaid
msg = json.dumps(hello_dict)
log.debug("Send: %s", msg)
self.ws.send(msg)
Expand Down Expand Up @@ -278,13 +279,16 @@ class IntegrationBase(unittest.TestCase):
track_objects = True
track_objects_excludes = [AutopushConfig, PushServerFactory]

connection_port = 9010
endpoint_port = 9020
router_port = 9030

_endpoint_defaults = dict(
hostname='localhost',
port=endpoint_port,
endpoint_port=endpoint_port,
endpoint_scheme='http',
router_port=router_port,
statsd_host=None,
router_table=dict(tablename=ROUTER_TABLE),
message_table=dict(tablename=MESSAGE_TABLE),
Expand All @@ -293,9 +297,9 @@ class IntegrationBase(unittest.TestCase):

_conn_defaults = dict(
hostname='localhost',
port=9010,
port=connection_port,
endpoint_port=endpoint_port,
router_port=9030,
router_port=router_port,
endpoint_scheme='http',
statsd_host=None,
router_table=dict(tablename=ROUTER_TABLE),
Expand Down Expand Up @@ -340,7 +344,7 @@ def conn_kwargs(self):

@inlineCallbacks
def quick_register(self, sslcontext=None):
client = Client("ws://localhost:9010/",
client = Client("ws://localhost:{}/".format(self.connection_port),
sslcontext=sslcontext)
yield client.connect()
yield client.hello()
Expand Down Expand Up @@ -498,7 +502,7 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):
hostname="localhost")
self.conn.db.metrics._client = Mock()

client = Client("ws://localhost:9010/")
client = Client("ws://localhost:{}/".format(self.connection_port))
yield client.connect()
yield client.hello()
for chan, test in tests.items():
Expand Down Expand Up @@ -532,7 +536,7 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):
def test_webpush_data_save_fail(self):
chan = "d248d4e0-0ef4-41d9-8db5-2533ad8e4041"
test = dict(data=b"\xe2\x82\x28\xf0\x28\x8c\xbc", result="4oIo8CiMvA")
client = Client("ws://localhost:9010/")
client = Client("ws://localhost:{}/".format(self.connection_port))
yield client.connect()
yield client.hello()
yield client.register(chid=chan)
Expand Down Expand Up @@ -1365,7 +1369,7 @@ def test_webpush_monthly_rotation_prior_record_exists(self):
@inlineCallbacks
def test_webpush_monthly_rotation_no_channels(self):
from autopush.db import make_rotating_tablename
client = Client("ws://localhost:9010/")
client = Client("ws://localhost:{}/".format(self.connection_port))
yield client.connect()
yield client.hello()
yield client.disconnect()
Expand Down Expand Up @@ -1562,7 +1566,8 @@ def _test_health_skips_auth(self, certfile):
class TestHealth(IntegrationBase):
@inlineCallbacks
def test_status(self):
response, body = yield _agent('GET', "http://localhost:9010/status")
response, body = yield _agent(
'GET', "http://localhost:{}/status".format(self.connection_port))
assert response.code == 200
payload = json.loads(body)
assert payload == dict(status="OK", version=__version__)
Expand Down
6 changes: 6 additions & 0 deletions autopush/tests/test_main.py → autopush/tests/test_z_main.py
@@ -1,3 +1,9 @@
"""Test main instantiation
This is named test_z_main.py to run it last. Due to issues in this test, the
testing environment is unclean and no further tests can be run reliably.
"""
import unittest
import datetime
import json
Expand Down
41 changes: 35 additions & 6 deletions autopush/webpush_server.py
Expand Up @@ -55,6 +55,22 @@ def uaid_from_str(input):
return None


def dict_to_webpush_message(input):
if isinstance(input, dict):
return WebPushMessage(
uaid=input.get("uaid"),
timestamp=input["timestamp"],
channelID=input["channelID"],
ttl=input["ttl"],
topic=input.get("topic"),
version=input["version"],
sortkey_timestamp=input.get("sortkey_timestamp"),
data=input.get("data"),
headers=input.get("headers"),
)
return input


@attrs(slots=True)
class WebPushMessage(object):
"""Serializable version of attributes needed for message delivery"""
Expand All @@ -64,7 +80,7 @@ class WebPushMessage(object):
ttl = attrib() # type: int
topic = attrib() # type: str
version = attrib() # type: str
sortkey_timestamp = attrib(default=None) # type: Optional[int]
sortkey_timestamp = attrib(default=None) # type: Optional[str]
data = attrib(default=None) # type: Optional[str]
headers = attrib(default=None) # type: Optional[JSONDict]

Expand All @@ -84,7 +100,7 @@ def from_WebPushNotification(cls, notif):

def to_WebPushNotification(self):
# type: () -> WebPushNotification
return WebPushNotification(
notif = WebPushNotification(
uaid=UUID(self.uaid),
channel_id=self.channelID,
data=self.data,
Expand All @@ -94,8 +110,15 @@ def to_WebPushNotification(self):
timestamp=self.timestamp,
message_id=self.version,
update_id=self.version,
sortkey_timestamp=self.sortkey_timestamp,
)

# If there's no sortkey_timestamp and no topic, its legacy
if not notif.sortkey_timestamp and not notif.topic:
notif.legacy = True

return notif


###############################################################################
# Input messages off the incoming queue
Expand Down Expand Up @@ -129,7 +152,7 @@ class IncStoragePosition(InputCommand):
@attrs(slots=True)
class DeleteMessage(InputCommand):
message_month = attrib() # type: str
message = attrib() # type: WebPushMessage
message = attrib(convert=dict_to_webpush_message) # type: WebPushMessage


@attrs(slots=True)
Expand Down Expand Up @@ -353,7 +376,10 @@ def process(self, hello):
reset_uaid=False
)
if hello.uaid:
user_item, flags = self.lookup_user(hello)
user_item, new_flags = self.lookup_user(hello)
if user_item:
# Only swap for the new flags if the user exists
flags = new_flags

if not user_item:
user_item = self.create_user(hello)
Expand Down Expand Up @@ -447,8 +473,6 @@ def drop_user(self, uaid, uaid_record, code):
class CheckStorageCommand(ProcessorCommand):
def process(self, command):
# type: (CheckStorage) -> CheckStorageResponse

# First, determine if there's any messages to retrieve
timestamp, messages, include_topic = self._check_storage(command)
return CheckStorageResponse(
timestamp=timestamp,
Expand Down Expand Up @@ -482,6 +506,11 @@ def _check_storage(self, command):
)
messages = [WebPushMessage.from_WebPushNotification(m)
for m in messages]

# If we're out of messages, timestamp is set to None, so we return
# the last timestamp supplied
if not timestamp:
timestamp = command.timestamp
return timestamp, messages, False


Expand Down
10 changes: 5 additions & 5 deletions autopush_rs/src/call.rs
Expand Up @@ -141,7 +141,7 @@ enum Call {
uaid: String,
message_month: String,
include_topic: bool,
timestamp: Option<i64>,
timestamp: Option<u64>,
},

DeleteMessage {
Expand All @@ -152,7 +152,7 @@ enum Call {
IncStoragePosition {
uaid: String,
message_month: String,
timestamp: i64,
timestamp: u64,
},

DropUser { uaid: String },
Expand Down Expand Up @@ -210,7 +210,7 @@ pub enum UnRegisterResponse {
pub struct CheckStorageResponse {
pub include_topic: bool,
pub messages: Vec<protocol::Notification>,
pub timestamp: Option<i64>,
pub timestamp: Option<u64>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Server {
uaid: String,
message_month: String,
include_topic: bool,
timestamp: Option<i64>,
timestamp: Option<u64>,
) -> MyFuture<CheckStorageResponse> {
let (call, fut) = PythonCall::new(&Call::CheckStorage {
uaid: uaid,
Expand All @@ -309,7 +309,7 @@ impl Server {
&self,
uaid: String,
message_month: String,
timestamp: i64,
timestamp: u64,
) -> MyFuture<IncStorageResponse> {
let (call, fut) = PythonCall::new(&Call::IncStoragePosition {
uaid: uaid,
Expand Down

0 comments on commit c4351cc

Please sign in to comment.