Initial implementation: import commands #140

Open · wants to merge 3 commits into base: main
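This PR adds an `import` subcommand: it copies UDF routines from another project/dataset into a destination dataset and pulls their metadata and DDL into the local file tree. A minimal invocation sketch, assuming a `bqport` entry point (the binary name and the `myproject` destination are assumptions, not taken from this diff):

bqport import fn bqutils.fn.sure_nonnull
bqport import myproject.fn bqutils.fn.sure_nonnull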
55 changes: 52 additions & 3 deletions src/bigquery.ts
@@ -32,10 +32,9 @@ function getProjectId(dataset: Dataset): string;
function getProjectId(model: Model): string;
function getProjectId(table: Table): string;
function getProjectId(routine: Routine): string;
function getProjectId(bigquery: BigQuery): string;

function getProjectId(
bqObj: Dataset | Table | Routine | Model | BigQuery,
bqObj: Dataset | Table | Routine | Model,
): string {
if (bqObj?.projectId) {
return bqObj.projectId;
@@ -60,7 +59,30 @@ function getProjectId(
throw new Error(`Cannot find projectId ${bqObj}`);
}

export { getProjectId };
function getFullResourceId(dataset: Dataset): string;
function getFullResourceId(model: Model): string;
function getFullResourceId(table: Table): string;
function getFullResourceId(routine: Routine): string;
function getFullResourceId(bqObj: Dataset | Table | Routine | Model): string {
if (bqObj instanceof Model || bqObj instanceof Table) {
return `${bqObj.dataset.projectId}:${bqObj.dataset.id}.${bqObj.id}`;
}

if (bqObj instanceof Routine) {
const dataset = bqObj.parent as Dataset;
return `${dataset.projectId}:${dataset.id}.${bqObj.id}`;
}

if (bqObj instanceof Dataset) {
return `${bqObj.projectId}:${bqObj.id}`;
}

throw new Error(`Cannot build full resource ID for ${bqObj}`);
}

const buildThrottledBigQueryClient = (
concurrency: number,
@@ -281,12 +303,39 @@ const extractBigQueryDestinations = async (
return refs.map((r) => JSON.parse(r));
};

const constructDDLfromBigQueryObject = async (
bqObj: Routine,
): Promise<string> => {
const [metadata] = await bqObj.getMetadata();
const id = getFullResourceId(bqObj).replace(':', '.');

const _argumentsString = metadata.arguments
? metadata.arguments
.map((arg: any) =>
// dataType is a StandardSqlDataType object; ANY TYPE arguments carry argumentKind instead
`${arg.name} ${arg.dataType?.typeKind ?? arg.argumentKind.replace('ANY_TYPE', 'ANY TYPE')}`
)
.join(', ')
: '';

return [
`create or replace function \`${id}\`(${_argumentsString})`,
// NOTE: typeKind renders scalar return types only; nested types would need a full formatter
metadata.returnType ? `returns ${metadata.returnType.typeKind ?? metadata.returnType}` : '',
metadata.language == 'js' ? `language ${metadata.language}` : '',
metadata.language == 'js'
? `as r"""${metadata.definitionBody}"""`
: `as (${metadata.definitionBody})`,
].filter((s) => s).join('\n');
};
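
// Illustrative expected output (an assumption for review, not taken from this
// diff): for a SQL UDF `bqutils.fn.sure_nonnull(x ANY TYPE)` whose body is
// `if(x is not null, x, error('NULL'))`, the helper above would emit:
//
//   create or replace function `bqutils.fn.sure_nonnull`(x ANY TYPE)
//   as (if(x is not null, x, error('NULL')))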

export {
BigQueryResource,
bq2path,
buildThrottledBigQueryClient,
constructDDLfromBigQueryObject,
extractBigQueryDependencies,
extractBigQueryDestinations,
getFullResourceId,
getProjectId,
normalizedBQPath,
normalizeShardingTableId,
path2bq,
69 changes: 67 additions & 2 deletions src/commands/cli.ts
@@ -4,6 +4,7 @@ import { formatLocalfiles } from '../../src/commands/fix.js';
import { pushLocalFilesToBigQuery } from '../../src/commands/push.js';
import { createBundleSQL } from '../../src/commands/bundle.js';
import { pullBigQueryResources } from '../../src/commands/pull.js';
import { importBigQueryResources } from '../../src/commands/import.js';

import { buildThrottledBigQueryClient } from '../../src/bigquery.js';
import type { Query } from '@google-cloud/bigquery';
@@ -47,8 +48,8 @@ export function createCLI() {
.option(
'-p, --parameter <key:value...>',
`Either a file containing a JSON list of query parameters, or a query parameter in the form "name:type:value". ` +
`An empty name produces a positional parameter. The type may be omitted to assume STRING: name::value or ::value. ` +
`The value "NULL" produces a null value. Repeat this option to specify a list of values`,
)
.option(
'--maximum_bytes_billed <number of bytes>',
@@ -247,10 +248,74 @@
}
});

const importCommand = new Command('import')
.description(
'Import UDFs from another dataset into the specified dataset',
)
.argument('<destination>')
.argument('[targets...]')
.action(
async (destination: string, cmdTargets: string[] | undefined, _, cmd) => {
const cmdOptions = cmd.optsWithGlobals();
const rootDir = cmdOptions.rootPath;
if (!rootDir) {
console.error('CLI Error: rootPath is not set');
return;
}

const bqClient = buildThrottledBigQueryClient(
parseInt(cmdOptions.threads),
500,
);

// Parse targets like 'bqutils.fn.sure_nonnull' into [{ project: 'bqutils', dataset: 'fn', routine_id: 'sure_nonnull' }]
const targets = cmdTargets?.map((target) => {
const elms = target.split('.');
if (elms.length !== 3) {
throw new Error(`Invalid target: ${target}`);
}
return {
project: elms[0] as string,
dataset: elms[1] as string,
routine_id: elms[2] as string,
};
}) ?? [];

let paramDestination: { project: string; dataset: string };
const [projectOrDataset, destinationDataset] = destination.split('.');
if (destinationDataset) {
paramDestination = {
project: projectOrDataset,
dataset: destinationDataset,
};
} else if (projectOrDataset) {
paramDestination = {
project: '@default',
dataset: projectOrDataset,
};
} else {
throw new Error(`Invalid destination: ${destination}`);
}

const ctx = {
bigQuery: bqClient,
rootPath: rootDir,
destination: paramDestination,
importTargets: targets,
options: {
is_update: true,
},
};

await importBigQueryResources(ctx);
},
);

program.addCommand(pushCommand);
program.addCommand(pullCommand);
program.addCommand(formatCommmand);
program.addCommand(bundleCommand);
program.addCommand(importCommand);

return program;
}
199 changes: 199 additions & 0 deletions src/commands/import.ts
@@ -0,0 +1,199 @@
import type {
BigQuery,
Routine,
} from '@google-cloud/bigquery';

import {
BigQueryResource,
bq2path,
constructDDLfromBigQueryObject,
} from '../../src/bigquery.js';
import { pullMetadataTaskBuilder } from '../../src/commands/pull.js';
import { Task } from '../../src/tasks/base.js';
import { ReporterMap } from '../../src/reporter/index.js';
import * as fs from 'node:fs';
import * as path from 'node:path';

type ImportContext = {
bigQuery: BigQuery;
rootPath: string;
destination: {
project: string;
dataset: string;
};
importTargets: {
project: string;
dataset: string;
routine_id: string;
}[];
options: {
is_update: boolean;
};
};

type NormalProject = {
kind: 'normal';
value: string;
};
type SpecialProject = {
kind: 'special';
value: '@default';
resolved_value: string;
};

type BQProjectID = NormalProject | SpecialProject;

const parseProjectID = async (
ctx: ImportContext,
projectID: string,
): Promise<BQProjectID> => {
if (projectID === '@default') {
return {
kind: 'special',
value: '@default',
resolved_value: await ctx.bigQuery.getProjectId(),
};
} else {
return {
kind: 'normal',
value: projectID,
};
}
};
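
// Examples of the mapping above ('<client project>' is a placeholder for the
// project resolved from the client credentials):
//   await parseProjectID(ctx, '@default')
//     -> { kind: 'special', value: '@default', resolved_value: '<client project>' }
//   await parseProjectID(ctx, 'bqutils')
//     -> { kind: 'normal', value: 'bqutils' }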

const importRoutine = async (
ctx: ImportContext,
client: BigQuery,
destination: { project: string; dataset: string },
routine: Routine,
) => {
const [metadata] = await routine.getMetadata();

let imported;
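// Reuse the routine if it already exists in the destination dataset;
// otherwise fall through to createRoutine below.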
try {
const _imported = client.dataset(destination.dataset, {
projectId: destination.project,
}).routine(metadata.routineReference.routineId);
await _imported.get();
imported = _imported;
} catch (e) {
const [_imported, _] = await client.dataset(destination.dataset, {
projectId: destination.project,
})
.createRoutine(
metadata.routineReference.routineId,
{
arguments: metadata.arguments,
definitionBody: metadata.definitionBody,
description: metadata.description,
determinismLevel: metadata.determinismLevel,
language: metadata.language,
returnType: metadata.returnType,
routineType: metadata.routineType,
},
);
imported = _imported;
}

const parsed: BQProjectID = await parseProjectID(
ctx,
ctx.destination.project,
);
const d = bq2path(
imported as BigQueryResource,
parsed.kind === 'special',
);

// Record the source routine reference next to the pulled metadata so the
// import can be traced back to its origin.
const importPath = path.join(ctx.rootPath, d, '_imported.json');
fs.mkdirSync(path.dirname(importPath), { recursive: true });
await fs.promises.writeFile(
importPath,
JSON.stringify(metadata.routineReference, null, 2),
);
return imported;
};

async function importBigQueryResources(
ctx: ImportContext,
): Promise<number> {
const tasks: Task[] = [];

const genPullTask = pullMetadataTaskBuilder(
{
BQIDs: [],
BigQuery: ctx.bigQuery,
rootPath: ctx.rootPath,
withDDL: true,
forceAll: false,
reporter: 'json',
},
async (bqId: string) => {
const [datasetAndProject, routineId] = bqId.split('.');
if (!datasetAndProject) {
return undefined;
}
const [projectId, datasetId] = datasetAndProject.split(':');
if (!projectId || !datasetId || !routineId) {
return undefined;
}

const routine = ctx.bigQuery.dataset(datasetId, { projectId })
.routine(routineId);

return constructDDLfromBigQueryObject(routine);
},
);

for (const target of ctx.importTargets) {
const parsed: BQProjectID = await parseProjectID(
ctx,
ctx.destination.project,
);
const parsedDestination = {
project: (parsed.kind == 'special' ? parsed.resolved_value : undefined) ??
parsed.value,
dataset: ctx.destination.dataset,
};
const task1 = new Task(
`${ctx.destination.project}/${ctx.destination.dataset}/(import)/${target.project}.${target.dataset}.${target.routine_id}`,
async () => {
const importedRoutine: Routine = await importRoutine(
ctx,
ctx.bigQuery,
parsedDestination,
ctx.bigQuery
.dataset(target.dataset, { projectId: target.project })
.routine(target.routine_id),
);

const task = await genPullTask(importedRoutine);
task.run();
tasks.push(task);
return 'success';
},
);
tasks.push(task1);
}

const reporter = new ReporterMap['json']();
try {
reporter.onInit(tasks);
tasks.forEach((t) => t.run());
while (tasks.some((t) => !t.done())) {
reporter.onUpdate();
await new Promise((resolve) => setTimeout(resolve, 100));
}
reporter.onUpdate();
} catch (e: unknown) {
// Task failures are reflected in each task's result below; nothing to do here.
} finally {
reporter.onFinished();
}

const failedTasks =
tasks.filter((t) => t.result().status !== 'success').length;
return failedTasks;
}

export { importBigQueryResources };
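
For library use, a minimal programmatic sketch (import paths assume the repo root; the dataset and routine names are illustrative):

import { buildThrottledBigQueryClient } from './src/bigquery.js';
import { importBigQueryResources } from './src/commands/import.js';

const failedTasks = await importBigQueryResources({
  bigQuery: buildThrottledBigQueryClient(8, 500),
  rootPath: './bigquery',
  destination: { project: '@default', dataset: 'fn' },
  importTargets: [
    { project: 'bqutils', dataset: 'fn', routine_id: 'sure_nonnull' },
  ],
  options: { is_update: true },
});
process.exit(failedTasks === 0 ? 0 : 1);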