/
index.ts
286 lines (251 loc) · 7.06 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
/**
* @packageDocumentation
*
* A Blockstore implementation that stores blocks on Amazon S3.
*
* @example Quickstart
*
* If the flag `createIfMissing` is not set or is false, then the bucket must be created prior to using blockstore-s3. Please see the AWS docs for information on how to configure the S3 instance. A bucket name is required to be set at the s3 instance level, see the below example.
*
* ```js
* import { S3 } from '@aws-sdk/client-s3'
* import { S3Blockstore } from 'blockstore-s3'
*
* const s3 = new S3({
* region: 'region',
* credentials: {
* accessKeyId: 'myaccesskey',
* secretAccessKey: 'mysecretkey'
* }
* })
*
* const store = new S3Blockstore(
* s3,
* 'my-bucket',
* { createIfMissing: false }
* )
* ```
*
* @example Using with Helia
*
* See [examples/helia](./examples/helia) for a full example of how to use Helia with an S3 backed blockstore.
*/
import {
PutObjectCommand,
CreateBucketCommand,
GetObjectCommand,
HeadObjectCommand,
DeleteObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import { BaseBlockstore } from 'blockstore-core/base'
import * as Errors from 'blockstore-core/errors'
import toBuffer from 'it-to-buffer'
import { fromString as unint8arrayFromString } from 'uint8arrays'
import { NextToLast, type ShardingStrategy } from './sharding.js'
import type { S3 } from '@aws-sdk/client-s3'
import type { Pair } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
export type { ShardingStrategy }
export interface S3BlockstoreInit {
/**
* Whether to try to create the bucket if it is missing when `.open` is called
*/
createIfMissing?: boolean
/**
* Control how CIDs map to paths and back
*/
shardingStrategy?: ShardingStrategy
}
/**
* A blockstore backed by AWS S3
*/
export class S3Blockstore extends BaseBlockstore {
public createIfMissing: boolean
private readonly s3: S3
private readonly bucket: string
private readonly shardingStrategy: ShardingStrategy
constructor (s3: S3, bucket: string, init?: S3BlockstoreInit) {
super()
if (s3 == null) {
throw new Error('An S3 instance must be supplied. See the blockstore-s3 README for examples.')
}
if (bucket == null) {
throw new Error('An bucket must be supplied. See the blockstore-s3 README for examples.')
}
this.s3 = s3
this.bucket = bucket
this.createIfMissing = init?.createIfMissing ?? false
this.shardingStrategy = init?.shardingStrategy ?? new NextToLast()
}
/**
* Store the given value under the key.
*/
async put (key: CID, val: Uint8Array, options?: AbortOptions): Promise<CID> {
try {
await this.s3.send(
new PutObjectCommand({
Bucket: this.bucket,
Key: this.shardingStrategy.encode(key),
Body: val
}), {
abortSignal: options?.signal
}
)
return key
} catch (err: any) {
throw Errors.putFailedError(err)
}
}
/**
* Read from s3
*/
async get (key: CID, options?: AbortOptions): Promise<Uint8Array> {
try {
const data = await this.s3.send(
new GetObjectCommand({
Bucket: this.bucket,
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
)
if (data.Body == null) {
throw new Error('Response had no body')
}
// If a body was returned, ensure it's a Uint8Array
if (data.Body instanceof Uint8Array) {
return data.Body
}
if (typeof data.Body === 'string') {
return unint8arrayFromString(data.Body)
}
if (data.Body instanceof Blob) {
const buf = await data.Body.arrayBuffer()
return new Uint8Array(buf, 0, buf.byteLength)
}
// @ts-expect-error s3 types define their own Blob as an empty interface
return await toBuffer(data.Body)
} catch (err: any) {
if (err.statusCode === 404) {
throw Errors.notFoundError(err)
}
throw err
}
}
/**
* Check for the existence of the given key
*/
async has (key: CID, options?: AbortOptions): Promise<boolean> {
try {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
)
return true
} catch (err: any) {
// doesn't exist and permission policy includes s3:ListBucket
if (err.$metadata?.httpStatusCode === 404) {
return false
}
// doesn't exist, permission policy does not include s3:ListBucket
if (err.$metadata?.httpStatusCode === 403) {
return false
}
throw err
}
}
/**
* Delete the record under the given key
*/
async delete (key: CID, options?: AbortOptions): Promise<void> {
try {
await this.s3.send(
new DeleteObjectCommand({
Bucket: this.bucket,
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
)
} catch (err: any) {
throw Errors.deleteFailedError(err)
}
}
async * getAll (options?: AbortOptions): AsyncIterable<Pair> {
const params: Record<string, any> = {}
try {
while (true) {
const data = await this.s3.send(
new ListObjectsV2Command({
Bucket: this.bucket,
...params
}), {
abortSignal: options?.signal
}
)
if (options?.signal?.aborted === true) {
return
}
if (data == null || data.Contents == null) {
throw new Error('Not found')
}
for (const d of data.Contents) {
if (d.Key == null) {
throw new Error('Not found')
}
// Remove the path from the key
const cid = this.shardingStrategy.decode(d.Key)
yield {
cid,
block: await this.get(cid, options)
}
}
// If we didn't get all records, recursively query
if (data.IsTruncated === true) {
// If NextMarker is absent, use the key from the last result
params.StartAfter = data.Contents[data.Contents.length - 1].Key
// recursively fetch keys
continue
}
break
}
} catch (err: any) {
throw new Error(err.code)
}
}
/**
* This will check the s3 bucket to ensure access and existence
*/
async open (options?: AbortOptions): Promise<void> {
try {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: ''
}), {
abortSignal: options?.signal
}
)
} catch (err: any) {
if (err.statusCode !== 404) {
if (this.createIfMissing) {
await this.s3.send(
new CreateBucketCommand({
Bucket: this.bucket
}), {
abortSignal: options?.signal
}
)
return
}
throw Errors.openFailedError(err)
}
}
}
}