Skip to content

Commit

Permalink
ref #865 - mongoengine modeling for shared storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
jortel committed Jul 20, 2015
1 parent 97e0a0c commit f83eeea
Show file tree
Hide file tree
Showing 4 changed files with 671 additions and 126 deletions.
218 changes: 218 additions & 0 deletions server/pulp/server/content/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import os
import errno
import shutil

from hashlib import sha256

from pulp.server.config import config


def mkdir(path):
    """
    Create a directory at the specified path.
    The leaf directory (and any intermediate directories) are only created
    if they don't already exist.

    :param path: The absolute path to the leaf directory to be created.
    :type path: str
    :raises OSError: if the directory cannot be created, including the case
        where *path* already exists but is not a directory.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST is only benign when the existing entry really is a directory.
        # A regular file occupying the path must not be reported as success,
        # otherwise callers fail later with a far less obvious error.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise


class ContentStorage(object):
    """
    Base class for content storage.

    Subclasses implement put() and get(). An instance can be used as a
    context manager: open() is called on entry and close() on exit.
    """

    def put(self, unit, path):
        """
        Transfer the file (or directory) found at *path* into storage on
        behalf of the specified content unit.

        :param unit: The content unit to be stored.
        :type unit: pulp.server.db.model.ContentUnit
        :param path: The absolute path to the file (or directory) to be stored.
        :type path: str
        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def get(self, unit):
        """
        Fetch the content (bits) associated with the specified content unit
        from storage.
        Note: This method is included for symmetry and to demonstrate
        the full potential of this model.

        :param unit: The content unit whose content is requested.
        :type unit: pulp.server.db.model.ContentUnit
        :return: A file-like object used to stream the content.
        :rtype: file
        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def open(self):
        """
        Open the storage. The base implementation does nothing.
        """

    def close(self):
        """
        Close the storage. The base implementation does nothing.
        """

    def __enter__(self):
        # Entering the context opens the storage and hands back the storage itself.
        self.open()
        return self

    def __exit__(self, *exc_info):
        # The exception details are ignored; nothing is returned, so an
        # exception raised inside the ``with`` body still propagates.
        self.close()


class FileStorage(ContentStorage):
    """
    Direct storage for files and directories.
    """

    def put(self, unit, path):
        """
        Copy the file (or directory) at *path* into storage and record the
        resulting location on the unit as *storage_path*.

        The destination is built from the configured server storage_dir,
        the unit type id, the first four characters of the unit id and the
        unit id itself.

        :param unit: The content unit to be stored.
        :type unit: pulp.server.db.model.ContentUnit
        :param path: The absolute path to the file (or directory) to be stored.
        :type path: str
        """
        target = os.path.join(
            config.get('server', 'storage_dir'),
            'content',
            'units',
            unit.unit_type_id,
            unit.id[:4],
            unit.id)
        # Only the parent is created here; the copy itself creates the leaf.
        mkdir(os.path.dirname(target))
        # Directories are copied recursively, anything else as a single file.
        transfer = shutil.copytree if os.path.isdir(path) else shutil.copy
        transfer(path, target)
        unit.storage_path = target

    def get(self, unit):
        """
        Fetch the content (bits) associated with the specified content unit
        from storage.
        Note: This method is included for symmetry and to demonstrate
        the full potential of this model. It is currently a placeholder
        that returns None.

        :param unit: The content unit whose content is requested.
        :type unit: pulp.server.db.model.ContentUnit
        :return: Intended to be a file-like object used to stream the content.
        :rtype: file
        """
        return None


class SharedStorage(ContentStorage):
    """
    Direct shared storage.

    The storage is a directory (see shared_dir) holding a single *content*
    directory and a *links* directory. Each unit that shares the content is
    represented by a symbolic link in *links*, named by the unit id and
    pointing at the *content* directory.

    :ivar storage_id: The SHA-256 hex digest of the shared storage identifier
        passed to the constructor.
    :type storage_id: str
    """

    def __init__(self, storage_id):
        """
        :param storage_id: A shared storage identifier.
        :type storage_id: str
        """
        super(SharedStorage, self).__init__()
        # hashlib digests operate on bytes. Text identifiers are encoded as
        # UTF-8 first so that non-ASCII identifiers hash cleanly.
        if not isinstance(storage_id, bytes):
            storage_id = storage_id.encode('utf-8')
        self.storage_id = sha256(storage_id).hexdigest()

    def put(self, unit, path=None):
        """
        Put the content unit into shared storage by linking it to the
        shared content. Nothing is copied.

        :param unit: The content unit to be stored.
        :type unit: pulp.server.db.model.ContentUnit
        :param path: Accepted for interface compatibility with the base
            class; it is not used.
        :type path: str
        """
        self.link(unit)

    def get(self, unit):
        """
        Get the content (bits) associated with the specified content unit from storage.
        Note: This method is included for symmetry and to demonstrate
        the full potential of this model. It is currently a placeholder
        that returns None.

        :param unit: The content unit whose content is requested.
        :type unit: pulp.server.db.model.ContentUnit
        :return: Intended to be a file-like object used to stream the content.
        :rtype: file
        """
        pass

    def open(self):
        """
        Open the shared storage.
        The shared storage location (content and links directories) is
        created as needed.
        """
        mkdir(self.content_dir)
        mkdir(self.links_dir)

    @property
    def shared_dir(self):
        """
        The root location of the shared storage.

        :return: The absolute path to the shared storage.
        :rtype: str
        """
        storage_dir = os.path.join(
            config.get('server', 'storage_dir'),
            'content',
            'shared')
        path = os.path.join(storage_dir, self.storage_id)
        return path

    @property
    def content_dir(self):
        """
        The location within the shared storage for storing content.

        :return: The absolute path to the location within the
            shared storage for storing content.
        :rtype: str
        """
        path = os.path.join(self.shared_dir, 'content')
        return path

    @property
    def links_dir(self):
        """
        The location within the shared storage for links.

        :return: The absolute path to the location within the
            shared storage for storing links.
        :rtype: str
        """
        path = os.path.join(self.shared_dir, 'links')
        return path

    def link(self, unit):
        """
        Link the specified content unit (by id) to the shared content and
        record the link path on the unit as *storage_path*.

        :param unit: The content unit to be linked.
        :type unit: pulp.server.db.model.ContentUnit
        :raises OSError: if the link cannot be created, unless an identical
            link (same name, same target) already exists.
        """
        target = self.content_dir
        link = os.path.join(self.links_dir, unit.id)
        try:
            os.symlink(target, link)
        except OSError as e:
            # Re-linking a unit is harmless only when the existing entry is a
            # symlink that already points at the shared content directory.
            identical = (
                e.errno == errno.EEXIST and
                os.path.islink(link) and
                os.readlink(link) == target)
            if not identical:
                raise
        unit.storage_path = link
140 changes: 91 additions & 49 deletions server/pulp/server/db/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import copy
import errno
import logging
import os
import shutil
import uuid
from collections import namedtuple

Expand All @@ -11,8 +9,10 @@
from mongoengine import signals

from pulp.common import constants, dateutils, error_codes

from pulp.server import exceptions
from pulp.server.content.storage import FileStorage, SharedStorage
from pulp.plugins.model import Repository as plugin_repo
from pulp.server import config, exceptions
from pulp.server.async.emit import send as send_taskstatus_message
from pulp.server.db.fields import ISO8601StringField
from pulp.server.db.model.reaper_base import ReaperMixin
Expand Down Expand Up @@ -404,11 +404,6 @@ class ContentUnit(Document):

_NAMED_TUPLE = None

def __init__(self, *args, **kwargs):
super(ContentUnit, self).__init__(*args, **kwargs)
self._source_location = None
self._relative_path = None

@classmethod
def attach_signals(cls):
"""
Expand Down Expand Up @@ -455,47 +450,6 @@ def pre_save_signal(cls, sender, document, **kwargs):
document.id = str(uuid.uuid4())
document.last_updated = dateutils.now_utc_timestamp()

