## Integration of Postgres with Pandas
Pandas is a powerful Python data library used to process and analyze data. It has robust APIs for working with databases.

Here are the steps involved in using Pandas with databases such as Postgres.
* Make sure `pandas`, `psycopg2-binary`, and `sqlalchemy` are installed using `pip`.
* Pandas uses `sqlalchemy` to interact with database tables based on the connection URL.
* `sqlalchemy` is a widely used SQL toolkit and ORM that hides the complexity of connecting to databases from our applications.
* Here are the examples using Pandas.
  * Read from a file, process the data, and write it to a Postgres database table using Pandas.
  * Read from the Postgres database table using Pandas for validation.
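
The two examples listed above can be sketched end to end as follows. This is only a minimal illustration under stated assumptions: the sample rows are made up, and an in-memory SQLite database from the standard library stands in for Postgres so the sketch runs without a server. Against Postgres, a `postgresql://...` connection URL would be passed instead of the SQLite connection.

```python
import io
import sqlite3

import pandas as pd

# Hypothetical sample rows in the same four-column layout as the orders file.
csv_text = """1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-26 00:00:00.0,12111,COMPLETE
"""
columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']

# Read: the data has no header row, so column names are supplied explicitly.
orders = pd.read_csv(io.StringIO(csv_text), names=columns)

# Process: count orders per status.
status_count = (
    orders
    .groupby('order_status')['order_id']
    .agg(order_count='count')
    .reset_index()
)

# Write: SQLite is used here only as a stand-in for Postgres.
conn = sqlite3.connect(':memory:')
status_count.to_sql('status_count', conn, if_exists='replace', index=False)

# Validate: read the table back and compare with what was written.
result = pd.read_sql('SELECT * FROM status_count ORDER BY order_status', conn)
conn.close()
print(result)
```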

In [1]:
!pip install psycopg2-binary





In [2]:
!pip install pandas





In [3]:
!pip install sqlalchemy

Successfully installed greenlet-3.0.3 sqlalchemy-2.0.28




In [4]:
import pandas as pd

In [5]:
columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']

In [6]:
import json

In [7]:
# Use a context manager so the file handle is closed after loading
with open('../../data/retail_db/schemas.json') as fp:
    schemas = json.load(fp)

In [8]:
sorted(schemas['orders'], key=lambda col: col['column_position'])

[{'column_name': 'order_id', 'data_type': 'integer', 'column_position': 1},
 {'column_name': 'order_date', 'data_type': 'string', 'column_position': 2},
 {'column_name': 'order_customer_id',
  'data_type': 'timestamp',
  'column_position': 3},
 {'column_name': 'order_status', 'data_type': 'string', 'column_position': 4}]

In [9]:
columns = [col['column_name'] for col in sorted(schemas['orders'], key=lambda col: col['column_position'])]

In [10]:
columns

['order_id', 'order_date', 'order_customer_id', 'order_status']
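
The column lookup above could be wrapped in a small helper so it can be reused for other tables. The sketch below is illustrative only: `get_column_names` is a hypothetical name, and the inline `schemas` dictionary is a trimmed-down stand-in for the contents of `schemas.json`.

```python
# Hypothetical, trimmed-down stand-in for the contents of schemas.json.
schemas = {
    'orders': [
        {'column_name': 'order_status', 'column_position': 4},
        {'column_name': 'order_id', 'column_position': 1},
        {'column_name': 'order_customer_id', 'column_position': 3},
        {'column_name': 'order_date', 'column_position': 2},
    ]
}


def get_column_names(schemas, table_name):
    """Return the column names of table_name ordered by column_position."""
    table_schema = schemas[table_name]
    ordered = sorted(table_schema, key=lambda col: col['column_position'])
    return [col['column_name'] for col in ordered]


columns = get_column_names(schemas, 'orders')
print(columns)
```

Sorting by `column_position` matters because the entries in the schema file are not guaranteed to be listed in the order the columns appear in the data files.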

In [11]:
pd.read_csv?

Signature:
pd.read_csv(
    filepath_or_buffer: 'FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str]',
    *,
    sep: 'str | None | lib.NoDefault' = <no_default>,
    delimiter: 'str | None | lib.NoDefault' = None,
    header: "int | Sequence[int] | None | Literal['infer']" = 'infer',
    names: 'Sequence[Hashable] | None | lib.NoDefault' = <no_default>,
    index_col: 'IndexLabel | Literal[False] | None' = None,
    usecols:

In [None]:
orders = pd.read_csv('../../data/retail_db/orders/part-00000', names=columns)
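
To see why `names=columns` is passed, the following sketch compares reading headerless data with and without it. The sample rows are made up for illustration, and `parse_dates` is an optional extra that is not used in the cell above.

```python
import io

import pandas as pd

# Hypothetical headerless rows, similar in shape to the orders data.
csv_text = """1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-26 00:00:00.0,12111,COMPLETE
"""
columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']

# Without names, the first data row is consumed as the header.
without_names = pd.read_csv(io.StringIO(csv_text))

# With names, every row is treated as data and the columns are labeled.
with_names = pd.read_csv(
    io.StringIO(csv_text),
    names=columns,
    parse_dates=['order_date'],  # optional: convert the date strings to timestamps
)

print(without_names.shape)
print(with_names.shape)
print(with_names.dtypes)
```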

In [None]:
orders

In [None]:
type(orders)

In [None]:
orders.shape

In [None]:
daily_status_count = (
    orders
    .groupby(['order_date', 'order_status'])['order_id']
    .agg(order_count='count')
    .reset_index()
)

In [None]:
daily_status_count
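
The aggregation can be checked by hand on a tiny frame. The rows below are made-up sample data, not taken from the real orders file.

```python
import pandas as pd

# Hypothetical sample: three orders on one day, one on the next.
sample_orders = pd.DataFrame({
    'order_id': [1, 2, 3, 4],
    'order_date': ['2013-07-25', '2013-07-25', '2013-07-25', '2013-07-26'],
    'order_status': ['CLOSED', 'CLOSED', 'COMPLETE', 'CLOSED'],
})

# Count order ids per (date, status) pair and name the result column.
sample_counts = (
    sample_orders
    .groupby(['order_date', 'order_status'])['order_id']
    .agg(order_count='count')
    .reset_index()  # turn the group keys back into regular columns
)
print(sample_counts)
```

`reset_index()` is what makes `order_date` and `order_status` ordinary columns again, which is convenient when the frame is later written to a table with `index=False`.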

In [None]:
daily_status_count.to_sql?

In [None]:
host = '104.197.141.244'
port = 5432
database = 'itversity_retail_db'
user = 'itversity_retail_user'
password = 'itversity'

In [None]:
conn_uri = f'postgresql://{user}:{password}@{host}:{port}/{database}'
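
If the password contains characters such as `@` or `/`, a plain f-string produces a URL that cannot be parsed correctly. One possible way to handle this is to percent-encode the credentials first, as sketched below. `build_conn_uri` and the `PG_*` environment variable names are hypothetical, chosen only for this illustration.

```python
import os
from urllib.parse import quote_plus


def build_conn_uri(user, password, host, port, database):
    """Build a postgresql:// URL, percent-encoding the user and password."""
    return (
        f'postgresql://{quote_plus(user)}:{quote_plus(password)}'
        f'@{host}:{port}/{database}'
    )


# Hypothetical environment variable names; the defaults are placeholders.
conn_uri = build_conn_uri(
    user=os.environ.get('PG_USER', 'itversity_retail_user'),
    password=os.environ.get('PG_PASSWORD', 'change-me'),
    host=os.environ.get('PG_HOST', 'localhost'),
    port=int(os.environ.get('PG_PORT', '5432')),
    database=os.environ.get('PG_DATABASE', 'itversity_retail_db'),
)

# A password with special characters is encoded safely.
example_uri = build_conn_uri('u', 'p@ss/word', 'localhost', 5432, 'db')
print(example_uri)
```

Reading the credentials from the environment also keeps them out of the notebook itself.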

In [None]:
daily_status_count.to_sql(
    'daily_status_count',
    conn_uri,
    if_exists='replace',
    index=False
)
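
The effect of `if_exists` can be illustrated with a small sketch. It assumes an in-memory SQLite database as a stand-in for Postgres so that it runs without a server. The table name `demo_counts` and the rows are made up.

```python
import sqlite3

import pandas as pd

df = pd.DataFrame({
    'order_status': ['CLOSED', 'COMPLETE'],
    'order_count': [2, 1],
})
conn = sqlite3.connect(':memory:')  # stand-in for the Postgres connection


def row_count(conn):
    """Return the number of rows currently in demo_counts."""
    return int(pd.read_sql('SELECT count(*) AS n FROM demo_counts', conn)['n'].iloc[0])


# 'replace' drops the table if it exists and recreates it from the frame.
df.to_sql('demo_counts', conn, if_exists='replace', index=False)
rows_after_replace = row_count(conn)

# 'append' keeps the existing rows and adds the new ones.
df.to_sql('demo_counts', conn, if_exists='append', index=False)
rows_after_append = row_count(conn)

# Running 'replace' again brings the table back to a single copy of the data.
df.to_sql('demo_counts', conn, if_exists='replace', index=False)
rows_after_second_replace = row_count(conn)

conn.close()
print(rows_after_replace, rows_after_append, rows_after_second_replace)
```

With `if_exists='replace'` the cell can be re-run safely, whereas `'append'` would duplicate the rows on every run. The default, `'fail'`, raises an error if the table already exists.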

In [None]:
pd.read_sql?

In [None]:
dsc = pd.read_sql(
    'daily_status_count',
    conn_uri
)

In [None]:
dsc.shape

In [None]:
dsc

In [None]:
dsc = pd.read_sql(
    '''
        SELECT order_status, sum(order_count) AS order_count FROM daily_status_count
        GROUP BY 1
        ORDER BY 2 DESC
    ''',
    conn_uri
)

In [None]:
dsc
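
As a cross-check, the same totals can be computed in Pandas and compared with what the SQL query returns. The sketch below uses a made-up frame in place of the real `daily_status_count` table.

```python
import pandas as pd

# Hypothetical stand-in for the daily_status_count table.
daily = pd.DataFrame({
    'order_date': ['2013-07-25', '2013-07-25', '2013-07-26'],
    'order_status': ['CLOSED', 'COMPLETE', 'CLOSED'],
    'order_count': [2, 1, 3],
})

# Pandas equivalent of:
#   SELECT order_status, sum(order_count) AS order_count
#   FROM daily_status_count GROUP BY 1 ORDER BY 2 DESC
status_totals = (
    daily
    .groupby('order_status', as_index=False)['order_count']
    .sum()
    .sort_values('order_count', ascending=False)
    .reset_index(drop=True)
)
print(status_totals)
```

Running the aggregation in the database is usually preferable for large tables, since only the summarized rows are transferred to the client.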