
Commit

Memory tracking for out-of-memory results
rohankumar42 committed Aug 14, 2020
1 parent 60344f0 commit 31bc249
Showing 6 changed files with 123 additions and 57 deletions.
34 changes: 23 additions & 11 deletions pandasql/core.py
@@ -7,6 +7,8 @@
_filter_ancestors
from pandasql.sql_utils import get_sqlite_connection
from pandasql.cost_model import CostModel
from pandasql.memory_utils import _free_memory, \
_estimate_pandas_memory_from_sqlite
from pandasql.api_status import SUPPORTED_VIA_FALLBACK

DB_FILE = mkstemp("_pandasql.db")[1]
@@ -35,6 +37,7 @@ def __init__(self, name=None, sources=None):
self._cached_result = None
self._cached_on_sqlite = False
self._computed_on_pandas = False
self._out_of_memory = False
self.update = None
self.columns = pd.Index([])
self._sql_query = None
@@ -47,7 +50,13 @@ def __init__(self, name=None, sources=None):
@property
def result(self):
if self._cached_result is None:
return None
if self._out_of_memory:
raise MemoryError('The result of this dataframe is too big to '
'fit in memory. Please try accessing a '
'smaller subset of the data you need, '
'e.g., using df.head().')
else:
return None
elif hasattr(self, 'process_result') and not self._computed_on_pandas:
# If result was not computed on Pandas, post-process result
return self.process_result(self._cached_result)
@@ -118,11 +127,7 @@ def _compute_pandas(self, offload=False):

return self.result

def _compute_sqlite(self, to_pandas=True):

if not to_pandas:
raise NotImplementedError('TODO: Support for not bringing back '
'computed results has not been added.')
def _compute_sqlite(self):

if not self._cached_on_sqlite:
# Compute result and store in SQLite table
Expand All @@ -131,11 +136,18 @@ def _compute_sqlite(self, to_pandas=True):
SQL_CON.execute(compute_query)
self._cached_on_sqlite = True

if self._cached_result is None and to_pandas:
# Read table as Pandas DataFrame
read_query = 'SELECT * FROM {}'.format(self.name)
self._cached_result = pd.read_sql_query(read_query, con=SQL_CON)
self.columns = self._cached_result.columns
if self._cached_result is None:

estimated = _estimate_pandas_memory_from_sqlite(self.name)
if estimated > _free_memory():
# Cannot bring back result because it's too big for memory
self._out_of_memory = True
else:
# Read table as Pandas DataFrame
read_query = 'SELECT * FROM {}'.format(self.name)
self._cached_result = pd.read_sql_query(
read_query, con=SQL_CON)
self.columns = self._cached_result.columns

return self.result

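For context on the core.py change: _compute_sqlite now estimates how much memory the result would need as a Pandas DataFrame and, if the estimate exceeds _free_memory(), marks the frame as out of memory instead of reading the table back, so a later access to .result raises MemoryError. A minimal usage sketch, assuming pandasql is imported as ps, the 'ALWAYS' offloading strategy is set, and 'large.csv' plus the column 'c0' are hypothetical stand-ins for an input whose filtered result does not fit in memory:

    import pandasql as ps

    ps.offloading_strategy('ALWAYS')        # compute on SQLite
    df = ps.read_csv('large.csv')           # hypothetical oversized input

    selection = df[df['c0'] >= 5]
    try:
        selection.compute()                 # too big to bring back: raises MemoryError
    except MemoryError:
        preview = selection.head()          # a small slice still fits in memory
        preview.compute()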
45 changes: 8 additions & 37 deletions pandasql/io.py
@@ -1,13 +1,10 @@
import os
import subprocess
from tempfile import mkstemp

import pandas as pd

from pandasql.core import DB_FILE, SQL_CON, DataFrame, _new_name
from pandasql.memory_utils import _estimate_pandas_memory_from_csv, _free_memory

SAMPLE_LINES = 1000
MEMORY_THRESHOLD = 4_000_000_000 # 4 GB
CHUNKSIZE = 10_000


@@ -18,13 +15,13 @@ def read_csv(file_name, name=None, sql_load=False, **kwargs):

name = name or _new_name()

if sql_load: # read through sqlite
return _csv_to_sql(file_name, name=name, **kwargs)
if sql_load: # read through SQLite
return _csv_to_sqlite(file_name, name=name, **kwargs)

estimated_size = _estimate_pd_memory(file_name)
estimated_size = _estimate_pandas_memory_from_csv(file_name)

if estimated_size > MEMORY_THRESHOLD: # TODO: set auto threshold
return _csv_chunking(file_name, name=name, **kwargs)
if estimated_size > _free_memory():
return _read_csv_by_chunking(file_name, name=name, **kwargs)

return DataFrame(pd.read_csv(file_name, **kwargs), name=name,
offload=True, loaded_on_sqlite=False)
@@ -42,7 +39,7 @@ def read_pickle(*args, name=None, **kwargs):
return DataFrame(pd.read_pickle(*args, **kwargs), name=name)


def _csv_to_sql(file_name, name, **kwargs):
def _csv_to_sqlite(file_name, name, **kwargs):
chunk = pd.read_csv(file_name, nrows=SAMPLE_LINES, **kwargs)

# sends first N lines to sqlite to establish correct types
@@ -57,7 +54,7 @@ def _csv_to_sql(file_name, name, **kwargs):
return df


def _csv_chunking(file_name, name, **kwargs):
def _read_csv_by_chunking(file_name, name, **kwargs):
for chunk in pd.read_csv(file_name, chunksize=CHUNKSIZE,
nrows=None, **kwargs):
chunk.to_sql(name=name, con=SQL_CON, index=False, if_exists='append')
@@ -66,29 +63,3 @@ def _csv_chunking(file_name, name, **kwargs):
df = DataFrame(None, name=name, offload=False, loaded_on_sqlite=True)
df.columns = cols
return df


##############################################################################
# Utility Functions
##############################################################################
def _estimate_pd_memory(file_name, **kwargs):
kwargs['nrows'] = SAMPLE_LINES
df = pd.read_csv(file_name, **kwargs)
temp = mkstemp(".csv_topn")[1]

if isinstance(df, pd.Series):
sample_memory_usage = df.memory_usage(deep=True)
else:
sample_memory_usage = df.memory_usage(deep=True).sum()

with open(temp, "w+") as f:
subprocess.call(["head", "-n", str(SAMPLE_LINES), file_name], stdout=f)

sample_disk_size = os.path.getsize(temp)
os.remove(temp)

full_disk_size = os.path.getsize(file_name)

est_memory_size = (full_disk_size / sample_disk_size) * sample_memory_usage

return est_memory_size
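To summarize the read_csv change: the fixed 4 GB MEMORY_THRESHOLD is gone, and the loader now compares _estimate_pandas_memory_from_csv(file_name) against _free_memory() to decide between an eager Pandas load and chunked loading into SQLite. A short usage sketch ('data.csv' is a hypothetical file):

    import pandasql as ps

    # Default path: estimate the in-memory size from a 1000-row sample; if the
    # estimate exceeds ~80% of available memory, stream the file into SQLite in
    # 10,000-row chunks instead of materializing it with Pandas.
    df = ps.read_csv('data.csv')

    # Force the SQLite load path regardless of the estimate.
    df_sql = ps.read_csv('data.csv', sql_load=True)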
54 changes: 54 additions & 0 deletions pandasql/memory_utils.py
@@ -0,0 +1,54 @@
import os
import subprocess
import psutil
from tempfile import mkstemp

import pandas as pd
import pandasql as ps


SAMPLE_ROWS = 1000
# What proportion of available memory should we actually consider available
SAFETY_FACTOR = 0.8


def _estimate_pandas_memory_from_csv(file_name, **kwargs):
kwargs['nrows'] = SAMPLE_ROWS
df = pd.read_csv(file_name, **kwargs)
temp = mkstemp(".csv_topn")[1]

if isinstance(df, pd.Series):
sample_memory_usage = df.memory_usage(deep=True)
else:
sample_memory_usage = df.memory_usage(deep=True).sum()

with open(temp, "w+") as f:
subprocess.call(["head", "-n", str(SAMPLE_ROWS), file_name], stdout=f)

sample_disk_size = os.path.getsize(temp)
os.remove(temp)

full_disk_size = os.path.getsize(file_name)

est_memory_size = (full_disk_size / sample_disk_size) * sample_memory_usage

return est_memory_size


def _estimate_pandas_memory_from_sqlite(table_name):
sample = pd.read_sql(f'SELECT * FROM {table_name} LIMIT {SAMPLE_ROWS}',
con=ps.core.SQL_CON)
stats = pd.read_sql('SELECT SUM(ncell) as nrows FROM dbstat '
f'WHERE name="{table_name}"', con=ps.core.SQL_CON)
total_rows = stats['nrows'][0]

sample_memory_usage = sample.memory_usage(deep=True).sum()
sample_rows = len(sample)

est_memory_size = (total_rows / sample_rows) * sample_memory_usage

return est_memory_size


def _free_memory():
return SAFETY_FACTOR * psutil.virtual_memory().available
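Both estimators extrapolate from a sample: the CSV version scales the sample's in-memory footprint by the ratio of on-disk sizes, while the SQLite version scales it by the row count reported by the dbstat virtual table (which requires an SQLite build compiled with dbstat support). A worked example of the CSV formula, with illustrative numbers that are not from the repository:

    # 1000 sampled rows use ~2 MB in Pandas and occupy ~500 KB on disk;
    # the full CSV is 50 MB on disk.
    sample_memory_usage = 2_000_000
    sample_disk_size = 500_000
    full_disk_size = 50_000_000

    est_memory_size = (full_disk_size / sample_disk_size) * sample_memory_usage
    print(est_memory_size)  # 200,000,000 bytes, i.e. ~200 MB expected in memory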
1 change: 1 addition & 0 deletions setup.py
@@ -5,6 +5,7 @@

requires = [
'pandas==1.0.5',
'psutil>=5.7.2'
]

setuptools.setup(
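The new psutil requirement exists only to back _free_memory(); the relevant call is:

    import psutil

    # Bytes of system memory currently available, before the 0.8 safety factor.
    print(psutil.virtual_memory().available)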
18 changes: 10 additions & 8 deletions tests/test_io.py
@@ -1,6 +1,6 @@
import unittest
import os

import psutil
from tempfile import mkstemp

import pandasql as ps
@@ -17,8 +17,7 @@ def setUp(self):
ps.offloading_strategy('ALWAYS')
arr = np.random.randint(low=0, high=100_000_000, size=(25_000, 2))
np.savetxt(self.FILE_NAME, arr, delimiter=',', fmt='%i',
header='c0,c1', # noqa
comments='')
header='c0,c1', comments='')
self.addCleanup(os.remove, self.FILE_NAME)

def test_loading_csv(self):
@@ -30,7 +29,7 @@ def test_loading_csv(self):

assertDataFrameEqualsPandas(df, base_df)

def test_loading_csv_sql(self):
def test_loading_csv_sqlite(self):
df = ps.read_csv(self.FILE_NAME, sql_load=True)
base_df = pd.read_csv(self.FILE_NAME)

@@ -39,18 +38,21 @@ def test_loading_csv_sql(self):

assertDataFrameEqualsPandas(df, base_df)

def test_loading_csv_sql_chunk(self):
old_threshold = ps.io.MEMORY_THRESHOLD
ps.io.MEMORY_THRESHOLD = 1
def test_loading_csv_in_chunks(self):
memory_thresh = 10 ** 4
new_factor = memory_thresh / psutil.virtual_memory().available
old_factor = ps.memory_utils.SAFETY_FACTOR
ps.memory_utils.SAFETY_FACTOR = new_factor

df = ps.read_csv(self.FILE_NAME)
base_df = pd.read_csv(self.FILE_NAME)

ps.memory_utils.SAFETY_FACTOR = old_factor

df['c0'] += 1
base_df['c0'] += 1

assertDataFrameEqualsPandas(df, base_df)
ps.io.MEMORY_THRESHOLD = old_threshold


if __name__ == "__main__":
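The chunking test no longer patches a hard-coded threshold; it scales SAFETY_FACTOR so that _free_memory() reports roughly 10 KB, which forces the chunked SQLite path. One possible way to package that override so the factor is restored even if an assertion fails (a sketch, not part of the repository):

    import contextlib
    import psutil
    import pandasql as ps

    @contextlib.contextmanager
    def fake_free_memory(limit_bytes):
        """Make ps.memory_utils._free_memory() report roughly limit_bytes."""
        old_factor = ps.memory_utils.SAFETY_FACTOR
        ps.memory_utils.SAFETY_FACTOR = limit_bytes / psutil.virtual_memory().available
        try:
            yield
        finally:
            ps.memory_utils.SAFETY_FACTOR = old_factor

    # Usage in a test:
    #     with fake_free_memory(10_000):
    #         df = ps.read_csv(FILE_NAME)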
28 changes: 27 additions & 1 deletion tests/test_offloading.py
@@ -1,5 +1,5 @@
import unittest

import psutil
import pandas as pd
import pandasql as ps
from .utils import assertDataFrameEqualsPandas
@@ -63,6 +63,32 @@ def test_run_with_missing_dependencies_pandas(self):
selection = df[df['n'] >= 5]
self.assertRaises(RuntimeError, lambda: selection.compute())

def test_result_out_of_memory(self):
ps.offloading_strategy('ALWAYS')

size = 10 ** 4

base_df = pd.DataFrame([{'n': i, 's': str(i*2)} for i in range(size)])
base_selection = base_df[base_df['n'] >= 5]
base_limit = base_selection.head()

df = ps.DataFrame(base_df)

memory_thresh = 10 ** 4
new_factor = memory_thresh / psutil.virtual_memory().available
old_factor = ps.memory_utils.SAFETY_FACTOR
ps.memory_utils.SAFETY_FACTOR = new_factor

# Should fail since the result is too big to be brought back
selection = df[df['n'] >= 5]
self.assertRaises(MemoryError, lambda: selection.compute())

# Should run since the result is small enough to be brought back
limit = selection.head()
assertDataFrameEqualsPandas(limit, base_limit)

ps.memory_utils.SAFETY_FACTOR = old_factor


if __name__ == "__main__":
unittest.main()
