Skip to content

Commit

Permalink
Changes to entry points (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Jan 28, 2024
1 parent 9d661b6 commit 9398fcb
Show file tree
Hide file tree
Showing 8 changed files with 714 additions and 590 deletions.
585 changes: 0 additions & 585 deletions dfimagetools/entry_points.py

This file was deleted.

53 changes: 53 additions & 0 deletions dfimagetools/scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
"""Helper functions for CLI tools."""

from dfvfs.lib import definitions as dfvfs_definitions

from dfimagetools import decorators


# This function is deprecated; use the one in backend.py instead.
@decorators.deprecated
def SetDFVFSBackEnd(back_end):
  """Sets the preferred dfVFS back-end.

  Overrides the dfVFS preferred back-end type indicators so that subsequent
  path specification resolution uses the requested implementation.

  Args:
    back_end (str): dfVFS back-end, such as "APM", "EXT", "FAT", "GPT",
        "HFS", "NTFS" or "TSK". "TSK" switches all supported file system
        back-ends to their SleuthKit implementations. Unrecognized values
        are ignored.
  """
  if back_end == 'APM':
    dfvfs_definitions.PREFERRED_APM_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_APM)

  elif back_end == 'EXT':
    dfvfs_definitions.PREFERRED_EXT_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_EXT)

  elif back_end == 'FAT':
    dfvfs_definitions.PREFERRED_FAT_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_FAT)

  elif back_end == 'GPT':
    dfvfs_definitions.PREFERRED_GPT_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_GPT)

  elif back_end == 'HFS':
    dfvfs_definitions.PREFERRED_HFS_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_HFS)

  elif back_end == 'NTFS':
    dfvfs_definitions.PREFERRED_NTFS_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_NTFS)

  elif back_end == 'TSK':
    dfvfs_definitions.PREFERRED_APM_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_TSK)
    # Fix: the original assigned PREFERRED_APM_BACK_END twice and never set
    # PREFERRED_EXT_BACK_END, leaving EXT on its non-TSK default.
    dfvfs_definitions.PREFERRED_EXT_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_TSK)
    dfvfs_definitions.PREFERRED_FAT_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_TSK)
    dfvfs_definitions.PREFERRED_GPT_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION)
    dfvfs_definitions.PREFERRED_HFS_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_TSK)
    dfvfs_definitions.PREFERRED_NTFS_BACK_END = (
        dfvfs_definitions.TYPE_INDICATOR_TSK)
