In [1]:
import gc
import os
import threading
import timeit

In [2]:
import multiprocessing
import ray
ray.init(num_cpus=multiprocessing.cpu_count() // 2) # Circumvent SMT
import modin.pandas as pd
import pandas as realpd

2021-10-31 19:13:54,542	INFO services.py:1263 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [3]:
df = pd.read_csv("fhv_tripdata_2021-07.csv")

In [4]:
c_DOLocation = "DOLocationID"
c_dropoff_datetime = "dropoff_datetime"
c_PULocation = "PULocationID"

def order_one(df):
    # Filter DOLocationID first (probably slower?)
    dolocation = df[c_DOLocation]
    t1 = df[dolocation.notna()]
    pulocation = t1[c_PULocation]
    return t1[pulocation.notna()]

def order_two(df):
    # Filter PULocationID first (probably faster?)
    pulocation = df[c_PULocation]
    t1 = df[pulocation.notna()]
    dolocation = t1[c_DOLocation]
    return t1[dolocation.notna()]

In [66]:
print(df.dtypes)
for c, tpe in df.dtypes.iteritems():
    if "float" in str(tpe):
        print(tpe)
c = df["SR_Flag"]
print(c.describe())
print(c.notna().describe())
print(c.notna())
print(pd.cut(c[c.notna()].values, bins=10))
print(pd.cut(c[c.notna()].values, bins=10, labels=[f"bin{i}" for i in range(10)]))

dispatching_base_num       object
pickup_datetime            object
dropoff_datetime           object
PULocationID              float64
DOLocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
Name: 0, dtype: object
float64
float64
float64
count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: SR_Flag, dtype: float64
count     1197985
unique          1
top         False
freq      1197985
Name: SR_Flag, dtype: object
0          False
1          False
2          False
3          False
4          False
           ...  
1197980    False
1197981    False
1197982    False
1197983    False
1197984    False
Name: SR_Flag, Length: 1197985, dtype: bool


IndexError: tuple index out of range

In [8]:
# Assume for now that all dataframes are immutable, so we can just maintain a global store of histograms
# In the future, we can store a histogram dataframe as a member of each dataframe, then invalidate it when
# an in-place updating function is invoked
# TODO do something for non-integer types? also add a special bin for NaN/nulls

histograms = {} # Maps id(df): histogram

In [36]:
def histogram(df, bins=10): # Technically, should key the dict on #bins too, but whatever
    """
    Computes and caches a dataframe containing bin information for each numeric column in df.
    
    The count of null/NaN fields in each column is also stored.
    """
    # if id(df) in histograms:
    #     return histograms[id(df)]
    hist_df = pd.DataFrame(index=["none", "size"
                                 ] + [f"bin{i}" for i in range(bins)] + [f"__lbound_{i}" for i in range(bins + 1)])
    for colname, tpe in df.dtypes.iteritems():
        if "float" in str(tpe) or "int" in str(tpe):
            col = df[colname]
            notna_mask = col.notna()
            nacol = pd.Series({"none": col.isna().sum()})
            if notna_mask.any():
                vals, bounds = pd.cut(col[notna_mask].values, bins=bins, retbins=True, labels=[f"bin{i}" for i in range(bins)])
                binned = pd.Series(vals.value_counts()).append(pd.Series({f"__lbound_{i}": b for i, b in enumerate(bounds)}))
            else:
                binned = pd.Series({f"bin{i}": 0 for i in range(bins)}).append(
                    pd.Series({f"__lbound_{i}": 0 for i in range(bins + 1)}))
            # Need to call pd.Series again in order to wrap the vanilla pandas frame
            nacol = nacol.append(binned).append(pd.Series({"size": df.size}))
            # ignore the bound for the "none" column
            hist_df = hist_df.join(pd.DataFrame({colname: nacol}), how="left")
    histograms[id(df)] = hist_df
    return hist_df

histogram(df)



Unnamed: 0,PULocationID,DOLocationID,SR_Flag
none,997007.0,168971.0,1197985
size,8385895.0,8385895.0,8385895
bin0,17537.0,105065.0,0
bin1,15127.0,99637.0,0
bin2,22177.0,149891.0,0
bin3,22193.0,81395.0,0
bin4,20711.0,90570.0,0
bin5,13175.0,61559.0,0
bin6,26806.0,108687.0,0
bin7,17929.0,70318.0,0


In [54]:
class DfOp:
    def apply(self, df):
        """Apply this operation to the dataframe in question."""
        pass
    
    
class Comparison:
    EQ = 0
    NE = 1
    LT = 2
    LTE = 3
    GT = 4
    GTE = 5
    
    @staticmethod
    def size_estimate(df, colname, comp, value):
        if value is None and comp == Comparison.NE:
            hist = histograms[id(df)]
            size = hist[colname]["size"]
            na_size = hist[colname]["none"]
            return 1 - (na_size / size)
        else:
            raise NotImplementedError()
    
    @staticmethod
    def get_mask(col, comp, value):
        if value is None:
            if comp == Comparison.EQ:
                return col.isna()
            elif comp == Comparison.NE:
                return col.notna()
            else:
                raise NotImplementedError()
        elif comp == Comparison.EQ:
            return col == value
        elif comp == Comparison.NE:
            return col != value
        elif comp == Comparison.LT:
            return col < value
        elif comp == Comparison.LTE:
            return col <= value
        elif comp == Comparison.GT:
            return col > value
        elif comp == Comparison.GTE:
            return col >= value
        
        
class FilterOp(DfOp):
    def __init__(self, colname, comp, value):
        self.colname = colname
        self.comp = comp
        self.value = value
    
    def apply(self, df):
        mask = Comparison.get_mask(df[self.colname], self.comp, self.value)
        return df[mask]
        
            
class DeferredQuery:
    def __init__(self, df):
        self.df = df
        self.operations = []
    
    def compute(self):
        """
        Examines the list of operations and reorders them for efficiency.
        For now, the only operation that is reordered is filtering, in which case a filtering predicate which
        would result in a smaller dataset is applied first.
        
        Afterwards, these operations are applied to compute a new dataframe.
        The self.operations field remains unchanged.
        """
        newops = []
        df = self.df
        # All filters are interchangeable, so we just need to iterate over all of them. For now, assume
        # there are exactly either 0, 1, or 2 filters in the list for simplicity.
        l = len(self.operations)
        if l == 0:
            return df
        elif l == 1:
            return self.operations[0].apply(df)
        elif l == 2:
            # There might be some scenarios where if 2 columns are strongly correlated, filtering by one
            # might change the cardinality estimate of another -- we'll worry about that later
            op0 = self.operations[0]
            op1 = self.operations[1]
            card0 = Comparison.size_estimate(df, op0.colname, op0.comp, op0.value)
            card1 = Comparison.size_estimate(df, op1.colname, op1.comp, op1.value)
            if card0 > card1:
                # Apply op1 first (innermost)
                return op0.apply(op1.apply(df))
            else:
                return op1.apply(op0.apply(df))
        else:
            raise NotImplementedError()
        return result
        
    
    def filter(self, filterop):
        self.operations.append(filterop)
        return self


In [61]:
def order_one(df):
    # Filter PULocationID first (probably slower?)
    histogram(df)
    q = DeferredQuery(df)
    q \
        .filter(FilterOp("DOLocationID", Comparison.NE, None)) \
        .filter(FilterOp("PULocationID", Comparison.NE, None)) \
        .compute()
    
def order_two(df):
    # Filter DOLocationID first (probably slower?)
    histogram(df)
    q = DeferredQuery(df)
    q \
        .filter(FilterOp("DOLocationID", Comparison.NE, None)) \
        .filter(FilterOp("PULocationID", Comparison.NE, None)) \
        .compute()

In [None]:
dup_counts = [1, 3, 10, 50]
filenames = [(f"dup_{i}_" if i != 1 else "") + "fhv_tripdata_2021-07.csv" for i in dup_counts]
for i, fn in zip(dup_counts, filenames):
    if not os.path.exists(fn):
            os.system(f"./repeat_csv.sh fhv_tripdata_2021-07.csv {i}")
            # t = pd.read_csv("fhv_tripdata_2021-07.csv")

c_DOLocation = "DOLocationID"
c_dropoff_datetime = "dropoff_datetime"
c_PULocation = "PULocationID"

def make_test_fn(pd_handle, csv_path, test_fn):
    df = pd_handle.read_csv(csv_path)
    return lambda: test_fn(df)

for fn in filenames:
    gc.collect()
    print("Timing order one on naive pandas with input", fn)
    print(timeit.timeit(make_test_fn(realpd, fn, order_one), number=10))

    gc.collect()
    print("Timing order one on modin with input", fn)
    print(timeit.timeit(make_test_fn(pd, fn, order_one), number=10))

    gc.collect()
    print("Timing order two on naive pandas with input", fn)
    print(timeit.timeit(make_test_fn(realpd, fn, order_two), number=10))

    gc.collect()
    print("Timing order two on modin with input", fn)
    print(timeit.timeit(make_test_fn(pd, fn, order_two), number=10))


Timing order one on naive pandas with input fhv_tripdata_2021-07.csv




1.2192974019999383
Timing order one on modin with input fhv_tripdata_2021-07.csv
