Skip to content
This repository has been archived by the owner on Aug 25, 2023. It is now read-only.

Commit

Permalink
Merge 61e3cc9 into 2a35221
Browse files Browse the repository at this point in the history
  • Loading branch information
radkomateusz committed Jun 3, 2019
2 parents 2a35221 + 61e3cc9 commit f691ddd
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 74 deletions.
10 changes: 10 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,13 @@ project_settings:

# default_restoration_project_id - project into which data will be restored by default during restoration process
# default_restoration_project_id: 'default-restoration-storage-project-id'

retention_settings:

# young_old_generation_threshold_in_months - for all backups older than this number of months,
# retention process will delete all backups except the most recent one.
# This doesn't affect backups younger than this threshold
young_old_generation_threshold_in_months: 7

# grace_period_after_source_table_deletion_in_months - number of months after deletion of the source table after which the retention process will remove the last remaining backup of that table
grace_period_after_source_table_deletion_in_months: 7
12 changes: 12 additions & 0 deletions src/commons/config/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ def default_restoration_project_id(self):
def projects_to_skip(self):
return self.__project_config['backup_settings'].get('projects_to_skip')

@property
def grace_period_after_source_table_deletion_in_months(self):
    """Return the configured grace period (in months) after source table deletion.

    The value is read from the 'retention_settings' section of the project
    configuration under the key
    'grace_period_after_source_table_deletion_in_months'.

    Raises:
        ValueError: if the setting is missing or lower than 1.
    """
    months = self.__project_config['retention_settings'].get(
        'grace_period_after_source_table_deletion_in_months')
    # Explicit check instead of `assert`: assertions are stripped when Python
    # runs with -O, which would let an invalid configuration through.
    if months is None or months < 1:
        raise ValueError(
            "'grace_period_after_source_table_deletion_in_months' must be "
            "set to a value >= 1, got: {!r}".format(months))
    return months

@property
def young_old_generation_threshold_in_months(self):
    """Return the configured young/old backup generation threshold in months.

    The value is read from the 'retention_settings' section of the project
    configuration under the key 'young_old_generation_threshold_in_months'.

    Raises:
        ValueError: if the setting is missing or lower than 1.
    """
    months = self.__project_config['retention_settings'].get(
        'young_old_generation_threshold_in_months')
    # Explicit check instead of `assert`: assertions are stripped when Python
    # runs with -O, which would let an invalid configuration through.
    if months is None or months < 1:
        raise ValueError(
            "'young_old_generation_threshold_in_months' must be set to a "
            "value >= 1, got: {!r}".format(months))
    return months

config_file_yaml = "config/config.yaml"
logging.info("Loading configuration from file: '%s'", config_file_yaml)
Expand Down
57 changes: 57 additions & 0 deletions src/retention/organization_retention.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import datetime

from dateutil.relativedelta import relativedelta
from google.appengine.api.taskqueue import Task

from src.backup.datastore.Table import Table
from src.commons.config.configuration import configuration
from src.commons.tasks import Tasks


class OrganizationRetention(object):
    """Schedules per-table retention tasks, one page of Table entities at a time."""

    # Maximum number of Table entities fetched (and tasks created) per page.
    QUERY_PAGE_SIZE = 2500

    @classmethod
    def schedule_retention_tasks_starting_from_cursor(cls, table_cursor):
        """Schedule retention tasks for one page of tables, then chain the next page.

        Fetches up to QUERY_PAGE_SIZE Table entities matching the
        "recently checked" predicate, starting at `table_cursor`, and
        schedules one task per entity on the 'table-retention' queue. When
        the query reports more results, a follow-up '/cron/retention' task
        carrying the next cursor is put on the 'table-retention-scheduler'
        queue.

        :param table_cursor: query cursor to start from (the tests in this
            change pass None to start from the beginning).
        """
        recently_checked_tables = Table.query() \
            .filter(OrganizationRetention.__table_with_backup_predicate()) \
            .order(Table.last_checked, Table.key)
        tables, next_cursor, more = recently_checked_tables.fetch_page(
            page_size=cls.QUERY_PAGE_SIZE,
            start_cursor=table_cursor
        )

        retention_tasks = []
        for table in tables:
            retention_tasks.append(cls.__create_table_retention_task(table))
        Tasks.schedule(queue_name='table-retention', tasks=retention_tasks)

        if not (more and next_cursor):
            return

        # More pages remain: hand the cursor over to the scheduler queue.
        continuation_task = Task(
            method='GET',
            url='/cron/retention',
            params={'cursor': next_cursor.urlsafe()})
        Tasks.schedule(queue_name='table-retention-scheduler',
                       tasks=[continuation_task])

    @staticmethod
    def __table_with_backup_predicate():
        """Build the query filter selecting tables checked recently enough.

        A table qualifies when its `last_checked` is not older than the
        configured grace period after source table deletion plus one month.
        NOTE(review): the extra month is presumably a safety margin so the
        last backup can still be removed by retention - confirm.
        """
        cutoff_in_months = \
            configuration.grace_period_after_source_table_deletion_in_months + 1
        oldest_accepted_check = datetime.datetime.today() - relativedelta(
            months=cutoff_in_months)
        return Table.last_checked >= oldest_accepted_check

    @staticmethod
    def __create_table_retention_task(table):
        """Create the '/tasks/retention/table' task for a single Table entity.

        'partitionId' is added to the task parameters only when the entity
        has a truthy `partition_id`.
        """
        task_params = {'projectId': table.project_id,
                       'datasetId': table.dataset_id,
                       'tableId': table.table_id,
                       'tableKey': table.key.urlsafe()}
        partition_id = table.partition_id
        if partition_id:
            task_params['partitionId'] = partition_id
        return Task(method='GET',
                    url='/tasks/retention/table',
                    params=task_params)
