Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize core service implement logic #148

Merged
merged 2 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ release/paopao-ce --no-default-features --features sqlite3,localoss,loggerfile,r

目前支持的功能集合:
* 数据库: MySQL/Sqlite3/PostgreSQL
`Gorm` + `MySQL`/`Sqlite3`/`PostgreSQL` 使用[gorm](https://github.com/go-gorm/gorm)作为数据库的ORM,默认使用 `Grom` + `MySQL`组合(目前状态:稳定,默认,推荐使用);
`Sqlx` + `MySQL`/`PostgreSQL` 使用[sqlx](https://github.com/jmoiron/sqlx)作为数据库的ORM(目前状态:WIP);
* 对象存储: AliOSS/MinIO/LocalOSS
`AliOSS` 阿里云对象存储服务;
`MinIO` [MinIO](https://github.com/minio/minio)对象存储服务;
Expand Down
1 change: 1 addition & 0 deletions config.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Features:
Option: ["SimpleCacheIndex"]
Sms: "SmsJuhe"
SmsJuhe:
Gateway: https://v.juhe.cn/sms/send
Key:
TplID:
TplVal: "#code#=%d&#m#=%d"
Expand Down
24 changes: 24 additions & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,27 @@ func Cfg(key string) (string, bool) {
func CfgIf(expression string) bool {
return features.CfgIf(expression)
}

func GetOssDomain() string {
uri := "https://"
if CfgIf("AliOSS") {
return uri + AliOSSSetting.Domain + "/"
} else if CfgIf("MinIO") {
if !MinIOSetting.Secure {
uri = "http://"
}
return uri + MinIOSetting.Domain + "/" + MinIOSetting.Bucket + "/"
} else if CfgIf("S3") {
if !S3Setting.Secure {
uri = "http://"
}
// TODO: will not work well need test in real world
return uri + S3Setting.Domain + "/" + S3Setting.Bucket + "/"
} else if CfgIf("LocalOSS") {
if !LocalOSSSetting.Secure {
uri = "http://"
}
return uri + LocalOSSSetting.Domain + "/oss/" + LocalOSSSetting.Bucket + "/"
}
return uri + AliOSSSetting.Domain + "/"
}
7 changes: 4 additions & 3 deletions internal/conf/settting.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ type AlipaySettingS struct {
}

type SmsJuheSettings struct {
Key string
TplID string
TplVal string
Gateway string
Key string
TplID string
TplVal string
}

type FeaturesSettingS struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/core/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type Action struct {
// AuthorizationManageService 授权管理服务
type AuthorizationManageService interface {
IsAllow(user *model.User, action *Action) bool
GetFriendFilter(userId int64) FriendFilter
GetFriendIds(userId int64) []int64
BeFriendFilter(userId int64) FriendFilter
BeFriendIds(userId int64) ([]int64, error)
}

func (f FriendFilter) IsFriend(userId int64) bool {
Expand Down
3 changes: 2 additions & 1 deletion internal/core/tweets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
)

// TweetService 推文检索服务
Expand Down Expand Up @@ -44,5 +45,5 @@ type TweetHelpService interface {

// IndexPostsService 广场首页推文列表服务
type IndexPostsService interface {
IndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error)
IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error)
}
11 changes: 11 additions & 0 deletions internal/core/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package core

import (
"github.com/Masterminds/semver/v3"
)

// VersionInfo 版本信息
type VersionInfo interface {
Name() string
Version() *semver.Version
}
19 changes: 0 additions & 19 deletions internal/dao/attachment.go

This file was deleted.

37 changes: 0 additions & 37 deletions internal/dao/authority.go

This file was deleted.

70 changes: 27 additions & 43 deletions internal/dao/cache_index_big.go → internal/dao/cache/bigcache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dao
package cache

import (
"bytes"
Expand All @@ -8,81 +8,65 @@ import (

"github.com/Masterminds/semver/v3"
"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
"github.com/sirupsen/logrus"
)

func newBigCacheIndexServant(getIndexPosts indexPostsFunc) (*bigCacheIndexServant, versionInfo) {
s := conf.BigCacheIndexSetting

config := bigcache.DefaultConfig(s.ExpireInSecond)
config.Shards = s.MaxIndexPage
config.Verbose = s.Verbose
config.MaxEntrySize = 10000
config.Logger = logrus.StandardLogger()
cache, err := bigcache.NewBigCache(config)
if err != nil {
logrus.Fatalf("initial bigCahceIndex failure by err: %v", err)
}

cacheIndex := &bigCacheIndexServant{
getIndexPosts: getIndexPosts,
cache: cache,
}
var (
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
_ core.VersionInfo = (*bigCacheIndexServant)(nil)
)

// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
type postsEntry struct {
key string
tweets *rest.IndexTweetsResp
}

// 启动索引更新器
go cacheIndex.startIndexPosts()
type bigCacheIndexServant struct {
ips core.IndexPostsService

return cacheIndex, cacheIndex
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}

func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) {
func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
key := s.keyFrom(user, offset, limit)
posts, err := s.getPosts(key)
if err == nil {
logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from cache by key: %s", key)
return posts, nil
}

if posts, err = s.getIndexPosts(user, offset, limit); err != nil {
if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil {
return nil, err
}
logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from database by key: %s", key)
s.cachePosts(key, posts)
return posts, nil
}

func (s *bigCacheIndexServant) getPosts(key string) ([]*model.PostFormated, error) {
func (s *bigCacheIndexServant) getPosts(key string) (*rest.IndexTweetsResp, error) {
data, err := s.cache.Get(key)
if err != nil {
logrus.Debugf("bigCacheIndexServant.getPosts get posts by key: %s from cache err: %v", key, err)
return nil, err
}
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
var posts []*model.PostFormated
if err := dec.Decode(&posts); err != nil {
var resp rest.IndexTweetsResp
if err := dec.Decode(&resp); err != nil {
logrus.Debugf("bigCacheIndexServant.getPosts get posts from cache in decode err: %v", err)
return nil, err
}
return posts, nil
return &resp, nil
}

func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormated) {
entry := &postsEntry{key: key, posts: posts}
func (s *bigCacheIndexServant) cachePosts(key string, tweets *rest.IndexTweetsResp) {
entry := &postsEntry{key: key, tweets: tweets}
select {
case s.cachePostsCh <- entry:
logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts by chan of key: %s", key)
Expand All @@ -97,7 +81,7 @@ func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormate
func (s *bigCacheIndexServant) setPosts(entry *postsEntry) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry.posts); err != nil {
if err := enc.Encode(entry.tweets); err != nil {
logrus.Debugf("bigCacheIndexServant.setPosts setPosts encode post entry err: %v", err)
return
}
Expand Down Expand Up @@ -153,10 +137,10 @@ func (s *bigCacheIndexServant) startIndexPosts() {
}
}

func (s *bigCacheIndexServant) name() string {
func (s *bigCacheIndexServant) Name() string {
return "BigCacheIndex"
}

func (s *bigCacheIndexServant) version() *semver.Version {
func (s *bigCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
86 changes: 86 additions & 0 deletions internal/dao/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package cache

import (
"time"

"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)

func NewBigCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) {
s := conf.BigCacheIndexSetting

config := bigcache.DefaultConfig(s.ExpireInSecond)
config.Shards = s.MaxIndexPage
config.Verbose = s.Verbose
config.MaxEntrySize = 10000
config.Logger = logrus.StandardLogger()
cache, err := bigcache.NewBigCache(config)
if err != nil {
logrus.Fatalf("initial bigCahceIndex failure by err: %v", err)
}

cacheIndex := &bigCacheIndexServant{
ips: indexPosts,
cache: cache,
}

// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)

// 启动索引更新器
go cacheIndex.startIndexPosts()

return cacheIndex, cacheIndex
}

func NewSimpleCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) {
s := conf.SimpleCacheIndexSetting
cacheIndex := &simpleCacheIndexServant{
ips: indexPosts,
maxIndexSize: s.MaxIndexSize,
indexPosts: nil,
checkTick: time.NewTicker(s.CheckTickDuration), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Second),
}

// force expire index every ExpireTickDuration second
if s.ExpireTickDuration != 0 {
cacheIndex.expireIndexTick.Reset(s.CheckTickDuration)
} else {
cacheIndex.expireIndexTick.Stop()
}

// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)

// start index posts
cacheIndex.atomicIndex.Store(cacheIndex.indexPosts)
go cacheIndex.startIndexPosts()

return cacheIndex, cacheIndex
}

func NewNoneCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) {
obj := &noneCacheIndexServant{
ips: indexPosts,
}
return obj, obj
}
Loading