Skip to content

Commit

Permalink
Merge pull request #84 from oasysgames/feat/retry_when_error
Browse files Browse the repository at this point in the history
feat: retry when error. start at the last data fetched
  • Loading branch information
girafferz committed Mar 6, 2024
2 parents e6973f5 + 330fad5 commit 0c6da4a
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 164 deletions.
2 changes: 1 addition & 1 deletion src/cmd/cmdStakerReward.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Arguments, Argv } from 'yargs';
import { main } from '../execute/executeStakerReward';
import main from '../execute/executeStakerReward';
import { stakerRewardArgs } from '../types';
import { LogUtils } from '../utils/Logger';

Expand Down
2 changes: 1 addition & 1 deletion src/cmd/cmdValidatorReward.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Arguments, Argv } from 'yargs';
import { main } from '../execute/executeValidatorReward';
import main from '../execute/executeValidatorReward';
import { validatorRewardArgs } from '../types';
import { LogUtils } from '../utils/Logger';

Expand Down
2 changes: 2 additions & 0 deletions src/contants/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const MAX_RETRIES = 5
export const RETRY_INTERVAL_MS = 5000
208 changes: 136 additions & 72 deletions src/execute/executeStakerReward.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import moment = require('moment-timezone');
import { GoogleSpreadsheet } from 'google-spreadsheet';
import { MAX_RETRIES, RETRY_INTERVAL_MS } from '../contants/common';
import {
exportCsv,
getAdditionalDataForStakerReward,
getEpoches,
getLastDataFetchedByEpoch,
getOasPricesForEpoch,
} from '../module/RewardStakes';
import {
DataExport,
PrepareData,
TimeData,
Verse,
stakerRewardArgs,
} from '../types';
import { PrepareData, TimeData, Verse, stakerRewardArgs } from '../types';
import { generateNumberArray, isValidAddresses, sleep } from '../utils';
import { convertAddressesToArray } from '../utils/convert';
import { getTotalSecondProcess } from '../utils/date';
Expand All @@ -23,48 +19,13 @@ import {
} from '../utils/google';
import { Subgraph } from '../utils/subgraph';

export const main = async (argv: stakerRewardArgs) => {
const startTimeProcess = Date.now();

// validate address
const addresses = convertAddressesToArray(argv.staker_addresses);
if (!isValidAddresses(addresses)) {
return;
}
const subgraph = new Subgraph(argv.chain as Verse);
// header for staker reward
const header: string[] = getHeader(argv);

// get the list of epoches based on the passed options
const epoches = await getEpoches(argv, subgraph);

let doc: GoogleSpreadsheet;
if (argv.export_csv_online) {
doc = await getSpreadSheet();
await doc.loadInfo();
}

const loopAsync: number[] = generateNumberArray(epoches.from, epoches.to);

const prepareData: PrepareData[] = await getPrepareData(
loopAsync,
subgraph,
argv,
);
// data to export
await handleExport(prepareData, subgraph, argv, header);

const totalSecondsProcess = getTotalSecondProcess(startTimeProcess);
console.log(`==> Total: ${totalSecondsProcess} seconds`);
};

const getHeader = (argv: stakerRewardArgs): string[] => {
let header: string[] = HEADER_FOR_STAKING_REWARD;
// if API_KEY exists and price option exists => export that price otherwise export default price
if (process.env.COINGECKO_API_KEY) {
header = argv.price
? [...header, 'Price timestamp UTC', 'Oas price']
: [...header, 'Price timestamp UTC', ...DEFAULT_LIST_PRICE];
? [...header, 'Oas price', 'Price timestamp UTC']
: [...header, ...DEFAULT_LIST_PRICE, 'Price timestamp UTC'];
}
return header;
};
Expand Down Expand Up @@ -113,57 +74,160 @@ const handleExport = async (
subgraph: Subgraph,
argv: stakerRewardArgs,
header: string[],
): Promise<DataExport[]> => {
) => {
// set the address to lowercase
const addresses = convertAddressesToArray(argv.staker_addresses);
let numberOfRetries = 0;

const results: DataExport[] = [];
let doc: GoogleSpreadsheet;
if (Boolean(argv.export_csv_online)) {
doc = await getSpreadSheet();
await doc.loadInfo();
}
for (let i = 0; i < prepareData.length; i++) {
try {
await processExportByEpoch(
prepareData[i],
addresses,
subgraph,
argv,
header,
doc,
);
} catch (error) {
console.log(error);
numberOfRetries += 1;
await sleep(RETRY_INTERVAL_MS);

console.log('\n----------Trying again----------');
console.log('\n----------Please wait!----------');

if (numberOfRetries > MAX_RETRIES) {
throw error;
}
i = -1; // Reset the loop counter to 0 after error
}
}
};

