# <center>Using PostgreSQL Upsert</center>

### Author:  Bryan Cafferky  - For Demonstration Purposes Only

### Not intended for production use.

### If you don't have SQLAlchemy installed, uncomment one of the 2 cells below and execute it to install SQLAlchemy.

In [None]:
# pip install sqlalchemy  # Run only if you do not have SQLAlchemy installed!

In [None]:
# conda install -c anaconda sqlalchemy # Using Conda - Run only if you do not have SQLAlchemy installed!

### PostgreSQL folds unquoted identifiers to lower case (only quoted identifiers are case sensitive), so we will stick to lower case.

In [52]:
from sqlalchemy import create_engine

# Connection URL format: dialect+driver://username:password@server/database
# NOTE: despite its name, `conn` holds the SQLAlchemy Engine returned by
# create_engine, not a single open connection.
# The user name and password are hard-coded in the URL for demonstration only.
conn = create_engine('postgresql+psycopg2://postgres:bryan@localhost/postgres')

### We need to create the table with a primary key before saving data to it.
### The Primary Key uniquely identifies each row which we need so we can tell PostgreSQL which rows to Update or Delete.

In [53]:
# Start from a clean slate so the notebook can be re-run from the top.
# Both tables used by this notebook are dropped: the master table and the
# transaction staging table, which is written later under the name
# 'youtube_custtrans' (the previous name 'youtube_cust_trans' matched no table,
# so the staging table was never actually dropped here).
conn.execute('DROP TABLE IF EXISTS youtube_customer')
conn.execute('DROP TABLE IF EXISTS youtube_custtrans')

<sqlalchemy.engine.result.ResultProxy at 0x1f5d253ee50>

In [54]:
# Create the master customer table.
# - customerkey is the primary key; the upsert's ON CONFLICT(customerkey)
#   clause later in the notebook relies on this unique constraint.
# - birthdate and modifieddate are stored as TEXT, not as date types.
# - etllastupdate defaults to current_timestamp, so it is filled in
#   automatically whenever an INSERT does not supply a value for it.
conn.execute('''
CREATE TABLE youtube_customer (
    customerkey   INTEGER PRIMARY KEY NOT NULL,
    lastname      TEXT,
    birthdate     TEXT,
    maritalstatus TEXT,
    yearlyincome  REAL,
    modifieddate  TEXT,
    etllastupdate TIMESTAMP DEFAULT current_timestamp 
);
''')

<sqlalchemy.engine.result.ResultProxy at 0x1f5d251d400>

In [55]:
import pandas as pd

### Load the master table...

In [56]:
# Read the master customer file into a DataFrame.
# index_col=False tells pandas not to use any CSV column as the index.
custdf = pd.read_csv("data/dimcustomer.csv", index_col = False)

In [57]:
# Display the master customer rows that were just loaded.
custdf

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,modifieddate
0,11000,Yang,1971-10-06,M,90000.0,2019-01-01
1,11001,Huang,1976-05-10,S,60000.0,2019-01-01
2,11002,Torres,1971-02-09,M,60000.0,2019-01-01
3,11003,Zhu,1973-08-14,S,70000.0,2019-01-01


### Save the dataframe to a table...

### Warning!!! Do not use if_exists='replace' or you will lose the primary key because the table gets dropped and created again!

In [58]:
# Append the DataFrame rows to the existing youtube_customer table.
# if_exists='append' keeps the table (and its primary key) as created above.
# index=False stops pandas from writing the DataFrame index as an extra column.
# The DataFrame has no etllastupdate column, so that column takes its default.
custdf.to_sql('youtube_customer', conn, if_exists='append', index = False)

In [59]:
# Read the table back to confirm the rows were saved and etllastupdate was defaulted.
pd.read_sql_query("select * from youtube_customer", conn)

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,modifieddate,etllastupdate
0,11000,Yang,1971-10-06,M,90000.0,2019-01-01,2021-06-27 10:14:00.489163
1,11001,Huang,1976-05-10,S,60000.0,2019-01-01,2021-06-27 10:14:00.489163
2,11002,Torres,1971-02-09,M,60000.0,2019-01-01,2021-06-27 10:14:00.489163
3,11003,Zhu,1973-08-14,S,70000.0,2019-01-01,2021-06-27 10:14:00.489163


#### Getting meta data...

In [60]:
# Get table schema...
# pandas was already imported in an earlier cell; this re-import is harmless
# and only makes the cell runnable on its own.
import pandas as pd 

