In [None]:
import numpy as np
import requests
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
import random
import os

Fetch the CRY1 gene sequence (hg38, chr12:106991364-107093549) from the UCSC Genome Browser REST API

In [None]:
def get_CRY1_gene():
    """
    Download the DNA sequence used as CRY1 from the UCSC REST API.

    The request asks for genome hg38, chromosome chr12, start 106991364,
    end 107093549.

    Returns:
        The value of the "dna" field of the JSON response, upper-cased
        (an empty string if the field is absent), or None when the request
        fails: network error, timeout, non-200 status, or a body that is
        not valid JSON.
    """
    url = "https://api.genome.ucsc.edu/getData/sequence?genome=hg38;chrom=chr12;start=106991364;end=107093549"

    try:
        # Without a timeout a stalled connection would block the notebook
        # cell indefinitely.
        response = requests.get(url, timeout=60)
    except requests.RequestException:
        # Report connection problems the same way as an HTTP error so the
        # caller's "failed to fetch" branch handles both.
        return None

    if response.status_code != 200:
        return None

    try:
        payload = response.json()
    except ValueError:
        # The body was not valid JSON (e.g. an HTML error page).
        return None

    return payload.get("dna", "").upper()

One-hot encode sequence and return a 1D array

In [None]:
def onehotencoder(fasta_sequence, max_length=102500):
    """
    One-hot encode a nucleotide sequence into a flat float32 vector.

    Every position becomes four values in the fixed column order A, C, G, T.
    The column layout does not depend on which characters happen to occur in
    the input, so every call returns a vector of the same length and encoded
    sequences can be stacked into one array.

    - `fasta_sequence`: String (or iterable of single-character strings) of
      bases. Lower-case bases are treated like upper-case ones. Any other
      character (for example "N") is encoded as an all-zero row.
    - `max_length`: Number of positions in the output. Longer sequences are
      truncated; shorter ones are padded with all-zero rows.

    Returns a 1D `np.float32` array of length `max_length * 4`.
    """
    alphabet = "ACGT"

    # Byte value -> column index; -1 marks characters without a column.
    lookup = np.full(256, -1, dtype=np.int64)
    for column, base in enumerate(alphabet):
        lookup[ord(base)] = column

    # Upper-case before slicing so the slice is taken on the final text.
    bases = "".join(fasta_sequence).upper()[:max_length]
    # errors="replace" turns each non-ASCII character into a single "?" byte,
    # keeping one byte per position.
    codes = np.frombuffer(bases.encode("ascii", errors="replace"), dtype=np.uint8)
    columns = lookup[codes]

    # Allocating the full-size matrix up front gives the zero padding for
    # free and keeps the dtype float32 whether or not padding is needed.
    onehot_sequence = np.zeros((max_length, len(alphabet)), dtype=np.float32)
    known = np.flatnonzero(columns >= 0)
    onehot_sequence[known, columns[known]] = 1.0

    return onehot_sequence.flatten()

Augment sequence by introducing substitutions, deletions, or insertions

In [None]:
def augment_sequence(seq, mutation_rate=0.1, rng=None):
    """
    Return a randomly mutated copy of a sequence.

    `int(len(seq) * mutation_rate)` mutations are applied one after another.
    Each picks a random position in the current sequence and, with equal
    probability, substitutes the base there with a different one, deletes it,
    or inserts a random base in front of it.

    - `seq`: Sequence to mutate (string of bases).
    - `mutation_rate`: Number of mutations as a fraction of the sequence
      length (default 10% of sequence).
    - `rng`: Optional `random.Random` instance. Pass a seeded one to get
      reproducible output; by default a fresh generator seeded by the
      operating system is used.

    Returns the mutated sequence as a string.
    """
    bases = ("A", "G", "C", "T")

    # Use a private generator instead of reseeding the module-level one:
    # the caller's global random state is left untouched, and the generator
    # is not restricted to a 4-byte seed.
    if rng is None:
        rng = random.Random()

    seq_list = list(seq)
    num_mutations = int(len(seq_list) * mutation_rate)

    for _ in range(num_mutations):
        if not seq_list:
            # Deletions have emptied the sequence (possible when
            # mutation_rate >= 1); the only meaningful mutation is an insertion.
            seq_list.append(rng.choice(bases))
            continue

        idx = rng.randrange(len(seq_list))
        mutation_type = rng.choice(("substitution", "deletion", "insertion"))

        if mutation_type == "substitution":
            # Exclude the current base so a substitution always changes it.
            current = seq_list[idx]
            seq_list[idx] = rng.choice([base for base in bases if base != current])
        elif mutation_type == "deletion":
            del seq_list[idx]
        else:
            seq_list.insert(idx, rng.choice(bases))

    return "".join(seq_list)

