In [297]:
import pandas as pd
import numpy as np

In [298]:
def generate_df(start_date, end_date, N, n_cols):
    dates = pd.date_range(start=start_date, end=end_date, freq='B')
    n_values = [x + 1 for x in range(N)]
    index = pd.MultiIndex.from_product([dates, n_values], names=['date', 'x'])
    columns = [f'f{i+1}' for i in range(n_cols)]
    df = pd.DataFrame(index=index, data=np.random.random((len(index), n_cols)), columns=columns)
    return df

In [336]:
x = generate_df('2000-01-01', '2005-01-05', 1000, 5)
y = generate_df('2000-01-01', '2005-01-05', 1000, 5)

In [337]:
x_groups = x.groupby(level='date').groups
y_groups = y.groupby(level='date').groups

In [338]:
a.get_level_values(0).values

array(['2000-01-05T00:00:00.000000000', '2000-01-05T00:00:00.000000000',
       '2000-01-05T00:00:00.000000000', '2000-01-05T00:00:00.000000000',
       '2000-01-05T00:00:00.000000000', '2000-01-05T00:00:00.000000000',
       '2000-01-05T00:00:00.000000000', '2000-01-05T00:00:00.000000000',
       '2000-01-05T00:00:00.000000000', '2000-01-05T00:00:00.000000000'],
      dtype='datetime64[ns]')

In [339]:
def dotself(xy):
    x, y = xy
    res = x.dot(y.T)
    res.columns = res.columns.droplevel(0)
    return res

In [340]:
def dot_xy(x, y):
    res = x.dot(y.T)
    res.columns = res.columns.droplevel(0)
    return res

In [None]:
pair_groups = [(x.loc[x_groups[d]], y.loc[y_groups[d]]) for d in x_groups.keys() & y_groups.keys()]

In [None]:
pd.concat(map(dotself, pair_groups))

In [None]:
from numba import jit

@jit(parallel=True)
def dotnp(xy):
    x, y, x_dates, x_securities, y_securities = xy
    res = np.dot(x, y.T)
    return res, x_dates, x_securities, y_securities

In [None]:
def np_to_pd(values, x_dates, x_securities, y_securities):
    return pd.DataFrame(data=values, index=zip(x_dates, x_securities), columns=y_securities)

In [None]:
def np_to_pd2(x):
    values, x_dates, x_securities, y_securities = x
    return pd.DataFrame(data=values, index=pd.MultiIndex.from_arrays([x_dates, x_securities]), columns=y_securities)

In [None]:
%%timeit
x_groups = x.groupby(level='date').groups
y_groups = y.groupby(level='date').groups
pair_groups = [(x.loc[x_groups[d]], y.loc[y_groups[d]]) for d in x_groups.keys() & y_groups.keys()]
pd.concat(map(dotself, pair_groups))

In [310]:
import dask

In [311]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=4)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 38435 instead


0,1
Client  Scheduler: tcp://127.0.0.1:39331  Dashboard: http://127.0.0.1:38435/status,Cluster  Workers: 4  Cores: 16  Memory: 15.57 GiB


In [312]:
%%timeit
x_groups = x.groupby(level='date').groups
y_groups = y.groupby(level='date').groups
pair_groups_np = [(x.loc[x_groups[d]].values, y.loc[y_groups[d]].values, x.loc[x_groups[d]].index.get_level_values(0).values, x.loc[x_groups[d]].index.get_level_values(1).values, y.loc[x_groups[d]].index.get_level_values(1).values) for d in x_groups.keys() & y_groups.keys()]

futures = client.map(dotnp, pair_groups_np)
futures2 = client.map(np_to_pd2, futures)
results = client.gather(futures2)
pd.concat(results)

10.4 s ± 1.25 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [313]:
%%timeit
import pandas as pd
from joblib import Parallel, delayed

all_keys = x_groups.keys() & y_groups.keys()

pd.concat(Parallel(n_jobs=-1)(delayed(dot_xy)(x.loc[x_groups[k]], y.loc[y_groups[k]]) for k in all_keys))

3.75 s ± 124 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [322]:
import cudf
import cupy as cp

In [323]:
x_cudf = cudf.DataFrame(x)
y_cudf = cudf.DataFrame(y)

In [335]:
%%timeit
x_groups = x_cudf.groupby(level='date').groups
y_groups = y_cudf.groupby(level='date').groups
pair_groups = [(x_cudf.loc[x_groups[d]], y_cudf.loc[y_groups[d]]) for d in x_groups.keys() & y_groups.keys()]

def dot_cudf(xy):
    x, y = xy
    return cp.dot(x.values, y.T.values)

res = [dot_cudf(pair_group) for pair_group in pair_groups]

26.6 s ± 146 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [326]:
!nvidia-smi

Wed Apr 21 22:19:03 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.56       Driver Version: 460.56       CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  GeForce RTX 3060    Off  | 00000000:01:00.0 Off |                  N/A |
|  0%   39C    P2    36W / 170W |   1006MiB / 12045MiB |      1%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces