# Connecting to Relational Databases - Postgres as example

Here we will use SQLAlchemy as the connector to the database we set up on Heroku. SQLAlchemy is a Python toolkit and Object Relational Mapper that lets you work with SQL databases with full power and performance. [Here is an example of how we do the connection](https://towardsdatascience.com/deploy-free-postgresql-database-in-heroku-and-ingest-data-8002c574a57d). I have already opened a PostgreSQL database on Heroku. Heroku is a platform that supports databases, web deployment, etc.

In [1]:
import os
import subprocess
import pandas as pd
import yaml
from sqlalchemy.engine.create import create_engine
from sqlalchemy.types import Integer, String, DECIMAL, DateTime, VARCHAR
from sqlalchemy import Column, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import psycopg2
# os.environ["HEROKU_APP_NAME"] = "wjp-postgres-db"

In [2]:
os.chdir("/home/jovyan")

## PostgresSQL on Heroku

**Once we set up the Heroku app name and create a PostgreSQL database, we will get the app name and the db_url. We now use psycopg2 to interact with Postgres; most SQL databases are accessed with the same approach.**

But since we are using SQLAlchemy for the database interaction, we need to transform the db_url into a format that SQLAlchemy will accept; see below (deprecated).

In [3]:
# the postgres database on heroku (Set up by us)
heroku_app_name = "wjp-postgres-db"

# option1: If you paste the credentials directly, remember to remove them before pushing; I will use a YAML file to load the info
def get_postgres_url(
    dbname=None,
    host=None,
    user=None,
    password=None,
    config_file=True,
    local_db=False,
    path="src/configs/postgres-config.yaml",
):
    """Build the SQLAlchemy (psycopg2 driver) URL for a Postgres database.

    The connection details come from exactly one source, checked in this order:

    1. ``local_db=True``: the fixed credentials of the local development
       container (no SSL).
    2. ``config_file=True``: the YAML file at ``path``, which must provide
       ``user``, ``password``, ``host`` and ``dbname``; ``port`` is optional
       and defaults to 5432.
    3. Otherwise: the explicit ``dbname``/``host``/``user``/``password``
       arguments, on port 5432.

    Sources 2 and 3 append ``sslmode=require`` to the URL.

    Args:
        dbname: Database name (used only when both flags are false).
        host: Database host (used only when both flags are false).
        user: Database user (used only when both flags are false).
        password: Database password (used only when both flags are false).
        config_file: Read the connection details from the YAML file at ``path``.
        local_db: Return the URL of the local development container.
        path: Location of the YAML config file.

    Returns:
        The connection URL as a string.

    Raises:
        ValueError: If both flags are false and any explicit argument is missing.
        KeyError: If the YAML file lacks one of the required keys.
    """
    if local_db:
        user = "postgres"
        password = "123"
        host = "postgres-image-dev"  # or use the container name postgres-env
        dbname = "postgres"
        return f"postgresql+psycopg2://{user}:{password}@{host}:5432/{dbname}"

    if config_file:
        with open(path) as file:
            # safe_load only builds plain Python objects, which is all a
            # credentials file needs.
            config = yaml.safe_load(file)
        user = config["user"]
        password = config["password"]
        host = config["host"]
        dbname = config["dbname"]
        # Use the configured port instead of silently assuming 5432.
        port = config.get("port") or 5432
    else:
        port = 5432
        explicit = {"dbname": dbname, "host": host, "user": user, "password": password}
        missing = [name for name, value in explicit.items() if value is None]
        if missing:
            raise ValueError(
                "Missing connection arguments: " + ", ".join(missing)
                + " (or pass local_db=True / config_file=True)"
            )

    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}?sslmode=require"

final_db_url = get_postgres_url(local_db=True)
engine = create_engine(final_db_url)
# print(final_db_url)

In [4]:
final_db_url

'postgresql+psycopg2://postgres:123@postgres-image-dev:5432/postgres'

## Interact with postgres
### Example 1 - Use pandas to perform operations on postgres
1. **For creating tables**: This is NOT recommended, since you rely on pandas-specific functions and have to wait for pandas to catch up with the latest updates from SQL databases. Using SQL syntax for table creation is more common.
2. **For reading data**: You may use pandas to fetch data since it uses SQL syntax!

In [7]:
# Just a data from the web
DATA_URL = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/latest/owid-covid-latest.csv"

# pandas read data
df = pd.read_csv(DATA_URL)

In [8]:
df.head()

