Skip to content
Merged
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ OAuth flow for installed applications.
| description | string | optional | nil | description of table |
| merge_keys | array | optional | | key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key) |
| merge_rule | array | optional | | list of column assignments for updating existing records used in merge mode, for example foo = T.foo + S.foo (T means target table and S means source table). (string array, default: always overwrites with new values) |
| retain_column_descriptions | boolean | optional | false | In replace mode, carry over the column descriptions from the existing table to the replaced table. |
| retain_column_policy_tags | boolean | optional | false | In replace mode, carry over the column policy tags from the existing table to the replaced table. |

Client or request options

Expand Down
31 changes: 31 additions & 0 deletions example/config_replace_retain_column_descriptions.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Example: replace mode with retain_column_descriptions enabled.
# Column descriptions on the existing destination table are carried over
# to the newly replaced table (see README, retain_column_descriptions).
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  retain_column_descriptions: true
31 changes: 31 additions & 0 deletions example/config_replace_retain_column_policy_tags.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Example: replace mode with retain_column_policy_tags enabled.
# Column policy tags on the existing destination table are carried over
# to the newly replaced table (see README, retain_column_policy_tags).
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  retain_column_policy_tags: true
2 changes: 2 additions & 0 deletions lib/embulk/output/bigquery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def self.configure(config, schema, task_count)
'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),

'description' => config.param('description', :string, :default => nil),
'retain_column_descriptions' => config.param('retain_column_descriptions', :bool, :default => false),
'retain_column_policy_tags' => config.param('retain_column_policy_tags', :bool, :default => false),

'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
Expand Down
28 changes: 24 additions & 4 deletions lib/embulk/output/bigquery/bigquery_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'thwait'
require_relative 'google_client'
require_relative 'helper'
require_relative 'bigquery_service_with_policy_tag'

module Embulk
module Output
Expand All @@ -12,7 +13,8 @@ class BigqueryClient < GoogleClient

def initialize(task, schema, fields = nil)
scope = "https://www.googleapis.com/auth/bigquery"
client_class = Google::Apis::BigqueryV2::BigqueryService
need_takeover = (task['mode'] == 'replace') && (task['retain_column_descriptions'] || task['retain_column_policy_tags'])
client_class = need_takeover ? BigqueryServiceWithPolicyTag : Google::Apis::BigqueryV2::BigqueryService
super(task, scope, client_class)

@schema = schema
Expand All @@ -30,6 +32,14 @@ def initialize(task, schema, fields = nil)
@task['encoding'] ||= 'UTF-8'
@task['ignore_unknown_values'] = false if @task['ignore_unknown_values'].nil?
@task['allow_quoted_newlines'] = false if @task['allow_quoted_newlines'].nil?

@src_fields = need_takeover ? fetch_src_fields : []
end

# Fetch the field schema of the current destination table so that column
# descriptions / policy tags can be carried over after a replace.
# Returns [] when the table is missing or has no schema yet.
def fetch_src_fields
  existing_table = get_table(@task['table'])
  existing_fields = existing_table&.schema&.fields
  existing_fields || []
rescue NotFoundError
  []
end

def fields
Expand Down Expand Up @@ -537,21 +547,31 @@ def patch_table
with_job_retry do
table = get_table(@task['table'])

# Recursively copy column descriptions and policy tags onto `fields`.
#
# Two sources are applied in order, so column_options win over values
# carried from the source (pre-replace) table:
#   1. src_fields      — the existing table's schema, honored only when
#                        retain_column_descriptions / retain_column_policy_tags
#                        is set and the source field has a value.
#   2. column_options  — user-supplied per-column settings ('description',
#                        nested 'fields').
# Fields are matched by name; nested record fields recurse with the
# other source emptied so each pass applies exactly one source.
# Mutates the field objects in place and returns the (same) array.
def patch_description_and_policy_tags(fields, column_options, src_fields)
  fields.map do |field|
    src_field = src_fields.find { |s_field| s_field.name == field.name }
    if src_field
      field.update!(description: src_field.description) if @task['retain_column_descriptions'] && src_field.description
      field.update!(policy_tags: src_field.policy_tags) if @task['retain_column_policy_tags'] && src_field.policy_tags
      if field.fields && src_field.fields
        nested_fields = patch_description_and_policy_tags(field.fields, [], src_field.fields)
        field.update!(fields: nested_fields)
      end
    end

    column_option = column_options.find { |col_opt| col_opt['name'] == field.name }
    if column_option
      field.update!(description: column_option['description']) if column_option['description']
      if field.fields && column_option['fields']
        nested_fields = patch_description_and_policy_tags(field.fields, column_option['fields'], [])
        field.update!(fields: nested_fields)
      end
    end
    field
  end
end

fields = patch_description(table.schema.fields, @task['column_options'])
fields = patch_description_and_policy_tags(table.schema.fields, @task['column_options'], @src_fields)
table.schema.update!(fields: fields)
table_id = Helper.chomp_partition_decorator(@task['table'])
with_network_retry { client.patch_table(@destination_project, @dataset, table_id, table) }
Expand Down
88 changes: 88 additions & 0 deletions lib/embulk/output/bigquery/bigquery_service_with_policy_tag.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
require 'google/apis/bigquery_v2'

