In [1]:
from random import randint, choice

# Map Reduce

In the part of the assignment you are requested to use Map Reduce paradigm to solve the following exercises.

**NOTE THAT**: **A solution that does not use map reduce is not valid!**

# Exercise 1

You have a list of dictionaries, each representing a student with the following properties: a name and an array of test scores. Your task is to use map, filter, and reduce to calculate the average test score for each student, and then return a list of dictionaries containing only the students whose average score is above 90.

In [2]:
students = [
    {"name": "Alice", "scores": [95, 92, 88, 100]},
    {"name": "Bob", "scores": [78, 81, 85, 80]},
    {"name": "Charlie", "scores": [99, 91, 94, 96]},
    {"name": "Diana", "scores": [85, 87, 89, 83]}
]

Use `map`, `reduce` and `filter` that produce an output like:

In [3]:
[
    {"name": "Alice", "average_score": 93.75},
    {"name": "Charlie", "average_score": 95.0}
]

[{'name': 'Alice', 'average_score': 93.75},
 {'name': 'Charlie', 'average_score': 95.0}]

### Test
Test your solution using the dataset generated by the following function.

In [4]:
def generate_random_student_dataset(num_students=50):
    names = [f"Student {i}" for i in range(1, num_students + 1)]
    dataset = [
        {
            "name": name,
            "scores": [randint(50, 100) for _ in range(randint(3, 6))]  # Random scores between 50 and 100
        }
        for name in names
    ]
    return dataset

random_student_dataset = generate_random_student_dataset(50)
random_student_dataset[:3]

[{'name': 'Student 1', 'scores': [95, 98, 73, 66, 95]},
 {'name': 'Student 2', 'scores': [87, 62, 78, 73, 52]},
 {'name': 'Student 3', 'scores': [77, 69, 51, 93, 97]}]

In [5]:
# your code goes here
from random import randint
from functools import reduce

# Function to generate a random student dataset
def generate_random_student_dataset(num_students=50):
    names = [f"Student-{i}" for i in range(1, num_students + 1)]
    dataset = [
        {
            "name": name,
            "scores": [randint(50, 100) for _ in range(randint(3, 6))]  # Random scores between 50 and 100
        }
        for name in names
    ]
    return dataset

# Function to process the student dataset
def process_students(students):
    # Step 1: Calculate average score for each student
    students_with_avg = list(
        map(
            lambda student: {
                "name": student["name"],
                "average_score": reduce(lambda x, y: x + y, student["scores"]) / len(student["scores"])
            },
            students,
        )
    )

    # Step 2: Filter students with average score > 90
    filtered_students = list(filter(lambda student: student["average_score"] > 90, students_with_avg))

    # Step 3: Format the output
    result = list(
        map(lambda student: {"name": student["name"], "average_score": student["average_score"]}, filtered_students)
    )
    return result

# Generate the random dataset
random_student_dataset = generate_random_student_dataset(50)

# Process the dataset
output = process_students(random_student_dataset)

# Display the first few random students and the final result
print("Random Dataset (First 3 Students):")
print(random_student_dataset[:3])
print("\nFiltered Students with Average Score > 90:")
print(output)


Random Dataset (First 3 Students):
[{'name': 'Student-1', 'scores': [54, 50, 91, 84, 88]}, {'name': 'Student-2', 'scores': [97, 69, 76, 75, 80, 75]}, {'name': 'Student-3', 'scores': [54, 79, 88, 77, 54, 72]}]

Filtered Students with Average Score > 90:
[{'name': 'Student-28', 'average_score': 95.25}]


## Exercise 2

You have a list of dictionaries, each representing a product with the following properties: name, price, and category. Using the functions `map`, `filter`, and `reduce`, calculate the average price of the products in each category and return a list of dictionaries containing only the categories where the average price exceeds 50.

Example input:

In [6]:
products = [
    {"name": "Product A", "price": 60, "category": "Electronics"},
    {"name": "Product B", "price": 40, "category": "Electronics"},
    {"name": "Product C", "price": 70, "category": "Home"},
    {"name": "Product D", "price": 30, "category": "Home"},
    {"name": "Product E", "price": 90, "category": "Sports"}
]