Unnamed: 0,iso_code,continent,location,last_updated_date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,female_smokers,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
0,AFG,Asia,Afghanistan,2021-11-07,156397.0,0.0,21.0,7284.0,0.0,0.571,...,,,37.746,0.5,64.83,0.511,,,,
1,OWID_AFR,,Africa,2021-11-07,8532010.0,2554.0,4660.429,219492.0,142.0,179.714,...,,,,,,,,,,
2,ALB,Europe,Albania,2021-11-07,189125.0,1131.0,546.429,2955.0,7.0,4.429,...,7.1,51.2,,2.89,78.57,0.795,,,,
3,DZA,Africa,Algeria,2021-11-07,207156.0,77.0,100.571,5945.0,4.0,3.571,...,0.7,30.4,83.741,1.9,76.88,0.748,,,,
4,AND,Europe,Andorra,2021-11-07,15618.0,0.0,14.571,130.0,0.0,0.0,...,29.0,37.8,,,83.73,0.868,,,,


In [8]:
# pandas.to_sql
# table name
# the engine is from above; as long as you can create an engine you can pass it here, it doesn't matter whether it's a local engine or one in the cloud
# In order to avoid writing DataFrame index as a column

df.to_sql(
    "table1",  # table name
    con=engine,
    if_exists='replace',
    index=False,  # In order to avoid writing DataFrame index as a column
    # dtype={
    #     "last_updated_date": Date(),
    #     "total_cases": Integer(),
    #     "new_cases": Integer()
    # }
)

In [36]:
# When it comes to read data, using pandas is quite a good choice
# note that pd.read_sql may change the data type

query = \
"""
select * 
from table1
limit 10;
"""

# Usually I use triple quotes so the query keeps normal SQL formatting instead of one long line
# See detail in: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
# The "with" block returns the connection to the pool as soon as the read is done,
# instead of leaving it open for the rest of the notebook session.
with engine.connect() as conn:
    df_2 = pd.read_sql(query, conn)
df_2.head()

Unnamed: 0,iso_code,continent,location,last_updated_date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,female_smokers,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
0,AFG,Asia,Afghanistan,2021-11-07,156397.0,0.0,21.0,7284.0,0.0,0.571,...,,,37.746,0.5,64.83,0.511,,,,
1,OWID_AFR,,Africa,2021-11-07,8532010.0,2554.0,4660.429,219492.0,142.0,179.714,...,,,,,,,,,,
2,ALB,Europe,Albania,2021-11-07,189125.0,1131.0,546.429,2955.0,7.0,4.429,...,7.1,51.2,,2.89,78.57,0.795,,,,
3,DZA,Africa,Algeria,2021-11-07,207156.0,77.0,100.571,5945.0,4.0,3.571,...,0.7,30.4,83.741,1.9,76.88,0.748,,,,
4,AND,Europe,Andorra,2021-11-07,15618.0,0.0,14.571,130.0,0.0,0.0,...,29.0,37.8,,,83.73,0.868,,,,


### Example 2 - Use SQL queries to perform raw operations on postgres

Now we use the same covid table but using SQL Syntax and add some other features

1. Check all databases

In [11]:
# check all databases you have in postgres, and output raw data without pandas
query = \
"""
SELECT datname FROM pg_database;
"""
with engine.connect() as connection:
    result = connection.execute(text(query))
    # Fetch while the connection is still open; reading from the result after
    # the block has released the connection only works by accident of driver buffering.
    rows = result.fetchall()
rows

[('postgres',), ('template1',), ('template0',)]

2. Drop the table if it exists

In [50]:
query = \
"""
DROP TABLE IF EXISTS covid19;
"""
# Build the connection; note that text() is needed to turn the plain string into an executable SQL construct.
# Using a "with" statement releases the connection right after the operation completes, which avoids unintentional operations.
# engine.begin() also wraps the statement in a transaction and commits it on exit, so the DDL is
# persisted on SQLAlchemy 2.x as well (engine.connect() no longer autocommits there).
# Let's drop the table if it exists

with engine.begin() as conn:
    result = conn.execute(text(query))
    print(result.closed)
    print(result.close())
    print(result.closed)

False
None
True


3. create table

In [51]:
query = \
"""
CREATE TABLE IF NOT EXISTS covid19 (
    last_updated_date DATE,
    location VARCHAR(64),
    total_cases DECIMAL(32)
);
"""

# engine.begin() commits the CREATE TABLE when the block exits; with
# engine.connect() the DDL would be rolled back on SQLAlchemy 2.x.
with engine.begin() as conn:
    result = conn.execute(text(query))
    print(result.closed)
    result.close()
    print(result.closed)


False
True


