In [1]:
import pandas as pd; import numpy as np; import janitor as jn
from numba import njit, prange

In [2]:
from subprocess import Popen
from contextlib import contextmanager
from os import getpid
from time import sleep
from signal import SIGINT

@contextmanager
def perf_stat():
    """Run ``perf stat`` against the current process while the ``with`` body executes.

    Starts ``perf stat -p <pid of this interpreter>`` as a child process,
    sleeps for half a second (presumably to let perf attach before the
    measured code starts), yields to the body, and finally sends SIGINT to
    perf and waits for it to exit.

    The interrupt and the wait sit in a ``finally`` clause so that perf is
    stopped even when the body raises. Waiting reaps the child process and
    gives it the chance to finish writing its report before control leaves
    the ``with`` block.
    """
    p = Popen(["perf", "stat", "-p", str(getpid())])
    try:
        sleep(0.5)
        yield
    finally:
        # Always stop the profiler, whether the body succeeded or raised.
        p.send_signal(SIGINT)
        # Block until perf has exited so the child is not left unreaped.
        p.wait()

In [3]:
@njit(parallel=True)
def _get_indices_dual_non_monotonic_non_equi(
    left_region: np.ndarray,
    right_region: np.ndarray,
    left_index: np.ndarray,
    right_index: np.ndarray,
    starts: np.ndarray,
    counts: np.ndarray,
):
    """
    Build the matching index pairs
    for the left and right regions.
    Strictly for non-equi joins,
    where only two join conditions are present.

    Left row ``i`` is checked against the window
    ``right_region[starts[i] : starts[i] + counts[i]]``.
    For each accepted position ``pos`` in that window,
    ``left_index[i]`` and ``right_index[pos]`` are written
    to the same slot of the two output arrays.

    Returns a tuple ``(l_index, r_index)`` of ``np.intp`` arrays.
    """
    # Pass 1: count, per left row, how many window entries satisfy
    # left_region[row] <= right_region[pos]; also accumulate the grand total.
    n_rows = counts.size
    matches_per_row = np.empty(n_rows, dtype=np.intp)
    n_matches = np.intp(0)
    for row in prange(n_rows):
        threshold = left_region[row]
        lo = starts[row]
        hi = lo + counts[row]
        hits = np.intp(0)
        for pos in range(lo, hi):
            if threshold <= right_region[pos]:
                hits += 1
        matches_per_row[row] = hits
        n_matches += hits

    # Exclusive prefix sum: the first output slot owned by each left row.
    offsets = np.zeros(starts.size, dtype=np.intp)
    offsets[1:] = np.cumsum(matches_per_row)[:-1]

    # Pass 2: fill the outputs; every row writes only into its own slice.
    l_index = np.empty(n_matches, dtype=np.intp)
    r_index = np.empty(n_matches, np.intp)
    for row in prange(starts.size):
        write_at = offsets[row]
        remaining = matches_per_row[row]
        span = counts[row]
        base = starts[row]
        left_label = left_index[row]
        threshold = left_region[row]
        if remaining == span:
            # The whole window matched, so copy it without re-comparing.
            for step in range(span):
                l_index[write_at + step] = left_label
                r_index[write_at + step] = right_index[base + step]
        else:
            for step in range(span):
                # Stop as soon as every counted match has been written.
                if remaining == 0:
                    break
                src = base + step
                if threshold > right_region[src]:
                    continue
                l_index[write_at] = left_label
                r_index[write_at] = right_index[src]
                write_at += 1
                remaining -= 1
    return l_index, r_index

In [4]:
# Single call of the kernel under `perf stat`; arguments are positional,
# in the order of the function signature.
# NOTE(review): numba compiles lazily on the first call, so this measurement
# presumably includes JIT compilation time; confirm before comparing numbers.
with perf_stat():
    _get_indices_dual_non_monotonic_non_equi(
        np.array([1, 3, 1, 3, 2, 2]),  # left_region
        np.array([1, 3, 3, 2, 2, 1, 3]),  # right_region
        np.array([2, 0, 1, 3, 4, 6]),  # left_index
        np.array([3, 4, 6, 0, 5, 2, 1]),  # right_index
        np.array([0, 1, 1, 5, 5, 6]),  # starts
        np.array([7, 6, 6, 2, 2, 1]),  # counts
    )

