Skip to content

Commit

Permalink
chore: refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
powerman committed May 30, 2021
1 parent 840dc3d commit a990187
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 146 deletions.
9 changes: 5 additions & 4 deletions internal/apix/jsonrpc2.go
Expand Up @@ -2,6 +2,7 @@ package apix

import (
"context"
"net"

"github.com/powerman/rpc-codec/jsonrpc2"
"github.com/powerman/structlog"
Expand Down Expand Up @@ -32,11 +33,11 @@ func (c *JSONRPC2Ctx) NewContext(
) {
ctx = c.Context()

remote := "unknown" // non-HTTP RPC call (like in tests)
remoteIP := "" // non-HTTP RPC call (like in tests)
if r := jsonrpc2.HTTPRequestFromContext(ctx); r != nil {
remote = xff.GetRemoteAddr(r)
remoteIP, _, _ = net.SplitHostPort(xff.GetRemoteAddr(r))
}
ctx = context.WithValue(ctx, contextKeyRemoteIP, remote)
ctx = context.WithValue(ctx, contextKeyRemoteIP, remoteIP)

methodName = reflectx.CallerMethodName(1)
ctx = context.WithValue(ctx, contextKeyMethodName, methodName)
Expand All @@ -50,7 +51,7 @@ func (c *JSONRPC2Ctx) NewContext(

log = structlog.New(
structlog.KeyApp, service,
def.LogRemoteIP, remote,
def.LogRemoteIP, remoteIP,
def.LogFunc, methodName,
def.LogUserName, auth.UserName,
)
Expand Down
34 changes: 17 additions & 17 deletions ms/auth/internal/dal/sql.go
Expand Up @@ -7,35 +7,35 @@ import (
//nolint:gosec // False positive.
const (
sqlUsersAdd = `
INSERT INTO users (id, pass_salt, pass_hash, email, display_name, role)
VALUES (:id, :pass_salt, :pass_hash, :email, :display_name, :role)
INSERT INTO users (id, pass_salt, pass_hash, email, display_name, role)
VALUES (:id, :pass_salt, :pass_hash, :email, :display_name, :role)
`
sqlUsersGet = `
SELECT id, pass_salt, pass_hash, email, display_name, role, created_at
FROM users
WHERE id = :id
SELECT id, pass_salt, pass_hash, email, display_name, role, created_at
FROM users
WHERE id = :id
`
sqlUsersGetByEmail = `
SELECT id, pass_salt, pass_hash, email, display_name, role, created_at
FROM users
WHERE LOWER(email) = LOWER(:email)
SELECT id, pass_salt, pass_hash, email, display_name, role, created_at
FROM users
WHERE LOWER(email) = LOWER(:email)
`
sqlGetUserByAccessToken = `
SELECT id, email, display_name, role, u.created_at
FROM access_tokens AS t LEFT JOIN users AS u ON (t.user_id = u.id)
WHERE access_token = :access_token
SELECT id, email, display_name, role, u.created_at
FROM access_tokens AS t LEFT JOIN users AS u ON (t.user_id = u.id)
WHERE access_token = :access_token
`
sqlAccessTokensAdd = `
INSERT INTO access_tokens (access_token, user_id)
VALUES (:access_token, :user_id)
INSERT INTO access_tokens (access_token, user_id)
VALUES (:access_token, :user_id)
`
sqlAccessTokensDel = `
DELETE FROM access_tokens
WHERE access_token = :access_token
DELETE FROM access_tokens
WHERE access_token = :access_token
`
sqlAccessTokensDelByUser = `
DELETE FROM access_tokens
WHERE user_id = :user_id
DELETE FROM access_tokens
WHERE user_id = :user_id
`
)

Expand Down
11 changes: 7 additions & 4 deletions ms/example/internal/dal/sql.go
Expand Up @@ -6,12 +6,15 @@ import (

const (
sqlExampleInc = `
INSERT INTO example (user_id, counter) VALUES (:user_id, 1)
ON DUPLICATE KEY
UPDATE counter = example.counter + 1, mtime = NOW()
INSERT INTO example (user_id, counter)
VALUES (:user_id, 1)
ON DUPLICATE KEY
UPDATE counter = example.counter + 1, mtime = NOW()
`
sqlExampleGet = `
SELECT counter, mtime FROM example WHERE user_id = :user_id
SELECT counter, mtime
FROM example
WHERE user_id = :user_id
`
)

Expand Down
65 changes: 65 additions & 0 deletions pkg/reflectx/netrpc-stdlib.go
@@ -0,0 +1,65 @@
// Code in this file is copied from net/rpc with minimum changes.
//
// Copyright 2009 The Go Authors. All rights reserved.

package reflectx

import (
"go/token"
"reflect"
)

// suitableMethods returns suitable Rpc methods of typ.
func suitableMethods(typ reflect.Type) (methods []string) {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 { //nolint:gomnd // net/rpc
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2) //nolint:gomnd // net/rpc
if replyType.Kind() != reflect.Ptr {
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
continue
}
methods = append(methods, mname)
}
return methods
}

// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem() //nolint:gochecknoglobals // net/rpc

// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return token.IsExported(t.Name()) || t.PkgPath() == ""
}
66 changes: 5 additions & 61 deletions pkg/reflectx/netrpc.go
@@ -1,65 +1,9 @@
// Code in this file is copied from net/rpc with minimum changes.
//
// Copyright 2009 The Go Authors. All rights reserved.

package reflectx

import (
"go/token"
"reflect"
)

// suitableMethods returns suitable Rpc methods of typ.
func suitableMethods(typ reflect.Type) (methods []string) {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 { //nolint:gomnd // net/rpc
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2) //nolint:gomnd // net/rpc
if replyType.Kind() != reflect.Ptr {
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
continue
}
methods = append(methods, mname)
}
return methods
}

// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem() //nolint:gochecknoglobals // net/rpc
import "reflect"

// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return token.IsExported(t.Name()) || t.PkgPath() == ""
// RPCMethodsOf require receiver value used for net/rpc.Register and
// returns all it RPC methods (detected in same way as net/rpc does).
func RPCMethodsOf(v interface{}) []string {
return suitableMethods(reflect.TypeOf(v))
}
6 changes: 0 additions & 6 deletions pkg/reflectx/reflectx.go
Expand Up @@ -7,12 +7,6 @@ import (
"strings"
)

// RPCMethodsOf require receiver value used for net/rpc.Register and
// returns all it RPC methods (detected in same way as net/rpc does).
func RPCMethodsOf(v interface{}) []string {
return suitableMethods(reflect.TypeOf(v))
}

// MethodsOf require pointer to interface (e.g.: new(app.Appl)) and
// returns all it methods.
func MethodsOf(v interface{}) []string {
Expand Down
37 changes: 37 additions & 0 deletions pkg/serve/grpc.go
@@ -0,0 +1,37 @@
package serve

import (
"net"

"github.com/powerman/structlog"
"google.golang.org/grpc"

"github.com/powerman/go-monolith-example/pkg/def"
"github.com/powerman/go-monolith-example/pkg/netx"
)

// GRPC starts gRPC server on addr, logged as service.
// It runs until failed or ctx.Done.
func GRPC(ctx Ctx, addr netx.Addr, srv *grpc.Server, service string) (err error) {
log := structlog.FromContext(ctx, nil).New(def.LogServer, service)

ln, err := net.Listen("tcp", addr.String())
if err != nil {
return err
}

log.Info("serve", def.LogHost, addr.Host(), def.LogPort, addr.Port())
errc := make(chan error, 1)
go func() { errc <- srv.Serve(ln) }()

select {
case err = <-errc:
case <-ctx.Done():
srv.GracefulStop() // It will not interrupt streaming.
}
if err != nil {
return log.Err("failed to serve", "err", err)
}
log.Info("shutdown")
return nil
}
54 changes: 0 additions & 54 deletions pkg/serve/serve.go → pkg/serve/http.go
Expand Up @@ -4,15 +4,11 @@ package serve
import (
"context"
"crypto/tls"
"net"
"net/http"
"net/rpc"

"github.com/powerman/rpc-codec/jsonrpc2"
"github.com/powerman/structlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"

"github.com/powerman/go-monolith-example/pkg/def"
"github.com/powerman/go-monolith-example/pkg/netx"
Expand Down Expand Up @@ -66,53 +62,3 @@ func HandleMetrics(mux *http.ServeMux, reg *prometheus.Registry) {
handler := promhttp.InstrumentMetricHandler(reg, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
mux.Handle("/metrics", handler)
}

// RPC starts HTTP server on addr path /rpc using rcvr as JSON-RPC 2.0
// handler.
func RPC(ctx Ctx, addr netx.Addr, tlsConfig *tls.Config, rcvr interface{}) error {
return RPCName(ctx, addr, tlsConfig, rcvr, "")
}

// RPCName starts HTTP server on addr path /rpc using rcvr as JSON-RPC 2.0
// handler but uses the provided name for the type instead of the
// receiver's concrete type.
func RPCName(ctx Ctx, addr netx.Addr, tlsConfig *tls.Config, rcvr interface{}, name string) (err error) {
srv := rpc.NewServer()
if name != "" {
err = srv.RegisterName(name, rcvr)
} else {
err = srv.Register(rcvr)
}
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/rpc", jsonrpc2.HTTPHandler(srv))
return HTTP(ctx, addr, tlsConfig, mux, "JSON-RPC 2.0")
}

// GRPC starts gRPC server on addr, logged as service.
// It runs until failed or ctx.Done.
func GRPC(ctx Ctx, addr netx.Addr, srv *grpc.Server, service string) (err error) {
log := structlog.FromContext(ctx, nil).New(def.LogServer, service)

ln, err := net.Listen("tcp", addr.String())
if err != nil {
return err
}

log.Info("serve", def.LogHost, addr.Host(), def.LogPort, addr.Port())
errc := make(chan error, 1)
go func() { errc <- srv.Serve(ln) }()

select {
case err = <-errc:
case <-ctx.Done():
srv.GracefulStop() // It will not interrupt streaming.
}
if err != nil {
return log.Err("failed to serve", "err", err)
}
log.Info("shutdown")
return nil
}
35 changes: 35 additions & 0 deletions pkg/serve/jsonrpc2.go
@@ -0,0 +1,35 @@
package serve

import (
"crypto/tls"
"net/http"
"net/rpc"

"github.com/powerman/rpc-codec/jsonrpc2"

"github.com/powerman/go-monolith-example/pkg/netx"
)

// RPC starts HTTP server on addr path /rpc using rcvr as JSON-RPC 2.0
// handler.
func RPC(ctx Ctx, addr netx.Addr, tlsConfig *tls.Config, rcvr interface{}) error {
return RPCName(ctx, addr, tlsConfig, rcvr, "")
}

// RPCName starts HTTP server on addr path /rpc using rcvr as JSON-RPC 2.0
// handler but uses the provided name for the type instead of the
// receiver's concrete type.
func RPCName(ctx Ctx, addr netx.Addr, tlsConfig *tls.Config, rcvr interface{}, name string) (err error) {
srv := rpc.NewServer()
if name != "" {
err = srv.RegisterName(name, rcvr)
} else {
err = srv.Register(rcvr)
}
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/rpc", jsonrpc2.HTTPHandler(srv))
return HTTP(ctx, addr, tlsConfig, mux, "JSON-RPC 2.0")
}

0 comments on commit a990187

Please sign in to comment.