# ETL Project

This project aims to use Python to interact with Postgre Database.

At part1, we have already learnt some basic interactions between python and psql. Now we're going to use python to create tables and insert csv datasets into psql tables.

### Reference Vedio

https://www.youtube.com/watch?v=POjDCe-_G8k&list=PLBJe2dFI4sgukOW6O0B-OVyX9c6fQKJ2N&index=3

### Prerequisite

Follow the ETL Project Part1 instruction.

Download three datasets from this link and place them in the folder where this notebook is located: https://drive.google.com/drive/folders/1crEAKkiZg60oNKK37WzfpRo_M6E1MXSz

In [83]:
import psycopg2
import pandas as pd
import numpy as np

### Examinate the three datasets.

In [22]:
AccountsCountry = pd.read_csv("Wealth-AccountsCountry.csv")
AccountData = pd.read_csv("Wealth-AccountData.csv")
AccountSeries = pd.read_csv("Wealth-AccountSeries.csv")

In [16]:
AccountsCountry.head(1)

Unnamed: 0,Code,Long Name,Income Group,Region,Lending category,Other groups,Currency Unit,Latest population census,Latest household survey,Special Notes,...,Source of most recent Income and expenditure data,Vital registration complete,Latest agricultural census,Latest industrial data,Latest trade data,Latest water withdrawal data,2-alpha code,WB-2 code,Table Name,Short Name
0,ALB,Republic of Albania,Upper middle income,Europe & Central Asia,IBRD,,Albanian lek,2020 (expected),"Demographic and Health Survey, 2017/18",,...,Living Standards Measurement Study Survey (LSM...,Yes,2012,2013.0,2018.0,2006.0,AL,AL,Albania,Albania


In [17]:
AccountData.head(1)

Unnamed: 0,Country Name,Country Code,Series Name,Series Code,1995 [YR1995],1996 [YR1996],1997 [YR1997],1998 [YR1998],1999 [YR1999],2000 [YR2000],...,2009 [YR2009],2010 [YR2010],2011 [YR2011],2012 [YR2012],2013 [YR2013],2014 [YR2014],2015 [YR2015],2016 [YR2016],2017 [YR2017],2018 [YR2018]
0,Albania,ALB,Human capital (constant 2018 US$),NW.HCA.TO,44900000000.0,43400000000.0,37100000000.0,38800000000.0,42200000000.0,43600000000.0,...,66100000000.0,68100000000.0,68500000000.0,70800000000.0,71600000000.0,72500000000.0,73700000000.0,75700000000.0,78000000000.0,81200000000.0


In [18]:
AccountSeries.head(1)

Unnamed: 0,Code,Indicator Name,Long definition,Source,Topic,Unit of measure,Periodicity,Reference period,Statistical concept and methodology,Previous Indicator Code,Previous Indicator Name
0,NW.HCA.TO,Human capital (constant 2018 US$),Human capital is computed as the present value...,World Bank. 2021. The Changing Wealth of Natio...,Human capital,Constant 2018 US$,Annual,1995-2018,Total wealth is calculated by summing up estim...,,


The relation among these three datasets:

1. AccountsCountry.Code = AccountData.Country Code

2. AccountData.Series Name = AccountSeries.Indicator Name

### Data Cleaning

1. select useful columns from AccountsCountry and rename 'Code' to 'Country Code'

In [23]:
print(AccountsCountry.columns)
# select some useful columns
AccountsCountry_clean = AccountsCountry[['Code','Short Name','Table Name','Long Name','Currency Unit']]
# Rename 'Code' to 'Country Code'
AccountsCountry_clean.rename(columns={'Code': 'Country Code'}, inplace=True)
print(AccountsCountry_clean.columns)

Index(['Code', 'Long Name', 'Income Group', 'Region', 'Lending category',
       'Other groups', 'Currency Unit', 'Latest population census',
       'Latest household survey', 'Special Notes',
       'National accounts base year', 'National accounts reference year',
       'System of National Accounts', 'SNA price valuation',
       'Alternative conversion factor', 'PPP survey years',
       'Balance of Payments Manual in use', 'External debt Reporting status',
       'System of trade', 'Government Accounting concept',
       'IMF data dissemination standard',
       'Source of most recent Income and expenditure data',
       'Vital registration complete', 'Latest agricultural census',
       'Latest industrial data', 'Latest trade data',
       'Latest water withdrawal data', '2-alpha code', 'WB-2 code',
       'Table Name', 'Short Name'],
      dtype='object')
Index(['Country Code', 'Short Name', 'Table Name', 'Long Name',
       'Currency Unit'],
      dtype='object')


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  AccountsCountry_clean.rename(columns={'Code': 'Country Code'}, inplace=True)


2. select useful columns, rename 'Series Name' to 'Indicator Name' and convert columns to float.

In [84]:
print(AccountData.columns)
#select useful columns
AccountData_clean = AccountData[['Country Name','Country Code','Series Name','Series Code','1995 [YR1995]', '1996 [YR1996]', '1997 [YR1997]']]
# rename 'Series Name' to 'indicator name'
AccountData_clean.rename(columns={'Series Name': 'Indicator Name'}, inplace=True)
print(AccountData_clean.columns)
# convert to float
columns_to_convert = ['1995 [YR1995]', '1996 [YR1996]', '1997 [YR1997]'] 
AccountData_clean[columns_to_convert] = AccountData_clean[columns_to_convert].replace('..', np.nan)
AccountData_clean[columns_to_convert] = AccountData_clean[columns_to_convert].astype(float)


Index(['Country Name', 'Country Code', 'Series Name', 'Series Code',
       '1995 [YR1995]', '1996 [YR1996]', '1997 [YR1997]', '1998 [YR1998]',
       '1999 [YR1999]', '2000 [YR2000]', '2001 [YR2001]', '2002 [YR2002]',
       '2003 [YR2003]', '2004 [YR2004]', '2005 [YR2005]', '2006 [YR2006]',
       '2007 [YR2007]', '2008 [YR2008]', '2009 [YR2009]', '2010 [YR2010]',
       '2011 [YR2011]', '2012 [YR2012]', '2013 [YR2013]', '2014 [YR2014]',
       '2015 [YR2015]', '2016 [YR2016]', '2017 [YR2017]', '2018 [YR2018]'],
      dtype='object')
Index(['Country Name', 'Country Code', 'Indicator Name', 'Series Code',
       '1995 [YR1995]', '1996 [YR1996]', '1997 [YR1997]'],
      dtype='object')


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  AccountData_clean.rename(columns={'Series Name': 'Indicator Name'}, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  AccountData_clean[columns_to_convert] = AccountData_clean[columns_to_convert].replace('..', np.nan)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  AccountData_clean[columns_to_convert] = AccountData_clean[columns_to_convert].as

3. select useful columns and rename 'Code' to 'Series Code'

In [25]:
print(AccountSeries.columns)
#select useful columns
AccountSeries_clean = AccountSeries[['Code','Topic','Indicator Name']]
#rename 'Code' to 'Series Code'
AccountSeries_clean.rename(columns={'Code': 'Series Code'}, inplace=True)
print(AccountSeries_clean.columns)

Index(['Code', 'Indicator Name', 'Long definition', 'Source', 'Topic',
       'Unit of measure', 'Periodicity', 'Reference period',
       'Statistical concept and methodology', 'Previous Indicator Code',
       'Previous Indicator Name'],
      dtype='object')
Index(['Series Code', 'Topic', 'Indicator Name'], dtype='object')


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  AccountSeries_clean.rename(columns={'Code': 'Series Code'}, inplace=True)


### Define functions for creation of tables in psql

In [52]:
def create_database():
    #connect to the default database
    conn = psycopg2.connect("host=127.0.0.1 dbname=postgres user=postgres password=s516824")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    #create new database
    cur.execute("DROP DATABASE accounts")
    cur.execute("CREATE DATABASE accounts")
    
    #close connection to default database
    conn.close()
    
    #connect to new database
    conn = psycopg2.connect("host=127.0.0.1 dbname=accounts user=postgres password=s516824")
    cur = conn.cursor()
    
    return cur, conn

In [53]:
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

In [54]:
def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

### Create three new tables

In [68]:
cur, conn = create_database()

In [69]:
AccountsCountry_create = ("""
    CREATE TABLE IF NOT EXISTS accountscountry(
        country_code VARCHAR,
        short_name VARCHAR,
        table_name VARCHAR,
        long_name VARCHAR,
        currency_unit VARCHAR
    )
""")

In [70]:
cur.execute(AccountsCountry_create)
conn.commit()

In [71]:
AccountData_create = ("""
    CREATE TABLE IF NOT EXISTS accountsdata(
        country_name VARCHAR,
        country_code VARCHAR,
        indicator_name VARCHAR,
        series_code VARCHAR,
        year_1995 numeric,
        year_1996 numeric,
        year_1997 numeric
    )
""")

In [72]:
cur.execute(AccountData_create)
conn.commit()

In [73]:
AccountSeries_create = ("""
    CREATE TABLE IF NOT EXISTS accountsseries(
        series_code VARCHAR,
        topic VARCHAR,
        indicator_name VARCHAR
    )
""")
cur.execute(AccountSeries_create)
conn.commit()

### Insert data into tables

In [91]:
AccountsCountry_insert = ("""
    INSERT INTO accountscountry(
        country_code ,
        short_name ,
        table_name ,
        long_name ,
        currency_unit
    )
    VALUES (%s,%s,%s,%s,%s)
""")

In [92]:
for i, row in AccountsCountry_clean.iterrows():
    cur.execute(AccountsCountry_insert, list(row))
conn.commit()

In [93]:
AccountData_insert = ("""
    INSERT INTO accountsdata(
        country_name ,
        country_code ,
        indicator_name ,
        series_code ,
        year_1995 ,
        year_1996 ,
        year_1997 
    )
    VALUES(%s,%s,%s,%s,%s,%s,%s)
""")

In [94]:
for i, row in AccountData_clean.iterrows():
    cur.execute(AccountData_insert, list(row))
conn.commit()

In [88]:
AccountSeries_insert = ("""
    INSERT INTO accountsseries(
        series_code ,
        topic ,
        indicator_name 
    )
    VALUES(%s,%s,%s)
""")

In [89]:
for i, row in AccountSeries_clean.iterrows():
    cur.execute(AccountSeries_insert, list(row))
conn.commit()

We can also use SQL Shell to check if the data is sucessfully inserted into tables. For example, using "select * from accountscountry limit 5;" to check the first 5 rows of accountscountry table.

In [95]:
conn.close()

Now, we have successfully created tables in psql. Then we can procceed to do data transformation in psql.