Skip to content

Commit

Permalink
add extension to compressed filed
Browse files Browse the repository at this point in the history
  • Loading branch information
lomik committed Apr 25, 2019
1 parent aed44a5 commit 4850d13
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 23 deletions.
9 changes: 4 additions & 5 deletions carbon-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/lomik/carbon-clickhouse/carbon"
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
"github.com/lomik/carbon-clickhouse/helper/config"
"github.com/lomik/zapwriter"
"go.uber.org/zap"

Expand Down Expand Up @@ -47,8 +46,8 @@ func main() {
printDefaultConfig := flag.Bool("config-print-default", false, "Print default config")
checkConfig := flag.Bool("check-config", false, "Check config and exit")
printVersion := flag.Bool("version", false, "Print version")
cat := flag.String("cat", "", "Print RowBinary file in TabSeparated format (uncompressed only)")
bincat := flag.String("recover", "", "Read all good records from corrupted data file (uncompressed only). Write binary data to stdout")
cat := flag.String("cat", "", "Print RowBinary file in TabSeparated format")
bincat := flag.String("recover", "", "Read all good records from corrupted data file. Write binary data to stdout")

flag.Parse()

Expand All @@ -58,7 +57,7 @@ func main() {
}

if *cat != "" {
reader, err := RowBinary.NewReader(*cat, config.CompAlgoNone, false)
reader, err := RowBinary.NewReader(*cat, false)
if err != nil {
log.Fatal(err)
}
Expand All @@ -83,7 +82,7 @@ func main() {
}

if *bincat != "" {
reader, err := RowBinary.NewReader(*bincat, config.CompAlgoNone, false)
reader, err := RowBinary.NewReader(*bincat, false)
if err != nil {
log.Fatal(err)
}
Expand Down
3 changes: 0 additions & 3 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,6 @@ func ReadConfig(filename string) (*Config, error) {
if err := u.Parse(); err != nil {
return nil, err
}

u.CompAlgo = cfg.Data.CompAlgo.CompAlgo
u.CompLevel = cfg.Data.CompLevel
}

return cfg, nil
Expand Down
11 changes: 4 additions & 7 deletions helper/RowBinary/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"io"
"math"
"os"
"strings"
"time"

"github.com/lomik/carbon-clickhouse/helper/config"
"github.com/pierrec/lz4"
)

Expand Down Expand Up @@ -160,17 +160,14 @@ func (r *Reader) Read(p []byte) (int, error) {
}
}

func NewReader(filename string, compAlgo config.CompAlgo, reverse bool) (*Reader, error) {
func NewReader(filename string, reverse bool) (*Reader, error) {
fd, err := os.Open(filename)
if err != nil {
return nil, err
}

var rdr io.Reader
switch compAlgo {
case config.CompAlgoNone:
rdr = fd
case config.CompAlgoLZ4:
var rdr io.Reader = fd
if strings.HasSuffix(filename, lz4.Extension) {
rdr = lz4.NewReader(fd)
}

Expand Down
1 change: 1 addition & 0 deletions helper/prompb/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated prometheus.LabelMatcher matchers = 3;
prometheus.ReadHints hints = 4;
}

message QueryResult {
Expand Down
5 changes: 5 additions & 0 deletions helper/prompb/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ message LabelMatcher {
string name = 2;
string value = 3;
}

message ReadHints {
int64 step_ms = 1; // Query step size in milliseconds.
string func = 2; // String representation of surrounding function or aggregation.
}
2 changes: 0 additions & 2 deletions uploader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ type Config struct {
Threads int `toml:"threads"`
URL string `toml:"url"`
CacheTTL *config.Duration `toml:"cache-ttl"`
CompAlgo config.CompAlgo
CompLevel int
}

func (cfg *Config) Parse() error {
Expand Down
2 changes: 1 addition & 1 deletion uploader/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewPoints(base *Base, reverse bool) *Points {
}

func (u *Points) upload(ctx context.Context, logger *zap.Logger, filename string) error {
reader, err := RowBinary.NewReader(filename, u.config.CompAlgo, u.reverse)
reader, err := RowBinary.NewReader(filename, u.reverse)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion uploader/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (u *Series) parseFile(filename string, out io.Writer) (map[string]bool, err
var reader *RowBinary.Reader
var err error

reader, err = RowBinary.NewReader(filename, u.config.CompAlgo, u.isReverse)
reader, err = RowBinary.NewReader(filename, u.isReverse)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion uploader/tagged.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (u *Tagged) parseFile(filename string, out io.Writer) (map[string]bool, err
var reader *RowBinary.Reader
var err error

reader, err = RowBinary.NewReader(filename, u.config.CompAlgo, false)
reader, err = RowBinary.NewReader(filename, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion uploader/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewTree(base *Base) *Tree {
}

func (u *Tree) parseFile(filename string, out io.Writer) (map[string]bool, error) {
reader, err := RowBinary.NewReader(filename, u.config.CompAlgo, false)
reader, err := RowBinary.NewReader(filename, false)
if err != nil {
return nil, err
}
Expand Down
11 changes: 9 additions & 2 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func New(in chan *RowBinary.WriteBuffer, path string, autoInterval *config.Chunk
switch compAlgo {
case config.CompAlgoLZ4:
wr.lz4Header = lz4.Header{
Size: 64 * 1024,
BlockMaxSize: 4 << 20,
CompressionLevel: compLevel,
}
}
Expand Down Expand Up @@ -165,7 +165,14 @@ func (w *Writer) worker(ctx context.Context) {
// replace fn in inProgress
w.Lock()
delete(w.inProgress, fn)
fn = path.Join(w.path, fmt.Sprintf("default.%d", time.Now().UnixNano()))

var fileExtension string
switch w.compAlgo {
case config.CompAlgoLZ4:
fileExtension = lz4.Extension
}

fn = path.Join(w.path, fmt.Sprintf("default.%d%s", time.Now().UnixNano(), fileExtension))
w.inProgress[fn] = true
w.Unlock()

Expand Down

0 comments on commit 4850d13

Please sign in to comment.