for (const item of prepareData) {
const processExportByEpoch = async (
item: PrepareData,
addresses: string[],
subgraph: Subgraph,
argv: stakerRewardArgs,
header: string[],
doc: GoogleSpreadsheet,
) => {
try {
const { oasPrices, timeData, priceTime } = item;
const { block, epoch, timestamp } = timeData;
const startTimeProcess = Date.now();
console.log('PROCESSING WITH EPOCH', epoch);

const promises = addresses?.map(async (address: string) => {
const listStakerStake = await subgraph.getListStakerStake(
block,
address,
epoch,
);
const result = await getLastDataFetchedByEpoch(
doc,
header,
argv,
timestamp,
'Staker Address',
'staker-reward',
argv.output
);

// format data
const { rowData } = getAdditionalDataForStakerReward(
oasPrices,
listStakerStake,
timeData,
argv.price,
address,
priceTime,
);
if (Number(epoch) < Number(result.epoch)) {
return;
}

await sleep(100);
const startTimeProcess = Date.now();
console.log('PROCESSING WITH EPOCH', epoch);

return {
rowData,
timestamp,
};
const promises = [];

addresses.forEach(async (address: string) => {
const validatorAddress = address;

if (
!(result.epoch == epoch && result.addresses.includes(validatorAddress))
) {
const promise = (async () => {
const listStakerStake = await subgraph.getListStakerStake(
block,
address,
epoch,
);

// format data
const { rowData } = getAdditionalDataForStakerReward(
oasPrices,
listStakerStake,
timeData,
argv.price,
address,
priceTime,
);

await sleep(100);

return {
rowData,
timestamp,
};
})();

promises.push(promise);
}
});

if (promises?.length == 0) {
return;
}

const dataExport = await Promise.all(promises);

// process export
await exportCsv(
dataExport,
Boolean(argv.export_csv_online),
argv.output,
`staker-reward`,
'staker-reward',
header,
doc,
);
results.push(...dataExport);

const totalSecondsEpoch = getTotalSecondProcess(startTimeProcess);
console.info(
`-->Export at Epoch ${epoch} took ${totalSecondsEpoch} seconds`,
);
} catch (error) {
throw error;
}
return results;
};

const main = async (argv: stakerRewardArgs) => {
const startTimeProcess = Date.now();

// validate address
const addresses = convertAddressesToArray(argv.staker_addresses);
if (!isValidAddresses(addresses)) {
return;
}
const subgraph = new Subgraph(argv.chain as Verse);
// header for staker reward
const header: string[] = getHeader(argv);

// get the list of epoches based on the passed options
const epoches = await getEpoches(argv, subgraph);

const loopAsync: number[] = generateNumberArray(epoches.from, epoches.to);

const prepareData: PrepareData[] = await getPrepareData(
loopAsync,
subgraph,
argv,
);
// data to export
await handleExport(prepareData, subgraph, argv, header);

const totalSecondsProcess = getTotalSecondProcess(startTimeProcess);
console.log(`==> Total: ${totalSecondsProcess} seconds`);
};

export default main;

0 comments on commit 0c6da4a

Please sign in to comment.