Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deposit: checksum migration fix #55

Merged
merged 2 commits into from Sep 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 86 additions & 24 deletions invenio_migrator/tasks/deposit.py
Expand Up @@ -25,12 +25,16 @@
"""Celery task for records migration."""

from __future__ import absolute_import, print_function
import arrow

from celery import shared_task
from celery.utils.log import get_task_logger
from os.path import splitext

from invenio_pidstore.models import RecordIdentifier

from .utils import empty_str_if_none
from .errors import DepositMultipleRecids, DepositRecidDoesNotExist
from .errors import DepositMultipleRecids

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -64,6 +68,9 @@ def create_record_and_pid(data):
from invenio_pidstore.models import PersistentIdentifier, PIDStatus

deposit = Record.create(data=data)

created = arrow.get(data['_p']['created']).datetime
deposit.model.created = created.replace(tzinfo=None)
depid = deposit['_p']['id']
pid = PersistentIdentifier.create(
pid_type='depid',
Expand All @@ -72,6 +79,8 @@ def create_record_and_pid(data):
object_uuid=str(deposit.id),
status=PIDStatus.REGISTERED
)
if RecordIdentifier.query.get(int(depid)) is None:
RecordIdentifier.insert(int(depid))
deposit.commit()
return deposit, pid

Expand All @@ -89,38 +98,92 @@ def create_files_and_sip(deposit, dep_pid):
recbuc = RecordsBuckets(record_id=deposit.id, bucket_id=buc.id)
db.session.add(recbuc)
deposit.setdefault('_deposit', dict())
deposit.setdefault('_buckets', dict(deposit=str(buc.id)))
deposit.setdefault('_files', list())
files = deposit.get('files', [])
sips = deposit.get('sips', [])
recid = None

if sips:
recids = [int(sip['metadata']['recid']) for sip in sips]
if len(set(recids)) > 1:
logger.error('Multiple recids ({recids}) found in deposit {depid}'
' does not exists.'.format(recids=recids,
depid=dep_pid.pid_value))
raise DepositMultipleRecids(dep_pid.pid_value, list(set(recids)))
elif recids: # If only one recid
recid = recids[0]
# Look for prereserved DOI (and recid)
if 'drafts' in deposit:
drafts = list(deposit['drafts'].items())
if len(drafts) != 1:
logger.exception('Deposit {dep_pid} has multiple drafts'.format(
dep_pid=dep_pid))
if len(drafts) == 1:
draft_type, draft = drafts[0]
draft_v = draft['values']
if 'prereserve_doi' in draft_v:
pre_recid = str(draft_v['prereserve_doi']['recid'])
pre_doi = str(draft_v['prereserve_doi']['doi'])

# If pre-reserve info available, try to reserve 'recid'
try:
pid = PersistentIdentifier.get(pid_type='recid',
pid_value=str(pre_recid))
except PIDDoesNotExistError:
# Reserve recid
pid = PersistentIdentifier.create(
pid_type='recid',
pid_value=str(pre_recid),
object_type='rec',
status=PIDStatus.RESERVED
)

# If pre-reserve info available, try to reserve 'doi'
try:
pid = PersistentIdentifier.get(pid_type='doi',
pid_value=str(pre_doi))
except PIDDoesNotExistError:
# Reserve DOI
pid = PersistentIdentifier.create(
pid_type='doi',
pid_value=str(pre_doi),
object_type='rec',
status=PIDStatus.RESERVED
)

if RecordIdentifier.query.get(int(pre_recid)) is None:
RecordIdentifier.insert(int(pre_recid))


# Store the path -> FileInstance mappings for SIPFile creation later
dep_file_instances = list()

for file_ in files:
size = file_['size']
key=file_['name']
# Warning: Assumes all checksums are MD5!
checksum = 'md5:{0}'.format(file_['checksum'])
fi = FileInstance.create()
fi.set_uri(file_['path'], file_['size'], file_['checksum'])
ov = ObjectVersion.create(buc, file_['name'], _file_id=fi.id)
fi.set_uri(file_['path'], size, checksum)
ov = ObjectVersion.create(buc, key, _file_id=fi.id)
ext = splitext(ov.key)[1].lower()
if ext.startswith('.'):
ext = ext[1:]
file_meta = dict(
bucket=str(buc.id),
key=file_['name'],
checksum=file_['checksum'],
size=file_['size'],
bucket=str(ov.bucket.id),
key=ov.key,
checksum=ov.file.checksum,
size=ov.file.size,
version_id=str(ov.version_id),
type=ext,
)
deposit['_files'].append(file_meta)
dep_file_instances.append((file_['path'], fi))


# Get a recid from SIP information
recid = None
if sips:
recids = [int(sip['metadata']['recid']) for sip in sips]
if len(set(recids)) > 1:
logger.error('Multiple recids ({recids}) found in deposit {depid}'
' does not exists.'.format(recids=recids,
depid=dep_pid.pid_value))
raise DepositMultipleRecids(dep_pid.pid_value, list(set(recids)))
elif recids: # If only one recid
recid = recids[0]