Use `map`, `reduce` and `filter` that produce an output like:

In [7]:
[
    {"category": "Electronics", "average_price": 50.0},
    {"category": "Sports", "average_price": 90.0}
]

[{'category': 'Electronics', 'average_price': 50.0},
 {'category': 'Sports', 'average_price': 90.0}]

### Test
Test your solution using the dataset generated by the following function.

In [8]:
def generate_random_product_dataset(num_products=100):
    categories = ["Electronics", "Home", "Sports", "Books", "Clothing", "Toys"]
    dataset = [
        {
            "name": f"Product {i}",
            "price": randint(10, 200),  # Random price between 10 and 200
            "category": choice(categories),  # Randomly choose a category
        }
        for i in range(1, num_products + 1)
    ]
    return dataset

# Example of using the function
random_dataset = generate_random_product_dataset(100)
random_dataset[:5]  # Display the first 5 entries to check the dataset structure


[{'name': 'Product 1', 'price': 153, 'category': 'Books'},
 {'name': 'Product 2', 'price': 176, 'category': 'Books'},
 {'name': 'Product 3', 'price': 153, 'category': 'Electronics'},
 {'name': 'Product 4', 'price': 101, 'category': 'Books'},
 {'name': 'Product 5', 'price': 22, 'category': 'Home'}]

In [None]:
# your code goes here
# hints: 1) Group products by category (you don't need to use map reduce for this part), then 2) use map reduce paradigm to
# calculate the average price for each category and filter categories with an average price > 50
from functools import reduce
from random import randint, choice

# Example dataset
products = [
    {"name": "Product A", "price": 60, "category": "Electronics"},
    {"name": "Product B", "price": 40, "category": "Electronics"},
    {"name": "Product C", "price": 70, "category": "Home"},
    {"name": "Product D", "price": 30, "category": "Home"},
    {"name": "Product E", "price": 90, "category": "Sports"},
]

# Step 1: Group by category using reduce
grouped = reduce(lambda acc, product: acc.update({product['category']: acc.get(product['category'], []) + [product]}) or acc, products, {})

# Step 2: Calculate average prices using map
average_prices = list(map(lambda category: {"category": category, "average_price": sum(p['price'] for p in grouped[category]) / len(grouped[category])}, grouped))

# Step 3: Filter categories where average price > 50
result = list(filter(lambda cat: cat['average_price'] > 50, average_prices))

# Output the result
print(result)


# Exercise 3

You have a list of dictionaries, each representing an employee with the following properties: name, salary, and department. Your task is to use `map`, `filter`, and `reduce` to calculate the average salary for each department and return a list of dictionaries containing only the departments where the average salary is above 65,000.

**Example Input**

In [10]:
employees = [
    {"name": "John", "salary": 70000, "department": "Engineering"},
    {"name": "Jane", "salary": 75000, "department": "Engineering"},
    {"name": "Alice", "salary": 60000, "department": "HR"},
    {"name": "Bob", "salary": 68000, "department": "HR"},
    {"name": "Charlie", "salary": 90000, "department": "Marketing"},
    {"name": "Diana", "salary": 50000, "department": "Marketing"}
]

Use `map`, `reduce` and `filter` that produce an output like:

In [11]:
[
    {"department": "Engineering", "average_salary": 72500.0},
    {"department": "Marketing", "average_salary": 70000.0}
]

[{'department': 'Engineering', 'average_salary': 72500.0},
 {'department': 'Marketing', 'average_salary': 70000.0}]

### Test

Test your solution using the dataset generated by the following function.

In [12]:
def generate_random_employee_dataset(num_employees=50):
    departments = ["Engineering", "HR", "Marketing", "Sales", "Finance", "IT"]
    dataset = [
        {
            "name": f"Employee {i}",
            "salary": randint(40000, 120000),  # Random salary between 40,000 and 120,000
            "department": choice(departments)  # Randomly choose a department
        }
        for i in range(1, num_employees + 1)
    ]
    return dataset

random_employee_dataset = generate_random_employee_dataset(50)

