Skip to content

Commit

Permalink
SNOW-830268: Add PUT/GET feature example (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-dheyman-1 committed Jun 7, 2023
1 parent 2f493c8 commit fd365a0
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 1 deletion.
1 change: 1 addition & 0 deletions cmd/filestransfer/.gitignore
@@ -0,0 +1 @@
filestransfer
16 changes: 16 additions & 0 deletions cmd/filestransfer/Makefile
@@ -0,0 +1,16 @@
include ../../gosnowflake.mak
CMD_TARGET=filestransfer

## Install
install: cinstall

## Run
run: crun

## Lint
lint: clint

## Format source codes
fmt: cfmt

.PHONY: install run lint fmt
181 changes: 181 additions & 0 deletions cmd/filestransfer/filestransfer.go
@@ -0,0 +1,181 @@
// Example: Files transfer using PUT/GET commands
//
// This example shows how to transfer files to staging area, from which data can be loaded into snowflake
// database tables. Apart from sending files to staging area using PUT command, files can also be downloaded
// using GET command.
package main

import (
"bytes"
"compress/gzip"
"database/sql"
"flag"
"fmt"
"log"
"os"
"strconv"

sf "github.com/snowflakedb/gosnowflake"
)

const customFormatCsvDataToUpload = "NUM; TEXT\n1; foo\n2; bar\n3; baz"

func getDSN() (string, *sf.Config, error) {
env := func(key string, failOnMissing bool) string {
if value := os.Getenv(key); value != "" {
return value
}
if failOnMissing {
log.Fatalf("%v environment variable not set", key)
}
return ""
}
account := env("SNOWFLAKE_TEST_ACCOUNT", true)
user := env("SNOWFLAKE_TEST_USER", true)
password := env("SNOWFLAKE_TEST_PASSWORD", true)
database := env("SNOWFLAKE_TEST_DATABASE", true)
schema := env("SNOWFLAKE_TEST_SCHEMA", true)
warehouse := env("SNOWFLAKE_TEST_WAREHOUSE", true)
host := env("SNOWFLAKE_TEST_HOST", false)
portStr := env("SNOWFLAKE_TEST_PORT", false)
protocol := env("SNOWFLAKE_TEST_PROTOCOL", false)

port := 443
var err error
if len(portStr) > 0 {
port, err = strconv.Atoi(portStr)
if err != nil {
return "", nil, err
}
}

cfg := &sf.Config{
Account: account,
User: user,
Password: password,
Database: database,
Warehouse: warehouse,
Schema: schema,
Host: host,
Port: port,
Protocol: protocol,
}

dsn, err := sf.DSN(cfg)
if err != nil {
return "", nil, err
}

return dsn, cfg, nil
}

func createTmpFile(content string) string {
tempFile, err := os.CreateTemp("", "data_to_upload.csv")
if err != nil {
log.Fatalf("error during creating temp file; err: %v", err)
}
_, err = tempFile.Write([]byte(content))
if err != nil {
log.Fatalf("error during writing data to temp file; err: %v", err)
}
absolutePath := tempFile.Name()
fmt.Printf("Tmp file with data to upload created at %v with content %#v\n", absolutePath, customFormatCsvDataToUpload)
return absolutePath
}

func decompressAndRead(file *os.File) (string, error) {
gzipReader, err := gzip.NewReader(file)
defer gzipReader.Close()
if err != nil {
return "", err
}
var b bytes.Buffer
_, err = b.ReadFrom(gzipReader)
if err != nil {
return "", err
}
return b.String(), nil
}

func printRows(rows *sql.Rows) {
for i := 1; rows.Next(); i++ {
var col1 int
var col2 string
if err := rows.Scan(&col1, &col2); err != nil {
log.Fatalf("error while scaning rows; err: %v", err)
}
fmt.Printf("Row %v: %v, %v\n", i, col1, col2)
}
}

func main() {
if !flag.Parsed() {
flag.Parse()
}

//Opening connection
dsn, cfg, err := getDSN()
if err != nil {
log.Fatalf("error while creating DSN from config: %v, error: %v", cfg, err)
}
db, err := sql.Open("snowflake", dsn)
defer db.Close()

//Creating table to which the data from CSV file will be copied
_, err = db.Exec("CREATE OR REPLACE TABLE GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE(num integer, text varchar);")
if err != nil {
log.Fatalf("error while creating table; err: %v", err)
}
defer db.Exec("DROP TABLE IF EXISTS GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE;")

//Uploading data_to_upload.csv to internal stage for table GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE
tmpFilePath := createTmpFile(customFormatCsvDataToUpload)
defer os.Remove(tmpFilePath)
_, err = db.Exec(fmt.Sprintf("PUT file://%v @%%GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE;", tmpFilePath))
if err != nil {
log.Fatalf("error while uploading file; err: %v", err)
}
fmt.Println("data_do_upload.csv successfully uploaded to internal stage.")

//Creating custom file format that describes our data
_, err = db.Exec("CREATE OR REPLACE TEMPORARY FILE FORMAT CUSTOM_CSV_FORMAT" +
" TYPE = CSV COMPRESSION = GZIP FIELD_DELIMITER = ';' FILE_EXTENSION = 'csv' SKIP_HEADER = 1;")
if err != nil {
log.Fatalf("error while creating file format; err: %v", err)
}
fmt.Println("Custom CSV format successfully created.")

//Loading data from files in stage area into table
_, err = db.Exec("COPY INTO GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE FILE_FORMAT = CUSTOM_CSV_FORMAT;")
if err != nil {
log.Fatalf("error while copying data into table; err: %v", err)
}
fmt.Println("Data successfully loaded into table. Querying...")

//Querying loaded data from table
rows, err := db.Query("SELECT * FROM GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE;")
if err != nil {
log.Fatalf("error while querying data from table; err: %v", err)
}
defer rows.Close()
printRows(rows)

//Downloading file from stage area to system's TMP directory
tmpDir := os.TempDir()
_, err = db.Exec(fmt.Sprintf("GET @%%GOSNOWFLAKE_FILES_TRANSFER_EXAMPLE/data_to_upload.csv file://%v/;", tmpDir))
if err != nil {
log.Fatalf("error while downloading data from internal stage area; err: %v", err)
}
fmt.Printf("File successfully downloaded from internal stage area to %v\n", tmpDir)

//Reading from downloaded file
file, err := os.Open(fmt.Sprintf("%v/data_to_upload.csv.gz", tmpDir))
if err != nil {
log.Fatalf("error while opening downloaded file; err: %v", err)
}
content, err := decompressAndRead(file)
if err != nil {
log.Fatalf("error while reading file; err: %v", err)
}
fmt.Printf("Downloaded file content: %#v\n", content)
}
2 changes: 1 addition & 1 deletion cmd/multistatement/multistatement.go
Expand Up @@ -19,7 +19,7 @@ func getDSN() (string, *sf.Config, error) {
return value
}
if failOnMissing {
log.Fatalf("#{key} environment not set")
log.Fatalf("%v environment not set", key)
}
return ""
}
Expand Down

0 comments on commit fd365a0

Please sign in to comment.