Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
08bc950
chore: use default flag messages
mshanemc Feb 27, 2024
9c0490d
chore: don't support dynamic bin/separator
mshanemc Feb 27, 2024
3af0705
refactor: correct async stream listeners, more functions
mshanemc Feb 27, 2024
cd94e94
refactor: oclif option flag
mshanemc Feb 27, 2024
27507a4
refactor: minor flags, batcher, v2 type changes
mshanemc Feb 27, 2024
2f8b73f
refactor: v2 bulk types, prune base command
mshanemc Feb 27, 2024
97e323a
fix: don't send 0 polling timeout
mshanemc Feb 27, 2024
51958e4
test: all nuts passing
mshanemc Feb 27, 2024
7049dcb
refactor: pruning base command and messages
mshanemc Feb 27, 2024
8c973fe
refactor: remove job from base
mshanemc Feb 27, 2024
8df23c0
refactor: functions and nut fix
mshanemc Feb 27, 2024
3bf3fda
chore: partial refactor of reporters
mshanemc Feb 28, 2024
a0787df
refactor: split humanReporter
mshanemc Feb 28, 2024
42f7c06
refactor: extract csvReporter
mshanemc Feb 28, 2024
0f4d057
refactor: humanReporter
mshanemc Feb 28, 2024
a45c00e
refactor: csvReporter with more UT testability
mshanemc Feb 28, 2024
651725d
style: nicer results banner
mshanemc Feb 29, 2024
2ad8f94
test: handy file for manual QA use
mshanemc Feb 29, 2024
fcd9a3e
feat: use jsforce polling inProgress event
mshanemc Feb 29, 2024
0e4183a
feat: allow tooling with bulk
mshanemc Feb 29, 2024
e457f51
refactor: remove more dynamic "sfdx" and ":" in messages
mshanemc Feb 29, 2024
675a0af
test: remove only
mshanemc Feb 29, 2024
02e4e38
test: nut change for output change
mshanemc Feb 29, 2024
3fd5223
style: formatting
mshanemc Feb 29, 2024
2d94c5f
chore: back to regular jsforce
mshanemc Feb 29, 2024
8c54d70
fix: do json last
mshanemc Feb 29, 2024
d59076a
chore: use jsforce-node
mshanemc Mar 1, 2024
3b7780b
feat: return after job open for async query
mshanemc Mar 1, 2024
bc8dd6e
chore: lockfile
mshanemc Mar 1, 2024
7bf7903
refactor: generic tuple/object
mshanemc Mar 1, 2024
20871a6
test: testing subquery logic
mshanemc Mar 1, 2024
f3a2ff4
refactor: slit reporter tests to match code
mshanemc Mar 1, 2024
20ece2e
chore: restore tsconfigs
mshanemc Mar 1, 2024
9e47fcf
chore: dep cleanup
mshanemc Mar 1, 2024
1a7dff0
Merge remote-tracking branch 'origin/main' into sm/qa-jsforce-3-bulk
mshanemc Mar 20, 2024
f51037a
chore: remove log util
mshanemc Mar 20, 2024
c288317
chore: respects wait time with bulk query
mshanemc Mar 21, 2024
d93b682
refactor: construct a bulk2 instead of using the one from Connection
mshanemc Mar 21, 2024
6382014
refactor: restore exclusivity
mshanemc Mar 21, 2024
554a5ad
refactor: spinner output
mshanemc Mar 21, 2024
98df3de
chore: parenetheses matter for ??
mshanemc Mar 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions messages/batcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Batch Status

Will poll the batch statuses every %s seconds.
To fetch the status on your own, press CTRL+C and use the command:
%s force%sdata%sbulk%sstatus -i %s -b [<batchId>]
sf force data bulk status -i %s -b [<batchId>]

# ExternalIdRequired

Expand All @@ -27,12 +27,12 @@ An External ID is required on %s to perform an upsert.
# TimeOut

The operation timed out. Check the status with command:
%s force%sdata%sbulk%sstatus -i %s -b %s
sf force data bulk status -i %s -b %s

# CheckStatusCommand

Check batch #%s’s status with the command:
%s force%sdata%sbulk%sstatus -i %s -b %s
sf force data bulk status -i %s -b %s

# BatchQueued

Expand Down
8 changes: 4 additions & 4 deletions messages/bulk.base.command.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ Bulk %s request %s started successfully

# checkStatus

Run command %s data %s resume -i %s -o %s to check status.
Run command sf data %s resume -i %s -o %s to check status.

