In [1]:
import math
import lithops
import pandas as pd
from dataplug import CloudObject

from dataplug.formats.genomics.fasta import FASTA, partition_chunks_strategy


### 🔐 AWS Credentials Configuration

There are two main ways to configure your AWS credentials:

---

#### ✅ Option 1: (Recommended) Use AWS CLI profiles

You can securely configure your credentials using the AWS CLI:

Run the following command in your terminal:

```bash
aws configure
```

✅ This is the **safest and most recommended approach**, especially for production environments or shared projects.

---

#### ⚠️ Option 2: (Less secure) Set credentials in code

For quick experiments or in isolated environments, you can manually set the environment variables in this notebook:

```python
import os

os.environ["AWS_ACCESS_KEY_ID"] = "AWS_ACCESS_KEY_ID"
os.environ["AWS_SECRET_ACCESS_KEY"] = "AWS_SECRET_ACCESS_KEY"
```

> ⚠️ **IMPORTANT**: Never expose your credentials in source code or public repositories.  
> This method should only be used for temporary testing and never in production.


### ⚙️ Preprocessing a FASTA File from S3 with Lithops

The following code loads a FASTA file stored in an S3 bucket using `CloudObject` and preprocesses it in **4 parallel jobs** by dividing the file into chunks.

```python
co = CloudObject.from_s3(
    FASTA, 
    "s3://dnastack-covid-19-sra-data/PacBio/fasta/SRR16804994/SRR16804994.fasta"
)

# Perform preprocessing in 4 parallel jobs (chunk size = total size / 4)
parallel_config = {"verbose": 10}
chunk_size = math.ceil(co.size / 4)
co.preprocess(parallel_config=parallel_config, chunk_size=chunk_size)
```

- `CloudObject.from_s3(...)`: Loads the remote FASTA file as a Lithops CloudObject.
- `co.size`: Retrieves the size of the file in bytes.
- `chunk_size`: Determines the number of bytes each parallel job will process.
- `preprocess(...)`: Splits and processes the file using **4 parallel workers**, improving performance for large datasets.

> ℹ️ You can adjust the number of parallel jobs by modifying the divisor used in the `chunk_size` calculation.


In [3]:
co = CloudObject.from_s3(FASTA, "s3://dnastack-covid-19-sra-data/PacBio/fasta/SRR16804994/SRR16804994.fasta")

# Perform preprocessing in 4 parallel jobs (chunk size = total size / 4)
parallel_config = {"verbose": 10}
chunk_size = math.ceil(co.size / 4)
co.preprocess(parallel_config=parallel_config, chunk_size=chunk_size)

### 📊 Inspecting and Partitioning the FASTA File

After loading the FASTA file as a `CloudObject`, we can inspect its metadata and split it into multiple slices for parallel processing.

```python
print(f"FASTA file has {co.attributes.num_sequences} sequences")

data_slices = co.partition(partition_chunks_strategy, num_chunks=8)
```

- `co.attributes.num_sequences`: Returns the number of sequences found in the FASTA file.
- `partition(...)`: Divides the file into chunks using the `partition_chunks_strategy` and the specified number of partitions (`num_chunks=8`).

In [4]:
print(f"FASTA file has {co.attributes.num_sequences} sequences")
data_slices = co.partition(partition_chunks_strategy, num_chunks=8)

FASTA file has 1 sequences


In [5]:
data_slices[0].get()

b'>SRR16804994\nNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNTCTTGTAGATCTGTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTCACGCAGTATAATTAATAACTAATTACTGTCGTTGACAGGACACGAGTAACTCGTCTATCTTCTGCAGGCTGCTTACGGTTTCGTCCGTTTTGCAGCCGATTATCAGCACATCTAGGTTTTGTCCGGGTGTGACCGAAAGGTAAGATGGAGAGCCTTGTCCCTGGTTTCAACGAGAAAACACACGTCCAACTCAGTTTGCCTGTTTTACAGGTTCGCGACGTGCTCGTACGTGGCTTTGGAGACTCCGTGGAGGAGGTCTTATCAGAGGCACGTCAACATCTTAAAGATGGCACTTGTGGCTTAGTAGAAGTTGAAAAAGGCGTTTTGCCTCAACTTGAACAGCCCTATGTGTTCATCAAACGTTCGGATGCTCGAACTGTACCTCATGGTCATGTTATGGTTGAGCTGGTAGCAGAACTCGAAGGCATTCAGTACGGTCGTAGTGGTGAGACACTTGGTGTCCTTGTCCCTCATGTGGGCGAAATACCAGTGGCTTACCGCAAGGTTCTTCTTCGTAAGAACGGTAATAAAGGAGCTGGTGGCCATAGTTACGGCGCCGATCTAAAGTCATTTGACTTAGGCGACGAGCTTGGCACTGATCCTTATGAAGATTTTCAAGAAAACTGGAACACTAAACATAGCAGTGGTGTTACCCGTGAACTCATGCGTGAGCTTAACGGAGGGGCATACACTCGCTATGTCGATAACAACTTCTGTGGCCCTGATGGCTACCCTCTTGAGTGCATTAAAGACCTTCTAGCACGTGCTGGTAAAGCTTCATGCACTTTGTCCGAACAACTGGACTTTATTGACACTAAGAGGGGTGTATACTGCTGCCGTGAACATGAGCATGAAATTGCTTG

