Skip to content

Commit

Permalink
Cache size
Browse files Browse the repository at this point in the history
  • Loading branch information
maeb committed Jan 15, 2021
1 parent eddcb04 commit 1e8ab14
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 96 deletions.
30 changes: 9 additions & 21 deletions cmd/warc/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package cmd

import (
"fmt"
"runtime"

"github.com/fsnotify/fsnotify"
"github.com/nlnwa/gowarc/cmd/warc/cmd/cat"
"github.com/nlnwa/gowarc/cmd/warc/cmd/index"
Expand All @@ -27,6 +25,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"strings"
)

type conf struct {
Expand All @@ -41,16 +40,7 @@ func NewCommand() *cobra.Command {
Use: "warc",
Short: "A tool for handling warc files",
Long: ``,

PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
// Increase GOMAXPROCS as recommended by badger
// https://github.com/dgraph-io/badger#are-there-any-go-specific-settings-that-i-should-use
runtime.GOMAXPROCS(128)

if c.logLevel == "" {
c.logLevel = viper.GetString("loglevel")
}

level, err := log.ParseLevel(c.logLevel)
if err != nil {
return fmt.Errorf("'%s' is not part of the valid levels: 'panic', 'fatal', 'error', 'warn', 'warning', 'info', 'debug', 'trace'", c.logLevel)
Expand All @@ -63,10 +53,13 @@ func NewCommand() *cobra.Command {

cobra.OnInitialize(func() { c.initConfig() })


// Flags
cmd.PersistentFlags().StringVarP(&c.logLevel, "log-level", "l", "info", "fatal, error, warn, info, debug or trace")
// StringVar takes (pointer, name, default value, usage): the third argument is
// the default, not a shorthand, so it must be empty for "no config file given".
cmd.PersistentFlags().StringVar(&c.cfgFile, "config", "", "config file. If not set, /etc/warc/, $HOME/.warc/ and current working dir will be searched for file config.yaml")
// Bind every persistent flag to viper under its flag name.
if err := viper.BindPFlags(cmd.PersistentFlags()); err != nil {
	log.Fatalf("Failed to bind root flags: %v", err)
}

// Subcommands
cmd.AddCommand(ls.NewCommand())
Expand All @@ -80,14 +73,9 @@ func NewCommand() *cobra.Command {
// initConfig reads in config file and ENV variables if set.
func (c *conf) initConfig() {
viper.SetTypeByDefaultValue(true)
viper.SetDefault("warcdir", []string{"."})
viper.SetDefault("indexdir", ".")
viper.SetDefault("autoindex", true)
viper.SetDefault("warcport", 9999)
viper.SetDefault("loglevel", "info")

viper.AutomaticEnv() // read in environment variables that match

// Replace dashes with underscores when deriving environment variable names
// from keys such as "log-level". viper.EnvKeyReplacer only builds an Option
// value and has no effect when its result is discarded, so the replacer has
// to be installed with SetEnvKeyReplacer.
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
if viper.IsSet("config") {
// Use config file from the flag.
viper.SetConfigFile(viper.GetString("config"))
Expand All @@ -110,7 +98,7 @@ func (c *conf) initConfig() {
// Config file not found; ignore error
} else {
// Config file was found but another error was produced
log.Fatalf("error reading config file: %v", err)
log.Fatalf("Failed to read config file: %v", err)
}
}

Expand Down
84 changes: 59 additions & 25 deletions cmd/warc/cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,94 @@
package serve

import (
"context"
"fmt"
"github.com/gorilla/handlers"
"github.com/nlnwa/gowarc/pkg/index"
"github.com/nlnwa/gowarc/pkg/server"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
"time"
)

// conf holds the settings of the serve command.
type conf struct {
// port is the port the server listens on.
port int
// warcDirs lists the directories containing warc files, taken from the
// positional arguments or from configuration.
warcDirs []string
// watchDepth is the maximum directory depth used when indexing warc files.
watchDepth int
}

func NewCommand() *cobra.Command {
c := &conf{}
var cmd = &cobra.Command{
cmd := &cobra.Command{
Use: "serve",
Short: "Start the warc server to serve warc records",
Long: ``,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
// Increase GOMAXPROCS as recommended by badger
// https://github.com/dgraph-io/badger#are-there-any-go-specific-settings-that-i-should-use
runtime.GOMAXPROCS(128)
},
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) > 0 {
c.warcDirs = args
} else {
c.warcDirs = viper.GetStringSlice("warcdir")
viper.Set("warc-dir", args)
}
return runE(c)
return runE()
},
}

cmd.Flags().IntVarP(&c.port, "port", "p", -1, "the port that should be used to serve, will use config value otherwise")
cmd.Flags().IntVarP(&c.watchDepth, "watch-depth", "w", 4, "The maximum depth when indexing warc")
cmd.Flags().IntP("port", "p", 9999, "Server listening port")
cmd.Flags().StringP("path-prefix", "", "/", "Path prefix")
cmd.Flags().IntP("watch-depth", "d", 4, "The maximum depth when indexing warc")
cmd.Flags().BoolP("auto-index", "", true, "Enable automatic indexing")

cmd.Flags().StringP("index-dir", "", ".", "Index directory")
cmd.Flags().StringSliceP("warc-dir", "", []string{"."}, "List of directories containing warcfiles")
cmd.Flags().StringP("cdx-cache-size", "", "", "Size of cdx index cache in bytes")
cmd.Flags().StringP("file-cache-size", "", "", "Size of file index cache in bytes")
cmd.Flags().StringP("id-cache-size", "", "", "Size of id index cache")
if err := viper.BindPFlags(cmd.Flags()); err != nil {
log.Fatalf("Failed to bind serve flags: %v", err)
}

return cmd
}

func runE(c *conf) error {
if c.port < 0 {
c.port = viper.GetInt("warcport")
}
func runE() error {
opts := index.DefaultOptions().
WithDir(viper.GetString("index-dir")).
WithIdCacheSize(int64(viper.GetSizeInBytes("id-cache-size"))).
WithFileCacheSize(int64(viper.GetSizeInBytes("file-cache-size"))).
WithCdxCacheSize(int64(viper.GetSizeInBytes("cdx-cache-size")))

dbDir := viper.GetString("indexdir")
db, err := index.NewIndexDb(dbDir)
db, err := index.NewIndexDb(opts)
if err != nil {
log.Fatal(err)
}
defer db.Close()

if viper.GetBool("autoindex") {
if viper.GetBool("auto-index") {
log.Infof("Starting autoindexer")
autoindexer := index.NewAutoIndexer(db, c.warcDirs, c.watchDepth)
autoindexer := index.NewAutoIndexer(db, viper.GetStringSlice("warc-dir"), viper.GetInt("watch-depth"))
defer autoindexer.Shutdown()
}

log.Infof("Starting web server at http://localhost:%v", c.port)
server.Serve(db, c.port)
return nil
loggingMw := func(h http.Handler) http.Handler {
return handlers.CombinedLoggingHandler(os.Stdout, h)
}
server.Handler(db, loggingMw)

httpServer := &http.Server{
Addr: fmt.Sprintf(":%v", viper.GetString("port")),
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigs
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = httpServer.Shutdown(ctx)
}()

log.Infof("Starting web server at http://localhost:%v", viper.GetInt("port"))
return httpServer.ListenAndServe()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/nlnwa/gowarc
go 1.13

require (
github.com/dgraph-io/badger/v2 v2.0.2
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/fsnotify/fsnotify v1.4.7
github.com/golang/protobuf v1.4.0-rc.4
github.com/gorilla/handlers v1.4.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v2 v2.0.2 h1:uBAA5oM9Gz9TrP01v9LxBGztE5rhtGeBxpF1IvxGGtw=
github.com/dgraph-io/badger/v2 v2.0.2/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.2007.2 h1:EjjK0KqwaFMlPin1ajhP943VPENHJdEz1KLIegjaI3k=
github.com/dgraph-io/badger/v2 v2.2007.2/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
Expand Down
5 changes: 5 additions & 0 deletions k8s/overlays/prod/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Keys must match the command line flag names, since those are the keys the
# application looks up in viper (e.g. viper.GetString("index-dir")).
auto-index: true
index-dir: /warcindex
warc-dir:
  - /warcfiles
log-level: info
32 changes: 32 additions & 0 deletions k8s/overlays/prod/deployment_volume_patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Strategic merge patch for the gowarc-server Deployment: sets a memory limit
# and mounts the config, the index directory and the (read-only) warc files.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gowarc-server
spec:
  template:
    spec:
      containers:
        - name: gowarc-server
          resources:
            limits:
              memory: 30Gi
          volumeMounts:
            - mountPath: /etc/warc
              name: gowarc-config
            - mountPath: /warcindex
              name: warcindex
              subPath: nettaviser/nettaviser_2020
            - mountPath: /warcfiles
              name: warcfiles
              subPath: veidemann/validwarcs/nettaviser/nettaviser_2020
      volumes:
        - name: gowarc-config
          configMap:
            name: gowarc-config
        - name: warcindex
          persistentVolumeClaim:
            claimName: loke-b11
        - name: warcfiles
          persistentVolumeClaim:
            claimName: nettarkivet-0
            readOnly: true
17 changes: 17 additions & 0 deletions k8s/overlays/prod/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Production overlay: builds on the base manifests, generates the gowarc-config
# ConfigMap from config.yaml and applies the volume patch to the Deployment.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: nettarkivet
namePrefix: "loke-"
nameSuffix: "-nettaviser"

resources:
  - ../../base

configMapGenerator:
  - name: gowarc-config
    files:
      - config.yaml

patchesStrategicMerge:
  - deployment_volume_patch.yaml
34 changes: 21 additions & 13 deletions pkg/index/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type Db struct {
cdxIndex *badger.DB
dbGcInterval *time.Ticker

// cache settings
fileIndexCacheSize int64
cdxIndexCacheSize int64
idIndexCacheSize int64

// batch settings
batchMaxSize int
batchMaxWait time.Duration
Expand All @@ -55,8 +60,8 @@ type Db struct {
batchFlushChan chan []*record
}

func NewIndexDb(dbDir string) (*Db, error) {
dbDir = path.Join(dbDir, "warcdb")
func NewIndexDb(opts Options) (*Db, error) {
dbDir := path.Join(opts.Dir, "warcdb")
idIndexDir := path.Join(dbDir, "id-index")
fileIndexDir := path.Join(dbDir, "file-index")
cdxIndexDir := path.Join(dbDir, "cdx-index")
Expand Down Expand Up @@ -93,17 +98,17 @@ func NewIndexDb(dbDir string) (*Db, error) {
// Open db
var err error

d.idIndex, err = openIndex(idIndexDir)
d.idIndex, err = openIndex(idIndexDir, opts.IdCacheSize)
if err != nil {
return nil, err
}

d.fileIndex, err = openIndex(fileIndexDir)
d.fileIndex, err = openIndex(fileIndexDir, opts.FileCacheSize)
if err != nil {
return nil, err
}

d.cdxIndex, err = openIndex(cdxIndexDir)
d.cdxIndex, err = openIndex(cdxIndexDir, opts.CdxCacheSize)
if err != nil {
return nil, err
}
Expand All @@ -117,11 +122,11 @@ func NewIndexDb(dbDir string) (*Db, error) {
return d, nil
}

func openIndex(indexDir string) (db *badger.DB, err error) {
if err := os.MkdirAll(indexDir, 0777); err != nil {
func openIndex(dir string, cacheSize int64) (db *badger.DB, err error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
opts := badger.DefaultOptions(indexDir)
opts := badger.DefaultOptions(dir).WithIndexCacheSize(cacheSize)
opts.Logger = log.StandardLogger()
db, err = badger.Open(opts)
return
Expand Down Expand Up @@ -221,12 +226,18 @@ func (d *Db) UpdateFilePath(filePath string) {
}

func (d *Db) AddBatch(records []*record) {
log.Debugf("flushing batch to DB")
log.Debug("Flushing batch to DB")

var err error

err = d.idIndex.Update(func(txn *badger.Txn) error {
for _, r := range records {
if r == nil {
log.Warn("Record is nil")
continue
} else if r.filePath == "" {
log.Warn("Empty filepath")
}
r.filePath, err = filepath.Abs(r.filePath)
if err != nil {
log.Errorf("%v", err)
Expand Down Expand Up @@ -274,10 +285,7 @@ func (d *Db) Flush() {
return
}

// Hand the flusher its own copy of the batch. d.batchItems is truncated in
// place below and keeps its backing array, so a slice that merely aliases it
// would be overwritten by later appends while the batch is still being written.
copiedItems := make([]*record, len(d.batchItems))
copy(copiedItems, d.batchItems)
d.batchItems = d.batchItems[:0]
d.batchFlushChan <- copiedItems
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/index/indexworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func indexFile(db *Db, fileName string) {
// Check if file is indexed and has not changed since indexing
stat, err := os.Stat(fileName)
if err != nil {
	// os.Stat returns a nil FileInfo on failure; stop here instead of
	// dereferencing it below.
	log.Errorf("Failed to stat file %v: %v", fileName, err)
	return
}

fileSize := stat.Size()
Expand All @@ -111,7 +111,7 @@ func indexFile(db *Db, fileName string) {
if fileInfo, err := db.GetFilePath(fn); err == nil {
fileInfoLastModified, err := ptypes.Timestamp(fileInfo.LastModified)
if err != nil {
log.Errorf("%v", err)
log.Errorf("Failed to convert timestamp: %v", err)
}
if fileInfo.Size == fileSize && fileInfoLastModified.Equal(fileLastModified) {
log.Debugf("Already indexed %v", fileName)
Expand Down
Loading

0 comments on commit 1e8ab14

Please sign in to comment.