In [2]:
%load_ext sql

In [3]:
%env DATABASE_URL=postgresql://sms_user:sms_password@localhost:5432/sms_db

env: DATABASE_URL=postgresql://sms_user:sms_password@localhost:5432/sms_db


In [4]:
%%sql
CREATE TABLE IF NOT EXISTS orders (  -- IF NOT EXISTS keeps this cell safe to re-run
  order_id INT NOT NULL,             -- unique identifier of the order
  order_date TIMESTAMP NOT NULL,
  order_customer_id INT NOT NULL,
  order_status VARCHAR(45) NOT NULL,
  PRIMARY KEY (order_id)
);

Done.


[]

In [5]:
%%sql
CREATE TABLE IF NOT EXISTS order_items (  -- IF NOT EXISTS keeps this cell safe to re-run
  order_item_id INT NOT NULL,             -- unique identifier of the order item
  order_item_order_id INT NOT NULL,
  order_item_product_id INT NOT NULL,
  order_item_quantity INT NOT NULL,
  order_item_subtotal FLOAT NOT NULL,
  order_item_product_price FLOAT NOT NULL,
  PRIMARY KEY (order_item_id)
);

 * postgresql://sms_user:***@localhost:5432/sms_db
Done.


[]

In [4]:
%%sql 

SELECT * FROM information_schema.tables 
WHERE table_catalog = 'sms_db' AND table_schema = 'public'
LIMIT 10

5 rows affected.


table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action
sms_db,public,t,BASE TABLE,,,,,,YES,NO,
sms_db,public,courses,BASE TABLE,,,,,,YES,NO,
sms_db,public,users,BASE TABLE,,,,,,YES,NO,
sms_db,public,orders,BASE TABLE,,,,,,YES,NO,
sms_db,public,order_items,BASE TABLE,,,,,,YES,NO,


In [7]:
%%sql

SELECT * FROM orders LIMIT 10

 * postgresql://sms_user:***@localhost:5432/sms_db
0 rows affected.


order_id,order_date,order_customer_id,order_status


In [8]:
%%sql

SELECT * FROM order_items LIMIT 10

 * postgresql://sms_user:***@localhost:5432/sms_db
0 rows affected.


order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price


## Reading data from retail_db to DataFrames

In [13]:
import pandas as pd
def get_df(path, schema):
    """Read a header-less CSV file into a pandas DataFrame.

    path: location of the CSV data (anything pandas.read_csv accepts).
    schema: list of column names, applied to the file's columns in order.

    Returns the loaded DataFrame.
    """
    # header=None: the first line of the file is data, not column names.
    return pd.read_csv(path, names=schema, header=None)

In [14]:
# Location of the orders data file; this is an absolute path specific to one machine.
orders_path = "/home/vspatil1330/retail_db/orders/part-00000"
# Column names for the file, in column order (get_df reads it with header=None).
orders_schema = [
    "order_id",
    "order_date",
    "order_customer_id",
    "order_status"
]
# Load the file into a DataFrame using the column names above.
orders = get_df(orders_path, orders_schema)

In [15]:
# Location of the order_items data file; this is an absolute path specific to one machine.
order_items_path = "/home/vspatil1330/retail_db/order_items/part-00000"
# Column names for the file, in column order (get_df reads it with header=None).
order_items_schema = [
    "order_item_id",
    "order_item_order_id",
    "order_item_product_id",
    "order_item_quantity",
    "order_item_subtotal",
    "order_item_product_price"
]
# Load the file into a DataFrame using the column names above.
order_items = get_df(order_items_path, order_items_schema)

In [8]:
 orders.head(3)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25 00:00:00.0,11599,CLOSED
1,2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
2,3,2013-07-25 00:00:00.0,12111,COMPLETE


In [9]:
order_items.head(3)

Unnamed: 0,order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price
0,1,1,957,1,299.98,299.98
1,2,2,1073,1,199.99,199.99
2,3,2,502,5,250.0,50.0


# Batch loading

Let us understand how we should take care of loading data in batches.
We will perform load using multiple approaches to understand which one is better.

* Approach 1: Insert and commit each record. Every commit in the database carries a considerable amount of overhead.

* Approach 2: Insert one record at a time, but commit at the end.

* Approach 3: Insert all records at once and commit at the end.

* Approach 4: Insert records in chunks or batches and commit per chunk or batch.

We should follow the fourth approach when dealing with huge amounts of data. Committing per batch makes the load easier to restart or recover after a failure.

#### Approach 1: 
Insert and commit each record.
Every commit in the database carries a considerable amount of overhead.

In [4]:
%run get_database.ipynb

In [5]:
def get_cursor(connection):
    """Open and return a new cursor on the given database connection."""
    new_cursor = connection.cursor()
    return new_cursor

In [6]:
# Parameterized INSERT for the orders table. The four %s placeholders are
# filled from each record passed along with this statement to the cursor.
query = """INSERT INTO orders
         (order_id, order_date, order_customer_id, order_status)
         VALUES
         (%s, %s, %s, %s)"""

In [13]:
def load_orders(connection, cursor, query, data):
    """Approach 1: insert the records one at a time, committing after each one.

    connection: database connection; commit() is called once per record.
    cursor: cursor used to execute the insert statement.
    query: parameterized INSERT statement.
    data: iterable of records, each supplying the statement's parameters.
    """
    for row in data:
        cursor.execute(query, row)
        # Deliberately commit per record: this is the behaviour being timed.
        connection.commit()

