/
http_server.go
124 lines (105 loc) · 3.51 KB
/
http_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package server
import (
"context"
"net"
"net/http"
"time"
"github.com/gin-contrib/cors"
ginzap "github.com/gin-contrib/zap"
"github.com/gin-gonic/gin"
"github.com/mosuka/phalanx/clients"
"go.uber.org/zap"
)
const CorsMaxAge = 12 * time.Hour
func init() {
gin.SetMode(gin.ReleaseMode)
}
type HTTPIndexServer struct {
httpAddress string
grpcAddress string
certificateFile string
keyFile string
corsAllowedMethods []string
corsAllowedOrigins []string
corsAllowedHeaders []string
logger *zap.Logger
ctx context.Context
cancel context.CancelFunc
client *clients.GRPCIndexClient
router *gin.Engine
listener net.Listener
}
func NewHTTPIndexServerWithTLS(httpAddress string, grpcAddress string, certificateFile string, keyFile string, commonName string, corsAllowedMethods []string, corsAllowedOrigins []string, corsAllowedHeaders []string, logger *zap.Logger) (*HTTPIndexServer, error) {
httpLogger := logger.Named("http")
client, err := clients.NewGRPCIndexClientWithTLS(grpcAddress, certificateFile, commonName)
if err != nil {
httpLogger.Error(err.Error(), zap.String("grpc_address", grpcAddress), zap.String("certificate_file", certificateFile), zap.String("common_name", commonName))
return nil, err
}
marshaler := NewMarshaler()
ctx, cancel := context.WithCancel(context.Background())
router := gin.Default()
router.Use(setClient(client))
router.Use(setMarshaler(marshaler))
router.Use(ginzap.Ginzap(httpLogger, time.RFC3339, true))
if len(corsAllowedOrigins) > 0 || len(corsAllowedMethods) > 0 || len(corsAllowedHeaders) > 0 {
corsConfig := cors.Config{
AllowOrigins: corsAllowedOrigins,
AllowMethods: corsAllowedMethods,
AllowHeaders: corsAllowedHeaders,
AllowCredentials: true,
MaxAge: CorsMaxAge,
}
router.Use(cors.New(corsConfig))
}
router.GET("/livez", livezHandlerFunc)
router.GET("/readyz", readyzHandlerFunc)
router.GET("/metrics", metricsHandlerFunc)
router.GET("/cluster", clusterHandlerFunc)
router.PUT("/v1/indexes/:index_name", createIndexHandlerFunc)
router.DELETE("/v1/indexes/:index_name", deleteIndexHandlerFunc)
router.PUT("/v1/indexes/:index_name/documents", addDocumentsHandlerFunc)
router.DELETE("/v1/indexes/:index_name/documents", deleteDocumentsHandlerFunc)
router.POST("/v1/indexes/:index_name/_search", searchHandlerFunc)
listener, err := net.Listen("tcp", httpAddress)
if err != nil {
cancel()
httpLogger.Error(err.Error(), zap.String("http_address", httpAddress))
return nil, err
}
return &HTTPIndexServer{
httpAddress: httpAddress,
grpcAddress: grpcAddress,
certificateFile: certificateFile,
keyFile: keyFile,
corsAllowedMethods: corsAllowedMethods,
corsAllowedOrigins: corsAllowedOrigins,
corsAllowedHeaders: corsAllowedHeaders,
logger: httpLogger,
ctx: ctx,
cancel: cancel,
client: client,
router: router,
listener: listener,
}, nil
}
func (s *HTTPIndexServer) Start() error {
go func() {
if s.certificateFile == "" && s.keyFile == "" {
_ = http.Serve(s.listener, s.router)
} else {
_ = http.ServeTLS(s.listener, s.router, s.certificateFile, s.keyFile)
}
}()
return nil
}
func (s *HTTPIndexServer) Stop() error {
defer s.cancel()
if err := s.listener.Close(); err != nil {
s.logger.Warn(err.Error())
}
if err := s.client.Close(); err != nil {
s.logger.Error(err.Error())
}
return nil
}