# COPY INTO and MERGE Commands

### COPY INTO Command

> - Incrementally loads data into Delta Lake tables from cloud storage; files that were already loaded are skipped on later runs  
> - Supports schema evolution  
> - Supports a wide range of source file formats (CSV, JSON, Parquet, Avro, ORC, text, binary files)  
> - Alternative to Auto Loader for batch ingestion  

Documentation - [COPY INTO](https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-copy-into)

#### Create the table to copy the data into

In [0]:
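DROP TABLE IF EXISTS swathy_catalog.demo_db.raw_stock_prices;
-- No columns are declared here: COPY INTO infers the schema from the files
-- and adds the columns to the table because mergeSchema is enabled.
CREATE TABLE IF NOT EXISTS swathy_catalog.demo_db.raw_stock_prices;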

#### Incrementally load new files into the table

In [0]:
copy into swathy_catalog.demo_db.raw_stock_prices
from '/Volumes/workspace/default/databricks-1/stockprices'
fileformat = json
format_options ('inferSchema' = 'true')
copy_options ('mergeSchema' = 'true');



In [0]:
select * from swathy_catalog.demo_db.raw_stock_prices;

In [0]:
-- Add another file to the volume, then run the same command again.
-- Only the new file is loaded; files ingested earlier are skipped.
-- 'mergeSchema' = 'true' adds any columns that appear only in the new file to the table schema.
COPY INTO swathy_catalog.demo_db.raw_stock_prices
FROM '/Volumes/workspace/default/databricks-1/stockprices/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

In [0]:
select * from swathy_catalog.demo_db.raw_stock_prices;
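
Because COPY INTO keeps track of the files it has already ingested, rerunning the command above does not reload them. If a full reload is ever needed, the `force` copy option turns that check off. The following is a minimal sketch that reuses the same table and path; note that it can duplicate rows that are already in the table.

```sql
-- Reload every file under the path, including files ingested earlier.
-- Caution: rows from previously loaded files are appended again.
COPY INTO swathy_catalog.demo_db.raw_stock_prices
FROM '/Volumes/workspace/default/databricks-1/stockprices/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true', 'force' = 'true');
```

Leaving `force` at its default of `false` keeps the load idempotent, which is usually the safer choice for scheduled batch ingestion.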

### MERGE Statement
> - Used for upserts (insert, update, and delete operations in a single statement)
> - Merges new data into a target table based on a matching condition

Documentation - [MERGE INTO](https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-merge-into)

#### Create the table to merge the data into

In [0]:
drop table if exists swathy_catalog.demo_db.stock_prices;
CREATE TABLE IF NOT EXISTS swathy_catalog.demo_db.stock_prices
(
  stock_id STRING,
  price DOUBLE,
  trading_date DATE
);

#### Merge the source data into target table
1. Insert new stocks received
2. Update price and trading_date if updates are received
3. Delete stocks that have been delisted from the exchange (status = 'DELISTED')
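
A MERGE fails when more than one source row matches the same target row, so it can help to check the source for duplicate keys before merging. The query below is a minimal sketch of such a check. It assumes `stock_id` is the only matching key, as in the merge condition used in this notebook.

```sql
-- Any row returned here is a stock_id that appears more than once in the source
-- and would make the merge ambiguous for that key.
SELECT stock_id, COUNT(*) AS row_count
FROM swathy_catalog.demo_db.raw_stock_prices
GROUP BY stock_id
HAVING COUNT(*) > 1;
```

If the query returns rows, the source needs to be deduplicated first (for example, by keeping only the latest record per `stock_id`).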

In [0]:
merge into swathy_catalog.demo_db.stock_prices as target
using swathy_catalog.demo_db.raw_stock_prices as source
on target.stock_id = source.stock_id
when matched and source.status = 'ACTIVE' 
then update set 
  target.price = source.price, 
  target.trading_date = source.trading_date
when matched and source.status = 'DELISTED' 
then delete
when not matched and source.status = 'ACTIVE' 
then insert (stock_id, price, trading_date) values (source.stock_id, source.price, source.trading_date);

In [0]:
select * from swathy_catalog.demo_db.stock_prices;

In [0]:
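-- Clear the staging table. COPY INTO still skips files it has already loaded,
-- so the next run brings in only files added to the volume since the last load.
delete from swathy_catalog.demo_db.raw_stock_prices;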

copy into swathy_catalog.demo_db.raw_stock_prices
from '/Volumes/workspace/default/databricks-1/stockprices'
fileformat = json
format_options ('inferSchema' = 'true')
copy_options ('mergeSchema' = 'true');

In [0]:
select * from swathy_catalog.demo_db.raw_stock_prices;

In [0]:
merge into swathy_catalog.demo_db.stock_prices as target
using swathy_catalog.demo_db.raw_stock_prices as source
on target.stock_id = source.stock_id
when matched and source.status = 'ACTIVE' 
then update set 
  target.price = source.price, 
  target.trading_date = source.trading_date
when matched and source.status = 'DELISTED' 
then delete
when not matched and source.status = 'ACTIVE' 
then insert (stock_id, price, trading_date) values (source.stock_id, source.price, source.trading_date);

In [0]:
select * from swathy_catalog.demo_db.stock_prices;
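
To confirm what each merge actually did, the table history can be inspected. Delta Lake records every write as a new table version, and for a MERGE the `operationMetrics` column typically includes counts of inserted, updated, and deleted rows. A minimal sketch:

```sql
-- List the most recent operations on the target table, newest first.
-- The operationMetrics column carries the row counts for each operation.
DESCRIBE HISTORY swathy_catalog.demo_db.stock_prices LIMIT 5;
```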

In [0]:
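-- Insert-only merge: add customers from the JSON file that are not already in the target.
-- Assumes a customers table with matching column names exists in the current catalog and schema.
merge into customers tgt
using (select customer_id, cust_name, email, city from json.`/Volumes/workspace/default/databricks-1/customers1.json`) src
on tgt.customer_id = src.customer_id
when not matched then insert *;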
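
The merge above relies on a `customers` table that this notebook never creates. A hypothetical definition is sketched below and would need to run before the merge. The column names are taken from the SELECT list in the merge source, but the data types are assumptions and should be adjusted to the actual JSON data.

```sql
-- Hypothetical target table for the insert-only merge.
-- Column names match the merge source; the types are assumed.
CREATE TABLE IF NOT EXISTS customers
(
  customer_id BIGINT,
  cust_name STRING,
  email STRING,
  city STRING
);
```

With `insert *`, every source column must have a column of the same name in the target, which is why the names here mirror the SELECT list.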