Skip to content

Commit

Permalink
refactor transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Mar 28, 2024
1 parent 4e36af5 commit 05a82e5
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 86 deletions.
63 changes: 63 additions & 0 deletions core/dbio/iop/datastream.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/parquet-go/parquet-go/compress"
"github.com/segmentio/ksuid"
"github.com/slingdata-io/sling-cli/core/dbio/env"
"golang.org/x/text/transform"

"github.com/samber/lo"
"github.com/spf13/cast"
Expand Down Expand Up @@ -385,6 +386,63 @@ func (ds *Datastream) SetIterator(it *Iterator) {
ds.it = it
}

func (ds *Datastream) decodeReader(reader io.Reader) (newReader io.Reader, decoded bool) {
// decode File if requested
if transformsPayload, ok := ds.Sp.Config.Map["transforms"]; ok {
columnTransforms := makeColumnTransforms(transformsPayload)
applied := []Transform{}

if ts, ok := columnTransforms["*"]; ok {
remove := func(t Transform) {
ts = lo.Filter(ts, func(tName Transform, i int) bool {
return tName != t
})
}

for _, t := range ts {
switch t {
case TransformReplaceAccents:
newReader = transform.NewReader(reader, ds.Sp.transformers.Accent)
case TransformDecodeLatin1:
newReader = transform.NewReader(reader, ds.Sp.transformers.ISO8859_1)
case TransformDecodeLatin5:
newReader = transform.NewReader(reader, ds.Sp.transformers.ISO8859_5)
case TransformDecodeLatin9:
newReader = transform.NewReader(reader, ds.Sp.transformers.ISO8859_15)
case TransformDecodeWindows1250:
newReader = transform.NewReader(reader, ds.Sp.transformers.Windows1250)
case TransformDecodeWindows1252:
newReader = transform.NewReader(reader, ds.Sp.transformers.Windows1252)
case TransformDecodeUtf16:
newReader = transform.NewReader(reader, ds.Sp.transformers.UTF16)
case TransformDecodeUtf8:
newReader = transform.NewReader(reader, ds.Sp.transformers.UTF8)
case TransformDecodeUtf8Bom:
newReader = transform.NewReader(reader, ds.Sp.transformers.UTF8BOM)
default:
continue
}
applied = append(applied, t) // delete from transforms, already applied
}

for _, t := range applied {
remove(t)
}

columnTransforms["*"] = ts
}

if len(applied) > 0 {
// re-apply transforms
ds.Sp.applyTransforms(g.Marshal(columnTransforms))

return newReader, true
}
}

return reader, false
}