# information_schema.columns holds one row per column; filtering on the table
# name and ordering by ordinal_position lists the columns in definition order.
pd.read_sql_query("""
SELECT * FROM information_schema.columns
WHERE table_name = 'youtube_customer'
ORDER BY ordinal_position
""", conn)

Unnamed: 0,table_catalog,table_schema,table_name,column_name,ordinal_position,column_default,is_nullable,data_type,character_maximum_length,character_octet_length,...,is_identity,identity_generation,identity_start,identity_increment,identity_maximum,identity_minimum,identity_cycle,is_generated,generation_expression,is_updatable
0,postgres,public,youtube_customer,customerkey,1,,NO,integer,,,...,NO,,,,,,NO,NEVER,,YES
1,postgres,public,youtube_customer,lastname,2,,YES,text,,1073742000.0,...,NO,,,,,,NO,NEVER,,YES
2,postgres,public,youtube_customer,birthdate,3,,YES,text,,1073742000.0,...,NO,,,,,,NO,NEVER,,YES
3,postgres,public,youtube_customer,maritalstatus,4,,YES,text,,1073742000.0,...,NO,,,,,,NO,NEVER,,YES
4,postgres,public,youtube_customer,yearlyincome,5,,YES,real,,,...,NO,,,,,,NO,NEVER,,YES
5,postgres,public,youtube_customer,modifieddate,6,,YES,text,,1073742000.0,...,NO,,,,,,NO,NEVER,,YES
6,postgres,public,youtube_customer,etllastupdate,7,CURRENT_TIMESTAMP,YES,timestamp without time zone,,,...,NO,,,,,,NO,NEVER,,YES


### Let's load the transaction file...

In [61]:
# Read the transaction file; each row carries an actionind code
# (the sample data uses 'A', 'U' and 'D') alongside the customer columns.
# index_col=None is the pandas default: no CSV column is used as the index.
transdf = pd.read_csv("data/dimcustomertransactions.csv", index_col = None)
# Display the transactions that were loaded.
transdf

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,actionind,modifieddate
0,11000,Yang,1971-10-06,M,250000.0,U,2020-01-01
1,11001,Jones,1976-05-10,S,360000.0,U,2019-02-01
2,333301,Murhpy,1975-02-09,M,33000.0,A,2018-01-01
3,333302,Jain,1980-01-09,M,28000.0,A,2020-02-01
4,11002,Torres,1971-02-09,M,60000.0,D,2020-02-01


In [62]:
# Stage the transactions in their own table so the upsert can read them with SQL.
# if_exists='replace' drops and recreates youtube_custtrans on every run.
transdf.to_sql('youtube_custtrans', conn, if_exists='replace', index = False)
# Read the staging table back to confirm its contents.
pd.read_sql_query("select * from youtube_custtrans", conn)

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,actionind,modifieddate
0,11000,Yang,1971-10-06,M,250000.0,U,2020-01-01
1,11001,Jones,1976-05-10,S,360000.0,U,2019-02-01
2,333301,Murhpy,1975-02-09,M,33000.0,A,2018-01-01
3,333302,Jain,1980-01-09,M,28000.0,A,2020-02-01
4,11002,Torres,1971-02-09,M,60000.0,D,2020-02-01


### Let's update the customer table with the transactions.
- Add    - If the customer is not found, insert transaction as new customer.
- Change - If the customer is found, update the existing customer with the transaction data.
- Delete - If actionind = 'D', remove the customer row.

#### Note:  You must list the column names in the INSERT so that etllastupdate, which is left out of the list, takes its default value (current_timestamp)...

_____

## Upserting from a transaction table...

### Note:  Use PgAdmin to test the Upsert statement!

In [None]:
#  Sample code (generic SQL upsert template, not Python) - do not execute...
INSERT INTO mytable 
  SELECT * FROM mytransactions 
  ON CONFLICT(keycolumn) 
    DO UPDATE SET columnname=excluded.columnname;

####  - You can filter on the transaction table in the query.
####  - Notice that birthdate is not in the DO UPDATE SET list, so existing rows keep their current birthdate.

