In [1]:
import datetime

class Timer(object):
    """A simple timer class"""
    
    def __init__(self):
        pass
    
    def start(self):
        """Starts the timer"""
        self.start = datetime.datetime.now()
        return self.start
    
    def stop(self, message="Total: "):
        """Stops the timer.  Returns the time elapsed"""
        self.stop = datetime.datetime.now()
        return message + str(self.stop - self.start)
    
    def now(self, message="Now: "):
        """Returns the current time with a message"""
        return message + ": " + str(datetime.datetime.now())
    
    def elapsed(self, message="Elapsed: "):
        """Time elapsed since start was called"""
        return message + str(datetime.datetime.now() - self.start)
    
    def split(self, message="Split started at: "):
        """Start a split timer"""
        self.split_start = datetime.datetime.now()
        return message + str(self.split_start)
    
    def unsplit(self, message="Unsplit: "):
        """Stops a split. Returns the time elapsed since split was called"""
        return message + str(datetime.datetime.now() - self.split_start)

In [2]:
import pandas as pd
import multiprocessing as mp

LARGE_FILE = "/Users/manuel/development/thesis/overlap/filtered_hg19DNase_H3K27ac_dbSUPER_overlapped.csv"
CHUNKSIZE = 100000 # processing 100,000 rows at a time
PROCESSES = 1

def process_frame(df):
        # process data frame
        return len(df)
    
timer = Timer()
timer.start()
reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
pool = mp.Pool(PROCESSES) # use 4 processes

funclist = []
for df in reader:
    # process each data frame
    f = pool.apply_async(process_frame,[df])
    funclist.append(f)

pool.close()
print(timer.elapsed())


Elapsed: 0:00:49.762387


In [4]:
result = 0
for f in funclist:
    result += f.get(timeout=10) # timeout in 10 seconds

print("Processes:", PROCESSES)
print("Rows:", result)
print(timer.stop())

Processes: 1
Rows: 8526833


TypeError: 'datetime.datetime' object is not callable

In [5]:
import sys
sys.getsizeof(LARGE_FILE)

139

In [6]:
#!/usr/bin/env python
import sys
from functools import partial

print(sum(chunk.count('\n') for chunk in iter(partial(sys.stdin.read, 1 << 15))))

TypeError: 'functools.partial' object is not iterable

In [10]:
from joblib import Parallel, delayed
import multiprocessing
    
# what are your inputs, and what operation do you want to 
# perform on each input. For example...
inputs = range(10) 
def processInput(i):
    return i + 1

num_cores = multiprocessing.cpu_count()
    
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)

In [11]:
results

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [12]:
multiprocessing.cpu_count()

8

In [17]:
import pandas as pd
import numpy as np
df1 = pd.DataFrame(np.random.rand(5,3), columns=('A','B','C'))
df2 = pd.DataFrame(np.random.rand(5,3), columns=('A','B','C'))

list_of_dataframes = [df1, df2]
df = pd.concat(list_of_dataframes)

In [18]:
df

Unnamed: 0,A,B,C
0,0.758565,0.942706,0.760712
1,0.251083,0.620642,0.035258
2,0.368615,0.585478,0.942799
3,0.058856,0.130447,0.346643
4,0.815392,0.555016,0.972334
0,0.753393,0.797039,0.901714
1,0.952692,0.887806,0.489298
2,0.869094,0.816985,0.531164
3,0.285997,0.849833,0.529344
4,0.902823,0.153098,0.179003
