Skip to content

Commit

Permalink
feat: add read task in bigquery component (#156)
Browse files Browse the repository at this point in the history
Because

- we want to read from data components

This commit

- add read task to bigquery
  • Loading branch information
chuang8511 committed Jun 24, 2024
1 parent 77fe2fc commit 4d2e7ec
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 18 deletions.
14 changes: 14 additions & 0 deletions data/bigquery/v0/README.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The BigQuery component is a data component that allows users to insert data to B
It can carry out the following tasks:

- [Insert](#insert)
- [Read](#read)

## Release Stage

Expand Down Expand Up @@ -41,3 +42,16 @@ Insert data to BigQuery.
| Output | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Status | `status` | string | Status of the upload operation |

### Read

Read data from BigQuery.

| Input | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Task ID (required) | `task` | string | `TASK_READ` |
| Filtering | `filtering` | string | The filter to be applied to the data; it must start with a WHERE clause |

| Output | ID | Type | Description |
| :--- | :--- | :--- | :--- |
| Data | `data` | array[object] | The data to be read from BigQuery |
3 changes: 2 additions & 1 deletion data/bigquery/v0/config/definition.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"availableTasks": [
"TASK_INSERT"
"TASK_INSERT",
"TASK_READ"
],
"custom": false,
"documentationUrl": "https://www.instill.tech/docs/component/data/bigquery",
Expand Down
58 changes: 41 additions & 17 deletions data/bigquery/v0/config/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,16 @@
"instillUIOrder": 0,
"properties": {
"data": {
"additionalProperties": false,
"additionalProperties": true,
"instillShortDescription": "The data to be inserted to BigQuery",
"description": "The data to be inserted to BigQuery",
"instillUIOrder": 0,
"patternProperties": {
"^[a-z_][-a-z_0-9]{0,31}$": {
"instillAcceptFormats": [
"*"
],
"instillUIOrder": 0,
"instillUpstreamTypes": [
"reference",
"template"
],
"title": "Data"
}
},
"required": [],
"title": "Data",
"type": "object"
}
},
"required": [
"data"
],
"required": [],
"title": "Input",
"type": "object"
},
Expand All @@ -50,5 +35,44 @@
"title": "Output",
"type": "object"
}
},
"TASK_READ": {
"instillShortDescription": "Read data from BigQuery.",
"input": {
"instillUIOrder": 0,
"properties": {
"filtering": {
"instillShortDescription": "The filter to be applied to the data",
"description": "The filter to be applied to the data, please start with where clause",
"instillUIOrder": 0,
"required": [],
"title": "Filtering",
"type": "string"
}
},
"required": [],
"title": "Input",
"type": "object"
},
"output": {
"instillUIOrder": 0,
"description": "The data to be read from BigQuery",
"properties": {
"data": {
"description": "The data to be read from BigQuery",
"instillUIOrder": 0,
"title": "Data",
"type": "array",
"items": {
"title": "Data item",
"type": "object",
"required": []
}
}
},
"required": ["data"],
"title": "Output",
"type": "object"
}
}
}
171 changes: 171 additions & 0 deletions data/bigquery/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ import (
"sync"

"cloud.google.com/go/bigquery"
pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/instill-ai/component/base"
)

// Task identifiers this component implements (must match the
// "availableTasks" entries in config/definition.json).
const (
	taskInsert = "TASK_INSERT"
	taskRead   = "TASK_READ"
)

// instillUpstreamTypes lists the upstream value kinds allowed for the
// dynamically generated table-column properties (used by
// constructTableProperties).
var instillUpstreamTypes = []string{"value", "reference", "template"}

//go:embed config/definition.json
var definitionJSON []byte

Expand Down Expand Up @@ -102,6 +108,27 @@ func (e *execution) Execute(ctx context.Context, inputs []*structpb.Struct) ([]*
return nil, err
}
output = &structpb.Struct{Fields: map[string]*structpb.Value{"status": {Kind: &structpb.Value_StringValue{StringValue: "success"}}}}
case taskRead:

inputStruct := ReadInput{
ProjectID: getProjectID(e.Setup),
DatasetID: getDatasetID(e.Setup),
TableName: getTableName(e.Setup),
Client: client,
}
err := base.ConvertFromStructpb(input, &inputStruct)
if err != nil {
return nil, err
}
outputStruct, err := readDataFromBigQuery(inputStruct)
if err != nil {
return nil, err
}
output, err = base.ConvertToStructpb(outputStruct)
if err != nil {
return nil, err
}

