
Merge pull request #247 from slingdata-io/v1.2.3
v1.2.3
flarco committed Apr 5, 2024
2 parents 6b432e9 + b11b396 commit e29c4bb
Showing 32 changed files with 603 additions and 276 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -54,9 +54,9 @@ Example [Replication](https://docs.slingdata.io/sling-cli/run/configuration/repl
---

Available Connectors:
- **Databases**: [`bigquery`](https://docs.slingdata.io/connections/database-connections/bigquery) [`bigtable`](https://docs.slingdata.io/connections/database-connections/bigtable) [`clickhouse`](https://docs.slingdata.io/connections/database-connections/clickhouse) [`duckdb`](https://docs.slingdata.io/connections/database-connections/duckdb) [`mariadb`](https://docs.slingdata.io/connections/database-connections/mariadb) [`motherduck`](https://docs.slingdata.io/connections/database-connections/motherduck) [`mysql`](https://docs.slingdata.io/connections/database-connections/mysql) [`oracle`](https://docs.slingdata.io/connections/database-connections/oracle) [`postgres`](https://docs.slingdata.io/connections/database-connections/postgres) [`redshift`](https://docs.slingdata.io/connections/database-connections/redshift) [`snowflake`](https://docs.slingdata.io/connections/database-connections/snowflake) [`sqlite`](https://docs.slingdata.io/connections/database-connections/sqlite) [`sqlserver`](https://docs.slingdata.io/connections/database-connections/sqlserver) [`starrocks`](https://docs.slingdata.io/connections/database-connections/starrocks)
- **Databases**: [`bigquery`](https://docs.slingdata.io/connections/database-connections/bigquery) [`bigtable`](https://docs.slingdata.io/connections/database-connections/bigtable) [`clickhouse`](https://docs.slingdata.io/connections/database-connections/clickhouse) [`duckdb`](https://docs.slingdata.io/connections/database-connections/duckdb) [`mariadb`](https://docs.slingdata.io/connections/database-connections/mariadb) [`motherduck`](https://docs.slingdata.io/connections/database-connections/motherduck) [`mysql`](https://docs.slingdata.io/connections/database-connections/mysql) [`oracle`](https://docs.slingdata.io/connections/database-connections/oracle) [`postgres`](https://docs.slingdata.io/connections/database-connections/postgres) [`redshift`](https://docs.slingdata.io/connections/database-connections/redshift) [`snowflake`](https://docs.slingdata.io/connections/database-connections/snowflake) [`sqlite`](https://docs.slingdata.io/connections/database-connections/sqlite) [`sqlserver`](https://docs.slingdata.io/connections/database-connections/sqlserver) [`starrocks`](https://docs.slingdata.io/connections/database-connections/starrocks) [`prometheus`](https://docs.slingdata.io/connections/database-connections/prometheus)
- **File Systems**: [`azure`](https://docs.slingdata.io/connections/file-connections/azure) [`b2`](https://docs.slingdata.io/connections/file-connections/b2) [`dospaces`](https://docs.slingdata.io/connections/file-connections/dospaces) [`gs`](https://docs.slingdata.io/connections/file-connections/gs) [`local`](https://docs.slingdata.io/connections/file-connections/local) [`minio`](https://docs.slingdata.io/connections/file-connections/minio) [`r2`](https://docs.slingdata.io/connections/file-connections/r2) [`s3`](https://docs.slingdata.io/connections/file-connections/s3) [`sftp`](https://docs.slingdata.io/connections/file-connections/sftp) [`wasabi`](https://docs.slingdata.io/connections/file-connections/wasabi)
- **File Formats**: `csv`, `parquet`, `json`, `avro`, `xml`, `sas7bdat`
- **File Formats**: `csv`, `parquet`, `xlsx`, `json`, `avro`, `xml`, `sas7bdat`

Here are some additional links:
- https://slingdata.io
54 changes: 33 additions & 21 deletions cmd/sling/sling_cli.go
@@ -416,7 +416,7 @@ func main() {
go func() {
defer close(done)
exitCode = cliInit()
sentry.Flush(time.Second * 2)
g.SentryFlush(time.Second * 2)
}()

exit := func() {
@@ -432,7 +432,7 @@ func main() {
exitCode = 111
exit()
case <-interrupt:
go sentry.Flush(time.Second * 4)
go g.SentryFlush(time.Second * 4)
if cliRun.Sc.Used {
env.Println("\ninterrupting...")
interrupted = true
@@ -524,46 +524,58 @@ func setSentry() {
if telemetry {
g.SentryRelease = g.F("sling@%s", core.Version)
g.SentryDsn = env.SentryDsn
g.SentryConfigureFunc = func(event *sentry.Event, scope *sentry.Scope, exception *g.ErrType) bool {
if exception.Err == "context canceled" {
g.SentryConfigureFunc = func(se *g.SentryEvent) bool {
if se.Exception.Err == "context canceled" {
return false
}

// set transaction
taskMap, _ := g.UnmarshalMap(cast.ToString(env.TelMap["task"]))
sourceType := lo.Ternary(taskMap["source_type"] == nil, "unknown", cast.ToString(taskMap["source_type"]))
targetType := lo.Ternary(taskMap["target_type"] == nil, "unknown", cast.ToString(taskMap["target_type"]))
event.Transaction = g.F("%s - %s", sourceType, targetType)
se.Event.Transaction = g.F("%s - %s", sourceType, targetType)
if g.CliObj.Name == "conns" {
targetType = lo.Ternary(env.TelMap["conn_type"] == nil, "unknown", cast.ToString(env.TelMap["conn_type"]))
event.Transaction = g.F(targetType)
se.Event.Transaction = g.F(targetType)
}

// format telMap
telMap, _ := g.UnmarshalMap(g.Marshal(env.TelMap))
if val, ok := telMap["task"]; ok {
telMap["task"], _ = g.UnmarshalMap(cast.ToString(val))
}
if val, ok := telMap["task_options"]; ok {
telMap["task_options"], _ = g.UnmarshalMap(cast.ToString(val))
}
if val, ok := telMap["task_stats"]; ok {
telMap["task_stats"], _ = g.UnmarshalMap(cast.ToString(val))
}
delete(telMap, "error")
bars := "--------------------------------------------------------"
event.Message = exception.Debug() + "\n\n" + bars + "\n\n" + g.Pretty(env.TelMap)
se.Event.Message = se.Exception.Debug() + "\n\n" + bars + "\n\n" + g.Pretty(telMap)

e := event.Exception[0]
event.Exception[0].Type = e.Stacktrace.Frames[len(e.Stacktrace.Frames)-1].Function
event.Exception[0].Value = g.F("%s [%s]", exception.LastCaller(), exception.CallerStackMD5())
e := se.Event.Exception[0]
se.Event.Exception[0].Type = e.Stacktrace.Frames[len(e.Stacktrace.Frames)-1].Function
se.Event.Exception[0].Value = g.F("%s [%s]", se.Exception.LastCaller(), se.Exception.CallerStackMD5())

scope.SetUser(sentry.User{ID: machineID})
se.Scope.SetUser(sentry.User{ID: machineID})
if g.CliObj.Name == "conns" {
scope.SetTag("run_mode", "conns")
scope.SetTag("target_type", targetType)
se.Scope.SetTag("run_mode", "conns")
se.Scope.SetTag("target_type", targetType)
} else {
scope.SetTag("source_type", sourceType)
scope.SetTag("target_type", targetType)
scope.SetTag("stage", cast.ToString(env.TelMap["stage"]))
scope.SetTag("run_mode", cast.ToString(env.TelMap["run_mode"]))
se.Scope.SetTag("source_type", sourceType)
se.Scope.SetTag("target_type", targetType)
se.Scope.SetTag("stage", cast.ToString(env.TelMap["stage"]))
se.Scope.SetTag("run_mode", cast.ToString(env.TelMap["run_mode"]))
if val := cast.ToString(taskMap["mode"]); val != "" {
scope.SetTag("mode", val)
se.Scope.SetTag("mode", val)
}
if val := cast.ToString(taskMap["type"]); val != "" {
scope.SetTag("type", val)
se.Scope.SetTag("type", val)
}
scope.SetTag("package", getSlingPackage())
se.Scope.SetTag("package", getSlingPackage())
if projectID != "" {
scope.SetTag("project_id", projectID)
se.Scope.SetTag("project_id", projectID)
}
}

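For context, the `setSentry` hunk above replaces the old three-argument hook (`*sentry.Event`, `*sentry.Scope`, `*g.ErrType`) with a single `*g.SentryEvent` wrapper exposing `Exception`, `Event`, and `Scope`. Below is a standalone sketch of that callback pattern; the types are local stand-ins (not the real `flarco/g` or `sentry-go` types) and the transaction/tag values are purely illustrative.

```go
package main

import "fmt"

// sentryEvent is a hypothetical stand-in for the single wrapper the new hook
// receives, modeled only on the fields visible in the diff (Exception, Event, Scope).
type sentryEvent struct {
	exceptionErr string            // like se.Exception.Err
	transaction  string            // like se.Event.Transaction
	tags         map[string]string // like se.Scope.SetTag(...)
}

// configure returns false to drop the event, true to let it be sent.
func configure(se *sentryEvent) bool {
	if se.exceptionErr == "context canceled" {
		return false // user-initiated cancellations are not reported
	}
	se.transaction = "postgres - snowflake" // "<source_type> - <target_type>"
	se.tags["run_mode"] = "task"
	return true
}

func main() {
	dropped := &sentryEvent{exceptionErr: "context canceled", tags: map[string]string{}}
	sent := &sentryEvent{exceptionErr: "connection refused", tags: map[string]string{}}
	fmt.Println(configure(dropped), configure(sent)) // false true
}
```
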
23 changes: 23 additions & 0 deletions cmd/sling/sling_logic.go
@@ -623,6 +623,7 @@ func processConns(c *g.CliSC) (ok bool, err error) {
fmt.Println(g.PrettyTable(data.GetFields(), data.Rows))
}

totalAffected = cast.ToInt64(len(data.Rows))
} else {
if len(queries) > 1 {
if strings.HasPrefix(query, "file://") {
@@ -651,6 +652,10 @@ func processConns(c *g.CliSC) (ok bool, err error) {
g.Info("Successful! Duration: %d seconds.", end.Unix()-start.Unix())
}

if err := testRowCnt(totalAffected); err != nil {
return ok, err
}

case "list":
fields, rows := ec.List()
if asJSON {
@@ -829,6 +834,24 @@ func setProjectID(cfgPath string) {
}
}

func testRowCnt(rowCnt int64) error {
if expected := os.Getenv("SLING_ROW_CNT"); expected != "" {

if strings.HasPrefix(expected, ">") {
atLeast := cast.ToInt64(strings.TrimPrefix(expected, ">"))
if rowCnt <= atLeast {
return g.Error("Expected at least %d rows, got %d", atLeast, rowCnt)
}
return nil
}

if rowCnt != cast.ToInt64(expected) {
return g.Error("Expected %d rows, got %d", cast.ToInt(expected), rowCnt)
}
}
return nil
}

func testStreamCnt(streamCnt int, matchedStreams, inputStreams []string) error {
if expected := os.Getenv("SLING_STREAM_CNT"); expected != "" {

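The new `testRowCnt` helper above complements the existing `testStreamCnt`: when the `SLING_ROW_CNT` environment variable is set, the run fails unless the affected row count matches the value exactly, or exceeds it when the value is prefixed with `>`. A minimal standalone sketch of that check (not the sling code itself; error wording is illustrative):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// checkRowCount follows the semantics of testRowCnt above:
//   SLING_ROW_CNT="100"  -> require exactly 100 rows
//   SLING_ROW_CNT=">100" -> require more than 100 rows
func checkRowCount(rowCnt int64) error {
	expected := os.Getenv("SLING_ROW_CNT")
	if expected == "" {
		return nil // no expectation set, nothing to enforce
	}
	if strings.HasPrefix(expected, ">") {
		atLeast, _ := strconv.ParseInt(strings.TrimPrefix(expected, ">"), 10, 64)
		if rowCnt <= atLeast {
			return fmt.Errorf("expected more than %d rows, got %d", atLeast, rowCnt)
		}
		return nil
	}
	exact, _ := strconv.ParseInt(expected, 10, 64)
	if rowCnt != exact {
		return fmt.Errorf("expected %d rows, got %d", exact, rowCnt)
	}
	return nil
}

func main() {
	os.Setenv("SLING_ROW_CNT", ">10")
	fmt.Println(checkRowCount(25)) // <nil>
	fmt.Println(checkRowCount(5))  // expected more than 10 rows, got 5
}
```
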
2 changes: 2 additions & 0 deletions cmd/sling/tests/files/binary/test.bytes.csv

Large diffs are not rendered by default.

40 changes: 8 additions & 32 deletions core/dbio/connection/connection.go
@@ -359,6 +359,11 @@ func (c *Connection) setURL() (err error) {

// set database
setIfMissing("database", pathValue)

// pass query params
for k, v := range U.Query() {
setIfMissing(k, v)
}
}
if g.In(c.Type, dbio.TypeFileSftp, dbio.TypeFileFtp) {
setIfMissing("user", U.Username())
@@ -531,6 +536,7 @@ func (c *Connection) setURL() (err error) {
setIfMissing("username", c.Data["user"])
setIfMissing("password", "")
setIfMissing("port", c.Type.DefPort())
setIfMissing("app_name", "sling")

template = "sqlserver://{username}:{password}@{host}:{port}"
if _, ok := c.Data["instance"]; ok {
@@ -540,28 +546,6 @@
if _, ok := c.Data["database"]; ok {
template = template + "&database={database}"
}
if _, ok := c.Data["encrypt"]; ok {
// disable, false, true
template = template + "&encrypt={encrypt}"
}
if _, ok := c.Data["TrustServerCertificate"]; ok {
// false, true
template = template + "&TrustServerCertificate={TrustServerCertificate}"
}
if _, ok := c.Data["hostNameInCertificate"]; ok {
template = template + "&hostNameInCertificate={hostNameInCertificate}"
}
if _, ok := c.Data["certificate"]; ok {
template = template + "&certificate={certificate}"
}
if _, ok := c.Data["user id"]; ok {
template = template + "&user id={user id}"
}
if _, ok := c.Data["app name"]; ok {
template = template + "&app name={app name}"
} else {
template = template + "&app name=sling"
}

case dbio.TypeDbTrino:
setIfMissing("username", c.Data["user"])
@@ -883,16 +867,8 @@ func (i *Info) IsURL() bool {

// SchemeType returns the correct scheme of the url
func SchemeType(url string) dbio.Type {
if !strings.Contains(url, "://") {
return dbio.TypeUnknown
}

if strings.HasPrefix(url, "file://") {
return dbio.TypeFileLocal
}

if strings.HasPrefix(url, "https") && strings.Contains(url, ".core.windows.") {
return dbio.TypeFileAzure
if t, _, _, err := dbio.ParseURL(url); err == nil {
return t
}

scheme := strings.Split(url, "://")[0]
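Two behavioral changes in `connection.go` above: URL query parameters are now folded into the connection's property map via `setIfMissing`, and `SchemeType` delegates scheme detection to `dbio.ParseURL` instead of ad-hoc prefix checks. A rough standalone illustration of the query-parameter pass-through, assuming only-if-missing semantics; sling's actual `setIfMissing` may handle multi-valued parameters differently.

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// a DSN carrying driver options as query parameters (values illustrative)
	u, err := url.Parse("postgres://user:pass@host:5432/db?sslmode=require&connect_timeout=10")
	if err != nil {
		panic(err)
	}

	props := map[string]string{"sslmode": "disable"} // already set explicitly elsewhere

	// fold query params into the property map, never overriding existing keys
	for k, vals := range u.Query() {
		if _, exists := props[k]; !exists && len(vals) > 0 {
			props[k] = vals[0]
		}
	}

	fmt.Println(props) // map[connect_timeout:10 sslmode:disable]
}
```
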
2 changes: 1 addition & 1 deletion core/dbio/connection/connection_local.go
@@ -318,7 +318,7 @@ func (ec *EnvConns) testDiscover(name string, opt *DiscoverOptions) (ok bool, no

conn, ok1 := ec.GetConnEntry(name)
if !ok1 || name == "" {
return ok, nodes, schemata, g.Error("Invalid Connection name: %s", name)
return ok, nodes, schemata, g.Error("Invalid Connection name: %s. Make sure it is created. See https://docs.slingdata.io/sling-cli/environment", name)
}

switch {
Expand Down
3 changes: 2 additions & 1 deletion core/dbio/database/database.go
@@ -23,11 +23,12 @@ import (
slingEnv "github.com/slingdata-io/sling-cli/core/env"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/denisenkom/go-mssqldb"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
_ "github.com/microsoft/go-mssqldb"
_ "github.com/microsoft/go-mssqldb/integratedauth/krb5"
"github.com/slingdata-io/sling-cli/core/dbio/iop"
_ "github.com/snowflakedb/gosnowflake"

2 changes: 1 addition & 1 deletion core/dbio/database/database_duckdb.go
@@ -784,7 +784,7 @@ func (conn *DuckDbConn) BulkImportStream(tableFName string, ds *iop.Datastream)
})

sqlLines := []string{
g.F(`insert into %s (%s) select * from read_csv('%s', delim=',', header=True, columns=%s, max_line_size=134217728, parallel=false);`, table.FDQN(), strings.Join(columnNames, ", "), csvPath, conn.generateCsvColumns(ds.Columns)),
g.F(`insert into %s (%s) select * from read_csv('%s', delim=',', header=True, columns=%s, max_line_size=134217728, parallel=false, quote='"', escape='"');`, table.FDQN(), strings.Join(columnNames, ", "), csvPath, conn.generateCsvColumns(ds.Columns)),
}

var out []byte
59 changes: 55 additions & 4 deletions core/dbio/database/database_mongo.go
@@ -109,7 +109,7 @@ func (conn *MongoDBConn) GetTableColumns(table *Table, fields ...string) (column
return nil, g.Error("did not find collection %s", table.FullName())
}

ds, err := conn.StreamRows(table.FullName(), g.M("limit", 10))
ds, err := conn.StreamRows(table.FullName(), g.M("limit", 10, "silent", true))
if err != nil {
return columns, g.Error("could not query to get columns")
}
@@ -119,6 +119,12 @@
return columns, g.Error("could not collect to get columns")
}

for i := range data.Columns {
data.Columns[i].Schema = table.Schema
data.Columns[i].Table = table.Name
data.Columns[i].DbType = "-"
}

return data.Columns, nil
}

@@ -131,7 +137,8 @@ func (conn *MongoDBConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err
return
}

ds, err := conn.StreamRowsContext(conn.Context().Ctx, tables[0].FullName())
options, _ := g.UnmarshalMap(tables[0].SQL)
ds, err := conn.StreamRowsContext(conn.Context().Ctx, tables[0].FullName(), options)
if err != nil {
return df, g.Error(err, "could not start datastream")
}
@@ -151,6 +158,32 @@ func (conn *MongoDBConn) StreamRowsContext(ctx context.Context, collectionName s
Limit = val
}

findOpts := &options.FindOptions{Limit: &Limit}
fields := cast.ToStringSlice(opts["fields"])
if len(fields) > 0 {
d := bson.D{}
for _, field := range fields {
d = append(d, bson.D{{Key: field, Value: 1}}...)
}
findOpts = options.Find().SetProjection(d)
}

updateKey := cast.ToString(opts["update_key"])
incrementalValue := cast.ToString(opts["value"])
startValue := cast.ToString(opts["start_value"])
endValue := cast.ToString(opts["end_value"])

filter := bson.D{}
if updateKey != "" && incrementalValue != "" {
// incremental mode
incrementalValue = strings.Trim(incrementalValue, "'")
filter = append(filter, bson.D{{Key: updateKey, Value: bson.D{{Key: "$gt", Value: incrementalValue}}}}...)
} else if updateKey != "" && startValue != "" && endValue != "" {
// backfill mode
filter = append(filter, bson.D{{Key: updateKey, Value: bson.D{{Key: "$gte", Value: startValue}}}}...)
filter = append(filter, bson.D{{Key: updateKey, Value: bson.D{{Key: "$lte", Value: endValue}}}}...)
}

if strings.TrimSpace(collectionName) == "" {
g.Warn("Empty collection name")
return ds, nil
@@ -162,7 +195,11 @@

collection := conn.Client.Database(table.Schema).Collection(table.Name)

cur, err := collection.Find(queryContext.Ctx, bson.D{}, &options.FindOptions{Limit: &Limit})
if !cast.ToBool(opts["silent"]) {
conn.LogSQL(g.Marshal(g.M("database", table.Schema, "collection", table.Name, "filter", filter, "options", g.M("limit", findOpts.Limit, "projection", findOpts.Projection))))
}

cur, err := collection.Find(queryContext.Ctx, filter, findOpts)
if err != nil {
return ds, g.Error(err, "error querying collection")
}
@@ -244,7 +281,7 @@ func (conn *MongoDBConn) GetTables(schema string) (data iop.Dataset, err error)

// GetSchemata obtain full schemata info for a schema and/or table in current database
func (conn *MongoDBConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error) {
currDatabase := conn.Type.String()
currDatabase := dbio.TypeDbMongoDB.String()
schemata := Schemata{
Databases: map[string]Database{},
conn: conn,
@@ -302,6 +339,20 @@ func (conn *MongoDBConn) GetSchemata(schemaName string, tableNames ...string) (S
}

table.Columns = append(table.Columns, column)

if g.In(tableName, tableNames...) {
columns, err := conn.GetSQLColumns(table)
if err != nil {
return schemata, g.Error(err, "could not get columns")
}
for i := range columns {
columns[i].Database = table.Database
}
if len(columns) > 0 {
table.Columns = columns
}
}

schema.Tables[strings.ToLower(tableName)] = table
schemas[strings.ToLower(schema.Name)] = schema
}
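The `StreamRowsContext` changes above add incremental and backfill support for MongoDB by translating the update key and its value(s) into a BSON filter: `$gt` for incremental runs, `$gte`/`$lte` for backfill ranges, plus an optional field projection. A standalone sketch of just the filter construction (field names and values are illustrative):

```go
package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
)

// buildFilter mirrors the logic added above: incremental mode filters on $gt,
// backfill mode bounds the update key with $gte/$lte.
func buildFilter(updateKey, value, startValue, endValue string) bson.D {
	filter := bson.D{}
	switch {
	case updateKey != "" && value != "":
		// incremental: only documents past the last seen value
		filter = append(filter, bson.E{Key: updateKey, Value: bson.D{{Key: "$gt", Value: value}}})
	case updateKey != "" && startValue != "" && endValue != "":
		// backfill: documents within an inclusive range
		filter = append(filter, bson.E{Key: updateKey, Value: bson.D{{Key: "$gte", Value: startValue}}})
		filter = append(filter, bson.E{Key: updateKey, Value: bson.D{{Key: "$lte", Value: endValue}}})
	}
	return filter
}

func main() {
	f := buildFilter("updated_at", "2024-04-01", "", "")
	j, _ := bson.MarshalExtJSON(f, false, false)
	fmt.Println(string(j)) // {"updated_at":{"$gt":"2024-04-01"}}
}
```
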
2 changes: 2 additions & 0 deletions core/dbio/database/database_postgres.go
@@ -228,6 +228,8 @@ func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Colu
selectStr = g.F("%s::%s as %s", qName, tgtCol.DbType, qName)
case srcCol.IsString() && strings.ToLower(tgtCol.DbType) == "uuid":
selectStr = g.F("%s::%s as %s", qName, tgtCol.DbType, qName)
case srcCol.IsBool() && tgtCol.IsInteger():
selectStr = g.F("%s::int as %s", qName, qName)
default:
selectStr = qName
}
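The Postgres change above adds one more case to `CastColumnForSelect`: when a boolean source column feeds an integer target column, the generated select wraps it in a `::int` cast. A trivial standalone sketch of the resulting expression (column name is illustrative):

```go
package main

import "fmt"

// castForSelect mimics the new case: a bool source going into an integer
// target gets an explicit ::int cast; everything else is selected as-is.
func castForSelect(quotedName string, srcIsBool, tgtIsInt bool) string {
	if srcIsBool && tgtIsInt {
		return fmt.Sprintf("%s::int as %s", quotedName, quotedName)
	}
	return quotedName
}

func main() {
	fmt.Println(castForSelect(`"is_active"`, true, true)) // "is_active"::int as "is_active"
}
```
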