random_employee_dataset[:3]  # Display the first 3 entries of each dataset for checking


[{'name': 'Employee 1', 'salary': 112097, 'department': 'HR'},
 {'name': 'Employee 2', 'salary': 87788, 'department': 'Engineering'},
 {'name': 'Employee 3', 'salary': 66561, 'department': 'Finance'}]

In [None]:
# your code goes here
# hints: 1) Group employees' salaries by department (you don't need to use map reduce for this part), then 2) use map reduce paradigm to
# calculate the average salary for each department and filter departments with an average salary > threshold
from functools import reduce
from itertools import groupby
from operator import itemgetter

# Input data
employees = [
    {"name": "John", "salary": 70000, "department": "Engineering"},
    {"name": "Jane", "salary": 75000, "department": "Engineering"},
    {"name": "Alice", "salary": 60000, "department": "HR"},
    {"name": "Bob", "salary": 68000, "department": "HR"},
    {"name": "Charlie", "salary": 90000, "department": "Marketing"},
    {"name": "Diana", "salary": 50000, "department": "Marketing"},
]

# Group employees by department
employees.sort(key=itemgetter('department'))
grouped = groupby(employees, key=itemgetter('department'))

# Calculate average salary per department
department_avg_salary = map(
    lambda dept: {
        "department": dept[0],
        "average_salary": reduce(
            lambda acc, emp: acc + emp["salary"], dept[1], 0
        ) / len(list(dept[1]))
    },
    grouped
)

# Filter departments with average salary > 65000
filtered_departments = filter(
    lambda dept: dept["average_salary"] > 65000, department_avg_salary
)

# Convert result to list
result = list(filtered_departments)

# Output
print(result)


# Biopython

Write the following five functions to analyze global alignments between two sequences using Biopython's `pairwise2` module:

