This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Disable bytes usage with postgres #6186

Merged · 7 commits · Oct 10, 2019
1 change: 1 addition & 0 deletions changelog.d/6186.bugfix
@@ -0,0 +1 @@
Fix bug where we were updating censored events as bytes rather than text, occasionally causing invalid JSON to be inserted and breaking APIs that attempted to fetch such events.
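For context (an illustration, not part of the PR): an affected row ends up holding Postgres's hex rendering of the JSON rather than the JSON text itself, which no longer parses. The event body below is made up.

import json

healthy = '{"type": "m.room.message"}'
# The same document as an affected row would store it: the bytea hex rendering,
# which is why the cleanup later in this diff matches rows with json NOT LIKE '{%'.
damaged = r"\x7b2274797065223a20226d2e726f6f6d2e6d657373616765227d"

json.loads(healthy)      # parses fine
try:
    json.loads(damaged)  # raises -- this is what broke APIs fetching such events
except json.JSONDecodeError:
    pass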
4 changes: 2 additions & 2 deletions synapse/rest/media/v1/preview_url_resource.py
@@ -270,7 +270,7 @@ def _do_preview(self, url, user, ts):

logger.debug("Calculated OG for %s as %s" % (url, og))

- jsonog = json.dumps(og).encode("utf8")
+ jsonog = json.dumps(og)

# store OG in history-aware DB cache
yield self.store.store_url_cache(
@@ -283,7 +283,7 @@ def _do_preview(self, url, user, ts):
media_info["created_ts"],
)

- return jsonog
+ return jsonog.encode("utf8")

@defer.inlineCallbacks
def _download_url(self, url, user):
7 changes: 7 additions & 0 deletions synapse/storage/engines/postgres.py
@@ -22,6 +22,13 @@ class PostgresEngine(object):
def __init__(self, database_module, database_config):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)

# Disables passing `bytes` to txn.execute, cf. #6186. If you actually
# do want to pass bytes, wrap them in a `bytearray`.
def _disable_bytes_adapter(_):
raise Exception("Passing bytes to DB is disabled.")

self.module.extensions.register_adapter(bytes, _disable_bytes_adapter)
self.synchronous_commit = database_config.get("synchronous_commit", True)
self._version = None # unknown as yet

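A minimal sketch (not part of the PR) of what this guard means for callers, using psycopg2 directly; the DSN and the table t(blob BYTEA) are assumptions for illustration.

import psycopg2
import psycopg2.extensions


def _disable_bytes_adapter(_):
    raise Exception("Passing bytes to DB is disabled.")


psycopg2.extensions.register_adapter(bytes, _disable_bytes_adapter)

conn = psycopg2.connect("dbname=synapse_test")  # assumed DSN
with conn.cursor() as txn:
    # bytearray keeps psycopg2's built-in binary adapter, so genuinely binary
    # values opt in by wrapping -- the pattern used in filtering.py, pusher.py
    # and the test further down.
    txn.execute("INSERT INTO t (blob) VALUES (%s)", (bytearray(b"\x00\x01"),))

    # Plain bytes now fail during client-side parameter adaptation, before the
    # statement is ever sent to Postgres.
    txn.execute("INSERT INTO t (blob) VALUES (%s)", (b"\x00\x01",))  # raises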
43 changes: 43 additions & 0 deletions synapse/storage/events_bg_updates.py
@@ -71,6 +71,19 @@ def __init__(self, db_conn, hs):
"redactions_received_ts", self._redactions_received_ts
)

# This index gets deleted in the `event_fix_redactions_bytes` update
self.register_background_index_update(
"event_fix_redactions_bytes_create_index",
index_name="redactions_censored_redacts",
table="redactions",
columns=["redacts"],
where_clause="have_censored",
)

self.register_background_update_handler(
"event_fix_redactions_bytes", self._event_fix_redactions_bytes
)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -458,3 +471,33 @@ def _redactions_received_ts_txn(txn):
yield self._end_background_update("redactions_received_ts")

return count

@defer.inlineCallbacks
def _event_fix_redactions_bytes(self, progress, batch_size):
"""Undoes hex encoded censored redacted event JSON.
"""

def _event_fix_redactions_bytes_txn(txn):
# This update is quite fast due to the new index.
txn.execute(
"""
UPDATE event_json
SET
json = convert_from(json::bytea, 'utf8')
FROM redactions
WHERE
redactions.have_censored
AND event_json.event_id = redactions.redacts
AND json NOT LIKE '{%';
"""
)

txn.execute("DROP INDEX redactions_censored_redacts")

yield self.runInteraction(
"_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
)

yield self._end_background_update("event_fix_redactions_bytes")

return 1
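Roughly the same repair as the UPDATE above, expressed in plain Python for illustration (the sample value is made up): reinterpret the stored hex text as bytes, then decode it as UTF-8.

damaged = r"\x7b2274797065223a20226d2e726f6f6d2e6d657373616765227d"

# Matches the WHERE json NOT LIKE '{%' guard: healthy rows are left alone.
assert not damaged.startswith("{")

# Equivalent of convert_from(json::bytea, 'utf8'): strip the leading '\x',
# reinterpret the hex digits as bytes, and decode back to text.
repaired = bytes.fromhex(damaged[2:]).decode("utf8")
assert repaired == '{"type": "m.room.message"}'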
4 changes: 2 additions & 2 deletions synapse/storage/filtering.py
@@ -51,7 +51,7 @@ def _do_txn(txn):
"SELECT filter_id FROM user_filters "
"WHERE user_id = ? AND filter_json = ?"
)
- txn.execute(sql, (user_localpart, def_json))
+ txn.execute(sql, (user_localpart, bytearray(def_json)))
filter_id_response = txn.fetchone()
if filter_id_response is not None:
return filter_id_response[0]
@@ -68,7 +68,7 @@ def _do_txn(txn):
"INSERT INTO user_filters (user_id, filter_id, filter_json)"
"VALUES(?, ?, ?)"
)
- txn.execute(sql, (user_localpart, filter_id, def_json))
+ txn.execute(sql, (user_localpart, filter_id, bytearray(def_json)))

return filter_id

2 changes: 1 addition & 1 deletion synapse/storage/pusher.py
@@ -241,7 +241,7 @@ def add_pusher(
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": encode_canonical_json(data),
"data": bytearray(encode_canonical_json(data)),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
@@ -15,12 +15,11 @@


-- There was a bug where we may have updated censored redactions as bytes,
- -- which can (somehow) cause json to be inserted hex encoded. This goes and
- -- undoes any such hex encoded JSON.
- UPDATE event_json SET json = convert_from(json::bytea, 'utf8')
- WHERE event_id IN (
-     SELECT event_json.event_id
-     FROM event_json
-     INNER JOIN redactions ON (event_json.event_id = redacts)
-     WHERE have_censored AND json NOT LIKE '{%'
- );
+ -- which can (somehow) cause json to be inserted hex encoded. These updates go
+ -- and undo any such hex encoded JSON.
+
+ INSERT into background_updates (update_name, progress_json)
+ VALUES ('event_fix_redactions_bytes_create_index', '{}');
+
+ INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES ('event_fix_redactions_bytes', '{}', 'event_fix_redactions_bytes_create_index');
2 changes: 1 addition & 1 deletion tests/storage/test_event_federation.py
@@ -57,7 +57,7 @@ def insert_event(txn, i):
"(event_id, algorithm, hash) "
"VALUES (?, 'sha256', ?)"
),
- (event_id, b"ffff"),
+ (event_id, bytearray(b"ffff")),
)

for i in range(0, 11):