From 6f8037fa822d7df7922fb312dd8f9938340d8cd6 Mon Sep 17 00:00:00 2001 From: rick-slin Date: Thu, 22 Jun 2023 11:09:21 -0400 Subject: [PATCH 1/6] implemented DateLessLogFormatHelper --- plaso/containers/events.py | 54 ++++++++++++++++- plaso/lib/dateless_helper.py | 109 +++++++++++++++++++++++++++++++++++ tests/containers/events.py | 33 +++++++++++ tests/lib/dateless_helper.py | 93 ++++++++++++++++++++++++++++++ 4 files changed, 288 insertions(+), 1 deletion(-) create mode 100644 plaso/lib/dateless_helper.py create mode 100644 tests/lib/dateless_helper.py diff --git a/plaso/containers/events.py b/plaso/containers/events.py index f1e1ee845a..300d6ed3c6 100644 --- a/plaso/containers/events.py +++ b/plaso/containers/events.py @@ -82,6 +82,57 @@ def CalculateEventValuesHash(event_data, event_data_stream): return md5_context.hexdigest() +class DateLessLogHelper(interface.AttributeContainer): + """Date-less log helper attribute container. + + Attributes: + earliest_date (datetime.datetime): earliest possible date the event data + stream was created. + latest_date (datetime.datetime): last relative date determined by the + date-less log helper. + """ + + CONTAINER_TYPE = 'date_less_log_helper' + + SCHEMA = { + '_event_data_stream_identifier': 'AttributeContainerIdentifier', + 'earliest_date': 'datetime.datetime', + 'latest_date': 'datetime.datetime'} + + _SERIALIZABLE_PROTECTED_ATTRIBUTES = [ + '_event_data_stream_identifier'] + + def __init__(self): + """Initializes a date-less log helper attribute container.""" + super(DateLessLogHelper, self).__init__() + self._event_data_stream_identifier = None + self.earliest_date = None + self.latest_date = None + + def GetEventDataStreamIdentifier(self): + """Retrieves the identifier of the associated event data stream. + + The event data stream identifier is a storage specific value that requires + special handling during serialization. + + Returns: + AttributeContainerIdentifier: event data stream or None when not set. + """ + return self._event_data_stream_identifier + + def SetEventDataStreamIdentifier(self, event_data_stream_identifier): + """Sets the identifier of the associated event data stream. + + The event data stream identifier is a storage specific value that requires + special handling during serialization. + + Args: + event_data_stream_identifier (AttributeContainerIdentifier): event data + stream identifier. + """ + self._event_data_stream_identifier = event_data_stream_identifier + + class EventData(interface.AttributeContainer): """Event data attribute container. @@ -439,4 +490,5 @@ def SetEventDataStreamIdentifier(self, event_data_stream_identifier): manager.AttributeContainersManager.RegisterAttributeContainers([ - EventData, EventDataStream, EventObject, EventTag, YearLessLogHelper]) + DateLessLogHelper, EventData, EventDataStream, EventObject, EventTag, + YearLessLogHelper]) diff --git a/plaso/lib/dateless_helper.py b/plaso/lib/dateless_helper.py new file mode 100644 index 0000000000..862940c6cc --- /dev/null +++ b/plaso/lib/dateless_helper.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +"""The date-less log format helper mix-in.""" + +from datetime import datetime +from dfvfs.lib import definitions as dfvfs_definitions +from dfvfs.resolver import resolver as path_spec_resolver + +from plaso.containers import events + + +class DateLessLogFormatHelper(object): + """Date-less log format helper mix-in.""" + + _VALID_DAYS = frozenset(range(1, 32)) + + _VALID_MONTHS = frozenset(range(1, 13)) + + def __init__(self): + """Initializes the date-less log format helper mix-in.""" + super(DateLessLogFormatHelper, self).__init__() + self._maximum_date = None + self._minimum_date = None + self._date = None + + def _GetDatesFromFileEntry(self, file_entry): + """Retrieves the dates from the file entry date and time values. + + Args: + file_entry (dfvfs.FileEntry): file entry. + + Returns: + set[datetime.datetime]: dates of the file entry. + """ + dates = set() + + for attribute_name in ('change_time', 'creation_time', 'modification_time'): + date_time = getattr(file_entry, attribute_name, None) + if date_time: + year, month, day = date_time.GetDate() + new_date = datetime(year=year, month=month, day=day) + dates.add(new_date) + + return dates + + def _GetDate(self): + """Retrieves the date. + + Returns: + datetime.datetime: date. + """ + return self._date + + def _SetDate(self, year, month, day): + """Sets the date. + + Args: + year (int): year. + month (int): month. + day (int): day. + + Raise: + ValueError: if month or day contains an unsupported value. + """ + if day not in self._VALID_DAYS: + raise ValueError('Invalid day: {0!s}'.format(day)) + + if month not in self._VALID_MONTHS: + raise ValueError('Invalid month: {0!s}'.format(month)) + + date = datetime(year=year, month=month, day=day) + self._date = date + + def _SetEstimatedDate(self, parser_mediator): + self._maximum_date = None + self._minimum_date = None + self._date = None + + dates = set() + + file_entry = parser_mediator.GetFileEntry() + if file_entry: + dates = self._GetDatesFromFileEntry(file_entry) + + if not dates and file_entry.type_indicator in ( + dfvfs_definitions.TYPE_INDICATOR_COMPRESSED_STREAM, + dfvfs_definitions.TYPE_INDICATOR_GZIP): + + parent_file_entry = path_spec_resolver.Resolver.OpenFileEntry( + file_entry.path_spec.parent, + resolver_context=parser_mediator.resolver_context) + if parent_file_entry: + dates = self._GetDatesFromFileEntry(parent_file_entry) + + if dates: + self._maximum_date = max(dates) + self._minimum_date = min(dates) + self._date = self._minimum_date + + def GetDateLessLogHelper(self): + """Retrieves a date-less log helper attribute container. + + Returns: + DateLessLogHelper: date-less log helper. + """ + date_less_log_helper = events.DateLessLogHelper() + date_less_log_helper.latest_date = self._maximum_date + date_less_log_helper.earliest_date = self._minimum_date + + return date_less_log_helper diff --git a/tests/containers/events.py b/tests/containers/events.py index 0d896c9534..0f02f726c0 100644 --- a/tests/containers/events.py +++ b/tests/containers/events.py @@ -9,6 +9,39 @@ from tests import test_lib as shared_test_lib +class DateLessLogHelperTest(shared_test_lib.BaseTestCase): + """Tests for the date-less log helper attribute container.""" + + # pylint: disable=protected-access + + def testGetAttributeNames(self): + """Tests the GetAttributeNames function.""" + attribute_container = events.DateLessLogHelper() + + expected_attribute_names = [ + '_event_data_stream_identifier', 'earliest_date', 'latest_date'] + + attribute_names = sorted(attribute_container.GetAttributeNames()) + + self.assertEqual(attribute_names, expected_attribute_names) + + def testGetEventDataStreamIdentifier(self): + """Tests the GetEventDataStreamIdentifier function.""" + attribute_container = events.DateLessLogHelper() + + identifier = attribute_container.GetEventDataStreamIdentifier() + self.assertIsNone(identifier) + + def testSetEventDataStreamIdentifier(self): + """Tests the SetEventDataStreamIdentifier function.""" + attribute_container = events.DateLessLogHelper() + + attribute_container.SetEventDataStreamIdentifier('Foo') + + self.assertEqual( + attribute_container._event_data_stream_identifier, 'Foo') + + class EventValuesHelperTest(shared_test_lib.BaseTestCase): """Tests for the event values helper functions.""" diff --git a/tests/lib/dateless_helper.py b/tests/lib/dateless_helper.py new file mode 100644 index 0000000000..261f6bbfa7 --- /dev/null +++ b/tests/lib/dateless_helper.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +"""Tests for the date-less log format helper mix-in.""" + +from datetime import datetime +import unittest + +from dfvfs.lib import definitions as dfvfs_definitions +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import resolver as path_spec_resolver + +from plaso.lib import dateless_helper +from plaso.parsers import mediator as parsers_mediator + +from tests import test_lib as shared_test_lib + + +class DateLessLogFormatHelperTest(shared_test_lib.BaseTestCase): + """Date-less log format definition helper mix-in tests.""" + + # pylint: disable=protected-access + + def test_GetDateFromFileEntry(self): + """Tests the _GetDateFromFileEntry function.""" + test_path = self._GetTestFilePath(['syslog.gz']) + os_path_spec = path_spec_factory.Factory.NewPathSpec( + dfvfs_definitions.TYPE_INDICATOR_OS, location=test_path) + gzip_path_spec = path_spec_factory.Factory.NewPathSpec( + dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=os_path_spec) + file_entry = path_spec_resolver.Resolver.OpenFileEntry(gzip_path_spec) + + test_helper = dateless_helper.DateLessLogFormatHelper() + + dates = test_helper._GetDatesFromFileEntry(file_entry) + + self.assertEqual(dates, {datetime(year=2012, month=7, day=28)}) + + def test_GetDate(self): + """Tests the _GetDate function.""" + test_helper = dateless_helper.DateLessLogFormatHelper() + test_helper._SetDate(2013, 7, 22) + + date = test_helper._GetDate() + + self.assertEqual(date, datetime(year=2013, month=7, day=22)) + + def test_SetDate(self): + """Tests the _SetDate function.""" + test_helper = dateless_helper.DateLessLogFormatHelper() + + self.assertEqual(test_helper._date, None) + + test_helper._SetDate(2013, 7, 22) + + self.assertEqual(test_helper._date, datetime(year=2013, month=7, day=22)) + + def test_SetEstimatedDate(self): + """Tests the _SetEstimatedDate function.""" + parser_mediator = parsers_mediator.ParserMediator() + + test_path = self._GetTestFilePath(['syslog.gz']) + os_path_spec = path_spec_factory.Factory.NewPathSpec( + dfvfs_definitions.TYPE_INDICATOR_OS, location=test_path) + gzip_path_spec = path_spec_factory.Factory.NewPathSpec( + dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=os_path_spec) + file_entry = path_spec_resolver.Resolver.OpenFileEntry(gzip_path_spec) + + parser_mediator.SetFileEntry(file_entry) + + test_helper = dateless_helper.DateLessLogFormatHelper() + + self.assertIsNone(test_helper._maximum_date) + self.assertIsNone(test_helper._minimum_date) + self.assertIsNone(test_helper._date) + + test_helper._SetEstimatedDate(parser_mediator) + + self.assertEqual( + test_helper._maximum_date, datetime(year=2012, month=7, day=28)) + self.assertEqual( + test_helper._minimum_date, datetime(year=2012, month=7, day=28)) + self.assertEqual( + test_helper._date, datetime(year=2012, month=7, day=28)) + + def test_GetDateLessLogHelper(self): + """Tests the GetDateLessLogHelper function.""" + test_helper = dateless_helper.DateLessLogFormatHelper() + + date_less_log_helper = test_helper.GetDateLessLogHelper() + self.assertIsNotNone(date_less_log_helper) + + +if __name__ == '__main__': + unittest.main() From 37728be2a502c34bed136e5463f7fda672a45522 Mon Sep 17 00:00:00 2001 From: rick-slin Date: Thu, 22 Jun 2023 14:21:42 -0400 Subject: [PATCH 2/6] implemented DateLessLogHelper and integrated into EventDataTimeliner --- plaso/engine/timeliner.py | 114 +++++++++++++++++++++-- plaso/lib/dateless_helper.py | 6 +- plaso/multi_process/extraction_engine.py | 4 +- plaso/multi_process/merge_helpers.py | 1 + tests/engine/timeliner.py | 73 ++++++++++++++- tests/lib/dateless_helper.py | 17 ++-- 6 files changed, 196 insertions(+), 19 deletions(-) diff --git a/plaso/engine/timeliner.py b/plaso/engine/timeliner.py index ea2d7674bc..80cb0a59b7 100644 --- a/plaso/engine/timeliner.py +++ b/plaso/engine/timeliner.py @@ -9,6 +9,7 @@ from dfdatetime import interface as dfdatetime_interface from dfdatetime import semantic_time as dfdatetime_semantic_time +from dfdatetime import time_elements as dfdatetime_time_elements from plaso.containers import events from plaso.containers import warnings @@ -46,7 +47,9 @@ def __init__( """ super(EventDataTimeliner, self).__init__() self._attribute_mappings = {} + self._base_dates = {} self._base_years = {} + self._current_date = self._GetCurrentDate() self._current_year = self._GetCurrentYear() self._data_location = data_location self._place_holder_event = set() @@ -75,6 +78,74 @@ def _CreateTimeZonePerPathSpec(self, system_configurations): self._time_zone_per_path_spec[path_spec.parent] = ( system_configuration.time_zone) + def _GetBaseDate(self, storage_writer, event_data): + """Retrieves the base date. + + Args: + storage_writer (StorageWriter): storage writer. + event_data (EventData): event data. + + Returns: + datetime.datetime: base date. + """ + event_data_stream_identifier = event_data.GetEventDataStreamIdentifier() + lookup_key = event_data_stream_identifier.CopyToString() + + base_date = self._base_dates.get(lookup_key, None) + if base_date: + return base_date + + filter_expression = '_event_data_stream_identifier == "{0:s}"'.format( + lookup_key) + date_less_log_helpers = list(storage_writer.GetAttributeContainers( + events.DateLessLogHelper.CONTAINER_TYPE, + filter_expression=filter_expression)) + if not date_less_log_helpers: + message = ( + 'missing date-less log helper, defaulting to current date: ' + '{0:d}').format(self._current_date) + self._ProduceTimeliningWarning(storage_writer, event_data, message) + + base_date = self._current_date + + else: + earliest_date = date_less_log_helpers[0].earliest_date + latest_date = date_less_log_helpers[0].latest_date + + if earliest_date is None and latest_date is None: + message = ( + 'missing earliest and latest date in date-less log helper, ' + 'defaulting to current date: {0:d}').format(self._current_date) + self._ProduceTimeliningWarning(storage_writer, event_data, message) + + base_date = self._current_year + + elif earliest_date < self._current_date: + base_date = earliest_date + + elif latest_date < self._current_year: + message = ( + 'earliest date: {0:d} as base date would exceed current date: ' + '{1:d}, using latest date: {2:d}').format( + earliest_date, self._current_year, latest_date) + self._ProduceTimeliningWarning(storage_writer, event_data, message) + + base_date = latest_date + + else: + message = ( + 'earliest date: {0:d} and latest: date: {1:d} as base date ' + 'would exceed current date: {2:d}, using current ' + 'date').format( + earliest_date, latest_date, self._current_year) + self._ProduceTimeliningWarning(storage_writer, event_data, message) + + base_date = self._current_year + + self._base_dates[lookup_key] = base_date + + return base_date + def _GetBaseYear(self, storage_writer, event_data): """Retrieves the base year. @@ -156,6 +227,17 @@ def _GetBaseYear(self, storage_writer, event_data): return base_year + def _GetCurrentDate(self): + """Retrieves the current date. + + Returns: + datetime.datetime: the current date. + """ + current_time = datetime.datetime.now() + current_date = datetime.datetime(*current_time.timetuple()[:3]) + return current_date + + def _GetCurrentYear(self): """Retrieves current year. @@ -183,16 +265,34 @@ def _GetEvent( """ timestamp = None if date_time.is_delta: - base_year = self._GetBaseYear(storage_writer, event_data) + if ( + date_time.year == 0 and + date_time.month == 0 and + date_time.day_of_month == 0): + base_date = self._GetBaseDate(storage_writer, event_data) + base_year, base_month, base_day = base_date.timetuple()[:3] + hours = date_time.hours + minutes = date_time.minutes + seconds = date_time.seconds + fractions = date_time.fraction_of_second + + date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( + time_elements_tuple=( + base_year, base_month, base_day, hours, minutes, seconds, 0)) + date_time.fraction_of_second = fractions - try: + else: + base_year = self._GetBaseYear(storage_writer, event_data) date_time = date_time.NewFromDeltaAndYear(base_year) - except ValueError as exception: - self._ProduceTimeliningWarning( - storage_writer, event_data, str(exception)) - date_time = dfdatetime_semantic_time.InvalidTime() - timestamp = 0 + try: + date_time = date_time.NewFromDeltaAndYear(base_year) + except ValueError as exception: + self._ProduceTimeliningWarning( + storage_writer, event_data, str(exception)) + + date_time = dfdatetime_semantic_time.InvalidTime() + timestamp = 0 if timestamp is None: timestamp = date_time.GetPlasoTimestamp() diff --git a/plaso/lib/dateless_helper.py b/plaso/lib/dateless_helper.py index 862940c6cc..6491f9ce81 100644 --- a/plaso/lib/dateless_helper.py +++ b/plaso/lib/dateless_helper.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """The date-less log format helper mix-in.""" -from datetime import datetime +import datetime from dfvfs.lib import definitions as dfvfs_definitions from dfvfs.resolver import resolver as path_spec_resolver @@ -37,7 +37,7 @@ def _GetDatesFromFileEntry(self, file_entry): date_time = getattr(file_entry, attribute_name, None) if date_time: year, month, day = date_time.GetDate() - new_date = datetime(year=year, month=month, day=day) + new_date = datetime.datetime(year, month, day) dates.add(new_date) return dates @@ -67,7 +67,7 @@ def _SetDate(self, year, month, day): if month not in self._VALID_MONTHS: raise ValueError('Invalid month: {0!s}'.format(month)) - date = datetime(year=year, month=month, day=day) + date = datetime.datetime(year, month, day) self._date = date def _SetEstimatedDate(self, parser_mediator): diff --git a/plaso/multi_process/extraction_engine.py b/plaso/multi_process/extraction_engine.py index c53a231681..dfde1ce254 100644 --- a/plaso/multi_process/extraction_engine.py +++ b/plaso/multi_process/extraction_engine.py @@ -96,6 +96,7 @@ class ExtractionMultiProcessEngine(task_engine.TaskMultiProcessEngine): _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE _CONTAINER_TYPE_YEAR_LESS_LOG_HELPER = events.YearLessLogHelper.CONTAINER_TYPE + _CONTAINER_TYPE_DATE_LESS_LOG_HELPER = events.DateLessLogHelper.CONTAINER_TYPE # Maximum number of dfVFS file system objects to cache in the foreman process. _FILE_SYSTEM_CACHE_SIZE = 3 @@ -396,7 +397,8 @@ def _MergeAttributeContainer(self, storage_writer, merge_helper, container): if container.CONTAINER_TYPE in ( self._CONTAINER_TYPE_EVENT_DATA, - self._CONTAINER_TYPE_YEAR_LESS_LOG_HELPER): + self._CONTAINER_TYPE_YEAR_LESS_LOG_HELPER, + self._CONTAINER_TYPE_DATE_LESS_LOG_HELPER): event_data_stream_identifier = container.GetEventDataStreamIdentifier() event_data_stream_lookup_key = None if event_data_stream_identifier: diff --git a/plaso/multi_process/merge_helpers.py b/plaso/multi_process/merge_helpers.py index d5b51a25a7..b3219d2aff 100644 --- a/plaso/multi_process/merge_helpers.py +++ b/plaso/multi_process/merge_helpers.py @@ -120,6 +120,7 @@ class ExtractionTaskMergeHelper(BaseTaskMergeHelper): # data by the timeliner and therefore needs to be merged before event # data containers. events.YearLessLogHelper.CONTAINER_TYPE, + events.DateLessLogHelper.CONTAINER_TYPE, events.EventData.CONTAINER_TYPE, warnings.ExtractionWarning.CONTAINER_TYPE, warnings.RecoveryWarning.CONTAINER_TYPE, diff --git a/tests/engine/timeliner.py b/tests/engine/timeliner.py index f7b7ecb3c9..9521fc03af 100644 --- a/tests/engine/timeliner.py +++ b/tests/engine/timeliner.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- """Tests for the event data timeliner.""" +import datetime import unittest from dfdatetime import semantic_time as dfdatetime_semantic_time @@ -55,12 +56,13 @@ class EventDataTimelinerTest(test_lib.EngineTestCase): # pylint: disable=protected-access # pylint: disable=arguments-differ - def _CreateStorageWriter(self, event_data, base_year=None): + def _CreateStorageWriter(self, event_data, base_year=None, base_date=None): """Creates a storage writer object. Args: event_data (EventData): event data. base_year (Optional[int]): base year. + base_date (Optional[datetime.datetime]): base date. Returns: FakeStorageWriter: storage writer. @@ -82,11 +84,52 @@ def _CreateStorageWriter(self, event_data, base_year=None): event_data_stream_identifier) storage_writer.AddAttributeContainer(year_less_log_helper) + elif base_date: + date_less_log_helper = events.DateLessLogHelper() + date_less_log_helper.earliest_date = base_date + + date_less_log_helper.SetEventDataStreamIdentifier( + event_data_stream_identifier) + storage_writer.AddAttributeContainer(date_less_log_helper) + event_data.SetEventDataStreamIdentifier(event_data_stream_identifier) storage_writer.AddAttributeContainer(event_data) return storage_writer + def testGetBaseDate(self): + """Tests the _GetBaseDate function.""" + event_data_timeliner = timeliner.EventDataTimeliner( + data_location=shared_test_lib.TEST_DATA_PATH) + + current_date = event_data_timeliner._GetCurrentDate() + + event_data = TestEventData1() + event_data.value = 'MyValue' + + # Test with year-less log helper. + storage_writer = self._CreateStorageWriter( + event_data, base_date=datetime.datetime(2013, 3, 22)) + + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} + + base_date = event_data_timeliner._GetBaseDate(storage_writer, event_data) + self.assertEqual(base_date, datetime.datetime(2013, 3, 22)) + + # Test missing date-less log helper. + storage_writer = self._CreateStorageWriter(event_data) + + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} + + base_date = event_data_timeliner._GetBaseDate(storage_writer, event_data) + self.assertEqual(base_date, current_date) + + number_of_warnings = storage_writer.GetNumberOfAttributeContainers( + 'timelining_warning') + self.assertEqual(number_of_warnings, 1) + def testGetBaseYear(self): """Tests the _GetBaseYear function.""" event_data_timeliner = timeliner.EventDataTimeliner( @@ -125,6 +168,15 @@ def testGetBaseYear(self): # TODO: improve test coverage. + def testGetCurrentDate(self): + """Tests the _GetCurrentDate function.""" + event_data_timeliner = timeliner.EventDataTimeliner( + data_location=shared_test_lib.TEST_DATA_PATH) + + current_date = event_data_timeliner._GetCurrentDate() + print(current_date) + self.assertIsNotNone(current_date) + def testGetCurrentYear(self): """Tests the _GetCurrentYear function.""" event_data_timeliner = timeliner.EventDataTimeliner( @@ -210,6 +262,25 @@ def testGetEvent(self): self.assertEqual(event.date_time.year, 4) self.assertEqual(event.timestamp, -62035818808570124) + # Test with the dateless helper + storage_writer = self._CreateStorageWriter( + event_data, base_date=datetime.datetime(2013, 7, 22)) + + date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( + is_delta=True, + time_elements_tuple=(0, 0, 0, 20, 6, 31, 429876)) + + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} + + event = event_data_timeliner._GetEvent( + storage_writer, event_data, None, date_time, 'Test Time') + self.assertIsNotNone(event) + self.assertIsNotNone(event.date_time) + self.assertFalse(event.date_time.is_delta) + self.assertEqual(event.timestamp, 1374523591429876) + + # TODO: add tests for _ProduceTimeliningWarning # TODO: add tests for _ReadConfigurationFile diff --git a/tests/lib/dateless_helper.py b/tests/lib/dateless_helper.py index 261f6bbfa7..450b9968f7 100644 --- a/tests/lib/dateless_helper.py +++ b/tests/lib/dateless_helper.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Tests for the date-less log format helper mix-in.""" -from datetime import datetime +import datetime import unittest from dfvfs.lib import definitions as dfvfs_definitions @@ -32,7 +32,7 @@ def test_GetDateFromFileEntry(self): dates = test_helper._GetDatesFromFileEntry(file_entry) - self.assertEqual(dates, {datetime(year=2012, month=7, day=28)}) + self.assertEqual(dates, {datetime.datetime(2012, 7, 28)}) def test_GetDate(self): """Tests the _GetDate function.""" @@ -41,7 +41,7 @@ def test_GetDate(self): date = test_helper._GetDate() - self.assertEqual(date, datetime(year=2013, month=7, day=22)) + self.assertEqual(date, datetime.datetime(2013, 7, 22)) def test_SetDate(self): """Tests the _SetDate function.""" @@ -51,7 +51,8 @@ def test_SetDate(self): test_helper._SetDate(2013, 7, 22) - self.assertEqual(test_helper._date, datetime(year=2013, month=7, day=22)) + self.assertEqual( + test_helper._date, datetime.datetime(2013, 7, 22)) def test_SetEstimatedDate(self): """Tests the _SetEstimatedDate function.""" @@ -75,11 +76,13 @@ def test_SetEstimatedDate(self): test_helper._SetEstimatedDate(parser_mediator) self.assertEqual( - test_helper._maximum_date, datetime(year=2012, month=7, day=28)) + test_helper._maximum_date, + datetime.datetime(year=2012, month=7, day=28)) self.assertEqual( - test_helper._minimum_date, datetime(year=2012, month=7, day=28)) + test_helper._minimum_date, + datetime.datetime(year=2012, month=7, day=28)) self.assertEqual( - test_helper._date, datetime(year=2012, month=7, day=28)) + test_helper._date, datetime.datetime(year=2012, month=7, day=28)) def test_GetDateLessLogHelper(self): """Tests the GetDateLessLogHelper function.""" From c27ff69bb9b81d1a010b60383765c5c241d3f5ba Mon Sep 17 00:00:00 2001 From: rick Date: Fri, 23 Feb 2024 08:27:29 -0500 Subject: [PATCH 3/6] fixed style errors, fixed _GetBaseDate return value when missing earliest_date and latest_date, added tests for dateless_helper --- plaso/containers/events.py | 6 +++--- plaso/engine/timeliner.py | 3 +-- tests/lib/dateless_helper.py | 6 ++++++ 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/plaso/containers/events.py b/plaso/containers/events.py index 300d6ed3c6..af17a910e5 100644 --- a/plaso/containers/events.py +++ b/plaso/containers/events.py @@ -95,9 +95,9 @@ class DateLessLogHelper(interface.AttributeContainer): CONTAINER_TYPE = 'date_less_log_helper' SCHEMA = { - '_event_data_stream_identifier': 'AttributeContainerIdentifier', - 'earliest_date': 'datetime.datetime', - 'latest_date': 'datetime.datetime'} + '_event_data_stream_identifier': 'AttributeContainerIdentifier', + 'earliest_date': 'datetime.datetime', + 'latest_date': 'datetime.datetime'} _SERIALIZABLE_PROTECTED_ATTRIBUTES = [ '_event_data_stream_identifier'] diff --git a/plaso/engine/timeliner.py b/plaso/engine/timeliner.py index 80cb0a59b7..93c3c09611 100644 --- a/plaso/engine/timeliner.py +++ b/plaso/engine/timeliner.py @@ -118,7 +118,7 @@ def _GetBaseDate(self, storage_writer, event_data): 'defaulting to current date: {0:d}').format(self._current_date) self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_date = self._current_year + base_date = self._current_date elif earliest_date < self._current_date: base_date = earliest_date @@ -237,7 +237,6 @@ def _GetCurrentDate(self): current_date = datetime.datetime(*current_time.timetuple()[:3]) return current_date - def _GetCurrentYear(self): """Retrieves current year. diff --git a/tests/lib/dateless_helper.py b/tests/lib/dateless_helper.py index 450b9968f7..f2048f7631 100644 --- a/tests/lib/dateless_helper.py +++ b/tests/lib/dateless_helper.py @@ -43,6 +43,12 @@ def test_GetDate(self): self.assertEqual(date, datetime.datetime(2013, 7, 22)) + with self.assertRaises(ValueError): + test_helper._SetDate(2013, 7, 32) + + with self.assertRaises(ValueError): + test_helper._SetDate(2013, 13, 1) + def test_SetDate(self): """Tests the _SetDate function.""" test_helper = dateless_helper.DateLessLogFormatHelper() From 7258f0b1f6b579d3fc1120580cf34e040c2141f8 Mon Sep 17 00:00:00 2001 From: rick-slin Date: Thu, 22 Jun 2023 11:09:21 -0400 Subject: [PATCH 4/6] implemented DateLessLogFormatHelper --- plaso/lib/dateless_helper.py | 5 +++-- tests/lib/dateless_helper.py | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/plaso/lib/dateless_helper.py b/plaso/lib/dateless_helper.py index 6491f9ce81..2677ae178c 100644 --- a/plaso/lib/dateless_helper.py +++ b/plaso/lib/dateless_helper.py @@ -2,6 +2,7 @@ """The date-less log format helper mix-in.""" import datetime + from dfvfs.lib import definitions as dfvfs_definitions from dfvfs.resolver import resolver as path_spec_resolver @@ -37,7 +38,7 @@ def _GetDatesFromFileEntry(self, file_entry): date_time = getattr(file_entry, attribute_name, None) if date_time: year, month, day = date_time.GetDate() - new_date = datetime.datetime(year, month, day) + new_date = datetime.datetime(year=year, month=month, day=day) dates.add(new_date) return dates @@ -67,7 +68,7 @@ def _SetDate(self, year, month, day): if month not in self._VALID_MONTHS: raise ValueError('Invalid month: {0!s}'.format(month)) - date = datetime.datetime(year, month, day) + date = datetime.datetime(year=year, month=month, day=day) self._date = date def _SetEstimatedDate(self, parser_mediator): diff --git a/tests/lib/dateless_helper.py b/tests/lib/dateless_helper.py index f2048f7631..bfb50b35c5 100644 --- a/tests/lib/dateless_helper.py +++ b/tests/lib/dateless_helper.py @@ -2,6 +2,7 @@ """Tests for the date-less log format helper mix-in.""" import datetime + import unittest from dfvfs.lib import definitions as dfvfs_definitions @@ -32,7 +33,7 @@ def test_GetDateFromFileEntry(self): dates = test_helper._GetDatesFromFileEntry(file_entry) - self.assertEqual(dates, {datetime.datetime(2012, 7, 28)}) + self.assertEqual(dates, {datetime.datetime(year=2012, month=7, day=28)}) def test_GetDate(self): """Tests the _GetDate function.""" @@ -41,7 +42,7 @@ def test_GetDate(self): date = test_helper._GetDate() - self.assertEqual(date, datetime.datetime(2013, 7, 22)) + self.assertEqual(date, datetime.datetime(year=2013, month=7, day=22)) with self.assertRaises(ValueError): test_helper._SetDate(2013, 7, 32) @@ -58,7 +59,7 @@ def test_SetDate(self): test_helper._SetDate(2013, 7, 22) self.assertEqual( - test_helper._date, datetime.datetime(2013, 7, 22)) + test_helper._date, datetime.datetime(year=2013, month=7, day=22)) def test_SetEstimatedDate(self): """Tests the _SetEstimatedDate function.""" From 43d6710aa4b4f550c559d706de56c44cf2f13b38 Mon Sep 17 00:00:00 2001 From: rick-slin Date: Thu, 22 Jun 2023 14:21:42 -0400 Subject: [PATCH 5/6] implemented DateLessLogHelper and integrated into EventDataTimeliner --- tests/lib/dateless_helper.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/lib/dateless_helper.py b/tests/lib/dateless_helper.py index bfb50b35c5..34d1fbb1da 100644 --- a/tests/lib/dateless_helper.py +++ b/tests/lib/dateless_helper.py @@ -2,7 +2,6 @@ """Tests for the date-less log format helper mix-in.""" import datetime - import unittest from dfvfs.lib import definitions as dfvfs_definitions From 928605ec65b4767aa61071c9f6575ba6ec048522 Mon Sep 17 00:00:00 2001 From: rick Date: Fri, 8 Mar 2024 20:41:03 -0500 Subject: [PATCH 6/6] fixed merge error --- plaso/engine/timeliner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/plaso/engine/timeliner.py b/plaso/engine/timeliner.py index 93c3c09611..40008419f5 100644 --- a/plaso/engine/timeliner.py +++ b/plaso/engine/timeliner.py @@ -282,7 +282,6 @@ def _GetEvent( else: base_year = self._GetBaseYear(storage_writer, event_data) - date_time = date_time.NewFromDeltaAndYear(base_year) try: date_time = date_time.NewFromDeltaAndYear(base_year)