In [None]:
import logging
from collections import Counter

import joblib
import numpy as np
import pandas as pd
import ray
from joblib import parallel_backend
from ray.util.joblib import register_ray
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

from cloud_data_cockpit import DataCockpit

In [None]:
# ignore_reinit_error=True lets this cell be re-run in the same kernel without
# Ray raising because it is already initialized.
ray.init(ignore_reinit_error=True)
# Register "ray" as a joblib parallel backend for joblib-based estimators.
register_ray()

## Loading and Partitioning FASTA Sequences with DataCockpit

The next cells initialize the DataCockpit data loader and partition a FASTA file for distributed processing with Ray. To do so:

1. **Specify a FASTA file path**  
   - Point to the FASTA file you want to process (e.g. `sequences.fasta`).  

2. **Define the number of _chunks_**  
   - Choose into how many partitions (_chunks_) you want to split the sequence data.  
   - Proper chunking allows Ray to balance the workload across workers.

3. **Partition the FASTA file**  
   - Use DataCockpit to read and split the file into the defined number of _chunks_.  

4. **Run the rest of the notebook with Ray**  
   - After partitioning, Ray processes the chunks in parallel, one task per chunk.  
   - The Ray cluster must already be running (it is started with `ray.init()` in the setup cell above) before you execute the downstream analysis.


In [None]:
# NOTE(review): DataCockpit presumably provides the file / chunk-count selection
# described above; confirm that a Restart & Run All does not depend on manual
# interaction with it before get_data_slices() is called.
data_loader = DataCockpit()

In [None]:
# Iterable of data slices (presumably one per chunk chosen in DataCockpit);
# each slice is read with `.get()` inside process_batch, one Ray task per slice.
slices = data_loader.get_data_slices()

In [None]:
def _iter_sequences(lines):
    """Yield the uppercase sequence of each record found in ``lines``.

    Two layouts are accepted:

    * FASTQ (first non-blank line starts with ``@``): fixed 4-line records
      ``@header / sequence / + / quality``; only the sequence line is used.
      A truncated trailing record (e.g. a lone header line or a trailing
      blank line) is skipped instead of raising ``IndexError``.
    * Otherwise FASTA: lines starting with ``>`` are headers and every other
      non-blank line is a sequence fragment, so multi-line records are joined.
      A fragment that precedes the first header is yielded as its own record.

    NOTE(review): assumes slices are cut on record boundaries; a FASTQ slice
    that starts mid-record would not be recognized as FASTQ — confirm.
    """
    stripped = [line.strip() for line in lines]
    first_idx = next((i for i, line in enumerate(stripped) if line), None)
    if first_idx is None:
        return

    if stripped[first_idx].startswith("@"):
        # Stop one line short of the end so stripped[i + 1] always exists.
        for i in range(first_idx, len(stripped) - 1, 4):
            yield stripped[i + 1].upper()
        return

    fragments = []
    for line in stripped:
        if line.startswith(">"):
            if fragments:
                yield "".join(fragments).upper()
            fragments = []
        elif line:
            fragments.append(line)
    if fragments:
        yield "".join(fragments).upper()


@ray.remote
def process_batch(batch_slice, k=4):
    """Turn one data slice into k-mer "documents", one per sequence.

    Parameters
    ----------
    batch_slice
        A slice from ``data_loader.get_data_slices()``; ``.get()`` returns its
        lines, each as ``str`` or UTF-8 ``bytes``.
    k : int
        k-mer length (default 4). Must be >= 1.

    Returns
    -------
    list of str
        One space-separated string of overlapping k-mers per sequence.
        Sequences that are empty or shorter than ``k`` are skipped, so no
        empty documents are produced.

    Raises
    ------
    ValueError
        If ``k < 1``.
    """
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")

    lines = batch_slice.get()
    # Normalize bytes→str
    normalized = [line.decode("utf-8") if isinstance(line, bytes) else line
                  for line in lines]

    docs = []
    for seq in _iter_sequences(normalized):
        if len(seq) < k:
            # No complete k-mer fits; skip rather than emit an empty document.
            continue
        # Overlapping k-mers with stride 1.
        kmers = [seq[j:j + k] for j in range(len(seq) - k + 1)]
        docs.append(" ".join(kmers))
    return docs

In [None]:
# One Ray task per slice; ray.get blocks until all batches are done and
# returns their results in the same order as `slices`.
futures = [process_batch.remote(s) for s in slices]
nested_docs = ray.get(futures)
# Flatten the per-batch lists into a single corpus (batch order preserved);
# the vectorizer and the cluster summary below both read `all_docs`.
all_docs = [doc for batch_docs in nested_docs for doc in batch_docs]

In [None]:
# Number of k-mer documents produced by each batch.
counts = [len(docs) for docs in nested_docs]
# logging.info would be dropped under the root logger's default WARNING level,
# so print the diagnostic to make it visible in the cell output.
print(f"Reads per batch: {counts}")
if sum(counts) == 0:
    raise RuntimeError("No k‑mers extracted: check parser or batch size")

In [None]:
# Each document is already a whitespace-separated list of k-mers, so tokenize
# on whitespace only. Natural-language settings are wrong here:
# - English stop-word removal would drop k-mers such as "at" or "an" (k=2);
# - a letters-only, length>=2 regex would drop every token for k=1 and split
#   k-mers that contain symbols such as "-" or "*";
# - lowercasing is unnecessary because k-mers are already uppercased.
vectorizer = CountVectorizer(
    token_pattern=r"\S+",
    lowercase=False,
    min_df=2,  # ignore k-mers that appear in only one document
)
X = vectorizer.fit_transform(all_docs)

In [None]:
# The Ray joblib backend is already registered in the setup cell.
# scikit-learn's KMeans parallelizes with OpenMP threads in this process, not
# through joblib, so a parallel_backend("ray") context would not distribute
# the fit; it is therefore not used here.
n_clusters = 3
kmeans = KMeans(n_clusters=n_clusters, random_state=0)
labels = kmeans.fit_predict(X)

In [None]:
# One row per k-mer document with its KMeans cluster label.
df = pd.DataFrame({
    "doc_index": range(len(all_docs)),
    "cluster": labels,
})
# Size of each cluster, shown via rich display rather than print().
cluster_sizes = (
    df["cluster"]
    .value_counts()
    .sort_index()
    .rename_axis("cluster")
    .rename("n_docs")
)
cluster_sizes


In [None]:
# Stop the Ray runtime started by ray.init() in the setup cell.
ray.shutdown()