Generate unique augmented sequences, one-hot encode them, and save them to the output file in batches

In [None]:
def process_data_augmentation(cry1_seq, output_path, num_augmented_sequences=1250, batch_size=250):
    """
    Generate augmented, one-hot-encoded copies of a sequence and save them in batches.

    Rows already stored under "arr_0" in `output_path` are loaded first and
    kept; the new rows are appended to them. After every `batch_size` new
    rows, and once more at the end for any remainder, the complete data set
    (old and new rows) is rewritten to `output_path`.

    - `cry1_seq`: Base sequence that every augmented sequence is derived from.
    - `output_path`: Path of the compressed .npz archive to read and write.
      NumPy appends ".npz" when saving if the name lacks it, so pass a path
      that already ends in ".npz" for the existence check to find the file.
    - `num_augmented_sequences`: Total number of augmented sequences to generate
      in this call (rows already in the file are not counted).
    - `batch_size`: Number of sequences to save in one batch.

    NOTE(review): `is_duplicate` is not defined in this part of the notebook;
    it has to be defined before this function runs. It receives the raw
    augmented string and the list of stored rows, which is only extended when
    a batch is flushed - confirm that this matches what it expects.
    """
    # Load existing data if available
    if os.path.exists(output_path):
        # SECURITY: allow_pickle=True lets np.load unpickle objects, which can
        # execute arbitrary code from a crafted file. Kept so archives written
        # by earlier runs still load; only point this at files you created.
        # The context manager closes the archive's file handle before the
        # same path is overwritten below.
        with np.load(output_path, allow_pickle=True) as archive:
            existing_data = archive["arr_0"].tolist()
    else:
        existing_data = []

    seq_count = 0
    rows_save = []

    while seq_count < num_augmented_sequences:
        augmented_seq = augment_sequence(cry1_seq)

        # Ensure augmented sequence is unique
        if not is_duplicate(augmented_seq, existing_data):
            print(f"Processed augmented sequence {seq_count + 1}.")
            encoded_seq = onehotencoder(augmented_seq)
            seq_count += 1
            rows_save.append(encoded_seq)

            # Flush a full batch so an interrupted run keeps its progress.
            if len(rows_save) >= batch_size:
                existing_data.extend(rows_save)
                np.savez_compressed(output_path, arr_0=np.array(existing_data))
                rows_save = []

    # Save remaining data
    if rows_save:
        existing_data.extend(rows_save)
        np.savez_compressed(output_path, arr_0=np.array(existing_data))

    print(f"Processed {seq_count} augmented sequences and saved to {output_path}.")

Path to output file

In [None]:
# Destination .npz archive for the augmented training data.
# This is an absolute, machine-specific Windows path; adjust it before
# running the notebook on another machine.
output_file = r"E:\datasets\processeddata\AUGMENTED_DATA_TRAINING_1250_NEW.npz"

Fetch CRY1 sequence and process data with augmentation

In [None]:
# Download the reference sequence, then run the augmentation pipeline on it.
cry1_seq = get_CRY1_gene()
if not cry1_seq:
    # None or an empty string both mean no usable sequence was returned.
    print("Failed to fetch CRY1 gene sequence.")
else:
    process_data_augmentation(cry1_seq, output_file)