diff --git a/influxdb/helper.py b/influxdb/helper.py index fa79c079..f49f40ad 100644 --- a/influxdb/helper.py +++ b/influxdb/helper.py @@ -41,6 +41,8 @@ class Meta: # Only applicable if autocommit is True. autocommit = True # If True and no bulk_size, then will set bulk_size to 1. + retention_policy = 'your_retention_policy' + # Specify the retention policy for the data points time_precision = "h"|"m"|s"|"ms"|"u"|"ns" # Default is ns (nanoseconds) # Setting time precision while writing point @@ -83,6 +85,8 @@ def __new__(cls, *args, **kwargs): 'In {0}, time_precision is set, but invalid use any of {}.' .format(cls.__name__, ','.join(allowed_time_precisions))) + cls._retention_policy = getattr(_meta, 'retention_policy', None) + cls._client = getattr(_meta, 'client', None) if cls._autocommit and not cls._client: raise AttributeError( @@ -154,9 +158,11 @@ def commit(cls, client=None): """ if not client: client = cls._client + rtn = client.write_points( cls._json_body_(), - time_precision=cls._time_precision) + time_precision=cls._time_precision, + retention_policy=cls._retention_policy) # will be None if not set and will default to ns cls._reset_() return rtn diff --git a/influxdb/tests/helper_test.py b/influxdb/tests/helper_test.py index 6aa8f15a..16924936 100644 --- a/influxdb/tests/helper_test.py +++ b/influxdb/tests/helper_test.py @@ -376,3 +376,54 @@ class Meta: .format(WarnBulkSizeNoEffect)) self.assertIn('has no affect', str(w[-1].message), 'Warning message did not contain "has not affect".') + + def testSeriesWithRetentionPolicy(self): + """Test that the data is saved with the specified retention policy.""" + my_policy = 'my_policy' + + class RetentionPolicySeriesHelper(SeriesHelper): + + class Meta: + client = InfluxDBClient() + series_name = 'events.stats.{server_name}' + fields = ['some_stat', 'time'] + tags = ['server_name', 'other_tag'] + bulk_size = 2 + autocommit = True + retention_policy = my_policy + + fake_write_points = mock.MagicMock() + RetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=159, other_tag='gg') + RetentionPolicySeriesHelper._client.write_points = fake_write_points + RetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=158, other_tag='aa') + + kall = fake_write_points.call_args + args, kwargs = kall + self.assertTrue('retention_policy' in kwargs) + self.assertEqual(kwargs['retention_policy'], my_policy) + + def testSeriesWithoutRetentionPolicy(self): + """Test that the data is saved without any retention policy.""" + class NoRetentionPolicySeriesHelper(SeriesHelper): + + class Meta: + client = InfluxDBClient() + series_name = 'events.stats.{server_name}' + fields = ['some_stat', 'time'] + tags = ['server_name', 'other_tag'] + bulk_size = 2 + autocommit = True + + fake_write_points = mock.MagicMock() + NoRetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=159, other_tag='gg') + NoRetentionPolicySeriesHelper._client.write_points = fake_write_points + NoRetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=158, other_tag='aa') + + kall = fake_write_points.call_args + args, kwargs = kall + self.assertTrue('retention_policy' in kwargs) + self.assertEqual(kwargs['retention_policy'], None)