Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix origin handling for pushed transactions
Browse files Browse the repository at this point in the history
Use the actual origin for push transactions, rather than whatever the remote
server claimed.
  • Loading branch information
richvdh committed Sep 5, 2018
1 parent 804dd41 commit c127c8d
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 29 deletions.
20 changes: 10 additions & 10 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def on_backfill_request(self, origin, room_id, versions, limit):

@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, transaction_data):
def on_incoming_transaction(self, origin, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
Expand All @@ -108,34 +108,33 @@ def on_incoming_transaction(self, transaction_data):

if not transaction.transaction_id:
raise Exception("Transaction missing transaction_id")
if not transaction.origin:
raise Exception("Transaction missing origin")

logger.debug("[%s] Got transaction", transaction.transaction_id)

# use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel.
with (yield self._transaction_linearizer.queue(
(transaction.origin, transaction.transaction_id),
(origin, transaction.transaction_id),
)):
result = yield self._handle_incoming_transaction(
transaction, request_time,
origin, transaction, request_time,
)

defer.returnValue(result)

@defer.inlineCallbacks
def _handle_incoming_transaction(self, transaction, request_time):
def _handle_incoming_transaction(self, origin, transaction, request_time):
""" Process an incoming transaction and return the HTTP response
Args:
origin (unicode): the server making the request
transaction (Transaction): incoming transaction
request_time (int): timestamp that the HTTP request arrived at
Returns:
Deferred[(int, object)]: http response code and body
"""
response = yield self.transaction_actions.have_responded(transaction)
response = yield self.transaction_actions.have_responded(origin, transaction)

if response:
logger.debug(
Expand All @@ -149,7 +148,7 @@ def _handle_incoming_transaction(self, transaction, request_time):

received_pdus_counter.inc(len(transaction.pdus))

origin_host, _ = parse_server_name(transaction.origin)
origin_host, _ = parse_server_name(origin)

pdus_by_room = {}

Expand Down Expand Up @@ -190,7 +189,7 @@ def process_pdus_for_room(room_id):
event_id = pdu.event_id
try:
yield self._handle_received_pdu(
transaction.origin, pdu
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
Expand All @@ -212,7 +211,7 @@ def process_pdus_for_room(room_id):
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
yield self.received_edu(
transaction.origin,
origin,
edu.edu_type,
edu.content
)
Expand All @@ -224,6 +223,7 @@ def process_pdus_for_room(room_id):
logger.debug("Returning: %s", str(response))

yield self.transaction_actions.set_response(
origin,
transaction,
200, response
)
Expand Down
8 changes: 4 additions & 4 deletions synapse/federation/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, datastore):
self.store = datastore

@log_function
def have_responded(self, transaction):
def have_responded(self, origin, transaction):
""" Have we already responded to a transaction with the same id and
origin?
Expand All @@ -50,11 +50,11 @@ def have_responded(self, transaction):
"transaction_id")

return self.store.get_received_txn_response(
transaction.transaction_id, transaction.origin
transaction.transaction_id, origin
)

@log_function
def set_response(self, transaction, code, response):
def set_response(self, origin, transaction, code, response):
""" Persist how we responded to a transaction.
Returns:
Expand All @@ -66,7 +66,7 @@ def set_response(self, transaction, code, response):

return self.store.set_received_txn_response(
transaction.transaction_id,
transaction.origin,
origin,
code,
response,
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def on_PUT(self, origin, content, query, transaction_id):

try:
code, response = yield self.handler.on_incoming_transaction(
transaction_data
origin, transaction_data,
)
except Exception:
logger.exception("on_incoming_transaction failed")
Expand Down
19 changes: 8 additions & 11 deletions tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)


def _expect_edu(destination, edu_type, content, origin="test"):
def _expect_edu_transaction(edu_type, content, origin="test"):
return {
"origin": origin,
"origin_server_ts": 1000000,
Expand All @@ -42,8 +42,8 @@ def _expect_edu(destination, edu_type, content, origin="test"):
}


def _make_edu_json(origin, edu_type, content):
return json.dumps(_expect_edu("test", edu_type, content, origin=origin)).encode(
def _make_edu_transaction_json(edu_type, content):
return json.dumps(_expect_edu_transaction(edu_type, content)).encode(
'utf8'
)

Expand Down Expand Up @@ -190,8 +190,7 @@ def test_started_typing_remote_send(self):
call(
"farm",
path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu(
"farm",
data=_expect_edu_transaction(
"m.typing",
content={
"room_id": self.room_id,
Expand Down Expand Up @@ -221,19 +220,18 @@ def test_started_typing_remote_recv(self):

self.assertEquals(self.event_source.get_current_key(), 0)

yield self.mock_federation_resource.trigger(
(code, response) = yield self.mock_federation_resource.trigger(
"PUT",
"/_matrix/federation/v1/send/1000000/",
_make_edu_json(
"farm",
_make_edu_transaction_json(
"m.typing",
content={
"room_id": self.room_id,
"user_id": self.u_onion.to_string(),
"typing": True,
},
),
federation_auth=True,
federation_auth_origin=b'farm',
)

self.on_new_event.assert_has_calls(
Expand Down Expand Up @@ -264,8 +262,7 @@ def test_stopped_typing(self):
call(
"farm",
path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu(
"farm",
data=_expect_edu_transaction(
"m.typing",
content={
"room_id": self.room_id,
Expand Down
12 changes: 9 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ def trigger_get(self, path):

@patch('twisted.web.http.Request')
@defer.inlineCallbacks
def trigger(self, http_method, path, content, mock_request, federation_auth=False):
def trigger(
self, http_method, path, content, mock_request,
federation_auth_origin=None,
):
""" Fire an HTTP event.
Args:
Expand All @@ -315,6 +318,7 @@ def trigger(self, http_method, path, content, mock_request, federation_auth=Fals
content : The HTTP body
mock_request : Mocked request to pass to the event so it can get
content.
federation_auth_origin (bytes|None): domain to authenticate as, for federation
Returns:
A tuple of (code, response)
Raises:
Expand All @@ -335,8 +339,10 @@ def trigger(self, http_method, path, content, mock_request, federation_auth=Fals
mock_request.getClientIP.return_value = "-"

headers = {}
if federation_auth:
headers[b"Authorization"] = [b"X-Matrix origin=test,key=,sig="]
if federation_auth_origin is not None:
headers[b"Authorization"] = [
b"X-Matrix origin=%s,key=,sig=" % (federation_auth_origin, )
]
mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)

# return the right path if the event requires it
Expand Down

0 comments on commit c127c8d

Please sign in to comment.