// SetFields sets the fields/columns of the Datastream
func (ds *Datastream) SetFields(fields []string) {
if ds.Columns == nil || len(ds.Columns) != len(fields) {
Expand Down Expand Up @@ -784,6 +842,11 @@ func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error) {
func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error) {
c := CSV{Reader: reader, NoHeader: !ds.config.Header, FieldsPerRecord: ds.config.FieldsPerRec}

// decode File if requested by transform
if newReader, ok := ds.decodeReader(reader); ok {
c.Reader = newReader
}

r, err := c.getReader(ds.config.Delimiter)
if err != nil {
err = g.Error(err, "could not get reader")
Expand Down
103 changes: 79 additions & 24 deletions core/dbio/iop/stream_processor.go
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/flarco/g"
"github.com/spf13/cast"
"golang.org/x/text/encoding/charmap"
encUnicode "golang.org/x/text/encoding/unicode"
"golang.org/x/text/runes"
"golang.org/x/text/transform"
Expand Down Expand Up @@ -56,22 +57,34 @@ type StreamConfig struct {
Columns Columns `json:"columns"` // list of column types. Can be partial list! likely is!
transforms map[string][]TransformFunc // array of transform functions to apply
maxDecimalsFormat string `json:"-"`

Map map[string]string `json:"-"`
}

type Transformers struct {
Accent transform.Transformer
UTF8 transform.Transformer
UTF8BOM transform.Transformer
UTF16 transform.Transformer
Accent transform.Transformer
UTF8 transform.Transformer
UTF8BOM transform.Transformer
UTF16 transform.Transformer
ISO8859_1 transform.Transformer
ISO8859_5 transform.Transformer
ISO8859_15 transform.Transformer
Windows1250 transform.Transformer
Windows1252 transform.Transformer
}

func NewTransformers() Transformers {

win16be := encUnicode.UTF16(encUnicode.BigEndian, encUnicode.IgnoreBOM)
return Transformers{
Accent: transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC),
UTF8: encUnicode.UTF8.NewDecoder(),
UTF8BOM: encUnicode.UTF8BOM.NewDecoder(),
UTF16: encUnicode.UTF16(encUnicode.LittleEndian, encUnicode.UseBOM).NewDecoder(),
Accent: transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC),
UTF8: encUnicode.UTF8.NewDecoder(),
UTF8BOM: encUnicode.UTF8BOM.NewDecoder(),
UTF16: encUnicode.BOMOverride(win16be.NewDecoder()),
ISO8859_1: charmap.ISO8859_1.NewDecoder(),
ISO8859_5: charmap.ISO8859_5.NewDecoder(),
ISO8859_15: charmap.ISO8859_15.NewDecoder(),
Windows1250: charmap.Windows1250.NewDecoder(),
Windows1252: charmap.Windows1252.NewDecoder(),
}
}

Expand Down Expand Up @@ -192,6 +205,8 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {
sp = NewStreamProcessor()
}

sp.Config.Map = configMap

if configMap["fields_per_rec"] != "" {
sp.Config.FieldsPerRec = cast.ToInt(configMap["fields_per_rec"])
}
Expand Down Expand Up @@ -246,21 +261,7 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {
g.Unmarshal(configMap["columns"], &sp.Config.Columns)
}
if configMap["transforms"] != "" {
columnTransforms := map[string][]string{}
g.Unmarshal(configMap["transforms"], &columnTransforms)
sp.Config.transforms = map[string][]TransformFunc{}
for key, names := range columnTransforms {
key = strings.ToLower(key)
sp.Config.transforms[key] = []TransformFunc{}
for _, name := range names {
f, ok := Transforms[name]
if ok {
sp.Config.transforms[key] = append(sp.Config.transforms[key], f)
} else {
g.Warn("did find find transform named: '%s'", name)
}
}
}
sp.applyTransforms(configMap["transforms"])
}
sp.Config.Compression = configMap["compression"]

Expand All @@ -273,6 +274,28 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {
}
}

func makeColumnTransforms(transformsPayload string) map[string][]Transform {
columnTransforms := map[string][]Transform{}
g.Unmarshal(transformsPayload, &columnTransforms)
return columnTransforms
}

func (sp *StreamProcessor) applyTransforms(transformsPayload string) {
columnTransforms := makeColumnTransforms(transformsPayload)
sp.Config.transforms = map[string][]TransformFunc{}
for key, names := range columnTransforms {
sp.Config.transforms[key] = []TransformFunc{}
for _, name := range names {
f, ok := Transforms[name]
if ok {
sp.Config.transforms[key] = append(sp.Config.transforms[key], f)
} else {
g.Warn("did find find transform named: '%s'", name)
}
}
}
}

// CastVal casts the type of an interface based on its value
// From html/template/content.go
// Copyright 2011 The Go Authors. All rights reserved.
Expand Down Expand Up @@ -396,6 +419,38 @@ func (sp *StreamProcessor) commitChecksum() {
}
}