40 changes: 2 additions & 38 deletions src/retention/organization_retention_handler.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,11 @@
import webapp2
from google.appengine.api.taskqueue import Task
from google.appengine.datastore.datastore_query import Cursor

from src.commons.tasks import Tasks
from src.backup.datastore.Table import Table
from src.retention.organization_retention import OrganizationRetention


class OrganizationRetentionHandler(webapp2.RequestHandler):

QUERY_PAGE_SIZE = 2500

def get(self):
cursor = Cursor(urlsafe=self.request.get('cursor'))
self.__schedule_retention_starting_from_cursor(cursor)

@classmethod
def __schedule_retention_starting_from_cursor(cls, table_cursor):
results, next_cursor, more = Table.query().fetch_page(
page_size=cls.QUERY_PAGE_SIZE,
start_cursor=table_cursor
)
tasks = [cls.__create_table_retention_task(result)
for result in results]
Tasks.schedule(queue_name='table-retention', tasks=tasks)
if more and next_cursor:
task = Task(
method='GET',
url='/cron/retention',
params={
'cursor': next_cursor.urlsafe(),
})

Tasks.schedule(queue_name='table-retention-scheduler', tasks=[task])

@staticmethod
def __create_table_retention_task(table):
params = {'projectId': table.project_id,
'datasetId': table.dataset_id,
'tableId': table.table_id,
'tableKey': table.key.urlsafe()}
if table.partition_id:
params['partitionId'] = table.partition_id
return Task(
method='GET',
url='/tasks/retention/table',
params=params)
OrganizationRetention.schedule_retention_tasks_starting_from_cursor(cursor)
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from apiclient.errors import HttpError

from src.commons.big_query.big_query_table_metadata import BigQueryTableMetadata
from src.commons.config.configuration import configuration


class GracePeriodAfterDeletionFilter(object):
GRACE_PERIOD_AFTER_DELETION_IN_MONTHS = 7

def filter(self, backups, table_reference):
if self.__should_keep_backups(backups, table_reference):
Expand All @@ -18,7 +18,7 @@ def filter(self, backups, table_reference):

def __should_keep_backups(self, backups, table_reference):
age_threshold_date = datetime.date.today() - relativedelta(
months=self.GRACE_PERIOD_AFTER_DELETION_IN_MONTHS)
months=configuration.grace_period_after_source_table_deletion_in_months)
old_backups = [b for b in backups
if b.created.date() < age_threshold_date]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
BackupAgeDivider


class OnlyOneVersionAbove7MonthsFilter(object):
class OnlyOneVersionForOldBackupFilter(object):

def filter(self, backups, table_reference):
sorted_backups = Backup.sort_backups_by_create_time_desc(backups)
Expand Down
4 changes: 2 additions & 2 deletions src/retention/policy/filter/utils/backup_age_divider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

from dateutil.relativedelta import relativedelta

NUMBER_OF_MONTHS_TO_KEEP = 7
from src.commons.config.configuration import configuration


class BackupAgeDivider(object):

@staticmethod
def divide_backups_by_age(backups):
age_threshold_date = datetime.date.today() - relativedelta(
months=NUMBER_OF_MONTHS_TO_KEEP)
months=configuration.young_old_generation_threshold_in_months)

