Skip to content

Commit

Permalink
feat(bigquery): add remote function options to routine metadata (goog…
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx committed Sep 26, 2022
1 parent abb6ec8 commit d9a437d
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 1 deletion.
11 changes: 11 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

connection "cloud.google.com/go/bigquery/connection/apiv1"
"cloud.google.com/go/civil"
datacatalog "cloud.google.com/go/datacatalog/apiv1"
"cloud.google.com/go/httpreplay"
Expand All @@ -54,6 +55,7 @@ var record = flag.Bool("record", false, "record RPCs")
var (
client *Client
storageClient *storage.Client
connectionsClient *connection.Client
policyTagManagerClient *datacatalog.PolicyTagManagerClient
dataset *Dataset
otherDataset *Dataset
Expand Down Expand Up @@ -123,6 +125,10 @@ func initIntegrationTest() func() {
if err != nil {
log.Fatal(err)
}
connectionsClient, err = connection.NewClient(ctx, option.WithHTTPClient(hc))
if err != nil {
log.Fatal(err)
}
policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx)
if err != nil {
log.Fatal(err)
Expand All @@ -140,6 +146,7 @@ func initIntegrationTest() func() {
}
client = nil
storageClient = nil
connectionsClient = nil
return func() {}

default: // Run integration tests against a real backend.
Expand Down Expand Up @@ -203,6 +210,10 @@ func initIntegrationTest() func() {
if err != nil {
log.Fatalf("datacatalog.NewPolicyTagManagerClient: %v", err)
}
connectionsClient, err = connection.NewClient(ctx, sOpts...)
if err != nil {
log.Fatalf("connection.NewService: %v", err)
}
c := initTestState(client, now)
return func() { c(); cleanup() }
}
Expand Down
87 changes: 86 additions & 1 deletion bigquery/routine.go
Expand Up @@ -163,6 +163,15 @@ const (
NotDeterministic RoutineDeterminism = "NOT_DETERMINISTIC"
)

const (
// ScalarFunctionRoutine scalar function routine type
ScalarFunctionRoutine = "SCALAR_FUNCTION"
// ProcedureRoutine procedure routine type
ProcedureRoutine = "PROCEDURE"
// TableValuedFunctionRoutine routine type for table valued functions
TableValuedFunctionRoutine = "TABLE_VALUED_FUNCTION"
)

// RoutineMetadata represents details of a given BigQuery Routine.
type RoutineMetadata struct {
ETag string
Expand All @@ -177,7 +186,11 @@ type RoutineMetadata struct {
// Language of the routine, such as SQL or JAVASCRIPT.
Language string
// The list of arguments for the the routine.
Arguments []*RoutineArgument
Arguments []*RoutineArgument

// Information for a remote user-defined function.
RemoteFunctionOptions *RemoteFunctionOptions

ReturnType *StandardSQLDataType

// Set only if the routine type is TABLE_VALUED_FUNCTION.
Expand All @@ -195,6 +208,66 @@ type RoutineMetadata struct {
Body string
}

// RemoteFunctionOptions contains information for a remote user-defined function.
type RemoteFunctionOptions struct {

// Fully qualified name of the user-provided connection object which holds
// the authentication information to send requests to the remote service.
// Format:
// projects/{projectId}/locations/{locationId}/connections/{connectionId}
Connection string

// Endpoint of the user-provided remote service (e.g. a function url in
// Google Cloud Function or Cloud Run )
Endpoint string

// Max number of rows in each batch sent to the remote service.
// If absent or if 0, it means no limit.
MaxBatchingRows int64

// User-defined context as a set of key/value pairs,
// which will be sent as function invocation context together with
// batched arguments in the requests to the remote service. The total
// number of bytes of keys and values must be less than 8KB.
UserDefinedContext map[string]string
}

func bqToRemoteFunctionOptions(in *bq.RemoteFunctionOptions) (*RemoteFunctionOptions, error) {
if in == nil {
return nil, nil
}
rfo := &RemoteFunctionOptions{
Connection: in.Connection,
Endpoint: in.Endpoint,
MaxBatchingRows: in.MaxBatchingRows,
}
if in.UserDefinedContext != nil {
rfo.UserDefinedContext = make(map[string]string)
for k, v := range in.UserDefinedContext {
rfo.UserDefinedContext[k] = v
}
}
return rfo, nil
}

func (rfo *RemoteFunctionOptions) toBQ() (*bq.RemoteFunctionOptions, error) {
if rfo == nil {
return nil, nil
}
r := &bq.RemoteFunctionOptions{
Connection: rfo.Connection,
Endpoint: rfo.Endpoint,
MaxBatchingRows: rfo.MaxBatchingRows,
}
if rfo.UserDefinedContext != nil {
r.UserDefinedContext = make(map[string]string)
for k, v := range rfo.UserDefinedContext {
r.UserDefinedContext[k] = v
}
}
return r, nil
}

func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
r := &bq.Routine{}
if rm == nil {
Expand Down Expand Up @@ -227,6 +300,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
}
r.Arguments = args
r.ImportedLibraries = rm.ImportedLibraries
if rm.RemoteFunctionOptions != nil {
rfo, err := rm.RemoteFunctionOptions.toBQ()
if err != nil {
return nil, err
}
r.RemoteFunctionOptions = rfo
}
if !rm.CreationTime.IsZero() {
return nil, errors.New("cannot set CreationTime on create")
}
Expand Down Expand Up @@ -436,6 +516,11 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
return nil, err
}
meta.ReturnType = ret
rfo, err := bqToRemoteFunctionOptions(r.RemoteFunctionOptions)
if err != nil {
return nil, err
}
meta.RemoteFunctionOptions = rfo
tt, err := bqToStandardSQLTableType(r.ReturnTableType)
if err != nil {
return nil, err
Expand Down
80 changes: 80 additions & 0 deletions bigquery/routine_integration_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/genproto/googleapis/cloud/bigquery/connection/v1"
)

func TestIntegration_RoutineScalarUDF(t *testing.T) {
Expand Down Expand Up @@ -88,6 +89,85 @@ func TestIntegration_RoutineJSUDF(t *testing.T) {
}
}

func TestIntegration_RoutineRemoteUDF(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

routineID := routineIDs.New()
routine := dataset.Routine(routineID)
uri := "https://aaabbbccc-uc.a.run.app"

connectionLocation := fmt.Sprintf("projects/%s/locations/%s", dataset.ProjectID, "us")
connectionName := fmt.Sprintf("udf_conn%s", routineID)
cleanupConnection, connectionID, err := createConnection(ctx, t, connectionLocation, connectionName)
if err != nil {
t.Fatal(err)
}
defer cleanupConnection()

remoteOpts := &RemoteFunctionOptions{
Endpoint: uri,
Connection: connectionID,
MaxBatchingRows: 50,
UserDefinedContext: map[string]string{"foo": "bar"},
}
meta := &RoutineMetadata{
RemoteFunctionOptions: remoteOpts,
Description: "defines a remote function",
Type: ScalarFunctionRoutine,
ReturnType: &StandardSQLDataType{
TypeKind: "STRING",
},
}
if err := routine.Create(ctx, meta); err != nil {
t.Fatalf("routine.Create: %v", err)
}

gotMeta, err := routine.Metadata(ctx)
if err != nil {
t.Fatalf("routine.Metadata: %v", err)
}

if diff := testutil.Diff(gotMeta.RemoteFunctionOptions, remoteOpts); diff != "" {
t.Fatalf("RemoteFunctionOptions: -got, +want:\n%s", diff)
}
}

func createConnection(ctx context.Context, t *testing.T, parent, name string) (cleanup func(), connectionID string, err error) {
fullname := fmt.Sprintf("%s/connections/%s", parent, name)
conn, err := connectionsClient.CreateConnection(ctx, &connection.CreateConnectionRequest{
Parent: parent,
ConnectionId: name,
Connection: &connection.Connection{
FriendlyName: name,
Properties: &connection.Connection_CloudResource{
CloudResource: &connection.CloudResourceProperties{},
},
},
})
if err != nil {
return
}
conn, err = connectionsClient.GetConnection(ctx, &connection.GetConnectionRequest{
Name: fullname,
})
if err != nil {
return
}
cleanup = func() {
err := connectionsClient.DeleteConnection(ctx, &connection.DeleteConnectionRequest{
Name: fullname,
})
if err != nil {
t.Logf("could not delete connection: %s", fullname)
}
}
connectionID = conn.Name
return
}

func TestIntegration_RoutineComplexTypes(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down

0 comments on commit d9a437d

Please sign in to comment.