Skip to content

Commit

Permalink
feat: new populatedFields option
Browse files Browse the repository at this point in the history
If `true`, fetches a set of random documents to identify which fields are actually used by
documents. Can be useful for indices with lots of field mappings to increase query/reindex
performance. Defaults to `false`.
  • Loading branch information
walterra committed Nov 7, 2023
1 parent 9962382 commit abc9a06
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -118,6 +118,7 @@ transformer({
- `mappings`: Optional Elasticsearch document mappings. If not set and you're reindexing from another index, the mappings from the existing index will be used.
- `mappingsOverride`: If you're reindexing and this is set to `true`, `mappings` will be applied on top of the source index's mappings. Defaults to `false`.
- `indexMappingTotalFieldsLimit`: Optional field limit for the target index to be created that will be passed on as the `index.mapping.total_fields.limit` setting.
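- `populatedFields`: If `true`, fetches a random sample of documents (up to `bufferSize`) from the source index to identify which fields are actually populated; only those fields are then requested when reading the source index. Useful for indices with many field mappings of which only some are used, as it can improve query/reindex performance. Defaults to `false`.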
- `query`: Optional Elasticsearch [DSL query](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html) to filter documents from the source index.
- `skipHeader`: If true, skips the first line of the source file. Defaults to `false`.
- `transform(line)`: A callback function which allows the transformation of a source line into one or several documents.
Expand Down
39 changes: 35 additions & 4 deletions src/_index-reader.js
Expand Up @@ -14,37 +14,68 @@ export default function indexReaderFactory(
client,
query,
bufferSize = DEFAULT_BUFFER_SIZE,
populatedFields = false,
) {
return async function indexReader() {
const responseQueue = [];
let docsNum = 0;

function search() {
/**
 * Samples documents from the source index and collects the top-level
 * `_source` field names that occur in them.
 *
 * The sample is requested with a `function_score` / `random_score` wrapper
 * around the configured `query` and contains at most `bufferSize` documents.
 *
 * This is a best-effort optimisation: if the sampling request fails, the
 * error is logged and `undefined` is returned.
 *
 * @returns {Promise<string[]|undefined>} Unique top-level field names found
 *   in the sample, or `undefined` when the request failed or no field names
 *   were found.
 */
async function fetchPopulatedFields() {
  try {
    const response = await client.search({
      index: sourceIndexName,
      size: bufferSize,
      query: {
        function_score: {
          query,
          random_score: {},
        },
      },
    });

    // Collect the top-level keys of every sampled document. A hit without
    // `_source` contributes nothing instead of throwing and discarding the
    // whole sample.
    const fieldNames = response.hits.hits.flatMap(hit => Object.keys(hit._source || {}));

    // De-duplicate, then convert back to an array: a Set is serialized as
    // `{}` by JSON.stringify, so it cannot be used as a `_source` filter.
    const uniqueFieldNames = Array.from(new Set(fieldNames));

    // An empty list is reported as `undefined` so that no empty `_source`
    // filter is built from it.
    return uniqueFieldNames.length > 0 ? uniqueFieldNames : undefined;
  } catch (e) {
    console.log('error', e);
    return undefined;
  }
}

/**
 * Issues the initial scrolling search against the source index.
 *
 * @param {Iterable<string>} [fields] Optional field names used as the
 *   `_source` filter of the request. When omitted (or falsy) no `_source`
 *   filter is sent.
 * @returns {*} Whatever `client.search` returns for the request.
 */
function search(fields) {
  return client.search({
    index: sourceIndexName,
    scroll: '600s',
    size: bufferSize,
    query,
    // Normalize to a plain array: a Set would be serialized as `{}` by
    // JSON.stringify and the filter would be lost.
    ...(fields ? { _source: Array.from(fields) } : {}),
  });
}

/**
 * Requests the next batch of results for an open scroll.
 *
 * @param {string} id The scroll id passed on as `scroll_id`.
 * @returns {*} Whatever `client.scroll` returns for the request.
 */
function scroll(id) {
  return client.scroll({
    scroll_id: id,
    // Same timeout as the one used by the initial search request.
    scroll: '600s',
  });
}

let fieldsWithData;

// identify populated fields
if (populatedFields) {
fieldsWithData = await fetchPopulatedFields();
}

// start things off by searching, setting a scroll timeout, and pushing
// our first response into the queue to be processed
const se = await search();
const se = await search(fieldsWithData);
responseQueue.push(se);
progressBar.start(se.hits.total.value, 0);

function processHit(hit) {
docsNum += 1;
try {
const doc = typeof transform === 'function' ? transform(hit._source) : hit._source; // eslint-disable-line no-underscore-dangle

// if doc is undefined we'll skip indexing it
if (typeof doc === 'undefined') {
return;
Expand Down
2 changes: 2 additions & 0 deletions src/main.js
Expand Up @@ -18,6 +18,7 @@ export default async function transformer({
mappings,
mappingsOverride = false,
indexMappingTotalFieldsLimit,
populatedFields = false,
query,
skipHeader = false,
transform,
Expand Down Expand Up @@ -75,6 +76,7 @@ export default async function transformer({
sourceClient,
query,
bufferSize,
populatedFields,
);
}

Expand Down

0 comments on commit abc9a06

Please sign in to comment.