4. insert values into the table; here we provide three ways
    * traditional insert: easy but not good for data > 10000 rows, operates through Python
    * bulk insert: you need to create a class object beforehand and need a primary key, operates through Python
    * use SQL syntax to copy files like CSV directly into the database:
        * [reading1](https://www.postgresqltutorial.com/import-csv-file-into-posgresql-table/)
        * [reading 2](https://www.2ndquadrant.com/en/blog/7-best-practice-tips-for-postgresql-bulk-data-loading/)

In [52]:
# insert data, but this is slow since you need to type one by one
# so how do we make this better? by making the string below flexible
query = \
"""
INSERT INTO covid19 (last_updated_date, location, total_cases)
VALUES ('2021-11-04', 'Andorra', 204.0),
       ('2021-11-03', 'Australia', 254.0)
"""

# engine.begin() commits the INSERT when the block exits; with
# engine.connect() the rows would be rolled back on SQLAlchemy 2.x.
with engine.begin() as conn:
    result = conn.execute(text(query))
    result.close()


# modify and insert row by row
def insert_query(row):
    """Build one INSERT statement for the covid19 table from a single record.

    Args:
        row: An iterable (tuple, list, pandas Series, ...) whose first three
            values are last_updated_date, location and total_cases.

    Returns:
        The SQL text of the INSERT statement. Missing values (None/NaN/NaT)
        are written as NULL and single quotes inside text values are doubled,
        so names such as "Cote d'Ivoire" no longer break the statement.
    """
    def is_missing(value):
        # NaN and NaT are the only values that compare unequal to themselves.
        if value is None:
            return True
        try:
            return bool(value != value)
        except TypeError:
            # pandas.NA refuses truth-testing; it is a missing value as well.
            return True

    def quoted(value):
        # SQL string literal; an embedded quote is escaped by doubling it.
        if is_missing(value):
            return "NULL"
        return "'" + str(value).replace("'", "''") + "'"

    # Unpack by position; this works for tuples and for label-indexed Series alike.
    last_updated_date, location, total_cases = list(row)[:3]

    if is_missing(total_cases):
        total_sql = "NULL"
    elif isinstance(total_cases, str):
        # Never splice raw text into the statement; let Postgres cast the literal.
        total_sql = quoted(total_cases)
    else:
        total_sql = total_cases

    return f"""
    INSERT INTO covid19(last_updated_date, location, total_cases)
    VALUES ({quoted(last_updated_date)}, {quoted(location)}, {total_sql})
    """

# engine.begin() commits all inserts together when the block exits
# (engine.connect() would roll them back on SQLAlchemy 2.x).
with engine.begin() as conn:
    features = ["last_updated_date", "location", "total_cases"]
    # Be careful: NA values must be handled first, so incomplete rows are dropped here.
    sample = df[features].head(5).dropna()
    # Plain tuples keep insert_query's positional access (row[0], row[1], row[2]) valid.
    for row in sample.itertuples(index=False, name=None):
        conn.execute(text(insert_query(row)))

False
True


bulk insert

In [27]:
df[["last_updated_date", "location", "total_cases"]].head()

Unnamed: 0,last_updated_date,location,total_cases
0,2021-11-07,Afghanistan,156397.0
1,2021-11-07,Africa,8532010.0
2,2021-11-07,Albania,189125.0
3,2021-11-07,Algeria,207156.0
4,2021-11-07,Andorra,15618.0


In [10]:
# 1. Define table using sql alchemy
Base = declarative_base()
class CovidTable(Base):
    """Declarative model mapped to the ``covid_using_csv`` table."""

    __tablename__ = "covid_using_csv"
    # Integer primary key; the bulk-insert records in this cell do not supply it.
    id = Column(Integer, primary_key=True)
    # NOTE(review): the raw-SQL covid19 table declares this column as DATE,
    # while this model uses DateTime. Confirm which one is intended.
    last_updated_date = Column(DateTime)
    # Location name, at most 64 characters.
    location = Column(VARCHAR(64))
    # Case count stored as DECIMAL with precision 32.
    total_cases = Column(DECIMAL(32))

# 2. make sure the table exists, then create a session and load the data
# the data format is like [{'last_updated_date': '2021-11-07'}, {'last_updated_date': '2021-11-07'}]
# create_all() is a no-op when covid_using_csv already exists; without it the
# bulk insert fails on a fresh database because nothing else creates the table.
Base.metadata.create_all(engine)
Session = sessionmaker(engine)
# The "with" block closes the session (and releases its connection) once the load is done.
with Session() as s:
    s.bulk_insert_mappings(
        CovidTable,
        df[["last_updated_date", "location", "total_cases"]].to_dict(orient="records")
    )
    s.commit()

# 3. check the data
query = \
"""
select * 
from covid_using_csv
limit 10;
"""
# Release the connection as soon as the data has been read instead of leaving it open.
with engine.connect() as conn:
    df_3 = pd.read_sql(query, conn)
df_3.head()

Unnamed: 0,id,last_updated_date,location,total_cases
0,901,2021-11-07,Afghanistan,156397.0
1,902,2021-11-07,Africa,8532010.0
2,903,2021-11-07,Albania,189125.0
3,904,2021-11-07,Algeria,207156.0
4,905,2021-11-07,Andorra,15618.0


In [55]:
# select table and output raw data tuple
query = \
"""
select *
from covid19;
"""

with engine.connect() as conn:
    result = conn.execute(text(query))
    # Fetch while the connection is still open; reading from the result after
    # the block has released the connection only works by accident of driver buffering.
    rows = result.fetchall()

rows

[(datetime.date(2021, 11, 4), 'Andorra', Decimal('204')),
 (datetime.date(2021, 11, 3), 'Australia', Decimal('254')),
 (datetime.date(2021, 11, 7), 'Afghanistan', Decimal('156397')),
 (datetime.date(2021, 11, 7), 'Africa', Decimal('8532010')),
 (datetime.date(2021, 11, 7), 'Albania', Decimal('189125')),
 (datetime.date(2021, 11, 7), 'Algeria', Decimal('207156')),
 (datetime.date(2021, 11, 7), 'Andorra', Decimal('15618')),
 (datetime.date(2021, 11, 7), 'Angola', Decimal('64674')),
 (datetime.date(2021, 11, 7), 'Afghanistan', Decimal('156397')),
 (datetime.date(2021, 11, 7), 'Africa', Decimal('8532010')),
 (datetime.date(2021, 11, 7), 'Albania', Decimal('189125')),
 (datetime.date(2021, 11, 7), 'Algeria', Decimal('207156')),
 (datetime.date(2021, 11, 7), 'Andorra', Decimal('15618'))]

In [56]:
# If not using pandas, SQLAlchemy will return a list of tuples
# View all tables (including tables in the system)
with engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM information_schema.tables limit 3;"))
    # Fetch before the "with" block releases the connection.
    rows = result.fetchall()
rows

[('postgres', 'pg_catalog', 'pg_statistic', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None),
 ('postgres', 'pg_catalog', 'pg_type', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None),
 ('postgres', 'public', 'table1', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None)]

## Example 3 - Use db-connector.py

This is a self-created script so that you don't need to create connections yourself. It just needs the dbname, host, user, password and app_name; then you only need to worry about the SQL syntax. In real-life development, it is very common to use a separate helper script to handle the connection. We want to keep each Python module for just one purpose.

In [58]:
from src.db.DBConnector import SQLConnect

connector = SQLConnect(
    user="postgres",
    password="123",
    host="postgres-image-dev",  # or use the container name postgres-env
    dbname="postgres",
    local_db=True
)

In [59]:
connector.executeQuery("select * from covid19 limit 3;")

Unnamed: 0,last_updated_date,location,total_cases
0,2021-11-04,Andorra,204.0
1,2021-11-03,Australia,254.0
2,2021-11-07,Afghanistan,156397.0


In [60]:
query = \
"""
select * from covid19 limit 3;
"""
res = connector.executeQuery(query, to_pandas=False)  # to_pandas defaults to True, this will return pandas dataframe
res

[(datetime.date(2021, 11, 4), 'Andorra', Decimal('204')),
 (datetime.date(2021, 11, 3), 'Australia', Decimal('254')),
 (datetime.date(2021, 11, 7), 'Afghanistan', Decimal('156397'))]

In [61]:
connector.viewAllTables(view_type="public")

Unnamed: 0,table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action
2,postgres,public,table1,BASE TABLE,,,,,,YES,NO,
10,postgres,public,covid19,BASE TABLE,,,,,,YES,NO,
97,postgres,public,covid_using_csv,BASE TABLE,,,,,,YES,NO,
185,postgres,public,covid_using_csv_2,BASE TABLE,,,,,,YES,NO,


create and insert value

In [63]:
# 2. create session and load the data in bulk
# the data format is like [{'last_updated_date': '2021-11-07'}, {'last_updated_date': '2021-11-07'}]
query = \
"""
CREATE TABLE IF NOT EXISTS covid_using_csv_2 (
    id serial primary key,
    last_updated_date DATE, 
	location VARCHAR(64),
    total_cases DECIMAL(32)
);
"""
connector.executeQuery(query)

Base = declarative_base()
class CovidTable2(Base):
    """Declarative model mapped to the ``covid_using_csv_2`` table."""

    __tablename__ = "covid_using_csv_2"
    # Matches the "id serial primary key" column of the CREATE TABLE statement in this cell.
    id = Column(Integer, primary_key=True)
    # NOTE(review): the CREATE TABLE statement in this cell declares this column
    # as DATE, while this model uses DateTime. Confirm which one is intended.
    last_updated_date = Column(DateTime)
    # Location name, at most 64 characters.
    location = Column(VARCHAR(64))
    # Case count stored as DECIMAL with precision 32.
    total_cases = Column(DECIMAL(32))

connector.insertValue(
    dataframe=df[["last_updated_date", "location", "total_cases"]], 
    insert_format_func=None, 
    bulk_table_format=CovidTable2, 
    bulk=True
)

table create
data inserted


In [68]:
# modify and insert row by row
def insert_query(row):
    """Build one INSERT statement for the covid19 table from a single record.

    Args:
        row: An iterable (tuple, list, pandas Series, ...) whose first three
            values are last_updated_date, location and total_cases.

    Returns:
        The SQL text of the INSERT statement. Missing values (None/NaN/NaT)
        are written as NULL and single quotes inside text values are doubled,
        so names such as "Cote d'Ivoire" no longer break the statement.
    """
    def is_missing(value):
        # NaN and NaT are the only values that compare unequal to themselves.
        if value is None:
            return True
        try:
            return bool(value != value)
        except TypeError:
            # pandas.NA refuses truth-testing; it is a missing value as well.
            return True

    def quoted(value):
        # SQL string literal; an embedded quote is escaped by doubling it.
        if is_missing(value):
            return "NULL"
        return "'" + str(value).replace("'", "''") + "'"

    # Unpack by position; this works for tuples and for label-indexed Series alike.
    last_updated_date, location, total_cases = list(row)[:3]

    if is_missing(total_cases):
        total_sql = "NULL"
    elif isinstance(total_cases, str):
        # Never splice raw text into the statement; let Postgres cast the literal.
        total_sql = quoted(total_cases)
    else:
        total_sql = total_cases

    return f"""
    INSERT INTO covid19(last_updated_date, location, total_cases)
    VALUES ({quoted(last_updated_date)}, {quoted(location)}, {total_sql})
    """

connector.insertValue(
    dataframe=df[["last_updated_date", "location", "total_cases"]][:3].dropna(), 
    insert_format_func=insert_query, 
    bulk_table_format=None, 
    bulk=False
)

data inserted


In [70]:
query = \
"""
select * from covid_using_csv_2 limit 30;
"""
res = connector.executeQuery(query, to_pandas=True)  # to_pandas defaults to True, this will return pandas dataframe
res

Unnamed: 0,id,last_updated_date,location,total_cases
0,1,2021-11-07,Afghanistan,156397.0
1,2,2021-11-07,Africa,8532010.0
2,3,2021-11-07,Albania,189125.0
3,4,2021-11-07,Algeria,207156.0
4,5,2021-11-07,Andorra,15618.0
5,6,2021-11-07,Angola,64674.0
6,7,2021-11-05,Anguilla,
7,8,2021-11-07,Antigua and Barbuda,4091.0
8,9,2021-11-07,Argentina,5296781.0
9,10,2021-11-07,Armenia,320433.0


## MySQL in local Docker container

You can skip this for now; this is just for your NTU class

In [18]:
# Connection settings for the MySQL server
config = dict(
    host='0.0.0.0',
    port=3306,
    user='root',
    password='123',
    database='test_db',
)

# Pull the individual settings out of the mapping in one pass
db_user, db_pwd, db_host, db_port, db_name = (
    config.get(key) for key in ('user', 'password', 'host', 'port', 'database')
)

# Assemble the SQLAlchemy URL for the pymysql driver and show it
DB_URL = f'mysql+pymysql://{db_user}:{db_pwd}@{db_host}:{db_port}/{db_name}'
print(DB_URL)

mysql+pymysql://root:123@0.0.0.0:3306/test_db


In [11]:
import os
print(os.getcwd())
os.chdir("/home/jovyan/src")
print(os.getcwd())

/home/jovyan/src
/home/jovyan/src


In [19]:
from src.db.DBConnector import SQLConnect
connect = SQLConnect(DB_URL)
connect.executeQuery("show databases;")