In [1]:
%%writefile sample_data.proto

syntax = "proto2";

// Row schema used by this notebook: three optional value columns plus a
// required row number.
message SampleData {

  // Optional value columns; a row may leave any of them unset.
  optional double float64_col = 1;
  optional int64 int64_col = 2;
  optional string string_col = 3;
    
  // The only required field: it must be set on every message.
  required int64 row_num = 4;
}


Overwriting sample_data.proto


In [2]:
# Compile sample_data.proto with protoc, writing the generated Python code into
# the current directory (it is imported later as sample_data_pb2).
!protoc --python_out=. sample_data.proto

In [3]:
import datetime
import decimal

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

import sample_data_pb2

In [4]:
def create_row_data(float64_col: float, int64_col: int, string_col: str, row_num: int) -> bytes:
    """Serialize one ``SampleData`` row.

    Args:
        float64_col: Value stored in the message's ``float64_col`` field.
        int64_col: Value stored in the message's ``int64_col`` field.
        string_col: Value stored in the message's ``string_col`` field.
        row_num: Value stored in the message's required ``row_num`` field.

    Returns:
        The populated message encoded with ``SerializeToString()``.
    """
    message = sample_data_pb2.SampleData()

    # Fields are assigned in this fixed order: row_num first, then the
    # three value columns.
    field_values = (
        ("row_num", row_num),
        ("float64_col", float64_col),
        ("int64_col", int64_col),
        ("string_col", string_col),
    )
    for field_name, field_value in field_values:
        setattr(message, field_name, field_value)

    return message.SerializeToString()

In [5]:
# Quick check of the helper: serialize one row and display the resulting bytes.
create_row_data(float64_col=2.3, int64_col=2, string_col='Hello', row_num=1)

b'\tffffff\x02@\x10\x02\x1a\x05Hello \x01'

In [6]:
# Destination table, given as project / dataset / table identifiers.
PROJECT_ID = 'michaelabel-demo'
DATASET_ID = 'bq_demos'
TABLE_ID = 'storage_demo'

# Client for the write API and the resource path of the destination table.
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(PROJECT_ID, DATASET_ID, TABLE_ID)

# Request a new write stream of type PENDING on that table. The stream
# description is built inline so that `write_stream` only ever refers to the
# stream returned by the service.
write_stream = write_client.create_write_stream(
    parent=parent,
    write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
)

# Name of the created stream; used when building the append request template.
stream_name = write_stream.name


In [7]:
# Copy the descriptor of the generated SampleData class into a standalone
# DescriptorProto and wrap it as the writer schema.
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema = types.ProtoSchema(proto_descriptor=proto_descriptor)

# The template carries the stream name and the writer schema, but no rows.
proto_data = types.AppendRowsRequest.ProtoData(writer_schema=proto_schema)
request_template = types.AppendRowsRequest(
    write_stream=stream_name,
    proto_rows=proto_data,
)

In [8]:
# Build an AppendRowsStream from the client and the request template; the
# batches below are submitted through its send() method.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)

In [9]:
# First batch: a single serialized SampleData row (row_num 1).
proto_rows = types.ProtoRows(
    serialized_rows=[create_row_data(1.2, 4, "Bye.", 1)],
)

In [10]:
# Wrap the first batch in an append request that starts at offset 0.
proto_data = types.AppendRowsRequest.ProtoData(rows=proto_rows)
request = types.AppendRowsRequest(offset=0, proto_rows=proto_data)

In [11]:
# Submit the first batch. The returned object is kept so that its result()
# can be read in the next cell.
response_future_1 = append_rows_stream.send(request)

In [12]:
# Print the response to the first append request.
# NOTE(review): result() presumably waits until the response is available — confirm.
print(response_future_1.result())

append_result {
  offset {
  }
}
write_stream: "projects/michaelabel-demo/datasets/bq_demos/tables/storage_demo/streams/Cic3MGUzOTJlZi0wMDAwLTJlNzgtYmI0ZS1mNDAzMDQ1ZTllZjY6czg"



In [13]:
# Second batch: two more serialized rows (row_num 2 and 42).
proto_rows = types.ProtoRows(
    serialized_rows=[
        create_row_data(7.52, -93, "Hello.", 2),
        create_row_data(-1.23, 913, "BigQuery is fun.", 42),
    ],
)

# This is the second request, so only the row data has to be included: the
# stream name and the protocol buffers DESCRIPTOR are needed in the first
# request only.
proto_data = types.AppendRowsRequest.ProtoData(rows=proto_rows)

# The offset must equal the number of rows sent previously (one so far).
request = types.AppendRowsRequest(offset=1, proto_rows=proto_data)

response_future_2 = append_rows_stream.send(request)

In [14]:
# Print the response to the second append request (the batch sent at offset 1).
# NOTE(review): result() presumably waits until the response is available — confirm.
print(response_future_2.result())

append_result {
  offset {
    value: 1
  }
}



In [15]:
# Both batches have been sent and their results read, so close the append stream.
# NOTE(review): presumably this shuts down the underlying streaming connection — confirm.
append_rows_stream.close()

In [16]:
# Finalize the write stream by name; the call's return value is displayed as
# the cell output.
# NOTE(review): per the Storage Write API a finalized stream should accept no
# further appends — confirm against the API reference.
write_client.finalize_write_stream(name=write_stream.name)

row_count: 3

In [17]:
# Commit the write stream to the destination table.
batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
batch_commit_write_streams_request.parent = parent
batch_commit_write_streams_request.write_streams = [write_stream.name]
commit_response = write_client.batch_commit_write_streams(
    batch_commit_write_streams_request
)

# The commit call reports per-stream failures in the response rather than by
# raising. If `stream_errors` is non-empty, no stream was committed, so the
# success message below would be wrong: fail loudly instead.
if commit_response.stream_errors:
    raise RuntimeError(
        f"Commit of stream '{write_stream.name}' failed: "
        f"{list(commit_response.stream_errors)}"
    )

print(f"Writes to stream: '{write_stream.name}' have been committed.")

Writes to stream: 'projects/michaelabel-demo/datasets/bq_demos/tables/storage_demo/streams/Cic3MGUzOTJlZi0wMDAwLTJlNzgtYmI0ZS1mNDAzMDQ1ZTllZjY6czg' have been committed.
