# L2 Exercise 2 : Creating Denormalized Tables

<img src="https://wiki.postgresql.org/images/a/a4/PostgreSQL_logo.3colors.svg" width="250" height="250">

<center><h1><span style='color:blue'>Environment preparation</span></h1></center>

The Udacity workspace is prepared to make the student's task easier: it already has a Postgres instance available for the training exercises.

Here we create an equivalent instance on Kubernetes.

* Add the psycopg2 module to Python
* Start PostgreSQL on K8s

In [1]:
# Load package
# !pip install psycopg2-binary

<h3><span style='color:blue'>Using K8S PostgreSQL</span></h3>

You need a K8s cluster available, for example Minikube, Minishift, or Docker (with K8s).

Helm is needed too; see [helm.sh](http://helm.sh).

In [2]:
from time import sleep
import os

In [3]:
helm_version = !helm version --short
assert helm_version[0][:2] == 'v3', "Expected HELM version not available, visit https://helm.sh"

#!curl -fsSL -o /tmp/get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3
#!chmod 700 /tmp/get_helm.sh
#!ls -al /tmp/
#!./get_helm.sh

In [4]:
!helm repo add bitnami https://charts.bitnami.com/bitnami

"bitnami" has been added to your repositories


In [5]:
CHART_INSTANCE_NAME = 'dend-l2e2'
os.environ['postgresql_port_instance_name'] = CHART_INSTANCE_NAME + "-postgresql"
os.getenv('postgresql_port_instance_name')

'dend-l2e2-postgresql'

In [6]:
helm_chart_out = !helm install {CHART_INSTANCE_NAME} stable/postgresql

In [7]:
postgresql_port_forward_command = helm_chart_out[-2].strip()
os.environ['postgresql_port_forward_command'] = postgresql_port_forward_command
os.getenv('postgresql_port_forward_command')

'kubectl port-forward --namespace default svc/dend-l2e2-postgresql 5432:5432 &'

In [8]:
# Wait until PostgreSQL is running on the cluster
max_checks_postgresql_run = 20

!kubectl get pods

while max_checks_postgresql_run > 0:

    postgres_is_running = !kubectl get pods|fgrep {CHART_INSTANCE_NAME}|fgrep "1/1"|fgrep "Running"
    
    if len(postgres_is_running) > 0 and postgres_is_running[0] != 'No resources found.':
        break

    sleep(5)
    max_checks_postgresql_run -= 1

!kubectl get pods
assert max_checks_postgresql_run > 0, "Probably Postgresql is not running"

NAME                     READY   STATUS    RESTARTS   AGE
dend-l2e2-postgresql-0   0/1     Pending   0          1s
NAME                     READY   STATUS    RESTARTS   AGE
dend-l2e2-postgresql-0   1/1     Running   0          31s
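
The wait loop above is a bounded polling pattern: check, sleep, retry, give up after a fixed number of attempts. A minimal sketch of that pattern as a reusable helper follows. The names `wait_until` and `fake_pod_ready` are hypothetical and not part of the notebook; the fake check stands in for the `kubectl get pods` call so the example runs without a cluster.

```python
import time


def wait_until(check, attempts=20, delay=5.0, sleep=time.sleep):
    """Call check() up to `attempts` times; return True as soon as it succeeds."""
    for attempt in range(attempts):
        if check():
            return True
        # Do not sleep after the final failed attempt.
        if attempt < attempts - 1:
            sleep(delay)
    return False


# Hypothetical readiness check that succeeds on its third call.
calls = {"n": 0}


def fake_pod_ready():
    calls["n"] += 1
    return calls["n"] >= 3


ready = wait_until(fake_pod_ready, attempts=5, delay=0.0)
print(ready, calls["n"])
```

Returning a boolean instead of asserting inside the loop leaves it to the caller to decide whether a timeout is fatal.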


<h3><span style='color:blue'>Open Proxy to PostgreSQL on K8s</span></h3>
Run the next command in a separate terminal if you are not running this notebook in Jupyter.

In [9]:
%%script env --bg bash --out console_out
nohup kubectl port-forward --namespace default svc/dend-l2e2-postgresql 5432:5432 &

#%%script env postgres_port_forward_command="$postgres_port_forward_command" --bg bash --out console_out
#nohup kubectl port-forward --namespace default svc/dend-l1e1-postgresql 5432:5432 &

In [10]:
# Getting postgresql password from console out
postgresql_password = helm_chart_out[15].split('(')[1][:-1]
postgresql_password = !{postgresql_password}
postgresql_password = postgresql_password[0]

In [11]:
# Getting console command to connect with current instance of postgres
k8s_psql_command = helm_chart_out[19].strip().replace("$POSTGRES_PASSWORD", postgresql_password) + " -c "
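
The cells above pick lines out of the captured `helm install` output by fixed position (`[-2]`, `[15]`, `[19]`), which depends on the exact layout of the chart notes. One possible alternative, sketched here under assumptions, is to search for a line by its content. The helper name `first_line_containing` is hypothetical, and `sample_out` is a placeholder list: only the port-forward line is copied from this notebook's own output.

```python
def first_line_containing(lines, marker):
    """Return the first line that contains `marker`, stripped, or None."""
    for line in lines:
        if marker in line:
            return line.strip()
    return None


# Placeholder stand-in for the captured output; not real chart notes.
sample_out = [
    "(placeholder line)",
    "    kubectl port-forward --namespace default svc/dend-l2e2-postgresql 5432:5432 &",
    "(placeholder line)",
]

port_forward = first_line_containing(sample_out, "kubectl port-forward")
print(port_forward)
```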

In [12]:
!ps -ef|fgrep 'kubectl port-forward'

  501 68964     1   0  5:58PM ??         0:00.13 kubectl port-forward --namespace default svc/dend-l2e2-postgresql 5432:5432
  501 68968 68911   0  5:58PM ttys001    0:00.01 /bin/sh -c ps -ef|fgrep 'kubectl port-forward'
  501 68970 68968   0  5:58PM ttys001    0:00.00 fgrep kubectl port-forward


In [13]:
# Checks if proxy is enabled
pids_kubectl_proxy = !ps -ef|fgrep 'kubectl port-forward'|fgrep $CHART_INSTANCE_NAME|cut -d ' ' -f4
assert len(pids_kubectl_proxy) > 1, f"No kubectl proxy found, try in a console: '{postgresql_port_forward_command}'"

<h3><span style='color:blue'>Check PostgreSQL availability</span></h3>
We have created a PostgreSQL instance on K8s infrastructure; next we test whether it is available.

In [14]:
# Checks postgresql connection
select_1_postgresql_out = !{k8s_psql_command} 'SELECT 1;'
assert len(select_1_postgresql_out) > 0, 'Postgresql -select 1- failed, check it'
!{k8s_psql_command} 'SELECT version();'

                                                 version                        
                         
--------------------------------------------------------------------------------
-------------------------
 PostgreSQL 11.6 on x86_64-pc-linux-gnu, compiled by gcc (Debian 6.3.0-18+deb9u1
) 6.3.0 20170516, 64-bit
(1 row)

pod "dend-l2e2-postgresql-client" deleted


<h3><span style='color:blue'>Initialize PostgreSQL Student DB for the exercise</span></h3>

In [15]:
!{k8s_psql_command} "CREATE ROLE student WITH LOGIN ENCRYPTED PASSWORD 'student'"

CREATE ROLE
pod "dend-l2e2-postgresql-client" deleted


In [16]:
!{k8s_psql_command} 'alter user student createdb;'

ALTER ROLE
pod "dend-l2e2-postgresql-client" deleted


In [17]:
!{k8s_psql_command} 'create database studentdb;'

If you don't see a command prompt, try pressing enter.
CREATE DATABASE
pod "dend-l2e2-postgresql-client" deleted


In [18]:
!{k8s_psql_command} 'grant all privileges on database studentdb to student;'

GRANT
pod "dend-l2e2-postgresql-client" deleted


In [19]:
!{k8s_psql_command} 'SELECT usename, usecreatedb FROM pg_user;'

 usename  | usecreatedb 
----------+-------------
 postgres | t
 student  | t
(2 rows)

pod "dend-l2e2-postgresql-client" deleted


# L2 Exercise 2 : Creating Denormalized Tables

### Walk through the basics of modeling data from normalized form to denormalized form. We will create tables in PostgreSQL, insert rows of data, and run simple JOIN SQL queries to show how these multiple tables can work together. 

#### Remember the examples shown are simple, but imagine these situations at scale with large datasets, many users, and the need for quick response time. 

### Import the library 
Note: An error might pop up after this command has executed. If it does, read it carefully before ignoring it. 

In [20]:
import psycopg2

### Create a connection to the database, get a cursor, and set autocommit to true

In [21]:
try: 
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
except psycopg2.Error as e: 
    print("Error: Could not make connection to the Postgres database")
    print(e)
try: 
    cur = conn.cursor()
except psycopg2.Error as e: 
    print("Error: Could not get cursor to the Database")
    print(e)
conn.set_session(autocommit=True)
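
The connection string above hard-codes host, database, user, and password. A small sketch of building the same string from environment variables, with the notebook's values as defaults, could look like this. The function name `build_dsn` is hypothetical; `PGHOST`, `PGDATABASE`, `PGUSER`, and `PGPASSWORD` are the standard libpq variable names, used here only as lookup keys.

```python
import os


def build_dsn(env=None):
    """Build a libpq-style connection string, falling back to the notebook's defaults."""
    env = os.environ if env is None else env
    host = env.get("PGHOST", "127.0.0.1")
    dbname = env.get("PGDATABASE", "studentdb")
    user = env.get("PGUSER", "student")
    password = env.get("PGPASSWORD", "student")
    return f"host={host} dbname={dbname} user={user} password={password}"


# With an empty mapping the result equals the string used in the cell above.
default_dsn = build_dsn({})
print(default_dsn)
```

The resulting string can be passed to `psycopg2.connect` in place of the literal.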

#### Let's start with the normalized (3NF) set of tables from the last exercise, to which we have added a new table, `sales`. 

`Table Name: transactions2 
column 0: transaction Id
column 1: Customer Name
column 2: Cashier Id
column 3: Year `

`Table Name: albums_sold
column 0: Album Id
column 1: Transaction Id
column 2: Album Name` 

`Table Name: employees
column 0: Employee Id
column 1: Employee Name `

`Table Name: sales
column 0: Transaction Id
column 1: Amount Spent
`

### Table contents

Table: transactions2

* (1, 'Amanda', 1, 2000)
* (2, 'Toby', 1, 2000)
* (3, 'Max', 2, 2018)

Table: albums_sold

* (1, 1, 'Rubber Soul')
* (2, 1, 'Let it Be')
* (3, 2, 'My Generation')
* (4, 3, 'Meet the Beatles')
* (5, 3, 'Help!')

Table: employees

* (1, 'Sam')
* (2, 'Bob')

Table: Sales

* (1, 40)
* (2, 19)
* (3, 45)
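
The column lists above imply keys that the `CREATE TABLE` statements in this notebook do not declare. As an illustration only, the sketch below declares them and shows a foreign key rejecting an orphan row. It uses Python's built-in `sqlite3` with an in-memory database instead of PostgreSQL so that it runs without the cluster; the key choices and the "Orphan album" row are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when asked
conn.executescript("""
    CREATE TABLE employees (
        employee_id   INTEGER PRIMARY KEY,
        employee_name TEXT
    );
    CREATE TABLE transactions2 (
        transaction_id INTEGER PRIMARY KEY,
        customer_name  TEXT,
        cashier_id     INTEGER REFERENCES employees (employee_id),
        year           INTEGER
    );
    CREATE TABLE albums_sold (
        album_id       INTEGER PRIMARY KEY,
        transaction_id INTEGER REFERENCES transactions2 (transaction_id),
        album_name     TEXT
    );
    CREATE TABLE sales (
        transaction_id INTEGER PRIMARY KEY REFERENCES transactions2 (transaction_id),
        amount_spent   REAL
    );
""")

conn.executemany("INSERT INTO employees VALUES (?, ?)", [(1, "Sam"), (2, "Bob")])
conn.executemany(
    "INSERT INTO transactions2 VALUES (?, ?, ?, ?)",
    [(1, "Amanda", 1, 2000), (2, "Toby", 1, 2000), (3, "Max", 2, 2018)],
)

# Transaction 99 does not exist, so the foreign key should reject this row.
try:
    conn.execute("INSERT INTO albums_sold VALUES (?, ?, ?)", (6, 99, "Orphan album"))
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

print(orphan_rejected)
```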

### We add CREATE statements for all tables and INSERT data into the tables

In [22]:
## Creating table "transactions2" in 3NF 
try:
    cur.execute("""
    CREATE TABLE IF NOT EXISTS transactions2
    (
        transaction_id int,
        customer_name text,
        cashier_id int,
        year int
    )
    """)
    print ("Table created")
except psycopg2.Error as e:
    print("Error: Could not create table")
    print(e)

Table created


In [23]:
## Creating table "albums_sold" in 3NF 
try:
    cur.execute("""
    CREATE TABLE IF NOT EXISTS albums_sold
    (
        album_id int, 
        transaction_id int,
        album_name text
    )
    """)
    print ("Table created")    
except psycopg2.Error as e:
    print("Error: Could not create table")
    print(e)

Table created


In [24]:
## Creating table "employees" in 3NF 
try:
    cur.execute("""
    CREATE TABLE IF NOT EXISTS employees
    (
        employee_id int, 
        employee_name text
    )
    """)
    print ("Table created")    
except psycopg2.Error as e:
    print("Error: Could not create table")
    print(e)

Table created


In [25]:
## Creating table "sales" in 3NF 
try:
    cur.execute("""
    CREATE TABLE IF NOT EXISTS sales
    (
        transaction_id int,
        amount_spent real
    )
    """)
    print ("Table created")    
except psycopg2.Error as e:
    print("Error: Could not create table")
    print(e)

Table created


**Adding data to tables**

In [26]:
# Insert row in transactions2
def insert_into_transactions2(params):
    try: 
        cur.execute(
        """
        INSERT INTO transactions2 (transaction_id, customer_name, cashier_id, year) \
        VALUES (%s, %s, %s, %s)
        """ , params
        )
        print(f"Row inserted {params}")
    except psycopg2.Error as e:
        print("Error: Could not create a row")
        print(e)
        
insert_into_transactions2((1, 'Amanda', 1, 2000))
insert_into_transactions2((2, 'Toby', 1, 2000))
insert_into_transactions2((3, 'Max', 2, 2018))

Row inserted (1, 'Amanda', 1, 2000)
Row inserted (2, 'Toby', 1, 2000)
Row inserted (3, 'Max', 2, 2018)


In [27]:
# Insert row in albums_sold
def insert_into_albums_sold(params):
    try: 
        cur.execute(
        """
        INSERT INTO albums_sold (album_id, transaction_id, album_name) \
        VALUES (%s, %s, %s)
        """ , params
        )
        print(f"Row inserted {params}")
    except psycopg2.Error as e:
        print("Error: Could not create a row")
        print(e)
        
insert_into_albums_sold((1, 1, 'Rubber Soul'))
insert_into_albums_sold((2, 1, 'Let it Be'))
insert_into_albums_sold((3, 2, 'My Generation'))
insert_into_albums_sold((4, 3, 'Meet the Beatles'))
insert_into_albums_sold((5, 3, 'Help!'))

Row inserted (1, 1, 'Rubber Soul')
Row inserted (2, 1, 'Let it Be')
Row inserted (3, 2, 'My Generation')
Row inserted (4, 3, 'Meet the Beatles')
Row inserted (5, 3, 'Help!')


In [28]:
# Insert row in employees
def insert_into_employees(params):
    try: 
        cur.execute(
        """
        INSERT INTO employees (employee_id, employee_name) \
        VALUES (%s, %s)
        """ , params
        )
        print(f"Row inserted {params}")
    except psycopg2.Error as e:
        print("Error: Could not create a row")
        print(e)
        
insert_into_employees((1, 'Sam'))
insert_into_employees((2, 'Bob'))

Row inserted (1, 'Sam')
Row inserted (2, 'Bob')


In [29]:
# Insert row in sales
def insert_into_sales(params):
    try: 
        cur.execute(
        """
        INSERT INTO sales (transaction_id, amount_spent) \
        VALUES (%s, %s)
        """ , params
        )
        print(f"Row inserted {params}")
    except psycopg2.Error as e:
        print("Error: Could not create a row")
        print(e)
        
insert_into_sales((1, 40))
insert_into_sales((2, 19))
insert_into_sales((3, 45))

Row inserted (1, 40)
Row inserted (2, 19)
Row inserted (3, 45)
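
The insert helpers above issue one `execute` call per row. A batch variant with `executemany` is sketched here. It runs against an in-memory `sqlite3` database rather than PostgreSQL so it is self-contained; note that `sqlite3` uses `?` placeholders where psycopg2 uses `%s`. The list name `ALBUMS` is hypothetical.

```python
import sqlite3

# Same rows the notebook inserts into albums_sold.
ALBUMS = [
    (1, 1, "Rubber Soul"),
    (2, 1, "Let it Be"),
    (3, 2, "My Generation"),
    (4, 3, "Meet the Beatles"),
    (5, 3, "Help!"),
]

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE albums_sold (album_id int, transaction_id int, album_name text)")

# One call inserts every tuple in the list.
cur.executemany(
    "INSERT INTO albums_sold (album_id, transaction_id, album_name) VALUES (?, ?, ?)",
    ALBUMS,
)

cur.execute("SELECT COUNT(*) FROM albums_sold")
album_count = cur.fetchone()[0]
print(album_count)
```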


### Confirm the tables were created with the data

In [30]:
print("Table: transactions2\n")
try: 
    cur.execute("SELECT * FROM transactions2;")
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

print("\nTable: albums_sold\n")
try: 
    cur.execute("SELECT * FROM albums_sold;")
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

print("\nTable: employees\n")
try: 
    cur.execute("SELECT * FROM employees;")
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()
    
print("\nTable: Sales\n")
try: 
    cur.execute("SELECT * FROM sales;")
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

Table: transactions2

(1, 'Amanda', 1, 2000)
(2, 'Toby', 1, 2000)
(3, 'Max', 2, 2018)

Table: albums_sold

(1, 1, 'Rubber Soul')
(2, 1, 'Let it Be')
(3, 2, 'My Generation')
(4, 3, 'Meet the Beatles')
(5, 3, 'Help!')

Table: employees

(1, 'Sam')
(2, 'Bob')

Table: Sales

(1, 40.0)
(2, 19.0)
(3, 45.0)
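
The verification cell repeats the same select-and-print loop for each table. A compact sketch of a shared helper follows, again using in-memory `sqlite3` as a stand-in for the PostgreSQL connection. The names `fetch_table` and `KNOWN_TABLES` are hypothetical. Table names cannot be passed as query parameters, so the helper checks them against a fixed list before formatting the SQL.

```python
import sqlite3

KNOWN_TABLES = ("transactions2", "albums_sold", "employees", "sales")


def fetch_table(cur, table):
    """Return all rows of a known table, ordered by its first column."""
    if table not in KNOWN_TABLES:
        raise ValueError(f"unexpected table name: {table!r}")
    cur.execute(f"SELECT * FROM {table} ORDER BY 1")
    # sqlite3 and psycopg2 cursors are both iterable after a SELECT.
    return list(cur)


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE employees (employee_id int, employee_name text)")
cur.executemany("INSERT INTO employees VALUES (?, ?)", [(2, "Bob"), (1, "Sam")])

employee_rows = fetch_table(cur, "employees")
print(employee_rows)
```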


#### Let's say we need to do a query that gives us:

* transaction_id
* customer_name
* cashier name
* year 
* album sold
* amount spent

We will need to perform a 3-way `JOIN` on the 4 tables we have created. 

In [31]:
try: 
    cur.execute("""

    SELECT transactions2.transaction_id, customer_name, employee_name, year, album_name, amount_spent
    FROM transactions2 
        JOIN albums_sold on transactions2.transaction_id = albums_sold.transaction_id
        JOIN employees on transactions2.cashier_id = employees.employee_id
        JOIN sales on transactions2.transaction_id = sales.transaction_id
    """)
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

(1, 'Amanda', 'Sam', 2000, 'Rubber Soul', 40.0)
(1, 'Amanda', 'Sam', 2000, 'Let it Be', 40.0)
(2, 'Toby', 'Sam', 2000, 'My Generation', 19.0)
(3, 'Max', 'Bob', 2018, 'Meet the Beatles', 45.0)
(3, 'Max', 'Bob', 2018, 'Help!', 45.0)
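
In the result above, `amount_spent` appears once per album, not once per transaction, because joining to `albums_sold` repeats each transaction row for every album it contains. The sketch below, run on in-memory `sqlite3` as a stand-in for PostgreSQL, shows how summing over such a join overstates the total. The variable names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE albums_sold (album_id int, transaction_id int, album_name text);
    CREATE TABLE sales (transaction_id int, amount_spent real);
    INSERT INTO albums_sold VALUES
        (1, 1, 'Rubber Soul'), (2, 1, 'Let it Be'), (3, 2, 'My Generation'),
        (4, 3, 'Meet the Beatles'), (5, 3, 'Help!');
    INSERT INTO sales VALUES (1, 40), (2, 19), (3, 45);
""")

# Total taken directly from sales: one row per transaction.
true_total = conn.execute("SELECT SUM(amount_spent) FROM sales").fetchone()[0]

# Total taken after joining to albums_sold: one row per album.
joined_total = conn.execute("""
    SELECT SUM(amount_spent)
    FROM sales
        JOIN albums_sold ON sales.transaction_id = albums_sold.transaction_id
""").fetchone()[0]

print(true_total, joined_total)
```

Transactions 1 and 3 each contain two albums, so their amounts are counted twice in the joined total.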


#### Great, we were able to get the data we wanted.

### But we had to perform a 3-way `JOIN` to get there. While it's great to have that flexibility, joins can be slow on large tables, and if we have a read-heavy workload that requires low-latency queries we want to reduce the number of `JOIN`s. Let's think about denormalizing our normalized tables.

### With denormalization we want to think about the queries we are running and how we can reduce the number of `JOIN`s, even if that means duplicating data. 

#### Query 1 : `select transaction_id, customer_name, amount_spent FROM <min number of tables>` 
This should give the amount spent on each transaction 

In [32]:
%%time
try: 
    cur.execute("""

    SELECT transactions2.transaction_id, customer_name, amount_spent
    FROM transactions2 
        JOIN sales on transactions2.transaction_id = sales.transaction_id
    """)
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

(1, 'Amanda', 40.0)
(2, 'Toby', 19.0)
(3, 'Max', 45.0)
CPU times: user 1.16 ms, sys: 1.33 ms, total: 2.49 ms
Wall time: 7.87 ms


#### Query 2: `select cashier_name, SUM(amount_spent) FROM <min number of tables> GROUP BY cashier_name` 
This should give the total sales by cashier 

In [33]:
try: 
    cur.execute("""

    select employee_name, SUM(amount_spent) 
    FROM transactions2 
        JOIN employees on transactions2.cashier_id = employees.employee_id
        JOIN sales on transactions2.transaction_id = sales.transaction_id
    GROUP BY employee_name
    """)
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

('Sam', 59.0)
('Bob', 45.0)


###  Query 1: `select transaction_id, customer_name, amount_spent FROM <min number of tables>`

There are two ways to do this. We could do a JOIN on the `sales` and `transactions2` tables, but we want to minimize the use of `JOIN`s.  

Let's add `amount_spent` to a new `transactions` table so that we will not need to do a JOIN at all. 

`Table Name: transactions 
column 0: transaction Id
column 1: Customer Name
column 2: Cashier Id
column 3: Year
column 4: amount_spent`

In [34]:
## Creating denormalized table "transactions" 
try:
    cur.execute("""
    CREATE TABLE IF NOT EXISTS transactions
    (
        transaction_id int,
        customer_name text,
        cashier_id int,
        year int,
        amount_spent real
    )
    """)
    print ("Table created")
except psycopg2.Error as e:
    print("Error: Could not create table")
    print(e)

Table created


In [35]:
# Populate transactions with one row per transaction.
# albums_sold is not joined here: it would repeat a transaction once per album.
try: 
    cur.execute("""
    INSERT into transactions (
    SELECT transactions2.transaction_id, customer_name, cashier_id, year, amount_spent
    FROM transactions2 
        JOIN sales on transactions2.transaction_id = sales.transaction_id
    )
    """)
except psycopg2.Error as e: 
    print("Error: Could not populate table")
    print (e)
    

### Great, we can now run a simplified query to get the information we need. No `JOIN` is needed.

In [36]:
%%time
try: 
    cur.execute("""
    SELECT transaction_id, customer_name, amount_spent
    FROM transactions
    """)
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

(1, 'Amanda', 40.0)
(2, 'Toby', 19.0)
(3, 'Max', 45.0)
CPU times: user 917 µs, sys: 1.23 ms, total: 2.15 ms
Wall time: 5.34 ms
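
Duplicating `amount_spent` into `transactions` removes the join, but the copy has to be kept in step with `sales`. The sketch below illustrates that cost on in-memory `sqlite3` as a stand-in for PostgreSQL: an `EXCEPT` query lists source rows missing from the denormalized table, first right after populating it and then after an update to `sales`. The name `DRIFT_SQL` and the update to 42 are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE transactions2 (transaction_id int, customer_name text, cashier_id int, year int);
    CREATE TABLE sales (transaction_id int, amount_spent real);
    CREATE TABLE transactions (transaction_id int, customer_name text, cashier_id int,
                               year int, amount_spent real);
    INSERT INTO transactions2 VALUES (1, 'Amanda', 1, 2000), (2, 'Toby', 1, 2000), (3, 'Max', 2, 2018);
    INSERT INTO sales VALUES (1, 40), (2, 19), (3, 45);
    INSERT INTO transactions
        SELECT t.transaction_id, t.customer_name, t.cashier_id, t.year, s.amount_spent
        FROM transactions2 AS t
            JOIN sales AS s ON t.transaction_id = s.transaction_id;
""")

# Rows present in the normalized source but absent from the denormalized copy.
DRIFT_SQL = """
    SELECT t.transaction_id, s.amount_spent
    FROM transactions2 AS t
        JOIN sales AS s ON t.transaction_id = s.transaction_id
    EXCEPT
    SELECT transaction_id, amount_spent FROM transactions
"""

drift_before = conn.execute(DRIFT_SQL).fetchall()

# Changing only the source table leaves the denormalized copy stale.
conn.execute("UPDATE sales SET amount_spent = 42 WHERE transaction_id = 1")
drift_after = conn.execute(DRIFT_SQL).fetchall()

print(drift_before, drift_after)
```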


### Query 2: `select cashier_name, SUM(amount_spent) FROM <min number of tables> GROUP BY cashier_name` 

We could also do a `JOIN` on the tables we have created, but what if we do not want any `JOIN`s? Why not create a new table with just the information we need? 

`Table Name: cashier_sales
column 0: Transaction Id
column 1: Cashier Name
column 2: Cashier Id
column 3: Amount Spent
`

In [37]:
## Creating denormalized table "cashier_sales" 
try:
    cur.execute("""
    CREATE TABLE IF NOT EXISTS cashier_sales
    (
        transaction_id int,
        cashier_name text,
        cashier_id int,
        amount_spent real
    )
    """)
    print ("Table created")
except psycopg2.Error as e:
    print("Error: Could not create table")
    print(e)

Table created


In [38]:
try: 
    cur.execute("""
    insert into cashier_sales (
        select transactions2.transaction_id, employee_name, employee_id, amount_spent
        FROM transactions2 
            JOIN employees on transactions2.cashier_id = employees.employee_id
            JOIN sales on transactions2.transaction_id = sales.transaction_id
    )
    """)
except psycopg2.Error as e: 
    print("Error: Could not populate table")
    print (e)


#### Now let's run our query

In [39]:
%%time
try: 
    cur.execute("""
    select cashier_name, SUM(amount_spent) FROM cashier_sales GROUP BY cashier_name;
    """)
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

('Sam', 59.0)
('Bob', 45.0)
CPU times: user 816 µs, sys: 1.38 ms, total: 2.2 ms
Wall time: 4.36 ms
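
A quick way to gain confidence in a denormalized table is to compare its answer with the normalized query it replaces. The sketch below does that for the cashier totals, on in-memory `sqlite3` as a stand-in for PostgreSQL. The variable names `from_join` and `from_flat` are hypothetical, and `ORDER BY` is added so the two result lists compare deterministically.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE transactions2 (transaction_id int, customer_name text, cashier_id int, year int);
    CREATE TABLE employees (employee_id int, employee_name text);
    CREATE TABLE sales (transaction_id int, amount_spent real);
    CREATE TABLE cashier_sales (transaction_id int, cashier_name text, cashier_id int,
                                amount_spent real);
    INSERT INTO transactions2 VALUES (1, 'Amanda', 1, 2000), (2, 'Toby', 1, 2000), (3, 'Max', 2, 2018);
    INSERT INTO employees VALUES (1, 'Sam'), (2, 'Bob');
    INSERT INTO sales VALUES (1, 40), (2, 19), (3, 45);
    INSERT INTO cashier_sales
        SELECT t.transaction_id, e.employee_name, e.employee_id, s.amount_spent
        FROM transactions2 AS t
            JOIN employees AS e ON t.cashier_id = e.employee_id
            JOIN sales AS s ON t.transaction_id = s.transaction_id;
""")

# Totals computed from the normalized tables (two joins).
from_join = conn.execute("""
    SELECT e.employee_name, SUM(s.amount_spent)
    FROM transactions2 AS t
        JOIN employees AS e ON t.cashier_id = e.employee_id
        JOIN sales AS s ON t.transaction_id = s.transaction_id
    GROUP BY e.employee_name
    ORDER BY e.employee_name
""").fetchall()

# Totals computed from the denormalized table (no join).
from_flat = conn.execute("""
    SELECT cashier_name, SUM(amount_spent)
    FROM cashier_sales
    GROUP BY cashier_name
    ORDER BY cashier_name
""").fetchall()

print(from_join == from_flat, from_flat)
```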


#### We have successfully taken normalized tables and denormalized them in order to speed up our queries and allow simpler queries to be executed. 

### Drop the tables. 

In [40]:
try: 
    cur.execute("DROP table albums_sold")
except psycopg2.Error as e: 
    print("Error: Dropping table")
    print (e)
try: 
    cur.execute("DROP table employees")
except psycopg2.Error as e: 
    print("Error: Dropping table")
    print (e)
try: 
    cur.execute("DROP table transactions")
except psycopg2.Error as e: 
    print("Error: Dropping table")
    print (e)
try: 
    cur.execute("DROP table transactions2")
except psycopg2.Error as e: 
    print("Error: Dropping table")
    print (e)
try: 
    cur.execute("DROP table sales")
except psycopg2.Error as e: 
    print("Error: Dropping table")
    print (e)
try: 
    cur.execute("DROP table cashier_sales")
except psycopg2.Error as e: 
    print("Error: Dropping table")
    print (e)
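
The six drop statements above differ only in the table name. A shorter sketch using a loop and `DROP TABLE IF EXISTS`, which does not fail when a table is already gone, is shown here on in-memory `sqlite3` as a stand-in for PostgreSQL. The tuple name `TABLES` is hypothetical, and only three of the tables are created on purpose to exercise the `IF EXISTS` part.

```python
import sqlite3

TABLES = ("albums_sold", "employees", "transactions",
          "transactions2", "sales", "cashier_sales")

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Create only some of the tables so that a few drops hit a missing table.
for table in TABLES[:3]:
    cur.execute(f"CREATE TABLE {table} (id int)")

# Names come from the fixed tuple above, never from user input.
for table in TABLES:
    cur.execute(f"DROP TABLE IF EXISTS {table}")

cur.execute("SELECT COUNT(*) FROM sqlite_master WHERE type = 'table'")
remaining = cur.fetchone()[0]
print(remaining)
```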

### And finally close your cursor and connection. 

In [41]:
cur.close()
conn.close()
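
If a cell fails before reaching the explicit `close()` calls, the cursor and connection stay open. One way to guarantee cleanup is `contextlib.closing`, sketched here with in-memory `sqlite3` so it runs without the cluster; the same wrapper works with any object that has a `close()` method, which includes psycopg2 connections and cursors.

```python
import sqlite3
from contextlib import closing

# closing() calls .close() on exit, even if the body raises.
with closing(sqlite3.connect(":memory:")) as conn:
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT 1")
        value = cur.fetchone()[0]

# Using the connection after the block should fail because it is closed.
try:
    conn.execute("SELECT 1")
    closed = False
except sqlite3.ProgrammingError:
    closed = True

print(value, closed)
```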

<h2><span style='color:blue'>Remove Environment</span></h2>

In [42]:
# Clears proxy
pids_kubectl_proxy = !ps -ef|fgrep 'kubectl port-forward'|fgrep $CHART_INSTANCE_NAME|cut -d ' ' -f4
!kill -9 {pids_kubectl_proxy[0]}

In [43]:
# Removes chart instances
!helm uninstall {CHART_INSTANCE_NAME}

release "dend-l2e2" uninstalled


In [44]:
# Removes persistent Volume
!kubectl get pvc|fgrep {CHART_INSTANCE_NAME}|cut -d ' '  -f1| xargs -t kubectl delete pvc

kubectl delete pvc data-dend-l2e2-postgresql-0
persistentvolumeclaim "data-dend-l2e2-postgresql-0" deleted
