In [1]:
import pandas as pd
import timeit
csv = "../5mSalesRecords.csv"

In [2]:
import tempfile
class CSVSplitter:
    def __init__(self, csv_filename, chunk_size= 100000):
        self.csv_filename = csv_filename
        self.chunk_size = chunk_size  # lines
        self.chunk_dir =  tempfile.TemporaryDirectory()
        self.chunk_files = []
        self._split()

    def _split(self):
        self.chunk_files = []
        with open(self.csv_filename, 'r') as f:
            count = 0
            header = f.readline()
            lines = []
            for line in f:
                count += 1
                lines.append(line)
                if count % self.chunk_size == 0:
                    self.write_chunk(header, count // self.chunk_size, lines)
                    lines = []
                    
            # write remainder
            if len(lines) > 0:
                self.write_chunk((count // self.chunk_size) + 1, lines)
                
    def write_chunk(self, header, part, lines):
        chunk_filename = f"{self.chunk_dir.name}/data_part_{str(part)}.csv"
        with open(chunk_filename, 'w') as f_out:
            f_out.write(header)
            f_out.writelines(lines)
            self.chunk_files.append(chunk_filename)
    
    def csv_files(self):
        return self.chunk_files
    
    def cleanup(self):
        self.chunk_dir.cleanup()
        

    

In [3]:
from contextlib import contextmanager
from timeit import default_timer as timer
import sys

@contextmanager
def time_and_log(name: str, silent: bool = False):
  """Time and log block (using `with` statement)"""

  if not silent:
    caption = name if name else 'operation'
    print(f"[{caption}] Starting...")
    sys.stdout.flush()
  started_at = timer()
  yield()
  elapsed = timer() - started_at
  if not silent:
    print(f"[{caption}] Processed in in {elapsed:.2f}s.")
    sys.stdout.flush()
  return elapsed


In [None]:
from multiprocessing import Pool
def test_apply(row):
    return row.iloc[0]

def process(chunk):  
    # print("processing")
    df = pd.read_csv(chunk, skipinitialspace=True, index_col=None)    
    foo = df.apply(test_apply, axis=1)
    return len(df)
    
@time_and_log("test_batching")
def test_csv_batching():
    csv_splitter = CSVSplitter(csv, 100000)
    try:
        print(f"Pooling against {len(csv_splitter.csv_files())} files")
        with Pool(5) as p:
            for result in p.map(process, csv_splitter.csv_files()):                
                print(result)
            
    finally:
        csv_splitter.cleanup()

test_csv_batching()

[test_batching] Starting...
Pooling against 50 files


Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.12/3.12.2/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/python@3.12/3.12.2/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/python@3.12/3.12.2/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/queues.py", line 389, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'process' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Process SpawnPoolWorker-2:
Traceback (most

In [None]:
def test_apply(row):
    return row.iloc[0]

@time_and_log("test_pandas")
def test_pandas():
    df = pd.read_csv(csv, skipinitialspace=True, index_col=None)
    foo = df.apply(test_apply, axis=1)
    return foo

test_pandas()
