In [None]:
import getpass
import os
from urllib.parse import quote

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# SQLAlchemy: export PostgreSQL tables to Parquet

In [None]:
# Target server for the Parquet export (SQLAlchemy section).
host, dbname, port = '111.111.111.111', 'db_new', 15555

In [None]:
# Prompt for credentials rather than storing them in the notebook; getpass keeps
# the password from being echoed. Both values are percent-encoded (safe='') so
# characters such as '@', ':', '/' or spaces cannot break the URL structure.
db_user = input('PostgreSQL user: ')
db_password = getpass.getpass('PostgreSQL password: ')
conn_string = 'postgresql://{}:{}@{}:{}/{}'.format(
    quote(db_user, safe=''), quote(db_password, safe=''), host, port, dbname
)
engine = create_engine(conn_string)
engine

In [None]:
%%time
mon_list=['202201','202202']
for d in mon_list:
    print(d)
    query_sales = """
        select *
        from base_sales_{}
        where product_group='food';
        """.format(d)
    df = pd.read_sql_query(query_sales, engine)
    df.to_parquet('base_sales_{}.parquet'.format(d))

# psycopg2: sample queries against PostgreSQL

## Connect to PostgreSQL

In [None]:
# Connection settings for the `olist` database (psycopg2 section). Each value can
# be overridden with an OLIST_PG_* environment variable. The password is not kept
# in the notebook: it comes from OLIST_PG_PASSWORD or, when that is unset/empty,
# from an interactive prompt.
HOST = os.environ.get("OLIST_PG_HOST", "fffffffff")
PORT = os.environ.get("OLIST_PG_PORT", "6432")
DBNAME = os.environ.get("OLIST_PG_DBNAME", "olist")
USER = os.environ.get("OLIST_PG_USER", "user1")
PASSWORD = os.environ.get("OLIST_PG_PASSWORD") or getpass.getpass("PostgreSQL password: ")

In [None]:
# Pass the settings as keyword arguments instead of formatting a libpq DSN string:
# psycopg2 quotes each value itself, so a password containing spaces, quotes or
# backslashes can no longer break (or alter) the connection parameters.
conn = psycopg2.connect(
    host=HOST,
    port=PORT,
    dbname=DBNAME,
    user=USER,
    password=PASSWORD,
    target_session_attrs="read-write",  # libpq: only accept a server that allows writes
    sslmode="verify-full",  # libpq: TLS with certificate and host-name verification
)

In [None]:
# Reconnection helper; sql_full() calls it after a failed query.
def refcon():
    """Replace the global ``conn``/``cursor`` pair with a freshly opened one.

    Presumably needed because, after a failed statement, PostgreSQL rejects
    further commands in the same transaction until it is rolled back.

    The new connection uses the same HOST/PORT/DBNAME/USER/PASSWORD settings and
    libpq options as the initial connection cell.
    """
    global conn
    global cursor
    # Close the old handles best-effort: after a failure they may already be
    # unusable, and they are being replaced regardless.
    for stale in (cursor, conn):
        try:
            stale.close()
        except psycopg2.Error:
            pass
    conn = psycopg2.connect(
        host=HOST,
        port=PORT,
        dbname=DBNAME,
        user=USER,
        password=PASSWORD,
        target_session_attrs="read-write",
        sslmode="verify-full",
    )
    cursor = conn.cursor()

## Inspect the database (server version, tables, columns)

In [None]:
# Connection parameters reported by the driver for the open `conn`.
print("Info about server PostgreSQL")
dsn_params = conn.get_dsn_parameters()
print(dsn_params, "\n")

In [None]:
# Module-level cursor shared by the query cells below; sql_full() executes on it
# and refcon() replaces it after a failed query.
cursor = conn.cursor()

In [None]:
# Ask the server for its version banner.
version_query = "SELECT version();"
cursor.execute(version_query)
record = cursor.fetchone()  # single-column row, printed as-is
print("You connect to - ", record, "\n")

In [None]:
# Relations listed in information_schema.tables outside the two system schemas;
# fetchall() returns one tuple per row.
tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema','pg_catalog');"
cursor.execute(tables_query)
record = cursor.fetchall()
print("List of tables - ", record, "\n")

In [None]:
# (table_name, ordinal_position, column_name) for every column outside the system
# schemas; `record` is read by the df_shema cell that follows.
query_txt = (
    "SELECT table_name, ordinal_position, column_name "
    "FROM information_schema.columns "
    "WHERE table_schema NOT IN ('information_schema','pg_catalog');"
)
cursor.execute(query_txt)
record = cursor.fetchall()
print("List of tables and fields - ", record, "\n")

