# Build Image and ETL Processes

This notebook shows a sample ETL process for persistent-postgres-cstore. [cstore_fdw](https://github.com/citusdata/cstore_fdw) does not support plain `INSERT INTO ... VALUES` statements, though it does support `COPY` and `INSERT INTO ... SELECT` ([see](https://stackoverflow.com/questions/44064004/change-a-normal-table-to-a-foreign-cstore-fdw-table)), so the data is first loaded into a regular staging table. The ETL steps are:

1. [Build a persistent-postgres image and run it as staging](#1.-Buid-a-persistent-postgres-image-and-run-it-as-staging)
2. [Load data into staging container from csv files](#2.-Load-data-into-staging-container-from-csv-files)
3. [Commit staging container to image and stop staging container](#3.-Commit-staging-container-to-image-and-stop-staging-container)
4. [Build a persistent-postgres-cstore image and run it](#4.-Buid-a-persistent-postgres-cstore-image-and-run-it)
5. [Insert data into cstore from staging](#5.-Insert-data-into-cstore-from-staging)
6. [Commit cstore container to image](#6.-Commit-cstore-container-to-image)
7. [Operation test of selecting data from the columnar store](#7.-Operation-test-of-selecting-data-from-the-columnar-store)

In [1]:
import os
import subprocess
import traceback
import psycopg2
import pandas as pd

In [2]:
subprocess.call("./download_customer_reviews.sh")

0

## 1. Buid a persistent-postgres image and run it as staging

Because cstore_fdw does not support `INSERT INTO ... VALUES` ([see](https://stackoverflow.com/questions/44064004/change-a-normal-table-to-a-foreign-cstore-fdw-table)), the CSV data cannot be inserted row by row into persistent-postgres-cstore directly. This step therefore builds a persistent-postgres image (without the columnar store) and runs it as a staging container.

In [3]:
subprocess.call("docker build -t persistent-postgres:0.1 postgres/lib/postgres/11".split())

0
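
`subprocess.call` only returns the exit code, so a failed build is easy to miss if the printed `0` is not checked. A minimal sketch of a stricter wrapper follows; the helper name `run_command` is hypothetical and not part of the original notebook.

```python
import shlex
import subprocess


def run_command(command):
    """Run a command line and raise if it exits with a non-zero status.

    `run_command` is a hypothetical helper, not part of the original notebook.
    """
    # shlex.split honors shell-style quoting, unlike str.split.
    args = shlex.split(command)
    # check=True raises subprocess.CalledProcessError on a non-zero exit code.
    completed = subprocess.run(args, check=True)
    return completed.returncode


# Assumed usage with the build command from this notebook:
# run_command("docker build -t persistent-postgres:0.1 postgres/lib/postgres/11")
```

With this wrapper a failing `docker build` stops the notebook at the failing cell instead of continuing with a stale image.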

In [4]:
subprocess.call("docker run --name persistent-postgres -p 5432:5432 -e POSTGRES_USER=dwhuser -d persistent-postgres:0.1".split())

0
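
Because `docker run -d` returns as soon as the container starts, Postgres may not yet accept connections when the next step calls `psycopg2.connect`. One way to handle this is a small retry loop. The following is a sketch only: `wait_until_ready` and its parameters are hypothetical names, and the commented usage assumes the connection string used later in this notebook.

```python
import time


def wait_until_ready(connect, attempts=30, delay=1.0):
    """Call `connect` until it succeeds or `attempts` is exhausted.

    Hypothetical helper: `connect` is any zero-argument callable that
    returns a connection, or raises while the server is still starting.
    """
    last_error = None
    for _ in range(attempts):
        try:
            return connect()
        except Exception as error:  # e.g. psycopg2.OperationalError
            last_error = error
            time.sleep(delay)
    raise TimeoutError(f"service not ready after {attempts} attempts") from last_error


# Assumed usage with the connection string from this notebook:
# conn = wait_until_ready(
#     lambda: psycopg2.connect("host=127.0.0.1 dbname=dwhuser user=dwhuser password=dwhuser")
# )
```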

## 2. Load data into staging container from csv files

To load the CSV data into the staging container, this step 1) connects to Postgres on the staging container, 2) defines some helper functions and queries, and 3) executes the queries against the staging container.

In [5]:
conn = psycopg2.connect("host=127.0.0.1 dbname=dwhuser user=dwhuser password=dwhuser")
cur = conn.cursor()

In [6]:
def execute_query(query):
    """Execute a query on the open postgres connection and commit it.
    
    Args:
        query (str): The query to execute.
    
    Returns:
        bool: True if the query succeeded, False otherwise.
    """
    try:
        cur.execute(query)
        conn.commit()
        return True
    except Exception:
        traceback.print_exc()
        # Roll back so the connection stays usable for later queries.
        conn.rollback()
        return False

In [7]:
def staging_table_insert(insert_query, df):
    """Insert the rows of a DataFrame into the staging table.
    
    The table must have the same columns as the DataFrame, in the same order;
    otherwise the insert fails.
    
    Args:
        insert_query (str): The parameterized INSERT query.
        df (pandas.DataFrame): The data to insert.
    
    Returns:
        bool: True if all rows were inserted, False otherwise.
    """
    try:
        for _, row in df.iterrows():
            cur.execute(insert_query, list(row))
        conn.commit()
        return True
    except Exception:
        traceback.print_exc()
        # Roll back so the connection stays usable for later queries.
        conn.rollback()
        return False

In [8]:
# Drop staging table.
staging_table_drop_query = """
DROP TABLE IF EXISTS staging_customer_reviews;
"""

# Create staging table.
staging_table_create_query = """
CREATE TABLE staging_customer_reviews (
    customer_id TEXT
    ,review_date DATE
    ,review_rating INTEGER
    ,review_votes INTEGER
    ,review_helpful_votes INTEGER
    ,product_id CHAR(10)
    ,product_title TEXT
    ,product_sales_rank BIGINT
    ,product_group TEXT
    ,product_category TEXT
    ,product_subcategory TEXT
    ,similar_product_ids TEXT
);
"""

# Insert data into staging table.
staging_table_insert_query = """
INSERT INTO staging_customer_reviews (
    customer_id
    ,review_date
    ,review_rating
    ,review_votes
    ,review_helpful_votes
    ,product_id
    ,product_title
    ,product_sales_rank
    ,product_group
    ,product_category
    ,product_subcategory
    ,similar_product_ids
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""

In [9]:
print("Drop staging table:", execute_query(staging_table_drop_query))
print("Create staging table:", execute_query(staging_table_create_query))

Drop staging table: True
Create staging table: True


In [10]:
print("Insert into staging table:", staging_table_insert(staging_table_insert_query, pd.read_csv('customer_reviews_1998.csv.gz', header=None)))
print("Insert into staging table:", staging_table_insert(staging_table_insert_query, pd.read_csv('customer_reviews_1999.csv.gz', header=None)))

Insert into staging table: True
Insert into staging table: True
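
Row-by-row `INSERT` statements are slow for large files. Since the staging table is a regular Postgres table, `COPY` is an alternative. The sketch below streams a gzipped CSV through psycopg2's `cursor.copy_expert`. The helper name `copy_csv_gz` is hypothetical, and the sketch assumes that the file has no header row and that its columns match the table's column order.

```python
import gzip


def copy_csv_gz(cursor, path, table):
    """Stream a gzipped CSV file into `table` using COPY ... FROM STDIN.

    Hypothetical helper: `cursor` is expected to be a psycopg2 cursor, and
    `table` must be a trusted identifier because it is interpolated into
    the SQL text.
    """
    copy_sql = f"COPY {table} FROM STDIN WITH (FORMAT csv)"
    # Binary mode passes the decompressed bytes to the server unchanged.
    with gzip.open(path, "rb") as csv_file:
        cursor.copy_expert(copy_sql, csv_file)


# Assumed usage with the notebook's connection and files:
# copy_csv_gz(cur, "customer_reviews_1998.csv.gz", "staging_customer_reviews")
# conn.commit()
```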


## 3. Commit staging container to image and stop staging container

To persist the loaded data, this step commits the staging container to the persistent-postgres image and then stops the container.

In [11]:
subprocess.call("docker commit persistent-postgres persistent-postgres:0.1".split())

0

In [12]:
subprocess.call("docker stop persistent-postgres".split())

0

## 4. Buid a persistent-postgres-cstore image and run it

This step builds the persistent-postgres-cstore image (with the columnar store) and runs the container.

In [13]:
subprocess.call("docker build -t persistent-postgres-cstore:0.1 postgres".split())

0

In [14]:
subprocess.call("docker run --name persistent-postgres-cstore -p 5432:5432 -e POSTGRES_USER=dwhuser -d persistent-postgres-cstore:0.1".split())

0

## 5. Insert data into cstore from staging

This step copies the data from the staging table into the cstore table using `INSERT INTO ... SELECT` ([see](https://stackoverflow.com/questions/44064004/change-a-normal-table-to-a-foreign-cstore-fdw-table)). It 1) connects to Postgres on the cstore container and 2) executes the queries against that container using the helper function.

In [15]:
conn = psycopg2.connect("host=127.0.0.1 dbname=dwhuser user=dwhuser password=dwhuser")
cur = conn.cursor()

In [16]:
# Drop table.
table_drop_query = """
DROP FOREIGN TABLE IF EXISTS customer_reviews;
"""

# Create extension.
extension_create_query = """
-- load extension first time after install
CREATE EXTENSION cstore_fdw;
"""

# Create server.
foreign_server_create_query = """
-- create server object
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
"""

# Create table.
table_create_query = """
-- create foreign table
CREATE FOREIGN TABLE customer_reviews
(
    customer_id TEXT,
    review_date DATE,
    review_rating INTEGER,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids TEXT
)
SERVER cstore_server
OPTIONS(compression 'pglz')
;
"""

# Insert data into table.
table_insert_query = """
INSERT INTO customer_reviews 
SELECT
    *
FROM
    staging_customer_reviews
;
"""

In [17]:
print("Drop table:", execute_query(table_drop_query))
print("Create extension:", execute_query(extension_create_query))
print("Create foreign server:", execute_query(foreign_server_create_query))
print("Create table:", execute_query(table_create_query))

Drop table: True
Create extension: True
Create foreign server: True
Create table: True


In [18]:
print("Insert into table:", execute_query(table_insert_query))

Insert into table: True
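
A quick sanity check after the `INSERT INTO ... SELECT` is to compare row counts between the two tables. The sketch below works with any DB-API cursor, including the psycopg2 cursor `cur`. `count_rows` and `tables_match` are hypothetical helpers, and the table names are interpolated directly into the SQL, so they must be trusted identifiers.

```python
def count_rows(cursor, table):
    """Return the number of rows in `table` (a trusted identifier)."""
    cursor.execute(f"SELECT count(*) FROM {table}")
    return cursor.fetchone()[0]


def tables_match(cursor, source, target):
    """Return True when `source` and `target` hold the same number of rows."""
    return count_rows(cursor, source) == count_rows(cursor, target)


# Assumed usage with the notebook's cursor:
# print("Row counts match:", tables_match(cur, "staging_customer_reviews", "customer_reviews"))
```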


## 6. Commit cstore container to image

To persist the loaded data, this step commits the container to the persistent-postgres-cstore image. The container is left running for the operation test in the next step.

In [19]:
subprocess.call("docker commit persistent-postgres-cstore persistent-postgres-cstore:0.1".split())

0

## 7. Operation test of selecting data from the columnar store

As an operation test of the columnar store, this step 1) finds all reviews a particular customer made on the Dune series in 1998 and 2) checks whether a book's title length correlates with its review ratings.

In [20]:
find_query="""
-- Find all reviews a particular customer made on the Dune series in 1998.
SELECT
    customer_id
    ,review_date
    ,review_rating
    ,product_id
FROM
    customer_reviews
WHERE
    customer_id ='A27T7HVDXA3K2A'
    AND product_title LIKE '%Dune%'
    AND review_date >= '1998-01-01'
    AND review_date <= '1998-12-31'
;
"""

In [21]:
pd.read_sql(sql=find_query, con=conn, index_col='customer_id')

customer_id,review_date,review_rating,product_id
A27T7HVDXA3K2A,1998-04-10,5,0399128964
A27T7HVDXA3K2A,1998-04-10,5,044100590X
A27T7HVDXA3K2A,1998-04-10,5,0441172717
A27T7HVDXA3K2A,1998-04-10,5,0881036366
A27T7HVDXA3K2A,1998-04-10,5,1559949570


In [22]:
correlation_query="""
-- Do we have a correlation between a book's title's length and its review ratings?
SELECT
    width_bucket(length(product_title), 1, 50, 5) title_length_bucket
    ,round(avg(review_rating), 2) AS review_average
    ,count(*)
FROM
   customer_reviews
WHERE
    product_group = 'Book'
GROUP BY
    title_length_bucket
ORDER BY
    title_length_bucket
;
"""

In [23]:
pd.read_sql(sql=correlation_query, con=conn, index_col='title_length_bucket')

title_length_bucket,review_average,count
1,4.26,139034
2,4.24,411318
3,4.34,245671
4,4.32,167361
5,4.3,118422
6,4.4,116412
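
The result has six buckets even though the query asks for five. Postgres's `width_bucket(operand, low, high, count)` splits the range from `low` (inclusive) to `high` (exclusive) into `count` equal-width buckets, and returns 0 for values below `low` and `count + 1` for values at or above `high`. Bucket 6 therefore collects titles of 50 or more characters. A small Python sketch of that rule for integer inputs follows; it illustrates the rule and is not Postgres's implementation.

```python
def width_bucket(operand, low, high, count):
    """Mimic Postgres's width_bucket for integer operands with low < high."""
    if operand < low:
        return 0
    if operand >= high:
        return count + 1
    # Equal-width buckets over [low, high), numbered from 1.
    return (operand - low) * count // (high - low) + 1


# Title lengths mapped with the same arguments as the query above.
for title_length in (1, 10, 11, 49, 50, 120):
    print(title_length, width_bucket(title_length, 1, 50, 5))
```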
