Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions src/bulkDataRequestCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

import { TTLConfig, Global, Logger, Messages, Org, ConfigAggregator, OrgConfigProperties } from '@salesforce/core';
import { TTLConfig, Global, Logger, Messages, Org } from '@salesforce/core';
import { Duration } from '@salesforce/kit';
import type { ResumeBulkExportOptions, ResumeBulkImportOptions } from './types.js';
import { ColumnDelimiterKeys } from './bulkUtils.js';
Expand Down Expand Up @@ -71,10 +71,7 @@ export abstract class BulkDataRequestCache extends TTLConfig<TTLConfig.Options,
* @param skipCacheValidatation make this method not throw if you passed a job ID that's not in the cache
* This was only added for `data upsert/delete resume` for backwards compatibility and will be removed after March 2025.
*/
public async resolveResumeOptionsFromCache(
jobIdOrMostRecent: string | boolean,
skipCacheValidatation = false
): Promise<ResumeBulkImportOptions> {
public async resolveResumeOptionsFromCache(jobIdOrMostRecent: string | boolean): Promise<ResumeBulkImportOptions> {
if (typeof jobIdOrMostRecent === 'boolean') {
const key = this.getLatestKey();
if (!key) {
Expand All @@ -92,21 +89,6 @@ export abstract class BulkDataRequestCache extends TTLConfig<TTLConfig.Options,
} else {
const entry = this.get(jobIdOrMostRecent);
if (!entry) {
if (skipCacheValidatation) {
const config = await ConfigAggregator.create();
const aliasOrUsername = config.getInfo(OrgConfigProperties.TARGET_ORG)?.value as string;
if (!aliasOrUsername) {
throw messages.createError('error.skipCacheValidateNoOrg', [jobIdOrMostRecent]);
}

return {
jobInfo: { id: jobIdOrMostRecent },
options: {
connection: (await Org.create({ aliasOrUsername })).getConnection(),
},
};
}

throw messages.createError('error.bulkRequestIdNotFound', [jobIdOrMostRecent]);
}

Expand Down
32 changes: 1 addition & 31 deletions src/bulkIngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,6 @@ export async function bulkIngest(opts: {
stages.setupJobListeners(job);
stages.processingJob();

// cache.resolveResumeOptionsFromCache for `delete/upsert resume --job-id <ID>`
// will not throw if the ID isn't in the cache to support the following scenario:
//
// `sf data delete bulk --wait 10` -> sync operation (successful or not) never created a cache
// `sf data delete resume -i <job-id>` worked b/c the cache resolver returned the ID as a cache entry
// `sf data delete resume --use-most-recent` was never supported for sync runs.
//
// We plan to remove this behavior in March 2025 (only these 2 commands supported this, `resume` commands should only resume jobs started by `sf`)
if (['upsert', 'delete', 'hardDelete'].includes(operation)) {
opts.warnFn(
'Resuming a synchronous operation via `sf data upsert/delete resume` will not be supported after March 2025.'
);
}

try {
await job.poll(5000, timeout.milliseconds);

Expand All @@ -151,18 +137,6 @@ export async function bulkIngest(opts: {
if (jobInfo.numberRecordsFailed) {
stages.error();

if (['delete', 'hardDelete', 'upsert'].includes(opts.operation) && opts.jsonEnabled) {
opts.warnFn(
'Record failures will not be included in JSON output after March 2025, use `sf data bulk results` to get results instead.'
);
return {
jobId: jobInfo.id,
processedRecords: jobInfo.numberRecordsProcessed,
successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0),
failedRecords: jobInfo.numberRecordsFailed,
};
}

throw messages.createError(
'error.failedRecordDetails',
[jobInfo.numberRecordsFailed],
Expand Down Expand Up @@ -223,11 +197,7 @@ export async function bulkIngestResume(opts: {
wait: Duration;
warnFn: (message: SfCommand.Warning) => void;
}): Promise<BulkIngestInfo> {
const resumeOpts = await opts.cache.resolveResumeOptionsFromCache(
opts.jobIdOrMostRecent,
// skip cache validation for only for these 2 commands for backwards compatibility.
['data upsert resume', 'data delete resume'].includes(opts.cmdId) ? true : false
);
const resumeOpts = await opts.cache.resolveResumeOptionsFromCache(opts.jobIdOrMostRecent);

const conn = resumeOpts.options.connection;

Expand Down
2 changes: 1 addition & 1 deletion src/commands/data/delete/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export default class DeleteResume extends SfCommand<BulkResultV2> {

const {
options: { connection: conn },
} = await cache.resolveResumeOptionsFromCache(flags['job-id'] ?? flags['use-most-recent'], true);
} = await cache.resolveResumeOptionsFromCache(flags['job-id'] ?? flags['use-most-recent']);

const job = conn.bulk2.job('ingest', {
id: res.jobId,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/data/upsert/resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export default class UpsertResume extends SfCommand<BulkResultV2> {

const {
options: { connection: conn },
} = await cache.resolveResumeOptionsFromCache(flags['job-id'] ?? flags['use-most-recent'], true);
} = await cache.resolveResumeOptionsFromCache(flags['job-id'] ?? flags['use-most-recent']);

const job = conn.bulk2.job('ingest', {
id: res.jobId,
Expand Down
132 changes: 35 additions & 97 deletions test/commands/data/dataBulk.nut.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import fs from 'node:fs';
import os from 'node:os';
import { expect, config as chaiConfig } from 'chai';
import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit';
import { sleep } from '@salesforce/kit';
import { ensurePlainObject } from '@salesforce/ts-types';
import type { BulkResultV2 } from '../../../src/types.js';
import type { QueryResult } from '../data/query/query.nut.js';
Expand All @@ -19,45 +18,6 @@ chaiConfig.truncateThreshold = 0;

let testSession: TestSession;

/** Verify that the operation completed successfully and results are available before attempting to do stuff with the results */
const isCompleted = async (cmd: string): Promise<void> => {
let complete = false;
while (!complete) {
// eslint-disable-next-line no-await-in-loop
await sleep(2000);
const result = execCmd<BulkResultV2>(cmd);
if (result.jsonOutput?.status === 0) {
if (result.jsonOutput.result.jobInfo.state === 'JobComplete') {
complete = true;
}
}
}
};

/* Check the status of the bulk upsert job using json output to determine progress
* The four states of a job are Queued, JobComplete, InProgress, and Aborted. If Aborted, the test will fail
* Otherwise run status until job is JobComplete
*/
const checkBulkResumeJsonResponse = (jobId: string, operation: 'delete' | 'upsert'): void => {
const statusResponse = execCmd<BulkResultV2>(`data:${operation}:resume --job-id ${jobId} --json`, {
ensureExitCode: 0,
}).jsonOutput?.result;
expect(statusResponse?.jobInfo.state).to.equal('JobComplete');
expect(statusResponse?.records?.successfulResults).to.be.an('array').with.lengthOf(10);
};

/* Check the status of the bulk upsert job using human output to determine progress
* The four states of a job are Queued, JobComplete, InProgress, and Aborted. If Aborted, the test will fail
* Otherwise run status until job is JobComplete
*/
const checkBulkStatusHumanResponse = (statusCommand: string): void => {
const statusResponse = execCmd(statusCommand, {
ensureExitCode: 0,
}).shellOutput.stdout.split(os.EOL);
const jobState = statusResponse.find((line) => line.includes('Status'));
expect(jobState).to.include('JobComplete');
};

describe('data:bulk commands', () => {
before(async () => {
testSession = await TestSession.create({
Expand All @@ -79,20 +39,41 @@ describe('data:bulk commands', () => {
describe('data:bulk verify json and human responses', () => {
describe('data:upsert:bulk then data:upsert:resume then soql:query and data:delete:bulk', () => {
it('should upsert, query, and delete 10 accounts', async () => {
const bulkUpsertResult: BulkResultV2 = bulkInsertAccounts();
let jobInfo = bulkUpsertResult.jobInfo;
expect(jobInfo).to.have.property('id');
await isCompleted(`data:upsert:resume --job-id ${jobInfo.id} --json`);

checkBulkStatusHumanResponse(`data:upsert:resume --job-id ${jobInfo.id}`);
checkBulkResumeJsonResponse(jobInfo.id, 'upsert');

const bulkDeleteResult = queryAndBulkDelete();
jobInfo = bulkDeleteResult.jobInfo;
await isCompleted(`data:delete:resume --job-id ${jobInfo.id} --json`);

checkBulkStatusHumanResponse(`data:delete:resume --job-id ${jobInfo.id}`);
checkBulkResumeJsonResponse(jobInfo.id, 'delete');
const cmd = `data:upsert:bulk --sobject Account --file ${path.join(
'.',
'data',
'bulkUpsert.csv'
)} --external-id Id --json --wait 10 --column-delimiter COMMA`;
const rawResponse = execCmd(cmd);
const response: BulkResultV2 | undefined = rawResponse.jsonOutput?.result as BulkResultV2;

expect(response.jobInfo).to.have.property('id');
expect(response.jobInfo.state).to.equal('JobComplete');
expect(response.records?.successfulResults).to.be.an('array').with.lengthOf(10);
const bulkUpsertResult = response.records?.successfulResults[0];
assert(Object.keys(ensurePlainObject(bulkUpsertResult)).includes('sf__Id'));

const records = queryAccountRecords();

const accountIds = records.map((account) => account.Id);
const idsFile = path.join(testSession.project?.dir ?? '.', 'data', 'deleteAccounts.csv');
fs.writeFileSync(idsFile, `Id${os.EOL}${accountIds.join(os.EOL)}${os.EOL}`);

// Run bulk delete
const deleteResponse: BulkResultV2 | undefined = execCmd<Awaited<BulkResultV2>>(
`data:delete:bulk --sobject Account --file ${idsFile} --json --wait 10 --line-ending ${
os.platform() === 'win32' ? 'CRLF' : 'LF'
}`,
{
ensureExitCode: 0,
}
).jsonOutput?.result;

expect(deleteResponse?.jobInfo).to.have.property('id');
expect(deleteResponse?.jobInfo.state).to.equal('JobComplete');
expect(deleteResponse?.records?.successfulResults.length).to.equal(10);
const bulkDeleteResult = deleteResponse?.records?.successfulResults[0];
assert(Object.keys(ensurePlainObject(bulkDeleteResult)).includes('sf__Id'));
});
});
});
Expand Down Expand Up @@ -145,46 +126,3 @@ const queryAccountRecords = () => {
expect(queryResponse).to.have.property('records').with.lengthOf.above(9);
return queryResponse.records;
};

const queryAndBulkDelete = (): BulkResultV2 => {
const records = queryAccountRecords();

const accountIds = records.map((account) => account.Id);
const idsFile = path.join(testSession.project?.dir ?? '.', 'data', 'deleteAccounts.csv');
fs.writeFileSync(idsFile, `Id${os.EOL}${accountIds.join(os.EOL)}${os.EOL}`);

// Run bulk delete
const deleteResponse: BulkResultV2 | undefined = execCmd<Awaited<BulkResultV2>>(
`data:delete:bulk --sobject Account --file ${idsFile} --json --wait 10 --line-ending ${
os.platform() === 'win32' ? 'CRLF' : 'LF'
}`,
{
ensureExitCode: 0,
}
).jsonOutput?.result;

assert.equal(deleteResponse?.records?.successfulResults.length, 10);
const bulkDeleteResult = deleteResponse?.records?.successfulResults[0];
assert('Id' in bulkDeleteResult);
return deleteResponse;
};

/** Bulk upsert 10 accounts */
const bulkInsertAccounts = (): BulkResultV2 => {
const cmd = `data:upsert:bulk --sobject Account --file ${path.join(
'.',
'data',
'bulkUpsert.csv'
)} --external-id Id --json --wait 10 --column-delimiter COMMA`;
const rawResponse = execCmd(cmd);
const response: BulkResultV2 | undefined = rawResponse.jsonOutput?.result as BulkResultV2;
if (response?.records) {
const records = response.records?.successfulResults;
assert.equal(records?.length, 10);
const bulkUpsertResult = response.records?.successfulResults[0];
assert(Object.keys(ensurePlainObject(bulkUpsertResult)).includes('sf__Id'));
const jobInfo = response.jobInfo;
assert('id' in jobInfo);
}
return response;
};
Loading