Skip to content

Commit

Permalink
mobile_push_service: Add subcommand for deactivating remote servers.
Browse files Browse the repository at this point in the history
  • Loading branch information
eeshangarg authored and timabbott committed Jan 21, 2022
1 parent 5c5910d commit b95b1ca
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions zerver/management/commands/mobile_push_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import subprocess
from argparse import ArgumentParser
from typing import Any, Dict
from typing import Any, Dict, Optional

import requests
from django.conf import settings
Expand All @@ -26,6 +27,11 @@ def add_arguments(self, parser: ArgumentParser) -> None:
action="store_true",
help="Register your server for the Mobile Push Notification Service.",
)
parser.add_argument(
"--deactivate",
action="store_true",
help="Deactivate your server's Mobile Push Notification Service registration.",
)
parser.add_argument(
"--agree_to_terms_of_service",
action="store_true",
Expand Down Expand Up @@ -64,6 +70,8 @@ def handle(self, *args: Any, **options: Any) -> None:
self._handle_register_subcommand(*args, **options)
elif options["rotate_key"]:
self._handle_rotate_key_subcommand(*args, **options)
elif options["deactivate"]:
self._handle_deactivate_subcommand(*args, **options)

def _handle_rotate_key_subcommand(self, *args: Any, **options: Any) -> None:
request = {
Expand Down Expand Up @@ -131,11 +139,27 @@ def _handle_register_subcommand(self, *args: Any, **options: Any) -> None:
else:
print("Mobile Push Notification Service registration successfully updated!")

def _request_push_notification_bouncer_url(self, url: str, params: Dict[str, Any]) -> Response:
def _handle_deactivate_subcommand(self, *args: Any, **options: Any) -> None:
credentials = "{}:{}".format(settings.ZULIP_ORG_ID, settings.ZULIP_ORG_KEY)
basic_auth = "Basic " + base64.b64encode(credentials.encode()).decode()

response = self._request_push_notification_bouncer_url(
"/api/v1/remotes/server/deactivate", headers={"authorization": basic_auth}
)

assert response.json()["result"] == "success"
print("Mobile Push Notification Service registration successfully deactivated!")

def _request_push_notification_bouncer_url(
self,
url: str,
params: Optional[Dict[str, Any]] = {},
headers: Optional[Dict[str, Any]] = {},
) -> Response:
registration_url = settings.PUSH_NOTIFICATION_BOUNCER_URL + url
session = PushBouncerSession()
try:
response = session.post(registration_url, params=params)
response = session.post(registration_url, params=params, headers=headers)
except requests.RequestException:
raise CommandError(
"Network error connecting to push notifications service "
Expand Down

0 comments on commit b95b1ca

Please sign in to comment.