young_backups = [b for b in backups
if b.created.date() >= age_threshold_date]
Expand Down
6 changes: 3 additions & 3 deletions src/retention/policy/retention_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
GracePeriodAfterDeletionFilter
from src.retention.policy.filter.most_recent_daily_backup_filter import \
MostRecentDailyBackupFilter
from src.retention.policy.filter.only_one_version_old_backup_filter import \
OnlyOneVersionForOldBackupFilter
from src.retention.policy.filter.ten_young_backup_versions_filter import \
TenYoungBackupVersionsFilter
from src.retention.policy.filter.only_one_version_above_7_months_filter import \
OnlyOneVersionAbove7MonthsFilter


class RetentionPolicy(object):
def __init__(self):
self.filters = [MostRecentDailyBackupFilter(),
TenYoungBackupVersionsFilter(),
OnlyOneVersionAbove7MonthsFilter(),
OnlyOneVersionForOldBackupFilter(),
GracePeriodAfterDeletionFilter()]

def get_backups_eligible_for_deletion(self, backups, table_reference):
Expand Down
6 changes: 6 additions & 0 deletions tests/commons/config/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def test_should_be_able_to_read__projects_to_skip(self):
def test_should_be_able_to_read__custom_project_list(self):
self.__is_list_and_each_item_instance_of(self.configuration.projects_to_skip, str)

def test_should_be_able_to_read_grace_period_after_source_table_deletion_in_months(self):
self.__instance_of(self.configuration.grace_period_after_source_table_deletion_in_months, int)

def test_should_be_able_to_read_young_old_generation_threshold_in_months(self):
self.__instance_of(self.configuration.young_old_generation_threshold_in_months, int)

def __instance_of(self, obj, expected_type):
self.assertTrue(isinstance(obj, expected_type))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@

from mock import patch
from src.backup.datastore.Backup import Backup
from src.retention.policy.filter.only_one_version_above_7_months_filter import \
OnlyOneVersionAbove7MonthsFilter

from src.commons.table_reference import TableReference
from src.retention.policy.filter.only_one_version_old_backup_filter import \
OnlyOneVersionForOldBackupFilter
from tests.utils.backup_utils import create_backup


class TestOnlyOneVersionAbove7MonthsFilter(unittest.TestCase):
class TestOnlyOneVersionForOldBackupFilter(unittest.TestCase):
def setUp(self):
patch('googleapiclient.discovery.build').start()
patch('oauth2client.client.GoogleCredentials.get_application_default')\
.start()
self.under_test = OnlyOneVersionAbove7MonthsFilter()
self.under_test = OnlyOneVersionForOldBackupFilter()

def tearDown(self):
patch.stopall()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
import datetime
import unittest

import webapp2
from dateutil.relativedelta import relativedelta
from google.appengine.ext import testbed

import webtest
from src.commons.test_utils import utils
from mock import patch

from src.backup.datastore.Table import Table
from src.retention.organization_retention_handler import \
OrganizationRetentionHandler
from src.commons.config.configuration import configuration
from src.commons.test_utils import utils
from src.retention.organization_retention import OrganizationRetention


class TestOrganizationRetentionHandler(unittest.TestCase):
class TestOrganizationRetention(unittest.TestCase):

def setUp(self):
app = webapp2.WSGIApplication(
[('/cron/retention', OrganizationRetentionHandler)])
self.under_test = webtest.TestApp(app)

self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_memcache_stub()
Expand All @@ -27,10 +24,10 @@ def setUp(self):
def tearDown(self):
self.testbed.deactivate()

def test_should_schedul_retention_with_empty_datastore(self):
def test_should_schedule_retention_with_empty_datastore(self):
# given
# when
self.under_test.get('/cron/retention')
OrganizationRetention.schedule_retention_tasks_starting_from_cursor(None)

# then
tasks = self.taskqueue_stub.get_filtered_tasks()
Expand All @@ -41,7 +38,7 @@ def test_should_schedule_single_non_partitioned_table(self):
self._create_table_entity('non_partitioned_table')

# when
self.under_test.get('/cron/retention')
OrganizationRetention.schedule_retention_tasks_starting_from_cursor(None)

# then
tasks = self.taskqueue_stub.get_filtered_tasks()
Expand All @@ -59,7 +56,7 @@ def test_should_schedule_single_partitioned_table(self):
self._create_table_entity('partitioned_table', '20170605')