func (sp *StreamProcessor) DecodeValue(t Transform, val string) (newVal string, err error) {

switch t {
case TransformReplaceAccents:
newVal, _, err = transform.String(sp.transformers.Accent, val)
case TransformDecodeLatin1:
newVal, _, err = transform.String(sp.transformers.ISO8859_1, val)
case TransformDecodeLatin5:
newVal, _, err = transform.String(sp.transformers.ISO8859_5, val)
case TransformDecodeLatin9:
newVal, _, err = transform.String(sp.transformers.ISO8859_15, val)
case TransformDecodeWindows1250:
newVal, _, err = transform.String(sp.transformers.Windows1250, val)
case TransformDecodeWindows1252:
newVal, _, err = transform.String(sp.transformers.Windows1252, val)
case TransformDecodeUtf16:
newVal, _, err = transform.String(sp.transformers.UTF16, val)
case TransformDecodeUtf8:
newVal, _, err = transform.String(sp.transformers.UTF8, val)
case TransformDecodeUtf8Bom:
newVal, _, err = transform.String(sp.transformers.UTF8BOM, val)
default:
return val, nil
}

if err != nil {
return val, err
}

return newVal, nil
}

// CastVal casts values with stats collection
// which degrades performance by ~10%
// go test -benchmem -run='^$ github.com/slingdata-io/sling-cli/core/dbio/iop' -bench '^BenchmarkProcessVal'
Expand Down
60 changes: 25 additions & 35 deletions core/dbio/iop/transforms.go
Expand Up @@ -3,52 +3,42 @@ package iop
import (
"bufio"
"embed"
"errors"
"regexp"
"strings"
"unicode"

"github.com/flarco/g"
"github.com/spf13/cast"
"golang.org/x/text/transform"
)

//go:embed templates/*
var templatesFolder embed.FS

var Transforms = map[string]TransformFunc{}

func ReplaceAccents(sp *StreamProcessor, val string) (string, error) {
newVal, _, err := transform.String(sp.transformers.Accent, val)
if err != nil {
return val, errors.New("could not transform while running ReplaceAccents: " + err.Error())
}
return newVal, nil
}

func DecodeUTF16(sp *StreamProcessor, val string) (string, error) {
newVal, _, err := transform.String(sp.transformers.UTF16, val)
if err != nil {
return val, errors.New("could not transform while running DecodeUTF16: " + err.Error())
}
return newVal, nil
}

func DecodeUTF8(sp *StreamProcessor, val string) (string, error) {
newVal, _, err := transform.String(sp.transformers.UTF8, val)
if err != nil {
return val, errors.New("could not transform while running DecodeUTF8: " + err.Error())
}
return newVal, nil
}

func DecodeUTF8BOM(sp *StreamProcessor, val string) (string, error) {
newVal, _, err := transform.String(sp.transformers.UTF8BOM, val)
if err != nil {
return val, errors.New("could not transform while running DecodeUTF8BOM: " + err.Error())
}
return newVal, nil
}
var Transforms = map[Transform]TransformFunc{}

type Transform string

const (
TransformDecodeLatin1 Transform = "decode_latin1"
TransformDecodeLatin5 Transform = "decode_latin5"
TransformDecodeLatin9 Transform = "decode_latin9"
TransformDecodeUtf8 Transform = "decode_utf8"
TransformDecodeUtf8Bom Transform = "decode_utf8_bom"
TransformDecodeUtf16 Transform = "decode_utf16"
TransformDecodeWindows1250 Transform = "decode_windows1250"
TransformDecodeWindows1252 Transform = "decode_windows1252"
TransformDuckdbListToText Transform = "duckdb_list_to_text"
TransformHashMd5 Transform = "hash_md5"
TransformHashSha256 Transform = "hash_sha256"
TransformHashSha512 Transform = "hash_sha512"
TransformParseBit Transform = "parse_bit"
TransformParseFix Transform = "parse_fix"
TransformParseUuid Transform = "parse_uuid"
TransformReplace0x00 Transform = "replace_0x00"
TransformReplaceAccents Transform = "replace_accents"
TransformReplaceNonPrintable Transform = "replace_non_printable"
TransformTrimSpace Transform = "trim_space"
)

// https://stackoverflow.com/a/46637343/2295355
// https://web.itu.edu.tr/sgunduz/courses/mikroisl/ascii.html
Expand Down
65 changes: 38 additions & 27 deletions core/sling/transforms.go
Expand Up @@ -10,35 +10,46 @@ import (
"github.com/google/uuid"
"github.com/slingdata-io/sling-cli/core/dbio/iop"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
)

