Skip to content

Commit

Permalink
Worked on script to extract data streams #1 (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Feb 10, 2024
1 parent db8106a commit f08d3f6
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 53 deletions.
55 changes: 42 additions & 13 deletions dfimagetools/artifact_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,35 @@
class ArtifactDefinitionFiltersGenerator(object):
"""Generator of filters based on artifact definitions."""

def __init__(self, artifacts_registry, environment_variables, user_accounts):
# TODO: passing environment_variables and user_accounts via __init__ is
# deprecated.

def __init__(
self, artifacts_registry, environment_variables=None, user_accounts=None):
"""Initializes an artifact definition filters generator.
Args:
artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifact
definitions registry.
environment_variables (list[EnvironmentVariable]): environment variables.
user_accounts (list[UserAccount]): user accounts.
environment_variables (Optional[list[EnvironmentVariable]]): environment
variables.
user_accounts (Optional[list[UserAccount]]): user accounts.
"""
super(ArtifactDefinitionFiltersGenerator, self).__init__()
self._artifacts_registry = artifacts_registry
self._environment_variables = environment_variables
self._path_resolver = path_resolver.PathResolver()
self._user_accounts = user_accounts

def _BuildFindSpecsFromArtifactDefinition(self, name):
def _BuildFindSpecsFromArtifactDefinition(
self, name, environment_variables=None, user_accounts=None):
"""Builds find specifications from an artifact definition.
Args:
name (str): name of the artifact definition.
environment_variables (Optional[list[EnvironmentVariable]]): environment
variables.
user_accounts (Optional[list[UserAccount]]): user accounts.
Yields:
dfvfs.FindSpec: file system (dfVFS) find specification.
Expand Down Expand Up @@ -61,7 +70,8 @@ def _BuildFindSpecsFromArtifactDefinition(self, name):
if source_type == artifacts_definitions.TYPE_INDICATOR_ARTIFACT_GROUP:
for source_name in set(source.names):
for find_spec in self._BuildFindSpecsFromArtifactDefinition(
source_name):
source_name, environment_variables=environment_variables,
user_accounts=user_accounts):
yield find_spec

elif source_type in (
Expand All @@ -70,15 +80,22 @@ def _BuildFindSpecsFromArtifactDefinition(self, name):
artifacts_definitions.TYPE_INDICATOR_PATH):
for source_path in set(source.paths):
for find_spec in self._BuildFindSpecsFromFileSourcePath(
source_path, source.separator):
source_path, source.separator,
environment_variables=environment_variables,
user_accounts=user_accounts):
yield find_spec

def _BuildFindSpecsFromFileSourcePath(self, source_path, path_separator):
def _BuildFindSpecsFromFileSourcePath(
self, source_path, path_separator, environment_variables=None,
user_accounts=None):
"""Builds find specifications from a file source type.
Args:
source_path (str): file system path defined by the source.
path_separator (str): file system path segment separator.
environment_variables (Optional[list[EnvironmentVariable]]): environment
variables.
user_accounts (Optional[list[UserAccount]]): user accounts.
Yields:
dfvfs.FindSpec: file system (dfVFS) find specification.
Expand All @@ -87,11 +104,11 @@ def _BuildFindSpecsFromFileSourcePath(self, source_path, path_separator):
source_path, path_separator):

for path in self._path_resolver.ExpandUsersVariable(
path_glob, path_separator, self._user_accounts):
path_glob, path_separator, user_accounts):

if '%' in path:
path = self._path_resolver.ExpandEnvironmentVariables(
path, path_separator, self._environment_variables)
path, path_separator, environment_variables)

if not path.startswith(path_separator):
continue
Expand All @@ -108,15 +125,27 @@ def _BuildFindSpecsFromFileSourcePath(self, source_path, path_separator):

yield find_spec

def GetFindSpecs(self, names):
def GetFindSpecs(
self, names=None, environment_variables=None, user_accounts=None):
"""Retrieves find specifications for one or more artifact definitions.
Args:
names (list[str]): names of the artifact definitions to filter on.
names (Optional[list[str]]): names of the artifact definitions to filter
on.
environment_variables (Optional[list[EnvironmentVariable]]): environment
variables.
user_accounts (Optional[list[UserAccount]]): user accounts.
Yields:
dfvfs.FindSpec: file system (dfVFS) find specification.
"""
for name in set(names):
for find_spec in self._BuildFindSpecsFromArtifactDefinition(name):
if self._environment_variables:
environment_variables = self._environment_variables
if self._user_accounts:
user_accounts = self._user_accounts

for name in set(names or []):
for find_spec in self._BuildFindSpecsFromArtifactDefinition(
name, environment_variables=environment_variables,
user_accounts=user_accounts):
yield find_spec
55 changes: 55 additions & 0 deletions dfimagetools/path_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
"""Helper for filtering based on a path."""

import re

from dfvfs.helpers import file_system_searcher as dfvfs_file_system_searcher


