Skip to content

Commit

Permalink
slow_queries: support export (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
unbyte authored and breezewish committed Nov 26, 2020
1 parent 09dd36a commit a673ded
Show file tree
Hide file tree
Showing 13 changed files with 468 additions and 194 deletions.
106 changes: 72 additions & 34 deletions pkg/apiserver/slowquery/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ type SlowQuery struct {
}

type GetListRequest struct {
RangeBeginTs uint `json:"rangeBeginTs" form:"rangeBeginTs"`
RangeEndTs uint `json:"rangeEndTs" form:"rangeEndTs"`
DB []string `json:"db" form:"db"`
Limit uint `json:"limit" form:"limit"`
Text string `json:"text" form:"text"`
OrderBy string `json:"orderBy" form:"orderBy"`
IsDesc bool `json:"desc" form:"desc"`
BeginTime int `json:"begin_time" form:"begin_time"`
EndTime int `json:"end_time" form:"end_time"`
DB []string `json:"db" form:"db"`
Limit uint `json:"limit" form:"limit"`
Text string `json:"text" form:"text"`
OrderBy string `json:"orderBy" form:"orderBy"`
IsDesc bool `json:"desc" form:"desc"`

// for showing slow queries in the statement detail page
Plans []string `json:"plans" form:"plans"`
Expand All @@ -108,33 +108,58 @@ type GetListRequest struct {
Fields string `json:"fields" form:"fields"` // example: "Query,Digest"
}

func getProjectionsByFields(jsonFields ...string) ([]string, error) {
fields := make(map[string]*reflect.StructField)
t := reflect.TypeOf(SlowQuery{})
fieldsNum := t.NumField()
for i := 0; i < fieldsNum; i++ {
field := t.Field(i)
fields[strings.ToLower(field.Tag.Get("json"))] = &field
var cachedProjectionsMap map[string]string

func getProjectionsMap() map[string]string {
if cachedProjectionsMap == nil {
t := reflect.TypeOf(SlowQuery{})
fieldsNum := t.NumField()
ret := map[string]string{}
for i := 0; i < fieldsNum; i++ {
field := t.Field(i)
// ignore to check error because the field is defined by ourself
// we can confirm that it has "gorm" tag and fixed structure
s, _ := field.Tag.Lookup("gorm")
jsonField := strings.ToLower(field.Tag.Get("json"))
sourceField := strings.Split(s, ":")[1]
if proj, ok := field.Tag.Lookup("proj"); ok {
ret[jsonField] = fmt.Sprintf("%s AS %s", proj, sourceField)
} else {
ret[jsonField] = sourceField
}
}
cachedProjectionsMap = ret
}
return cachedProjectionsMap
}

func getProjectionsByFields(jsonFields ...string) ([]string, error) {
projMap := getProjectionsMap()
ret := make([]string, 0, len(jsonFields))
for _, fieldName := range jsonFields {
field, ok := fields[strings.ToLower(fieldName)]
field, ok := projMap[strings.ToLower(fieldName)]
if !ok {
return nil, fmt.Errorf("unknown field %s", fieldName)
}
// ignore to check error because the field is defined by ourself
// we can confirm that it has "gorm" tag and fixed structure
s, _ := field.Tag.Lookup("gorm")
sourceField := strings.Split(s, ":")[1]
if proj, ok := field.Tag.Lookup("proj"); ok {
ret = append(ret, fmt.Sprintf("%s AS %s", proj, sourceField))
} else {
ret = append(ret, sourceField)
}
ret = append(ret, field)
}
return ret, nil
}

var cachedAllProjections []string

func getAllProjections() []string {
if cachedAllProjections == nil {
projMap := getProjectionsMap()
ret := make([]string, 0, len(projMap))
for _, proj := range projMap {
ret = append(ret, proj)
}
cachedAllProjections = ret
}
return cachedAllProjections
}

type GetDetailRequest struct {
Digest string `json:"digest" form:"digest"`
Timestamp float64 `json:"timestamp" form:"timestamp"`
Expand All @@ -143,21 +168,29 @@ type GetDetailRequest struct {
}

func QuerySlowLogList(db *gorm.DB, req *GetListRequest) ([]SlowQuery, error) {
sqlFields := []string{"digest", "connection_id", "timestamp"}
if strings.TrimSpace(req.Fields) != "" {
sqlFields = append(sqlFields, strings.Split(req.Fields, ",")...)
sqlFields = funk.UniqString(sqlFields)
}
projections, err := getProjectionsByFields(sqlFields...)
if err != nil {
return nil, err
var projections []string
var err error
reqFields := strings.Split(req.Fields, ",")
if len(reqFields) == 1 && reqFields[0] == "*" {
projections = getAllProjections()
} else {
projections, err = getProjectionsByFields(
funk.UniqString(
append([]string{"digest", "connection_id", "timestamp"}, reqFields...),
)...)
if err != nil {
return nil, err
}
}

tx := db.
Table(SlowQueryTable).
Select(strings.Join(projections, ", ")).
Where("Time BETWEEN FROM_UNIXTIME(?) AND FROM_UNIXTIME(?)", req.RangeBeginTs, req.RangeEndTs).
Limit(req.Limit)
Where("Time BETWEEN FROM_UNIXTIME(?) AND FROM_UNIXTIME(?)", req.BeginTime, req.EndTime)

if req.Limit > 0 {
tx = tx.Limit(req.Limit)
}

if req.Text != "" {
lowerStr := strings.ToLower(req.Text)
Expand All @@ -177,6 +210,11 @@ func QuerySlowLogList(db *gorm.DB, req *GetListRequest) ([]SlowQuery, error) {
tx = tx.Where("DB IN (?)", req.DB)
}

// more robust
if req.OrderBy == "" {
req.OrderBy = "timestamp"
}

order, err := getProjectionsByFields(req.OrderBy)
if err != nil {
return nil, err
Expand Down
88 changes: 84 additions & 4 deletions pkg/apiserver/slowquery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
package slowquery

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/joomcode/errorx"

"github.com/gin-gonic/gin"
"go.uber.org/fx"
Expand All @@ -24,6 +29,11 @@ import (
"github.com/pingcap-incubator/tidb-dashboard/pkg/tidb"
)

var (
ErrNS = errorx.NewNamespace("error.api.slow_query")
ErrNoData = ErrNS.NewType("export_no_data")
)

type ServiceParams struct {
fx.In
TiDBClient *tidb.Client
Expand All @@ -39,10 +49,17 @@ func NewService(p ServiceParams) *Service {

func Register(r *gin.RouterGroup, auth *user.AuthService, s *Service) {
endpoint := r.Group("/slow_query")
endpoint.Use(auth.MWAuthRequired())
endpoint.Use(utils.MWConnectTiDB(s.params.TiDBClient))
endpoint.GET("/list", s.getList)
endpoint.GET("/detail", s.getDetails)
{
endpoint.GET("/download", s.downloadHandler)
endpoint.Use(auth.MWAuthRequired())
endpoint.Use(utils.MWConnectTiDB(s.params.TiDBClient))
{
endpoint.GET("/list", s.getList)
endpoint.GET("/detail", s.getDetails)

endpoint.POST("/download/token", s.downloadTokenHandler)
}
}
}

// @Summary List all slow queries
Expand Down Expand Up @@ -88,3 +105,66 @@ func (s *Service) getDetails(c *gin.Context) {
}
c.JSON(http.StatusOK, *result)
}

// @Router /slow_query/download/token [post]
// @Summary Generate a download token for exported slow query statements
// @Produce plain
// @Param request body GetListRequest true "Request body"
// @Success 200 {string} string "xxx"
// @Security JwtAuth
// @Failure 401 {object} utils.APIError "Unauthorized failure"
func (s *Service) downloadTokenHandler(c *gin.Context) {
var req GetListRequest
if err := c.ShouldBindJSON(&req); err != nil {
utils.MakeInvalidRequestErrorFromError(c, err)
return
}
db := utils.GetTiDBConnection(c)
fields := []string{}
if strings.TrimSpace(req.Fields) != "" {
fields = strings.Split(req.Fields, ",")
}
list, err := QuerySlowLogList(db, &req)
if err != nil {
_ = c.Error(err)
return
}
if len(list) == 0 {
_ = c.Error(ErrNoData.NewWithNoMessage())
return
}

// interface{} tricky
rawData := make([]interface{}, len(list))
for i, v := range list {
rawData[i] = v
}

// convert data
csvData := utils.GenerateCSVFromRaw(rawData, fields, []string{})

// generate temp file that persist encrypted data
timeLayout := "0102150405"
beginTime := time.Unix(int64(req.BeginTime), 0).Format(timeLayout)
endTime := time.Unix(int64(req.EndTime), 0).Format(timeLayout)
token, err := utils.ExportCSV(csvData,
fmt.Sprintf("slowquery_%s_%s_*.csv", beginTime, endTime),
"slowquery/download")

if err != nil {
_ = c.Error(err)
return
}
c.String(http.StatusOK, token)
}

// @Router /slow_query/download [get]
// @Summary Download slow query statements
// @Produce text/csv
// @Param token query string true "download token"
// @Failure 400 {object} utils.APIError
// @Failure 401 {object} utils.APIError "Unauthorized failure"
func (s *Service) downloadHandler(c *gin.Context) {
token := c.Query("token")
utils.DownloadByToken(token, "slowquery/download", c)
}
52 changes: 31 additions & 21 deletions pkg/apiserver/statement/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,40 +103,50 @@ type Model struct {
RelatedSchemas string `json:"related_schemas"`
}

func getAggrFields(sqlFields ...string) []string {
fields := make(map[string]*reflect.StructField)
t := reflect.TypeOf(Model{})
fieldsNum := t.NumField()
for i := 0; i < fieldsNum; i++ {
field := t.Field(i)
fields[strings.ToLower(field.Tag.Get("json"))] = &field
var cachedAggrMap map[string]string // jsonFieldName => aggr

func getAggrMap() map[string]string {
if cachedAggrMap == nil {
t := reflect.TypeOf(Model{})
fieldsNum := t.NumField()
ret := map[string]string{}
for i := 0; i < fieldsNum; i++ {
field := t.Field(i)
jsonField := strings.ToLower(field.Tag.Get("json"))
if agg, ok := field.Tag.Lookup("agg"); ok {
ret[jsonField] = fmt.Sprintf("%s AS %s", agg, gorm.ToColumnName(field.Name))
}
}
cachedAggrMap = ret
}
return cachedAggrMap
}

func getAggrFields(sqlFields ...string) []string {
aggrMap := getAggrMap()
ret := make([]string, 0, len(sqlFields))
for _, fieldName := range sqlFields {
if field, ok := fields[strings.ToLower(fieldName)]; ok {
if agg, ok := field.Tag.Lookup("agg"); ok {
ret = append(ret, fmt.Sprintf("%s AS %s", agg, gorm.ToColumnName(field.Name)))
} else {
panic(fmt.Sprintf("field %s cannot be aggregated", fieldName))
}
if aggr, ok := aggrMap[strings.ToLower(fieldName)]; ok {
ret = append(ret, aggr)
} else {
panic(fmt.Sprintf("unknown aggregation field %s", fieldName))
}
}
return ret
}

var cachedAllAggrFields []string

func getAllAggrFields() []string {
t := reflect.TypeOf(Model{})
fieldsNum := t.NumField()
ret := make([]string, 0, fieldsNum)
for i := 0; i < fieldsNum; i++ {
field := t.Field(i)
if agg, ok := field.Tag.Lookup("agg"); ok {
ret = append(ret, fmt.Sprintf("%s AS %s", agg, gorm.ToColumnName(field.Name)))
if cachedAllAggrFields == nil {
aggrMap := getAggrMap()
ret := make([]string, 0, len(aggrMap))
for _, aggr := range aggrMap {
ret = append(ret, aggr)
}
cachedAllAggrFields = ret
}
return ret
return cachedAllAggrFields
}

// tableNames example: "d1.a1,d2.a2,d1.a1,d3.a3"
Expand Down

0 comments on commit a673ded

Please sign in to comment.