Merge e534480 into 5cbef4c
gfr10598 committed Jun 22, 2020
2 parents 5cbef4c + e534480, commit a169d4c
Showing 5 changed files with 319 additions and 96 deletions.

cloud/bq/ops.go: 243 changes (173 additions & 70 deletions)

@@ -4,71 +4,79 @@ import (
     "bytes"
     "context"
     "errors"
+    "fmt"
     "html/template"
     "log"
 
     "cloud.google.com/go/bigquery"
     "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
+    "google.golang.org/api/iterator"
 
     "github.com/m-lab/go/dataset"
 
     "github.com/m-lab/etl-gardener/tracker"
 )
 
-// Queryer provides the interface for running bigquery operations.
-type Queryer interface {
-    QueryFor(key string) string
-    Run(ctx context.Context, key string, dryRun bool) (bqiface.Job, error)
-    Copy(ctx context.Context, dryRun bool) (bqiface.Job, error)
+// OpsHandler provides the interface for running bigquery operations.
+type OpsHandler interface {
+    DedupQuery() string
+
+    LoadToTmp(ctx context.Context, dryRun bool) (bqiface.Job, error)
+    Dedup(ctx context.Context, dryRun bool) (bqiface.Job, error)
+    CopyToRaw(ctx context.Context, dryRun bool) (bqiface.Job, error)
+    DeleteTmp(ctx context.Context) error
 }
 
 // queryer is used to construct a dedup query.
 type queryer struct {
-    client  bqiface.Client
-    Project string
-    Date    string // Name of the partition field
-    Job     tracker.Job
+    client     bqiface.Client
+    LoadSource string // The bucket/path to load from.
+    Project    string
+    Date       string // Name of the partition field
+    Job        tracker.Job
     // map key is the single field name, value is fully qualified name
-    Partition map[string]string
-    Order     string
+    PartitionKeys map[string]string
+    OrderKeys     string
 }
 
 // ErrDatatypeNotSupported is returned by Query for unsupported datatypes.
 var ErrDatatypeNotSupported = errors.New("Datatype not supported")
 
-// NewQuerier creates a suitable QueryParams for a Job.
+// NewQuerier creates a suitable OpsHandler for a Job.
 // The context is used to create a bigquery client, and should be kept alive while
 // the querier is in use.
-func NewQuerier(ctx context.Context, job tracker.Job, project string) (Queryer, error) {
+func NewQuerier(ctx context.Context, job tracker.Job, project string, loadSource string) (OpsHandler, error) {
     c, err := bigquery.NewClient(ctx, project)
     if err != nil {
         return nil, err
     }
     bqClient := bqiface.AdaptClient(c)
-    return NewQuerierWithClient(bqClient, job, project)
+    return NewQuerierWithClient(bqClient, job, project, loadSource)
 }
 
 // NewQuerierWithClient creates a suitable QueryParams for a Job.
-func NewQuerierWithClient(client bqiface.Client, job tracker.Job, project string) (Queryer, error) {
+func NewQuerierWithClient(client bqiface.Client, job tracker.Job, project string, loadSource string) (OpsHandler, error) {
     switch job.Datatype {
     case "annotation":
         return &queryer{
-            client:    client,
-            Project:   project,
-            Date:      "date",
-            Job:       job,
-            Partition: map[string]string{"id": "id"},
-            Order:     "",
+            client:        client,
+            LoadSource:    loadSource,
+            Project:       project,
+            Date:          "date",
+            Job:           job,
+            PartitionKeys: map[string]string{"id": "id"},
+            OrderKeys:     "",
         }, nil
 
     case "ndt7":
         return &queryer{
-            client:    client,
-            Project:   project,
-            Date:      "date",
-            Job:       job,
-            Partition: map[string]string{"id": "id"},
-            Order:     "",
+            client:        client,
+            LoadSource:    loadSource,
+            Project:       project,
+            Date:          "date",
+            Job:           job,
+            PartitionKeys: map[string]string{"id": "id"},
+            OrderKeys:     "",
         }, nil
 
     // TODO: enable tcpinfo again once it supports standard columns.
@@ -78,9 +86,9 @@ func NewQuerierWithClient(client bqiface.Client, job tracker.Job, project string
             Project:   project,
             Date:      "DATE(TestTime)",
             Job:       job,
-            Partition: map[string]string{"uuid": "uuid", "Timestamp": "FinalSnapshot.Timestamp"},
+            PartitionKeys: map[string]string{"uuid": "uuid", "Timestamp": "FinalSnapshot.Timestamp"},
             // TODO TaskFileName should be ArchiveURL once we update the schema.
-            Order: "ARRAY_LENGTH(Snapshots) DESC, ParseInfo.TaskFileName, ",
+            OrderKeys: "ARRAY_LENGTH(Snapshots) DESC, ParseInfo.TaskFileName, ",
         }, nil
     */
     default:
@@ -89,8 +97,7 @@ func NewQuerierWithClient(client bqiface.Client, job tracker.Job, project string
 }
 
 var queryTemplates = map[string]*template.Template{
-    "dedup":   dedupTemplate,
-    "cleanup": cleanupTemplate,
+    "dedup": dedupTemplate,
 }
 
 // MakeQuery creates a query from a template.
@@ -103,18 +110,14 @@ func (params queryer) makeQuery(t *template.Template) string {
     return out.String()
 }
 
-// QueryFor returns the appropriate query in string form.
-func (params queryer) QueryFor(key string) string {
-    t, ok := queryTemplates[key]
-    if !ok {
-        return ""
-    }
-    return params.makeQuery(t)
+// DedupQuery returns the appropriate query in string form.
+func (params queryer) DedupQuery() string {
+    return params.makeQuery(dedupTemplate)
 }
 
-// Run executes a query constructed from a template. It returns the bqiface.Job.
-func (params queryer) Run(ctx context.Context, key string, dryRun bool) (bqiface.Job, error) {
-    qs := params.QueryFor(key)
+func (params queryer) Dedup(ctx context.Context, dryRun bool) (bqiface.Job, error) {
+    qs := params.DedupQuery()
     if len(qs) == 0 {
         return nil, dataset.ErrNilQuery
     }
@@ -132,32 +135,71 @@ func (params queryer) Run(ctx context.Context, key string, dryRun bool) (bqiface.Job, error) {
     return q.Run(ctx)
 }
 
-// Copy copies the tmp_ job partition to the raw_ job partition.
-func (params queryer) Copy(ctx context.Context, dryRun bool) (bqiface.Job, error) {
+// LoadToTmp loads the tmp_ exp table from GCS files.
+func (params queryer) LoadToTmp(ctx context.Context, dryRun bool) (bqiface.Job, error) {
     if dryRun {
         return nil, errors.New("dryrun not implemented")
     }
     if params.client == nil {
         return nil, dataset.ErrNilBqClient
     }
-    src := params.client.Dataset("tmp_" + params.Job.Experiment).Table(params.Job.Datatype)
-    dest := params.client.Dataset("raw_" + params.Job.Experiment).Table(params.Job.Datatype)
-
-    copier := dest.CopierFrom(src)
-    config := bqiface.CopyConfig{}
-    config.WriteDisposition = bigquery.WriteTruncate
-    config.Dst = dest
-    config.Srcs = append(config.Srcs, src)
-    copier.SetCopyConfig(config)
-    return copier.Run(ctx)
-}
+    gcsRef := bigquery.NewGCSReference(params.LoadSource)
+    gcsRef.SourceFormat = bigquery.JSON
 
-// Dedup executes a query that deletes duplicates from the destination table.
-func (params queryer) Dedup(ctx context.Context, dryRun bool) (bqiface.Job, error) {
-    return params.Run(ctx, "dedup", dryRun)
+    dest := params.client.
+        Dataset("tmp_" + params.Job.Experiment).
+        Table(params.Job.Datatype)
+    if dest == nil {
+        return nil, ErrTableNotFound
+    }
+    loader := dest.LoaderFrom(gcsRef)
+    loadConfig := bqiface.LoadConfig{}
+    loadConfig.WriteDisposition = bigquery.WriteAppend
+    loadConfig.Dst = dest
+    loadConfig.Src = gcsRef
+    loader.SetLoadConfig(loadConfig)
+
+    return loader.Run(ctx)
 }
 
-// Cleanup executes a query that deletes the entire partition
-// from the tmp table.
-func (params queryer) Cleanup(ctx context.Context, dryRun bool) (bqiface.Job, error) {
-    return params.Run(ctx, "cleanup", dryRun)
+// CopyToRaw copies the tmp_ job partition to the raw_ job partition.
+func (params queryer) CopyToRaw(ctx context.Context, dryRun bool) (bqiface.Job, error) {
+    if dryRun {
+        return nil, errors.New("dryrun not implemented")
+    }
+    if params.client == nil {
+        return nil, dataset.ErrNilBqClient
+    }
+    // HACK - for now, we use a standard bq client, because using
+    // bqiface doesn't seem to actually work. However, this may
+    // be due to incorrect observations caused by streaming buffer delay,
+    // and should be re-evaluated.
+    client, err := bigquery.NewClient(ctx, params.client.Dataset("tmp_"+params.Job.Experiment).ProjectID())
+    if err != nil {
+        return nil, err
+    }
+
+    // TODO - names should be fields in queryer.
+    src := client.Dataset("tmp_" + params.Job.Experiment).
+        Table(params.Job.Datatype + "$" + params.Job.Date.Format("20060102"))
+    dest := client.Dataset("raw_" + params.Job.Experiment).
+        Table(params.Job.Datatype + "$" + params.Job.Date.Format("20060102"))
+
+    if m, err := src.Metadata(ctx); err == nil {
+        log.Printf("Source %s: %+v\n", src.FullyQualifiedName(), m)
+    }
+    if m, err := dest.Metadata(ctx); err == nil {
+        log.Printf("Dest %s: %+v\n", dest.FullyQualifiedName(), m)
+    }
+
+    copier := dest.CopierFrom(src)
+    copier.CopyConfig.WriteDisposition = bigquery.WriteTruncate
+    //log.Printf("%+v\n%+v\n%+v\n", config.Srcs[0], config.Dst, *(*bqiface.Copier)(copier))
+
+    j, err := copier.Run(ctx)
+
+    return &xJob{j: j}, err
 }
 
 // TODO get the tmp_ and raw_ from the job Target?
@@ -179,11 +221,11 @@ AND NOT EXISTS (
 WITH keep AS (
 SELECT * EXCEPT(row_number) FROM (
 SELECT
-{{range $k, $v := .Partition}}{{$v}}, {{end}}
+{{range $k, $v := .PartitionKeys}}{{$v}}, {{end}}
 parser.Time,
 ROW_NUMBER() OVER (
-PARTITION BY {{range $k, $v := .Partition}}{{$v}}, {{end}}date
-ORDER BY {{.Order}} parser.Time DESC
+PARTITION BY {{range $k, $v := .PartitionKeys}}{{$v}}, {{end}}date
+ORDER BY {{.OrderKeys}} parser.Time DESC
 ) row_number
 FROM (
 SELECT * FROM ` + tmpTable + `
@@ -196,14 +238,75 @@ AND NOT EXISTS (
 # This matches against the keep table based on keys. Sufficient select keys must be
 # used to distinguish the preferred row from the others.
 WHERE
-{{range $k, $v := .Partition}}target.{{$v}} = keep.{{$k}} AND {{end}}
+{{range $k, $v := .PartitionKeys}}target.{{$v}} = keep.{{$k}} AND {{end}}
 target.parser.Time = keep.Time
 )`))
 
-var cleanupTemplate = template.Must(template.New("").Parse(`
-#standardSQL
-# Delete all rows in a partition.
-DELETE
-FROM ` + tmpTable + `
-WHERE {{.Date}} = "{{.Job.Date.Format "2006-01-02"}}"
-`))
+// DeleteTmp deletes the tmp table partition.
+func (params queryer) DeleteTmp(ctx context.Context) error {
+    if params.client == nil {
+        return dataset.ErrNilBqClient
+    }
+    // TODO - name should be field in queryer.
+    tmp := params.client.Dataset("tmp_" + params.Job.Experiment).Table(
+        fmt.Sprintf("%s$%s", params.Job.Datatype, params.Job.Date.Format("20060102")))
+    log.Println("Deleting", tmp.FullyQualifiedName())
+    return tmp.Delete(ctx)
+}
+
+// This is used to allow using bigquery.Copier as a bqiface.Copier. YUCK.
+type xRowIterator struct {
+    i *bigquery.RowIterator
+    bqiface.RowIterator
+}
+
+func (i *xRowIterator) SetStartIndex(s uint64) {
+    i.i.StartIndex = s
+}
+func (i *xRowIterator) Schema() bigquery.Schema {
+    return i.i.Schema
+}
+func (i *xRowIterator) TotalRows() uint64 {
+    return i.i.TotalRows
+}
+func (i *xRowIterator) Next(p interface{}) error {
+    return i.i.Next(p)
+}
+func (i *xRowIterator) PageInfo() *iterator.PageInfo {
+    return i.i.PageInfo()
+}
+
+func assertRowIterator() {
+    func(bqiface.RowIterator) {}(&xRowIterator{})
+}
+
+type xJob struct {
+    j *bigquery.Job
+    bqiface.Job
+}
+
+func (x *xJob) ID() string {
+    return x.j.ID()
+}
+func (x *xJob) Location() string {
+    return x.j.Location()
+}
+func (x *xJob) Config() (bigquery.JobConfig, error) {
+    return x.j.Config()
+}
+func (x *xJob) Status(ctx context.Context) (*bigquery.JobStatus, error) {
+    return x.j.Status(ctx)
+}
+func (x *xJob) LastStatus() *bigquery.JobStatus {
+    return x.j.LastStatus()
+}
+func (x *xJob) Cancel(ctx context.Context) error {
+    return x.j.Cancel(ctx)
+}
+func (x *xJob) Wait(ctx context.Context) (*bigquery.JobStatus, error) {
+    return x.j.Wait(ctx)
+}
+func (x *xJob) Read(ctx context.Context) (bqiface.RowIterator, error) {
+    i, err := x.j.Read(ctx)
+    return &xRowIterator{i: i}, err
+}
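
For concreteness: in the annotation and ndt7 cases above, PartitionKeys is {"id": "id"} and OrderKeys is empty, so the changed SELECT fragment of the dedup template renders roughly as follows (the {{range}} emits "id, " per map entry, and the empty OrderKeys leaves only parser.Time in the ORDER BY). This expands only the fragment visible in this diff; the enclosing DELETE and the tmpTable name are defined elsewhere in the file:

    SELECT
      id,
      parser.Time,
      ROW_NUMBER() OVER (
        PARTITION BY id, date
        ORDER BY parser.Time DESC
      ) row_number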

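The calling code is not part of this diff, but the new OpsHandler methods read as a four-stage pipeline: load GCS files into the tmp_ table, dedup the tmp_ partition, copy it to the matching raw_ partition, then drop the tmp_ partition. The sketch below shows how a caller might drive it. It is illustrative only, not code from this commit: the project, bucket, GCS path, and the exact tracker.Job field set are assumptions.

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"

        "github.com/m-lab/etl-gardener/cloud/bq"
        "github.com/m-lab/etl-gardener/tracker"
    )

    func main() {
        ctx := context.Background()

        // Hypothetical job; these field values are invented for illustration.
        job := tracker.Job{
            Bucket:     "archive-mlab-sandbox",
            Experiment: "ndt",
            Datatype:   "ndt7",
            Date:       time.Date(2020, 6, 22, 0, 0, 0, 0, time.UTC),
        }

        // loadSource (the new fourth parameter) names the GCS files to load.
        ops, err := bq.NewQuerier(ctx, job, "mlab-sandbox",
            "gs://archive-mlab-sandbox/ndt/ndt7/2020/06/22/*.jsonl")
        if err != nil {
            log.Fatal(err)
        }

        // LoadToTmp appends GCS rows into the tmp_ table, Dedup deletes
        // duplicate rows from the tmp_ partition, and CopyToRaw truncates
        // and rewrites the raw_ partition. dryRun must be false, since dry
        // runs currently return "dryrun not implemented".
        stages := []func(context.Context, bool) (bqiface.Job, error){
            ops.LoadToTmp, ops.Dedup, ops.CopyToRaw,
        }
        for _, stage := range stages {
            j, err := stage(ctx, false)
            if err != nil {
                log.Fatal(err)
            }
            if _, err := j.Wait(ctx); err != nil {
                log.Fatal(err)
            }
        }

        // Finally, drop the day's tmp_ partition.
        if err := ops.DeleteTmp(ctx); err != nil {
            log.Fatal(err)
        }
    }

Note also why the xJob and xRowIterator adapters exist: CopyToRaw deliberately uses a plain *bigquery.Job (see the HACK comment), so its result must be wrapped to flow back out through the bqiface.Job return type. The assertRowIterator helper is a compile-time interface check; the same idiom is often written as var _ bqiface.RowIterator = &xRowIterator{}.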