Skip to content

Commit

Permalink
Merge pull request #7625 from tinyspeck/am_vtadmin_vschemas
Browse files Browse the repository at this point in the history
[vtadmin] vschemas api endpoints
  • Loading branch information
rafael committed Mar 8, 2021
2 parents a6bee38 + 0743256 commit 2c9063e
Show file tree
Hide file tree
Showing 12 changed files with 31,001 additions and 14,374 deletions.
1,227 changes: 1,118 additions & 109 deletions go/vt/proto/vtadmin/vtadmin.pb.go

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func NewAPI(clusters []*cluster.Cluster, opts grpcserver.Options, httpOpts vtadm
router.HandleFunc("/schemas", httpAPI.Adapt(vtadminhttp.GetSchemas)).Name("API.GetSchemas")
router.HandleFunc("/tablets", httpAPI.Adapt(vtadminhttp.GetTablets)).Name("API.GetTablets")
router.HandleFunc("/tablet/{tablet}", httpAPI.Adapt(vtadminhttp.GetTablet)).Name("API.GetTablet")
router.HandleFunc("/vschema/{cluster_id}/{keyspace}", httpAPI.Adapt(vtadminhttp.GetVSchema)).Name("API.GetVSchema")
router.HandleFunc("/vschemas", httpAPI.Adapt(vtadminhttp.GetVSchemas)).Name("API.GetVSchemas")
router.HandleFunc("/vtexplain", httpAPI.Adapt(vtadminhttp.VTExplain)).Name("API.VTExplain")

// Middlewares are executed in order of addition. Our ordering (all
Expand Down Expand Up @@ -663,6 +665,118 @@ func (api *API) getClustersForRequest(ids []string) ([]*cluster.Cluster, []strin
return clusters, ids
}

// GetVSchema is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetVSchema(ctx context.Context, req *vtadminpb.GetVSchemaRequest) (*vtadminpb.VSchema, error) {
span, ctx := trace.NewSpan(ctx, "API.GetVSchema")
defer span.Finish()

c, ok := api.clusterMap[req.ClusterId]
if !ok {
return nil, fmt.Errorf("%w: no such cluster %s", errors.ErrUnsupportedCluster, req.ClusterId)
}

cluster.AnnotateSpan(c, span)

if err := c.Vtctld.Dial(ctx); err != nil {
return nil, err
}

return c.GetVSchema(ctx, req.Keyspace)
}

// GetVSchemas is part of the vtadminpb.VTAdminServer interface.
func (api *API) GetVSchemas(ctx context.Context, req *vtadminpb.GetVSchemasRequest) (*vtadminpb.GetVSchemasResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.GetVSchemas")
defer span.Finish()

clusters, _ := api.getClustersForRequest(req.ClusterIds)

var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
vschemas []*vtadminpb.VSchema
)

if len(clusters) == 0 {
if len(req.ClusterIds) > 0 {
return nil, fmt.Errorf("%w: %s", errors.ErrUnsupportedCluster, strings.Join(req.ClusterIds, ", "))
}

return &vtadminpb.GetVSchemasResponse{VSchemas: []*vtadminpb.VSchema{}}, nil
}

for _, c := range clusters {
wg.Add(1)

go func(c *cluster.Cluster) {
defer wg.Done()

span, ctx := trace.NewSpan(ctx, "API.getVSchemasForCluster")
defer span.Finish()

cluster.AnnotateSpan(c, span)

if err := c.Vtctld.Dial(ctx); err != nil {
rec.RecordError(fmt.Errorf("Vtctld.Dial(cluster = %s): %w", c.ID, err))
return
}

keyspaces, err := c.Vtctld.GetKeyspaces(ctx, &vtctldatapb.GetKeyspacesRequest{})
if err != nil {
rec.RecordError(fmt.Errorf("GetKeyspaces(cluster = %s): %w", c.ID, err))
return
}

var (
clusterM sync.Mutex
clusterWG sync.WaitGroup
clusterRec concurrency.AllErrorRecorder
clusterVSchemas = make([]*vtadminpb.VSchema, 0, len(keyspaces.Keyspaces))
)

for _, keyspace := range keyspaces.Keyspaces {
clusterWG.Add(1)

go func(keyspace *vtctldatapb.Keyspace) {
defer clusterWG.Done()

vschema, err := c.GetVSchema(ctx, keyspace.Name)
if err != nil {
clusterRec.RecordError(fmt.Errorf("GetVSchema(keyspace = %s): %w", keyspace.Name, err))
return
}

clusterM.Lock()
clusterVSchemas = append(clusterVSchemas, vschema)
clusterM.Unlock()
}(keyspace)
}

clusterWG.Wait()

if clusterRec.HasErrors() {
rec.RecordError(fmt.Errorf("GetVSchemas(cluster = %s): %w", c.ID, clusterRec.Error()))
return
}

m.Lock()
vschemas = append(vschemas, clusterVSchemas...)
m.Unlock()
}(c)
}

wg.Wait()

if rec.HasErrors() {
return nil, rec.Error()
}

return &vtadminpb.GetVSchemasResponse{
VSchemas: vschemas,
}, nil
}

// VTExplain is part of the vtadminpb.VTAdminServer interface.
func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest) (*vtadminpb.VTExplainResponse, error) {
span, ctx := trace.NewSpan(ctx, "API.VTExplain")
Expand Down

0 comments on commit 2c9063e

Please sign in to comment.