default:
return nil, fmt.Errorf("unsupported task: %s", e.Task)
}
Expand All @@ -122,3 +149,147 @@ func (c *component) Test(sysVars map[string]any, setup *structpb.Struct) error {
}
return errors.New("project ID does not match")
}

// TableColumns pairs a BigQuery table name with its column schema, as
// collected by constructTableColumns.
type TableColumns struct {
	TableName string
	Columns   []Column
}

// Column describes a single BigQuery table column: its name and its
// BigQuery field type (e.g. "STRING", "INTEGER").
type Column struct {
	Name string
	Type string
}

// GetDefinition returns the component definition with the static "data"
// properties (in both the component specification and the per-task data
// specifications) replaced by properties derived from the configured
// BigQuery table's schema. When compConfig is nil the unmodified embedded
// definition is returned.
func (c *component) GetDefinition(sysVars map[string]any, compConfig *base.ComponentConfig) (*pb.ComponentDefinition, error) {

	ctx := context.Background()
	oriDef, err := c.Component.GetDefinition(nil, nil)
	if err != nil {
		return nil, err
	}

	if compConfig == nil {
		return oriDef, nil
	}

	// Clone so the cached original definition is never mutated.
	def := proto.Clone(oriDef).(*pb.ComponentDefinition)
	// NOTE(review): these unchecked type assertions panic if the setup
	// fields are missing or not strings — confirm upstream validation
	// guarantees them before this is called.
	client, err := NewClient(compConfig.Setup["json-key"].(string), compConfig.Setup["project-id"].(string))
	if err != nil || client == nil {
		return nil, fmt.Errorf("error creating BigQuery client: %v", err)
	}
	defer client.Close()

	myDataset := client.Dataset(compConfig.Setup["dataset-id"].(string))
	tables, err := constructTableColumns(myDataset, ctx, compConfig)
	if err != nil {
		return nil, err
	}

	tableProperties, err := constructTableProperties(tables)
	if err != nil {
		return nil, err
	}

	// TODO: chuang8511, remove table from definition.json and make it dynamic.
	// It will be changed before 2024-06-26.
	// constructTableColumns only keeps the configured table, so only the
	// first entry is used here.
	tableProperty := tableProperties[0]
	// Patch the "data" input schema of every task entry in the component
	// specification's "oneOf" list.
	for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values {
		data := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["input"].GetStructValue().Fields["properties"].GetStructValue().Fields["data"].GetStructValue()
		if data != nil {
			data.Fields["properties"] = structpb.NewStructValue(tableProperty)
		}
	}

	// Patch the per-task data specifications: the "data" object on the
	// input side, and the "data" array's item schema on the output side.
	for _, dataSpec := range def.Spec.DataSpecifications {
		dataInput := dataSpec.Input.Fields["properties"].GetStructValue().Fields["data"].GetStructValue()
		if dataInput != nil {
			dataInput.Fields["properties"] = structpb.NewStructValue(tableProperty)
		}
		dataOutput := dataSpec.Output.Fields["properties"].GetStructValue().Fields["data"].GetStructValue()

		if dataOutput != nil {
			aPieceData := dataOutput.Fields["items"].GetStructValue()
			if aPieceData != nil {
				aPieceData.Fields["properties"] = structpb.NewStructValue(tableProperty)
			}

		}
	}

	return def, nil
}

// constructTableColumns walks the dataset's tables and returns the column
// schema of the table named by compConfig.Setup["table-name"]. It returns
// an error when iteration or metadata retrieval fails, or when the
// configured table is not found in the dataset.
//
// NOTE(review): ctx should be the first parameter per Go convention; the
// signature is kept unchanged for caller compatibility.
func constructTableColumns(myDataset *bigquery.Dataset, ctx context.Context, compConfig *base.ComponentConfig) ([]TableColumns, error) {
	// TODO: chuang8511, remove table from definition.json and make it dynamic.
	// It will be changed before 2024-06-26.
	targetTable := compConfig.Setup["table-name"].(string)

	tableIT := myDataset.Tables(ctx)
	tables := []TableColumns{}
	for {
		table, err := tableIT.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		tableName := table.TableID
		// Skip non-target tables before fetching metadata: the original
		// fetched metadata (an API round trip) for every table in the
		// dataset even though only the configured one is kept.
		if tableName != targetTable {
			continue
		}
		metadata, err := myDataset.Table(tableName).Metadata(ctx)
		if err != nil {
			return nil, err
		}
		columns := make([]Column, 0, len(metadata.Schema))
		for _, field := range metadata.Schema {
			columns = append(columns, Column{Name: field.Name, Type: string(field.Type)})
		}
		tables = append(tables, TableColumns{TableName: tableName, Columns: columns})
	}
	if len(tables) == 0 {
		return nil, fmt.Errorf("table name is not found in the dataset")
	}
	return tables, nil
}

// constructTableProperties converts each table's column schema into a
// JSON-schema-like property map (title, description, Instill format and
// upstream-type annotations) and returns one structpb.Struct per table.
func constructTableProperties(tables []TableColumns) ([]*structpb.Struct, error) {
	tableProperties := make([]*structpb.Struct, len(tables))

	for tableIdx, table := range tables {
		propertiesMap := make(map[string]interface{}, len(table.Columns))
		// Fix: the original shadowed the outer loop's `idx` with the inner
		// loop variable; distinct names make the intent unambiguous.
		for colIdx, column := range table.Columns {
			// Resolve the Instill format once per column instead of calling
			// getInstillAcceptFormat three times with the same argument.
			format := getInstillAcceptFormat(column.Type)
			propertiesMap[column.Name] = map[string]interface{}{
				"title":                column.Name,
				"instillUIOrder":       colIdx,
				"description":          "Column " + column.Name + " of table " + table.TableName,
				"instillFormat":        format,
				"instillUpstreamTypes": instillUpstreamTypes,
				"instillAcceptFormats": []string{format},
				"required":             []string{},
				"type":                 format,
			}
		}
		propertyStructPB, err := base.ConvertToStructpb(propertiesMap)
		if err != nil {
			return nil, err
		}

		tableProperties[tableIdx] = propertyStructPB
	}
	return tableProperties, nil
}

// getInstillAcceptFormat maps a BigQuery field type to the corresponding
// Instill format name. Unrecognized types fall back to "string".
func getInstillAcceptFormat(tableType string) string {
	formats := map[string]string{
		"STRING":  "string",
		"INTEGER": "integer",
		"BOOLEAN": "boolean",
	}
	if format, ok := formats[tableType]; ok {
		return format
	}
	return "string"
}
3 changes: 3 additions & 0 deletions data/bigquery/v0/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// TODO: chuang8511, add test code
// It will be done before 2024-06-26.
package bigquery
59 changes: 59 additions & 0 deletions data/bigquery/v0/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package bigquery

import (
"context"
"fmt"

"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
)

// ReadInput bundles everything needed to read rows from a BigQuery table:
// the fully-qualified table coordinates, an authenticated client, and an
// optional filter clause.
type ReadInput struct {
	ProjectID string
	DatasetID string
	TableName string
	Client    *bigquery.Client
	// Filtering is appended verbatim to the generated SELECT by
	// queryBuilder; it is expected to start with a WHERE clause.
	Filtering string
}

// ReadOutput is the result of a read task: one map per row, keyed by
// column name.
type ReadOutput struct {
	Data []map[string]any `json:"data"`
}

// queryBuilder assembles the SELECT statement for a read task. The
// optional Filtering clause is appended verbatim after the table
// reference.
func queryBuilder(input ReadInput) string {
	query := fmt.Sprintf("SELECT * FROM `%s.%s.%s`", input.ProjectID, input.DatasetID, input.TableName)
	if input.Filtering != "" {
		query += " " + input.Filtering
	}
	return query
}

// readDataFromBigQuery runs a SELECT (built by queryBuilder) against the
// configured table and returns every row as a map keyed by column name.
// It returns an error when the query cannot be started or when row
// iteration fails.
func readDataFromBigQuery(input ReadInput) (ReadOutput, error) {

	ctx := context.Background()
	client := input.Client

	sql := queryBuilder(input)
	q := client.Query(sql)
	it, err := q.Read(ctx)
	if err != nil {
		return ReadOutput{}, err
	}
	result := []map[string]any{}
	for {
		var values []bigquery.Value
		err := it.Next(&values)

		if err == iterator.Done {
			break
		}
		// Bug fix: the original ignored non-Done iteration errors, which
		// could spin forever or index an empty values slice; surface them.
		if err != nil {
			return ReadOutput{}, err
		}
		data := map[string]any{}

		// it.Schema is available once Next has been called, mapping each
		// positional value to its column name.
		for i, schema := range it.Schema {
			data[schema.Name] = values[i]
		}

		result = append(result, data)
	}

	return ReadOutput{Data: result}, nil
}

0 comments on commit 4d2e7ec

Please sign in to comment.