In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import math
from pathlib import Path
import pandas as pd
import numpy as np
from numba import cuda
from time import perf_counter
from multiprocessing import shared_memory
from multiprocessing.pool import ThreadPool
from itertools import product
from tqdm import tqdm
from data import get_ref_spectra_from_df, batches, mkdir, spectra_peaks_to_tensor
from kernel import compile
from cosine import similarity

In [3]:
## Define constants
tolerance: float = 0.1
shift: float = 0
mz_power: float = 0
int_power: float = 1

## How many pairs per batch. Has to be a power of 2.
# Hardware specific - An RTX2070 works best at around 1024 * 2
# But Colab T4 GPU might work best at 1024 * 4
BATCH_SIZE = 1024

# MATCH_LIMIT specifies max how many mz-mz pairs we could consider for each RQ pair, before we sort and filter. 
# E.g. a value of 256 usually causes around ~0.003% of RQ pairs to "overflow".
# The overflown RQ scores will be strictly less than or equal to perfectly accurate score.
# The mean absolute difference at 256, for all overflown pairs is on the order of ~1e-3
# Small values of MATCH_LIMIT (e.g. 128, 64,) cause a dramatic speedup in the processing speed.
MATCH_LIMIT = 256

## GPU-specific constants
THREADS_PER_BLOCK = (32, 32)
BLOCKS_PER_GRID_X = math.ceil(BATCH_SIZE / THREADS_PER_BLOCK[0])
BLOCKS_PER_GRID_Y = math.ceil(BATCH_SIZE / THREADS_PER_BLOCK[1])
BLOCKS_PER_GRID = (BLOCKS_PER_GRID_X, BLOCKS_PER_GRID_Y)

# Since Greedy cosine is an unstable algorithm, because approximate mz-mz values do not
# result in approximately the same scores and number of matches.
# So we need to use fp64 to minimize the deviation as much as possible.
# Using float32 causes a significant speedup in the processing speed.
dtype = 'float64'

# Data path
reference_csv_file = "data/input/example_dataset_tornike.csv"
query_csv_file = "data/input/example_dataset_tornike.csv"
output_dir = 'data/output/'

# Limits
# We consider only first LIMIT number of entries in CSVs
LIMIT = 2048

# For keeping track of experiments
CONFIG = dict(
    tolerance = tolerance,
    shift = shift,
    mz_power = mz_power,
    int_power = int_power,
    match_limit = MATCH_LIMIT,
    batch_size = BATCH_SIZE,
    limit = LIMIT,
)

In [4]:
# We load CSV files using multiple threads
ref_spectra_df_path = Path(reference_csv_file)
ref_spectra_df = pd.read_csv(ref_spectra_df_path)
references = get_ref_spectra_from_df(ref_spectra_df, limit=LIMIT)

query_spectra_df_path = Path(query_csv_file)
query_spectra_df = pd.read_csv(query_spectra_df_path)
queries = get_ref_spectra_from_df(query_spectra_df, limit=LIMIT)

print(f"We have {len(ref_spectra_df)} references and {len(query_spectra_df)} queries")

100%|██████████| 2048/2048 [00:03<00:00, 638.48it/s] 
100%|██████████| 2048/2048 [00:00<00:00, 4576.51it/s]


We have 100001 references and 100001 queries


In [5]:
# Numba Just-in-time compiles our kernel and bakes in our constants for performance.
kernel = compile(tolerance=tolerance, shift=shift, 
                 mz_power=mz_power, int_power=int_power, 
                 match_limit=MATCH_LIMIT, batch_size=BATCH_SIZE)

Found 1 CUDA devices
id 0    b'NVIDIA GeForce RTX 2070 with Max-Q Design'                              [SUPPORTED]
                      Compute Capability: 7.5
                           PCI Device ID: 0
                              PCI Bus ID: 1
                                    UUID: GPU-f6e241c8-f0ad-720e-be22-2713a6b0868d
                                Watchdog: Enabled
             FP32/FP64 Performance Ratio: 32
