[Process large datasets without running out of memory](https://pythonspeed.com/memory/)

### use sqlite for large data-sets that do not fit into memory

In [1]:
import sqlite3
import pandas as pd
df = pd.read_csv("../data/creditcard.csv")

In [2]:
df.memory_usage(index=True, deep=True).sum()/1024**2

67.36017608642578

In [3]:
df.columns

Index(['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10',
       'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20',
       'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount',
       'Class'],
      dtype='object')

In [4]:
df[['Time', 'Amount', 'Class']]

Unnamed: 0,Time,Amount,Class
0,0.0,149.62,0
1,0.0,2.69,0
2,1.0,378.66,0
3,1.0,123.50,0
4,2.0,69.99,0
...,...,...,...
284802,172786.0,0.77,0
284803,172787.0,24.79,0
284804,172788.0,67.88,0
284805,172788.0,10.00,0


In [5]:
# create a db

connection = sqlite3.connect("creditcard_fraud.sqlite")

# if your data-frame is too large for your memory - load it in chunks:
for df in pd.read_csv("../data/creditcard.csv", chunksize=1000):
    df[['Time', 'Amount', 'Class']].\
        to_sql("class", connection, if_exists="append", index=True, index_label='id')
    df.drop(['Time', 'Amount', 'Class'], axis=1).\
        to_sql("variables", connection, if_exists="append", index=True, index_label='id')

In [6]:
cursor = connection.cursor()
cursor.execute('SELECT name from sqlite_master where type= "table"')
print(cursor.fetchall())

[('class',), ('variables',)]


In [7]:
cursor.execute('SELECT * FROM class LIMIT 2;')
names = [i[0] for i in cursor.description]
print(names)

['id', 'Time', 'Amount', 'Class']


In [8]:
cursor.execute("CREATE INDEX c_id ON class(id)")

cursor.execute("CREATE INDEX v_id ON variables(id)")

OperationalError: index c_id already exists

In [None]:
pd.read_sql("""\
            SELECT Class, AVG(Amount) AS mean_amount, AVG(V5) AS mean_v5
            FROM variables v
            INNER JOIN class c
            ON v.id = c.id
            WHERE c.Amount > 100
            GROUP BY Class
            """, connection)

## duck-db with pyarrow

In [None]:
!pip install duckdb

In [None]:
!pip install pyarrow

In [5]:
!pip install psutil

Collecting psutil
  Downloading psutil-5.8.0-cp36-cp36m-manylinux2010_x86_64.whl (291 kB)
[K     |████████████████████████████████| 291 kB 4.1 MB/s eta 0:00:01
[?25hInstalling collected packages: psutil
Successfully installed psutil-5.8.0


In [8]:
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import duckdb
import psutil
import os

### write the data into an arrow file

In [2]:
df = pd.read_csv('../data/creditcard.csv', memory_map=True)
table = pa.Table.from_pandas(df.fillna(0))
with pa.OSFile('../data/creditcard.arrow', 'wb') as sink:
    with pa.RecordBatchFileWriter(sink, table.schema) as writer:
        writer.write_table(table)

In [3]:
df_sub = pd.read_csv('../data/creditcard_subsampled.csv', memory_map=True)
table_sub = pa.Table.from_pandas(df_sub.fillna(0))
with pa.OSFile('../data/creditcard_subsampled.arrow', 'wb') as sink:
    with pa.RecordBatchFileWriter(sink, table_sub.schema) as writer:
        writer.write_table(table_sub)

### read the arrow file as memory-mapped file

In [13]:
source = None
reader = None
table = None
del source, reader, table

rss: aka “Resident Set Size”, this is the non-swapped physical memory a process has used.


In [14]:
memory_init = psutil.Process(os.getpid()).memory_info().rss >> 20

In [15]:
source = pa.memory_map("../data/creditcard.arrow", 'r')
reader = pa.RecordBatchFileReader(source)
table = reader.read_all()

In [16]:
memory_post_arrow = psutil.Process(os.getpid()).memory_info().rss >> 20
print(f'consumed {memory_post_arrow - memory_init} mb memory')

consumed 0 mb memory


In [17]:
source_sub = pa.memory_map("../data/creditcard_subsampled.arrow", 'r')
reader_sub = pa.RecordBatchFileReader(source_sub)
table_sub = reader_sub.read_all()

In [18]:
memory_post_arrow = psutil.Process(os.getpid()).memory_info().rss >> 20
print(f'consumed {memory_post_arrow - memory_init} mb memory')

consumed -23 mb memory


### import the memory-mapped file into duck-db

In [19]:
memory_init = psutil.Process(os.getpid()).memory_info().rss >> 20

In [20]:
import duckdb
rel = duckdb.from_arrow_table(table)

In [21]:
memory_post_duckdb = psutil.Process(os.getpid()).memory_info().rss >> 20
print(f'consumed {memory_post_duckdb - memory_init} mb memory')

consumed 5 mb memory


In [6]:
# rel_sub = duckdb.from_arrow_table(table_sub)

In [None]:
# this is not working
# joined = rel.join(rel_sub, on=['Time', 'Amount'])

In [22]:
rel.query("arrow", "SELECT * FROM arrow LIMIT 10").df()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0
5,2.0,-0.425966,0.960523,1.141109,-0.168252,0.420987,-0.029728,0.476201,0.260314,-0.568671,...,-0.208254,-0.559825,-0.026398,-0.371427,-0.232794,0.105915,0.253844,0.08108,3.67,0
6,4.0,1.229658,0.141004,0.045371,1.202613,0.191881,0.272708,-0.005159,0.081213,0.46496,...,-0.167716,-0.27071,-0.154104,-0.780055,0.750137,-0.257237,0.034507,0.005168,4.99,0
7,7.0,-0.644269,1.417964,1.07438,-0.492199,0.948934,0.428118,1.120631,-3.807864,0.615375,...,1.943465,-1.015455,0.057504,-0.649709,-0.415267,-0.051634,-1.206921,-1.085339,40.8,0
8,7.0,-0.894286,0.286157,-0.113192,-0.271526,2.669599,3.721818,0.370145,0.851084,-0.392048,...,-0.073425,-0.268092,-0.204233,1.011592,0.373205,-0.384157,0.011747,0.142404,93.2,0
9,9.0,-0.338262,1.119593,1.044367,-0.222187,0.499361,-0.246761,0.651583,0.069539,-0.736727,...,-0.246914,-0.633753,-0.120794,-0.38505,-0.069733,0.094199,0.246219,0.083076,3.68,0


In [23]:
rel.query("arrow", "SELECT * FROM arrow LIMIT 10").fetch_arrow_table()

pyarrow.Table
Time: double not null
V1: double not null
V2: double not null
V3: double not null
V4: double not null
V5: double not null
V6: double not null
V7: double not null
V8: double not null
V9: double not null
V10: double not null
V11: double not null
V12: double not null
V13: double not null
V14: double not null
V15: double not null
V16: double not null
V17: double not null
V18: double not null
V19: double not null
V20: double not null
V21: double not null
V22: double not null
V23: double not null
V24: double not null
V25: double not null
V26: double not null
V27: double not null
V28: double not null
Amount: double not null
Class: int64 not null

### create persistent view of table

In [25]:
con = duckdb.connect(database='../data/creditcard.duckdb', read_only=False)

In [26]:
df = pd.read_csv('../data/creditcard_subsampled.csv', memory_map=True)
df.to_sql('cc1', con)

In [28]:
con.execute("SELECT * FROM cc1 LIMIT 1").fetchnumpy()

{'level_0': array([0], dtype=int32),
 'index': array([541], dtype=int32),
 'Time': array([406.], dtype=float32),
 'V1': array([-2.3122265], dtype=float32),
 'V2': array([1.951992], dtype=float32),
 'V3': array([-1.6098508], dtype=float32),
 'V4': array([3.9979055], dtype=float32),
 'V5': array([-0.5221879], dtype=float32),
 'V6': array([-1.4265453], dtype=float32),
 'V7': array([-2.5373874], dtype=float32),
 'V8': array([1.3916572], dtype=float32),
 'V9': array([-2.7700894], dtype=float32),
 'V10': array([-2.772272], dtype=float32),
 'V11': array([3.2020333], dtype=float32),
 'V12': array([-2.8999074], dtype=float32),
 'V13': array([-0.5952219], dtype=float32),
 'V14': array([-4.2892537], dtype=float32),
 'V15': array([0.3897241], dtype=float32),
 'V16': array([-1.1407472], dtype=float32),
 'V17': array([-2.8300557], dtype=float32),
 'V18': array([-0.01682247], dtype=float32),
 'V19': array([0.4169557], dtype=float32),
 'V20': array([0.12691055], dtype=float32),
 'V21': array([0.517232

In [29]:
memory_init = psutil.Process(os.getpid()).memory_info().rss >> 20

In [30]:
table.to_pandas().to_sql("cc2", con)

In [31]:
memory_arrow_to_db = psutil.Process(os.getpid()).memory_info().rss >> 20
print(f'memory consumption: {memory_arrow_to_db - memory_init} mb')

memory consumption: 990 mb


In [33]:
table = None
df = None
del table, df

In [34]:
memory_init = psutil.Process(os.getpid()).memory_info().rss >> 20

In [35]:
con.execute("""SELECT cl.*
               FROM class cl 
               INNER JOIN new_class al 
               ON al.time = cl.time 
               AND al.Amount = cl.amount 
               WHERE al.class = 0""").fetchnumpy()

{'id': array([    0,     1,     7, ..., 99539, 99539, 99539], dtype=int32),
 'Time': array([0.00000e+00, 0.00000e+00, 7.00000e+00, ..., 1.63152e+05,
        1.63152e+05, 1.63152e+05], dtype=float32),
 'Amount': array([149.62,   2.69,  40.8 , ...,   7.56,   7.56,   7.56], dtype=float32),
 'Class': array([0, 0, 0, ..., 0, 0, 0], dtype=int32)}

In [36]:
memory_db = psutil.Process(os.getpid()).memory_info().rss >> 20
print(f'memory consumption: {memory_db - memory_init} mb')

memory consumption: 0 mb


In [41]:
con.execute("CREATE TABLE credit_sub AS SELECT * FROM read_csv_auto('../data/creditcard_subsampled.csv');")

<duckdb.DuckDBPyConnection at 0x7fa82816fe68>

In [46]:
con.execute("show tables").fetchnumpy()

{'name': array(['all', 'cc1', 'cc2', 'class', 'credit_sub', 'ix_cc1_level_0',
        'ix_cc2_index', 'new_class', 'sqlite_master'], dtype=object)}

In [48]:
memory_init = psutil.Process(os.getpid()).memory_info().rss >> 20
con.execute("CREATE TABLE credit_pq AS SELECT * FROM parquet_scan('../data/creditcard.parquet');")

<duckdb.DuckDBPyConnection at 0x7fa82816fe68>

In [49]:
memory_parquet = psutil.Process(os.getpid()).memory_info().rss >> 20
print(f'memory consumption: {memory_parquet - memory_init} mb')

memory consumption: 0 mb


In [50]:
con.execute("show tables").fetchnumpy()

{'name': array(['all', 'cc1', 'cc2', 'class', 'credit_pq', 'credit_sub',
        'ix_cc1_level_0', 'ix_cc2_index', 'new_class', 'sqlite_master'],
       dtype=object)}

In [None]:
con.execute("DROP TABLE credit_pq;")

In [33]:
print(dir(con))

['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'append', 'arrow', 'begin', 'close', 'commit', 'cursor', 'df', 'duplicate', 'execute', 'executemany', 'fetch_arrow_table', 'fetch_df', 'fetch_df_chunk', 'fetchall', 'fetchdf', 'fetchnumpy', 'fetchone', 'from_arrow_table', 'from_csv_auto', 'from_df', 'from_parquet', 'register', 'rollback', 'table', 'table_function', 'unregister', 'values', 'view']


In [None]:
rel.create_view

In [5]:
print(dir(rel))

['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'aggregate', 'arrow', 'create', 'create_view', 'df', 'distinct', 'except_', 'execute', 'filter', 'insert', 'insert_into', 'intersect', 'join', 'limit', 'order', 'project', 'query', 'set_alias', 'to_arrow_table', 'to_df', 'union', 'write_csv']


In [36]:
print(dir(duckdb))

['DuckDBPyConnection', 'DuckDBPyRelation', 'DuckDBPyResult', '__doc__', '__file__', '__git_revision__', '__loader__', '__name__', '__package__', '__spec__', '__version__', '_clean_default_connection', 'aggregate', 'alias', 'arrow', 'comment', 'connect', 'df', 'distinct', 'filter', 'from_arrow_table', 'from_csv_auto', 'from_df', 'from_parquet', 'identifier', 'keyword', 'limit', 'numeric_const', 'operator', 'order', 'project', 'query', 'string_const', 'token_type', 'tokenize', 'values', 'write_csv']


In [37]:
print(dir(table))

['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__pyx_vtable__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '_column', '_ensure_integer_index', '_to_pandas', 'add_column', 'append_column', 'cast', 'column', 'column_names', 'columns', 'combine_chunks', 'drop', 'equals', 'field', 'filter', 'flatten', 'from_arrays', 'from_batches', 'from_pandas', 'from_pydict', 'itercolumns', 'nbytes', 'num_columns', 'num_rows', 'remove_column', 'rename_columns', 'replace_schema_metadata', 'schema', 'select', 'set_column', 'shape', 'slice', 'take', 'to_batches', 'to_pandas', 'to_pydict', 'to_string', 'validate']


In [38]:
print(dir(df))

['Amount', 'Class', 'T', 'Time', 'V1', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V2', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', '_AXIS_LEN', '_AXIS_NAMES', '_AXIS_NUMBERS', '_AXIS_ORDERS', '_AXIS_REVERSED', '_AXIS_TO_AXIS_NUMBER', '__abs__', '__add__', '__and__', '__annotations__', '__array__', '__array_priority__', '__array_wrap__', '__bool__', '__class__', '__contains__', '__copy__', '__deepcopy__', '__delattr__', '__delitem__', '__dict__', '__dir__', '__div__', '__doc__', '__eq__', '__finalize__', '__floordiv__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getitem__', '__getstate__', '__gt__', '__hash__', '__iadd__', '__iand__', '__ifloordiv__', '__imod__', '__imul__', '__init__', '__init_subclass__', '__invert__', '__ior__', '__ipow__', '__isub__', '__iter__', '__itruediv__', '__ixor__', '__le__', '__len__', '__lt__', '__matmul__', '__mod__', '__module__', '__mul__', '__ne_

In [38]:
probe = duckdb.from_csv('../data/creditcard_subsampled.csv', sep=',', con)

SyntaxError: positional argument follows keyword argument (<ipython-input-38-e715292e4466>, line 1)

In [39]:
duckdb.from_csv?

Object `duckdb.from_csv` not found.


In [29]:
# open database file
db = duckdb.open('../data/creditcard.duckdb')
# read CSV again
read_csv = duckdb.from_csv('../data/creditcard_subsampled.csv', sep=',')#, columns=['C1', 'C2'], types=[duckdb.int32, duckdb.int32])
# write into table, this will trigger write into persistent storage
read_csv.store(db, 'test')
# read from table and filter again
#db.table('test').filter('C1 > 100').print()

AttributeError: module 'duckdb' has no attribute 'open'

In [28]:
rel.store(con, 'cc_1')

TypeError: 'NoneType' object is not callable

In [23]:
con = duckdb.connect(database='../data/creditcard.duckdb', read_only=False)


In [21]:
import duckdb
# con = duckdb.connect(database=':memory:', read_only=False)
con = duckdb.connect(database='../data/creditcard.duckdb', read_only=False)
con.register('creditcard', rel)

RuntimeError: TypeError: 'NoneType' object is not callable

In [14]:
duckdb.connect?

https://www.gitmemory.com/issue/cwida/duckdb/908/694783944
https://dev.to/volkmarr/duckdb-an-embedded-db-for-data-wrangling-4hfm
https://shekhargulati.com/2019/12/15/the-5-minute-introduction-to-duckdb-the-sqlite-for-analytics/
https://duckdb.org/docs/api/python

### unlist list of lists

In [None]:
import random
import string
import numpy as np
list_of_lists_of_strings = [[''.join(random.choices(
    string.ascii_uppercase + string.digits, k=kl)) for kl in np.random.randint(1, 6, 4)] for _ in range(1000)]
display(list_of_lists_of_strings[0:5])

In [None]:
from itertools import chain
unlisted_list_comprehension = [s for l in list_of_lists_of_strings for s  in l]
unlisted_itertools = list(chain(*list_of_lists_of_strings))
display(unlisted_list_comprehension[50:55])
display(unlisted_itertools[50:55])

In [None]:
%timeit [s for l in list_of_lists_of_strings for s  in l]
%timeit list(chain(*list_of_lists_of_strings))