In [6]:
len(data_slices)

8

### 🧬 Processing FASTA Partitions

The following function processes a partition (chunk) of a FASTA file and extracts useful statistics such as the number of sequences and their length distribution.

#### 🔍 What this function does:

- Retrieves and decodes the content of a FASTA chunk.
- Iterates through the lines and accumulates DNA sequence lengths.
- When a new sequence header (`>`) is encountered, it stores the length of the previous sequence.
- After processing, it returns:
  - `record_count`: Number of sequences found.
  - `min_length`: Length of the shortest sequence.
  - `max_length`: Length of the longest sequence.

> ℹ️ This function can be executed in parallel across all partitions to compute statistics efficiently over large datasets.


In [7]:
def process_fasta_partition(fasta_chunk):
    lines = fasta_chunk.get().decode('utf-8').split('\n')
    read_lengths = []
    sequence = ""

    for line in lines:
        if line.startswith('>'):
            if sequence:
                read_lengths.append(len(sequence))
                sequence = ""
        else:
            sequence += line.strip()

    if sequence:
        read_lengths.append(len(sequence))

    if not read_lengths:
        return {
            "record_count": 0,
            "min_length": None,
            "max_length": None
        }

    record_count = len(read_lengths)
    min_length = min(read_lengths)
    max_length = max(read_lengths)

    return {
        "record_count": record_count,
        "min_length": min_length,
        "max_length": max_length
    }

### ⚡ Parallel Execution with Lithops and Dataplug

Using `lithops.FunctionExecutor`, we can easily launch parallel computations over the data partitions produced by Dataplug.

```python
with lithops.FunctionExecutor() as executor:
    futures = executor.map(process_fasta_partition, data_slices)
    results = executor.get_result(futures)
```

#### 🚀 Highlights:

- **Seamless integration** between **Dataplug** and **Lithops**:  
  The `data_slices` returned by `co.partition(...)` are directly compatible with `executor.map(...)`, making it easy to scale up processing.
  
- **Serverless execution**:  
  Each partition is processed independently and in parallel, leveraging cloud functions without the need to manage infrastructure.

- `executor.map(...)`: Applies the `process_fasta_partition` function to each partition.
- `executor.get_result(...)`: Collects the results once all functions have completed.

> ✅ This tight integration enables effortless scaling for processing large datasets stored in S3 or other backends — with just a few lines of code.

In [8]:
with lithops.FunctionExecutor() as executor:
    futures = executor.map(process_fasta_partition, data_slices)
    results = executor.get_result(futures)

2025-03-28 13:41:18,639 [INFO] config.py:146 -- Lithops v3.5.2.dev0 - Python3.10
2025-03-28 13:41:18,640 [DEBUG] config.py:107 -- Loading configuration from the cloud
2025-03-28 13:41:19,016 [DEBUG] config.py:186 -- Loading Serverless backend module: aws_lambda
2025-03-28 13:41:19,024 [DEBUG] config.py:226 -- Loading Storage backend module: aws_s3
2025-03-28 13:41:19,029 [DEBUG] aws_s3.py:36 -- Creating AWS S3 Client
2025-03-28 13:41:19,188 [INFO] aws_s3.py:59 -- S3 client created - Region: us-east-1
2025-03-28 13:41:19,234 [DEBUG] aws_lambda.py:53 -- Creating AWS Lambda client
2025-03-28 13:41:19,579 [INFO] aws_lambda.py:97 -- AWS Lambda client created - Region: us-east-1
2025-03-28 13:41:19,583 [DEBUG] invokers.py:105 -- ExecutorID a08781-0 - Invoker initialized. Max workers: 1000
2025-03-28 13:41:19,584 [DEBUG] invokers.py:308 -- ExecutorID a08781-0 - Serverless invoker created
2025-03-28 13:41:19,736 [DEBUG] executors.py:164 -- Function executor for aws_lambda created with ID: a087

In [9]:
total_records = 0
for idx, res in enumerate(results):
    print(f"Partition {idx + 1} results:")
    print(f"  Min Length: {res['min_length']}")
    print(f"  Max Length: {res['max_length']}")
    print(f"  Record Count: {res['record_count']}")
    total_records += res['record_count']

print(f"Total record count from all partitions: {total_records}")

Partition 1 results:
  Min Length: 3725
  Max Length: 3725
  Record Count: 1
Partition 2 results:
  Min Length: 3738
  Max Length: 3738
  Record Count: 1
Partition 3 results:
  Min Length: 3738
  Max Length: 3738
  Record Count: 1
Partition 4 results:
  Min Length: 3738
  Max Length: 3738
  Record Count: 1
Partition 5 results:
  Min Length: 3738
  Max Length: 3738
  Record Count: 1
Partition 6 results:
  Min Length: 3738
  Max Length: 3738
  Record Count: 1
Partition 7 results:
  Min Length: 3738
  Max Length: 3738
  Record Count: 1
Partition 8 results:
  Min Length: 3737
  Max Length: 3737
  Record Count: 1
Total record count from all partitions: 8