In [None]:
# Schema catalogue: one row per fetched (table, column) tuple, ordered by table
# name and then by column position.
df_shema = pd.DataFrame(
    list(record),
    columns=['table_name', 'column_position', 'column_name'],
).sort_values(['table_name', 'column_position'])

## Sample SQL queries

In [None]:
# sql-query to dataframe
def sql_full(sql_text):
    """Run ``sql_text`` on the global ``cursor`` and return the result two ways.

    Returns:
        On success, ``(rows, df)``:
        - ``rows`` is ``[column_names] + fetched_rows``, so ``rows[1][0]`` is the
          first value of the first result row.
        - ``df`` is a DataFrame of the fetched rows, using those column names.

        On any exception, ``('NO', DataFrame(columns=['in']))``. The error is
        printed and ``refcon()`` is called to open a fresh connection/cursor.
    """
    try:
        cursor.execute(sql_text)
        rows = cursor.fetchall()
        # One description entry per result column; element 0 is the column name.
        col_names = [column[0] for column in cursor.description]
        return [col_names] + rows, pd.DataFrame(rows, columns=col_names)
    except Exception as exc:
        # Include the driver's message: without it a failed query is undiagnosable.
        print('Error SQL query:', exc)
        refcon()
        return 'NO', pd.DataFrame(columns=['in'])

In [None]:
# Headline counts for order_items. Aliases are snake_case because PostgreSQL folds
# unquoted identifiers to lower case (a camelCase alias such as totalRows would come
# back as "totalrows").
# NOTE(review): order_item_id looks like a per-order sequence number (the order
# query further down uses MAX(order_item_id) as an item count), so num_order_item
# counts distinct sequence values, not line items — confirm the intended metric.
query_txt="""
SELECT
  COUNT(DISTINCT order_item_id) num_order_item,
  COUNT(DISTINCT order_id) num_order,
  COUNT(DISTINCT product_id) num_product,
  COUNT(DISTINCT seller_id) num_seller,
  COUNT(*) total_rows
FROM
  order_items;
"""
q_txt, q_pandas = sql_full(query_txt)
q_pandas.head()

In [None]:
def _column_select_list(table_name, alias, excluded_column):
    """Ask the database for ``"alias.col1, alias.col2, ..."`` covering ``table_name``.

    Columns appear in table order (ordinal_position). Each name goes through
    quote_ident so mixed-case or reserved names remain valid SQL.
    ``excluded_column`` is left out (typically the join key already supplied
    by the other side of the join).

    Returns:
        sql_full()'s ``(rows, df)`` pair unchanged, so ``rows[1][0]`` is the
        comma-separated column list.

    Raises:
        ValueError: if the query failed or matched no columns. Formatting such a
            result into the order query would otherwise silently yield broken SQL
            (``'NO'[1][0]`` is ``'O'``; an empty STRING_AGG is NULL -> ``None``).
    """
    # The arguments are fixed literals in this cell, never external input.
    query = f"""
SELECT STRING_AGG('{alias}.' || quote_ident(column_name), ', ' ORDER BY ordinal_position)
FROM information_schema.columns
WHERE table_name = '{table_name}'
AND table_schema NOT IN ('information_schema','pg_catalog')
AND column_name NOT IN ('{excluded_column}');
"""
    rows, frame = sql_full(query)
    if rows == 'NO' or rows[1][0] is None:
        raise ValueError(f"could not list the columns of table {table_name!r}")
    return rows, frame


q_txt_closed_deals, q_pandas = _column_select_list('closed_deals', 'o', 'seller_id')
q_txt_order_items, q_pandas = _column_select_list('order_items', 'ord', 'seller_id')
q_txt_products, q_pandas = _column_select_list('products', 'p', 'product_id')

# Customer columns (alias `c`) appended to every order row in the ordcustgeo CTE.
q_txt_ordcustgeo = ", ".join(
    "c." + col
    for col in (
        "customer_unique_id",
        "customer_zip_code_prefix",
        "customer_city",
        "customer_state",
        "customers_geo_lat",
        "customers_geo_lng",
    )
)

# Review summary columns (alias `rev`) picked in the final SELECT.
q_txt_rev = (
    "rev.num_reviews_per_order, rev.min_review_create_date, rev.max_review_create_date, rev.min_review_answer_date, \n"
    "rev.max_review_answer_date, rev.review_id, rev.review_score"
)