# when
self.under_test.get('/cron/retention')
OrganizationRetention.schedule_retention_tasks_starting_from_cursor(None)

# then
tasks = self.taskqueue_stub.get_filtered_tasks()
Expand All @@ -73,16 +70,52 @@ def test_should_schedule_single_partitioned_table(self):
'&tableKey='),
msg='Actual url: {}'.format(tasks[0].url))

def test_should_schedule_only_recently_seen_tables(self):
    """Tables last checked before the retention threshold get no task."""
    # given
    self._create_table_entity('recently_seen_partitioned_table', '20170605')
    self._create_table_entity('recently_seen_not_partitioned_table')
    # Two months beyond the grace period is past the scheduling threshold.
    months_since_last_check = \
        configuration.grace_period_after_source_table_deletion_in_months + 2
    self._create_table_entity(
        'not_seen_since_threshold_date_table',
        last_checked=datetime.datetime.now() - relativedelta(
            months=months_since_last_check))
    expected_url_prefixes = [
        ('/tasks/retention/table'
         '?projectId=example-proj-name'
         '&partitionId=20170605'
         '&tableId=recently_seen_partitioned_table'
         '&datasetId=example-dataset-name'
         '&tableKey='),
        ('/tasks/retention/table'
         '?projectId=example-proj-name'
         '&tableId=recently_seen_not_partitioned_table'
         '&datasetId=example-dataset-name'
         '&tableKey='),
    ]

    # when
    OrganizationRetention.schedule_retention_tasks_starting_from_cursor(None)

    # then
    tasks = self.taskqueue_stub.get_filtered_tasks()
    self.assertEqual(len(tasks), 2)
    for task, expected_prefix in zip(tasks, expected_url_prefixes):
        self.assertTrue(task.url.startswith(expected_prefix),
                        msg='Actual url: {}'.format(task.url))

def test_should_schedule_using_cursor(self):
# given
self._create_table_entity('non_partitioned_table1')
self._create_table_entity('non_partitioned_table2')

_, cursor, _1 = Table.query().fetch_page(page_size=1)
age_threshold_datetime = datetime.datetime.today() - relativedelta(
months=(configuration.grace_period_after_source_table_deletion_in_months + 1))

_, cursor, _1 = Table.query() \
.filter(Table.last_checked >= age_threshold_datetime) \
.order(Table.last_checked, Table.key) \
.fetch_page(
page_size=1,
)

# when
self.under_test.get(
'/cron/retention?cursor={}'.format(cursor.urlsafe()))
OrganizationRetention.schedule_retention_tasks_starting_from_cursor(cursor)
# then
tasks = self.taskqueue_stub.get_filtered_tasks()
self.assertEqual(len(tasks), 1)
Expand All @@ -92,14 +125,14 @@ def test_should_schedule_using_cursor(self):
msg='Actual url: {}'.format(tasks[0].url))

@patch('src.retention.organization_retention_handler.'
'OrganizationRetentionHandler.QUERY_PAGE_SIZE', 3)
'OrganizationRetention.QUERY_PAGE_SIZE', 3)
def test_should_schedule_retention_task_at_the_end(self):
# given
for i in range(0, 6):
self._create_table_entity('non_partitioned_table_{}'.format(i))

# when
self.under_test.get('/cron/retention')
OrganizationRetention.schedule_retention_tasks_starting_from_cursor(None)

# then
tasks = self.taskqueue_stub.get_filtered_tasks(
Expand All @@ -110,12 +143,12 @@ def test_should_schedule_retention_task_at_the_end(self):
msg='Actual url: {}'.format(tasks[0].url))

@staticmethod
def _create_table_entity(table_id, partition_id=None, last_checked=None):
    """Store a Table entity for the example project/dataset in Datastore.

    :param table_id: table id of the created entity.
    :param partition_id: optional partition id; None for a non-partitioned
        table.
    :param last_checked: value of the entity's `last_checked` field;
        defaults to the current time at the moment of the call.
    """
    # Resolve the default per call: a `datetime.datetime.now()` default in
    # the signature is evaluated only once, when the function is defined,
    # so every entity would share the module-import timestamp.
    if last_checked is None:
        last_checked = datetime.datetime.now()
    table_entity = Table(
        project_id='example-proj-name',
        dataset_id='example-dataset-name',
        table_id=table_id,
        partition_id=partition_id,
        last_checked=last_checked
    )
    table_entity.put()
Loading

0 comments on commit f691ddd

Please sign in to comment.