Summary:
	1/1 devices are supported


In [6]:
output_dir = mkdir(output_dir)

TOTAL_BATCHES_X = math.ceil( len(references) / BATCH_SIZE )
TOTAL_BATCHES_Y = math.ceil( len(queries) / BATCH_SIZE)
TOTAL_BATCHES = TOTAL_BATCHES_X * TOTAL_BATCHES_Y
print("Total batches: ", TOTAL_BATCHES)
print(f"Total pairs considered: {len(references)} * {len(queries)} = {len(references) * len(queries)}")

if len(references) % BATCH_SIZE != 0:
    print(f"Since {len(references)} isn't divisible by BATCH_SIZE, last batch will have {len(references) % BATCH_SIZE} empty ROWS at the end")
if len(queries) % BATCH_SIZE != 0:
    print(f"Since {len(queries)} isn't divisible by BATCH_SIZE, last batch will have {len(queries) % BATCH_SIZE} empty COLUMNS at the end")

Total batches:  4
Total pairs considered: 1993 * 1993 = 3972049
Since 1993 isn't divisible by BATCH_SIZE, last batch will have 969 empty ROWS at the end
Since 1993 isn't divisible by BATCH_SIZE, last batch will have 969 empty COLUMNS at the end


In [7]:
# Load each batch in memory so that we don't have to load any R,Q twice
batches_r = []
for rbatch in tqdm(batches(references, BATCH_SIZE), desc="Batch all references"):
    rspec, rlen = spectra_peaks_to_tensor(rbatch, dtype=dtype)
    batches_r.append([rspec, rlen])

batches_q = list()
for qbatch in tqdm(batches(queries, BATCH_SIZE), desc="Batch all queries"):
    qspec, qlen  = spectra_peaks_to_tensor(qbatch, dtype=dtype)
    batches_q.append([qspec, qlen])

Batch all references: 0it [00:00, ?it/s]

Batch all references: 2it [00:00, 22.83it/s]
Batch all queries: 2it [00:00, 18.97it/s]


## Explanation


To understand what we are doing here, let's take a look at this image below:


![alt text](assets/cosine-batch-layout-grid.jpg "Title")