# checkJobViaUi

To review the details of this job, run:
%s org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s"
sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s"

# remainingTimeStatus

Remaining time: %d minutes.
Remaining time: %d minutes

# remainingRecordsStatus

%d/%d/%d records successful/failed/processed.
Processed %d | Success %d | Fail %d

# bulkJobFailed

Expand Down
4 changes: 0 additions & 4 deletions messages/bulk.resume.command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ ID of the job you want to resume.

Use the ID of the most recently-run bulk job.

# flags.targetOrg.summary

Org alias or username to use for the target org.

# flags.wait.summary

Number of minutes to wait for the command to complete before displaying the results.
4 changes: 2 additions & 2 deletions messages/importApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ Data plan file %s did not validate against the schema. Errors: %s.

# dataPlanValidationErrorActions

- Did you run the "%s data%sexport%stree" command with the --plan flag?
- Did you run the "sf data export tree" command with the --plan flag?

- Make sure you're importing a plan definition file.

- Get help with the import plan schema by running "%s data%simport%stree --config-help".
- Get help with the import plan schema by running "sf data import tree --config-help".

# FlsError

Expand Down
2 changes: 1 addition & 1 deletion messages/reporter.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# bulkV2Result

Job %s Status %s Records processed %d. Records failed %d.
Job %s | Status %s | Records processed %d | Records failed %d
4 changes: 0 additions & 4 deletions messages/soql.query.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ Include deleted records. By default, deleted records are not returned.

Time to wait for the command to finish, in minutes.

# flags.targetOrg.summary

Org alias or username to use for the target org.

# displayQueryRecordsRetrieved

Total number of records retrieved: %s.
Expand Down
6 changes: 2 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"version": "oclif readme"
},
"dependencies": {
"@jsforce/jsforce-node": "^3.0.0-next.2",
"@oclif/core": "^3.23.0",
"@salesforce/core": "^6.7.1",
"@salesforce/kit": "^3.0.15",
Expand All @@ -116,16 +117,13 @@
"chalk": "^5.3.0",
"change-case": "^5.4.3",
"csv-parse": "^4.16.3",
"csv-stringify": "^6.4.6",
"jsforce": "^2.0.0-beta.29"
"csv-stringify": "^6.4.6"
},
"devDependencies": {
"@oclif/plugin-command-snapshot": "^5.1.1",
"@salesforce/cli-plugins-testkit": "^5.1.11",
"@salesforce/dev-scripts": "^8.4.2",
"@salesforce/plugin-command-reference": "^3.0.70",
"@types/chai-as-promised": "^7.1.8",
"chai-as-promised": "^7.1.1",
"eslint-plugin-sf-plugin": "^1.17.4",
"oclif": "^4.5.5",
"ts-node": "^10.9.2",
Expand Down
214 changes: 105 additions & 109 deletions src/BulkBaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,128 +5,124 @@
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/


import { SfCommand } from '@salesforce/sf-plugins-core';
import { BulkOperation, IngestJobV2, IngestOperation, JobInfoV2, JobStateV2 } from 'jsforce/lib/api/bulk.js';
import { SfCommand, Spinner } from '@salesforce/sf-plugins-core';
import { IngestJobV2, JobInfoV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js';
import { Duration } from '@salesforce/kit';
import { capitalCase } from 'change-case';
import { Connection, Lifecycle, Messages } from '@salesforce/core';
import { Schema } from 'jsforce';
import { getResultMessage } from './reporters.js';
import { BulkResultV2 } from './types.js';
import { Messages } from '@salesforce/core';
import { Schema } from '@jsforce/jsforce-node';
import { getResultMessage } from './reporters/reporters.js';
import { BulkDataRequestCache } from './bulkDataRequestCache.js';

Messages.importMessagesDirectoryFromMetaUrl(import.meta.url)
Messages.importMessagesDirectoryFromMetaUrl(import.meta.url);
const messages = Messages.loadMessages('@salesforce/plugin-data', 'bulk.base.command');

export abstract class BulkBaseCommand extends SfCommand<BulkResultV2> {
public static readonly enableJsonFlag = true;
protected lifeCycle = Lifecycle.getInstance();
protected job!: IngestJobV2<Schema, IngestOperation>;
protected connection: Connection | undefined;
protected cache: BulkDataRequestCache | undefined;
protected isAsync = false;
protected operation!: BulkOperation;
protected endWaitTime = 0;
protected wait = 0;
private numberRecordsProcessed = 0;
private numberRecordsFailed = 0;
private numberRecordSucceeded = 0;
private timeout = false;
export const setupLifecycleListeners = ({
job,
cache,
username,
apiVersion,
cmd,
isAsync,
endWaitTime,
}: {
job: IngestJobV2<Schema>;
cache?: BulkDataRequestCache;
username?: string;
apiVersion?: string;
cmd: SfCommand<unknown>;
isAsync: boolean;
endWaitTime: number;
}): void => {
// the event emitted by jsforce's polling function
job.on('inProgress', (jobInfo: JobInfoV2) => {
cmd.spinner.status = formatSpinnerProgress(isAsync, endWaitTime, jobInfo);
});
// the event emitted other places in the plugin
job.on('jobProgress', () => {
const handler = async (): Promise<void> => {
const jobInfo = await job.check();
cmd.spinner.status = formatSpinnerProgress(isAsync, endWaitTime, jobInfo);
};
handler().catch((err) => eventListenerErrorHandler(err));
});

job.on('failed', throwAndStopSpinner(cmd.spinner));
job.on('error', throwAndStopSpinner(cmd.spinner));

job.once('jobTimeout', () => {
const handler = async (): Promise<void> => {
await cache?.createCacheEntryForRequest(job.id ?? '', username, apiVersion);
displayBulkV2Result({ jobInfo: await job.check(), username, isAsync, cmd });
};
handler().catch((err) => eventListenerErrorHandler(err));
});
};

protected displayBulkV2Result(jobInfo: JobInfoV2): void {
if (this.isAsync) {
this.logSuccess(messages.getMessage('success', [this.operation, jobInfo.id]));
this.info(
messages.getMessage('checkStatus', [
this.config.bin,
this.operation,
jobInfo.id,
this.connection?.getUsername(),
])
);
} else {
this.log();
this.info(getResultMessage(jobInfo));
if ((jobInfo.numberRecordsFailed ?? 0) > 0 || jobInfo.state === 'Failed') {
this.info(messages.getMessage('checkJobViaUi', [this.config.bin, this.connection?.getUsername(), jobInfo.id]));
process.exitCode = 1;
}
if (jobInfo.state === 'InProgress' || jobInfo.state === 'Open') {
this.info(
messages.getMessage('checkStatus', [
this.config.bin,
this.operation,
jobInfo.id,
this.connection?.getUsername(),
])
);
}
if (jobInfo.state === 'Failed') {
throw messages.createError('bulkJobFailed', [jobInfo.id]);
}
export const displayBulkV2Result = ({
jobInfo,
isAsync,
cmd,
username = 'unspecified user',
}: {
jobInfo: JobInfoV2;
isAsync: boolean;
cmd: SfCommand<unknown>;
username?: string;
}): void => {
if (isAsync && jobInfo.state !== 'JobComplete' && jobInfo.state !== 'Failed') {
cmd.logSuccess(messages.getMessage('success', [jobInfo.operation, jobInfo.id]));
cmd.info(messages.getMessage('checkStatus', [jobInfo.operation, jobInfo.id, username]));
} else {
cmd.log();
cmd.info(getResultMessage(jobInfo));
if ((jobInfo.numberRecordsFailed ?? 0) > 0 || jobInfo.state === 'Failed') {
cmd.info(messages.getMessage('checkJobViaUi', [username, jobInfo.id]));
process.exitCode = 1;
}
if (jobInfo.state === 'InProgress' || jobInfo.state === 'Open') {
cmd.info(messages.getMessage('checkStatus', [jobInfo.operation, jobInfo.id, username]));
}
if (jobInfo.state === 'Failed') {
throw messages.createError('bulkJobFailed', [jobInfo.id]).setData(jobInfo);
}
}
};

protected setupLifecycleListeners(): void {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.job.on('jobProgress', async () => {
const jobInfo = await this.job.check();
this.numberRecordsProcessed = jobInfo.numberRecordsProcessed ?? 0;
this.numberRecordsFailed = jobInfo.numberRecordsFailed ?? 0;
this.numberRecordSucceeded = this.numberRecordsProcessed - this.numberRecordsFailed;
this.spinner.status = `${this.getRemainingTimeStatus()}${this.getStage(
jobInfo.state
)}${this.getRemainingRecordsStatus()}`;
});
const eventListenerErrorHandler = (err: unknown): never => {
throw err instanceof Error || typeof err === 'string' ? err : JSON.stringify(err);
};

this.job.on('failed', (err: Error) => {
try {
this.error(err);
} finally {
this.spinner.stop();
}
});
const throwAndStopSpinner =
(spinner: Spinner) =>
(err: Error): void => {
try {
throw err;
} finally {
spinner.stop();
}
};

this.job.on('error', (message: string) => {
try {
this.error(message);
} finally {
this.spinner.stop();
}
});
export const getRemainingTimeStatus = ({ isAsync, endWaitTime }: { isAsync: boolean; endWaitTime: number }): string =>
isAsync ? '' : messages.getMessage('remainingTimeStatus', [Duration.milliseconds(endWaitTime - Date.now()).minutes]);

// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.job.on('jobTimeout', async () => {
if (!this.timeout) {
this.timeout = true;
await this.cache?.createCacheEntryForRequest(
this.job.id ?? '',
this.connection?.getUsername(),
this.connection?.getApiVersion()
);
this.displayBulkV2Result(await this.job.check());
}
});
}
const formatSpinnerProgress = (isAsync: boolean, endWaitTime: number, jobInfo: JobInfoV2): string =>
`${getRemainingTimeStatus({
isAsync,
endWaitTime,
})} | ${getStage(jobInfo.state)} | ${getRemainingRecordsStatus(jobInfo)}`;

protected getRemainingTimeStatus(): string {
return this.isAsync
? ''
: messages.getMessage('remainingTimeStatus', [Duration.milliseconds(this.endWaitTime - Date.now()).minutes]);
}
const getStage = (state: JobInfoV2['state']): string => ` Stage: ${capitalCase(state)}`;

protected getRemainingRecordsStatus(): string {
// the leading space is intentional
return ` ${messages.getMessage('remainingRecordsStatus', [
this.numberRecordSucceeded,
this.numberRecordsFailed,
this.numberRecordsProcessed,
])}`;
}
const getRemainingRecordsStatus = (jobInfo: JobInfoV2): string => {
const numberRecordsProcessed = jobInfo.numberRecordsProcessed ?? 0;
const numberRecordsFailed = jobInfo.numberRecordsFailed ?? 0;
const numberRecordSucceeded = numberRecordsProcessed - numberRecordsFailed;

// eslint-disable-next-line class-methods-use-this
protected getStage(state: JobStateV2): string {
return ` Stage: ${capitalCase(state)}.`;
}
}
// the leading space is intentional
return ` ${messages.getMessage('remainingRecordsStatus', [
numberRecordsProcessed,
numberRecordSucceeded,
numberRecordsFailed,
])}`;
};
3 changes: 1 addition & 2 deletions src/api/data/tree/exportApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import path from 'node:path';
import fs from 'node:fs';

import { Logger, Messages, Org, SfError, Lifecycle } from '@salesforce/core';
import { DescribeSObjectResult, QueryResult } from 'jsforce';
import { DescribeSObjectResult, QueryResult } from '@jsforce/jsforce-node';
import { Ux } from '@salesforce/sf-plugins-core';
import {
BasicRecord,
Expand Down Expand Up @@ -273,7 +273,6 @@ export class ExportApi {
treeRecord[key] = ref && ref !== id ? `@${ref}` : id;
return;
}
// TODO: what to do if ref not found?
const recordId: string = record['Id'] as string;
this.logger.error(`Reference ${relTo} not found for ${key}. Skipping record ${recordId}.`);
return;
Expand Down
11 changes: 2 additions & 9 deletions src/api/data/tree/importApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class ImportApi {
private config!: ImportConfig;
private importPlanConfig: DataPlanPart[] = [];

public constructor(private readonly org: Org, private readonly cli: string, private readonly separator: string) {
public constructor(private readonly org: Org) {
this.logger = Logger.childFromRoot(this.constructor.name);
this.schemaValidator = new SchemaValidator(this.logger, importPlanSchemaFile);
}
Expand Down Expand Up @@ -274,14 +274,7 @@ export class ImportApi {
throw new SfError(
messages.getMessage('dataPlanValidationError', [planPath, error.message]),
INVALID_DATA_IMPORT_ERR_NAME,
messages.getMessages('dataPlanValidationErrorActions', [
this.cli,
this.separator,
this.separator,
this.cli,
this.separator,
this.separator,
])
messages.getMessages('dataPlanValidationErrorActions')
);
}
throw SfError.wrap(error);
Expand Down
Loading