var decISO8859_1 = charmap.ISO8859_1.NewDecoder()
var decISO8859_5 = charmap.ISO8859_5.NewDecoder()
var decISO8859_15 = charmap.ISO8859_15.NewDecoder()
var decWindows1250 = charmap.Windows1250.NewDecoder()
var decWindows1252 = charmap.Windows1252.NewDecoder()

var transforms = map[string]iop.TransformFunc{
"decode_latin1": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_1, val) },
"decode_latin5": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_5, val) },
"decode_latin9": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_15, val) },
"decode_utf8": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.DecodeUTF8(sp, val) },
"decode_utf8_bom": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.DecodeUTF8BOM(sp, val) },
"decode_utf16": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.DecodeUTF16(sp, val) },
"decode_windows1250": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1250, val) },
"decode_windows1252": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1252, val) },
"duckdb_list_to_text": func(sp *iop.StreamProcessor, val string) (string, error) { return duckDbListAsText(val), nil },
"hash_md5": func(sp *iop.StreamProcessor, val string) (string, error) { return g.MD5(val), nil },
"hash_sha256": func(sp *iop.StreamProcessor, val string) (string, error) { return SHA256(val), nil },
"hash_sha512": func(sp *iop.StreamProcessor, val string) (string, error) { return SHA512(val), nil },
"parse_bit": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseBit(sp, val) },
"parse_fix": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseFIX(sp, val) },
"parse_uuid": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseUUID(sp, val) },
"replace_0x00": func(sp *iop.StreamProcessor, val string) (string, error) { return Replace0x00(sp, val) },
"replace_accents": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.ReplaceAccents(sp, val) },
"replace_non_printable": func(sp *iop.StreamProcessor, val string) (string, error) { return ReplaceNonPrint(sp, val) },
"trim_space": func(sp *iop.StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil },
var transforms = map[iop.Transform]iop.TransformFunc{
iop.TransformDecodeLatin1: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeLatin1, val)
},
iop.TransformDecodeLatin5: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeLatin5, val)
},
iop.TransformDecodeLatin9: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeLatin9, val)
},
iop.TransformDecodeUtf8: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeUtf8, val)
},
iop.TransformDecodeUtf8Bom: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeUtf8Bom, val)
},
iop.TransformDecodeUtf16: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeUtf16, val)
},
iop.TransformDecodeWindows1250: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeWindows1250, val)
},
iop.TransformDecodeWindows1252: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformDecodeWindows1252, val)
},
iop.TransformDuckdbListToText: func(sp *iop.StreamProcessor, val string) (string, error) { return duckDbListAsText(val), nil },
iop.TransformHashMd5: func(sp *iop.StreamProcessor, val string) (string, error) { return g.MD5(val), nil },
iop.TransformHashSha256: func(sp *iop.StreamProcessor, val string) (string, error) { return SHA256(val), nil },
iop.TransformHashSha512: func(sp *iop.StreamProcessor, val string) (string, error) { return SHA512(val), nil },
iop.TransformParseBit: func(sp *iop.StreamProcessor, val string) (string, error) { return ParseBit(sp, val) },
iop.TransformParseFix: func(sp *iop.StreamProcessor, val string) (string, error) { return ParseFIX(sp, val) },
iop.TransformParseUuid: func(sp *iop.StreamProcessor, val string) (string, error) { return ParseUUID(sp, val) },
iop.TransformReplace0x00: func(sp *iop.StreamProcessor, val string) (string, error) { return Replace0x00(sp, val) },
iop.TransformReplaceAccents: func(sp *iop.StreamProcessor, val string) (string, error) {
return sp.DecodeValue(iop.TransformReplaceAccents, val)
},
iop.TransformReplaceNonPrintable: func(sp *iop.StreamProcessor, val string) (string, error) { return ReplaceNonPrint(sp, val) },
iop.TransformTrimSpace: func(sp *iop.StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil },
}

func init() {
Expand Down

0 comments on commit 05a82e5

Please sign in to comment.