/**
* @packageDocumentation
*
* A Datastore implementation that stores data on Amazon S3.
*
* @example Quickstart
*
* If the flag `createIfMissing` is not set or is false, then the bucket must be created prior to using datastore-s3. Please see the AWS docs for information on how to configure the S3 instance. A bucket name is required to be set at the s3 instance level, see the below example.
*
* ```js
* import { S3 } from '@aws-sdk/client-s3'
* import { S3Datastore } from 'datastore-s3'
*
* const s3 = new S3({
* region: 'region',
* credentials: {
* accessKeyId: 'myaccesskey',
* secretAccessKey: 'mysecretkey'
* }
* })
*
* const store = new S3Datastore(
* s3,
* 'my-bucket',
* { path: '.ipfs/datastore', createIfMissing: false }
* )
* ```
*
* @example Using with Helia
*
* See [examples/helia](./examples/helia) for a full example of how to use Helia with an S3 backed datastore.
*/
import {
PutObjectCommand,
CreateBucketCommand,
GetObjectCommand,
HeadObjectCommand,
DeleteObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import { BaseDatastore } from 'datastore-core/base'
import * as Errors from 'datastore-core/errors'
import { Key, type KeyQuery, type Pair, type Query } from 'interface-datastore'
import filter from 'it-filter'
import toBuffer from 'it-to-buffer'
import { fromString as unint8arrayFromString } from 'uint8arrays'
import type { S3 } from '@aws-sdk/client-s3'
import type { AbortOptions } from 'interface-store'
export interface S3DatastoreInit {
  /**
   * An optional path to use within the bucket for all files - this setting can
   * affect S3 performance as it does internal sharding based on 'prefixes' -
   * these can be delimited by '/' so it's often better to wrap this datastore in
   * a sharding datastore which will generate prefixed datastore keys for you.
   *
   * See - https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
   * and https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
   */
  path?: string

  /**
   * Whether to try to create the bucket if it is missing when `.open` is called
   * (defaults to false - the bucket must then already exist)
   */
  createIfMissing?: boolean
}
/**
 * A datastore backed by AWS S3
 */
export class S3Datastore extends BaseDatastore {
  public path?: string
  public createIfMissing: boolean
  private readonly s3: S3
  private readonly bucket: string

  constructor (s3: S3, bucket: string, init?: S3DatastoreInit) {
    super()

    if (s3 == null) {
      throw new Error('An S3 instance must be supplied. See the datastore-s3 README for examples.')
    }

    if (bucket == null) {
      throw new Error('A bucket must be supplied. See the datastore-s3 README for examples.')
    }

    this.path = init?.path
    this.s3 = s3
    this.bucket = bucket
    this.createIfMissing = init?.createIfMissing ?? false
  }

  /**
   * Returns the full key which includes the path to the ipfs store
   */
  _getFullKey (key: Key): string {
    // Avoid absolute paths with s3
    return [this.path, key.toString()].filter(Boolean).join('/').replace(/\/\/+/g, '/')
  }

  /**
   * Returns true when the error represents an HTTP 404 response.
   *
   * AWS SDK v3 errors report the status via `$metadata.httpStatusCode`;
   * `statusCode` is also checked as a fallback for older/wrapped errors.
   */
  private _isNotFound (err: any): boolean {
    return err?.$metadata?.httpStatusCode === 404 || err?.statusCode === 404
  }

  /**
   * Store the given value under the key
   *
   * @throws DbWriteFailedError if the S3 PutObject request fails
   */
  async put (key: Key, val: Uint8Array, options?: AbortOptions): Promise<Key> {
    try {
      await this.s3.send(
        new PutObjectCommand({
          Bucket: this.bucket,
          Key: this._getFullKey(key),
          Body: val
        }), {
          abortSignal: options?.signal
        }
      )

      return key
    } catch (err: any) {
      throw Errors.dbWriteFailedError(err)
    }
  }

  /**
   * Read from s3
   *
   * @throws NotFoundError if the key is not present in the bucket
   */
  async get (key: Key, options?: AbortOptions): Promise<Uint8Array> {
    try {
      // NOTE: the abort signal is passed to `.send` (as in `put`/`has`), not
      // as a second constructor arg to the command - the latter is ignored
      const data = await this.s3.send(
        new GetObjectCommand({
          Bucket: this.bucket,
          Key: this._getFullKey(key)
        }), {
          abortSignal: options?.signal
        }
      )

      if (data.Body == null) {
        throw new Error('Response had no body')
      }

      // If a body was returned, ensure it's a Uint8Array
      if (data.Body instanceof Uint8Array) {
        return data.Body
      }

      if (typeof data.Body === 'string') {
        return unint8arrayFromString(data.Body)
      }

      if (data.Body instanceof Blob) {
        const buf = await data.Body.arrayBuffer()

        return new Uint8Array(buf, 0, buf.byteLength)
      }

      // @ts-expect-error s3 types define their own Blob as an empty interface
      return await toBuffer(data.Body)
    } catch (err: any) {
      if (this._isNotFound(err)) {
        throw Errors.notFoundError(err)
      }

      throw err
    }
  }

  /**
   * Check for the existence of the given key
   */
  async has (key: Key, options?: AbortOptions): Promise<boolean> {
    try {
      await this.s3.send(
        new HeadObjectCommand({
          Bucket: this.bucket,
          Key: this._getFullKey(key)
        }), {
          abortSignal: options?.signal
        }
      )

      return true
    } catch (err: any) {
      // doesn't exist and permission policy includes s3:ListBucket
      if (this._isNotFound(err)) {
        return false
      }

      // doesn't exist, permission policy does not include s3:ListBucket
      if (err.$metadata?.httpStatusCode === 403) {
        return false
      }

      throw err
    }
  }

  /**
   * Delete the record under the given key
   *
   * @throws DbDeleteFailedError if the S3 DeleteObject request fails
   */
  async delete (key: Key, options?: AbortOptions): Promise<void> {
    try {
      await this.s3.send(
        new DeleteObjectCommand({
          Bucket: this.bucket,
          Key: this._getFullKey(key)
        }), {
          abortSignal: options?.signal
        }
      )
    } catch (err: any) {
      throw Errors.dbDeleteFailedError(err)
    }
  }

  /**
   * Recursively fetches all keys from s3
   */
  async * _listKeys (params: { Prefix?: string, StartAfter?: string }, options?: AbortOptions): AsyncIterable<Key> {
    try {
      const data = await this.s3.send(
        new ListObjectsV2Command({
          Bucket: this.bucket,
          ...params
        }), {
          abortSignal: options?.signal
        }
      )

      if (options?.signal?.aborted === true) {
        return
      }

      if (data == null || data.Contents == null) {
        throw new Error('Not found')
      }

      for (const d of data.Contents) {
        if (d.Key == null) {
          throw new Error('Not found')
        }

        // Remove the path from the key
        yield new Key(d.Key.slice((this.path ?? '').length), false)
      }

      // If we didn't get all records, recursively query
      if (data.IsTruncated === true) {
        // Continue listing from the key of the last result
        params.StartAfter = data.Contents[data.Contents.length - 1].Key

        // recursively fetch keys, propagating the abort signal to each page
        yield * this._listKeys(params, options)
      }
    } catch (err: any) {
      throw new Error(err.code)
    }
  }

  /**
   * Yields all key/value pairs matching the query - keys deleted while
   * iterating are silently skipped
   */
  async * _all (q: Query, options?: AbortOptions): AsyncIterable<Pair> {
    for await (const key of this._allKeys({ prefix: q.prefix }, options)) {
      try {
        const res: Pair = {
          key,
          value: await this.get(key, options)
        }

        yield res
      } catch (err: any) {
        // key was deleted while we are iterating over the results
        if (!this._isNotFound(err)) {
          throw err
        }
      }
    }
  }

  /**
   * Yields all keys matching the query
   */
  async * _allKeys (q: KeyQuery, options?: AbortOptions): AsyncIterable<Key> {
    const prefix = [this.path, q.prefix ?? ''].filter(Boolean).join('/').replace(/\/\/+/g, '/')

    // Get all the keys via list object, recursively as needed
    let it = this._listKeys({
      Prefix: prefix
    }, options)

    if (q.prefix != null) {
      it = filter(it, k => k.toString().startsWith(`${q.prefix ?? ''}`))
    }

    yield * it
  }

  /**
   * This will check the s3 bucket to ensure access and existence
   *
   * @throws DbOpenFailedError when the bucket is unreachable and
   * `createIfMissing` is false
   */
  async open (options?: AbortOptions): Promise<void> {
    try {
      await this.s3.send(
        new HeadObjectCommand({
          Bucket: this.bucket,
          Key: this.path ?? ''
        }), {
          abortSignal: options?.signal
        }
      )
    } catch (err: any) {
      // a 404 means the bucket is reachable but the path key is simply
      // absent - the store is usable as-is
      if (this._isNotFound(err)) {
        return
      }

      if (this.createIfMissing) {
        await this.s3.send(
          new CreateBucketCommand({
            Bucket: this.bucket
          }), {
            abortSignal: options?.signal
          }
        )

        return
      }

      throw Errors.dbOpenFailedError(err)
    }
  }
}