class PathFiltersGenerator(object):
  """Generator of filters based on a path.

  The path is split on '/' into segments. An optional leading partition
  segment, such as "p1", is removed from the path segments and exposed via
  the partition property.
  """

  # Matches a partition segment: "p" followed by a number without a leading
  # zero, for example "p1" or "p12" but not "p0" or "p01".
  _PARTITION_REGEX = re.compile(r'^p[1-9][0-9]*$')

  def __init__(self, path):
    """Initializes a path filters generator.

    Args:
      path (str): path, optionally starting with a partition segment such as
          "/p1".

    Raises:
      ValueError: if the path is missing.
    """
    # TODO: add option to not look for partition and volume in path
    # TODO: determine file system path segment separator.

    super(PathFiltersGenerator, self).__init__()

    # Reject None or an empty string up front so that the documented
    # ValueError is raised instead of an AttributeError from path.split().
    if not path:
      raise ValueError('Missing path')

    self._partition = None
    self._path_segments = path.split('/')

    # A leading '/' results in an empty first segment, which is dropped.
    if self._path_segments and not self._path_segments[0]:
      self._path_segments.pop(0)

    # A leading partition segment is not part of the file system path.
    if self._path_segments and self._PARTITION_REGEX.match(
        self._path_segments[0]):
      self._partition = self._path_segments.pop(0)

    # Nothing is left when the path only consisted of a partition segment.
    if not self._path_segments:
      raise ValueError('Missing path')

  @property
  def partition(self):
    """Retrieves the partition.

    Returns:
      str: partition defined by the path filter or None if not available.
    """
    return self._partition

  def GetFindSpecs(self):
    """Retrieves find specifications.

    Yields:
      dfvfs.FindSpec: file system (dfVFS) find specification.
    """
    yield dfvfs_file_system_searcher.FindSpec(location=self._path_segments)
62 changes: 47 additions & 15 deletions dfimagetools/scripts/extract_data_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dfimagetools import artifact_filters
from dfimagetools import data_stream_writer
from dfimagetools import file_entry_lister
from dfimagetools import path_filters
from dfimagetools import windows_registry
from dfimagetools.helpers import command_line

Expand Down Expand Up @@ -47,7 +48,18 @@ def Main():
'Path to a directory or file containing custom artifact definition '
'.yaml files. '))

argument_parser.add_argument(
'--path', dest='path_filter', type=str, default=None, metavar='PATH',
action='store', help='Path of data stream to extract.')

# TODO: add output group
argument_parser.add_argument(
'--no_aliases', '--no-aliases', dest='use_aliases', action='store_false',
default=True, help=(
'Disable the use of partition and/or volume aliases such as '
'/apfs{f449e580-e355-4e74-8880-05e46e4e3b1e} and use indices '
'such as /apfs1 instead.'))

argument_parser.add_argument(
'-t', '--target', dest='target', action='store', metavar='PATH',
default=None, help=(
Expand Down Expand Up @@ -78,9 +90,8 @@ def Main():
print('')
return 1

# TODO: improve this, for now this script needs at least 1 filter.
if not options.artifact_filters:
print('[ERROR] no artifact filters were specified.')
elif not options.path_filter:
print('[ERROR] no extraction filters were specified.')
print('')
return 1

Expand Down Expand Up @@ -120,7 +131,21 @@ def Main():
elif os.path.isfile(options.custom_artifact_definitions):
registry.ReadFromFile(reader, options.custom_artifact_definitions)

entry_lister = file_entry_lister.FileEntryLister(mediator=mediator)
filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator(
registry)

elif options.path_filter:
filter_generator = path_filters.PathFiltersGenerator(options.path_filter)

if filter_generator.partition and options.partitions:
print(('[WARNING] partition specified in path filter will override '
'--partitions command line argument.'))
print('')

volume_scanner_options.partitions = [filter_generator.partition]

entry_lister = file_entry_lister.FileEntryLister(
mediator=mediator, use_aliases=options.use_aliases)
find_specs_generated = False

try:
Expand All @@ -132,24 +157,31 @@ def Main():
return 1

for base_path_spec in base_path_specs:
if not options.artifact_filters:
find_specs = []
else:
find_specs = []

if options.artifact_filters:
environment_variables = []
user_accounts = []

windows_directory = entry_lister.GetWindowsDirectory(base_path_spec)
if not windows_directory:
environment_variables = []
else:
if windows_directory:
winregistry_collector = windows_registry.WindowsRegistryCollector(
base_path_spec, windows_directory)

environment_variables = (
winregistry_collector.CollectSystemEnvironmentVariables())

filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator(
registry, environment_variables, [])
# TODO: determine user accounts.

names = options.artifact_filters.split(',')
find_specs = list(filter_generator.GetFindSpecs(names))
find_specs = list(filter_generator.GetFindSpecs(
names=names, environment_variables=environment_variables,
user_accounts=user_accounts))

elif options.path_filter:
find_specs = list(filter_generator.GetFindSpecs())

if filter_generator:
if not find_specs:
continue

Expand Down Expand Up @@ -184,8 +216,8 @@ def Main():
return 1

if options.artifact_filters and not find_specs_generated:
print('[ERROR] an artifact filter was specified but no corresponding '
'file system find specifications were generated.')
print('[ERROR] an extraction filter was specified but no corresponding '
'find specifications were generated.')
print('')
return 1

Expand Down

0 comments on commit f08d3f6

Please sign in to comment.