# If content was set on this unit, copy the content into place
if document._source_location:
server_storage_dir = config.config.get('server', 'storage_dir')
platform_storage_location = os.path.join(server_storage_dir, 'units',
document.unit_type_id,
str(document.id)[0],
str(document.id)[1:3],
str(document.id))
# Make if source is a directory, recursively copy it, otherwise copy the file
if os.path.isdir(document._source_location):
shutil.copytree(document._source_location, platform_storage_location)
else:
target_file_name = os.path.basename(document._source_location)
# Make sure the base directory exists
try:
os.makedirs(platform_storage_location)
except OSError as e:
if e.errno != errno.EEXIST:
raise
# Copy the file
document_full_storage_location = os.path.join(platform_storage_location,
target_file_name)
shutil.copy(document._source_location, document_full_storage_location)
platform_storage_location = document_full_storage_location
document.storage_path = platform_storage_location

def set_content(self, source_location):
"""
Store the source of the content for the unit and the relative path
where it should be stored within the plugin content directory.
:param source_location: The absolute path to the content in the plugin working directory.
:type source_location: str
:raises PulpCodedException: PLP0036 if the source_location doesn't exist.
"""
if not os.path.exists(source_location):
raise exceptions.PulpCodedException(error_code=error_codes.PLP0036,
source_location=source_location)
self._source_location = source_location

