In [None]:
import pandas as pd
from psutil import cpu_count
from dask import dataframe as dd
from dask.multiprocessing import get
import timeit
import warnings


def pd_apply(df, myfunc, *args, **kwargs):
    """
    Build a zero-argument callable that runs a plain pandas apply
    :param df: The dataframe or series to apply the function to
    :param myfunc: The function to apply to every element
    :param args: The positional arguments of the function
    :param kwargs: The key word arguments of the function
    :return: A callable that performs the apply and discards the result
        (it returns None), e.g. for handing to timeit
    """
    def wrapped():
        # isinstance (rather than an exact type match) keeps DataFrame
        # subclasses on the column-by-column, element-wise path; otherwise
        # DataFrame.apply would pass whole columns to myfunc.
        if isinstance(df, pd.DataFrame):
            pd.concat([df[c].apply(myfunc, args=args, **kwargs) for c in df.columns], axis=1)
        else:
            df.apply(myfunc, args=args, **kwargs)
    return wrapped


def dask_apply(df, npartitions, myfunc, *args, **kwargs):
    """
    Apply a function to a pandas dataframe or series through dask multiprocessing
    :param df: The dataframe or series to apply the function to
    :param npartitions: The number of dask partitions to split df into
    :param myfunc: The function you wish to apply
    :param args: The positional arguments of the function
    :param kwargs: The key word arguments of the function
        Must also contain 'meta', a pandas object holding sample output of the
        function; it is removed here and handed to dask as output metadata
    :return: The computed pandas dataframe/series
    :raises KeyError: If 'meta' is missing from kwargs
    """
    # NOTE(review): recent dask releases select the scheduler with
    # ``scheduler=`` instead of ``get=`` -- confirm against the dask version in use.
    meta_sample = kwargs.pop('meta')

    if isinstance(df, pd.DataFrame):
        meta = {c: meta_sample[c].dtype for c in meta_sample.columns}
        try:
            ddf = dd.from_pandas(df, npartitions=npartitions)
            # dask's DataFrame.apply receives the function's positional
            # arguments through ``args=``; splatting them positionally would
            # bind them to ``axis``/``broadcast``/... and clash with ``axis=1``.
            return ddf.apply(myfunc, axis=1, args=args, meta=meta, **kwargs).compute(get=get)
        except Exception:
            # Best-effort fallback; Exception (not a bare except) so that
            # KeyboardInterrupt/SystemExit still propagate.
            warnings.warn('Dask applymap not working correctly. Concatenating swiftapplies instead.')
            return pd.concat([swiftapply(df[c], myfunc, *args, **kwargs) for c in df.columns], axis=1)

    meta = meta_sample
    try:
        # First try handing whole partitions to the function.
        return dd.from_pandas(df, npartitions=npartitions).map_partitions(myfunc, *args, **kwargs, meta=meta).compute(get=get)
    except Exception:
        # Otherwise map the function over the individual elements.
        return dd.from_pandas(df, npartitions=npartitions).map(lambda x: myfunc(x, *args, **kwargs), meta=meta).compute(get=get)
        

def estimate_and_apply(df, npartitions, dask_threshold, myfunc, *args, **kwargs):
    """
    Time a pandas apply on a sample and pick pandas or dask for the full apply
    :param df: The dataframe or series to apply the function to
    :param npartitions: The number of dask partitions to use if dask is chosen
    :param dask_threshold: The maximum estimated duration (in seconds) of a
        pandas apply before switching to dask
    :param myfunc: The function you wish to apply
    :param args: The positional arguments of the function
    :param kwargs: The key word arguments of the function
    :return: The new dataframe/series with the function applied
    """
    is_frame = isinstance(df, pd.DataFrame)

    def _pandas_apply(data):
        # Element-wise pandas apply; dataframes are processed column by column.
        if is_frame:
            return pd.concat([data[c].apply(myfunc, args=args, **kwargs) for c in data.columns], axis=1)
        return data.apply(myfunc, args=args, **kwargs)

    # Positional slicing never raises for short inputs, so no fallback is needed.
    samp = df.iloc[:1000]
    if len(samp) == 0:
        # Nothing to time (and nothing to divide by): apply directly.
        return _pandas_apply(df)

    n_repeats = 3
    timed = timeit.timeit(lambda: _pandas_apply(samp), number=n_repeats)
    samp_proc_est = timed / n_repeats
    est_apply_duration = samp_proc_est / len(samp) * df.shape[0]

    # Only when pandas looks too slow: get meta information for dask and
    # check whether the output is str (object dtype).
    if est_apply_duration > dask_threshold:
        # Positional (iloc) sample so that any index type works, for
        # dataframes as well as series.
        meta = _pandas_apply(df.iloc[:2])
        if is_frame:
            str_target = object in meta.dtypes.values
        else:
            str_target = object == meta.dtypes
        if not str_target:
            dask_kwargs = dict(kwargs)
            dask_kwargs['meta'] = meta
            return dask_apply(df, npartitions, myfunc, *args, **dask_kwargs)

    # Fast enough, or str output: use pandas.
    return _pandas_apply(df)
        
        
def swiftapply(df, myfunc, *args, **kwargs):
    """
    Efficiently apply any function to a pandas dataframe or series
    in the fastest available manner
    :param df: The dataframe or series to apply the function to
    :param myfunc: The function you wish to apply
    :param args: The positional arguments of the function
    :param kwargs: The key word arguments of the function
        You can also specify npartitions and dask_threshold
        npartitions will affect the speed of dask multiprocessing
            (defaults to twice the cpu count)
        dask_threshold is the maximum allowed time (in seconds) for a normal pandas apply
            before switching to a dask operation (defaults to 1)
    :return: The new dataframe/series with the function applied as quickly as possible
    """
    # Strip the swiftapply-specific options so they are not forwarded to myfunc.
    if 'npartitions' in kwargs:
        npartitions = kwargs.pop('npartitions')
    else:
        npartitions = cpu_count() * 2
    dask_threshold = kwargs.pop('dask_threshold', 1)

    if myfunc is str:
        # Converting to str is a plain dtype cast.
        return df.astype(str)

    # isinstance so that DataFrame subclasses are treated as dataframes too.
    is_frame = isinstance(df, pd.DataFrame)

    # If we are manipulating strings, we don't want to try vectorization
    if is_frame:
        str_source = object in df.dtypes.values
    else:
        str_source = object == df.dtypes

    if not str_source:
        try:  # try to vectorize
            if is_frame:
                return pd.concat([pd.Series(myfunc(df[c], *args, **kwargs), name=c) for c in df.columns], axis=1)
            return myfunc(df, *args, **kwargs)
        except Exception:
            # Can't vectorize: fall through to the timed pandas/dask apply.
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
            pass

    return estimate_and_apply(df, npartitions, dask_threshold, myfunc, *args, **kwargs)
