Skip to content

Commit 0594fa4

Browse files
feat: StorageWriter retries (#396)
1 parent 0a03c7a commit 0594fa4

20 files changed

+352
-191
lines changed

src/Configuration.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ export interface Configuration extends LoggingConfiguration, BitcoinRPCConfigura
4242
readonly batchCreationIntervalInSeconds: number
4343

4444
readonly readDirectoryIntervalInSeconds: number
45+
46+
readonly uploadClaimIntervalInSeconds: number
47+
readonly uploadClaimMaxAttempts: number
4548
}
4649

4750
export interface LoggingConfiguration {
@@ -67,6 +70,7 @@ export interface ExchangeConfiguration {
6770
readonly exchangeIpfsHashTxId: string
6871
readonly exchangePoetAnchorDownloaded: string
6972
readonly exchangeClaimsDownloaded: string
73+
readonly exchangeStorageWriterStoreNextClaim: string
7074
}
7175

7276
const defaultConfiguration: Configuration = {
@@ -106,6 +110,9 @@ const defaultConfiguration: Configuration = {
106110

107111
readDirectoryIntervalInSeconds: 30,
108112

113+
uploadClaimIntervalInSeconds: 30,
114+
uploadClaimMaxAttempts: 10,
115+
109116
forceBlockHeight: undefined,
110117

111118
exchangeBatchReaderReadNextDirectoryRequest: 'BATCH_READER::READ_NEXT_DIRECTORY_REQUEST',
@@ -117,6 +124,7 @@ const defaultConfiguration: Configuration = {
117124
exchangeIpfsHashTxId: 'IPFS_HASH_TX_ID',
118125
exchangePoetAnchorDownloaded: 'POET_ANCHOR_DOWNLOADED',
119126
exchangeClaimsDownloaded: 'CLAIMS_DOWNLOADED',
127+
exchangeStorageWriterStoreNextClaim: 'STORAGE_WRITER::STORE_NEXT_CLAIM',
120128
}
121129

122130
export const configurationPath = () => path.join(homedir(), '/.po.et/configuration.json')

src/StorageWriter/ClaimController.test.ts

Lines changed: 0 additions & 47 deletions
This file was deleted.
Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,85 @@
11
import { Claim } from '@po.et/poet-js'
22
import { inject, injectable } from 'inversify'
3-
import { Collection, Db } from 'mongodb'
43
import * as Pino from 'pino'
4+
import { pipeP } from 'ramda'
55

66
import { childWithFileName } from 'Helpers/Logging'
7-
import { Messaging } from 'Messaging/Messaging'
87

9-
import { ExchangeConfiguration } from './ExchangeConfiguration'
8+
import { DAOClaims } from './DAOClaims'
109
import { IPFS } from './IPFS'
1110

11+
enum LogTypes {
12+
info = 'info',
13+
trace = 'trace',
14+
error = 'error',
15+
}
16+
17+
interface StoreNextClaimData {
18+
readonly claim: Claim
19+
readonly ipfsFileHash?: string
20+
}
21+
1222
@injectable()
1323
export class ClaimController {
1424
private readonly logger: Pino.Logger
15-
private readonly db: Db
16-
private readonly collection: Collection
17-
private readonly messaging: Messaging
25+
private readonly daoClaims: DAOClaims
1826
private readonly ipfs: IPFS
19-
private readonly exchange: ExchangeConfiguration
2027

2128
constructor(
2229
@inject('Logger') logger: Pino.Logger,
23-
@inject('DB') db: Db,
24-
@inject('Messaging') messaging: Messaging,
25-
@inject('IPFS') ipfs: IPFS,
26-
@inject('ExchangeConfiguration') exchange: ExchangeConfiguration
30+
@inject('DAOClaims') daoClaims: DAOClaims,
31+
@inject('IPFS') ipfs: IPFS
2732
) {
2833
this.logger = childWithFileName(logger, __filename)
29-
this.db = db
30-
this.collection = this.db.collection('storageWriter')
31-
this.messaging = messaging
34+
this.daoClaims = daoClaims
3235
this.ipfs = ipfs
33-
this.exchange = exchange
3436
}
3537

36-
async create(claim: Claim): Promise<void> {
38+
private readonly log = (level: LogTypes) => (message: string) => async (value: any) => {
39+
const logger = this.logger
40+
logger[level]({ value }, message)
41+
return value
42+
}
43+
44+
public readonly create = async (claim: Claim): Promise<void> => {
3745
const logger = this.logger.child({ method: 'create' })
3846

39-
logger.trace({ claim }, 'Storing Claim')
47+
logger.trace({ claim }, 'Adding Claim')
48+
49+
await this.daoClaims.addClaim(claim)
4050

41-
const ipfsFileHash = await this.ipfs.addText(JSON.stringify(claim))
51+
logger.trace({ claim }, 'Added Claim')
52+
}
53+
54+
private readonly findNextClaim = async (): Promise<StoreNextClaimData> => {
55+
const claim = await this.daoClaims.findNextClaim()
56+
return { claim }
57+
}
4258

43-
logger.info({ claim, ipfsFileHash }, 'Claim Stored')
59+
private readonly uploadClaim = (claim: Claim) => this.ipfs.addText(JSON.stringify(claim))
4460

45-
await this.collection.insertOne({
46-
claimId: claim.id,
47-
ipfsFileHash,
48-
})
49-
await this.messaging.publish(this.exchange.claimIpfsHash, {
50-
claimId: claim.id,
61+
private readonly storeClaim = async (data: StoreNextClaimData): Promise<StoreNextClaimData> => {
62+
const { claim } = data
63+
const ipfsFileHash = await this.uploadClaim(claim)
64+
return {
65+
...data,
5166
ipfsFileHash,
52-
})
67+
}
5368
}
69+
70+
private readonly addIPFSHashToClaim = async (data: StoreNextClaimData): Promise<StoreNextClaimData> => {
71+
const { claim, ipfsFileHash } = data
72+
await this.daoClaims.addClaimHash(claim.id, ipfsFileHash)
73+
return data
74+
}
75+
76+
public storeNextClaim = pipeP(
77+
this.log(LogTypes.trace)('Finding Claim'),
78+
this.findNextClaim,
79+
this.log(LogTypes.trace)('Storing Claim'),
80+
this.storeClaim,
81+
this.log(LogTypes.trace)('Adding IPFS hash to Claim Entry'),
82+
this.addIPFSHashToClaim,
83+
this.log(LogTypes.trace)('Finished Storing Claim')
84+
)
5485
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { Claim } from '@po.et/poet-js'
2+
import { FindAndModifyWriteOpResultObject } from 'mongodb'
3+
import { describe, Try } from 'riteway'
4+
5+
import { getClaimFromFindAndUpdateResponse, throwIfClaimNotFound } from './DAOClaims'
6+
7+
describe('DOAClaims.getClaimFromFindAndUpdateResponse', async should => {
8+
const { assert } = should('return the correct value')
9+
10+
{
11+
const response: FindAndModifyWriteOpResultObject = {}
12+
assert({
13+
given: 'a response that does not contain a value',
14+
actual: getClaimFromFindAndUpdateResponse(response),
15+
expected: undefined,
16+
})
17+
}
18+
19+
{
20+
const response: FindAndModifyWriteOpResultObject = { value: {} }
21+
assert({
22+
given: 'a response that does not contain a claim',
23+
actual: getClaimFromFindAndUpdateResponse(response),
24+
expected: undefined,
25+
})
26+
}
27+
28+
{
29+
const claim = { id: 'bar' } as Claim
30+
const response: FindAndModifyWriteOpResultObject = {
31+
value: {
32+
claim,
33+
},
34+
}
35+
assert({
36+
given: 'a response that contians a claim',
37+
actual: getClaimFromFindAndUpdateResponse(response),
38+
expected: claim,
39+
})
40+
}
41+
})
42+
43+
describe('DOAClaims.throwIfClaimNotFound', async should => {
44+
const { assert } = should()
45+
46+
assert({
47+
given: 'null',
48+
should: 'throw',
49+
actual: Try(throwIfClaimNotFound, null),
50+
expected: new Error(),
51+
})
52+
53+
assert({
54+
given: 'undefined',
55+
should: 'throw',
56+
actual: Try(throwIfClaimNotFound, undefined),
57+
expected: new Error(),
58+
})
59+
60+
{
61+
const claim: Claim = { id: 'bar' } as Claim
62+
assert({
63+
given: 'a claim',
64+
should: 'return the claim',
65+
actual: throwIfClaimNotFound(claim),
66+
expected: claim,
67+
})
68+
}
69+
})

src/StorageWriter/DAOClaims.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { Claim } from '@po.et/poet-js'
2+
import { inject, injectable } from 'inversify'
3+
import { Collection, Db, FindAndModifyWriteOpResultObject } from 'mongodb'
4+
import { isNil, pipeP, lensPath, view } from 'ramda'
5+
6+
import { NoMoreEntriesException } from './Exceptions'
7+
8+
export interface DAOClaimsConfiguration {
9+
readonly maxStorageAttempts: number
10+
}
11+
12+
const L = {
13+
valueClaim: lensPath(['value', 'claim']),
14+
}
15+
16+
export const getClaimFromFindAndUpdateResponse = (response: FindAndModifyWriteOpResultObject): Claim | undefined =>
17+
view(L.valueClaim, response)
18+
19+
export const throwIfClaimNotFound = (claim: Claim): Claim => {
20+
if (isNil(claim)) throw new NoMoreEntriesException('No claims found')
21+
return claim
22+
}
23+
24+
@injectable()
25+
export class DAOClaims {
26+
private readonly collection: Collection
27+
private readonly maxStorageAttempts: number
28+
29+
constructor(@inject('DB') db: Db, @inject('DAOClaimsConfiguration') configuration: DAOClaimsConfiguration) {
30+
this.collection = db.collection('storageWriterClaims')
31+
this.maxStorageAttempts = configuration.maxStorageAttempts
32+
}
33+
34+
public readonly start = async () => {
35+
await this.collection.createIndex({ 'claim.id': 1 }, { unique: true })
36+
}
37+
38+
public readonly addClaim = async (claim: Claim) => {
39+
await this.collection.insertOne({ claim, storageAttempts: 0, ipfsFileHash: null })
40+
}
41+
42+
public readonly addClaimHash = async (claimId: string, ipfsFileHash: string) => {
43+
await this.collection.updateOne({ 'claim.id': claimId }, { $set: { ipfsFileHash } })
44+
}
45+
46+
private readonly findClaimToStore = () =>
47+
this.collection.findOneAndUpdate(
48+
{
49+
$and: [{ ipfsFileHash: null }, { storageAttempts: { $lt: this.maxStorageAttempts } }],
50+
},
51+
{
52+
$inc: { storageAttempts: 1 },
53+
$set: { lastStorageAttemptTime: new Date().getTime() },
54+
}
55+
)
56+
57+
public readonly findNextClaim = pipeP(
58+
this.findClaimToStore,
59+
getClaimFromFindAndUpdateResponse,
60+
throwIfClaimNotFound
61+
)
62+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { describe } from 'riteway'
2+
3+
import { NoMoreEntriesException, isNoMoreEntriesException } from './Exceptions'
4+
5+
describe('Exceptions.isNoMoreEntriesException', async should => {
6+
const { assert } = should('return the correct boolean')
7+
assert({
8+
given: 'a normal error',
9+
actual: isNoMoreEntriesException(new Error()),
10+
expected: false,
11+
})
12+
assert({
13+
given: 'a NoMoreEntriesException error',
14+
actual: isNoMoreEntriesException(new NoMoreEntriesException()),
15+
expected: true,
16+
})
17+
})

src/StorageWriter/Exceptions.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export class NoMoreEntriesException extends Error {
2+
constructor(message?: string) {
3+
super(message)
4+
}
5+
}
6+
7+
export const isNoMoreEntriesException = (err: any) => err instanceof NoMoreEntriesException

src/StorageWriter/ExchangeConfiguration.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export interface ExchangeConfiguration {
33
readonly newClaim?: string
44
readonly poetAnchorDownloaded?: string
55
readonly claimsDownloaded?: string
6+
readonly storageWriterStoreNextClaim: string
67
}

src/StorageWriter/IPFS.test.ts

Lines changed: 0 additions & 23 deletions
This file was deleted.

src/StorageWriter/IPFS.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ import { inject, injectable } from 'inversify'
33
import fetch from 'node-fetch'
44
import * as str from 'string-to-stream'
55

6-
import { IPFSConfiguration } from './IPFSConfiguration'
6+
export interface IPFSConfiguration {
7+
readonly ipfsUrl: string
8+
}
79

810
/**
911
* Wrapper around IPFS' RPC

0 commit comments

Comments
 (0)