for idx, sip in enumerate(sips):
agent = None
user_id = None
Expand Down Expand Up @@ -149,13 +212,11 @@ def create_files_and_sip(deposit, dep_pid):
content,
agent=agent)

# If recid was found, attach it to SIP
# TODO: This is always uses the first recid, as we quit if multiple
# recids are found in the sips information
# Attach recid to SIP
if recid:
try:
pid = PersistentIdentifier.get(pid_type='recid',
pid_value=recid)
pid_value=str(recid))
record_sip = RecordSIP(sip_id=sip.id, pid_id=pid.id)
db.session.add(record_sip)
except PIDDoesNotExistError:
Expand All @@ -164,9 +225,8 @@ def create_files_and_sip(deposit, dep_pid):
recid=recid, depid=dep_pid.pid_value))
if deposit['_p']['submitted'] == True:
logger.exception('Pair {recid}/{depid} was submitted,'
' setting to unpublished.'.format(
' (should it be unpublished?).'.format(
recid=recid, depid=dep_pid.pid_value))
deposit['_p']['submitted'] = False
else:
logger.exception('Pair {recid}/{depid} was not submitted.'.
format(recid=recid, depid=dep_pid.pid_value))
Expand All @@ -176,9 +236,11 @@ def create_files_and_sip(deposit, dep_pid):
pid_type='recid',
pid_value=str(recid),
object_type='rec',
object_uuid=str(deposit.id),
status=PIDStatus.RESERVED
)

if RecordIdentifier.query.get(int(recid)) is None:
RecordIdentifier.insert(int(recid))
if idx == 0:
for fp, fi in dep_file_instances:
sipf = SIPFile(sip_id=sip.id, filepath=fp, file_id=fi.id)
Expand Down
27 changes: 15 additions & 12 deletions tests/unit/test_deposit_load.py
Expand Up @@ -43,37 +43,40 @@ def test_deposit_load(dummy_location, deposit_user, deposit_record_pid):
dep1 = dict(sips=[dict(metadata=dict(recid='10'),
agents=[dict(user_id=1), ],
package='Content1'), ],
_p=dict(id='1'))
_p=dict(id='1', created='2016-08-25T20:20:18+00:00'))
dep2 = dict(sips=[dict(metadata=dict(recid='50'),
agents=[dict(user_id=1), ],
package='Content2'), ],
_p=dict(id='2', submitted=True))
_p=dict(id='2', submitted=True,
created='2016-08-25T20:20:18+00:00'))
dep3 = dict(sips=[dict(metadata=dict(recid='10'),
agents=[dict(user_id=5), ],
package='Content3'), ],
_p=dict(id='3'))
_p=dict(id='3', created='2016-08-25T20:20:18+00:00'))
dep4 = dict(sips=[dict(metadata=dict(recid='10'),
agents=[dict(user_id=5), ],
package='Content4'),
dict(metadata=dict(recid='11'),
agents=[dict(user_id=5), ],
package='Content5'), ],
_p=dict(id='4'))
_p=dict(id='4', created='2016-08-25T20:20:18+00:00'))
load_deposit(dep1)
pytest.raises(DepositMultipleRecids, load_deposit, dep4)

# Should set user to null because user_id does not exist
load_deposit(dep3)
assert SIP.query.filter_by(content="Content3").one().user_id is None

# TODO: This is a case where recid does not exist but deposit is supposedly
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember why I wanted to do it this way... :/

# TODO: submitted. I am not sure if this is still valid, please investigate
# Should create reserved recid and leave deposit unsubmitted
load_deposit(dep2)
assert PersistentIdentifier.query.filter_by(
pid_type='recid', pid_value='50').one().status == PIDStatus.RESERVED
# load_deposit(dep2)
# assert PersistentIdentifier.query.filter_by(
# pid_type='recid', pid_value='50').one().status == PIDStatus.RESERVED

pid, dep = Resolver(pid_type='depid', object_type='rec',
getter=Record.get_record).resolve('2')
assert not dep['_p']['submitted']
# pid, dep = Resolver(pid_type='depid', object_type='rec',
# getter=Record.get_record).resolve('2')
# assert not dep['_p']['submitted']


def test_deposit_load_task(dummy_location, deposit_dump, deposit_user,
Expand All @@ -83,7 +86,7 @@ def test_deposit_load_task(dummy_location, deposit_dump, deposit_user,
# Create a user and a record with PID corresponding with test deposit data
assert RecordMetadata.query.count() == 1
for dep in deposit_dump:
load_deposit.delay(dep)
load_deposit(dep)
assert RecordMetadata.query.count() == 2
res = Resolver(pid_type='depid', object_type='rec',
getter=Record.get_record)
Expand All @@ -103,6 +106,6 @@ def test_deposit_load_task(dummy_location, deposit_dump, deposit_user,
files = list(dep_recbucket.files)
assert files[0]['key'] == 'bazbar.pdf'
assert files[0]['size'] == 12345
assert files[0]['checksum'] == "00000000000000000000000000000000"
assert files[0]['checksum'] == "md5:00000000000000000000000000000000"
assert files[0]['bucket']
assert SIPFile.query.count() == 1