Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIBFCREPO-1259. Added a --publish flag to the import CLI command. #269

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions plastron-cli/src/plastron/cli/commands/importcommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TextIO

from plastron.cli.commands import BaseCommand
from plastron.jobs.imports import ImportJob
from plastron.jobs.imports import ImportConfig, ImportJobs
from plastron.models import get_model_class, ModelClassNotFoundError
from plastron.rdf import uri_or_curie
from plastron.utils import datetimestamp
Expand Down Expand Up @@ -126,6 +126,11 @@ def configure_cli(subparsers):
metavar='MIME_TYPES',
action='store'
)
parser.add_argument(
'--publish',
help='automatically publish all items in this import',
action='store_true',
)
parser.add_argument(
'import_file', nargs='?',
help='name of the file to import from',
Expand Down Expand Up @@ -164,27 +169,30 @@ def __call__(self, args: Namespace):
# TODO: generate a more unique id? add in user and hostname?
args.job_id = f"import-{datetimestamp()}"

job = ImportJob(args.job_id, self.jobs_dir)
jobs = ImportJobs(self.jobs_dir)
if args.resume:
self.run(job.resume(
repo=self.context.repo,
limit=args.limit,
percentage=args.percentage,
validate_only=args.validate_only,
))
logger.info(f'Resuming saved job {args.job_id}')
job = jobs.get_job(args.job_id)
else:
self.run(job.start(
repo=self.context.repo,
import_file=args.import_file,
logger.info(f'Creating new job {args.job_id}')
job = jobs.create_job(config=ImportConfig(
job_id=args.job_id,
model=args.model,
access=args.access,
member_of=args.member_of,
container=args.container,
binaries_location=args.binaries_location,
limit=args.limit,
percentage=args.percentage,
validate_only=args.validate_only,
))

logger.debug(f'Running job {job.id}')
self.run(job.run(
repo=self.context.repo,
import_file=args.import_file,
limit=args.limit,
percentage=args.percentage,
validate_only=args.validate_only,
publish=args.publish,
))

for key, value in self.result['count'].items():
logger.info(f"{key.title().replace('_', ' ')}: {value}")
6 changes: 3 additions & 3 deletions plastron-cli/tests/commands/test_importcommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from plastron.cli.commands.importcommand import Command
from plastron.client import Client, Endpoint
from plastron.jobs import JobConfigError
from plastron.jobs import JobConfigError, JobNotFoundError


@pytest.fixture
Expand Down Expand Up @@ -38,11 +38,11 @@ def test_cannot_resume_without_job_directory(plastron_context):
plastron_context.args = args
command = Command(context=plastron_context)

with pytest.raises(RuntimeError) as excinfo:
with pytest.raises(JobNotFoundError) as excinfo:
for _ in command(args):
pass

assert "no such job directory" in str(excinfo.value)
assert "does not exist" in str(excinfo.value)


def test_cannot_resume_without_config_file(plastron_context):
Expand Down
4 changes: 4 additions & 0 deletions plastron-repo/src/plastron/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class JobConfigError(JobError):
pass


class JobNotFoundError(JobError):
    """Raised when the directory for a requested job id does not exist."""
    pass


def annotate_from_files(item, mime_types):
for member in item.has_member.objects:
# extract text from HTML files
Expand Down
102 changes: 53 additions & 49 deletions plastron-repo/src/plastron/jobs/imports/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dataclasses
import logging
import os
import re
Expand All @@ -17,7 +16,7 @@

from plastron.client import ClientError
from plastron.files import BinarySource, ZipFileSource, RemoteFileSource, HTTPFileSource, LocalFileSource
from plastron.jobs import JobConfigError, JobError, annotate_from_files
from plastron.jobs import JobConfigError, JobError, annotate_from_files, JobNotFoundError
from plastron.jobs.imports.spreadsheet import LineReference, MetadataSpreadsheet, InvalidRow, Row
from plastron.models import get_model_class, ModelClassNotFoundError
from plastron.namespaces import umdaccess
Expand Down Expand Up @@ -76,12 +75,10 @@ def save(self, filename: Union[str, Path]):


class ImportJob:
def __init__(self, job_id, jobs_dir, ssh_private_key: str = None):
def __init__(self, job_id: str, job_dir: Path, ssh_private_key: str = None):
self.id = job_id
# URL-escaped ID that can be used as a path segment on a filesystem or URL
self.safe_id = urllib.parse.quote(job_id, safe='')
self.dir = Path(jobs_dir) / self.safe_id
self.config = ImportConfig(job_id=job_id)
self.dir = job_dir
self.config = None
self._model_class = None

# record of items that are successfully loaded
Expand Down Expand Up @@ -112,9 +109,9 @@ def model_class(self):
self._model_class = get_model_class(self.config.model)
return self._model_class

def load_config(self) -> ImportConfig:
def load_config(self) -> 'ImportJob':
    """Re-read this job's configuration from its stored config file.

    Returns self so the call can be chained after construction
    (e.g. ``ImportJob(...).load_config()``).
    """
    stored_config = ImportConfig.from_file(self.config_filename)
    self.config = stored_config
    return self

def store_metadata_file(self, input_file: IO):
with open(self.metadata_filename, mode='w') as file:
Expand Down Expand Up @@ -154,47 +151,24 @@ def latest_run(self) -> Optional['ImportRun']:
def get_metadata(self) -> MetadataSpreadsheet:
    """Build a MetadataSpreadsheet over this job's stored metadata file."""
    return MetadataSpreadsheet(
        metadata_filename=self.metadata_filename,
        model_class=self.model_class,
    )

def start(
def run(
self,
repo: Repository,
model: str,
access: URIRef = None,
member_of: URIRef = None,
container: str = None,
binaries_location: str = None,
extract_text_types: str = None,
**kwargs,
limit: int = None,
percentage: int = None,
validate_only: bool = False,
import_file: IO = None,
publish: bool = False,
) -> Generator[Dict[str, Any], None, Dict[str, Any]]:
config = {
'model': model,
'access': access,
'member_of': member_of,
'container': container,
'binaries_location': binaries_location,
'extract_text_types': extract_text_types,
}
# store the relevant config
self.config = dataclasses.replace(self.config, **config)
# if we are not resuming, make sure the directory exists
os.makedirs(self.dir, exist_ok=True)
self.config.save(self.config_filename)

run = self.new_run()
return run(repo=repo, **kwargs)

def resume(self, repo: Repository, **kwargs) -> Generator[Dict[str, Any], None, Dict[str, Any]]:
if not self.exists:
raise RuntimeError(f'Cannot resume job "{self.id}": no such job directory: "{self.dir}"')

logger.info(f'Resuming saved job {self.id}')
try:
# load stored config from the previous run of this job
self.load_config()
except FileNotFoundError:
raise RuntimeError(f'Cannot resume job {self.id}: no config.yml found in {self.dir}')

run = self.new_run()
return run(repo=repo, **kwargs)
return run(
repo=repo,
limit=limit,
percentage=percentage,
validate_only=validate_only,
import_file=import_file,
publish=publish,
)

@property
def access(self) -> Optional[URIRef]:
Expand Down Expand Up @@ -280,11 +254,13 @@ def get_file(self, base_location: str, filename: str) -> File:


class ImportRow:
def __init__(self, job: ImportJob, repo: Repository, row: Row, validate_only: bool = False):
def __init__(self, job: ImportJob, repo: Repository, row: Row, validate_only: bool = False, publish: bool = False):
    """Wrap one spreadsheet row for import into the repository.

    :param validate_only: when True, skip reading the existing object back
        from the repository
    :param publish: when True, tag the item's RDF types with umdaccess:Published
    """
    self.job = job
    self.repo = repo
    self.row = row
    # reading back from the repo is unnecessary when only validating
    self.item = row.get_object(repo, read_from_repo=not validate_only)
    if publish:
        self.item.rdf_type.add(umdaccess.Published)

def __str__(self):
return str(self.row.line_reference)
Expand Down Expand Up @@ -448,9 +424,10 @@ def run(
self,
repo: Repository,
limit: int = None,
percentage=None,
percentage: int = None,
validate_only: bool = False,
import_file: IO = None,
publish: bool = False,
) -> Generator[Dict[str, Any], None, Dict[str, Any]]:
"""Execute this import run. Returns a generator that yields a dictionary of
current status after each item. The generator also returns a final status
Expand Down Expand Up @@ -482,6 +459,8 @@ def generator(repo):
logger.info(f'Loading {percentage}% of the total items')
if validate_only:
logger.info('Validation-only mode, skipping imports')
if publish:
logger.info('Publishing all imported items')

# if an import file was provided, save that as the new CSV metadata file
if import_file is not None:
Expand Down Expand Up @@ -518,7 +497,7 @@ def generator(repo):
continue

logger.debug(f'Row data: {row.data}')
import_row = ImportRow(self.job, repo, row, validate_only)
import_row = ImportRow(self.job, repo, row, validate_only, publish)

# count the number of files referenced in this row
count['files'] += len(row.filenames)
Expand Down Expand Up @@ -641,3 +620,28 @@ def complete(self, item, line_reference, status):
:return:
"""
self.job.complete(item, line_reference, status)


class ImportJobs:
    """Registry of import jobs, each stored in a subdirectory of a base directory."""

    def __init__(self, directory: Union[Path, str]):
        self.dir = Path(directory)

    def create_job(self, job_id: str = None, config: ImportConfig = None) -> ImportJob:
        """Create the directory and config file for a new import job.

        Either a job_id or a full ImportConfig must be given; when only a
        job_id is given, a default ImportConfig is built from it.

        :raises RuntimeError: if neither job_id nor config is given, or if
            the job directory already exists
        """
        if config is None:
            if job_id is None:
                raise RuntimeError('Must specify either a job_id or config')
            config = ImportConfig(job_id=job_id)
        # URL-escape the job id so it is usable as a single filesystem path segment
        safe_id = urllib.parse.quote(config.job_id, safe='')
        job_dir = self.dir / safe_id
        try:
            # exist_ok=False makes check-and-create atomic; the previous
            # exists()-then-mkdir(exist_ok=True) sequence was open to a
            # TOCTOU race between the check and the creation
            job_dir.mkdir(parents=True, exist_ok=False)
        except FileExistsError:
            raise RuntimeError(f'Job directory {job_dir} for job id {config.job_id} already exists') from None
        config.save(job_dir / 'config.yml')
        # load_config() re-reads the saved config and returns the job itself
        return ImportJob(job_id=config.job_id, job_dir=job_dir).load_config()

    def get_job(self, job_id: str) -> ImportJob:
        """Return the existing ImportJob with the given id.

        :raises JobNotFoundError: if the job directory does not exist
        """
        safe_id = urllib.parse.quote(job_id, safe='')
        job_dir = self.dir / safe_id
        if not job_dir.exists():
            raise JobNotFoundError(f'Job directory {job_dir} for job id {job_id} does not exist')
        return ImportJob(job_id=job_id, job_dir=job_dir).load_config()
90 changes: 65 additions & 25 deletions plastron-repo/tests/jobs/test_import_job.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,58 @@
from contextlib import nullcontext
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from plastron.cli.commands.publish import get_publication_status
from plastron.jobs.imports import ImportJob
from plastron.jobs.imports import ImportJobs, ImportConfig
from plastron.repo import Repository, RepositoryResource


def test_safe_job_id():
job = ImportJob('foo', '/tmp')
assert job.id == 'foo'
assert job.safe_id == 'foo'
@pytest.fixture
def jobs_dir(datadir) -> Path:
    # use the per-test data directory as the base directory for import jobs
    return datadir


@pytest.fixture
def jobs(jobs_dir) -> ImportJobs:
    # an ImportJobs registry rooted at the test jobs directory
    return ImportJobs(jobs_dir)

def test_job_id_with_slashes():
job = ImportJob('foo/bar', '/tmp')
assert job.id == 'foo/bar'
assert job.safe_id == 'foo%2Fbar'

@pytest.mark.parametrize(
    ('job_id', 'safe_id'),
    [
        # basic
        ('foo', 'foo'),
        # with slashes
        ('foo/bar', 'foo%2Fbar'),
        # URI as job ID
        ('http://localhost:3000/import-jobs/17', 'http%3A%2F%2Flocalhost%3A3000%2Fimport-jobs%2F17'),
    ]
)
def test_safe_job_id(jobs, job_id, safe_id):
    # the job id is URL-escaped to a single path segment for the directory name,
    # while the job keeps its original (unescaped) id
    job = jobs.create_job(job_id=job_id)
    assert job.id == job_id
    assert job.dir == jobs.dir / safe_id


def test_uri_as_job_id():
job = ImportJob('http://localhost:3000/import-jobs/17', '/tmp')
assert job.id == 'http://localhost:3000/import-jobs/17'
assert job.safe_id == 'http%3A%2F%2Flocalhost%3A3000%2Fimport-jobs%2F17'
class MockContainer:
    """Stand-in for a repository container that records the last child created in it."""

    obj = None
    _resource_class = None
    path = '/foo'

    def create_child(self, resource_class, description):
        # remember what was requested so tests can inspect the created object
        self._resource_class = resource_class
        self.obj = description
        return MagicMock(spec=RepositoryResource, url='/foo/bar')

def test_import_job_create_resource(datadir):
class MockContainer:
obj = None
_resource_class = None
path = '/foo'

def create_child(self, resource_class, description):
self.obj = description
self._resource_class = resource_class
return MagicMock(spec=RepositoryResource, url='/foo/bar')
@pytest.fixture
def import_file(datadir):
    # path to the sample CSV metadata file used by the import tests
    return datadir / 'item.csv'


def test_import_job_create_resource(import_file, jobs):
mock_container = MockContainer()
mock_repo = MagicMock(spec=Repository)
mock_repo.transaction.return_value = nullcontext()
Expand All @@ -51,8 +69,30 @@ def create_child(self, resource_class, description):
'UnpublishedHidden',
'Published',
]
import_job = ImportJob('123', datadir)
import_file = datadir / 'item.csv'
for i, stats in enumerate(import_job.start(repo=mock_repo, model='Item', import_file=import_file.open())):
import_job = jobs.create_job(config=ImportConfig(job_id='123', model='Item'))
for i, stats in enumerate(import_job.run(repo=mock_repo, import_file=import_file.open())):
assert mock_container.obj is not None
assert get_publication_status(mock_container.obj) == expected_publication_statuses[i]


def test_import_job_create_resource_publish_all(import_file, jobs):
    """With publish=True, every imported item ends up in a Published status."""
    mock_container = MockContainer()
    mock_repo = MagicMock(spec=Repository)
    mock_repo.transaction.return_value = nullcontext()
    mock_repo.__getitem__.return_value = mock_container

    # one expected publication status per row of the item.csv fixture
    expected_publication_statuses = [
        'Published',
        'Published',
        'PublishedHidden',
        'PublishedHidden',
        'Published',
        'Published',
        'Published',
        'PublishedHidden',
        'Published',
    ]
    import_job = jobs.create_job(config=ImportConfig(job_id='123', model='Item'))
    run_results = enumerate(import_job.run(repo=mock_repo, publish=True, import_file=import_file.open()))
    for i, _stats in run_results:
        assert mock_container.obj is not None
        assert get_publication_status(mock_container.obj) == expected_publication_statuses[i]
Loading