module Embulk
module Output
class Bigquery < OutputPlugin
# NOTE:
# Due to the JRuby version constraint in Embulk v0.9, it's not possible to upgrade to a version of the google-api-client (0.37.0 or later) that includes support for policy_tags.
# So the workaround was implemented using a patch-like solution as shown below.
#
# BigqueryService subclass whose get_table / patch_table use policy-tag-aware
# Table representations, so `policyTags` survives a read-modify-write cycle.
# NOTE(review): the method bodies mirror the generated BigqueryService
# methods with only the request/response classes swapped — verify against
# the google-api-client version actually bundled.
class BigqueryServiceWithPolicyTag < Google::Apis::BigqueryV2::BigqueryService
# Same wire call as BigqueryService#get_table, but deserializes the
# response into TableWithPolicyTag so schema.fields[].policy_tags is kept.
def get_table(project_id, dataset_id, table_id, selected_fields: nil, fields: nil, quota_user: nil, user_ip: nil, options: nil, &block)
command = make_simple_command(:get, 'projects/{projectId}/datasets/{datasetId}/tables/{tableId}', options)
command.response_representation = TableWithPolicyTag::Representation
command.response_class = TableWithPolicyTag
command.params['projectId'] = project_id unless project_id.nil?
command.params['datasetId'] = dataset_id unless dataset_id.nil?
command.params['tableId'] = table_id unless table_id.nil?
command.query['selectedFields'] = selected_fields unless selected_fields.nil?
command.query['fields'] = fields unless fields.nil?
command.query['quotaUser'] = quota_user unless quota_user.nil?
command.query['userIp'] = user_ip unless user_ip.nil?
execute_or_queue_command(command, &block)
end

# Same wire call as BigqueryService#patch_table, but serializes the request
# and deserializes the response with the policy-tag-aware representation,
# so patching a table does not strip its policy tags.
def patch_table(project_id, dataset_id, table_id, table_object = nil, autodetect_schema: nil, fields: nil, quota_user: nil, options: nil, &block)
command = make_simple_command(:patch, 'projects/{+projectId}/datasets/{+datasetId}/tables/{+tableId}', options)
command.request_representation = TableWithPolicyTag::Representation
command.request_object = table_object
command.response_representation = TableWithPolicyTag::Representation
command.response_class = TableWithPolicyTag
command.params['projectId'] = project_id unless project_id.nil?
command.params['datasetId'] = dataset_id unless dataset_id.nil?
command.params['tableId'] = table_id unless table_id.nil?
command.query['autodetect_schema'] = autodetect_schema unless autodetect_schema.nil?
command.query['fields'] = fields unless fields.nil?
command.query['quotaUser'] = quota_user unless quota_user.nil?
execute_or_queue_command(command, &block)
end
end

# TableFieldSchema extended with a `policy_tags` attribute, standing in for
# the PolicyTags support that google-api-client gained in 0.37.0.
class TableFieldSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableFieldSchema
# Minimal stand-in for TableFieldSchema::PolicyTags: a list of policy tag
# resource names, serialized under the JSON key 'names'.
class PolicyTags
include Google::Apis::Core::JsonObjectSupport
include Google::Apis::Core::Hashable

class Representation < Google::Apis::Core::JsonRepresentation
collection :names, as: 'names'
end

attr_accessor :names

def initialize(**args)
update!(**args)
end

def update!(**args)
@names = args[:names] if args.key?(:names)
end
end

include Google::Apis::Core::Hashable

attr_accessor :policy_tags

def update!(**args)
super
@policy_tags = args[:policy_tags] if args.key?(:policy_tags)
end

# JSON mapping: nested fields recurse through this class, and policy tags
# map to the API's 'policyTags' object.
class Representation < Google::Apis::BigqueryV2::TableFieldSchema::Representation
collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation
property :policy_tags, as: 'policyTags', class: TableFieldSchemaWithPolicyTag::PolicyTags, decorator: TableFieldSchemaWithPolicyTag::PolicyTags::Representation
end
end

# TableSchema whose fields are deserialized as policy-tag-aware field schemas.
class TableSchemaWithPolicyTag < Google::Apis::BigqueryV2::TableSchema
class Representation < Google::Apis::BigqueryV2::TableSchema::Representation
collection :fields, as: 'fields', class: TableFieldSchemaWithPolicyTag, decorator: TableFieldSchemaWithPolicyTag::Representation
end
end

# Table whose schema is deserialized as a policy-tag-aware schema; used as the
# request/response class in BigqueryServiceWithPolicyTag above.
class TableWithPolicyTag < Google::Apis::BigqueryV2::Table
class Representation < Google::Apis::BigqueryV2::Table::Representation
property :schema, as: 'schema', class: TableSchemaWithPolicyTag, decorator: TableSchemaWithPolicyTag::Representation
end
end
end
end
end
30 changes: 30 additions & 0 deletions test/test_transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,36 @@ def test_replace_with_partitioning
end
Bigquery.transaction(config, schema, processor_count, &control)
end

# replace mode with retain_column_descriptions: the job flow is the same as a
# plain replace (temp table -> copy WRITE_TRUNCATE -> delete temp) and must
# still call patch_table, which applies the carried-over column descriptions.
def test_replace_with_retain_column_descriptions
config = least_config.merge('mode' => 'replace', 'retain_column_descriptions' => true)
# configure is invoked for its side effects on config validation; the
# returned task is unused here — NOTE(review): assignment could be dropped.
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).get_table(config['table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
mock(obj).delete_table(config['temp_table'])
mock(obj).patch_table
end
Bigquery.transaction(config, schema, processor_count, &control)
end

# replace mode with retain_column_policy_tags: identical expected flow to a
# plain replace, ending in patch_table, which applies the carried-over
# column policy tags.
def test_replace_with_retain_column_policy_tags
config = least_config.merge('mode' => 'replace', 'retain_column_policy_tags' => true)
# configure is invoked for its side effects on config validation; the
# returned task is unused here — NOTE(review): assignment could be dropped.
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).get_table(config['table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
mock(obj).delete_table(config['temp_table'])
mock(obj).patch_table
end
Bigquery.transaction(config, schema, processor_count, &control)
end
end

sub_test_case "replace_backup" do
Expand Down