New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add destination snowflake #6
Conversation
add ConfigureDestination method to the Destination interface add ConfigureDestination method for other destination update README.md to include command to run generator for snowflake destination
generator/null.go
Outdated
@@ -13,3 +13,8 @@ func (n *Null) SendDocument(docs []interface{}) error { | |||
func (n *Null) GetLatestTimestamp() (time.Time, error) { | |||
return time.Now().Add(-10 * time.Millisecond), nil | |||
} | |||
|
|||
func (n *Null) ConfigureDestination() error { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit: remove empty line
generator/rockset.go
Outdated
@@ -117,3 +117,8 @@ func (r *Rockset) GetLatestTimestamp() (time.Time, error) { | |||
// Convert from microseconds to (secs, nanosecs) | |||
return time.Unix(timeMicro/1000000, (timeMicro%1000000)*1000), nil | |||
} | |||
|
|||
func (r *Rockset) ConfigureDestination() error { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -11,4 +11,7 @@ type Destination interface { | |||
|
|||
// Get latest timestamp seen in the destination. | |||
GetLatestTimestamp() (time.Time, error) | |||
|
|||
// ConfigureDestination is used to make any configuration changes to the destination that might be required for sending documents. | |||
ConfigureDestination() error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this interface? you only use this function after casting to snowflake
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out, earlier we had the ConfigureDestination method only for snowflake that's why we were doing the casting but later we decided to add it to the interface to keep things consistent, I have updated main.go to remove the casting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, only found nits.
generator/snowflake.go
Outdated
return fmt.Errorf("failed to open a connection with snowflake: %w", err) | ||
} | ||
|
||
defer db.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this should log the error from db.Close()
if there is one
generator/snowflake.go
Outdated
_, err = db.Query(createStageQuery) | ||
if err != nil { | ||
return fmt.Errorf("failed to run a query. %v, err: %v", createStageQuery, err) | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can drop the else
as the if
returns
generator/snowflake.go
Outdated
_, err = db.Query(createTableQuery) | ||
if err != nil { | ||
return fmt.Errorf("failed to run a query. %v, err: %v", createTableQuery, err) | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can drop the else
as the if
returns
generator/snowflake.go
Outdated
_, err = db.Query(createPipeQuery) | ||
if err != nil { | ||
return fmt.Errorf("failed to run a query. %v, err: %v", createPipeQuery, err) | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can drop the else
as the if
returns
generator/snowflake.go
Outdated
err := rows.Scan(&created_on, &name, &database_name, &schema_name, &definition, &owner, ¬ification_channel, &comment, &integration, &pattern) | ||
if err != nil { | ||
return fmt.Errorf("failed to scan row to get notification channel info, err: %v", err) | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can drop the else
as the if
returns
generator/snowflake.go
Outdated
return time.Time{}, fmt.Errorf("failed to run a query. %v, err: %v", getLatestTimeStampQuery, err) | ||
} | ||
var unixtime interface{} | ||
defer rows.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this should log the error from rows.Close()
if there is one
generator/snowflake.go
Outdated
if unixtime != nil { | ||
unixtimeFloat, err := strconv.ParseFloat(unixtime.(string), 64) | ||
if err != nil { | ||
return time.Time{}, fmt.Errorf("could not convert unixtime from string to float64 %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use %w
instead of %v
generator/snowflake.go
Outdated
timeMicro := int64(unixtimeFloat) | ||
// Convert from microseconds to (secs, nanosecs) | ||
return time.Unix(timeMicro/1000000, (timeMicro%1000000)*1000), nil | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove else
generator/snowflake.go
Outdated
|
||
svc := s3.New(sess) | ||
input := &s3.PutBucketNotificationConfigurationInput{ | ||
Bucket: aws.String(r.stageS3BucketName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when you already have a variable, you can just use the address of it:
Bucket: &r.stageS3BucketName,
generator/snowflake.go
Outdated
} | ||
timeMicro := int64(unixtimeFloat) | ||
// Convert from microseconds to (secs, nanosecs) | ||
return time.Unix(timeMicro/1000000, (timeMicro%1000000)*1000), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use 1_000_000
to increase readability
remove else when if returns handle error returned by defer & other error handling improvements call ConfigureDestination using destination interface type without casting to Snowflake type
remove else when if returns
@pmenglund @hicder Thanks a lot for your valuable feedback, I have made the changes as per the suggestions provided. |
add snowflake as a destination for rockbench
add ConfigureDestination method to the Destination interface
add ConfigureDestination method for other destination
update README.md to include the command to run the generator for snowflake destination