/
server.go
176 lines (141 loc) · 4.56 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package server
import (
"context"
"github.com/sonofbytes/gocrawl/api/pb"
"log"
"net"
"math/rand"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
// AuthService is an interface to meet the authentication needs
// of the API server. Its implementation lives outside this file.
type AuthService interface {
// Authenticate takes a username and password and returns a session string;
// Server.Authenticate hands that string back to the client as its session.
Authenticate(username, password string) (session string, err error)
// Validate checks a session string. The handlers in this file treat a nil
// error as "session is valid" and return any non-nil error to the client.
Validate(session string) error
// SetConnection is called by every handler with the IP address of the
// calling gRPC peer (when that peer address is a TCP address).
// NOTE(review): what the implementation does with host is not visible here.
SetConnection(host string) error
}
// QueueService is an interface to meet the queue needs
// of the API server. Its implementation lives outside this file.
type QueueService interface {
// Submit enqueues url on behalf of session. Server.Submit always passes a
// depth of 0 and a freshly generated job identifier.
Submit(ctx context.Context, session string, url string, depth int, job string) error
// SetConnection is called by Server.Submit with the IP address of the
// calling gRPC peer (when that peer address is a TCP address).
// NOTE(review): what the implementation does with host is not visible here.
SetConnection(host string) error
}
// StoreService is an interface to meet the store needs
// of the API server. Its implementation lives outside this file.
type StoreService interface {
// Get looks up url on behalf of session and returns a list of URLs;
// Server.Get forwards that list to the client in the reply's Urls field.
Get(ctx context.Context, session string, url string) (urls []string, err error)
// SetConnection is called by Server.Get with the IP address of the
// calling gRPC peer (when that peer address is a TCP address).
// NOTE(review): what the implementation does with host is not visible here.
SetConnection(host string) error
}
// Server satisfies gRPC service interface requirements API.
// It is constructed by New, which also registers it with the gRPC server
// it owns.
type Server struct {
// server is the underlying gRPC server that Serve runs.
server *grpc.Server
// authService validates sessions and authenticates users for every handler.
authService AuthService
// queueService receives crawl submissions from the Submit handler.
queueService QueueService
// storeService answers lookups from the Get handler.
storeService StoreService
}
// letters is the alphabet (ASCII lower- and upper-case letters) from which
// randSeq draws each character of a generated identifier.
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
// init seeds the shared math/rand source from the wall clock so that the
// identifiers produced by randSeq differ from one process run to the next.
func init() {
	// UnixNano counts nanoseconds since the Unix epoch and does not depend
	// on the time's location, so no time-zone conversion is needed.
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// New builds the API Server around the supplied auth, queue and store
// services. It creates a fresh gRPC server, registers the returned Server
// as the API service implementation on it, and enables gRPC reflection.
// Call Serve on the result to start accepting connections.
func New(authService AuthService, queueService QueueService, storeService StoreService) *Server {
	srv := &Server{
		server:       grpc.NewServer(),
		authService:  authService,
		queueService: queueService,
		storeService: storeService,
	}

	// Expose the API handlers implemented by srv.
	apipb.RegisterAPIServer(srv.server, srv)

	// Register reflection service on gRPC server.
	reflection.Register(srv.server)

	return srv
}
// Serve handles gRPC requests for service API.
//
// It opens a TCP listener on address and runs the gRPC server on it,
// returning whatever error the gRPC server's Serve returns.
//
// If the listener cannot be opened the failure is logged and the error is
// returned, so the caller decides whether the process should exit. It no
// longer terminates the whole program from inside this package.
func (s *Server) Serve(address string) error {
	listener, err := net.Listen("tcp", address)
	if err != nil {
		// Keep the diagnostic line the previous implementation emitted.
		log.Printf("failed to listen: %v", err)
		return err
	}
	return s.server.Serve(listener)
}
// Submit handles client requests to Submit to the API service.
//
// The request must carry a non-empty Session and Url, otherwise an
// InvalidArgument status is returned. The session is validated through the
// auth service; on success a random 12-letter job identifier is generated
// and the URL is handed to the queue service with depth 0. The reply
// carries the job identifier. Errors from the auth or queue service are
// returned to the caller unchanged, together with an empty reply.
func (s *Server) Submit(ctx context.Context, ar *apipb.APISubmitRequest) (*apipb.APISubmitReply, error) {
	if ar == nil || ar.Session == "" || ar.Url == "" {
		return &apipb.APISubmitReply{}, status.Error(codes.InvalidArgument, "request needs Session token and Url")
	}

	// When the caller's peer address is a TCP address, pass its IP to the
	// services this handler uses. This stays best-effort: a failure is
	// logged rather than silently dropped, and the request continues.
	if p, ok := peer.FromContext(ctx); ok {
		if addr, ok := p.Addr.(*net.TCPAddr); ok {
			host := addr.IP.String()
			if err := s.authService.SetConnection(host); err != nil {
				log.Printf("submit: auth service SetConnection: %v", err)
			}
			if err := s.queueService.SetConnection(host); err != nil {
				log.Printf("submit: queue service SetConnection: %v", err)
			}
		}
	}

	if err := s.authService.Validate(ar.Session); err != nil {
		return &apipb.APISubmitReply{}, err
	}

	job := randSeq(12)
	if err := s.queueService.Submit(ctx, ar.Session, ar.Url, 0, job); err != nil {
		return &apipb.APISubmitReply{}, err
	}

	return &apipb.APISubmitReply{
		Job: job,
	}, nil
}
// Get handles client requests to Get from the API service.
//
// The request must carry a non-empty Session and Url, otherwise an
// InvalidArgument status is returned. The session is validated through the
// auth service, then the store service is queried for the URL and the
// resulting list is returned in the reply's Urls field. Errors from the
// auth or store service are returned to the caller unchanged, together
// with an empty reply (matching Submit).
func (s *Server) Get(ctx context.Context, ar *apipb.APIGetRequest) (*apipb.APIGetReply, error) {
	if ar == nil || ar.Session == "" || ar.Url == "" {
		return &apipb.APIGetReply{}, status.Error(codes.InvalidArgument, "request needs Session token and Url")
	}

	// When the caller's peer address is a TCP address, pass its IP to the
	// services this handler uses. This stays best-effort: a failure is
	// logged rather than silently dropped, and the request continues.
	if p, ok := peer.FromContext(ctx); ok {
		if addr, ok := p.Addr.(*net.TCPAddr); ok {
			host := addr.IP.String()
			if err := s.authService.SetConnection(host); err != nil {
				log.Printf("get: auth service SetConnection: %v", err)
			}
			if err := s.storeService.SetConnection(host); err != nil {
				log.Printf("get: store service SetConnection: %v", err)
			}
		}
	}

	if err := s.authService.Validate(ar.Session); err != nil {
		return &apipb.APIGetReply{}, err
	}

	urls, err := s.storeService.Get(ctx, ar.Session, ar.Url)
	if err != nil {
		// Do not hand back a partially populated reply next to an error.
		return &apipb.APIGetReply{}, err
	}

	return &apipb.APIGetReply{
		Urls: urls,
	}, nil
}
// Authenticate handles client requests to Authenticate with the API service.
//
// The request must carry a non-empty Username and Password, otherwise an
// InvalidArgument status is returned. The credentials are passed to the
// auth service and the session string it returns is sent back in the
// reply's Session field. An error from the auth service is returned to the
// caller unchanged, together with an empty reply.
func (s *Server) Authenticate(ctx context.Context, ar *apipb.APIAuthenticateRequest) (*apipb.APIAuthenticateReply, error) {
	if ar == nil || ar.Username == "" || ar.Password == "" {
		return &apipb.APIAuthenticateReply{}, status.Error(codes.InvalidArgument, "request needs Username and Password")
	}

	// When the caller's peer address is a TCP address, pass its IP to the
	// auth service. This stays best-effort: a failure is logged rather than
	// silently dropped, and the request continues.
	if p, ok := peer.FromContext(ctx); ok {
		if addr, ok := p.Addr.(*net.TCPAddr); ok {
			if err := s.authService.SetConnection(addr.IP.String()); err != nil {
				log.Printf("authenticate: auth service SetConnection: %v", err)
			}
		}
	}

	session, err := s.authService.Authenticate(ar.Username, ar.Password)
	if err != nil {
		// Do not hand back a session value next to an error.
		return &apipb.APIAuthenticateReply{}, err
	}

	return &apipb.APIAuthenticateReply{
		Session: session,
	}, nil
}
// Validate handles client requests to Validate a session with the API service.
//
// The request must carry a non-empty Session, otherwise an InvalidArgument
// status is returned. The session is checked through the auth service; the
// reply's Valid field is true exactly when that check returned no error,
// and the auth service's error (if any) is returned alongside the reply.
func (s *Server) Validate(ctx context.Context, ar *apipb.APIValidateRequest) (*apipb.APIValidateReply, error) {
	if ar == nil || ar.Session == "" {
		// The only required field here is the session token; the earlier
		// message wrongly asked for a username and password.
		return &apipb.APIValidateReply{}, status.Error(codes.InvalidArgument, "request needs Session token")
	}

	// When the caller's peer address is a TCP address, pass its IP to the
	// auth service. This stays best-effort: a failure is logged rather than
	// silently dropped, and the request continues.
	if p, ok := peer.FromContext(ctx); ok {
		if addr, ok := p.Addr.(*net.TCPAddr); ok {
			if err := s.authService.SetConnection(addr.IP.String()); err != nil {
				log.Printf("validate: auth service SetConnection: %v", err)
			}
		}
	}

	err := s.authService.Validate(ar.Session)
	return &apipb.APIValidateReply{
		Valid: err == nil,
	}, err
}
// randSeq returns a string of n characters, each drawn independently from
// letters using the package-level math/rand source. It is used to mint job
// identifiers and is not intended to be unpredictable to an attacker.
func randSeq(n int) string {
	alphabetSize := len(letters)
	out := make([]rune, n)
	for i := 0; i < n; i++ {
		out[i] = letters[rand.Intn(alphabetSize)]
	}
	return string(out)
}