In [5]:
@njit(parallel=True)
def _get_indices_dual_non_monotonic_non_equi(
    left_region: np.ndarray,
    right_region: np.ndarray,
    left_index: np.ndarray,
    right_index: np.ndarray,
    starts: np.ndarray,
    counts: np.ndarray,
):
    """
    Retrieves the matching indices
    for the left and right regions.
    Strictly for non-equi joins,
    where only two join conditions are present.

    This variant fills the outputs without an ``if``/``continue``
    inside the inner loop: the write cursor only advances
    when the comparison is true.

    Parameters
    ----------
    left_region, right_region
        Values compared as ``left_region[i] <= right_region[pos]``.
    left_index, right_index
        Labels emitted for each match
        (``left_index[i]`` and ``right_index[pos]``).
    starts, counts
        Left row ``i`` is compared against
        ``right_region[starts[i] : starts[i] + counts[i]]``.

    Returns
    -------
    tuple of two ``np.intp`` arrays ``(l_index, r_index)``,
    each as long as the total number of matches.
    """
    # two step pass
    # first pass gets the length of the final indices
    # second pass populates the final indices with actual values
    count_indices = np.empty(counts.size, dtype=np.intp)
    total_length = np.intp(0)
    for num in prange(counts.size):
        l_region = left_region[num]
        size = counts[num]
        start = starts[num]
        counter = np.intp(0)
        for n in range(size):
            r_region = right_region[start + n]
            out = (l_region <= r_region)
            total_length += out
            counter += out
        count_indices[num] = counter
    # exclusive cumulative sum -> first output slot of every left row
    start_indices = np.zeros(starts.size, dtype=np.intp)
    start_indices[1:] = np.cumsum(count_indices)[:-1]
    l_index = np.empty(total_length, dtype=np.intp)
    r_index = np.empty(total_length, np.intp)
    for num in prange(starts.size):
        indexer = start_indices[num]
        size = counts[num]
        l_ind = left_index[num]
        r_indexer = starts[num]
        l_region = left_region[num]
        width = count_indices[num]

        for n in range(size):
            # all matches of this row are written; nothing left to do
            if not width:
                break
            pos_right = r_indexer + n
            r_region = right_region[pos_right]
            is_true = l_region <= r_region
            # On a non-match the slot is rewritten with its current content
            # and is overwritten later by the next real match.
            l_index[indexer] = l_ind if is_true else l_index[indexer]
            r_index[indexer] = right_index[pos_right] if is_true else r_index[indexer]
            # Add the boolean itself (0 or 1).
            # `x += 1 if is_true else x` would parse as
            # `x += (1 if is_true else x)`, which doubles the cursor and
            # zeroes the remaining width on the first non-match.
            indexer += is_true
            width -= is_true
    return l_index, r_index


 Performance counter stats for process id '1565526':

          1,282.62 msec task-clock                       #    0.757 CPUs utilized             
               189      context-switches                 #  147.355 /sec                      
                 1      cpu-migrations                   #    0.780 /sec                      
            21,886      page-faults                      #   17.064 K/sec                     
     5,506,678,805      cpu_core/cycles/                 #    4.293 GHz                         (99.53%)
     3,360,007,592      cpu_atom/cycles/                 #    2.620 GHz                         (0.49%)
    10,260,607,455      cpu_core/instructions/           #    1.86  insn per cycle              (99.49%)
       661,426,637      cpu_atom/instructions/           #    0.12  insn per cycle              (0.52%)
     2,081,996,954      cpu_core/branches/               #    1.623 G/sec                       (99.44%)
       171,493,789      cpu_atom/branches/

In [6]:
# Same inputs as the earlier perf run, so the counters of the two
# definitions can be compared; arguments are positional, in signature order.
# NOTE(review): this is the first call after the function was redefined, so
# the measurement presumably includes JIT compilation again; confirm.
with perf_stat():
    _get_indices_dual_non_monotonic_non_equi(
        np.array([1, 3, 1, 3, 2, 2]),  # left_region
        np.array([1, 3, 3, 2, 2, 1, 3]),  # right_region
        np.array([2, 0, 1, 3, 4, 6]),  # left_index
        np.array([3, 4, 6, 0, 5, 2, 1]),  # right_index
        np.array([0, 1, 1, 5, 5, 6]),  # starts
        np.array([7, 6, 6, 2, 2, 1]),  # counts
    )


 Performance counter stats for process id '1565526':

          1,007.48 msec task-clock                       #    0.723 CPUs utilized             
                68      context-switches                 #   67.495 /sec                      
                 7      cpu-migrations                   #    6.948 /sec                      
            20,592      page-faults                      #   20.439 K/sec                     
     3,956,685,454      cpu_core/cycles/                 #    3.927 GHz                         (92.90%)
     1,224,242,849      cpu_atom/cycles/                 #    1.215 GHz                         (0.81%)
     7,441,371,624      cpu_core/instructions/           #    1.88  insn per cycle              (92.86%)
       825,232,458      cpu_atom/instructions/           #    0.21  insn per cycle              (3.73%)
     1,494,840,654      cpu_core/branches/               #    1.484 G/sec                       (92.80%)
       166,707,911      cpu_atom/branches/