In [63]:
# Build the upsert statement that applies the staged transactions to the master table.
# - Only rows with actionind 'A' or 'U' are selected; 'D' rows are handled by
#   a separate DELETE statement.
# - The INSERT column list omits etllastupdate, so newly inserted rows get
#   the column default.
# - ON CONFLICT(customerkey): when the key already exists, the row is updated
#   instead; `excluded` refers to the row that was proposed for insertion.
# - birthdate is not in the SET list, so an existing row keeps its birthdate.
# - The final WHERE only updates when the incoming modifieddate is later.
#   modifieddate is TEXT, so this is a string comparison; it orders correctly
#   for the yyyy-mm-dd values used in the sample data.
sql = '''
INSERT INTO youtube_customer(customerkey, lastname, birthdate, maritalstatus, yearlyincome, modifieddate)
  SELECT customerkey, lastname, birthdate, maritalstatus, yearlyincome, modifieddate 
  FROM youtube_custtrans 
  WHERE ActionInd in ('A', 'U')
  ON CONFLICT(customerkey) 
  DO UPDATE SET 
    lastname=excluded.lastname,
    maritalstatus=excluded.maritalstatus,
    yearlyincome = excluded.yearlyincome, 
    modifieddate = excluded.modifieddate,
    etllastupdate = current_timestamp
  WHERE excluded.modifieddate > youtube_customer.modifieddate
'''
# Show the statement so it can be reviewed (or pasted into PgAdmin) before it is run.
print(sql)


INSERT INTO youtube_customer(customerkey, lastname, birthdate, maritalstatus, yearlyincome, modifieddate)
  SELECT customerkey, lastname, birthdate, maritalstatus, yearlyincome, modifieddate 
  FROM youtube_custtrans 
  WHERE ActionInd in ('A', 'U')
  ON CONFLICT(customerkey) 
  DO UPDATE SET 
    lastname=excluded.lastname,
    maritalstatus=excluded.maritalstatus,
    yearlyincome = excluded.yearlyincome, 
    modifieddate = excluded.modifieddate,
    etllastupdate = current_timestamp
  WHERE excluded.modifieddate > youtube_customer.modifieddate



In [64]:
# Run the upsert statement held in `sql` against the database.
conn.execute(sql)

<sqlalchemy.engine.result.ResultProxy at 0x1f5d2573c10>

In [65]:
# Show the staged transactions again for comparison with the updated master table.
pd.read_sql_query("select * from youtube_custtrans", conn)

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,actionind,modifieddate
0,11000,Yang,1971-10-06,M,250000.0,U,2020-01-01
1,11001,Jones,1976-05-10,S,360000.0,U,2019-02-01
2,333301,Murhpy,1975-02-09,M,33000.0,A,2018-01-01
3,333302,Jain,1980-01-09,M,28000.0,A,2020-02-01
4,11002,Torres,1971-02-09,M,60000.0,D,2020-02-01


In [66]:
# Show the master table after the upsert, ordered by key so results are easy to compare.
pd.read_sql_query("SELECT * FROM youtube_customer ORDER BY customerkey", conn)

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,modifieddate,etllastupdate
0,11000,Yang,1971-10-06,M,250000.0,2020-01-01,2021-06-27 10:25:38.039102
1,11001,Jones,1976-05-10,S,360000.0,2019-02-01,2021-06-27 10:25:38.039102
2,11002,Torres,1971-02-09,M,60000.0,2019-01-01,2021-06-27 10:14:00.489163
3,11003,Zhu,1973-08-14,S,70000.0,2019-01-01,2021-06-27 10:14:00.489163
4,333301,Murhpy,1975-02-09,M,33000.0,2018-01-01,2021-06-27 10:25:38.039102
5,333302,Jain,1980-01-09,M,28000.0,2020-02-01,2021-06-27 10:25:38.039102


In [67]:
# Delete: remove every master row whose key appears in the staging table
# with actionind 'D'.
# The unquoted identifiers CustomerKey and ActionInd are folded to lower case
# by PostgreSQL, so they match the lower-case column names.
conn.execute('''DELETE FROM youtube_customer  
                WHERE CustomerKey IN (
                SELECT CustomerKey 
                FROM youtube_custtrans where ActionInd = 'D');''')

# Show the master table after the delete.
pd.read_sql_query("select * from youtube_customer ORDER BY customerkey", conn)

Unnamed: 0,customerkey,lastname,birthdate,maritalstatus,yearlyincome,modifieddate,etllastupdate
0,11000,Yang,1971-10-06,M,250000.0,2020-01-01,2021-06-27 10:25:38.039102
1,11001,Jones,1976-05-10,S,360000.0,2019-02-01,2021-06-27 10:25:38.039102
2,11003,Zhu,1973-08-14,S,70000.0,2019-01-01,2021-06-27 10:14:00.489163
3,333301,Murhpy,1975-02-09,M,33000.0,2018-01-01,2021-06-27 10:25:38.039102
4,333302,Jain,1980-01-09,M,28000.0,2020-02-01,2021-06-27 10:25:38.039102


# Let's close the connection. 

In [68]:
# `conn` is the Engine created at the top of the notebook; dispose() releases
# the connections held in its pool.
conn.dispose()