# Order-level dataset: starts from `orders` and LEFT JOINs customer geo data,
# per-order item/price aggregates, the number of distinct products and a review
# summary. CTE pipeline:
#   geoavg          mean lat/lng per zip-code prefix
#   selzip          sellers + geo coordinates
#   selzipclose     ... + closed_deals columns
#   oit             order_items + seller data
#   order_items1    ... + products columns
#   custgeo         customers + geo coordinates
#   ordcustgeo      orders + selected customer columns
#   itemavg         per order: MAX(order_item_id) as item count, price/freight sums
#   orditem1        ordcustgeo + itemavg
#   ordprod/unprod  number of distinct products per order
#   orditem2        orditem1 + num_uniq_prod
#   score           exactly ONE review per order: the most recently answered one
#                   (lowest review_id on ties). Reviews without an answer timestamp
#                   are ignored. Keeping one row here stops the final LEFT JOIN from
#                   duplicating orders whose reviews share the latest timestamp.
#   avgr            review count and date ranges per order
#   rev             avgr + latest review id/score
# The {q_txt_*} placeholders are column lists built by the preceding code.
query_txt=f"""
WITH geoavg AS (
SELECT
  geolocation_zip_code_prefix,
  AVG(geolocation_lat) AS lat,
  AVG(geolocation_lng) AS lng
FROM geolocation
GROUP BY geolocation_zip_code_prefix),

selzip AS (
SELECT
    s.*,
    geoavg.lat AS sellers_geo_lat,
    geoavg.lng AS sellers_geo_lng
FROM sellers s LEFT JOIN geoavg ON s.seller_zip_code_prefix=geoavg.geolocation_zip_code_prefix),

selzipclose AS (
SELECT
    s.*,
    {q_txt_closed_deals[1][0]}
FROM selzip s LEFT JOIN closed_deals o ON s.seller_id=o.seller_id),

oit AS (
SELECT
    {q_txt_order_items[1][0]},
    s.*
FROM order_items ord LEFT JOIN selzipclose s ON ord.seller_id=s.seller_id),

order_items1 AS (
SELECT
    oit.*,
    {q_txt_products[1][0]}
FROM oit LEFT JOIN products p ON oit.product_id=p.product_id),

custgeo AS (
SELECT
    c.*,
    geoavg.lat AS customers_geo_lat,
    geoavg.lng AS customers_geo_lng
FROM customers c LEFT JOIN geoavg ON c.customer_zip_code_prefix=geoavg.geolocation_zip_code_prefix),

ordcustgeo AS (
SELECT
    ord.*,
    {q_txt_ordcustgeo}
FROM orders ord LEFT JOIN custgeo c ON ord.customer_id=c.customer_id),

itemavg AS (
SELECT
  order_id,
  MAX(order_item_id) AS order_item_count,
  SUM(price) AS sum_price,
  SUM(freight_value) AS sum_freight_value,
  SUM(price)+SUM(freight_value) AS sum_price_freight
FROM order_items1
GROUP BY order_id),

orditem1 AS (
SELECT
    o.*,
    i.order_item_count, i.sum_price, i.sum_freight_value, i.sum_price_freight
FROM ordcustgeo o LEFT JOIN itemavg i ON o.order_id=i.order_id),

ordprod AS (
SELECT
  COUNT(*) AS num_prod,
  order_id,
  product_id
FROM order_items1
GROUP BY order_id, product_id),

unprod AS (
SELECT
  COUNT(*) AS num_uniq_prod,
  order_id
FROM ordprod
GROUP BY order_id),

orditem2 AS (
SELECT
    o.*,
    unprod.num_uniq_prod
FROM orditem1 o LEFT JOIN unprod ON o.order_id=unprod.order_id),

score AS (
SELECT DISTINCT ON (order_id)
  review_score,
  review_id,
  review_answer_timestamp,
  order_id
FROM order_reviews
WHERE review_answer_timestamp IS NOT NULL
ORDER BY order_id, review_answer_timestamp DESC, review_id),

avgr AS (
SELECT
  order_id,
  COUNT(*) AS num_reviews_per_order,
  MIN(review_creation_date) AS min_review_create_date,
  MAX(review_creation_date) AS max_review_create_date,
  MIN(review_answer_timestamp) AS min_review_answer_date,
  MAX(review_answer_timestamp) AS max_review_answer_date
FROM order_reviews
GROUP BY order_id),

rev AS (
SELECT
  a.*,
  score.review_id AS review_id,
  score.review_score
FROM avgr a LEFT JOIN score ON a.order_id=score.order_id)

SELECT
    s.*,
    {q_txt_rev}
FROM orditem2 s LEFT JOIN rev ON s.order_id=rev.order_id;
"""
# Execute the assembled query and take a quick look at the result.
q_txt, df_orders = sql_full(query_txt)
print(df_orders.shape)  # (rows, columns)
df_orders.head(1)

In [None]:
# Close the cursor first, then the connection it was opened on.
for handle in (cursor, conn):
    handle.close()