In [7]:
# The CSV lives at a machine-specific location. Set the EVENTS_CSV environment
# variable to point at a local copy instead of editing this cell; when it is
# not set, the original path is used unchanged.
import os

events = pd.read_csv(
    os.environ.get("EVENTS_CSV", '/Users/samuel.oranyeli/Downloads/results.csv'),
    parse_dates=['start','end'],
).iloc[:, 1:]  # drop the first column of the file

FileNotFoundError: [Errno 2] No such file or directory: '/Users/samuel.oranyeli/Downloads/results.csv'

In [None]:
events.dtypes

In [None]:
# Self-join of `events` on two inequality conditions, via the numba code path,
# keeping only id/start/end from each side.
# NOTE(review): each tuple is presumably (left column, right column, operator);
# confirm against the pyjanitor documentation.
# Extra conditions tried here earlier: ('id', 'id', '!=') and
# ('audience', 'audience', '>').
a = events.conditional_join(
    events,
    ("start", "end", "<="),
    ("end", "start", ">="),
    use_numba=True,
    df_columns=["id", "start", "end"],
    right_columns=["id", "start", "end"],
)

a

In [None]:
# %%timeit
# (events
# .conditional_join(
#     events,
#     ('start', 'end', '<='),
#     ('end', 'start', '>='),
#     # ('id', 'id', '!='),
#     # ('audience','audience','>'),
#     use_numba = True,
#     df_columns = ['id', 'start', 'end'],
#     right_columns = ['id', 'start', 'end'])
# )

In [None]:
# %%timeit
# (events
# .conditional_join(
#     events,
#     ('start', 'end', '<='),
#     ('end', 'start', '>='),
#     # ('id', 'id', '!='),
#     # ('audience','audience','>'),
#     use_numba = False,
#     df_columns = ['id', 'start', 'end'],
#     right_columns = ['id', 'start', 'end'])
# )

In [None]:
# Same self-join as before plus an `id != id` condition, this time with
# use_numba=False; the result is displayed, not stored.
# An ('audience', 'audience', '>') condition was tried here earlier.
events.conditional_join(
    events,
    ("start", "end", "<="),
    ("end", "start", ">="),
    ("id", "id", "!="),
    use_numba=False,
    df_columns=["id", "start", "end"],
    right_columns=["id", "start", "end"],
)

In [None]:
# # %%timeit
# (events
# .conditional_join(
#     events,
#     ('start', 'end', '>='),
#     ('end', 'start', '<='),
#     ('id', 'id', '!='),
#     # ('audience','audience','>'),
#     use_numba = True,
#     df_columns = ['id', 'start', 'end'],
#     right_columns = ['id', 'start', 'end'])
# )

In [None]:
# Two small hand-written frames used as join inputs below.
# C has 7 rows keyed c1..c7, D has 8 rows keyed d1..d8;
# `keyy` counts up in C and down in D.
C = pd.DataFrame(
    {
        "key": ["c" + str(i) for i in range(1, 8)],
        "vol": [35, 15, 5, 35, 18, 90, 17],
        "profit": [45, 35, 55, 12, 15, 55, 11],
        "unitsSold": [15, 10, 30, 10, 15, 80, 2],
        "keyy": range(1, 8),
    }
)

D = pd.DataFrame(
    {
        "key": ["d" + str(i) for i in range(1, 9)],
        "vol": [20, 50, 15, 16, 40, 20, 40, 2],
        "profit": [30, 10, 12, 52, 35, 20, 30, 57],
        "unitsSold": [20, 35, 10, 12, 40, 30, 5, 15],
        "keyy": range(8, 0, -1),
    }
)



In [None]:
# C = pd.concat([C]*40*40)
# D = pd.concat([D]*40*40)

In [None]:
# Join C to D on two inequality conditions (profit >=, vol <=) through the
# numba code path; the result is displayed, not stored.
# Conditions tried here earlier: ('keyy', 'keyy', '=='),
# ('unitsSold', 'unitsSold', '>=') and ('vol', 'profit', '>').
C.conditional_join(
    D,
    ("profit", "profit", ">="),
    ("vol", "vol", "<="),
    use_numba=True,
)

In [None]:
# Same C-to-D join (profit >=, vol <=) with use_numba=False, for comparison
# with the numba result; the result is displayed, not stored.
# Conditions tried here earlier: ('keyy', 'keyy', '=='),
# ('unitsSold', 'unitsSold', '>=') and ('vol', 'profit', '>').
C.conditional_join(
    D,
    ("profit", "profit", ">="),
    ("vol", "vol", "<="),
    use_numba=False,
)

