## Copy

COPY INTO either loads staged data files into a table or unloads table data into a stage or external location.

### COPY INTO `<table>`

Loads data files to an existing table.

From:

- Named Internal Stage
- Named External Stage
- Table Stage 
- User Stage
- External Location: a cloud storage URL referenced directly, bypassing a named external stage.
    - AWS: COPY INTO mytable FROM 's3://mybucket/./../a.csv';
    - GCP: COPY INTO mytable FROM 'gcs://mybucket/./../a.csv';
    - Azure: COPY INTO mytable FROM 'azure://myaccount.blob.core.windows.net/mycontainer/./../a.csv';
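
Loading straight from an external location usually also needs credentials, because there is no stage definition to carry them. A minimal sketch, assuming a storage integration already exists; `my_s3_int`, `mytable`, and the bucket path are hypothetical placeholders, not objects created in this notebook:

```sql
-- Hypothetical names: my_s3_int (storage integration), mytable, mybucket/data/.
COPY INTO mytable
FROM 's3://mybucket/data/'
STORAGE_INTEGRATION = my_s3_int
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
```

A named external stage stores the URL and the integration once, which is why the rest of this notebook uses a stage instead.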

    

Base Syntax:

    COPY INTO [<namespace>.]<table_name>
         FROM { internalStage | externalStage | externalLocation }
    [ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
    [ PATTERN = '<regex_pattern>' ]
    [ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' |
                        TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
    [ copyOptions ]
    [ VALIDATION_MODE = RETURN_<n>_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS ]

Applying a basic data transformation:


    COPY INTO [<namespace>.]<table_name> [ ( <col_name> [ , <col_name> ... ] ) ]
         FROM ( SELECT [<alias>.]$<file_col_num>[.<element>] [ , [<alias>.]$<file_col_num>[.<element>] ... ]
                FROM { internalStage | externalStage } )
    [ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
    [ PATTERN = '<regex_pattern>' ]
    [ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' |
                        TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
  

  


Optional Parameters:
1. Files

    [ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]

Comma-separated list of file names to load.

- For internal stages: the file name only.
- For external stages: the file path is built by concatenating the URL in the stage definition with each file name in the list.
- At most 1000 files can be specified.


2. Pattern

    PATTERN = '<regex_pattern>'

A regular expression, enclosed in single quotes, that selects the file names and/or paths to load.


3. File Format

You can specify the format options inline or reference a named file format.

    FILE_FORMAT = ( FORMAT_NAME = 'file_format_name' )

OR

    FILE_FORMAT = ( TYPE = CSV | JSON | AVRO | ORC | PARQUET | XML [ ... ] )


4. Validation Mode:

    VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

Instructs the COPY command to validate the data files instead of loading them into the specified table.

- RETURN_n_ROWS (for example, RETURN_10_ROWS): validates the first n rows and returns them if no errors are found; otherwise the statement fails at the first error.
- RETURN_ERRORS: returns all errors across the files specified in this COPY statement.
- RETURN_ALL_ERRORS: same as RETURN_ERRORS, plus errors from files that were only partially loaded by an earlier load because the ON_ERROR copy option was set to CONTINUE.

***Important***
- VALIDATION_MODE does not support COPY statements that transform data during a load.
- VALIDATION_MODE isn’t supported for Iceberg tables.
- Use the VALIDATE table function to view all errors encountered during a previous load. Note that this function also does not support COPY statements that transform data during a load.
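
The two validation paths can be sketched as follows. This assumes the stage, file format, and ORDERS table that the cells further down in this notebook create; the first statement is a dry run, the second inspects a load that already happened:

```sql
-- Dry run: report every error in the file without loading any rows.
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
FILES = ('OrderDetails.csv')
FILE_FORMAT = (FORMAT_NAME = 'SF_CERT_PREP.public.ff_csv_skip_header')
VALIDATION_MODE = RETURN_ERRORS;

-- After a real load: list the errors hit by the most recent COPY INTO
-- this table in the current session ('_last' can be replaced by a query ID).
SELECT *
FROM TABLE(VALIDATE(SF_CERT_PREP.public.ORDERS, JOB_ID => '_last'));
```

Neither form works when the COPY statement transforms the data with a SELECT.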




### Copy Options

### CLUSTER_AT_INGEST_TIME = TRUE | FALSE

    DEFAULT = FALSE

Pre-clusters the data during ingestion for tables that are configured with clustering keys, which matters for partition pruning on later queries.

        
### ENFORCE_LENGTH / TRUNCATECOLUMNS  = TRUE | FALSE

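    ENFORCE_LENGTH   = Default = True
    TRUNCATECOLUMNS  = Default = False

Controls what happens when a string in the file is longer than the target column definition. With ENFORCE_LENGTH = TRUE (the default) the COPY statement produces an error; with FALSE the string is truncated to the column length. TRUNCATECOLUMNS is the same switch with reversed logic: TRUNCATECOLUMNS = TRUE behaves like ENFORCE_LENGTH = FALSE.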
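
A minimal sketch of the lenient setting, assuming the stage, file format, and ORDERS table created later in this notebook. Values longer than the VARCHAR(30) columns would be cut to 30 characters instead of raising an error:

```sql
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
FILES = ('OrderDetails.csv')
FILE_FORMAT = (FORMAT_NAME = 'SF_CERT_PREP.public.ff_csv_skip_header')
TRUNCATECOLUMNS = TRUE;  -- same effect as ENFORCE_LENGTH = FALSE
```

Truncation is silent, so this is best reserved for columns where losing the tail of a value is acceptable.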
            
         
### FILE_PROCESSOR = (SCANNER = <custom_scanner_type> SCANNER_OPTIONS = (<scanner_options>))

Specifies the scanner and the scanner options that are used for processing unstructured data

### FORCE = TRUE | FALSE

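    DEFAULT = FALSE

Loads all files, even files that were already loaded and have not changed since. Without FORCE, COPY skips files that are recorded in the table's load metadata, which bulk loads keep for 64 days (the 14-day retention applies to Snowpipe).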
         
### INCLUDE_METADATA = ( <column_name> = METADATA$<field> [ , <column_name> = METADATA$<field> ... ] )

    DEFAULT = NULL

A user-defined mapping from existing columns of the target table to METADATA$ columns. It can only be used together with the MATCH_BY_COLUMN_NAME copy option.
- METADATA$FILENAME
- METADATA$FILE_ROW_NUMBER
- METADATA$FILE_CONTENT_KEY
- METADATA$FILE_LAST_MODIFIED
- METADATA$START_SCAN_TIME


Important:
- The INCLUDE_METADATA target column name must first exist in the table. The target column name is not automatically added if it doesn’t exist.
- Use a unique column name for the INCLUDE_METADATA columns.
- If a target column name conflicts with a column in the data file, the METADATA$ value defined by INCLUDE_METADATA takes precedence.
- When you load a CSV file with INCLUDE_METADATA, set the file format option ERROR_ON_COLUMN_COUNT_MISMATCH to FALSE.
            
         
### LOAD_MODE = { FULL_INGEST | ADD_FILES_COPY }

    DEFAULT = FULL_INGEST

Specifies the mode to use when you load data from Parquet files into a Snowflake-managed Iceberg table.
         
### LOAD_UNCERTAIN_FILES = TRUE | FALSE

    DEFAULT = FALSE

Specifies whether to load files whose load status is unknown. By default (FALSE), COPY skips these files.

The load status is unknown if all of the following conditions are true:

- The file’s LAST_MODIFIED date (that is, the date when the file was staged) is older than 64 days.
- The initial set of data was loaded into the table more than 64 days earlier.    
- If the file was already loaded successfully into the table, this event occurred more than 64 days earlier.
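
A minimal sketch, assuming the stage, file format, and ORDERS table created later in this notebook, and assuming the staged file is old enough that its load status can no longer be determined:

```sql
-- Load files even when the 64-day load metadata can no longer confirm
-- whether they were loaded before.
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
FILES = ('OrderDetails.csv')
FILE_FORMAT = (FORMAT_NAME = 'SF_CERT_PREP.public.ff_csv_skip_header')
LOAD_UNCERTAIN_FILES = TRUE;
```

Unlike FORCE = TRUE, this still skips files that the load metadata positively identifies as already loaded.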
         
### MATCH_BY_COLUMN_NAME = CASE_SENSITIVE | CASE_INSENSITIVE | NONE

    DEFAULT = NONE

Specifies whether to load semi-structured data into the columns of the target table that match the corresponding columns represented in the data.
Does not work with a COPY transformation.

This copy option is supported for the following data formats:

- JSON
- Avro
- ORC
- Parquet
- CSV

Conditions:
- The column represented in the data must have exactly the same name as the column in the table; whether case matters depends on the option value.
- Column order does not matter.
- The column in the table must have a data type that is compatible with the values in the column represented in the data.
  For example, string, number, and Boolean values can all be loaded into a variant column.

            
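### ON_ERROR = { CONTINUE | SKIP_FILE | SKIP_FILE_<num> | 'SKIP_FILE_<num>%' | ABORT_STATEMENT }

    DEFAULT Copy = ABORT_STATEMENT
    DEFAULT Snowpipe = SKIP_FILE

- CONTINUE: keep loading the file when errors are found; the rows with errors are not loaded.
- SKIP_FILE: skip a file when an error is found in it.
- SKIP_FILE_<num> (for example, SKIP_FILE_10): skip a file when the number of error rows in it reaches <num>.
- 'SKIP_FILE_<num>%' (for example, 'SKIP_FILE_10%'): skip a file when the percentage of error rows in it exceeds <num>.
- ABORT_STATEMENT: stop the load operation if any error is found in a data file.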
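
The thresholds apply to error rows inside each file, not to a number of files. A minimal sketch, assuming the stage, file format, and ORDERS table created later in this notebook:

```sql
-- Skip a file once 3 rows in it have failed; files with fewer bad rows
-- are loaded without those rows.
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
FILES = ('OrderDetails.csv')
FILE_FORMAT = (FORMAT_NAME = 'SF_CERT_PREP.public.ff_csv_skip_header')
ON_ERROR = SKIP_FILE_3;

-- Percentage form: the value must be quoted because of the % sign.
-- FORCE = TRUE is only here so the second statement reloads the same file.
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
FILES = ('OrderDetails.csv')
FILE_FORMAT = (FORMAT_NAME = 'SF_CERT_PREP.public.ff_csv_skip_header')
ON_ERROR = 'SKIP_FILE_10%'
FORCE = TRUE;
```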
            
         
### PURGE = TRUE | FALSE

    DEFAULT = FALSE

Remove the data files from the stage automatically after the data is loaded successfully.

If the purge operation fails for any reason, no error is returned. 
         
### RETURN_FAILED_ONLY = TRUE | FALSE

    DEFAULT = FALSE

Return only files that have failed to load in the statement result.

         
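### SIZE_LIMIT = <num>

    DEFAULT = NULL

Maximum size (in bytes) of data to load with this COPY statement. Once the threshold is exceeded, COPY stops picking up further files; at least one file is always loaded if any file is available.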
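
PURGE, RETURN_FAILED_ONLY, and SIZE_LIMIT are often combined for incremental batch loads. A minimal sketch with hypothetical objects (`my_table` and `@my_int_stage/daily/` are placeholders, not created in this notebook):

```sql
-- Hypothetical names: my_table, my_int_stage.
COPY INTO my_table
FROM @my_int_stage/daily/
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
ON_ERROR = SKIP_FILE          -- bad files are skipped instead of aborting
SIZE_LIMIT = 25000000         -- stop picking up new files after about 25 MB
PURGE = TRUE                  -- delete successfully loaded files from the stage
RETURN_FAILED_ONLY = TRUE;    -- result set lists only the files that failed
```

Because a failed purge returns no error, it is worth running LIST on the stage afterwards when the cleanup matters.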
       

In [None]:
%%sql -r dataframe_5
CREATE DATABASE IF NOT EXISTS SF_CERT_PREP;
USE SF_CERT_PREP;

In [None]:
%%sql -r dataframe_1
CREATE OR REPLACE STAGE SF_CERT_PREP.public.aws_stage
    url='s3://bucketsnowflakes3'
      DIRECTORY = (ENABLE = TRUE) -- optional: enables the directory table, which makes file metadata easier to inspect
    ;


In [None]:
%%sql -r dataframe_2
LIST @SF_CERT_PREP.public.aws_stage;

In [None]:
%%sql -r dataframe_17
select $1,$2, $3 from @SF_CERT_PREP.public.aws_stage/Loan_payments_data.csv;

-- When no file format is specified, Snowflake assumes CSV.

In [None]:
%%sql -r dataframe_19
select $1,$2, $3 from @SF_CERT_PREP.public.aws_stage/OrderDetails.csv;

In [None]:
%%sql -r dataframe_20
select $1,$2, $3 from @SF_CERT_PREP.public.aws_stage/sampledata.csv;

In [None]:
%%sql -r dataframe_25
create or replace file format SF_CERT_PREP.public.ff_csv_skip_header
type = csv
field_delimiter=','
skip_header=1;

In [None]:
%%sql -r dataframe_10
create or replace file format SF_CERT_PREP.public.ff_csv_parse_header
type = csv
field_delimiter=','
PARSE_HEADER = TRUE -- required to match CSV columns by name
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE; -- required when loading CSV with INCLUDE_METADATA

In [None]:
%%sql -r dataframe_3
CREATE OR REPLACE TABLE SF_CERT_PREP.public.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30)
);

In [None]:
%%sql -r dataframe_11
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
file_format = (format_name = 'SF_CERT_PREP.public.ff_csv_skip_header')
files = ('OrderDetails.csv');

In [None]:
%%sql -r dataframe_21
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
file_format = (format_name = 'SF_CERT_PREP.public.ff_csv_skip_header')
pattern = 'OrderDetails.csv';



In [None]:
%%sql -r dataframe_23
-- A file that was already loaded is skipped by the next COPY.
-- Option 1: truncate the table (this also clears its load metadata).
-- Option 2: add FORCE = TRUE.
-- This cell applies both.

truncate table SF_CERT_PREP.public.ORDERS;

COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
file_format = (format_name = 'SF_CERT_PREP.public.ff_csv_skip_header')
pattern = 'OrderDetails.csv'
force = true
;

## Metadata Columns

### METADATA$FILENAME
Name of the staged data file the current row belongs to. Includes the full path to the data file.

### METADATA$FILE_ROW_NUMBER
Row number for each record in the staged data file.

### METADATA$FILE_CONTENT_KEY
Checksum of the staged data file the current row belongs to.

### METADATA$FILE_LAST_MODIFIED
Last modified timestamp of the staged data file the current row belongs to. Returned as TIMESTAMP_NTZ.

### METADATA$START_SCAN_TIME
Start timestamp of operation for each record in the staged data file. Returned as TIMESTAMP_LTZ.
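
The metadata columns can also be queried straight from a stage, before anything is loaded. A minimal sketch, assuming the stage and the `ff_csv_skip_header` file format created earlier in this notebook; the alias `t` and the choice of columns are arbitrary:

```sql
SELECT
    METADATA$FILENAME,
    METADATA$FILE_ROW_NUMBER,
    METADATA$FILE_LAST_MODIFIED,
    t.$1,
    t.$2
FROM @SF_CERT_PREP.public.aws_stage
    (FILE_FORMAT => 'SF_CERT_PREP.public.ff_csv_skip_header',
     PATTERN => '.*OrderDetails[.]csv') t;
```

This is a quick way to confirm which file and row a suspicious value comes from.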

In [None]:
%%sql -r dataframe_22
CREATE OR REPLACE TABLE SF_CERT_PREP.public.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    SUBCATEGORY VARCHAR(30),
    META_FILENAME TEXT,
    META_FILE_ROW_NUMBER INT,
    META_FILE_CONTENT_KEY TEXT,
    META_FILE_LAST_MODIFIED TIMESTAMP_LTZ,
    META_START_SCAN_TIME TIMESTAMP_LTZ    
);

In [None]:
%%sql -r dataframe_14
COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
file_format = (format_name = 'SF_CERT_PREP.public.ff_csv_parse_header')
pattern = 'OrderDetails.csv'
INCLUDE_METADATA = (
    META_FILENAME = METADATA$FILENAME,
    META_FILE_ROW_NUMBER = METADATA$FILE_ROW_NUMBER,
    META_FILE_CONTENT_KEY = METADATA$FILE_CONTENT_KEY,
    META_FILE_LAST_MODIFIED = METADATA$FILE_LAST_MODIFIED,
    META_START_SCAN_TIME = METADATA$START_SCAN_TIME
)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE -- required when INCLUDE_METADATA is used
;



In [None]:
%%sql -r dataframe_4
select * from SF_CERT_PREP.public.orders;

-- Note that the columns ORDER_ID and SUBCATEGORY now contain only NULL values.
-- The column names in the source file differ slightly from the ones in the table, so MATCH_BY_COLUMN_NAME finds no match for them.

In [None]:
%%sql -r dataframe_9
-- One approach is to rename the columns in the file or in the table and then reload,
-- because INCLUDE_METADATA does not allow any transformation.

CREATE OR REPLACE TABLE SF_CERT_PREP.public.ORDERS (
    "ORDER ID" VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    "Sub-Category" VARCHAR(30),
    META_FILENAME TEXT,
    META_FILE_ROW_NUMBER INT,
    META_FILE_CONTENT_KEY TEXT,
    META_FILE_LAST_MODIFIED TIMESTAMP_LTZ,
    META_START_SCAN_TIME TIMESTAMP_LTZ    
);

COPY INTO SF_CERT_PREP.public.ORDERS
FROM @SF_CERT_PREP.public.aws_stage
file_format = (format_name = 'SF_CERT_PREP.public.ff_csv_parse_header') -- PARSE_HEADER is required to match columns by name
pattern = 'OrderDetails.csv'
INCLUDE_METADATA = (
    META_FILENAME = METADATA$FILENAME,
    META_FILE_ROW_NUMBER = METADATA$FILE_ROW_NUMBER,
    META_FILE_CONTENT_KEY = METADATA$FILE_CONTENT_KEY,
    META_FILE_LAST_MODIFIED = METADATA$FILE_LAST_MODIFIED,
    META_START_SCAN_TIME = METADATA$START_SCAN_TIME
)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE -- required when INCLUDE_METADATA is used
;

select * from SF_CERT_PREP.public.orders;


In [None]:
%%sql -r dataframe_24
-- The second option is to drop INCLUDE_METADATA and select the metadata columns explicitly in a COPY transformation.
-- This is the more realistic solution.
CREATE OR REPLACE TABLE SF_CERT_PREP.public.ORDERS (
    ORDER_ID VARCHAR(30),
    AMOUNT INT,
    PROFIT INT,
    QUANTITY INT,
    CATEGORY VARCHAR(30),
    Sub_Category VARCHAR(30),
    META_FILENAME TEXT,
    META_FILE_ROW_NUMBER INT,
    META_FILE_CONTENT_KEY TEXT,
    META_FILE_LAST_MODIFIED TIMESTAMP_LTZ,
    META_START_SCAN_TIME TIMESTAMP_LTZ    
);

COPY INTO SF_CERT_PREP.public.ORDERS
FROM (
    select
        $1, $2, $3, $4, $5, $6,
        METADATA$FILENAME,
        METADATA$FILE_ROW_NUMBER,
        METADATA$FILE_CONTENT_KEY,
        METADATA$FILE_LAST_MODIFIED,
        METADATA$START_SCAN_TIME
    from @SF_CERT_PREP.public.aws_stage
)
file_format = (format_name = 'SF_CERT_PREP.public.ff_csv_skip_header')
pattern = 'OrderDetails.csv'
;

select * from SF_CERT_PREP.public.orders;


## Copy into Stage

Unloads data from a table (or a query) into a stage or an external location.

    
    COPY INTO { internalStage | externalStage | externalLocation }
         FROM { [<namespace>.]<table_name> | ( <query> ) }
    [ PARTITION BY <expr> ]
    [ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' |
                        TYPE = { CSV | JSON | PARQUET } [ formatTypeOptions ] } ) ]
    [ copyOptions ]
    [ VALIDATION_MODE = RETURN_ROWS ]
    [ HEADER ]


### Optional parameters:

#### PARTITION BY expr:


Specifies an expression used to partition the unloaded table rows into separate files
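
A minimal sketch of a partitioned unload, assuming the ORDERS table and the `my_internal_stage` stage that this notebook creates; the `by_category/` prefix and the `category=` naming scheme are arbitrary choices for illustration:

```sql
-- One sub-path per distinct CATEGORY value, for example
-- by_category/category=Furniture/...
COPY INTO @SF_CERT_PREP.public.my_internal_stage/by_category/
FROM SF_CERT_PREP.public.ORDERS
PARTITION BY ('category=' || CATEGORY)
FILE_FORMAT = (TYPE = CSV);
```

The expression is evaluated per row, so any string-valued expression over the selected columns can define the layout.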


copyOptions ::=

#### OVERWRITE = TRUE | FALSE

    DEFAULT = FALSE

Whether the COPY command overwrites existing files with matching names in the target location.

#### SINGLE = TRUE | FALSE

    DEFAULT = FALSE

Whether to generate a single file or multiple files. If FALSE, a filename prefix must be included in the path.

***Important***:
If SINGLE = TRUE, COPY ignores the FILE_EXTENSION file format option and outputs a file simply named data.

#### MAX_FILE_SIZE = <num>

    DEFAULT = 16 MB
    MAX = 5 GB

Maximum size (in bytes) of each file generated in parallel per thread.
Snowflake uses parallel execution to optimize performance. The number of threads can’t be modified.

#### INCLUDE_QUERY_ID = TRUE | FALSE

    DEFAULT = FALSE

Whether to uniquely identify unloaded files by including a universally unique identifier (UUID) in their file names.

#### DETAILED_OUTPUT = TRUE | FALSE

    DEFAULT = FALSE

- TRUE: the output includes one row for each file unloaded to the specified stage.
- FALSE: the output consists of a single row that describes the entire unload operation.
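
The unload options above can be combined as in the following sketch. It assumes the ORDERS table and the `my_internal_stage` stage from this notebook; the `exports/` paths and the 100 MB limit are arbitrary illustrative values:

```sql
-- Single named file: give the full file name, because FILE_EXTENSION is ignored.
COPY INTO @SF_CERT_PREP.public.my_internal_stage/exports/orders.csv.gz
FROM SF_CERT_PREP.public.ORDERS
FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP)
HEADER = TRUE
SINGLE = TRUE
OVERWRITE = TRUE
MAX_FILE_SIZE = 104857600;  -- 100 MB, raised from the 16 MB default

-- Parallel unload with one result row per generated file.
COPY INTO @SF_CERT_PREP.public.my_internal_stage/exports/orders_part_
FROM SF_CERT_PREP.public.ORDERS
FILE_FORMAT = (TYPE = CSV)
DETAILED_OUTPUT = TRUE;
```

In the second statement `orders_part_` acts as the filename prefix that is required when SINGLE = FALSE.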
    

In [None]:
%%sql -r dataframe_29
create or replace stage SF_CERT_PREP.public.my_internal_stage
 DIRECTORY = (ENABLE = TRUE) ;

In [None]:
%%sql -r dataframe_32
select * from SF_CERT_PREP.public.orders;

In [None]:
%%sql -r dataframe_30
copy into @SF_CERT_PREP.public.my_internal_stage
from SF_CERT_PREP.public.orders;

In [None]:
%%sql -r dataframe_31
list @SF_CERT_PREP.public.my_internal_stage;

In [None]:
%%sql -r dataframe_33
copy into @SF_CERT_PREP.public.my_internal_stage/version2
from SF_CERT_PREP.public.orders
file_format = (type = CSV)
header = true;

## Querying history

### INFORMATION_SCHEMA.LOAD_HISTORY


    select * from SF_CERT_PREP.INFORMATION_SCHEMA.LOAD_HISTORY where table_name='ORDERS';

Shows only the loads into the current version of the table. Loads into a table that has since been dropped do not appear here.


### snowflake.account_usage.load_history

    select * from  snowflake.account_usage.load_history where table_name='ORDERS';


Shows the history of data loaded into tables with the COPY INTO `<table>` command within the last 365 days (1 year).
The view displays one row for each file loaded.

In most cases, latency for the view may be up to 90 minutes. 

The latency for a given table’s load history in the view may be up to 2 days if both of the following conditions are true:

- Fewer than 32 DML statements have been added to the given table since it was last updated in LOAD_HISTORY.
- Fewer than 100 rows have been added to the given table since it was last updated in LOAD_HISTORY.

IMPORTANT: This view does not return the history of data loaded using Snowpipe


### snowflake.account_usage.copy_history

This Account Usage view returns the Snowflake data loading history for the last 365 days (1 year).

    select * from SNOWFLAKE.ACCOUNT_USAGE.COPY_HISTORY where table_name='ORDERS';

- It displays load activity for both COPY INTO statements and Snowpipe.
- The view avoids the 10,000 row limitation of the LOAD_HISTORY view.

In most cases, latency for the view may be up to 120 minutes.
The latency for a given table’s load history in the view may be up to 2 days if both of the following conditions are true:

- Fewer than 32 DML statements have been added to the given table since it was last updated in COPY_HISTORY.
- Fewer than 100 rows have been added to the given table since it was last updated in COPY_HISTORY.
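
A typical use of the view is to find recent loads that did not fully succeed. The query below is a sketch: the column names and the 'Loaded' status value are taken from the view's documented schema as best recalled, so check them against your account before relying on the result, and treat the 7-day window as an arbitrary choice:

```sql
SELECT
    file_name,
    last_load_time,
    status,
    row_count,
    row_parsed,
    error_count,
    first_error_message,
    pipe_name               -- NULL for loads done by a plain COPY INTO statement
FROM SNOWFLAKE.ACCOUNT_USAGE.COPY_HISTORY
WHERE table_name = 'ORDERS'
  AND last_load_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
  AND status <> 'Loaded'
ORDER BY last_load_time DESC;
```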

    

In [None]:
%%sql -r dataframe_16
select * from SF_CERT_PREP.INFORMATION_SCHEMA.LOAD_HISTORY where table_name='ORDERS';
-- Only shows loads into tables that still exist; loads into tables that were dropped are not shown here.

In [None]:
select * from  snowflake.account_usage.load_history where table_name='ORDERS';
-- does not include Snowpipe loads

In [None]:
%%sql -r dataframe_27
select * from SNOWFLAKE.ACCOUNT_USAGE.COPY_HISTORY where table_name='ORDERS';
-- includes Snowpipe loads