forked from galaxyproject/galaxy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
folder_contents.py
320 lines (276 loc) · 15.3 KB
/
folder_contents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""
API operations on the contents of a library folder.
"""
from galaxy import util
from galaxy import exceptions
from galaxy import managers
from galaxy.managers import folders
from galaxy.web import _future_expose_api as expose_api
from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous
from galaxy.web.base.controller import BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems
import logging
log = logging.getLogger( __name__ )
class FolderContentsController( BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems ):
"""
Class controls retrieval, creation and updating of folder contents.
"""
def __init__( self, app ):
super( FolderContentsController, self ).__init__( app )
self.folder_manager = folders.FolderManager()
self.hda_manager = managers.hdas.HDAManager( app )
@expose_api_anonymous
def index( self, trans, folder_id, **kwd ):
"""
GET /api/folders/{encoded_folder_id}/contents
Displays a collection (list) of a folder's contents
(files and folders). Encoded folder ID is prepended
with 'F' if it is a folder as opposed to a data set
which does not have it. Full path is provided in
response as a separate object providing data for
breadcrumb path building.
:param folder_id: encoded ID of the folder which
contents should be library_dataset_dict
:type folder_id: encoded string
:param kwd: keyword dictionary with other params
:type kwd: dict
:returns: dictionary containing all items and metadata
:type: dict
:raises: MalformedId, InconsistentDatabase, ObjectNotFound,
InternalServerError
"""
is_admin = trans.user_is_admin()
deleted = kwd.get( 'include_deleted', 'missing' )
current_user_roles = trans.get_current_user_roles()
try:
deleted = util.asbool( deleted )
except ValueError:
deleted = False
decoded_folder_id = self.folder_manager.cut_and_decode( trans, folder_id )
folder = self.folder_manager.get( trans, decoded_folder_id )
# Special level of security on top of libraries.
if trans.app.security_agent.can_access_library( current_user_roles, folder.parent_library ) or is_admin:
pass
else:
if trans.user:
log.warning( "SECURITY: User (id: %s) without proper access rights is trying to load folder with ID of %s" % ( trans.user.id, decoded_folder_id ) )
else:
log.warning( "SECURITY: Anonymous user is trying to load restricted folder with ID of %s" % ( decoded_folder_id ) )
raise exceptions.ObjectNotFound( 'Folder with the id provided ( %s ) was not found' % str( folder_id ) )
folder_contents = []
update_time = ''
create_time = ''
# Go through every accessible item (folders, datasets) in the folder and include its metadata.
for content_item in self._load_folder_contents( trans, folder, deleted ):
return_item = {}
encoded_id = trans.security.encode_id( content_item.id )
update_time = content_item.update_time.strftime( "%Y-%m-%d %I:%M %p" )
create_time = content_item.create_time.strftime( "%Y-%m-%d %I:%M %p" )
if content_item.api_type == 'folder':
encoded_id = 'F' + encoded_id
can_modify = is_admin or ( trans.user and trans.app.security_agent.can_modify_library_item( current_user_roles, folder ) )
can_manage = is_admin or ( trans.user and trans.app.security_agent.can_manage_library_item( current_user_roles, folder ) )
return_item.update( dict( can_modify=can_modify, can_manage=can_manage ) )
if content_item.description:
return_item.update( dict( description=content_item.description ) )
if content_item.api_type == 'file':
# Is the dataset public or private?
# When both are False the dataset is 'restricted'
# Access rights are checked on the dataset level, not on the ld or ldda level to maintain consistency
is_unrestricted = trans.app.security_agent.dataset_is_public( content_item.library_dataset_dataset_association.dataset )
if trans.user and trans.app.security_agent.dataset_is_private_to_user( trans, content_item ):
is_private = True
else:
is_private = False
# Can user manage the permissions on the dataset?
can_manage = is_admin or (trans.user and trans.app.security_agent.can_manage_dataset( current_user_roles, content_item.library_dataset_dataset_association.dataset ) )
nice_size = util.nice_size( int( content_item.library_dataset_dataset_association.get_size() ) )
library_dataset_dict = content_item.to_dict()
return_item.update( dict( file_ext=library_dataset_dict[ 'file_ext' ],
date_uploaded=library_dataset_dict[ 'date_uploaded' ],
is_unrestricted=is_unrestricted,
is_private=is_private,
can_manage=can_manage,
file_size=nice_size
) )
if content_item.library_dataset_dataset_association.message:
return_item.update( dict( message=content_item.library_dataset_dataset_association.message ) )
# For every item include the default metadata
return_item.update( dict( id=encoded_id,
type=content_item.api_type,
name=content_item.name,
update_time=update_time,
create_time=create_time,
deleted=content_item.deleted
) )
folder_contents.append( return_item )
# Return the reversed path so it starts with the library node.
full_path = self.build_path( trans, folder )[ ::-1 ]
# Check whether user can add items to the current folder
can_add_library_item = is_admin or trans.app.security_agent.can_add_library_item( current_user_roles, folder )
# Check whether user can modify the current folder
can_modify_folder = is_admin or trans.app.security_agent.can_modify_library_item( current_user_roles, folder )
parent_library_id = None
if folder.parent_library is not None:
parent_library_id = trans.security.encode_id( folder.parent_library.id )
metadata = dict( full_path=full_path,
can_add_library_item=can_add_library_item,
can_modify_folder=can_modify_folder,
folder_name=folder.name,
folder_description=folder.description,
parent_library_id=parent_library_id )
folder_container = dict( metadata=metadata, folder_contents=folder_contents )
return folder_container
def build_path( self, trans, folder ):
"""
Search the path upwards recursively and load the whole route of
names and ids for breadcrumb building purposes.
:param folder: current folder for navigating up
:param type: Galaxy LibraryFolder
:returns: list consisting of full path to the library
:type: list
"""
path_to_root = []
# We are almost in root
if folder.parent_id is None:
path_to_root.append( ( 'F' + trans.security.encode_id( folder.id ), folder.name ) )
else:
# We add the current folder and traverse up one folder.
path_to_root.append( ( 'F' + trans.security.encode_id( folder.id ), folder.name ) )
upper_folder = trans.sa_session.query( trans.app.model.LibraryFolder ).get( folder.parent_id )
path_to_root.extend( self.build_path( trans, upper_folder ) )
return path_to_root
def _load_folder_contents( self, trans, folder, include_deleted ):
"""
Loads all contents of the folder (folders and data sets) but only
in the first level. Include deleted if the flag is set and if the
user has access to undelete it.
:param folder: the folder which contents are being loaded
:type folder: Galaxy LibraryFolder
:param include_deleted: flag, when true the items that are deleted
and can be undeleted by current user are shown
:type include_deleted: boolean
:returns: a list containing the requested items
:type: list
"""
current_user_roles = trans.get_current_user_roles()
is_admin = trans.user_is_admin()
content_items = []
for subfolder in folder.folders:
if subfolder.deleted:
if include_deleted:
if is_admin:
# Admins can see all deleted folders.
subfolder.api_type = 'folder'
content_items.append( subfolder )
else:
# Users with MODIFY permissions can see deleted folders.
can_modify = trans.app.security_agent.can_modify_library_item( current_user_roles, subfolder )
if can_modify:
subfolder.api_type = 'folder'
content_items.append( subfolder )
else:
# Undeleted folders are non-restricted for now. The contents are not.
# TODO decide on restrictions
subfolder.api_type = 'folder'
content_items.append( subfolder )
# if is_admin:
# subfolder.api_type = 'folder'
# content_items.append( subfolder )
# else:
# can_access, folder_ids = trans.app.security_agent.check_folder_contents( trans.user, current_user_roles, subfolder )
# if can_access:
# subfolder.api_type = 'folder'
# content_items.append( subfolder )
for dataset in folder.datasets:
if dataset.deleted:
if include_deleted:
if is_admin:
# Admins can see all deleted datasets.
dataset.api_type = 'file'
content_items.append( dataset )
else:
# Users with MODIFY permissions on the item can see the deleted item.
can_modify = trans.app.security_agent.can_modify_library_item( current_user_roles, dataset )
if can_modify:
dataset.api_type = 'file'
content_items.append( dataset )
else:
if is_admin:
dataset.api_type = 'file'
content_items.append( dataset )
else:
can_access = trans.app.security_agent.can_access_dataset( current_user_roles, dataset.library_dataset_dataset_association.dataset )
if can_access:
dataset.api_type = 'file'
content_items.append( dataset )
return content_items
@expose_api
def create( self, trans, encoded_folder_id, payload, **kwd ):
"""
* POST /api/folders/{encoded_id}/contents
create a new library file from an HDA
:param encoded_folder_id: the encoded id of the folder to import dataset(s) to
:type encoded_folder_id: an encoded id string
:param payload: dictionary structure containing:
:param from_hda_id: (optional) the id of an accessible HDA to copy into the library
:type from_hda_id: encoded id
:param from_hdca_id: (optional) the id of an accessible HDCA to copy into the library
:type from_hdca_id: encoded id
:param ldda_message: (optional) the new message attribute of the LDDA created
:type ldda_message: str
:param extended_metadata: (optional) dub-dictionary containing any extended metadata to associate with the item
:type extended_metadata: dict
:type payload: dict
:returns: a dictionary describing the new item if ``from_hda_id`` is supplied or a list of
such dictionaries describing the new items if ``from_hdca_id`` is supplied.
:rtype: object
:raises: ObjectAttributeInvalidException,
InsufficientPermissionsException, ItemAccessibilityException,
InternalServerError
"""
encoded_folder_id_16 = self.__decode_library_content_id( trans, encoded_folder_id )
from_hda_id = payload.pop( 'from_hda_id', None )
from_hdca_id = payload.pop( 'from_hdca_id', None )
ldda_message = payload.pop( 'ldda_message', '' )
if ldda_message:
ldda_message = util.sanitize_html.sanitize_html( ldda_message, 'utf-8' )
try:
if from_hda_id:
decoded_hda_id = self.decode_id( from_hda_id )
return self._copy_hda_to_library_folder( trans, self.hda_manager, decoded_hda_id, encoded_folder_id_16, ldda_message )
if from_hdca_id:
decoded_hdca_id = self.decode_id( from_hdca_id )
return self._copy_hdca_to_library_folder( trans, self.hda_manager, decoded_hdca_id, encoded_folder_id_16, ldda_message )
except Exception as exc:
# TODO handle exceptions better within the mixins
if 'not accessible to the current user' in str( exc ) or 'You are not allowed to access this dataset' in str( exc ):
raise exceptions.ItemAccessibilityException( 'You do not have access to the requested item' )
else:
log.exception( exc )
raise exc
def __decode_library_content_id( self, trans, encoded_folder_id ):
"""
Identifies whether the id provided is properly encoded
LibraryFolder.
:param encoded_folder_id: encoded id of Galaxy LibraryFolder
:type encoded_folder_id: encoded string
:returns: encoded id of Folder (had 'F' prepended)
:type: string
:raises: MalformedId
"""
if ( ( len( encoded_folder_id ) % 16 == 1 ) and encoded_folder_id.startswith( 'F' ) ):
return encoded_folder_id[ 1: ]
else:
raise exceptions.MalformedId( 'Malformed folder id ( %s ) specified, unable to decode.' % str( encoded_folder_id ) )
@expose_api
def show( self, trans, id, library_id, **kwd ):
"""
GET /api/folders/{encoded_folder_id}/
"""
raise exceptions.NotImplemented( 'Showing the library folder content is not implemented here.' )
@expose_api
def update( self, trans, id, library_id, payload, **kwd ):
"""
PUT /api/folders/{encoded_folder_id}/contents
"""
raise exceptions.NotImplemented( 'Updating the library folder content is not implemented here.' )