Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/handlers/search_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
var (
filterPrefix = "filter."
whiteListQueryParamKey = "filter.type"

queryPrefix = "query."
)

type SearchHandler struct {
Expand Down Expand Up @@ -77,6 +79,7 @@ func (handler *SearchHandler) buildSearchCfg(params url.Values) (cfg discovery.S
cfg.MaxResults, _ = strconv.Atoi(params.Get("size"))
cfg.Filters = filterConfigFromValues(params)
cfg.RankBy = params.Get("rankby")
cfg.Queries = queryConfigFromValues(params)
cfg.TypeWhiteList, err = parseTypeWhiteList(params)
return
}
Expand Down
33 changes: 32 additions & 1 deletion api/handlers/search_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,30 @@ func TestSearchHandlerSearch(t *testing.T) {
"service": {"kafka", "rabbitmq"},
"data.landscape": {"th"},
},
Queries: make(map[string]string),
}

searcher.On("Search", ctx, cfg).Return([]discovery.SearchResult{}, nil)
},
ValidateResponse: func(tc testCase, body io.Reader) error {
return nil
},
},
{
Title: "should pass queries to search config format",
Querystring: "text=resource&landscape=id,vn&filter.data.landscape=th&filter.type=topic&filter.service=kafka,rabbitmq&query.data.columns.name=timestamp&query.owners.email=john.doe@email.com",
InitSearcher: func(tc testCase, searcher *mock.RecordSearcher) {
cfg := discovery.SearchConfig{
Text: "resource",
TypeWhiteList: []string{"topic"},
Filters: map[string][]string{
"service": {"kafka", "rabbitmq"},
"data.landscape": {"th"},
},
Queries: map[string]string{
"data.columns.name": "timestamp",
"owners.email": "john.doe@email.com",
},
}

searcher.On("Search", ctx, cfg).Return([]discovery.SearchResult{}, nil)
Expand All @@ -73,6 +97,7 @@ func TestSearchHandlerSearch(t *testing.T) {
cfg := discovery.SearchConfig{
Text: "test",
Filters: make(map[string][]string),
Queries: make(map[string]string),
}
response := []discovery.SearchResult{
{
Expand Down Expand Up @@ -123,6 +148,7 @@ func TestSearchHandlerSearch(t *testing.T) {
Text: "resource",
MaxResults: 10,
Filters: make(map[string][]string),
Queries: make(map[string]string),
}

var results []discovery.SearchResult
Expand Down Expand Up @@ -221,14 +247,15 @@ func TestSearchHandlerSuggest(t *testing.T) {
cfg := discovery.SearchConfig{
Text: "test",
Filters: map[string][]string{},
Queries: make(map[string]string),
}
searcher.On("Suggest", ctx, cfg).Return([]string{}, fmt.Errorf("service unavailable"))
},
ExpectStatus: http.StatusInternalServerError,
},
{
Title: "should pass filter to search config format",
Querystring: "text=resource&landscape=id,vn&filter.data.landscape=th&filter.type=topic&filter.service=kafka,rabbitmq",
Querystring: "text=resource&landscape=id,vn&query.description=this is my dashboard&filter.data.landscape=th&filter.type=topic&filter.service=kafka,rabbitmq",
InitSearcher: func(tc testCase, searcher *mock.RecordSearcher) {
cfg := discovery.SearchConfig{
Text: "resource",
Expand All @@ -237,6 +264,9 @@ func TestSearchHandlerSuggest(t *testing.T) {
"service": {"kafka", "rabbitmq"},
"data.landscape": {"th"},
},
Queries: map[string]string{
"description": "this is my dashboard",
},
}

searcher.On("Suggest", ctx, cfg).Return([]string{}, nil)
Expand All @@ -252,6 +282,7 @@ func TestSearchHandlerSuggest(t *testing.T) {
cfg := discovery.SearchConfig{
Text: "test",
Filters: make(map[string][]string),
Queries: make(map[string]string),
}
response := []string{
"test",
Expand Down
14 changes: 14 additions & 0 deletions api/handlers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,17 @@ func filterConfigFromValues(querystring url.Values) map[string][]string {
}
return filter
}

func queryConfigFromValues(querystring url.Values) map[string]string {
var query = make(map[string]string)
for key, values := range querystring {
// filters are of form "query.{field}"
if !strings.HasPrefix(key, queryPrefix) {
continue
}

queryKey := strings.TrimPrefix(key, queryPrefix)
query[queryKey] = values[0] // cannot have duplicate query key, always get the first one
}
return query
}
6 changes: 6 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ func initElasticsearch(config Config) *elasticsearch.Client {
esClient, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: brokers,
Transport: nrelasticsearch.NewRoundTripper(nil),
// uncomment below code to debug request and response to elasticsearch
// Logger: &estransport.ColorLogger{
// Output: os.Stdout,
// EnableRequestBody: true,
// EnableResponseBody: true,
// },
})
if err != nil {
log.Fatalf("error connecting to elasticsearch: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions discovery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ type SearchConfig struct {

// RankBy is a param to rank based on a specific parameter
RankBy string

// Queries is a param to search a resource based on record's fields
Queries map[string]string
}
8 changes: 8 additions & 0 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Record struct {
Data map[string]interface{} `json:"data"`
Labels map[string]string `json:"labels"`
Tags []string `json:"tags"`
Owners []Owner `json:"owners"`
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
CreatedAt time.Time `json:"created_at"`
Expand All @@ -25,6 +26,13 @@ type LineageRecord struct {
Type string `json:"type"`
}

type Owner struct {
URN string `json:"urn"`
Name string `json:"name"`
Role string `json:"role"`
Email string `json:"email"`
}

type ErrNoSuchRecord struct {
RecordID string
}
Expand Down
24 changes: 22 additions & 2 deletions store/elasticsearch/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func (sr *Searcher) buildQuery(ctx context.Context, cfg discovery.SearchConfig,
var query elastic.Query

query = sr.buildTextQuery(ctx, cfg.Text)
query = sr.buildFilterQueries(query, cfg.Filters)
query = sr.buildFilterTermQueries(query, cfg.Filters)
query = sr.buildFilterMatchQueries(ctx, query, cfg.Queries)
query = sr.buildFunctionScoreQuery(query, cfg.RankBy)

src, err := query.Source()
Expand Down Expand Up @@ -230,7 +231,25 @@ func (sr *Searcher) buildTextQuery(ctx context.Context, text string) elastic.Que
)
}

func (sr *Searcher) buildFilterQueries(query elastic.Query, filters map[string][]string) elastic.Query {
func (sr *Searcher) buildFilterMatchQueries(ctx context.Context, query elastic.Query, queries map[string]string) elastic.Query {
if len(queries) == 0 {
return query
}

esQueries := []elastic.Query{}
for field, value := range queries {
esQueries = append(esQueries,
elastic.
NewMatchQuery(field, value).
Fuzziness("AUTO"))
}

return elastic.NewBoolQuery().
Should(query).
Filter(esQueries...)
}

func (sr *Searcher) buildFilterTermQueries(query elastic.Query, filters map[string][]string) elastic.Query {
if len(filters) == 0 {
return query
}
Expand All @@ -246,6 +265,7 @@ func (sr *Searcher) buildFilterQueries(query elastic.Query, filters map[string][
values = append(values, rawVal)
}

key := fmt.Sprintf("%s.keyword", key)
filterQueries = append(
filterQueries,
elastic.NewTermsQuery(key, values...),
Expand Down
41 changes: 41 additions & 0 deletions store/elasticsearch/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestSearcherSearch(t *testing.T) {
{Type: "topic", RecordID: "order-topic"},
{Type: "topic", RecordID: "purchase-topic"},
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Expand All @@ -78,6 +79,7 @@ func TestSearcherSearch(t *testing.T) {
{Type: "topic", RecordID: "order-topic"},
{Type: "topic", RecordID: "purchase-topic"},
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Expand Down Expand Up @@ -115,6 +117,7 @@ func TestSearcherSearch(t *testing.T) {
Expected: []expectedRow{
{Type: "topic", RecordID: "order-topic"},
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Expand All @@ -127,6 +130,19 @@ func TestSearcherSearch(t *testing.T) {
"data.company": {"odpf"},
},
},
Expected: []expectedRow{
{Type: "topic", RecordID: "consumer-topic"},
{Type: "topic", RecordID: "consumer-mq-2"},
},
},
{
Description: "should return 'consumer-topic' if filter owner email with 'john.doe@email.com'",
Config: discovery.SearchConfig{
Text: "topic",
Filters: map[string][]string{
"owners.email": {"john.doe@email.com"},
},
},
Expected: []expectedRow{
{Type: "topic", RecordID: "consumer-topic"},
},
Expand All @@ -143,6 +159,31 @@ func TestSearcherSearch(t *testing.T) {
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-1"},
},
},
{
Description: "should return consumer-topic if search by query description field with text 'rabbitmq' and owners name 'johndoe'",
Config: discovery.SearchConfig{
Text: "consumer",
Queries: map[string]string{
"description": "rabbitmq",
"owners.name": "john doe",
},
},
Expected: []expectedRow{
{Type: "topic", RecordID: "consumer-topic"},
},
},
{
Description: "should return 'bigquery::gcpproject/dataset/tablename-common' resource on top if search by query table column name field with text 'tablename-common-column1'",
Config: discovery.SearchConfig{
Text: "tablename",
Queries: map[string]string{
"data.schema.columns.name": "common",
},
},
Expected: []expectedRow{
{Type: "table", RecordID: "bigquery::gcpproject/dataset/tablename-common"},
},
},
}
for _, test := range tests {
t.Run(test.Description, func(t *testing.T) {
Expand Down
Loading