In [None]:
# %%timeit
# (C
# .conditional_join(
#     D, 
    
#     ('profit','profit','>='),
#     # ('keyy','keyy','=='),
#   # ('unitsSold','unitsSold','>='),
#   ('vol','vol','<='),
#   # ('vol','profit','>'),
#     use_numba=True)
# )

In [None]:
# %%timeit
# (C
# .conditional_join(
#     D, 
    
#     ('profit','profit','>='),
#     # ('keyy','keyy','=='),
#   # ('unitsSold','unitsSold','>='),
#   ('vol','vol','<='),
#   # ('vol','profit','>'),
#     use_numba=False)
# )

In [None]:
C

In [None]:
D.sort_values('profit')

In [None]:
D.sort_values('vol')

In [None]:
# %%timeit
# (C
# .conditional_join(
#     D, 
    
#     ('profit','profit','>='),
#     # ('keyy','keyy','=='),
#   # ('unitsSold','unitsSold','>='),
#   ('vol','vol','<='),
#   # ('vol','profit','>'),
#     use_numba=False)
# )

In [None]:
C

In [None]:
D.sort_values('profit')

In [None]:
# f=(C
# .conditional_join(
#     D, 
    
#     ('profit','profit','>='),
#     # ('keyy','keyy','=='),
#    ('unitsSold','unitsSold','>='),
#  # ('vol','vol','<='),
#   # ('vol','profit','>'),
#     use_numba=True)
# )
# f

In [None]:
from string import ascii_lowercase

# Seeded random inputs for the range-join experiments below.
# The order of the np.random calls matters for reproducibility.
np.random.seed(1)
n = 20
k = 20
# Larger benchmark sizes used previously: n = 20_000_000; k = 20_000

# letter -> position in the alphabet (a -> 0, ..., z -> 25)
mapp = {letter: position for position, letter in enumerate(ascii_lowercase)}

idx1 = np.random.randint(0, high=200, size=n)
idx2 = np.random.randint(0, high=300, size=n)

# d1: n intervals [start, end] labelled with one of the letters a-e
d1 = pd.DataFrame(
    {
        "x": np.random.choice(list(ascii_lowercase[:5]), size=n),
        "start": np.minimum(idx1, idx2),
        "end": np.maximum(idx1, idx2),
    }
)
d1 = d1.assign(xx=d1["x"].map(mapp))

# d2: k points pos1 in [60, 150] labelled with one of the letters a-o
d2 = pd.DataFrame(
    {
        "x": np.random.choice(list(ascii_lowercase[:15]), size=k),
        "pos1": np.random.randint(low=60, high=151, size=k),
    }
)
d2 = d2.assign(xx=d2["x"].map(mapp))

In [None]:
# Join the points in d2 to the intervals in d1 with two strict inequalities
# on pos1 (against start with '>' and against end with '<'), through the
# numba code path.
# An extra ('xx', 'xx', '==') condition was tried here earlier.
bla = d2.conditional_join(
    d1,
    ("pos1", "start", ">"),
    ("pos1", "end", "<"),
    use_numba=True,
)

bla

In [None]:
x=np.array([6, 6, 6, 6, 2, 4, 5, 6, 5, 6, 5, 6, 6, 6, 5, 1, 6, 6, 3])
x

In [None]:
np.partition(bla[1],248842-248212)[248842-248212:]

In [None]:
%timeit bla[1][np.argpartition(bla[1],248842-248212)[248842-248212:]]

In [None]:
%timeit bla[1][bla[1]>=91]

In [None]:
bla[1][bla[1]>=91]

In [None]:
%timeit x[x>=6]

In [None]:
np.partition(x,7)

In [None]:
%timeit x[np.argpartition(x,7)[8:]]

In [None]:
%timeit x[np.argpartition(x,7)]

In [None]:
len([3, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6])

In [None]:
np.searchsorted([3, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], 1)

In [None]:
bla[3].size

In [None]:
(bla[5][:2410367]>=91).sum()

In [None]:
%timeit bla[5][:2410367][bla[5][:2410367]>=91]

In [None]:
%timeit np.sort(bla[1][97:97+14458])

In [None]:
pd.unique(bla[5])

In [None]:
bla[1].size

In [None]:
uniques = pd.factorize(bla[5],sort=True)[1]
uniques

In [None]:
%timeit uniques.searchsorted(91)

In [None]:
%timeit uniques.searchsorted(bla[5][:2410367])

In [None]:
pd.unique(pd.factorize(bla[1],sort=True)[0])