We have 1.5 million references and 100k arrays and want a stupidly large matrix of scores, with 1.5 million rows and 100k columns, where each matrix entry is a result of pairwise GreedyCosine. All entries are independent and can be computed in parallel. Even with high-CPU count (my machine has 8 CPU, estimate it takes 200 hours

GPUs are fundamentally a large 2D grid of very small CPUs. There are several ways of making our problem "fit" to the enviroment of GPUs, and I have chosen the following layout as shown above.

GPU can processes a single batch at a time - per-batch processing speed is near-instatanous, regardless of batch size, as long as the batch can fit into memory.

So - every batch is a 2D grid of references and queries that will be compared pairwise by different threads. If we zoom into the batch#0, we see:


![alt text](assets/cosine-batch-layout-batch.jpg "Title")


Meaning that a GPU has a separate small CPU (thread) for every pair in the cartesian product of references and queries in that batch. We see that every thread takes in it's own reference and query and returns three values:
score (float), num_matches (int, but casted to float), overflow (bool).

If we further zoom into the first thread, we see this pseudo-code being executed:


![alt text](assets/cosine-batch-layout-thread.jpg "Title")

This code is what is called a CUDA kernel - and it is exactly the same for every single thread in all batches. What changes is the input data (per batch) and which reference and query we work with (per thread).

The algorithm has two parts.

First loop collects all possible mzmz pairs (up to MATCH_LIMIT size), and report an overflow if it happens.

Second loop is essentially a bubble sort. Since "sorted()" isn't available to CUDA threads, we have to manually loop over the matches (nested loop) and, while we have left over scores:
- Get largest score
- Discard all other scores that have same index
- We normalize the score

# Main loop

In [8]:
streams = [cuda.stream() for _ in range(TOTAL_BATCHES)]

batches_r = []
for rbatch in tqdm(batches(references, BATCH_SIZE), desc="Batch all references"):
    rspec, rlen = spectra_peaks_to_tensor(rbatch, dtype=dtype)
    batches_r.append([rspec, rlen])

batches_q = list()
for qbatch in tqdm(batches(queries, BATCH_SIZE), desc="Batch all queries"):
    qspec, qlen  = spectra_peaks_to_tensor(qbatch, dtype=dtype)
    batches_q.append([qspec, qlen])
    
batches_rq = list(product(batches_r, batches_q))

Batch all references: 2it [00:00, 30.31it/s]
Batch all queries: 2it [00:00, 30.15it/s]


In [9]:
start = perf_counter()
# We initialize a pool of 3 workers that will offload results to disk
with ThreadPool(3) as pool:
    # We loop over all batchs in sequence
    for batch_i in tqdm(range(TOTAL_BATCHES)):
        
        # Each batch has own CUDA stream so that the GPU is as busy as possible
        stream = streams[batch_i]
        
        # Shared memory allows pool workers to read array without copying it
        out_shm = shared_memory.SharedMemory(create=True, size=(BATCH_SIZE * BATCH_SIZE * 2 * 4))
        out = np.ndarray(shape=(BATCH_SIZE, BATCH_SIZE, 2), dtype='float32', buffer=out_shm.buf)
        overflow_shm = shared_memory.SharedMemory(create=True, size=(BATCH_SIZE * BATCH_SIZE * 1 * 1))
        overflow = np.ndarray(shape=(BATCH_SIZE, BATCH_SIZE, 1), dtype='uint8', buffer=overflow_shm.buf)

        # We order empty space for results on GPU RAM
        out_cu = cuda.device_array((BATCH_SIZE, BATCH_SIZE, 2), dtype='float32', stream=stream)
        overflow_cu = cuda.device_array((BATCH_SIZE, BATCH_SIZE, 1), dtype='uint8', stream=stream)

        # We get our batch and lengths (lengths are different for different spectra)
        (rspec, rlen), (qspec, qlen) = batches_rq[batch_i]
        lens = np.zeros((2, max(BATCH_SIZE, BATCH_SIZE)), 'int32')
        lens[0,:len(rlen)] = rlen
        lens[1,:len(qlen)] = qlen
        
        # We make sure main resources remain on CPU RAM
        with cuda.pinned(rspec, qspec, lens, out, overflow,):
            
            # We order the stream to copy input data to GPU RAM
            rspec_cu = cuda.to_device(rspec, stream=stream)
            qspec_cu = cuda.to_device(qspec, stream=stream)
            lens_cu = cuda.to_device(lens, stream=stream)
            
            # We order the stream to execute kernel (this is scheduled, it will execute, but we can't force it)
            kernel(rspec_cu, qspec_cu,
                    lens_cu,
                    out_cu, overflow_cu,
                    stream=stream)
            
            # We order a data return
            out_cu.copy_to_host(out, stream=stream)
            overflow_cu.copy_to_host(overflow, stream=stream)

            # We create a function that will execute when this stream is done working
            # It is important to be quick here - so main work of writing to disk
            # Is handled by pool workers, not callback stream.
            def end_of_stream_callback(*args):
                def thread_worker(name1, name2):
                    ex_shm = shared_memory.SharedMemory(name=name1)
                    out = np.ndarray(shape=(BATCH_SIZE, BATCH_SIZE, 2), dtype=np.float32, buffer=ex_shm.buf)
                    np.save(output_dir/f'{batch_i:05d}.score.npy', out)
                    ex_shm.unlink()
                    ex_shm = shared_memory.SharedMemory(name=name2)
                    overflow = np.ndarray(shape=(BATCH_SIZE, BATCH_SIZE, 1), dtype=np.uint8, buffer=ex_shm.buf)
                    np.save(output_dir/f'{batch_i:05d}.ovfl.npy', overflow)
                    ex_shm.unlink()
                    
                pool.apply_async(
                    thread_worker, 
                    args=[out_shm.name, overflow_shm.name], 
                    error_callback=lambda e: print("Thread error", e)
                )
            stream.add_callback(
                callback=end_of_stream_callback,
            )

# We wait for all streams to finish their work everywhere 
cuda.synchronize()

# We can now calculate our performance fairly
duration = perf_counter() - start
persec = len(references) * len(queries) / duration
print(f"Speed at {persec:.1f} pairs/sec")
print(f"Estimated {(100_000 * 1_500_000 / persec) / 3600:.2f}hrs per 100k x 1.5mln")

  0%|          | 0/4 [00:00<?, ?it/s]

100%|██████████| 4/4 [00:01<00:00,  2.51it/s]

Speed at 2483338.3 pairs/sec
Estimated 16.78hrs per 100k x 1.5mln





# Filtering and further processing

## Examples

### Get any one specific R and Q similarity

In [10]:
def get_one_specific(
    ref_idx,
    que_idx,
) -> tuple:

    batch_idx_x = ref_idx // BATCH_SIZE
    batch_idx_y = que_idx // BATCH_SIZE

    batch_idx = batch_idx_x * TOTAL_BATCHES_Y + batch_idx_y
    scores = np.load(output_dir / f'{batch_idx:05d}.score.npy')

    ref_idx_batch = ref_idx % BATCH_SIZE
    que_idx_batch = que_idx % BATCH_SIZE

    score, num_matches = scores[ref_idx_batch, que_idx_batch]
    return score, num_matches
ref_idx = 1337
que_idx = 777
score, num_matches = get_one_specific(ref_idx, que_idx)
print(f"REF {ref_idx} and QUE {que_idx}: Score {score}, Num Matches {num_matches}")

REF 1337 and QUE 777: Score 0.0, Num Matches 0.0


### Query RQ pairs with condition on score

This is still TODO on large outputs, since filtering gigabytes worth of numpy arrays will take forever. For now, CPU implementation should suffice - or we could integrate this "filtering" behaviour directly into Kernel.

In [38]:
min_score = 0.75 # Min score
results = pd.DataFrame([], columns=['Reference','Query','Score','Num_Matches'])

score_files = sorted(Path(output_dir).glob('*.score.npy'))
for score_file in score_files:
    batch_idx = int(score_file.stem.split('.')[0])
    batch_x_pad = (batch_idx // TOTAL_BATCHES_Y) * BATCH_SIZE
    batch_y_pad = (batch_idx % TOTAL_BATCHES_Y) * BATCH_SIZE
    
    score = np.load(score_file)
    
    # Condition query
    pairs_relative = np.argwhere(score[...,0] >= min_score)
    # We have to pad pairs with their actual locations on full grid
    pairs_absolute = pairs_relative + [batch_x_pad, batch_y_pad]
    
    # score, num_matches = get_one_specific(ref_idx, que_idx)
    r, q = pairs_relative.T
    score, num_match = score[r, q].T
    
    r, q = pairs_absolute.T
    result = pd.DataFrame(dict(
        Reference=r.astype('uint32'),
        Query=q.astype('uint32'),
        Score=score.astype('float32'),
        Num_Matches=num_match.astype('uint16')
    )).convert_dtypes()
    results = pd.concat([results, result], axis=0, copy=False)
    
print(results.dtypes, "Memory ", results.memory_usage().sum() / 1e6, 'MB')

assert (result.Score >= min_score).all(), "Something wrong with filtering!"

results

Reference       UInt32
Query           UInt32
Score          Float32
Num_Matches     UInt16
dtype: object Memory  1.079728 MB


Unnamed: 0,Reference,Query,Score,Num_Matches
0,0,0,1.0,14
1,0,1,0.990495,14
2,0,2,0.977393,11
3,0,3,0.934253,11
4,0,4,0.877143,11
...,...,...,...,...
15022,1992,1273,0.8502,39
15023,1992,1609,0.761673,33
15024,1992,1990,0.802216,37
15025,1992,1991,0.945183,45


Be careful with how large the `results` dataframe can get! You might run our of RAM before it's all loaded into memory