Skip to content

Commit

Permalink
feat(#84): separate request to get pending, and null if unknown
Browse files Browse the repository at this point in the history
  • Loading branch information
witash committed Jul 31, 2024
1 parent e676313 commit 31c01a4
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 17 deletions.
91 changes: 91 additions & 0 deletions couch2pg/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions couch2pg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"author": "",
"license": "ISC",
"dependencies": {
"axios": "^1.7.2",
"dotenv": "^16.4.5",
"pg": "^8.11.5",
"pouchdb-adapter-http": "^8.0.1",
Expand Down
14 changes: 7 additions & 7 deletions couch2pg/src/importer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const BATCH_SIZE = process.env.BATCH_SIZE || 1000;

import * as db from './db.js';
import axios from 'axios';

const SELECT_SEQ_STMT = `SELECT seq FROM ${db.postgresProgressTable} WHERE source = $1`;
const INSERT_SEQ_STMT = `
Expand Down Expand Up @@ -41,10 +42,6 @@ const removeSecurityDetails = (doc) => {
}
};

const getNumberFromSeq = (seq) => {
return Number(seq.split('-')[0]);
};

const getSeq = async (source) => {
const client = await db.getPgClient();
const result = await client.query(SELECT_SEQ_STMT, [source]);
Expand Down Expand Up @@ -142,7 +139,7 @@ const importChangesBatch = async (couchDb, source) => {
pending = await getPending(couchDb, seq);
} catch (error) {
console.error('Error getting pending:', error);
pending = 0;
pending = null;
}

const changes = await couchDb.changes({ limit: BATCH_SIZE, since: seq, seq_interval: BATCH_SIZE });
Expand All @@ -164,8 +161,11 @@ const importChangesBatch = async (couchDb, source) => {
};

const getPending = async (couchDb, seq) => {
const info = await couchDb.info();
return getNumberFromSeq(info.update_seq) - getNumberFromSeq(seq);
const res = await axios.get(`${couchDb.name}/_changes?limit=0&since=${seq}`);
if (res.status === 200 && res.data?.pending) {
return res.data.pending;
}
return null;
};

export default async (couchdb) => {
Expand Down
20 changes: 10 additions & 10 deletions couch2pg/tests/unit/importer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('importer', () => {

expect(pgClient.query.calledTwice).to.equal(true);
expect(pgClient.query.args[0]).to.deep.equal([getSeqMatch(), ['thehost/medic']]);
expect(pgClient.query.args[1]).to.deep.equal([updateSeqMatch(), ['21-vvv', 0, 'thehost/medic']]);
expect(pgClient.query.args[1]).to.deep.equal([updateSeqMatch(), ['21-vvv', null, 'thehost/medic']]);
});

it('should start with 0 seq if no checkpointer is found', async () => {
Expand All @@ -94,7 +94,7 @@ describe('importer', () => {
expect(pgClient.query.calledThrice).to.equal(true);
expect(pgClient.query.args[0]).to.deep.equal([getSeqMatch(), ['host/db']]);
expect(pgClient.query.args[1]).to.deep.equal([insertSeqMatch(), [0, null, 'host/db']]);
expect(pgClient.query.args[2]).to.deep.equal([updateSeqMatch(), ['73-1', 0, 'host/db']]);
expect(pgClient.query.args[2]).to.deep.equal([updateSeqMatch(), ['73-1', null, 'host/db']]);
});

it('should start with checkpointer seq when found', async () => {
Expand Down Expand Up @@ -135,8 +135,8 @@ describe('importer', () => {

expect(seqQueries.update.calledTwice).to.equal(true);
expect(seqQueries.update.args).to.deep.equal([
[updateSeqMatch(), ['23-ppp', 0, 'thehost/medic']],
[updateSeqMatch(), ['25-vvv', 0, 'thehost/medic']],
[updateSeqMatch(), ['23-ppp', null, 'thehost/medic']],
[updateSeqMatch(), ['25-vvv', null, 'thehost/medic']],
]);

expect(couchDb.allDocs.calledOnce).to.equal(true);
Expand Down Expand Up @@ -206,10 +206,10 @@ describe('importer', () => {

expect(seqQueries.update.callCount).to.equal(4);
expect(seqQueries.update.args).to.deep.equal([
[updateSeqMatch(), ['3-seq', 0, 'thehost/medic']],
[updateSeqMatch(), ['6-seq', 0, 'thehost/medic']],
[updateSeqMatch(), ['9-seq', 0, 'thehost/medic']],
[updateSeqMatch(), ['9-seq', 0, 'thehost/medic']],
[updateSeqMatch(), ['3-seq', null, 'thehost/medic']],
[updateSeqMatch(), ['6-seq', null, 'thehost/medic']],
[updateSeqMatch(), ['9-seq', null, 'thehost/medic']],
[updateSeqMatch(), ['9-seq', null, 'thehost/medic']],
]);

expect(couchDb.allDocs.callCount).to.equal(3);
Expand Down Expand Up @@ -500,8 +500,8 @@ describe('importer', () => {

expect(seqQueries.update.calledTwice).to.equal(true);
expect(seqQueries.update.args).to.deep.equal([
[updateSeqMatch(), ['23-ppp', 0, 'thehost/medic']],
[updateSeqMatch(), ['25-vvv', 0, 'thehost/medic']],
[updateSeqMatch(), ['23-ppp', null, 'thehost/medic']],
[updateSeqMatch(), ['25-vvv', null, 'thehost/medic']],
]);

expect(couchDb.allDocs.calledOnce).to.equal(true);
Expand Down

0 comments on commit 31c01a4

Please sign in to comment.