# Split a DataFrame by key and operate on each partition

Partitioning by key makes it possible to join and group data one partition at a time:

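 1. Hash the key column(s) and take the hash modulo the number of partitions to assign each row to a partition.
 2. Write each partition to its own file.
 3. If several DataFrames have been partitioned the same way (same key column(s) and number of partitions), joins can be performed on each pair of matching partitions separately, and therefore in parallel.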

In [1]:
import os
import pandas as pd
import numpy as np
import toolz
from pdpart import Partitioned

def make_test_data(n, filename=None, seed=None):
    """Build a random test DataFrame and optionally save it as CSV.

    Parameters
    ----------
    n : int
        Number of rows to generate.
    filename : str, optional
        If given, the frame is also written to this path (without the index).
    seed : int, optional
        Seed for a private ``np.random.RandomState`` so the output is
        reproducible. When None (the default), numpy's global random state is
        used.

    Returns
    -------
    pandas.DataFrame
        Column ``key`` holds random lowercase ASCII letters and column ``val``
        holds uniform random floats in [0, 1).
    """
    import string

    # np.random (the module) and a RandomState instance expose the same
    # choice/rand API, so either can serve as the generator.
    rng = np.random if seed is None else np.random.RandomState(seed)
    chars = list(string.ascii_lowercase)
    df = pd.DataFrame({"key": rng.choice(chars, size=n),
                       "val": rng.rand(n)})
    if filename is not None:
        df.to_csv(filename, index=False)
    return df

DATA_DIR = "../data"
# DataFrame.to_csv does not create missing parent directories, so make sure
# the data directory exists before the test file is written into it.
os.makedirs(DATA_DIR, exist_ok=True)
filename = os.path.join(DATA_DIR, "test.csv")
data = make_test_data(401, filename)

In [5]:
# split a dataframe into partitions by column "key"

parts = Partitioned(os.path.join(DATA_DIR, "parts"), by="key",
                    n_partition=7, compression="gzip").init_dir()
# Read the CSV in 100-row chunks so the whole file never has to be loaded at
# once, and hand each chunk to `parts.append` (presumably it routes rows to
# partitions by hashing "key", as described above — see pdpart).
for chunk in pd.read_csv(filename, chunksize=100):
    parts.append(chunk)
    
# Example per-partition operation: sum the "val" column of one partition file.
def do_sth(fn, compression="gzip"):
    """Return the sum of the ``val`` column of the CSV file *fn*.

    Parameters
    ----------
    fn : str
        Path to a partition file.
    compression : str or None, optional
        Compression passed to ``pd.read_csv``. Defaults to ``"gzip"``, the
        compression the partitions in this notebook are created with; pass
        ``None`` for plain CSV files.
    """
    df = pd.read_csv(fn, compression=compression)
    return df["val"].sum()

# Check: the per-partition sums must add up to the sum over the original data.
# Asserting makes the cell fail loudly on a mismatch instead of just showing
# False; the trailing expression still displays the result in the notebook.
parts_ok = np.allclose(sum(toolz.map(do_sth, parts.partitions())), data.val.sum())
assert parts_ok, "partition sums do not match the original data"
parts_ok

True

In [8]:
# apply transformations per partition and store in new directory

def transform(fn_in, fn_out, compression=None, factor=2.):
    """Read a CSV file, scale its ``val`` column, and write the result.

    Parameters
    ----------
    fn_in : str
        Path of the input CSV file.
    fn_out : str
        Path the transformed CSV is written to (without the index). It may be
        the same as *fn_in*, because the input is fully read before writing.
    compression : str or None, optional
        Compression used both for reading *fn_in* and for writing *fn_out*.
    factor : float, optional
        Multiplier applied to the ``val`` column; defaults to 2.0.
    """
    df = pd.read_csv(fn_in, compression=compression)
    df["val"] *= factor
    df.to_csv(fn_out, index=False, compression=compression)

# Create the "dest" partition set from `parts` (presumably reusing its
# partitioning settings — see pdpart), then write a transformed copy of each
# source partition into its counterpart.
dest = Partitioned.from_existing(os.path.join(DATA_DIR, "dest"), parts)
dest = dest.init_dir()
for fn_in, fn_out in zip(parts.partitions(), dest.partitions()):
    transform(fn_in, fn_out, compression="gzip")
   
# Check: every value was doubled, so the partition sums of `dest` must add up
# to twice the sum over the original data. Asserting makes the cell fail
# loudly on a mismatch; the trailing expression still displays the result.
dest_ok = np.allclose(sum(toolz.map(do_sth, dest.partitions())), 2 * data.val.sum())
assert dest_ok, "transformed partition sums are not twice the original sum"
dest_ok

True

In [None]:
%ls ../data/*