Skip to content

Commit

Permalink
NewESEngine変更
Browse files Browse the repository at this point in the history
  • Loading branch information
ikura-hamu committed Jun 24, 2023
1 parent 43b7070 commit bfeb3a4
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func provideFirebaseCredentialsFilePathString(c *Config) variable.FirebaseCreden

func provideESEngineConfig(c *Config) search.ESEngineConfig {
return search.ESEngineConfig{
URL: c.ES.URL,
URL: []string{c.ES.URL},
}
}

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ require (
github.com/docker/docker v20.10.24+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect
github.com/elastic/go-elasticsearch/v8 v8.8.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fogleman/gg v1.3.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dyatlov/go-opengraph/opengraph v0.0.0-20220524092352-606d7b1e5f8a h1:etIrTD8BQqzColk9nKRusM9um5+1q0iOEJLqfBMIK64=
github.com/dyatlov/go-opengraph/opengraph v0.0.0-20220524092352-606d7b1e5f8a/go.mod h1:emQhSYTXqB0xxjLITTw4EaWZ+8IIQYw+kx9GqNUKdLg=
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY=
github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
70 changes: 55 additions & 15 deletions service/search/es.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package search

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/gofrs/uuid"
"github.com/olivere/elastic/v7"
"go.uber.org/zap"
Expand All @@ -29,12 +33,12 @@ func getIndexName(index string) string {
// ESEngineConfig Elasticsearch検索エンジン設定
type ESEngineConfig struct {
// URL ESのURL
URL string
URL []string
}

// esEngine search.Engine 実装
type esEngine struct {
client *elastic.Client
client *elasticsearch.Client
mm message.Manager
cm channel.Manager
repo repository.Repository
Expand Down Expand Up @@ -73,6 +77,11 @@ type esMessageDocUpdate struct {
HasAudio bool `json:"hasAudio"`
}

type esCreateIndexBody struct {
Mappings m `json:"mappings"`
Settings m `json:"settings"`
}

type m map[string]interface{}

// esMapping Elasticsearchに入るメッセージの情報
Expand Down Expand Up @@ -161,36 +170,65 @@ var esSetting = m{

// NewESEngine Elasticsearch検索エンジンを生成します
func NewESEngine(mm message.Manager, cm channel.Manager, repo repository.Repository, logger *zap.Logger, config ESEngineConfig) (Engine, error) {
// es接続
client, err := elastic.NewClient(elastic.SetURL(config.URL), elastic.SetSniff(false))
// esクライアント作成
client, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: config.URL,
})
if err != nil {
return nil, fmt.Errorf("failed to init search engine: %w", err)
}

// esバージョン確認
version, err := client.ElasticsearchVersion(config.URL)
infoRes, err := client.Info()
if err != nil {
return nil, fmt.Errorf("failed to fetch es version: %w", err)
return nil, fmt.Errorf("failed to get search engine info: %w", err)
}
if infoRes.IsError() {
return nil, fmt.Errorf("failed to get search engine info: %s", infoRes.String())
}
defer infoRes.Body.Close()

var r map[string]interface{}
if err := json.NewDecoder(infoRes.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("failed to decode search engine info: %w", err)
}

version, ok := r["version"].(map[string]interface{})["number"].(string)
if !ok {
return nil, fmt.Errorf("failed to convert version value '%v' to string", r["version"].(map[string]interface{})["number"])
}
logger.Info(fmt.Sprintf("Using elasticsearch version %s", version))
if !strings.HasPrefix(version, esRequiredVersionPrefix) {
return nil, fmt.Errorf("failed to init search engine: unsupported version (%s). expected major version %s", version, esRequiredVersionPrefix)
}

// index確認
if exists, err := client.IndexExists(getIndexName(esMessageIndex)).Do(context.Background()); err != nil {
existsRes, err := client.Indices.Exists([]string{getIndexName(esMessageIndex)})
if err != nil || existsRes.IsError() {
return nil, fmt.Errorf("failed to init search engine: %w", err)
} else if !exists {
// index作成
r1, err := client.CreateIndex(getIndexName(esMessageIndex)).BodyJson(m{
"mappings": esMapping,
"settings": esSetting,
}).Do(context.Background())
}
if existsRes.StatusCode == http.StatusNotFound {
body, err := json.Marshal(esCreateIndexBody{
Mappings: esMapping,
Settings: esSetting,
})
if err != nil {
return nil, fmt.Errorf("failed to init search engine: %w", err)
}
if !r1.Acknowledged {
return nil, fmt.Errorf("failed to init search engine: index not acknowledged")
createIndexRes, err := client.Index(getIndexName(esMessageIndex), bytes.NewBuffer(body), client.Index.WithContext(context.Background()))
if err != nil {
return nil, fmt.Errorf("failed to init search engine: %w", err)
}
defer createIndexRes.Body.Close()
if err := json.NewDecoder(createIndexRes.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("failed to decode create index response: %w", err)
}
acknowledged, ok := r["acknowledged"].(bool)
if !ok {
return nil, fmt.Errorf("failed to convert es index acknowledged value: %v", createIndexRes.String())
}
if !acknowledged {
return nil, fmt.Errorf("failed to create index")
}
}

Expand Down Expand Up @@ -286,6 +324,8 @@ func (e *esEngine) Do(q *Query) (Result, error) {
// NOTE: 現状`sort.Key`はそのままesのソートキーとして使える前提
sort := q.GetSortKey()



sr, err := e.client.Search().
Index(getIndexName(esMessageIndex)).
Query(elastic.NewBoolQuery().Must(musts...)).
Expand Down

0 comments on commit bfeb3a4

Please sign in to comment.