Remove legacy db entirely #236

Merged · 17 commits · Aug 4, 2021
33 changes: 1 addition & 32 deletions .github/workflows/pipeline.yml
@@ -25,7 +25,7 @@ jobs:
- run: python -m pip install tox
- run: tox -e check_types

-run-unit-tests-sqlite:
+run-unit-tests:
needs: [check-code-style, check-types-mypy]
runs-on: ubuntu-latest
steps:
@@ -35,34 +35,3 @@ jobs:
python-version: "3.7"
- run: python -m pip install -e .
- run: trial tests

run-unit-tests-postgres:
needs: [check-code-style, check-types-mypy]
runs-on: ubuntu-latest

services:
postgres:
image: postgres:11
ports:
- 5432:5432
env:
POSTGRES_PASSWORD: "postgres"
POSTGRES_INITDB_ARGS: "--lc-collate C --lc-ctype C --encoding UTF8"
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: "3.7"
- run: python -m pip install -e .
- run: trial tests
env:
TEST_USE_POSTGRES: "yes"
TEST_POSTGRES_USER: "postgres"
TEST_POSTGRES_PASSWORD: "postgres"
TEST_POSTGRES_HOST: "localhost"
10 changes: 0 additions & 10 deletions CONTRIBUTING.md
@@ -4,16 +4,6 @@ Everyone is welcome to contribute code to Sygnal, provided you are willing to
license your contributions under the same license as the project itself. In
this case, the [Apache Software License v2](LICENSE).

## Preparing your development environment

Sygnal depends on the `psycopg2` database adapter for PostgreSQL.
You may need to install development headers for Python and libpq.
For example on Debian/Ubuntu distributions these can be installed with:

```bash
sudo apt install libpq-dev python3-dev build-essential
```

### Create a virtualenv

To contribute to Sygnal, ensure you have Python 3.7 or newer and then run:
3 changes: 0 additions & 3 deletions README.rst
@@ -23,9 +23,6 @@ To change this, set the ``SYGNAL_CONF`` environment variable to the path to your
A sample configuration file is provided in this repository;
see ``sygnal.yaml.sample``.
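For illustration, starting Sygnal with a custom config path might look like the following sketch (not part of this diff); the path is a placeholder, and the ``python -m sygnal.sygnal`` entry point is assumed from Sygnal's usual invocation:

```bash
# Sketch: point Sygnal at a config file outside the working directory.
# /etc/sygnal/sygnal.yaml is a placeholder path.
export SYGNAL_CONF=/etc/sygnal/sygnal.yaml
python -m sygnal.sygnal
```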

Sygnal supports using either SQLite3 or PostgreSQL as a database backend. See the ``sygnal.yaml.sample``
for more information on how to configure.

The `apps:` section is where you set up different apps that are to be handled.
Each app should be given its own subsection, with the key of that subsection being the app's ``app_id``.
Keys in this section take the form of the ``app_id``, as specified when setting up a Matrix pusher
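To make the ``apps:`` description above concrete, here is a minimal sketch (not part of this diff); the app ID ``com.example.myapp`` and the key value are placeholders, while the ``gcm`` pushkin type and its required ``api_key`` field follow ``sygnal.yaml.sample`` and the ``gcmpushkin.py`` code further down:

```yaml
apps:
  # The subsection key is the app_id the Matrix pusher was registered with
  # (placeholder value).
  com.example.myapp:
    type: gcm
    api_key: "your_fcm_api_key"  # placeholder
```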
1 change: 1 addition & 0 deletions changelog.d/236.misc
@@ -0,0 +1 @@
Remove legacy database to ease horizontal scaling.
8 changes: 1 addition & 7 deletions docker/Dockerfile
@@ -17,12 +17,7 @@ FROM python:3.7-slim as builder
# (we really just care about caching a wheel here, as the "pip install" below
# will install them again.)

# we need dependencies for postgresql
RUN apt-get update && apt-get install -y gcc git libpq-dev

RUN pip install --prefix="/install" --no-warn-script-location cryptography

-# now install sygnal and all of the python deps to /install.
+# install sygnal and all of the python deps to /install.

COPY . /sygnal/

@@ -33,7 +28,6 @@ RUN pip install --prefix="/install" --no-warn-script-location /sygnal
###

FROM python:3.7-slim
RUN apt-get update && apt-get install -y libpq5 && apt-get clean
COPY --from=builder /install /usr/local

EXPOSE 5000/tcp
3 changes: 0 additions & 3 deletions mypy.ini
@@ -35,9 +35,6 @@ ignore_missing_imports = True
[mypy-importlib_metadata]
ignore_missing_imports = True

[mypy-psycopg2]
ignore_missing_imports = True

[mypy-setuptools]
ignore_missing_imports = True

1 change: 0 additions & 1 deletion setup.py
@@ -50,7 +50,6 @@ def read(fname: Union[str, "PathLike[str]"]) -> str:
"sentry-sdk>=0.10.2",
"zope.interface>=4.6.0",
"idna>=2.8",
"psycopg2>=2.8.4",
"importlib_metadata",
"pywebpush>=1.13.0",
"py-vapid>=1.7.0",
38 changes: 0 additions & 38 deletions sygnal.yaml.sample
@@ -3,44 +3,6 @@
# See: matrix.org
##

# The 'database' setting defines the database that sygnal uses to store all of
# its data.
#
# 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or
# 'psycopg2' (for PostgreSQL).
#
# 'args' gives options which are passed through to the database engine,
# except for options starting 'cp_', which are used to configure the Twisted
# connection pool. For a reference to valid arguments, see:
# * for sqlite: https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
# * for postgres: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
# * for the connection pool: https://twistedmatrix.com/documents/current/api/twisted.enterprise.adbapi.ConnectionPool.html#__init__
#
#
# Example SQLite configuration:
#
#database:
# name: sqlite3
# args:
# dbfile: /path/to/database.db
#
#
# Example Postgres configuration:
#
#database:
# name: psycopg2
# args:
# host: localhost
# database: sygnal
# user: sygnal
# password: pass
# cp_min: 1
# cp_max: 5
#
database:
name: sqlite3
args:
dbfile: sygnal.db

## Logging #
#
129 changes: 8 additions & 121 deletions sygnal/gcmpushkin.py
@@ -21,7 +21,6 @@

from opentracing import logs, tags
from prometheus_client import Counter, Gauge, Histogram
from twisted.enterprise.adbapi import ConnectionPool
from twisted.internet.defer import DeferredSemaphore
from twisted.web.client import FileBodyProducer, HTTPConnectionPool, readBody
from twisted.web.http_headers import Headers
@@ -99,7 +98,7 @@ class GcmPushkin(ConcurrencyLimitedPushkin):
"max_connections",
} | ConcurrencyLimitedPushkin.UNDERSTOOD_CONFIG_FIELDS

-def __init__(self, name, sygnal, config, canonical_reg_id_store):
+def __init__(self, name, sygnal, config):
super(GcmPushkin, self).__init__(name, sygnal, config)

nonunderstood = set(self.cfg.keys()).difference(self.UNDERSTOOD_CONFIG_FIELDS)
@@ -128,9 +127,6 @@ def __init__(self, name, sygnal, config, canonical_reg_id_store):
proxy_url_str=proxy_url,
)

self.db = sygnal.database
self.canonical_reg_id_store = canonical_reg_id_store

self.api_key = self.get_config("api_key")
if not self.api_key:
raise PushkinSetupException("No API key set in config")
@@ -154,14 +150,7 @@ async def create(cls, name, sygnal, config):
Returns:
an instance of this Pushkin
"""
logger.debug("About to set up CanonicalRegId Store")
canonical_reg_id_store = CanonicalRegIdStore(
sygnal.database, sygnal.database_engine
)
await canonical_reg_id_store.setup()
logger.debug("Finished setting up CanonicalRegId Store")

return cls(name, sygnal, config, canonical_reg_id_store)
return cls(name, sygnal, config)

async def _perform_http_request(self, body, headers):
"""
@@ -276,11 +265,6 @@ async def _request_dispatch(self, n, log, body, headers, pushkeys, span):
# determine which pushkeys to retry or forget about
new_pushkeys = []
for i, result in enumerate(resp_object["results"]):
span.set_tag("gcm_regid_updated", "registration_id" in result)
if "registration_id" in result:
await self.canonical_reg_id_store.set_canonical_id(
pushkeys[i], result["registration_id"]
)
if "error" in result:
log.warning(
"Error for pushkey %s: %s", pushkeys[i], result["error"]
@@ -333,30 +317,13 @@ async def _dispatch_notification_unlimited(self, n, device, context):
with self.sygnal.tracer.start_span(
"gcm_dispatch", tags=span_tags, child_of=context.opentracing_span
) as span_parent:
reg_id_mappings = await self.canonical_reg_id_store.get_canonical_ids(
pushkeys
)

reg_id_mappings = {
reg_id: canonical_reg_id or reg_id
for (reg_id, canonical_reg_id) in reg_id_mappings.items()
}

inverse_reg_id_mappings = {v: k for (k, v) in reg_id_mappings.items()}

data = GcmPushkin._build_data(n, device)
headers = {
b"User-Agent": ["sygnal"],
b"Content-Type": ["application/json"],
b"Authorization": ["key=%s" % (self.api_key,)],
}

# count the number of remapped registration IDs in the request
span_parent.set_tag(
"gcm_num_remapped_reg_ids_used",
[k != v for (k, v) in reg_id_mappings.items()].count(True),
)

# TODO: Implement collapse_key to queue only one message per room.
failed = []

@@ -365,14 +332,12 @@ async def _dispatch_notification_unlimited(self, n, device, context):
body["priority"] = "normal" if n.prio == "low" else "high"

for retry_number in range(0, MAX_TRIES):
-mapped_pushkeys = [reg_id_mappings[pk] for pk in pushkeys]

if len(pushkeys) == 1:
body["to"] = mapped_pushkeys[0]
body["to"] = pushkeys[0]
else:
body["registration_ids"] = mapped_pushkeys
body["registration_ids"] = pushkeys

log.info("Sending (attempt %i) => %r", retry_number, mapped_pushkeys)
log.info("Sending (attempt %i) => %r", retry_number, pushkeys)

try:
span_tags = {"retry_num": retry_number}
@@ -381,13 +346,11 @@ async def _dispatch_notification_unlimited(self, n, device, context):
"gcm_dispatch_try", tags=span_tags, child_of=span_parent
) as span:
new_failed, new_pushkeys = await self._request_dispatch(
-n, log, body, headers, mapped_pushkeys, span
+n, log, body, headers, pushkeys, span
)
pushkeys = new_pushkeys
-failed += [
-inverse_reg_id_mappings[canonical_pk]
-for canonical_pk in new_failed
-]
+failed += new_failed

if len(pushkeys) == 0:
break
except TemporaryNotificationDispatchException as exc:
@@ -459,79 +422,3 @@ def _build_data(n, device):
data["missed_calls"] = n.counts.missed_calls

return data


class CanonicalRegIdStore(object):
TABLE_CREATE_QUERY = """
CREATE TABLE IF NOT EXISTS gcm_canonical_reg_id (
reg_id TEXT PRIMARY KEY,
canonical_reg_id TEXT NOT NULL
);
"""

def __init__(self, db: ConnectionPool, engine: str):
"""
Args:
db (adbapi.ConnectionPool): database to prepare
engine (str):
Database engine to use. Should be either "sqlite" or "postgresql".
"""
self.db = db
self.engine = engine

async def setup(self):
"""
Prepares, if necessary, the database for storing canonical registration IDs.

Separate method from the constructor because we wait for an async request
to complete, so it must be an `async def` method.
"""
await self.db.runOperation(self.TABLE_CREATE_QUERY)

async def set_canonical_id(self, reg_id, canonical_reg_id):
"""
Associates a GCM registration ID with a canonical registration ID.
Args:
reg_id (str): a registration ID
canonical_reg_id (str): the canonical registration ID for `reg_id`
"""
if self.engine == "sqlite":
await self.db.runOperation(
"INSERT OR REPLACE INTO gcm_canonical_reg_id VALUES (?, ?);",
(reg_id, canonical_reg_id),
)
else:
await self.db.runOperation(
"""
INSERT INTO gcm_canonical_reg_id VALUES (%s, %s)
ON CONFLICT (reg_id) DO UPDATE
SET canonical_reg_id = EXCLUDED.canonical_reg_id;
""",
(reg_id, canonical_reg_id),
)

async def get_canonical_ids(self, reg_ids):
"""
Retrieves the canonical registration ID for multiple registration IDs.

Args:
reg_ids (iterable): registration IDs to retrieve canonical registration
IDs for.

Returns (dict):
mapping of registration ID to either its canonical registration ID,
or `None` if there is no entry.
"""
parameter_key = "?" if self.engine == "sqlite" else "%s"
rows = dict(
await self.db.runQuery(
"""
SELECT reg_id, canonical_reg_id
FROM gcm_canonical_reg_id
WHERE reg_id IN (%s)
"""
% (",".join(parameter_key for _ in reg_ids)),
reg_ids,
)
)
return {reg_id: dict(rows).get(reg_id) for reg_id in reg_ids}