In [7]:
cursor = get_cursor(sms_connection)

In [19]:
%%time
load_orders(sms_connection, cursor, query, orders.values.tolist()[:10000])

CPU times: user 689 ms, sys: 842 ms, total: 1.53 s
Wall time: 13.1 s


In [15]:
%%sql
select * from orders 
limit 10;

 * postgresql://sms_user:***@localhost:5432/sms_db
10 rows affected.


order_id,order_date,order_customer_id,order_status
1,2013-07-25 00:00:00,11599,CLOSED
2,2013-07-25 00:00:00,256,PENDING_PAYMENT
3,2013-07-25 00:00:00,12111,COMPLETE
4,2013-07-25 00:00:00,8827,CLOSED
5,2013-07-25 00:00:00,11318,COMPLETE
6,2013-07-25 00:00:00,7130,COMPLETE
7,2013-07-25 00:00:00,4530,COMPLETE
8,2013-07-25 00:00:00,2911,PROCESSING
9,2013-07-25 00:00:00,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00,5648,PENDING_PAYMENT


In [8]:
cursor.execute('TRUNCATE TABLE orders')

In [21]:
sms_connection.commit()

## Approach 2
Insert one row at a time, but commit only at the end. Even though this is much faster than the previous approach, it still transfers one record at a time between the Python engine and the database engine.

We can further tune by leveraging batch insert.

In [22]:
def load_orders(connection, cursor, query, data):
    """Approach 2: insert the records one at a time, committing once at the end.

    connection: database connection; commit() is called a single time.
    cursor: cursor used to execute the insert statement.
    query: parameterized INSERT statement.
    data: iterable of records, each supplying the statement's parameters.
    """
    for row in data:
        cursor.execute(query, row)
    # Single commit after every record has been sent.
    connection.commit()

cursor = get_cursor(sms_connection)



In [23]:
%%time
# Inserting all orders
load_orders(sms_connection, cursor, query, orders.values.tolist())

CPU times: user 1.81 s, sys: 1.91 s, total: 3.72 s
Wall time: 13.6 s


In [24]:
cursor.execute('TRUNCATE TABLE orders')

In [41]:
sms_connection.commit() 
## do this so it will get reflected in the database

In [30]:
%%sql
select * from orders;
## if you dont commit this will fail as the transaction is undergoing.
## commit to save the transaction and view it later

 * postgresql://sms_user:***@localhost:5432/sms_db
0 rows affected.


order_id,order_date,order_customer_id,order_status


## Approach 3

All the records will be inserted as part of one batch insert operation. 
If there is a lot of data to be inserted, this might run into issues such as running out of memory.
Also, if the job fails in the middle then all the data that is transferred thus far will be lost.
Hence it is better to batch with manageable size and then insert as well as commit.

In [31]:
def load_orders(connection, cursor, query, data):
    """Approach 3: insert all records with one executemany call, then commit.

    connection: database connection; commit() is called a single time.
    cursor: cursor used to run the batch insert.
    query: parameterized INSERT statement.
    data: the full collection of records, handed over in one call.
    """
    cursor.executemany(
        query,
        data,
    )
    # Everything is committed together, after the whole batch has been sent.
    connection.commit()

cursor = get_cursor(sms_connection)



In [32]:
%%time
load_orders(sms_connection, cursor, query, orders.values.tolist())


CPU times: user 1.86 s, sys: 1.76 s, total: 3.62 s
Wall time: 13.1 s


In [33]:
cursor.execute('TRUNCATE TABLE orders')
# Commit right away, as after the earlier TRUNCATE cells, so the emptied
# table is visible to other sessions and the transaction is not left open.
sms_connection.commit()

## Approach 4

In [35]:
len(orders.values.tolist())

68883

In [36]:
len(orders.values.tolist())

68883

In [37]:
def load_orders(connection, cursor, query, data, batch_size=10000):
    """Approach 4: insert the records in batches, committing once per batch.

    connection: database connection; commit() is called after every batch.
    cursor: cursor used to run each batch insert via executemany().
    query: parameterized INSERT statement.
    data: sequence of records; must support len() and slicing.
    batch_size: maximum number of records per batch/commit; must be positive.

    Raises ValueError if batch_size is zero or negative. If a batch fails,
    the open transaction is rolled back and the original error is re-raised;
    batches committed before the failure are not undone.
    """
    # A negative step would make range() empty and silently load nothing,
    # so reject non-positive sizes up front.
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer, got %r" % (batch_size,))
    for start in range(0, len(data), batch_size):
        try:
            cursor.executemany(query, data[start:start + batch_size])
            connection.commit()
        except Exception:
            # Discard the failed batch so the connection is usable again.
            connection.rollback()
            raise

In [38]:
cursor = get_cursor(sms_connection)

In [39]:
%%time
# Inserting all orders
load_orders(sms_connection, cursor, query, orders.values.tolist())

CPU times: user 1.88 s, sys: 1.7 s, total: 3.59 s
Wall time: 12.9 s


In [40]:
%%sql

SELECT count(1) FROM orders

 * postgresql://sms_user:***@localhost:5432/sms_db
1 rows affected.


count
68883
