In [None]:
import ray
import numpy as np
import pandas as pd
import joblib

from collections import Counter
from ray.util.joblib import register_ray
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

from cloud_data_cockpit import DataCockpit

In [None]:
# Start the Ray runtime only if this kernel has not started one already, so the
# cell can be re-run safely (a second bare ray.init() raises RuntimeError).
if not ray.is_initialized():
    ray.init()
# Register Ray as a joblib backend (helper imported from ray.util.joblib).
register_ray()

## Loading and Partitioning VCF Files with DataCockpit

In the following cells, we initialize the data loader and prepare a VCF file for distributed processing with Ray. You should:

1. **Specify a VCF file path**  
   - Point to the VCF file you want to process (e.g. `variants.vcf`).  

2. **Define the number of _chunks_**  
   - Choose how many partitions (_chunks_) to split the variant data into.  
   - Proper chunking allows Ray to balance the workload across workers.

3. **Partition the VCF file**  
   - Use DataCockpit to read and split the file into the defined number of _chunks_.  

4. **Run the rest of the notebook with Ray**  
   - After partitioning, the cells below parse the VCF records in each chunk and count the variants per chromosome.  
   - Ensure your Ray cluster is initialized before executing downstream analysis.


In [None]:
# Create the DataCockpit loader; the next cell asks it for the data slices.
# NOTE(review): presumably the file and chunk count described above are chosen
# through this object before the slices are requested — confirm against DataCockpit.
data_loader = DataCockpit()

In [None]:
# Retrieve the partitions ("slices") from the loader. Downstream code iterates
# over them and calls .get() on each one to obtain its raw text.
slices = data_loader.get_data_slices()

In [None]:
def parse_vcf_line(line: str) -> dict:
    """Parse one tab-separated VCF data line into a flat record dict.

    The eight fixed columns (CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO) are
    required. FORMAT and the per-sample columns are optional, so sites-only
    records (exactly eight columns) are accepted as well.

    Args:
        line: A single VCF data line (not a ``#`` header line). A trailing
            line terminator is tolerated.

    Returns:
        An empty dict when the line has fewer than eight columns. Otherwise a
        dict with the keys ``CHROM``, ``POS`` (int), ``ID``, ``REF``, ``ALT``
        (list of alleles, empty when the column is ``.``), ``QUAL`` (float, or
        ``None`` when ``.``), ``FILTER``, ``FORMAT`` (``None`` when the column
        is absent) and ``SAMPLES`` (list of the remaining columns, possibly
        empty). Every INFO entry is merged in as an additional top-level key:
        ``key=value`` entries are converted to int, then float, when possible
        and kept as strings otherwise; bare flags map to ``True``. An INFO key
        named ``FORMAT`` or ``SAMPLES`` is overwritten by the column value,
        while an INFO key named like one of the first seven columns replaces
        that column's value.

    Raises:
        ValueError: If POS is not an integer, or QUAL is neither ``.`` nor a
            number.
    """
    # Drop a trailing newline / carriage return so it cannot leak into the last column.
    cols = line.rstrip("\r\n").split("\t")
    if len(cols) < 8:
        return {}
    chrom, pos, vid, ref, alt, qual, flt, info = cols[:8]
    # FORMAT and the sample columns only exist when genotype data is present.
    fmt = cols[8] if len(cols) > 8 else None
    samples = cols[9:]

    info_dict = {}
    for entry in info.split(";"):
        # "." marks a missing INFO field and "" comes from a stray ";":
        # neither is a real flag, so neither becomes a key.
        if entry in ("", "."):
            continue
        if "=" in entry:
            key, val = entry.split("=", 1)
            try:
                val = int(val)
            except ValueError:
                try:
                    val = float(val)
                except ValueError:
                    pass  # not numeric (e.g. comma-separated list): keep the string
            info_dict[key] = val
        else:
            info_dict[entry] = True

    return {
        "CHROM": chrom,
        "POS": int(pos),
        "ID": vid,
        "REF": ref,
        "ALT": [] if alt == "." else alt.split(","),
        "QUAL": None if qual == "." else float(qual),
        "FILTER": flt,
        **info_dict,
        "FORMAT": fmt,
        "SAMPLES": samples,
    }

In [None]:
# Tally the parsed VCF data records per chromosome across all slices.
total_counts = Counter()
# The loop variable is deliberately not called `slice`: at notebook top level
# that would shadow the builtin for the rest of the kernel session.
for data_slice in slices:
    # NOTE(review): .get() is assumed to return the slice's content as text (str);
    # the str-based startswith("#") below relies on that — confirm.
    raw = data_slice.get()
    for line in raw.splitlines():
        # Skip blank lines and "#"-prefixed header lines.
        if not line or line.startswith("#"):
            continue
        rec = parse_vcf_line(line)
        # An empty dict means the line could not be parsed as a record.
        if rec:
            total_counts[rec["CHROM"]] += 1

In [None]:
# Report the tallies, one line per chromosome, ordered by chromosome key.
print("Variants per chromosome:")
for chrom in sorted(total_counts):
    print(f"  {chrom}: {total_counts[chrom]}")


In [None]:
# Shut down the Ray runtime that was started at the top of the notebook.
ray.shutdown()