Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permit Bigquery output to create jobs in a different project #2532

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 14 additions & 6 deletions internal/impl/gcp/output_bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func gcpBigQueryCSVConfigFromParsed(conf *service.ParsedConfig) (csvconf gcpBigQ
}

type gcpBigQueryOutputConfig struct {
JobProjectID string
ProjectID string
DatasetID string
TableID string
Expand All @@ -71,6 +72,12 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig
if gconf.ProjectID == "" {
gconf.ProjectID = bigquery.DetectProjectID
}
if gconf.JobProjectID, err = conf.FieldString("job_project"); err != nil {
return
}
if gconf.JobProjectID == "" {
gconf.JobProjectID = gconf.ProjectID
}
if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil {
return
}
Expand Down Expand Up @@ -106,11 +113,11 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig

type gcpBQClientURL string

func (g gcpBQClientURL) NewClient(ctx context.Context, projectID string) (*bigquery.Client, error) {
func (g gcpBQClientURL) NewClient(ctx context.Context, JobProjectID string) (*bigquery.Client, error) {
if g == "" {
return bigquery.NewClient(ctx, projectID)
return bigquery.NewClient(ctx, JobProjectID)
}
return bigquery.NewClient(ctx, projectID, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
return bigquery.NewClient(ctx, JobProjectID, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
}

func gcpBigQueryConfig() *service.ConfigSpec {
Expand Down Expand Up @@ -155,6 +162,7 @@ The same is true for the CSV format.

For the CSV format when the field `+"`csv.header`"+` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.`)).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("job_project").Description("The project ID in which jobs will be executed. If not set, the `project` value will be used.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
Expand Down Expand Up @@ -302,7 +310,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
defer g.connMut.Unlock()

var client *bigquery.Client
if client, err = g.clientURL.NewClient(context.Background(), g.conf.ProjectID); err != nil {
if client, err = g.clientURL.NewClient(context.Background(), g.conf.JobProjectID); err != nil {
err = fmt.Errorf("error creating big query client: %w", err)
return
}
Expand All @@ -312,7 +320,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}
}()

dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID)
dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID)
if _, err = dataset.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID)
Expand Down Expand Up @@ -385,7 +393,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
}

func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader {
table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID)
table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID)

source := bigquery.NewReaderSource(bytes.NewReader(*data))
source.SourceFormat = bigquery.DataFormat(g.conf.Format)
Expand Down
10 changes: 10 additions & 0 deletions website/docs/components/outputs/gcp_bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ output:
label: ""
gcp_bigquery:
project: ""
job_project: ""
dataset: "" # No default (required)
table: "" # No default (required)
format: NEWLINE_DELIMITED_JSON
Expand All @@ -60,6 +61,7 @@ output:
label: ""
gcp_bigquery:
project: ""
job_project: ""
dataset: "" # No default (required)
table: "" # No default (required)
format: NEWLINE_DELIMITED_JSON
Expand Down Expand Up @@ -140,6 +142,14 @@ Batches can be formed at both the input and output level. You can find out more
The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.


Type: `string`
Default: `""`

### `job_project`

The project ID in which jobs will be executed. If not set, the value of `project` will be used.


Type: `string`
Default: `""`

Expand Down