In [1]:
import csv
import io
import psycopg2
import subprocess
import time
import timeit
from functools import wraps
from memory_profiler import memory_usage
from pathlib import Path
from typing import Iterator, Dict, Any, Optional

In [2]:
logfile = '../time_log.txt'

In [3]:
# https://hakibenita.com/fast-load-data-python-postgresql
def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time   {elapsed:0.4}')

        # Measure memory
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)

        print(f'Memory {max(mem) - min(mem)}')
        return retval

    return inner

In [4]:
connection = psycopg2.connect(
    host="localhost",
    database="estoque_teste",
    user="rg3915",
    password="1234",
)
connection.autocommit = True

In [5]:
@profile
def csv_to_list(filename: str) -> list:
    '''
    Lê um csv e retorna um OrderedDict.
    Créditos para Rafael Henrique
    https://bit.ly / 2FLDHsH
    '''
    with open(filename) as csv_file:
        reader = csv.DictReader(csv_file, delimiter=',')
        csv_data = [line for line in reader]
    return csv_data

In [6]:
max_rows = 10000

In [7]:
home = str(Path.home())
filename = f'{home}/dados/produtos_{max_rows}.csv'

In [8]:
# Ler um csv consome memória.
items = csv_to_list(filename)


csv_to_list()
Time   0.05657
Memory 3.73828125


In [9]:
items[:5]

[{'title': 'RbwgRYURhGtF', 'quantity': '6400'},
 {'title': 'mTMDWLahbrjY', 'quantity': '3658'},
 {'title': 'TmQjUevhJmsF', 'quantity': '8242'},
 {'title': 'UsOPzMXRJprQ', 'quantity': '2260'},
 {'title': 'gnAPXyfEmkUR', 'quantity': '4236'}]

In [10]:
def timelog(total_items, _time, logfile, resource):
    total_items = f'{total_items:,}'.replace(',', '.')
    space = ' ' * (10 - len(total_items))
    time = round((_time), 3)
    subprocess.call(f"printf '{total_items} {space} -> {time}s\t --> Inserindo {total_items} registros com {resource}.\n' >> {logfile}", shell=True)

## One by one

In [11]:
@profile
def insert_one_by_one(connection, items: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        for item in items:
            cursor.execute("""
                INSERT INTO core_product (title, quantity)
                VALUES (
                    %(title)s,
                    %(quantity)s
                );
            """, {
                'title': item['title'],
                'quantity': int(item['quantity']),
            })

In [12]:
# Sem profile vai demorar a metade do tempo.
tic1 = timeit.default_timer()
insert_one_by_one(connection, items)  # <--- insert data one by one
toc1 = timeit.default_timer()
time1 = toc1 - tic1
round((time1), 3)


insert_one_by_one()
Time   11.89
Memory 0.02734375


24.051

In [13]:
timelog(len(items), time1, logfile, 'psycopg2 one by one (with profile)')

In [14]:
# without profile

In [15]:
def insert_one_by_one(connection, items: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        for item in items:
            cursor.execute("""
                INSERT INTO core_product (title, quantity)
                VALUES (
                    %(title)s,
                    %(quantity)s
                );
            """, {
                'title': item['title'],
                'quantity': int(item['quantity']),
            })

In [16]:
tic2 = timeit.default_timer()
insert_one_by_one(connection, items)  # <--- insert data one by one
toc2 = timeit.default_timer()
time2 = toc2 - tic2
round((time2), 3)

12.726

In [17]:
timelog(len(items), time2, logfile, 'psycopg2 one by one')

## executemany

In [22]:
@profile
def insert_executemany(connection, items: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        all_items = [{
            'title': item['title'],
            'quantity': int(item['quantity'])
        } for item in items]

        cursor.executemany("""
            INSERT INTO core_product (title, quantity)
            VALUES (
                %(title)s,
                %(quantity)s
            );
        """, all_items)

In [23]:
tic3 = timeit.default_timer()
insert_executemany(connection, items)  # <--- insert data executemany
toc3 = timeit.default_timer()
time3 = toc3 - tic3
round((time3), 3)


insert_executemany()
Time   11.64
Memory 2.0078125


23.203

In [24]:
timelog(len(items), time3, logfile, 'psycopg2 executemany (with profile)')

In [25]:
# without profile

In [26]:
def insert_executemany(connection, items: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        all_items = [{
            'title': item['title'],
            'quantity': int(item['quantity'])
        } for item in items]

        cursor.executemany("""
            INSERT INTO core_product (title, quantity)
            VALUES (
                %(title)s,
                %(quantity)s
            );
        """, all_items)

In [27]:
tic4 = timeit.default_timer()
insert_executemany(connection, items)  # <--- insert data executemany
toc4 = timeit.default_timer()
time4 = toc4 - tic4
round((time4), 3)

11.798

In [28]:
timelog(len(items), time4, logfile, 'psycopg2 executemany')

```
100.000     -> 4.866s	 --> Inserindo 100.000 registros com Django bulk_create.
10.000      -> 11.798s	 --> Inserindo 10.000 registros com psycopg2 executemany.
```

bulk_create

https://docs.djangoproject.com/en/2.2/ref/models/querysets/#bulk-create

Github: bulk_create

https://github.com/django/django/blob/master/django/db/models/query.py#L455

Github: Atomic transation

https://github.com/django/django/blob/master/django/db/models/query.py#L491

Database transactions

https://docs.djangoproject.com/en/3.0/topics/db/transactions/



## copy_from

In [29]:
@profile
def insert_with_copy_from(connection):
    with open(filename, 'r') as f:
        next(f)
        connection.cursor().copy_from(f, 'core_product', sep=',', columns=('title', 'quantity'))

In [30]:
tic5 = timeit.default_timer()
insert_with_copy_from(connection)  # <--- insert data copy_from
toc5 = timeit.default_timer()
time5 = toc5 - tic5
round((time5), 3)


insert_with_copy_from()
Time   0.03871
Memory 0.0


0.119

In [31]:
timelog(max_rows, time5, logfile, 'psycopg2 copy_from (with profile)')

In [32]:
# without profile

In [33]:
def insert_with_copy_from(connection):
    with open(filename, 'r') as f:
        next(f)
        connection.cursor().copy_from(f, 'core_product', sep=',', columns=('title', 'quantity'))

In [34]:
tic5 = timeit.default_timer()
insert_with_copy_from(connection)  # <--- insert data copy_from
toc5 = timeit.default_timer()
time5 = toc5 - tic5
round((time5), 3)

0.058

In [35]:
timelog(max_rows, time5, logfile, 'psycopg2 copy_from')

```
10.000      -> 0.433s	 --> Inserindo 10.000 registros com Django bulk_create.
10.000      -> 0.058s	 --> Inserindo 10.000 registros com psycopg2 copy_from.
```




In [38]:
round(0.433/0.058, 2)

7.47

In [39]:
# Próximo passo
# insert_with_subprocess.py
# Rodar no terminal.