/
s3_url.py
149 lines (124 loc) · 4.93 KB
/
s3_url.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from datetime import datetime, timedelta
from typing import List, Optional
from botocore.exceptions import ClientError
from fastapi import HTTPException, Path, Request
from fastapi import Response
from maggma.api.models import S3URLDoc
from maggma.api.models import Response as ResponseModel
from maggma.api.resource import Resource, HeaderProcessor
from maggma.api.utils import serialization_helper
from maggma.stores.aws import S3Store
import orjson
class S3URLResource(Resource):
    """
    Implements a REST Compatible Resource as a GET URL endpoint
    that provides pre-signed S3 URLs.
    """

    def __init__(
        self,
        store: S3Store,
        url_lifetime: int,
        tags: Optional[List[str]] = None,
        header_processor: Optional[HeaderProcessor] = None,
        disable_validation: bool = False,
        include_in_schema: Optional[bool] = True,
        sub_path: Optional[str] = "/",
    ):
        """
        Args:
            store: The Maggma S3Store whose objects pre-signed URLs are generated for
            url_lifetime: URL lifetime in seconds
            tags: List of tags for the endpoint (defaults to no tags)
            header_processor: The header processor to use for this resource
            disable_validation: Whether to use ORJSON and provide a direct FastAPI response.
                Note this will disable auto JSON serialization and response validation with the
                provided model.
            include_in_schema: Whether the endpoint should be shown in the documented schema.
            sub_path: sub-URL path for the resource.
        """
        self.store = store
        self.url_lifetime = url_lifetime
        self.tags = tags or []
        self.header_processor = header_processor
        self.disable_validation = disable_validation
        self.include_in_schema = include_in_schema
        self.sub_path = sub_path
        self.response_model = ResponseModel[S3URLDoc]  # type: ignore

        # NOTE(review): attributes are assigned before super().__init__,
        # presumably because the base class builds the routes (via
        # prepare_endpoint) during init and those read them — keep this order.
        super().__init__(S3URLDoc)

    def prepare_endpoint(self):
        """
        Internal method to prepare the endpoint by setting up default handlers
        for routes
        """
        self.build_get_by_key()

    def build_get_by_key(self):
        """
        Register a GET route at ``{sub_path}{<store key>}/`` that checks the
        requested object exists in the bucket and returns a pre-signed
        download URL for it, wrapped in an S3URLDoc.
        """
        key_name = self.store.key
        model_name = self.model.__name__

        def get_by_key(
            request: Request,
            temp_response: Response,
            key: str = Path(
                ...,
                alias=key_name,
                title=f"The {key_name} of the {model_name} to get",
            ),
        ):
            # Map the public key onto the S3 object key. Only prefix when the
            # store actually has a sub-directory: an empty/None sub_dir must
            # not produce a leading "/" (which would never match an object).
            prefix = (self.store.sub_dir or "").strip("/")
            object_key = f"{prefix}/{key}" if prefix else key

            self.store.connect()
            try:
                # Make sure object is in bucket before signing a URL for it
                try:
                    self.store.s3.Object(self.store.bucket, object_key).load()
                except ClientError as err:
                    raise HTTPException(
                        status_code=404,
                        detail="No object found for {} = {}".format(
                            self.store.key, key
                        ),
                    ) from err

                # Taken before signing so the reported expiry never lies
                # beyond the URL's real expiry.
                requested_datetime = datetime.utcnow()

                # Get URL
                try:
                    url = self.store.s3.meta.client.generate_presigned_url(
                        ClientMethod="get_object",
                        Params={"Bucket": self.store.bucket, "Key": object_key},
                        ExpiresIn=self.url_lifetime,
                    )
                except Exception as err:
                    raise HTTPException(
                        status_code=404,
                        detail="Problem obtaining URL for {} = {}".format(
                            self.store.key, key
                        ),
                    ) from err
            finally:
                # Release the connection on every path, including the 404s.
                if self.store._coll:
                    self.store.close()

            expiry_datetime = requested_datetime + timedelta(seconds=self.url_lifetime)

            item = S3URLDoc(
                url=url,
                requested_datetime=requested_datetime,
                expiry_datetime=expiry_datetime,
            )

            response = {"data": [item.dict()]}  # type: ignore

            if self.disable_validation:
                # FastAPI does not merge headers from the injected
                # `temp_response` into a Response returned directly, so the
                # header processor must act on the response we return.
                direct_response = Response(
                    orjson.dumps(response, default=serialization_helper),
                    media_type="application/json",
                )
                if self.header_processor is not None:
                    self.header_processor.process_header(direct_response, request)
                return direct_response

            if self.header_processor is not None:
                self.header_processor.process_header(temp_response, request)

            return response

        # An f-string literal at the top of a function is NOT a docstring, so
        # the dynamic description is attached explicitly; FastAPI reads
        # __doc__ when the route is registered below.
        get_by_key.__doc__ = f"""
        Gets a document by the primary key in the store

        Args:
            {key_name}: the id of a single {model_name}

        Returns:
            A single pre-signed URL {model_name} document
        """

        self.router.get(
            f"{self.sub_path}{{{key_name}}}/",
            summary=f"Get a {model_name} document by {key_name}",
            response_description=f"Get a {model_name} document by {key_name}",
            response_model=self.response_model,
            response_model_exclude_unset=True,
            tags=self.tags,
            include_in_schema=self.include_in_schema,
        )(get_by_key)