196 changes: 196 additions & 0 deletions dfimagetools/scripts/extract_data_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Console script to extract data streams."""

import argparse
import logging
import os
import sys

from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry

from dfvfs.lib import errors as dfvfs_errors

from dfimagetools import artifact_filters
from dfimagetools import data_stream_writer
from dfimagetools import file_entry_lister
from dfimagetools import windows_registry
from dfimagetools.helpers import command_line


def Main():
  """Entry point of console script to extract data streams.

  Parses the command line arguments, determines the artifact filters to
  apply, scans the storage media image for supported file systems and
  extracts the matching data streams to the target directory.

  Returns:
    int: exit code that is provided to sys.exit(), where 0 indicates
        success and 1 indicates an error.
  """
  argument_parser = argparse.ArgumentParser(description=(
      'Extracts data streams from a storage media image.'))

  # TODO: add filter group
  argument_parser.add_argument(
      '--artifact_definitions', '--artifact-definitions',
      dest='artifact_definitions', type=str, metavar='PATH', action='store',
      help=('Path to a directory or file containing the artifact definition '
            '.yaml files.'))

  argument_parser.add_argument(
      '--artifact_filters', '--artifact-filters', dest='artifact_filters',
      type=str, default=None, metavar='NAMES', action='store', help=(
          'Comma separated list of names of artifact definitions to extract.'))

  argument_parser.add_argument(
      '--custom_artifact_definitions', '--custom-artifact-definitions',
      dest='custom_artifact_definitions', type=str, metavar='PATH',
      action='store', help=(
          'Path to a directory or file containing custom artifact definition '
          '.yaml files. '))

  # TODO: add output group
  argument_parser.add_argument(
      '-t', '--target', dest='target', action='store', metavar='PATH',
      default=None, help=(
          'target (or destination) path of a directory where the extracted '
          'data streams should be stored.'))

  # TODO: add source group
  command_line.AddStorageMediaImageCLIArguments(argument_parser)

  argument_parser.add_argument(
      'source', nargs='?', action='store', metavar='image.raw',
      default=None, help='path of the storage media image.')

  options = argument_parser.parse_args()

  if not options.source:
    print('Source value is missing.')
    print('')
    argument_parser.print_help()
    print('')
    return 1

  # Artifact filters can only be resolved against artifact definitions, so
  # at least one definitions path (standard or custom) must be provided.
  if options.artifact_filters:
    if (not options.artifact_definitions and
        not options.custom_artifact_definitions):
      print('[ERROR] artifact filters were specified but no paths to '
            'artifact definitions were provided.')
      print('')
      return 1

  # TODO: improve this, for now this script needs at least 1 filter.
  if not options.artifact_filters:
    print('[ERROR] no artifact filters were specified.')
    print('')
    return 1

  # Default the target to a "<source name>.extracted" directory in the
  # current working directory when none was specified.
  target_path = options.target
  if not target_path:
    source_name = os.path.basename(options.source)
    target_path = os.path.join(os.getcwd(), f'{source_name:s}.extracted')

  if not os.path.exists(target_path):
    os.makedirs(target_path)

  elif not os.path.isdir(target_path):
    print('[ERROR] target path is not a directory.')
    print('')
    return 1

  logging.basicConfig(
      level=logging.INFO, format='[%(levelname)s] %(message)s')

  mediator, volume_scanner_options = (
      command_line.ParseStorageMediaImageCLIArguments(options))

  # Read the artifact definitions into a registry so that the filter names
  # can later be resolved to file system find specifications.
  if options.artifact_filters:
    registry = artifacts_registry.ArtifactDefinitionsRegistry()
    reader = artifacts_reader.YamlArtifactsReader()

    if options.artifact_definitions:
      if os.path.isdir(options.artifact_definitions):
        registry.ReadFromDirectory(reader, options.artifact_definitions)
      elif os.path.isfile(options.artifact_definitions):
        registry.ReadFromFile(reader, options.artifact_definitions)

    if options.custom_artifact_definitions:
      if os.path.isdir(options.custom_artifact_definitions):
        registry.ReadFromDirectory(
            reader, options.custom_artifact_definitions)
      elif os.path.isfile(options.custom_artifact_definitions):
        registry.ReadFromFile(reader, options.custom_artifact_definitions)

  entry_lister = file_entry_lister.FileEntryLister(mediator=mediator)
  # Tracks whether any filter produced find specifications, to report an
  # error after the loop if none did.
  find_specs_generated = False

  try:
    base_path_specs = entry_lister.GetBasePathSpecs(
        options.source, options=volume_scanner_options)
    if not base_path_specs:
      print('No supported file system found in source.')
      print('')
      return 1

    for base_path_spec in base_path_specs:
      # NOTE(review): this branch appears unreachable since missing artifact
      # filters cause an early return above — confirm before relying on it.
      if not options.artifact_filters:
        find_specs = []
      else:
        # Artifact definitions can contain Windows environment variables;
        # collect their values from the source image's Windows Registry,
        # if a Windows directory is present on this file system.
        windows_directory = entry_lister.GetWindowsDirectory(base_path_spec)
        if not windows_directory:
          environment_variables = []
        else:
          winregistry_collector = windows_registry.WindowsRegistryCollector(
              base_path_spec, windows_directory)

          environment_variables = (
              winregistry_collector.CollectSystemEnvironmentVariables())

        filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator(
            registry, environment_variables, [])

        names = options.artifact_filters.split(',')
        find_specs = list(filter_generator.GetFindSpecs(names))
        if not find_specs:
          continue

        find_specs_generated = True

      file_entries_generator = entry_lister.ListFileEntriesWithFindSpecs(
          [base_path_spec], find_specs)

      # Write every data stream of each matching file entry under the
      # target directory, using a sanitized version of its source path.
      stream_writer = data_stream_writer.DataStreamWriter()
      for file_entry, path_segments in file_entries_generator:
        for data_stream in file_entry.data_streams:
          display_path = stream_writer.GetDisplayPath(
              path_segments, data_stream.name)
          destination_path = stream_writer.GetSanitizedPath(
              path_segments, data_stream.name, target_path)
          logging.info(f'Extracting: {display_path:s} to: {destination_path:s}')

          destination_directory = os.path.dirname(destination_path)
          os.makedirs(destination_directory, exist_ok=True)

          stream_writer.WriteDataStream(
              file_entry, data_stream.name, destination_path)

  except dfvfs_errors.ScannerError as exception:
    print(f'[ERROR] {exception!s}', file=sys.stderr)
    print('')
    return 1

  except KeyboardInterrupt:
    print('Aborted by user.', file=sys.stderr)
    print('')
    return 1

  if options.artifact_filters and not find_specs_generated:
    print('[ERROR] an artifact filter was specified but no corresponding '
          'file system find specifications were generated.')
    print('')
    return 1

  return 0


# Script entry point: propagate Main()'s exit code to the shell.
if __name__ == '__main__':
  sys.exit(Main())

0 comments on commit 9398fcb

Please sign in to comment.