This repository has been archived by the owner on Aug 27, 2023. It is now read-only.
/
object_store.py
114 lines (95 loc) · 3.78 KB
/
object_store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
""" Base storage backend shared by the supported object stores (S3 / GCS) """
import logging
from binascii import hexlify
from hashlib import md5
from pyramid.httpexceptions import HTTPFound
from pyramid.settings import asbool
from smart_open import open as _open
from pypicloud.models import Package
from .base import IStorage
LOG = logging.getLogger(__name__)
class ObjectStoreStorage(IStorage):
    """Storage backend base class containing code that is common between
    supported object stores (S3 / GCS)

    Subclasses must override :meth:`_generate_url` and
    :meth:`package_from_object` (both raise ``NotImplementedError`` here) and
    may override :meth:`_subclass_specific_config` to contribute extra
    settings to :meth:`configure`.
    """

    # Not read anywhere in this class.
    # NOTE(review): presumably a switch used by concrete backends or the test
    # suite -- confirm against the subclasses.
    test = False

    def __init__(
        self,
        request=None,
        expire_after=None,
        bucket_prefix=None,
        prepend_hash=None,
        redirect_urls=None,
        sse=None,
        object_acl=None,
        storage_class=None,
        region_name=None,
        public_url=False,
        **kwargs
    ):
        """Store the common object-store options on the instance.

        ``request`` and any extra ``kwargs`` are forwarded to the parent
        class constructor. Of the remaining options, this class itself only
        reads three:

        * ``bucket_prefix`` -- prepended to the calculated path in
          :meth:`get_path`.
        * ``prepend_hash`` -- when truthy, :meth:`calculate_path` prefixes the
          path with a short hash of the filename.
        * ``redirect_urls`` -- when truthy, :meth:`get_url` defers to the
          parent class instead of returning the object-store URL.

        ``expire_after``, ``sse``, ``object_acl``, ``storage_class``,
        ``region_name`` and ``public_url`` are only stored here
        (presumably for use by the concrete backends -- TODO confirm).
        """
        super().__init__(request, **kwargs)
        self.expire_after = expire_after
        self.bucket_prefix = bucket_prefix
        self.prepend_hash = prepend_hash
        self.redirect_urls = redirect_urls
        self.sse = sse
        self.object_acl = object_acl
        self.storage_class = storage_class
        self.region_name = region_name
        self.public_url = public_url

    def _generate_url(self, package: Package) -> str:
        """Subclasses must implement a method for generating signed URLs to
        the package in the object store
        """
        raise NotImplementedError

    @classmethod
    def package_from_object(cls, obj, factory):
        """Subclasses must implement a method for constructing a Package
        instance from the backend's storage object format
        """
        raise NotImplementedError

    @classmethod
    def _subclass_specific_config(cls, settings, common_config):
        """Method to allow subclasses to extract configuration parameters
        specific to them and not covered in the common configuration
        in this class.

        The base implementation contributes nothing and returns an empty
        dict. ``common_config`` is the dict of keyword arguments that
        :meth:`configure` has assembled so far.
        """
        return {}

    @classmethod
    def configure(cls, settings):
        """Build the constructor keyword arguments from ``settings``.

        Starts from the parent class's ``configure`` result, then reads:

        * ``storage.expire_after`` -- converted with ``int``; default
          ``60 * 60 * 24``.
        * ``storage.prefix`` -- stored as ``bucket_prefix``; default ``""``.
        * ``storage.prepend_hash`` -- boolean; default ``True``.
        * ``storage.object_acl`` -- default ``None``.
        * ``storage.storage_class`` -- default ``None``.
        * ``storage.redirect_urls`` -- boolean; default ``True``.
        * ``storage.region_name`` -- default ``None``.
        * ``storage.public_url`` -- boolean; a missing key is handed to
          ``asbool`` as ``None``.

        Returns the resulting dict.
        """
        kwargs = super().configure(settings)
        kwargs["expire_after"] = int(settings.get("storage.expire_after", 60 * 60 * 24))
        kwargs["bucket_prefix"] = settings.get("storage.prefix", "")
        kwargs["prepend_hash"] = asbool(settings.get("storage.prepend_hash", True))
        kwargs["object_acl"] = settings.get("storage.object_acl")
        kwargs["storage_class"] = settings.get("storage.storage_class")
        kwargs["redirect_urls"] = asbool(settings.get("storage.redirect_urls", True))
        kwargs["region_name"] = settings.get("storage.region_name")
        kwargs["public_url"] = asbool(settings.get("storage.public_url"))
        # Applied last, so subclass-specific values override the common ones
        # on any key collision.
        kwargs.update(cls._subclass_specific_config(settings, kwargs))
        return kwargs

    def calculate_path(self, package):
        """Calculates the path of a package

        The path is ``<name>/<filename>``. When ``prepend_hash`` is truthy it
        becomes ``<hash>/<name>/<filename>``, where ``<hash>`` is the first
        four hex characters of the MD5 digest of the UTF-8 encoded filename.
        The bucket prefix is NOT included; see :meth:`get_path`.
        """
        path = package.name + "/" + package.filename
        if self.prepend_hash:
            digest = md5(package.filename.encode("utf-8")).hexdigest()
            path = digest[:4] + "/" + path
        return path

    def get_path(self, package):
        """Get the fully-qualified bucket path for a package

        The path is computed once and cached under ``package.data["path"]``;
        an existing entry is returned unchanged.
        """
        if "path" not in package.data:
            # ``bucket_prefix`` defaults to None in __init__; treat that as
            # "no prefix" rather than failing on ``None + str``.
            prefix = self.bucket_prefix or ""
            package.data["path"] = prefix + self.calculate_path(package)
        return package.data["path"]

    def get_url(self, package):
        """Return the URL for ``package``.

        When ``redirect_urls`` is truthy the parent class's ``get_url`` is
        used; otherwise the URL produced by :meth:`_generate_url` is returned
        directly.
        """
        if self.redirect_urls:
            return super().get_url(package)
        return self._generate_url(package)

    def download_response(self, package):
        """Return an ``HTTPFound`` redirect whose location is the URL produced
        by :meth:`_generate_url` for ``package``.
        """
        return HTTPFound(location=self._generate_url(package))

    def open(self, package):
        """Open the package for binary reading.

        The URL from :meth:`_generate_url` is handed to ``smart_open``'s
        ``open`` in ``"rb"`` mode with ``compression="disable"``, and whatever
        that call returns is returned to the caller.
        """
        url = self._generate_url(package)
        return _open(url, "rb", compression="disable")