From fd365a04c3ff7e44e46874e567d20aab67170d79 Mon Sep 17 00:00:00 2001 From: Dawid Heyman Date: Wed, 7 Jun 2023 11:02:01 +0200 Subject: [PATCH] SNOW-830268: Add PUT/GET feature example (#814) --- cmd/filestransfer/.gitignore | 1 + cmd/filestransfer/Makefile | 16 +++ cmd/filestransfer/filestransfer.go | 181 +++++++++++++++++++++++++++ cmd/multistatement/multistatement.go | 2 +- 4 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 cmd/filestransfer/.gitignore create mode 100644 cmd/filestransfer/Makefile create mode 100644 cmd/filestransfer/filestransfer.go diff --git a/cmd/filestransfer/.gitignore b/cmd/filestransfer/.gitignore new file mode 100644 index 000000000..8155a94cc --- /dev/null +++ b/cmd/filestransfer/.gitignore @@ -0,0 +1 @@ +filestransfer \ No newline at end of file diff --git a/cmd/filestransfer/Makefile b/cmd/filestransfer/Makefile new file mode 100644 index 000000000..0e2a7c92b --- /dev/null +++ b/cmd/filestransfer/Makefile @@ -0,0 +1,16 @@ +include ../../gosnowflake.mak +CMD_TARGET=filestransfer + +## Install +install: cinstall + +## Run +run: crun + +## Lint +lint: clint + +## Format source codes +fmt: cfmt + +.PHONY: install run lint fmt \ No newline at end of file diff --git a/cmd/filestransfer/filestransfer.go b/cmd/filestransfer/filestransfer.go new file mode 100644 index 000000000..16246763c --- /dev/null +++ b/cmd/filestransfer/filestransfer.go @@ -0,0 +1,181 @@ +// Example: Files transfer using PUT/GET commands +// +// This example shows how to transfer files to staging area, from which data can be loaded into snowflake +// database tables. Apart from sending files to staging area using PUT command, files can also be downloaded +// using GET command. +package main + +import ( + "bytes" + "compress/gzip" + "database/sql" + "flag" + "fmt" + "log" + "os" + "strconv" + + sf "github.com/snowflakedb/gosnowflake" +) + +const customFormatCsvDataToUpload = "NUM; TEXT\n1; foo\n2; bar\n3; baz" + +func getDSN() (string, *sf.Config, error) { + env := func(key string, failOnMissing bool) string { + if value := os.Getenv(key); value != "" { + return value + } + if failOnMissing { + log.Fatalf("%v environment variable not set", key) + } + return "" + } + account := env("SNOWFLAKE_TEST_ACCOUNT", true) + user := env("SNOWFLAKE_TEST_USER", true) + password := env("SNOWFLAKE_TEST_PASSWORD", true) + database := env("SNOWFLAKE_TEST_DATABASE", true) + schema := env("SNOWFLAKE_TEST_SCHEMA", true) + warehouse := env("SNOWFLAKE_TEST_WAREHOUSE", true) + host := env("SNOWFLAKE_TEST_HOST", false) + portStr := env("SNOWFLAKE_TEST_PORT", false) + protocol := env("SNOWFLAKE_TEST_PROTOCOL", false) + + port := 443 + var err error + if len(portStr) > 0 { + port, err = strconv.Atoi(portStr) + if err != nil { + return "", nil, err + } + } + + cfg := &sf.Config{ + Account: account, + User: user, + Password: password, + Database: database, + Warehouse: warehouse, + Schema: schema, + Host: host, + Port: port, + Protocol: protocol, + } + + dsn, err := sf.DSN(cfg) + if err != nil { + return "", nil, err + } + + return dsn, cfg, nil +} + +func createTmpFile(content string) string { + tempFile, err := os.CreateTemp("", "data_to_upload.csv") + if err != nil { + log.Fatalf("error during creating temp file; err: %v", err) + } + _, err = tempFile.Write([]byte(content)) + if err != nil { + log.Fatalf("error during writing data to temp file; err: %v", err) + } + absolutePath := tempFile.Name() + fmt.Printf("Tmp file with data to upload created at %v with content %#v\n", absolutePath, customFormatCsvDataToUpload) + return absolutePath +} + +func decompressAndRead(file *os.File) (string, error) { + gzipReader, err := gzip.NewReader(file) + defer gzipReader.Close() + if err != nil { + return "", err + } + var b bytes.Buffer + _, err = b.ReadFrom(gzipReader) + if err != nil { + return "", err + } + return b.String(), nil +} + +func printRows(rows *sql.Rows) { + for i := 1; rows.Next(); i++ { + var col1 int + var col2 string + if err := rows.Scan(&col1, &col2); err != nil { + log.Fatalf("error while scaning rows; err: %v", err) + } + fmt.Printf("Row %v: %v, %v\n", i, col1, col2) + } +} + +func main() { + if !flag.Parsed() { + flag.Parse() + } + + //Opening connection + dsn, cfg, err := getDSN() + if err != nil { + log.Fatalf("error while creating DSN from config: %v, error: %v", cfg, err) + } + db, err := sql.Open("snowflake", dsn) + defer db.Close() + + //Creating table to which the data from CSV file will be copied + _, err = db.Exec("CREATE OR REPLACE TABLE GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE(num integer, text varchar);") + if err != nil { + log.Fatalf("error while creating table; err: %v", err) + } + defer db.Exec("DROP TABLE IF EXISTS GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE;") + + //Uploading data_to_upload.csv to internal stage for table GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE + tmpFilePath := createTmpFile(customFormatCsvDataToUpload) + defer os.Remove(tmpFilePath) + _, err = db.Exec(fmt.Sprintf("PUT file://%v @%%GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE;", tmpFilePath)) + if err != nil { + log.Fatalf("error while uploading file; err: %v", err) + } + fmt.Println("data_do_upload.csv successfully uploaded to internal stage.") + + //Creating custom file format that describes our data + _, err = db.Exec("CREATE OR REPLACE TEMPORARY FILE FORMAT CUSTOM_CSV_FORMAT" + + " TYPE = CSV COMPRESSION = GZIP FIELD_DELIMITER = ';' FILE_EXTENSION = 'csv' SKIP_HEADER = 1;") + if err != nil { + log.Fatalf("error while creating file format; err: %v", err) + } + fmt.Println("Custom CSV format successfully created.") + + //Loading data from files in stage area into table + _, err = db.Exec("COPY INTO GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE FILE_FORMAT = CUSTOM_CSV_FORMAT;") + if err != nil { + log.Fatalf("error while copying data into table; err: %v", err) + } + fmt.Println("Data successfully loaded into table. Querying...") + + //Querying loaded data from table + rows, err := db.Query("SELECT * FROM GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE;") + if err != nil { + log.Fatalf("error while querying data from table; err: %v", err) + } + defer rows.Close() + printRows(rows) + + //Downloading file from stage area to system's TMP directory + tmpDir := os.TempDir() + _, err = db.Exec(fmt.Sprintf("GET @%%GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE/data_to_upload.csv file://%v/;", tmpDir)) + if err != nil { + log.Fatalf("error while downloading data from internal stage area; err: %v", err) + } + fmt.Printf("File successfully downloaded from internal stage area to %v\n", tmpDir) + + //Reading from downloaded file + file, err := os.Open(fmt.Sprintf("%v/data_to_upload.csv.gz", tmpDir)) + if err != nil { + log.Fatalf("error while opening downloaded file; err: %v", err) + } + content, err := decompressAndRead(file) + if err != nil { + log.Fatalf("error while reading file; err: %v", err) + } + fmt.Printf("Downloaded file content: %#v\n", content) +} diff --git a/cmd/multistatement/multistatement.go b/cmd/multistatement/multistatement.go index b17bc376c..9b3477d53 100644 --- a/cmd/multistatement/multistatement.go +++ b/cmd/multistatement/multistatement.go @@ -19,7 +19,7 @@ func getDSN() (string, *sf.Config, error) { return value } if failOnMissing { - log.Fatalf("#{key} environment not set") + log.Fatalf("%v environment not set", key) } return "" }