From 6a5ca54157a1e276bf54dec1be6dce42d41de7ab Mon Sep 17 00:00:00 2001 From: "luca.rovinetti" Date: Tue, 16 Apr 2024 08:50:11 +0200 Subject: [PATCH] Add support for creating bigquery jobs in a different project Signed-off-by: luca.rovi Signed-off-by: luca.rovinetti --- .../components/pages/outputs/gcp_bigquery.adoc | 11 +++++++++++ internal/impl/gcp/output_bigquery.go | 16 ++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/docs/modules/components/pages/outputs/gcp_bigquery.adoc b/docs/modules/components/pages/outputs/gcp_bigquery.adoc index 7e1fc5b799..f451d6200e 100644 --- a/docs/modules/components/pages/outputs/gcp_bigquery.adoc +++ b/docs/modules/components/pages/outputs/gcp_bigquery.adoc @@ -40,6 +40,7 @@ output: label: "" gcp_bigquery: project: "" + job_project: "" dataset: "" # No default (required) table: "" # No default (required) format: NEWLINE_DELIMITED_JSON @@ -67,6 +68,7 @@ output: label: "" gcp_bigquery: project: "" + job_project: "" dataset: "" # No default (required) table: "" # No default (required) format: NEWLINE_DELIMITED_JSON @@ -145,6 +147,15 @@ This output benefits from sending messages as a batch for improved performance. The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable. +*Type*: `string` + +*Default*: `""` + +=== `job_project` + +The project ID in which jobs will be executed. If not set, project will be used. 
+ + *Type*: `string` *Default*: `""` diff --git a/internal/impl/gcp/output_bigquery.go b/internal/impl/gcp/output_bigquery.go index c4887dab04..da98f0efb8 100644 --- a/internal/impl/gcp/output_bigquery.go +++ b/internal/impl/gcp/output_bigquery.go @@ -62,6 +62,7 @@ func gcpBigQueryCSVConfigFromParsed(conf *service.ParsedConfig) (csvconf gcpBigQ } type gcpBigQueryOutputConfig struct { + JobProjectID string ProjectID string DatasetID string TableID string @@ -85,6 +86,12 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig if gconf.ProjectID == "" { gconf.ProjectID = bigquery.DetectProjectID } + if gconf.JobProjectID, err = conf.FieldString("job_project"); err != nil { + return + } + if gconf.JobProjectID == "" { + gconf.JobProjectID = gconf.ProjectID + } if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil { return } @@ -131,9 +138,9 @@ func (g gcpBQClientURL) NewClient(ctx context.Context, conf gcpBigQueryOutputCon if err != nil { return nil, err } - return bigquery.NewClient(ctx, conf.ProjectID, opt...) + return bigquery.NewClient(ctx, conf.JobProjectID, opt...) } - return bigquery.NewClient(ctx, conf.ProjectID, option.WithoutAuthentication(), option.WithEndpoint(string(g))) + return bigquery.NewClient(ctx, conf.JobProjectID, option.WithoutAuthentication(), option.WithEndpoint(string(g))) } func gcpBigQueryConfig() *service.ConfigSpec { @@ -179,6 +186,7 @@ The same is true for the CSV format. For the CSV format when the field ` + "`csv.header`" + ` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.` + service.OutputPerformanceDocs(true, true)). Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")). 
+ Field(service.NewStringField("job_project").Description("The project ID in which jobs will be executed. If not set, project will be used.").Default("")). Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")). Field(service.NewStringField("table").Description("The table to insert messages to.")). Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)). @@ -337,7 +345,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) { } }() - dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID) + dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID) if _, err = dataset.Metadata(ctx); err != nil { if hasStatusCode(err, http.StatusNotFound) { err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID) @@ -410,7 +418,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag } func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader { - table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID) + table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID) source := bigquery.NewReaderSource(bytes.NewReader(*data)) source.SourceFormat = bigquery.DataFormat(g.conf.Format)