def get_repositories(self):
"""
Get an iterable of Repository models for all the repositories that contain this unit
Expand Down Expand Up @@ -536,6 +490,94 @@ def __hash__(self):
return hash(self.unit_type_id + self.unit_key_str)


class FileContentUnit(ContentUnit):
    """
    A content unit representing content that is of type *file* or *directory*.

    :ivar _source_location: The absolute path to the file or directory
        to be copied to the platform storage location when the unit
        is saved. See: set_content().
    :type _source_location: str
    """

    meta = {'abstract': True}

    def __init__(self, *args, **kwargs):
        super(FileContentUnit, self).__init__(*args, **kwargs)
        # No content is pending until set_content() is called.
        self._source_location = None

    @classmethod
    def pre_save_signal(cls, sender, document, **kwargs):
        """
        The signal that is triggered before a unit is saved.
        Runs the parent class pre-save handling and then, when content has
        been set on the document, puts that content into file storage.

        :param sender: sender class
        :type sender: object
        :param document: Document that sent the signal
        :type document: FileContentUnit
        """
        super(FileContentUnit, cls).pre_save_signal(sender, document, **kwargs)
        source = document._source_location
        if source:
            with FileStorage() as storage:
                storage.put(document, source)

    def set_content(self, source_location):
        """
        Remember the location of the content for the unit so that it can be
        transferred into storage when the unit is saved.

        :param source_location: The absolute path to the content in the plugin working directory.
        :type source_location: str
        :raises PulpCodedException: PLP0036 if the source_location doesn't exist.
        """
        missing = not os.path.exists(source_location)
        if missing:
            raise exceptions.PulpCodedException(
                error_code=error_codes.PLP0036,
                source_location=source_location)
        self._source_location = source_location


class SharedContentUnit(ContentUnit):
    """
    A content unit representing content that is stored in a
    shared storage facility.

    Subclasses must provide the storage_id property.
    """

    meta = {'abstract': True}

    @property
    def storage_id(self):
        """
        The identifier for the shared storage location.

        :return: An identifier for shared storage.
        :rtype: str
        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    @classmethod
    def pre_save_signal(cls, sender, document, **kwargs):
        """
        The signal that is triggered before a unit is saved.
        Runs the parent class pre-save handling, then links the document
        into the shared storage identified by its storage_id.

        :param sender: sender class
        :type sender: object
        :param document: Document that sent the signal
        :type document: SharedContentUnit
        """
        super(SharedContentUnit, cls).pre_save_signal(sender, document, **kwargs)
        shared = SharedStorage(document.storage_id)
        with shared as storage:
            storage.link(document)


class CeleryBeatLock(Document):
"""
Single document collection which gives information about the current celerybeat lock.
Expand Down
Loading

0 comments on commit f83eeea

Please sign in to comment.