1. **countMatches(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment (pairwise2.globalxx) of the same length. It returns the number of positions where the elements of both sequences match.

2. **countMismatches(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment of the same length. It returns the number of positions where the elements of the two sequences are different (i.e., they are not gaps, and the characters do not match).

3. **countGapOpens(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment of the same length. It returns the number of gap openings in the alignment (a gap is opened when a '-' appears in the sequence).

4. **countGapExtensions(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment of the same length. It returns the number of gap extensions (where '-' continues in the alignment after an initial gap is opened).

5. **getScore(s1, s2, matchScore, mismatchPenalty, gapOpenPenalty, gapExtensionPenalty)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment and returns the alignment score based on the provided scoring scheme: `matchScore` for matches, `mismatchPenalty` for mismatches, `gapOpenPenalty` for opening a gap, and `gapExtensionPenalty` for extending a gap.

In [None]:
# Add your functions here
from Bio import pairwise2

# Function 1: Count Matches
def countMatches(s1, s2):
    """Returns the number of matching positions in the aligned sequences."""
    return sum(1 for a, b in zip(s1, s2) if a == b and a != '-')

# Function 2: Count Mismatches
def countMismatches(s1, s2):
    """Returns the number of mismatched positions (non-gaps that differ)."""
    return sum(1 for a, b in zip(s1, s2) if a != b and a != '-' and b != '-')

# Function 3: Count Gap Opens
def countGapOpens(s1, s2):
    """Returns the number of gap openings in the alignment."""
    gaps = 0
    for a, b in zip(s1, s2):
        if (a == '-' or b == '-') and not (a == b == '-'):
            gaps += 1
    return gaps

# Function 4: Count Gap Extensions
def countGapExtensions(s1, s2):
    """Returns the number of gap extensions in the alignment."""
    extensions = 0
    previous_gap = False
    for a, b in zip(s1, s2):
        if a == '-' or b == '-':
            if previous_gap:  # If continuing a gap
                extensions += 1
            previous_gap = True
        else:
            previous_gap = False
    return extensions

# Function 5: Get Score
def getScore(s1, s2, matchScore, mismatchPenalty, gapOpenPenalty, gapExtensionPenalty):
    """Returns the alignment score based on the given scoring scheme."""
    score = 0
    previous_gap = False

    for a, b in zip(s1, s2):
        if a == b and a != '-':  # Match
            score += matchScore
        elif a != b and a != '-' and b != '-':  # Mismatch
            score -= mismatchPenalty
        elif a == '-' or b == '-':  # Gap
            if previous_gap:  # Gap extension
                score -= gapExtensionPenalty
            else:  # Gap opening
                score -= gapOpenPenalty
                previous_gap = True
        else:
            previous_gap = False  # Reset gap tracking if no gap
    
    return score

# Example Usage
if __name__ == "__main__":
    # Input sequences
    seq1 = "ACGTACGT"
    seq2 = "ACGTCG-T"

    # Perform global alignment
    alignments = pairwise2.align.globalxx(seq1, seq2)
    aligned_s1, aligned_s2 = alignments[0][:2]  # Extract aligned sequences

    # Example scoring
    match = 2
    mismatch = -1
    gap_open = -2
    gap_extend = -1

    # Test functions
    print("Aligned Sequences:")
    print(aligned_s1)
    print(aligned_s2)
    print("Matches:", countMatches(aligned_s1, aligned_s2))
    print("Mismatches:", countMismatches(aligned_s1, aligned_s2))
    print("Gap Opens:", countGapOpens(aligned_s1, aligned_s2))
    print("Gap Extensions:", countGapExtensions(aligned_s1, aligned_s2))
    print("Score:", getScore(aligned_s1, aligned_s2, match, mismatch, gap_open, gap_extend))


### Test
Align the sequences of the [Interleukin-12](https://en.wikipedia.org/wiki/Interleukin_12) chain A (denoted as `s1`) from the file [`IL12A.fasta`](https://qcbsciprolab2020.readthedocs.io/en/latest/file_samples/IL12A.fasta) and the Interleukin-12 chain B (denoted as `s2`) from the file [`IL12B.fasta`](https://qcbsciprolab2020.readthedocs.io/en/latest/file_samples/IL12B.fasta) and check the score as computed from pairwise2 and from your functions.

In [None]:
# add the output of the test here
from Bio import SeqIO
from Bio import pairwise2
from Bio.pairwise2 import format_alignment

# Reuse the previously defined functions
def countMatches(s1, s2):
    return sum(1 for a, b in zip(s1, s2) if a == b and a != '-')

def countMismatches(s1, s2):
    return sum(1 for a, b in zip(s1, s2) if a != b and a != '-' and b != '-')

def countGapOpens(s1, s2):
    gaps = 0
    for a, b in zip(s1, s2):
        if (a == '-' or b == '-') and not (a == b == '-'):
            gaps += 1
    return gaps

def countGapExtensions(s1, s2):
    extensions = 0
    previous_gap = False
    for a, b in zip(s1, s2):
        if a == '-' or b == '-':
            if previous_gap:  # Continuing a gap
                extensions += 1
            previous_gap = True
        else:
            previous_gap = False
    return extensions

def getScore(s1, s2, matchScore, mismatchPenalty, gapOpenPenalty, gapExtensionPenalty):
    score = 0
    previous_gap = False

    for a, b in zip(s1, s2):
        if a == b and a != '-':  # Match
            score += matchScore
        elif a != b and a != '-' and b != '-':  # Mismatch
            score -= mismatchPenalty
        elif a == '-' or b == '-':  # Gap
            if previous_gap:  # Gap extension
                score -= gapExtensionPenalty
            else:  # Gap opening
                score -= gapOpenPenalty
                previous_gap = True
        else:
            previous_gap = False  # Reset gap tracking
    
    return score

# Testing with IL12A.fasta and IL12B.fasta
def test_alignment(file1, file2):
    # Read the sequences from FASTA files
    seq1 = str(next(SeqIO.parse(file1, "fasta")).seq)
    seq2 = str(next(SeqIO.parse(file2, "fasta")).seq)

    # Perform global alignment
    alignments = pairwise2.align.globalxx(seq1, seq2)  # Example scoring: globalxx (match = 1, mismatch/gap = 0)
    aligned_s1, aligned_s2 = alignments[0][:2]  # Extract the first alignment's sequences

    # Example scoring scheme
    match = 2
    mismatch = -1
    gap_open = -2
    gap_extend = -1

    # Compute scores using custom functions
    computed_score = getScore(aligned_s1, aligned_s2, match, mismatch, gap_open, gap_extend)

    # Biopython computed score
    biopython_score = alignments[0][2]  # Score is part of the alignment tuple

    # Print results
    print("Aligned Sequences:")
    print(aligned_s1)
    print(aligned_s2)
    print("\nCustom Computed Metrics:")
    print("Matches:", countMatches(aligned_s1, aligned_s2))
    print("Mismatches:", countMismatches(aligned_s1, aligned_s2))
    print("Gap Opens:", countGapOpens(aligned_s1, aligned_s2))
    print("Gap Extensions:", countGapExtensions(aligned_s1, aligned_s2))
    print("Custom Score:", computed_score)
    print("\nBiopython Alignment Score:", biopython_score)
    print("\nAlignment:")
    print(format_alignment(*alignments[0]))

# Call the test function
test_alignment("IL12A.fasta", "IL12B.fasta")


BIOINFORMATICS STRONGHOLD EXERCISES 

In [None]:
#  exercise 1 (lgis)
def find_lis(arr):
    from bisect import bisect_left

    # Dynamic programming + binary search for LIS
    sub = []
    parent = [-1] * len(arr)
    pos = []
    
    for i, val in enumerate(arr):
        idx = bisect_left(sub, val)
        if idx == len(sub):
            sub.append(val)
            pos.append(i)
        else:
            sub[idx] = val
            pos[idx] = i
        if idx > 0:
            parent[i] = pos[idx - 1]
    
    # Reconstruct LIS
    lis = []
    last_index = pos[-1]
    while last_index != -1:
        lis.append(arr[last_index])
        last_index = parent[last_index]
    return lis[::-1]

def main():
    
    with open("rosalind_input.txt", "r") as file:
        n = int(file.readline().strip())
        permutation = list(map(int, file.readline().strip().split()))
    
    # Compute LIS and LDS
    lis = find_lis(permutation)
    lds = find_lis([-x for x in permutation])
    lds = [-x for x in lds]
    
    # Output 
    print(" ".join(map(str, lis)))
    print(" ".join(map(str, lds)))

if __name__ == "__main__":
    main()


117 211 288 319 394 484 540 554 605 633 776 795 803 871 883 913 955 966 1003 1097 1250 1311 1335 1350 1356 1457 1525 1584 1587 1677 1706 1765 1766 1790 1797 1810 1816 1857 1863 1898 1926 1959 1986 2004 2032 2036 2053 2067 2122 2143 2182 2288 2400 2416 2468 2473 2487 2493 2507 2555 2584 2656 2753 2799 2834 2905 2942 2957 2994 3030 3153 3386 3395 3455 3473 3640 3718 3785 3806 3873 3877 3916 3980 4117 4135 4152 4209 4273 4291 4329 4363 4444 4480 4497 4506 4507 4527 4583 4622 4809 4815 4818 4851 4866 4867 4931 4945 4966 5012 5033 5063 5150 5231 5233 5296 5299 5350 5381 5413 5494 5517 5706 5740 5742 5796 5806 5943 5962 5967 5977 6024 6045 6055 6141 6236 6250 6267 6315 6316 6367 6577 6620 6639 6647 6708 6751 6769 6782 6818 6909 6951 6994 7032 7140 7179 7183 7251 7255 7267 7412 7556 7608 7656 7710 7814 7857 7894 7976 8014 8082 8119 8126 8127 8151 8196 8286 8332 8334 8506
8658 8649 8590 8583 8568 8551 8547 8544 8531 8469 8397 8392 8375 8350 8335 8312 8308 8203 8194 8179 8137 8081 8069 8059 802

In [None]:
# exercise 2 (sseq)
def parse_fasta(filepath):
    """Parses a FASTA file and returns a list of sequences."""
    with open(filepath, 'r') as file:
        sequences = []
        current_seq = []
        for line in file:
            if line.startswith('>'):
                if current_seq:
                    sequences.append(''.join(current_seq))
                    current_seq = []
            else:
                current_seq.append(line.strip())
        if current_seq:
            sequences.append(''.join(current_seq))
    return sequences

def find_subsequence_indices(s, t):
    """Finds one collection of indices in s where t appears as a subsequence."""
    indices = []
    start = 0
    for char in t:
        start = s.find(char, start)  # Find the character starting from the current position
        if start == -1:
            return []  # If a character is not found, subsequence doesn't exist
        indices.append(start + 1)  # Convert to 1-based index
        start += 1
    return indices

# Main logic
file_path = "rosalind_input.txt"  # Replace with the actual file path
sequences = parse_fasta(file_path)

if len(sequences) != 2:
    raise ValueError(f"Expected exactly 2 sequences in the input, but found {len(sequences)}.")

s, t = sequences[:2]  # Take the first two sequences

indices = find_subsequence_indices(s, t)
if indices:
    print(" ".join(map(str, indices)))
else:
    print("No subsequence found.")




In [None]:
#exercise 2 (lcsq)
def read_fasta(file_path):
    """Reads a FASTA file and returns the sequences as a tuple."""
    with open(file_path, 'r') as file:
        lines = file.read().strip().split('>')
        sequences = []
        for line in lines:
            if line:
                parts = line.split('\n')
                sequences.append(''.join(parts[1:]))
        return tuple(sequences)

def longest_common_subsequence(s, t):
    m, n = len(s), len(t)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    
    # Fill DP table
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s[i - 1] == t[j - 1]:
                dp[i][j] = dp[i - 1][j - 1] + 1
            else:
                dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])
    
    # Backtrack to find the LCS
    i, j = m, n
    lcs = []
    while i > 0 and j > 0:
        if s[i - 1] == t[j - 1]:
            lcs.append(s[i - 1])
            i -= 1
            j -= 1
        elif dp[i - 1][j] > dp[i][j - 1]:
            i -= 1
        else:
            j -= 1
    
    return ''.join(reversed(lcs))


input_file = 'rosalind_input.txt'  
s, t = read_fasta(input_file)
lcs = longest_common_subsequence(s, t)

# Output 
print(lcs)

ACCTGG


In [None]:
#exercise 4 (edit)
def parse_fasta(filepath):
    """Parses a FASTA file and returns a list of sequences."""
    with open(filepath, 'r') as file:
        sequences = []
        current_seq = []
        for line in file:
            if line.startswith('>'):
                if current_seq:
                    sequences.append(''.join(current_seq))
                    current_seq = []
            else:
                current_seq.append(line.strip())
        if current_seq:
            sequences.append(''.join(current_seq))
    return sequences

def edit_distance(s, t):
    """Calculates the edit distance between two strings s and t."""
    m, n = len(s), len(t)
    dp = [[0] * (n + 1) for _ in range(m + 1)]

    # Base cases
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j

    # Fill the dp table
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s[i - 1] == t[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = min(
                    dp[i - 1][j] + 1,    # Deletion
                    dp[i][j - 1] + 1,    # Insertion
                    dp[i - 1][j - 1] + 1 # Substitution
                )

    return dp[m][n]


file_path = "rosalind_input.txt"  
sequences = parse_fasta(file_path)

if len(sequences) != 2:
    raise ValueError(f"Expected exactly 2 sequences in the input, but found {len(sequences)}.")

s, t = sequences
result = edit_distance(s, t)
print(result)


5


In [None]:
#exercise 5 (edita)
def read_fasta(file_path):
    """Reads a FASTA file and returns the sequences as a tuple."""
    with open(file_path, 'r') as file:
        lines = file.read().strip().split('>')
        sequences = []
        for line in lines:
            if line:
                parts = line.split('\n')
                sequences.append(''.join(parts[1:]))
        return tuple(sequences)

def edit_distance_alignment(s, t):
    m, n = len(s), len(t)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    
    # Base cases
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j
    
    # Fill DP table
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s[i - 1] == t[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])
    
    # Backtrack to find the alignment
    i, j = m, n
    s_aligned, t_aligned = [], []
    
    while i > 0 or j > 0:
        if i > 0 and j > 0 and s[i - 1] == t[j - 1]:
            s_aligned.append(s[i - 1])
            t_aligned.append(t[j - 1])
            i -= 1
            j -= 1
        elif i > 0 and dp[i][j] == dp[i - 1][j] + 1:
            s_aligned.append(s[i - 1])
            t_aligned.append('-')
            i -= 1
        elif j > 0 and dp[i][j] == dp[i][j - 1] + 1:
            s_aligned.append('-')
            t_aligned.append(t[j - 1])
            j -= 1
        else:
            s_aligned.append(s[i - 1])
            t_aligned.append(t[j - 1])
            i -= 1
            j -= 1
    
    s_aligned = ''.join(reversed(s_aligned))
    t_aligned = ''.join(reversed(t_aligned))
    
    return dp[m][n], s_aligned, t_aligned


input_file = 'rosalind_input.txt'
s, t = read_fasta(input_file)
edit_distance, s_aligned, t_aligned = edit_distance_alignment(s, t)

# output
print(edit_distance)
print(s_aligned)
print(t_aligned)

4
PRETTY--
PR-TTEIN


In [None]:
#exercise 6 (ctea)
def read_fasta(file_path):
    """Reads a FASTA file and returns the sequences as a tuple."""
    with open(file_path, 'r') as file:
        lines = file.read().strip().split('>')
        sequences = []
        for line in lines:
            if line:
                parts = line.split('\n')
                sequences.append(''.join(parts[1:]))
        return tuple(sequences)

def optimal_alignment_count(s, t):
    MOD = 134217727
    m, n = len(s), len(t)
    
    # Initialize DP and count tables
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    count = [[0] * (n + 1) for _ in range(m + 1)]
    
    # Base cases
    for i in range(m + 1):
        dp[i][0] = i
        count[i][0] = 1
    for j in range(n + 1):
        dp[0][j] = j
        count[0][j] = 1

    # Fill DP and count tables
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            cost = 0 if s[i - 1] == t[j - 1] else 1
            
            # Calculate the minimum edit distance
            dp[i][j] = min(
                dp[i - 1][j - 1] + cost,  # Match/mismatch
                dp[i - 1][j] + 1,         # Deletion
                dp[i][j - 1] + 1          # Insertion
            )
            
            # Count the ways to reach dp[i][j]
            if dp[i][j] == dp[i - 1][j - 1] + cost:
                count[i][j] += count[i - 1][j - 1]
            if dp[i][j] == dp[i - 1][j] + 1:
                count[i][j] += count[i - 1][j]
            if dp[i][j] == dp[i][j - 1] + 1:
                count[i][j] += count[i][j - 1]
            
            # Apply modulo to keep numbers manageable
            count[i][j] %= MOD

    # Result is in count[m][n]
    return count[m][n]


input_file = 'rosalind_input.txt'  
s, t = read_fasta(input_file)
result = optimal_alignment_count(s, t)

# Output 
print(result)

4


In [None]:
#exercise 7 (glob)
import numpy as np

def parse_fasta(filepath):
    """Parses a FASTA file and returns a list of sequences."""
    with open(filepath, 'r') as file:
        sequences = []
        current_seq = []
        for line in file:
            if line.startswith('>'):
                if current_seq:
                    sequences.append(''.join(current_seq))
                    current_seq = []
            else:
                current_seq.append(line.strip())
        if current_seq:
            sequences.append(''.join(current_seq))
    return sequences

def load_blosum62():
    """Returns the BLOSUM62 scoring matrix as a dictionary."""
    blosum62_data = """
       A  R  N  D  C  Q  E  G  H  I  L  K  M  F  P  S  T  W  Y  V
    A  4 -1 -2 -2  0 -1 -1  0 -2 -1 -1 -1 -1 -2 -1  1  0 -3 -2  0
    R -1  5  0 -2 -3  1  0 -2  0 -3 -2  2 -1 -3 -2 -1 -1 -3 -2 -3
    N -2  0  6  1 -3  0  0  0  1 -3 -3  0 -2 -3 -2  1  0 -4 -2 -3
    D -2 -2  1  6 -3  0  2 -1 -1 -3 -4 -1 -3 -3 -1  0 -1 -4 -3 -3
    C  0 -3 -3 -3  9 -3 -4 -3 -3 -1 -1 -3 -1 -2 -3 -1 -1 -2 -2 -1
    Q -1  1  0  0 -3  5  2 -2  0 -3 -2  1  0 -3 -1  0 -1 -2 -1 -2
    E -1  0  0  2 -4  2  5 -2  0 -3 -3  1 -2 -3 -1  0 -1 -3 -2 -2
    G  0 -2  0 -1 -3 -2 -2  6 -2 -4 -4 -2 -3 -3 -2  0 -2 -2 -3 -3
    H -2  0  1 -1 -3  0  0 -2  8 -3 -3 -1 -2 -1 -2 -1 -2 -2  2 -3
    I -1 -3 -3 -3 -1 -3 -3 -4 -3  4  2 -3  1  0 -3 -2 -1 -3 -1  3
    L -1 -2 -3 -4 -1 -2 -3 -4 -3  2  4 -2  2  0 -3 -2 -1 -2 -1  1
    K -1  2  0 -1 -3  1  1 -2 -1 -3 -2  5 -1 -3 -1  0 -1 -3 -2 -2
    M -1 -1 -2 -3 -1  0 -2 -3 -2  1  2 -1  5  0 -2 -1 -1 -1 -1  1
    F -2 -3 -3 -3 -2 -3 -3 -3 -1  0  0 -3  0  6 -4 -2 -2  1  3 -1
    P -1 -2 -2 -1 -3 -1 -1 -2 -2 -3 -3 -1 -2 -4  7 -1 -1 -4 -3 -2
    S  1 -1  1  0 -1  0  0  0 -1 -2 -2  0 -1 -2 -1  4  1 -3 -2 -2
    T  0 -1  0 -1 -1 -1 -1 -2 -2 -1 -1 -1 -1 -2 -1  1  5 -2 -2  0
    W -3 -3 -4 -4 -2 -2 -3 -2 -2 -3 -2 -3 -1  1 -4 -3 -2 11  2 -3
    Y -2 -2 -2 -3 -2 -1 -2 -3  2 -1 -1 -2 -1  3 -3 -2 -2  2  7 -1
    V  0 -3 -3 -3 -1 -2 -2 -3 -3  3  1 -2  1 -1 -2 -2  0 -3 -1  4
    """
    lines = blosum62_data.strip().split("\n")
    amino_acids = lines[0].split()
    matrix = {}
    for i, line in enumerate(lines[1:]):
        row = line.split()
        matrix[amino_acids[i]] = {amino_acids[j]: int(row[j + 1]) for j in range(len(amino_acids))}
    return matrix

def max_alignment_score(s, t, blosum62, gap_penalty):
    """Calculates the maximum alignment score between s and t using BLOSUM62 and gap penalty."""
    m, n = len(s), len(t)
    dp = np.zeros((m + 1, n + 1), dtype=int)

    # Initialize base cases
    for i in range(1, m + 1):
        dp[i][0] = dp[i - 1][0] - gap_penalty
    for j in range(1, n + 1):
        dp[0][j] = dp[0][j - 1] - gap_penalty

    # Fill the dp table
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            match = dp[i - 1][j - 1] + blosum62[s[i - 1]][t[j - 1]]
            delete = dp[i - 1][j] - gap_penalty
            insert = dp[i][j - 1] - gap_penalty
            dp[i][j] = max(match, delete, insert)

    return dp[m][n]


file_path = "rosalind_input.txt" 
sequences = parse_fasta(file_path)

if len(sequences) != 2:
    raise ValueError(f"Expected exactly 2 sequences in the input, but found {len(sequences)}.")

s, t = sequences
gap_penalty = 5
blosum62 = load_blosum62()

alignment_score = max_alignment_score(s, t, blosum62, gap_penalty)
print(alignment_score)


8
