Cleanup old data (#2066)
* Cleanup tasks

* Update

* Added tests

* Create cron job

* Delete old data cron

* Fix import

* import fix

* Added delete + script to disable pgp for proton mboxes
acasajus committed Mar 18, 2024
1 parent f2fcaa6 commit 0da1811
Showing 13 changed files with 262 additions and 35 deletions.
2 changes: 2 additions & 0 deletions app/config.py
@@ -434,6 +434,8 @@ def sl_getenv(env_var: str, default_factory: Callable = None):
 HIBP_RPM = 100
 HIBP_SKIP_PARTNER_ALIAS = os.environ.get("HIBP_SKIP_PARTNER_ALIAS")

+KEEP_OLD_DATA_DAYS = 30
+
 POSTMASTER = os.environ.get("POSTMASTER")

 # store temporary files, especially for debugging
59 changes: 25 additions & 34 deletions app/s3.py
@@ -5,49 +5,39 @@
 import boto3
 import requests

-from app.config import (
-    AWS_REGION,
-    BUCKET,
-    AWS_ACCESS_KEY_ID,
-    AWS_SECRET_ACCESS_KEY,
-    LOCAL_FILE_UPLOAD,
-    UPLOAD_DIR,
-    URL,
-    AWS_ENDPOINT_URL,
-)
+from app import config
 from app.log import LOG


 _s3_client = None


 def _get_s3client():
     global _s3_client
     if _s3_client is None:
         args = {
-            "aws_access_key_id": AWS_ACCESS_KEY_ID,
-            "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
-            "region_name": AWS_REGION,
+            "aws_access_key_id": config.AWS_ACCESS_KEY_ID,
+            "aws_secret_access_key": config.AWS_SECRET_ACCESS_KEY,
+            "region_name": config.AWS_REGION,
         }
-        if AWS_ENDPOINT_URL:
-            args["endpoint_url"] = AWS_ENDPOINT_URL
+        if config.AWS_ENDPOINT_URL:
+            args["endpoint_url"] = config.AWS_ENDPOINT_URL
         _s3_client = boto3.client("s3", **args)
     return _s3_client


 def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-stream"):
     bs.seek(0)

-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, key)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, key)
         file_dir = os.path.dirname(file_path)
         os.makedirs(file_dir, exist_ok=True)
         with open(file_path, "wb") as f:
             f.write(bs.read())

     else:
         _get_s3client().put_object(
-            Bucket=BUCKET,
+            Bucket=config.BUCKET,
             Key=key,
             Body=bs,
             ContentType=content_type,
@@ -57,16 +47,16 @@ def upload_from_bytesio(key: str, bs: BytesIO, content_type="application/octet-stream"):
 def upload_email_from_bytesio(path: str, bs: BytesIO, filename):
     bs.seek(0)

-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, path)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
         file_dir = os.path.dirname(file_path)
         os.makedirs(file_dir, exist_ok=True)
         with open(file_path, "wb") as f:
             f.write(bs.read())

     else:
         _get_s3client().put_object(
-            Bucket=BUCKET,
+            Bucket=config.BUCKET,
             Key=path,
             Body=bs,
             # Support saving a remote file using Http header
@@ -77,12 +67,12 @@ def upload_email_from_bytesio(path: str, bs: BytesIO, filename):


 def download_email(path: str) -> Optional[str]:
-    if LOCAL_FILE_UPLOAD:
-        file_path = os.path.join(UPLOAD_DIR, path)
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
         with open(file_path, "rb") as f:
             return f.read()
     resp = _get_s3client().get_object(
-        Bucket=BUCKET,
+        Bucket=config.BUCKET,
         Key=path,
     )
     if not resp or "Body" not in resp:
@@ -96,29 +86,30 @@ def upload_from_url(url: str, upload_path):


 def get_url(key: str, expires_in=3600) -> str:
-    if LOCAL_FILE_UPLOAD:
-        return URL + "/static/upload/" + key
+    if config.LOCAL_FILE_UPLOAD:
+        return config.URL + "/static/upload/" + key
     else:
         return _get_s3client().generate_presigned_url(
             ExpiresIn=expires_in,
             ClientMethod="get_object",
-            Params={"Bucket": BUCKET, "Key": key},
+            Params={"Bucket": config.BUCKET, "Key": key},
         )


 def delete(path: str):
-    if LOCAL_FILE_UPLOAD:
-        os.remove(os.path.join(UPLOAD_DIR, path))
+    if config.LOCAL_FILE_UPLOAD:
+        file_path = os.path.join(config.UPLOAD_DIR, path)
+        os.remove(file_path)
     else:
-        _get_s3client().delete_object(Bucket=BUCKET, Key=path)
+        _get_s3client().delete_object(Bucket=config.BUCKET, Key=path)


 def create_bucket_if_not_exists():
     s3client = _get_s3client()
     buckets = s3client.list_buckets()
     for bucket in buckets["Buckets"]:
-        if bucket["Name"] == BUCKET:
+        if bucket["Name"] == config.BUCKET:
             LOG.i("Bucket already exists")
             return
-    s3client.create_bucket(Bucket=BUCKET)
-    LOG.i(f"Bucket {BUCKET} created")
+    s3client.create_bucket(Bucket=config.BUCKET)
+    LOG.i(f"Bucket {config.BUCKET} created")
14 changes: 14 additions & 0 deletions cron.py
@@ -61,6 +61,9 @@
 from app.proton.utils import get_proton_partner
 from app.utils import sanitize_email
 from server import create_light_app
+from tasks.cleanup_old_imports import cleanup_old_imports
+from tasks.cleanup_old_jobs import cleanup_old_jobs
+from tasks.cleanup_old_notifications import cleanup_old_notifications

 DELETE_GRACE_DAYS = 30

@@ -1221,6 +1224,13 @@ def clear_users_scheduled_to_be_deleted(dry_run=False):
     Session.commit()


+def delete_old_data():
+    oldest_valid = arrow.now().shift(days=-config.KEEP_OLD_DATA_DAYS)
+    cleanup_old_imports(oldest_valid)
+    cleanup_old_jobs(oldest_valid)
+    cleanup_old_notifications(oldest_valid)
+
+
 if __name__ == "__main__":
     LOG.d("Start running cronjob")
     parser = argparse.ArgumentParser()
@@ -1235,6 +1245,7 @@ def clear_users_scheduled_to_be_deleted(dry_run=False):
"notify_manual_subscription_end",
"notify_premium_end",
"delete_logs",
"delete_old_data",
"poll_apple_subscription",
"sanity_check",
"delete_old_monitoring",
@@ -1263,6 +1274,9 @@ def clear_users_scheduled_to_be_deleted(dry_run=False):
elif args.job == "delete_logs":
LOG.d("Deleted Logs")
delete_logs()
elif args.job == "delete_old_data":
LOG.d("Delete old data")
delete_old_data()
elif args.job == "poll_apple_subscription":
LOG.d("Poll Apple Subscriptions")
poll_apple_subscription()
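The new `delete_old_data` job derives a single cutoff from `KEEP_OLD_DATA_DAYS` (30 days, added to app/config.py above) and fans out to the three task modules. It can also be run outside the scheduler; a minimal sketch, assuming the usual app and database context is already set up:

    from cron import delete_old_data

    # Equivalent to: python cron.py -j delete_old_data
    # Removes batch imports, finished or stuck jobs, and notifications
    # older than config.KEEP_OLD_DATA_DAYS.
    delete_old_data()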
6 changes: 6 additions & 0 deletions crontab.yml
@@ -37,6 +37,12 @@ jobs:
     schedule: "15 5 * * *"
     captureStderr: true

+  - name: SimpleLogin Delete Old data
+    command: python /code/cron.py -j delete_old_data
+    shell: /bin/bash
+    schedule: "30 5 * * *"
+    captureStderr: true
+
   - name: SimpleLogin Poll Apple Subscriptions
     command: python /code/cron.py -j poll_apple_subscription
     shell: /bin/bash
Empty file added tasks/__init__.py
19 changes: 19 additions & 0 deletions tasks/cleanup_old_imports.py
@@ -0,0 +1,19 @@
import arrow

from app import s3
from app.log import LOG
from app.models import BatchImport


def cleanup_old_imports(oldest_allowed: arrow.Arrow):
    LOG.i(f"Deleting imports older than {oldest_allowed}")
    for batch_import in (
        BatchImport.filter(BatchImport.created_at < oldest_allowed).yield_per(500).all()
    ):
        LOG.i(
            f"Deleting batch import {batch_import} with file {batch_import.file.path}"
        )
        file = batch_import.file
        if file is not None:
            s3.delete(file.path)
        BatchImport.delete(batch_import.id, commit=True)
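Two details of this task are worth noting: `yield_per(500)` streams rows in batches so a large backlog is not loaded into memory at once, and each import's backing file is removed from storage via `s3.delete` (which covers both the S3 and local-upload cases, per the refactor above) before the row itself goes. A hedged usage sketch with an explicit cutoff:

    import arrow

    from tasks.cleanup_old_imports import cleanup_old_imports

    # Delete batch imports, and their uploaded files, older than 30 days.
    cleanup_old_imports(arrow.now().shift(days=-30))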
24 changes: 24 additions & 0 deletions tasks/cleanup_old_jobs.py
@@ -0,0 +1,24 @@
import arrow
from sqlalchemy import or_, and_

from app import config
from app.db import Session
from app.log import LOG
from app.models import Job, JobState


def cleanup_old_jobs(oldest_allowed: arrow.Arrow):
    LOG.i(f"Deleting jobs older than {oldest_allowed}")
    count = Job.filter(
        or_(
            Job.state == JobState.done.value,
            Job.state == JobState.error.value,
            and_(
                Job.state == JobState.taken.value,
                Job.attempts >= config.JOB_MAX_ATTEMPTS,
            ),
        ),
        Job.updated_at < oldest_allowed,
    ).delete()
    Session.commit()
    LOG.i(f"Deleted {count} jobs")
12 changes: 12 additions & 0 deletions tasks/cleanup_old_notifications.py
@@ -0,0 +1,12 @@
import arrow

from app.db import Session
from app.log import LOG
from app.models import Notification


def cleanup_old_notifications(oldest_allowed: arrow.Arrow):
    LOG.i(f"Deleting notifications older than {oldest_allowed}")
    count = Notification.filter(Notification.created_at < oldest_allowed).delete()
    Session.commit()
    LOG.i(f"Deleted {count} notifications")
28 changes: 27 additions & 1 deletion tests/cron/test_get_alias_for_hibp.py
@@ -28,7 +28,7 @@ def test_get_alias_for_free_user_has_no_alias():
     assert len(aliases) == 0


-def test_get_alias_for_lifetime():
+def test_get_alias_for_lifetime_with_null_hibp_date():
     user = create_new_user()
     user.lifetime = True
     alias_id = Alias.create_new_random(user).id
@@ -39,6 +39,19 @@ def test_get_alias_for_lifetime():
     assert alias_id == aliases[0].id


+def test_get_alias_for_lifetime_with_old_hibp_date():
+    user = create_new_user()
+    user.lifetime = True
+    alias = Alias.create_new_random(user)
+    alias.hibp_last_check = arrow.now().shift(days=-1)
+    alias_id = alias.id
+    Session.commit()
+    aliases = list(
+        cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1)
+    )
+    assert alias_id == aliases[0].id
+
+
 def create_partner_sub(user: User):
     pu = PartnerUser.create(
         partner_id=get_proton_partner().id,
@@ -114,3 +127,16 @@ def test_skipped_user_is_not_checked():
         cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
     )
     assert len(aliases) == 0
+
+
+def test_already_checked_is_not_checked():
+    user = create_new_user()
+    user.lifetime = True
+    alias = Alias.create_new_random(user)
+    alias.hibp_last_check = arrow.now().shift(days=1)
+    alias_id = alias.id
+    Session.commit()
+    aliases = list(
+        cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
+    )
+    assert len(aliases) == 0
Empty file added tests/tasks/__init__.py
35 changes: 35 additions & 0 deletions tests/tasks/test_cleanup_old_imports.py
@@ -0,0 +1,35 @@
import tempfile
from io import BytesIO

import arrow

from app import s3, config
from app.models import File, BatchImport
from tasks.cleanup_old_imports import cleanup_old_imports
from tests.utils import random_token, create_new_user


def test_cleanup_old_imports():
    BatchImport.filter().delete()
    with tempfile.TemporaryDirectory() as tmpdir:
        config.UPLOAD_DIR = tmpdir
        user = create_new_user()
        path = random_token()
        s3.upload_from_bytesio(path, BytesIO("data".encode("utf-8")))
        file = File.create(path=path, commit=True)  # noqa: F821
        now = arrow.now()
        delete_batch_import_id = BatchImport.create(
            user_id=user.id,
            file_id=file.id,
            created_at=now.shift(minutes=-1),
            flush=True,
        ).id
        keep_batch_import_id = BatchImport.create(
            user_id=user.id,
            file_id=file.id,
            created_at=now.shift(minutes=+1),
            commit=True,
        ).id
        cleanup_old_imports(now)
        assert BatchImport.get(id=delete_batch_import_id) is None
        assert BatchImport.get(id=keep_batch_import_id) is not None
