## AFDist on S3 - parallel timing

### Instance type is c5.24xlarge

In [1]:
!pip install boto3 matplotlib aioboto3 aiobotocore numcodecs
import boto3
import aioboto3
from botocore.config import Config
import matplotlib.pyplot as plt
import time
from concurrent.futures import ThreadPoolExecutor



In [2]:
# Synchronous S3 client with a connection pool of up to 512 connections.
# It is not referenced by the other cells visible in this notebook; the
# parallel fetcher creates its own aioboto3 clients.
s3_client = boto3.client(
    "s3",
    config=Config(max_pool_connections=512),
)

In [3]:
# Name of the S3 bucket that the chunk objects are read from.
bucket = "lifebit-user-data-1f2bfdf2-1d99-488c-9b87-246c62b66ea7"

## Firstly make a list of the chunks that are read by the serial process in the other "Comparison" notebooks

In [4]:
# Index range of interest, converted to chunk numbers by integer division
# by 10000 (the number of rows per chunk used throughout this notebook).
start_index = 13889170
end_index = 14451810
start_chunk = start_index // 10000
end_chunk = end_index // 10000

# One (variant chunk, sample chunk) pair per object to fetch: every variant
# chunk in [start_chunk, end_chunk) combined with each of the 79 sample chunks.
# NOTE(review): end_chunk is exclusive, so the chunk that contains end_index
# itself is not listed - confirm this matches what the serial notebooks read.
keys = [
    (variant_chunk, sample_chunk)
    for variant_chunk in range(start_chunk, end_chunk)
    for sample_chunk in range(79)
]

In [5]:
from numcodecs.blosc import Blosc
from numcodecs import blosc
# Initialise the Blosc library before it is used for decoding.
blosc.init()
# Restrict Blosc to a single internal thread; presumably so that parallelism
# comes only from the worker processes started later in the notebook.
blosc.set_nthreads(1)
#Need to call this twice, not sure why!
# NOTE(review): set_nthreads appears to return the *previous* thread count (as
# C-Blosc's blosc_set_nthreads does). If so, the first call echoes the old
# setting and only a second call echoes 1, which would explain the remark
# above - and a single call would be enough. Confirm against the numcodecs docs.
blosc.set_nthreads(1)

1

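## Numba afdist code - note that this can be run in parallel, writing to the same numpy output arrays, as long as the chunks being processed concurrently are independent. Chunks from different variant chunks are; sample chunks of the same variant chunk accumulate into the same output elements.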

In [6]:
import numba 
from numba import types
import numpy as np

@numba.jit(
    (
        types.int64,  # offset
        types.Array(types.int8, 3, 'C', readonly=True),  # G (blosc buffer)
        types.Array(types.int32, 1, 'C'),  # hom_ref
        types.Array(types.int32, 1, 'C'),  # hom_alt
        types.Array(types.int32, 1, 'C'),  # het
        types.Array(types.int32, 1, 'C'),  # ref_count
    ),
    nopython=True,
    nogil=True,
)
def count_genotypes_chunk_subset(offset, G, hom_ref, hom_alt, het, ref_count):
    """Accumulate per-row genotype counts for one chunk into the output arrays.

    For every row ``j`` of ``G`` the counters at position ``offset + j`` are
    updated once per column ``k`` from the value pair ``G[j, k, 0]`` and
    ``G[j, k, 1]``:

    * ``hom_ref``   += 1 when both values are equal and zero,
    * ``hom_alt``   += 1 when both values are equal and non-zero,
    * ``het``       += 1 when the two values differ,
    * ``ref_count`` += the number of zeros in the pair (0, 1 or 2).

    Args:
        offset: Position in the output arrays that corresponds to row 0 of ``G``.
        G: Read-only, C-contiguous int8 array indexed as ``[row, column, 0|1]``.
        hom_ref, hom_alt, het, ref_count: C-contiguous int32 arrays, updated
            in place. They must be long enough for ``offset + G.shape[0]``.

    Assumes diploid calls with no missing data: no value is treated specially,
    so any sentinel in ``G`` is counted like an ordinary non-zero value.
    """
    n_rows = G.shape[0]
    n_cols = G.shape[1]
    for row in range(n_rows):
        out = offset + row
        for col in range(n_cols):
            first = G[row, col, 0]
            second = G[row, col, 1]
            if first != second:
                het[out] += 1
            elif first == 0:
                hom_ref[out] += 1
            else:
                hom_alt[out] += 1
            # Booleans add as integers: 0, 1 or 2 zeros in the pair.
            ref_count[out] += (first == 0) + (second == 0)

## Code for fetching, decoding and running AFDist in parallel processes

In [8]:
import asyncio
import aioboto3
import multiprocessing as mp
from collections import defaultdict
import time
import os
from typing import List, Dict
import statistics
from dataclasses import dataclass
from concurrent.futures import ProcessPoolExecutor
import numpy as np

# Length of each per-variant result array (total number of variants).
NUM_VAR = 59_880_903

@dataclass
class BatchStats:
    """Timing and volume counters for one batch handled by a worker process."""
    # PID of the process that handled the batch (os.getpid()).
    process_id: int
    # time.time() timestamp taken when work on the batch started.
    start_time: float
    # Bytes fetched from S3 for the batch: summed length of the raw object bodies.
    total_bytes: int
    # Bytes of the decoded arrays for the batch: summed ndarray.nbytes.
    data_bytes: int
    # Number of objects fetched and processed in the batch.
    files_completed: int
    # Seconds elapsed from start_time to the end of the batch; stays None
    # until the batch has finished.
    duration: float = None
    
class S3BulkFetcher:
    """Benchmark harness: fetch, decode and count genotype chunks from S3 in parallel.

    The keys are split into batches that are placed on a multiprocessing queue.
    Each worker process runs an asyncio loop that takes batches from the queue,
    fetches all objects of a batch concurrently, decodes each one and passes it
    to ``count_genotypes_chunk_subset``. Only the per-batch ``BatchStats`` are
    sent back to the parent process; the count arrays are local to each batch
    and are discarded once the batch is done.

    NOTE(review): the worker target is a bound method and the workers use
    notebook-level globals, so this presumably relies on the ``fork`` start
    method - confirm before running on a platform that defaults to ``spawn``.
    """

    def __init__(self, bucket: str, keys: List[tuple], num_processes: int = None, batch_size: int = 500):
        """Store the configuration and create the work and stats queues.

        Args:
            bucket: Name of the S3 bucket to read from.
            keys: ``(v, s)`` tuples; each one is expanded to the object key
                ``GEL-256534/call_genotype/{v}/{s}/0``.
            num_processes: Number of worker processes. ``None`` (or 0) means
                ``mp.cpu_count()``.
            batch_size: Number of keys handed to a worker at a time.
        """
        self.bucket = bucket
        self.keys = keys
        self.num_processes = num_processes or mp.cpu_count()
        self.batch_size = batch_size
        self.work_queue = mp.Queue()
        self.stats_queue = mp.Queue()

    def prepare_batches(self):
        """Fill the work queue with key batches followed by one sentinel per worker."""
        for start in range(0, len(self.keys), self.batch_size):
            self.work_queue.put(self.keys[start:start + self.batch_size])

        # A None entry tells a worker to stop; each worker consumes exactly one.
        for _ in range(self.num_processes):
            self.work_queue.put(None)

    async def process_batch(self, batch: List[tuple], session, decoder) -> BatchStats:
        """Fetch, decode and count every object of one batch.

        All objects of the batch are requested concurrently and handled in
        completion order. Decoding and counting run synchronously inside the
        event loop, one object at a time.

        Args:
            batch: ``(v, s)`` key tuples to process.
            session: aioboto3 session used to open the S3 client.
            decoder: Codec whose ``decode`` turns a raw object body into the
                bytes of a ``(10000, 1000, 2)`` int8 array.

        Returns:
            The ``BatchStats`` for this batch, with ``duration`` filled in.
        """
        stats = BatchStats(
            process_id=os.getpid(),
            start_time=time.time(),
            total_bytes=0,
            data_bytes=0,
            files_completed=0
        )

        # Result arrays are allocated per batch and are not returned: this
        # harness measures throughput and only reports the statistics.
        hom_ref = np.zeros(NUM_VAR, dtype="int32")
        hom_alt = np.zeros(NUM_VAR, dtype="int32")
        het = np.zeros(NUM_VAR, dtype="int32")
        ref_count = np.zeros(NUM_VAR, dtype="int32")

        async with session.client('s3') as s3:
            async def fetch_single(key: tuple):
                """Download one object; return its raw bytes and its ``v`` index."""
                v, s = key
                object_key = f"GEL-256534/call_genotype/{v}/{s}/0"
                response = await s3.get_object(Bucket=self.bucket, Key=object_key)
                raw_data = await response['Body'].read()
                return raw_data, v

            for raw_data_v_chunk in asyncio.as_completed(
                [fetch_single(key) for key in batch]
            ):
                raw_data, v_chunk = await raw_data_v_chunk
                G = np.reshape(np.frombuffer(decoder.decode(raw_data), dtype="int8"), (10000,1000,2))
                # Row 0 of this chunk maps to output position v_chunk * 10000.
                count_genotypes_chunk_subset(v_chunk*10000, G, hom_ref, hom_alt, het, ref_count)
                stats.total_bytes += len(raw_data)
                stats.data_bytes += G.nbytes
                stats.files_completed += 1

        stats.duration = time.time() - stats.start_time
        return stats

    async def worker_loop(self):
        """Process batches from the work queue until a sentinel is received."""
        session = aioboto3.Session()
        decoder = Blosc.from_config({'cname': 'zstd', 'clevel': 7, 'shuffle': 2, 'blocksize': 0})
        while True:
            # Blocking get inside a coroutine: acceptable here because no other
            # task is pending between batches.
            batch = self.work_queue.get()
            if batch is None:  # sentinel value
                break

            stats = await self.process_batch(batch, session, decoder)
            self.stats_queue.put(stats)

    def worker_process(self):
        """Entry point of a worker process: run ``worker_loop`` to completion."""
        asyncio.run(self.worker_loop())

    def print_final_stats(self, all_stats: List[BatchStats], wall_time: float):
        """Print overall, per-batch and per-process throughput figures.

        Args:
            all_stats: One ``BatchStats`` per completed batch (may be empty).
            wall_time: Elapsed seconds for the whole run; every throughput
                figure, including the per-process ones, is divided by it.
        """
        total_bytes = sum(s.total_bytes for s in all_stats)
        total_data = sum(s.data_bytes for s in all_stats)
        total_files = sum(s.files_completed for s in all_stats)

        process_stats = defaultdict(lambda: {'bytes': 0, 'data':0, 'files': 0, 'batches': 0})
        for stat in all_stats:
            process_stats[stat.process_id]['bytes'] += stat.total_bytes
            process_stats[stat.process_id]['data'] += stat.data_bytes
            process_stats[stat.process_id]['files'] += stat.files_completed
            process_stats[stat.process_id]['batches'] += 1

        batch_times = [s.duration for s in all_stats]

        print("\n=== S3 Bulk Download Statistics ===")
        print("\nOverall Performance:")
        print(f"Total transfer: {total_bytes / 1000 / 1000:.2f} MB in {total_files} files")
        print(f"Total data: {total_data / 1000 / 1000:.2f} MB in {total_files} chnnks")
        print(f"Duration: {wall_time:.2f} seconds")
        print(f"Throughput: {(total_bytes / 1000 / 1000) / wall_time:.2f} MB/s, "
             f"{total_files / wall_time:.1f} files/s")
        print(f"Data Throughput: {(total_data / 1000 / 1000) / wall_time:.2f} MB/s, "
             f"{total_files / wall_time:.1f} chunks/s")

        print("\nBatch Statistics:")
        if batch_times:
            print(f"Batch times - min: {min(batch_times):.2f}s, "
                 f"max: {max(batch_times):.2f}s, "
                 f"avg: {statistics.mean(batch_times):.2f}s")
        else:
            # min()/mean() raise on an empty sequence (e.g. an empty key list).
            print("No batches completed")

        print("\nPer-Process Performance:")
        for pid, stats in process_stats.items():
            print(f"Process {pid}:")
            print(f"  Throughput: {(stats['bytes'] / 1000 / 1000) / wall_time:.2f} MB/s, "
                  f"{stats['files'] / wall_time:.1f} files/s")
            print(f"  Data Throughput: {(stats['data'] / 1000 / 1000) / wall_time:.2f} MB/s, "
                  f"{stats['files'] / wall_time:.1f} chunks/s")
            print(f"  Processed: {stats['files']} files in {stats['batches']} batches")

    def run(self):
        """Run the whole benchmark and print the statistics.

        Queues the batches, starts the worker processes, collects one
        ``BatchStats`` per batch, waits for the workers and prints the summary.

        Returns:
            The list of collected ``BatchStats``, in arrival order.

        Raises:
            RuntimeError: If every worker has exited before all batches were
                reported (for example because a worker crashed). Previously
                this situation blocked forever on the stats queue.
        """
        import queue  # local import: only needed for the queue.Empty timeout signal

        start_time = time.time()

        # Prepare work batches
        self.prepare_batches()

        # Start worker processes
        processes = []
        for _ in range(self.num_processes):
            p = mp.Process(target=self.worker_process)
            p.start()
            processes.append(p)

        # Collect all stats: one entry per batch, the same count that
        # prepare_batches produces.
        all_stats = []
        stats_to_collect = len(range(0, len(self.keys), self.batch_size))

        # Liveness is only checked when the queue has been quiet for a second,
        # so the normal path costs nothing extra. Once all workers are seen to
        # be gone, everything they reported is already in the pipe, so one more
        # empty wait proves that the missing stats will never arrive.
        workers_exited = False
        while len(all_stats) < stats_to_collect:
            try:
                all_stats.append(self.stats_queue.get(timeout=1.0))
            except queue.Empty:
                if workers_exited:
                    raise RuntimeError(
                        f"All worker processes exited after reporting "
                        f"{len(all_stats)} of {stats_to_collect} batches; "
                        f"check the worker tracebacks"
                    ) from None
                workers_exited = not any(p.is_alive() for p in processes)

        # Wait for all processes to complete
        for p in processes:
            p.join()

        wall_time = time.time() - start_time

        # Print final statistics
        self.print_final_stats(all_stats, wall_time)

        return all_stats



## Run and time the benchmark

In [10]:
%%time
# NOTE: 4503 keys at batch_size=100 make 46 batches, so at most 46 of the
# 128 worker processes ever receive a batch; the rest only read a sentinel.
fetcher = S3BulkFetcher(bucket=bucket, keys=keys, num_processes=128, batch_size=100)

# Wall time as seen from the notebook, measured around the whole run
# (process start-up, fetching, decoding, counting and the stats printout).
start_time = time.time()
stats = fetcher.run()
duration = time.time() - start_time
print(duration)


=== S3 Bulk Download Statistics ===

Overall Performance:
Total transfer: 180.80 MB in 4503 files
Total data: 90060.00 MB in 4503 chnnks
Duration: 4.14 seconds
Throughput: 43.63 MB/s, 1086.6 files/s
Data Throughput: 21732.70 MB/s, 1086.6 chunks/s

Batch Statistics:
Batch times - min: 0.36s, max: 4.01s, avg: 3.72s

Per-Process Performance:
Process 1604:
  Throughput: 0.02 MB/s, 0.7 files/s
  Data Throughput: 14.48 MB/s, 0.7 chunks/s
  Processed: 3 files in 1 batches
Process 1558:
  Throughput: 0.89 MB/s, 24.1 files/s
  Data Throughput: 482.63 MB/s, 24.1 chunks/s
  Processed: 100 files in 1 batches
Process 1553:
  Throughput: 0.64 MB/s, 24.1 files/s
  Data Throughput: 482.63 MB/s, 24.1 chunks/s
  Processed: 100 files in 1 batches
Process 1561:
  Throughput: 0.65 MB/s, 24.1 files/s
  Data Throughput: 482.63 MB/s, 24.1 chunks/s
  Processed: 100 files in 1 batches
Process 1571:
  Throughput: 0.85 MB/s, 24.1 files/s
  Data Throughput: 482.63 MB/s, 24.1 chunks/s
  Processed: 100 files in 1 b

### This gives a total wall time of 4.15s for the run