In [1]:
# EX 1 - Bioinformatics Stronghold: Longest Increasing Subsequence - lgis

"""""
Given: A positive integer n≤10000 followed by a permutation π of length n.

Return: A longest increasing subsequence of π, followed by a longest decreasing subsequence of π.
"""""

# I need to find the longest increasing and decreasing subsequences in a permutation.
# LIS = Longest Increasing Subsequence; LDS = Longest Decreasing Subsequence

# Dynamic programming to find LIS
# This function: - keeps track of the length of the longest increasing subsequence (LIS) that ends at each position
# - For each element, it looks at all the previous elements to build the optimal subsequence
# - "sequence" is the input parameter of the function -> it's the permutation of numbers I am analyzing
def find_LIS(sequence):
    
    # I calculate the length of the sequence (ex: len([5, 1, 4, 2, 3]) = 5)
    # I assign this value to a variable called n
    # This line is useful because: - I need to know the length of the sequence to create support arrays of the right size; - I will use it in the following for loops to iterate over the sequence;
    n = len(sequence)
    
    # I create two arrays of length n - dp and prev:

    # I create a dp (dynamic programming) array of length n where all elements are initialised to 1 -> It is initialised to 1 because each element alone is an increasing subsequence of length 1
    # Each position dp[i] will represent the length of LIS that ends with the element at position i
    dp = [1] * n
    
    # I create a prev array of length n where all elements are initialised to -1
    # Each position prev[i] stores the index of the previous element in LIS
    # I use -1 as the initial value to indicate ‘no previous item’
    prev = [-1] * n
    
    # For each position, I look at all the previous elements:
    # - Iteration on the sequence from the second element (index 1) -> because the first element alone is always an increasing subsequence of length 1 -> So I start from the 
    # second element to try to extend the subsequences
    for i in range(1, n):
        
        # This loop iterates over all elements preceding the current element "sequence[i]"
        # I compare sequence[i] with all previous elements to see if it can extend an existing subsequence
        for j in range(i):
            
            # sequence[i] > sequence[j] -> It checks whether the current element is greater than the previous element -> This means that sequence[i] can be added at the end of a increasing subsequence
            # dp[j] + 1 > dp[i] -> It checks whether extending the subsequence ending in sequence[j] (of length dp[j]) with sequence[i] results in a subsequence longer than the current one
            # dp[i] contains the length of the LIS ending in sequence[i]
            if sequence[i] > sequence[j] and dp[j] + 1 > dp[i]:
                
                # If the previous condition is true, I update dp[i] with the length of the longest subsequence ending in sequence[j], plus 1 (to include sequence[i])
                # This update ensures that dp[i] stores the length of LIS ending in sequence[i].
                dp[i] = dp[j] + 1
                
                # I use prev to rebuild the actual sequence:
                # - I store the index j in prev[i]
                # - This will later allow me to reconstruct the actual subsequence from the last element
                prev[i] = j

                # Recap about this for loop:
                # - It compares each sequence[i] element with all preceding elements sequence[j].
                # - It updates the length of LIS that ends in sequence[i] if it can be extended from a LIS that ends in sequence[j]
                # - It stores the index j in order to reconstruct the actual subsequence later on
    
    # Find the length of LIS (max_length):
    # I find the maximum value in the dp array
    # By taking the max() of all the values in dp, I'm finding the overall length of LIS
    # ex: if the final dp array is [1, 1, 2, 2, 3], then max_length will be set to 3, which is the length of LIS
    max_length = max(dp)
    
    # I find the index of the last element in LIS
    # dp.index(max_length) -> returns the index of the first occurrence of the max_length value in the dp array
    # ex:  if dp is [1, 1, 2, 2, 3], then dp.index(3) would return 4, since the value 3 (the maximum value) is at index 4 in the dp array.
    # This allows me to reconstruct the actual sequence of elements that make up LIS. By starting from the index of the last element (end_index) 
    # and working backwards using the prev array, I can trace back through the sequence to find all the elements in LIS
    # I need to know the end_index because LIS can appear at different positions in the original sequence
    end_index = dp.index(max_length)
    
    # I reconstruct the actual elements of LIS from the information stored in the dp and prev arrays:
    
    # Empty list to store the elements of LIS in the correct order.
    result = []
    
    # Loop that continues as long as end_index is not equal to -1.
    # end_index variable -> previously set to the index of the last element in LIS
    while end_index != -1:
        
        # I append the element at the current end_index position in the original sequence to the result list
        # -> This adds the elements of LIS to the result list in reverse order
        result.append(sequence[end_index])
        
        # I set end_index to the value stored in prev[end_index] (prev array stores the index of the previous element in LIS for each index)
        # So, on LIS, I am starting from the last element and working my way towards the beginning
        end_index = prev[end_index]
    
    # After the loop completes, I have the elements of LIS in the result list, but in reverse order.
    # The [::-1] slice notation is used to reverse the order of the list, so that the elements are returned in the correct order.
    return result[::-1]  


# This function is similar as find_LIS, but reverses the comparison condition (I use "<" instead of ">") in order to find the longest decreasing subsequence
# It takes the input "sequence" as a parameter
def find_LDS(sequence):
    
    # I save the length of the input sequence in the variable n -> in order to simplify array allocation and indexing
    n = len(sequence)
    
    # I initialize two arrays: dp and prev -> both initialized with 1 and -1 respectively

    # dp stores the length of LDS ending at each index.
    dp = [1] * n
    
    # prev stores the index of the previous element in LDS ending at each index.
    prev = [-1] * n
    
    # Nested loops to iterate through the sequence -> I compare each element to all the previous elements
    for i in range(1, n):
        for j in range(i):
            
            # sequence[i] < sequence[j] -> checks if the current element is less than the previous element (which is the criteria for a decreasing subsequence)
            # dp[j] + 1 > dp[i] -> checks if extending the LDS ending at index j (of length dp[j]) with the current element sequence[i] results 
            # in a longer decreasing subsequence than the current LDS ending at index i.
            if sequence[i] < sequence[j] and dp[j] + 1 > dp[i]:
                
                # If both conditions are true, I update dp[i] and prev[i] accordingly
                dp[i] = dp[j] + 1
                prev[i] = j
    
    # Length of the overall LDS
    max_length = max(dp)
    
    # Index of the last element in this "overall LDS"
    end_index = dp.index(max_length)
    
    # I reconstructs the actual elements of LDS by following the prev pointers backwards, starting from the last element
    # It's the same process as in the LIS case, just with the sequence being decreasing instead of increasing
    result = []
    while end_index != -1:
        result.append(sequence[end_index])
        end_index = prev[end_index]
    
    # I reverse the list to get the correct order
    return result[::-1]  

# This function calls both functions above and returns both the sequences found
# It takes two parameters: n -> the length of the input permutation; permutation -> the input permutation, which is a list of integers
def LIS_LDS(n, permutation):
    
    # I call the find_LIS function, passing the permutation as an argument
    # The function returns the longest increasing subsequence, which is stored in the increasing variable
    increasing = find_LIS(permutation)
    
    # I call the find_LDS function, also passing the permutation as an argument
    # The function returns the longest decreasing subsequence, which is stored in the decreasing variable
    decreasing = find_LDS(permutation)
    
    # I return both the increasing and decreasing subsequences as a tuple
    return increasing, decreasing

# This function takes two parameters: increasing -> the longest increasing subsequence, which is a list of integers;
# decreasing -> the longest decreasing subsequence, which is also a list of integers
def format_output(increasing, decreasing):
    
    # I format the output as requested -> I use an f-string (formatted string literal) to construct the final output
    # ' '.join(map(str, increasing)) -> takes the increasing list of integers and converts each element to a string using the map(str, increasing) call
    # The ' '.join() function then concatenates all the string representations of the elements, separated by a space character -> The result is a space-separated 
    # string of the elements in the increasing list. 

    # '\n' (newline character) -> acts as a separator between the increasing and decreasing subsequences in the final output

    # ' '.join(map(str, decreasing)) -> this part is similar to the increasing part, but it operates on the decreasing list of integers
    # It converts each element to a string and joins them with space characters
    return f"{' '.join(map(str, increasing))}\n{' '.join(map(str, decreasing))}"

# This if statement checks if the current script is being run as the main program (as opposed to being imported as a module)
# The with statement ensures that the file is properly closed after the code inside the block is executed
if __name__ == "__main__":
    
    # I open the file in read mode ("r")
    with open("./bs-datasets/rosalind_lgis.txt", "r") as file:
        
        # I read all the lines from the file and I store them in the "lines" list
        # Each element in "lines" will be a string representing a single line from the file
        lines = file.readlines()
        
        # The first line of the input file contains the length of the permutation, which is stored in the variable n
        # I use the strip() method to remove any leading or trailing whitespace characters from the line
        # I use the int() function to convert the string to an integer
        n = int(lines[0].strip())
        
        # The second line of the input file contains the permutation itself, which is a space-separated list of integers
        # lines[1].strip().split() -> splits the line into a list of strings, where each string represents a single integer in the permutation
        # I use the map(int, ...) function to convert each string to an integer
        # I use the list() function to convert the map object to a regular Python list, which is stored in the permutation variable
        permutation = list(map(int, lines[1].strip().split()))
    
    # I call the LIS_LDS function, passing the n (length of the permutation) and the permutation itself as arguments
    # The function returns the longest increasing subsequenc (LIS) e and the longest decreasing subsequence (LDS), which are stored in the increasing and decreasing variables, respectively
    increasing, decreasing = LIS_LDS(n, permutation)
    
    # I call the format_output function, passing the increasing and decreasing subsequences as arguments
    # The function returns the formatted output string, which is stored in the result variable
    result = format_output(increasing, decreasing)
    print(result)

129 154 178 289 328 360 423 439 496 516 659 685 735 778 793 797 816 896 946 993 1006 1021 1065 1084 1120 1240 1243 1435 1459 1481 1606 1621 1833 1834 1867 1903 1909 1917 1952 2023 2035 2059 2084 2085 2197 2297 2426 2513 2583 2598 2692 2723 2727 2788 2798 2799 2932 2986 3046 3108 3146 3230 3364 3414 3450 3471 3506 3507 3549 3578 3661 3692 3745 3780 3899 3950 3974 3979 3989 4026 4142 4307 4309 4592 4622 4628 4662 4738 4746 4850 5058 5107 5187 5234 5235 5269 5391 5395 5468 5505 5563 5630 5633 5891 5900 5947 5964 5992 6059 6067 6160 6165 6222 6258 6413 6463 6475 6680 6691 6716 6796 6803 6815 6841 6878 6892 6920 6964 7000 7041 7049 7092 7205 7215 7236 7272 7322 7381 7398 7460 7498 7566 7577 7582 7632 7637 7658 7668 7719 7722 7804 7824 7885 7971 8099 8131 8211 8275 8296 8305 8325 8362 8363 8472 8482 8665 8701 8724 8733 8751 8831 8833 8847 8886 8920 8985 8986 8993
9005 8943 8849 8783 8748 8512 8310 8293 8260 8174 8076 7898 7862 7728 7713 7691 7665 7573 7518 7517 7487 7457 7449 7350 7315 7277 

In [5]:
# EX 2 - Bioinformatics Stronghold: Finding a Spliced Motif - sseq

"""""
Given: Two DNA strings s and t (each of length at most 1 kbp) in FASTA format.

Return: One collection of indices of s in which the symbols of t appear as a subsequence of s. If multiple solutions exist, you may return any one.
"""""

# How I solve this:
# 1. I iterate through each character in t, and for each character, I search for that character in s (starting from the current position s_index)
# 2. I keep track of the indices where the characters from t are found in s, and return the list of these indices at the end


# I import SeqIO from the Biopython library, which makes it easy to read files in FASTA format
from Bio import SeqIO

# This function searches for positions where the characters of string t appear as a subsequence in s
def indices_collection(s, t):

    # I initialize an empty list "indices" to store the indices
    indices = []  
    
    # I start from the beginning of 's'
    s_index = 0  
    
    # I iterate through each character in 't'
    # On the first iteration, "ch" will be the first character in t, then the second character, and so on
    for ch in t:
        
        # I search for the current character "ch" in the string s, starting from the current position s_index
        # The loop continues as long as "s_index" is less than the length of "s" (meaning we haven't reached the end of s) and the character at the current 
        # "s_index" position in "s" is not equal to the current "ch" from "t".
        while s_index < len(s) and s[s_index] != ch:
            
            # I increment the s_index variable by 1, moving the search position one character further in s
            s_index += 1
        
        # I check if the current character "ch" from "t" was found in "s"
        # If s_index is still less than the length of s, it means the character was found
        if s_index < len(s):  
            
            # If the character was found, I add the 1-based index (since the problem statement uses 1-based indexing) to the "indices" list
            indices.append(s_index + 1)  
            
            # I increment the s_index variable by 1 to move to the next position in s, so the next character from t can be searched for
            s_index += 1  
        
        else:
            
            # If the current character from t was not found in s, this else block is executed, and the loop is broken
            break  
    
    return indices

if __name__ == "__main__":
    
    # I read the input using Biopython
    with open("./bs-datasets/rosalind_sseq.txt", "r") as file:
        
        # I use SeqIO.parse() to read the file rosalind_sseq.txt in FASTA format. I obtain a list of records, where each record represents a sequence in the file
        # I extract the sequences as strings with str(record.seq)
        sequences = [str(record.seq) for record in SeqIO.parse(file, "fasta")]

    # I assign the first sequence of the file to s and the second to t
    s, t = sequences[0], sequences[1]

    # I find the subsequence indices
    result = indices_collection(s, t)
    
    # I print the subsequence indices in a readable format:
    # - map(str, result) -> map() is a function that applies a given function (in this case, str()) to each item of the result
    # - The str function converts each element to a string
    # - " ".join() -> method that takes a sequence of strings and joins them into a single string, with a space (" ") between each element. -> So, it will take the list of string-converted items and join them with spaces, creating one long string.
    print(" ".join(map(str, result)))



2 4 10 11 19 30 32 44 50 51 58 62 63 64 68 70 72 73 83 85 93 96 97 102 104 105 108 113 116 119 120 128 129 137 149 157 158 164 166 170 174 177 181 183 190 192 195 204 224 232 236 237 240 249 250 253 257 258 266 271 275 277 285 287 289 290 292 295 298 309 316 319 324 329 332 340 342 344


In [3]:
# EX 3 - Bioinformatics Stronghold: Finding a Shared Spliced Motif - lcsq

"""""
Given: Two DNA strings s and t (each having length at most 1 kbp) in FASTA format.

Return: A longest common subsequence of s and t. (If more than one solution exists, you may return any one.)
"""""

# A subsequence is a sequence derived from another by removing some characters without changing their order (ex: "ACTG" is a common subsequence of "AACCTTG" and "ACACTGTGA")
# To find the longest common subsequence (LCS) between two strings "s" and "t", I use dynamic programming -> I break down the complex problem into simpler subproblems and store their solutions to avoid redundant calculations
# 1. I create a matrix to calculate the length of the LCS between all substrings of s and t; 2. I reconstruct the LCS from the matrix

# I use Biopython -> SeqIO to read the sequences from the FASTA file
from Bio import SeqIO

def LCS(s, t):

    # I create a matrix to store LCS lengths:
    # I assign to variables "m" and "n", the length of the first string (s) and the length of the second string (t) respectively
    m, n = len(s), len(t)

    # I create a matrix (m+1) x (n+1) dp where each cell dp[i][j] represents the length of the LCS between the first "i" characters of "s" and first "j" characters of "t"
    dp = [[0] * (n + 1) for _ in range(m + 1)]  

    # I fill the matrix by comparing characters from both strings:
    # This loop runs through the rows of the dynamic matrix -> "i" represents the index of the string "s"; I start with 1 (not 0) to handle basic cases more easily;
    # I stop at m + 1 (length of s + 1) to cover the whole string
    for i in range(1, m + 1):
        
        # This cycle runs through the columns of the dynamic matrix -> "j" represents the index of the string "t"; I start from 1 and go up to n + 1;
        # For each "i", I scroll through all possible "j"
        for j in range(1, n + 1):

            # When s[i - 1] is equal to t[j - 1], it means that I have found a common character
            if s[i - 1] == t[j - 1]:  
                
                # So I increase the length of the common subsequence by 1 -> that is, I take the value from the upper left diagonal (dp[i - 1][j - 1]) and add 1
                dp[i][j] = dp[i - 1][j - 1] + 1
            
            # When the characters are different, I take the maximum between: 1. Upper value (dp[i - 1][j]): ignoring the last letter of s;
            # 2. Left value (dp[i][j - 1]): ignoring the last letter of t
            else:  
                dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])

    # From the matrix, I reconstruct the LCS by following the corresponding characters and ignoring those not in common
    # I start from the bottom right of the matrix and trace the steps backwards by following the correspondences:
    
    # Empty list to store the common subsequence
    lcs = []
    
    # I set "i" and "j" to the last row and column of the matrix (original string lengths)
    i, j = m, n

    # I continue until I reach the beginning of one of the two strings, so that I can explore the entire matrix by going back up
    while i > 0 and j > 0:
        
        # If the characters at positions i-1 and j-1 are identical..
        if s[i - 1] == t[j - 1]:  
            
            # .. I add the matching characters to the lcs list
            lcs.append(s[i - 1])
            
            # I move to the top left in the matrix
            i -= 1
            j -= 1
        
        # If the upper value is greater than or equal to the value on the left..
        elif dp[i - 1][j] >= dp[i][j - 1]:  
            
            # .. I move upwards (remove a character from s)
            i -= 1
        
        else:
            
            # Otherwise, I move to the left (remove a character from t)
            j -= 1

    # I return the LCS as a string (I reverse the list to get the correct order)
    return ''.join(reversed(lcs))

if __name__ == "__main__":
    
    # I read the sequences from the FASTA file using Biopython
    with open("./bs-datasets/rosalind_lcsq.txt", "r") as file:
        
        # SeqIO.parse reads the FASTA file and returns the sequences as objects
        # With str(record.seq) I extract the sequence as a string
        sequences = [str(record.seq) for record in SeqIO.parse(file, "fasta")]

    # The two sequences
    s, t = sequences[0], sequences[1]

    # I return the LCS (longest common subsequence)
    result = LCS(s, t)
    print(result)


TTAGGTCCGCCCTCGGGCATCCATTCGCTAACAGGAATCGTTGTGGCAACTTTGTCATACCTTGATACAGTTAAGCCCCAACAACCCGGGAGGCAACGTTTATATCGGGGTACTATATTGGCCGAACACACGCTTTTCACGGGGAGGGCGATTAGTGGGTAGAGGGGCATCAGTTTTTTGCACACGTAGCGCACTAACATATCTGACCAACGATAAGTTTGCCTATAGGAATCTGAAACTCCCCCATCACTTTTTCGCAATTGCCATGACTAGAACTCGCAGTGCCACAAAGGTAGCATCGGTTTTTCTCATTGTTTGGCTTATGACTGGGCCCGCTATATTGCTAAGTTCCAATTTCACAGAGGACCACTCGTGGACCGGTATTGACACCGAGATTGGATAACGATCTGGACGCATAGGGGTTCGCACCACGGTTTAGTTTATTGACCGTTTCAGAGATGTAGAAACACGGGTCAAACATATAGAAAGACTCTTTTTCCAACCGATTTGGTAAAATTACTTCTTAGGTTGGACATCGTTTCCGGGGAGCTGAAGGTT


In [15]:
# EX 4 - Bioinformatics Stronghold: Edit Distance - edit

"""""
Given: Two protein strings s and t in FASTA format (each of length at most 1000 aa).

Return: The edit distance d_E(s,t)
"""""

# I have to calculate the edit distance between two strings s and t.
# The edit distance is the minimum number of edit operations (substitution, insertion, or deletion of a single symbol) needed to transform s into t. 

# I use a dynamic programming approach to calculate the edit distance between the two input strings (s, t)
def edit_distance(s, t):

    m, n = len(s), len(t)

    # I create a two-dimensional table (a matrix) of size (m+1) x (n+1)
    # Each cell of the dp[i][j] array will represent the edit distance between the first "i" characters of "s" and the first "j" characters of "t"
    # I add 1 to ‘m’ and ‘n’ (i.e. I add one extra row and column) to handle the basic case where one of the strings is empty. 
    # -> This allows me to calculate distances even when comparing against an empty string.
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    
    # I fill the first row and column with sequential values: 
    # dp[i][0]: This represents the edit distance between the first i characters of s and an empty string. The cost is "i" (all characters need to be deleted)
    # dp[0][j]: This represents the edit distance between an empty string and the first j characters of t. The cost is "j" (all characters need to be inserted)
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j
    
    # I loop through each cell of the table, comparing the characters s[i-1] and t[j-1] (note the -1 because dp is 1-based while the strings are 0-based).
    # If the characters are the same (s[i-1] == t[j-1]) no edit is needed, so I copy the value from the previous diagonal cell dp[i-1][j-1].
    # If the characters are different, I calculate the minimum cost to make them the same using 3 possible operations: 1. Deletion: Remove the character s[i-1] → dp[i-1][j];
    # 2. Insertion: Add the character t[j-1] → dp[i][j-1]; 3. Substitution: Replace s[i-1] with t[j-1] → dp[i-1][j-1].
    # I add 1 to the minimum of these values to account for the cost of the current operation
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s[i - 1] == t[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])
    
    # I return the value stored in dp[m][n], which represents the minimum edit distance between the entire strings s and t
    return dp[m][n]

# The problem asks for input in FASTA format, so the strings "s" and "t" are not in the first 2 lines of the file, but must be extracted correctly using a FASTA parser
# In FASTA format, each string is preceded by an identifier (ex: >Rosalind_39); Strings are distributed over several lines after the header.
# -> So I have to ignore the lines starting with > and concatenate the subsequent lines to get the sequences.
def parse_fasta(filename):
    
    # I open the input file (specified by file_path) and read all its lines into a list called "lines". Each line is stored as a separate string in the list.
    with open(filename, "r") as file:
        lines = file.readlines()
    
    sequences = {}
    current_id = None
    
    # I process the lines to extract the sequences and their IDs
    for line in lines:
        
        # If a line starts with >, it indicates a new sequence ID. I store this ID in the variable current_id and initialize an empty string for it in the dictionary "sequences".
        if line.startswith(">"):  
            current_id = line.strip()
            sequences[current_id] = ""
        
        else:
            
            # If a line does not start with >, it’s part of a sequence, so I append it to the string associated with the current ID in the dictionary.
            # At the end of this process, the sequences dictionary contains sequence IDs as keys and their corresponding sequences as values.
            sequences[current_id] += line.strip()
    
    # I extract only the sequences (ignoring their IDs) by taking the values from the "sequences" dictionary and converting them into a list
    # The result is a list of the two sequences that I’ll use for the edit distance calculation
    return list(sequences.values())

if __name__ == "__main__":
    
    # I read input from the FASTA file
    filename = "./bs-datasets/rosalind_edit.txt"
    s, t = parse_fasta(filename)
    
    # I calculate the edit distance between s and t
    distance = edit_distance(s, t)
    print(distance)

300


In [16]:
# EX 5 - Bioinformatics Stronghold: Edit Distance Alignment - edta

"""""
Given: Two protein strings s and t in FASTA format (with each string having length at most 1000 aa).

Return: The edit distance d_E(s,t) followed by two augmented strings s' and t' representing an optimal alignment of s and t.
"""""

def EditDistance(s, t):
    
    # 1. I initialize the DP table
    # I create a 2D DP table where dp[i][j] that will store the minimum edit distance to align the first i characters of s with the first j characters of t.
    
    # I store the lengths of the strings
    n, m = len(s), len(t)
    
    # I initialize a table of size (n+1) x (m+1)
    dp = [[0] * (m + 1) for _ in range(n + 1)]

    # 2. Base cases (first row and column)
    # If one string is empty, the edit distance (the cost) is the length of the other string (all insertions or deletions).
    for i in range(n + 1):
        
        # I set the cost of deleting all characters in s
        dp[i][0] = i  
    
    for j in range(m + 1):
        
        # I set the cost of inserting all characters in t
        dp[0][j] = j  

    # 3. I fill the DP table
    # I calculate the minimum cost for each cell considering insertion, deletion, and substitution:
    
    # I loop through all characters of s
    for i in range(1, n + 1):
        
        # I loop through all characters of t
        for j in range(1, m + 1):
            
            # No cost if the characters match, else -> Substitution cost (1) if the characters don't match
            cost = 0 if s[i - 1] == t[j - 1] else 1  
            
            # I calculate the minimum of the 3 operations: deletion, insertion, substitution:
            # Deletion
            dp[i][j] = min(dp[i - 1][j] + 1,     
                           
                           # Insertion
                           dp[i][j - 1] + 1,     
                           
                           # Substitution
                           dp[i - 1][j - 1] + cost)  

    # 4. Traceback to reconstruct alignments
    # I use the DP table to trace back the optimal alignment of the strings.
    
    # I initialize two lists to store the aligned strings
    aligned_s, aligned_t = [], []
    
    # I start from the bottom-right corner of the DP table
    i, j = n, m
    
    # I trace back until I reach the top-left corner
    while i > 0 or j > 0:
        
        # If the current cell corresponds to a match or substitution..
        if i > 0 and j > 0 and dp[i][j] == dp[i - 1][j - 1] + (0 if s[i - 1] == t[j - 1] else 1):
            
             # .. I add the character from s
            aligned_s.append(s[i - 1])
            
            # .. I add the character from t
            aligned_t.append(t[j - 1])
            i -= 1
            j -= 1
        
        # If the current cell corresponds to a deletion in s..
        elif i > 0 and dp[i][j] == dp[i - 1][j] + 1:
            
            # .. I add the character from s
            aligned_s.append(s[i - 1])
            
            # .. I add a gap to t
            aligned_t.append('-')
            i -= 1
        
        else:  # j > 0 and dp[i][j] == dp[i][j - 1] + 1
            
            # If the current cell corresponds to an insertion in t:

            # I add a gap to s
            aligned_s.append('-')
            
            # I add the character from t
            aligned_t.append(t[j - 1])
            j -= 1

    # I reverse the alignments because I built them backwards during traceback:
    # -> I join the reversed list into a string
    aligned_s = ''.join(reversed(aligned_s))
    aligned_t = ''.join(reversed(aligned_t))

    # I return the edit distance and the aligned strings
    return dp[n][m], aligned_s, aligned_t


def parse_fasta(filename):
    
    # I read the FASTA file and extract the sequences
    with open(filename, "r") as file:
        
        # I create a dictionary to store the sequences
        sequences = {}
        
        # I initialize a variable to store the current label
        current_label = None
        for line in file:
            
            # I remove any whitespace or newline characters
            line = line.strip()
            
            # If the line starts with ">", it's a new sequence label
            if line.startswith(">"):
                
                # I store the label (without the ">")
                current_label = line[1:]
                
                # I initialize an empty sequence for this label
                sequences[current_label] = ""
            
            else:
                
                # Otherwise, the line contains sequence data
                # -> I append the data to the current sequence
                sequences[current_label] += line
    
    # I return the sequences as a list
    return list(sequences.values())



if __name__ == "__main__":
    
    # I parse the input from the FASTA file:
    # -> I read the sequences from the file
    sequences = parse_fasta("./bs-datasets/rosalind_edta.txt")
    
    # I assign the first and second sequences to s and t
    s, t = sequences[0], sequences[1]

    # I compute the edit distance and alignments
    distance, alignment_s, alignment_t = EditDistance(s, t)

    # I format and print the result:
    
    # I print the edit distance
    print(distance)
    
    # I print the aligned version of s
    print(alignment_s)
    
    # I print the aligned version of t
    print(alignment_t)



395
-GVMSSQQQPEWYDQIQMMDAPIFSYHRFMC--LHWI-NGNY-KGD---L--PLGDDPMREHPALFKPEVAGLWHVDSQQEFNQEFGIHIIDEEERLWGWQGPWYDHKWYRYMTMPCGSHCPFIWVGLHNMLLGQNKK-----QGL-M-------YHKLPTDFIWAGGWADVRSCNF--IEISQIQDGKNYFV-----AQLTLDAYWNNARWHLDAKQWQQRLDLKSLKYLHIVQPCEFAQGHDQIFSMVVTTFVFEVPLAKWSAGPFASGEAVCEDDCEYPMEPPNQNEPFISVD-FTVWCVNVDYNVQLN--PACEVESIICPYYMLQGMHYLFTGLWW----E---GVEWHGQGERDEVHLGPTLICNDEQDWSISFPIRWY-PYTWPIYMHYRTNFARTKETHIREENPQALYIQQWCMQDHKYQNTCPHCPGWPYGDLWLHSNWNPQPMPYIEDECNKMKGVFNGFEGYANYWSNPKYNGCGPFAFNIKWQQELIYIELIQDSEYANTANLIH-E-Q-----GYMIIWPISKYMRSSSNFPRRNRTR--PHIVIWVTCPKAWWAET--K-CRGFHRLPAHQYCTY---KGPRRKACHDVFVCT--ETGE-METPET-TTFGPNHRFHH------M-------MGRCEFTS--YA--PLM-HSG-KDS-R-L---GHEW-ACWYCFACTLTSDPHHTFYWATHGNENWGIVYWM---YFRNVCWDMEDSVHFWCQYDDTLWFNDTMHYM-TYQTAVPWACQTTQCI--NHEKHLTDLEY--Q-----PVNLEFEFCDQEQCFFRLFTMKRQGAQDPKYDLDRSLGDRYT--------VAGQGFY-----PDE
DDKWNCYQQPDIYEQIQ--IFPI-SLMDATCKSKMMIEIFSYTRADVCCLHWHNGDDPMVEHPALFK--------VDSHKYWTQHILIHIIGEEERLWGW--NNYRTLTY-YMTCPFIWTC-NGGYGLHNMLLGQTKKP

In [18]:
# EX 6 - Bioinformatics Stronghold: Counting Optimal Alignments - ctea

"""""
Given: Two protein strings s and t in FASTA format, each of length at most 1000 aa.

Return: The total number of optimal alignments of s and t with respect to edit alignment score, modulo 134,217,727 (2^(27)-1).
"""""

def count_OptimalAlignments(s, t):
    
    # I use the modulo value given in the problem
    MOD = 134217727  
    
    # 1. I initialize DP tables (Dynamic Programming Table) -> I use it to calculate the edit distance between the two strings 
    # dp[i][j]: Minimum edit distance for s[:i] and t[:j]
    n, m = len(s), len(t)
    dp = [[0] * (m + 1) for _ in range(n + 1)]
    
    # I maintain a second table "count" where count[i][j] stores the number of ways to achieve the edit distance at position (i, j).
    count = [[0] * (m + 1) for _ in range(n + 1)]
    
    # 2. Base cases -> I initialize the DP tables "dp" and "count" for scenarios where one of the two strings is empty
    # I loop through all possible prefixes of string s using the variable i, which ranges from 0 to n (inclusive)
    for i in range(n + 1):
        
        # Edit Distance (dp[i][0]): If string t is empty (has length 0), I need to delete all characters in s[:i] to align it with t
        # Thus, the edit distance for this alignment is "i" (the number of characters in s[:i])
        dp[i][0] = i  
        
        # count[i][0] -> There is only one way to delete all characters from s[:i] (by sequentially deleting each character). 
        # Hence, I set count[i][0] = 1
        count[i][0] = 1  
    
    # I loop through all possible prefixes of string t using the variable j, which ranges from 0 to m (inclusive)
    for j in range(m + 1):
        
        # Edit Distance (dp[0][j]): If string s is empty (has length 0), I need to insert all characters in t[:j] to align it with s
        # Thus, the edit distance is j (the number of characters in t[:j])
        dp[0][j] = j  
        
        # count[0][j]: There is only one way to align s with t[:j] -> by sequentially inserting each character from t
        # So, I set count[0][j] = 1
        count[0][j] = 1  
    
    # 3. I fill DP tables:
    # I compute both the minimum edit distance (dp[i][j]) and the number of optimal alignments (count[i][j]) for all positions in the DP table.
    
    # I iterate through each prefix of s (up to length i) and t (up to length j), starting from 1.
    # The goal is to fill the DP table cell "dp[i][j]" and count table cell "count[i][j]".
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            
            # Case 1: Match or Substitution -> I check if the last characters of the prefixes s[:i] and t[:j] are the same:
            # - If s[i-1] == t[j-1], no substitution is needed, and the cost of alignment is 0
            if s[i - 1] == t[j - 1]:
                cost = 0
            
            else: # - Otherwise, the characters differ, and the cost of substitution is 1
                cost = 1
            
            # I compute the minimum edit distance at dp[i][j] by considering the 3 possible operations:
            # Deletion ->  I delete the last character of s[:i] and align the rest of s[:i-1] with t[:j]. The cost is dp[i-1][j] + 1
            dp[i][j] = min(dp[i - 1][j] + 1,         
                           
                           # Insertion -> I insert the last character of t[:j] and align s[:i] with the rest of t[:j-1]. The cost is dp[i][j-1] + 1
                           dp[i][j - 1] + 1,         
                           
                           # Substitution/Match -> I align the last characters of s[:i] and t[:j] with the cost determined earlier (cost), and align the 
                           # rest of s[:i-1] with t[:j-1]. The cost is dp[i-1][j-1] + cost.
                           dp[i - 1][j - 1] + cost)  
            
            # Case 2: I count Optimal Alignments (count[i][j])
            # -> For each valid move, I add the counts from the respective previous cells to the current cell while taking the modulo 134, 217, 727
            
            # I initialize the count for count[i][j] as 0
            count[i][j] = 0
            
            # Then, I add up the ways to achieve the minimum edit distance from each possible previous state:

            # If the deletion operation is part of the optimal path (dp[i][j] == dp[i-1][j] + 1), I add all ways to align s[:i-1] with t[:j], which is count[i-1][j]
            if dp[i][j] == dp[i - 1][j] + 1:  
                count[i][j] += count[i - 1][j]
            
            # If the insertion operation is part of the optimal path (dp[i][j] == dp[i][j-1] + 1), I add all ways to align s[:i] with t[:j-1], which is count[i][j-1]
            if dp[i][j] == dp[i][j - 1] + 1:  
                count[i][j] += count[i][j - 1]
            
            # If the match/substitution operation is part of the optimal path (dp[i][j] == dp[i-1][j-1] + cost), I add all ways to align s[:i-1] with 
            # t[:j-1], which is count[i-1][j-1].
            if dp[i][j] == dp[i - 1][j - 1] + cost:  
                count[i][j] += count[i - 1][j - 1]
            
            # To avoid integer overflow, I take the result modulo 134, 217, 727 at each step.
            count[i][j] %= MOD
    
    # 4.After filling the DP table, the total number of optimal alignments between s and t is stored in count[n][m]. I return this value.
    return count[n][m]


def parse_fasta(filename):
    
    # I read the FASTA file and extract the sequences
    with open(filename, "r") as file:
        
        # I create a dictionary to store the sequences
        sequences = {}  
        
        # I initialize a variable to store the current label
        current_label = None  
        for line in file:
            
            # I remove any whitespace or newline characters
            line = line.strip()  
            if line.startswith(">"):
                
                # If the line starts with ">", it's a new sequence label:
                
                # I store the label (without the ">")
                current_label = line[1:]  
                
                # I initialize an empty sequence for this label
                sequences[current_label] = ""  
            
            else:
                
                # Otherwise, the line contains sequence data:
                
                # I append the data to the current sequence
                sequences[current_label] += line  
    
    # I return the sequences as a list
    return list(sequences.values())  



if __name__ == "__main__":
    
    # I parse the input from the FASTA file
    # -> I read the sequences from the file
    sequences = parse_fasta("./bs-datasets/rosalind_ctea.txt")  
    
    # I assign the first and second sequences to s and t
    s, t = sequences[0], sequences[1]  

    # I compute the number of optimal alignments
    total_alignments = count_OptimalAlignments(s, t)

    # I print the number of optimal alignments modulo 134,217,727 (the result)
    print(total_alignments)  


111301358


In [19]:
# EX 7 - Bioinformatics Stronghold: Global Alignment with Scoring Matrix - glob

"""""
Given: Two protein strings s and t in FASTA format (each of length at most 1000 aa).

Return: The maximum alignment score between s and t. Use:
- The BLOSUM62 scoring matrix.
- Linear gap penalty equal to 5 (i.e., a cost of -5 is assessed for each gap symbol).
"""""

# I import the "product" function from the "itertools" module. 
# "product" is used to generate all pairs of indices between two ranges (in this case, between the lengths of the two sequences s and t)
from itertools import product

# I import the substitution_matrices module from BioPython. 
# It allows you to load predefined scoring matrices for sequence alignment, like BLOSUM62, which is commonly used for protein sequence alignment.
from Bio.Align import substitution_matrices

# I load the BLOSUM62 substitution matrix using the load function. The matrix contains the scores for aligning different amino acids. 
# If two amino acids match, you get a positive score; if they mismatch, you get a negative score.
blosum62 = substitution_matrices.load("BLOSUM62")


# This function takes two strings s and t (representing protein sequences) and returns the maximum alignment score between them using dynamic programming.
def MaxAlignment(s, t):
    
    # I calculate the lengths of the two input strings s and t, storing them in sl and tl respectively.
    sl, tl = len(s), len(t)
    
    # I initialize a dictionary "m" with a single key-value pair, where (0, 0) corresponds to the starting point of the alignment grid
    # The value (0, None) indicates a score of 0 and None as the previous alignment position
    m = {(0, 0): (0, None)}
    
    # I initialize the boundary conditions for the dynamic programming grid:

    # I set the scores for the first column of the grid (aligning s to gaps in t). Each score is calculated as i * -5 (a gap penalty of -5 per insertion).
    m.update({((i, 0), (i * - 5, (i - 1, 0))) for i in range(1, sl + 1)})
    
    # I set the scores for the first row of the grid (aligning t to gaps in s), also applying a gap penalty of -5.
    m.update({((0, i), (i * - 5, (0, i - 1))) for i in range(1, tl + 1)})
    
    # I iterate over all possible pairs of positions (i, j) in the sequences s and t, excluding the first row and first column, which were already initialized.
    for i, j in product(range(1, sl + 1), range(1, tl + 1)):
        
        # I retrieve the substitution score for aligning the amino acids s[i - 1] and t[j - 1] from the BLOSUM62 matrix.
        cost = blosum62.get((s[i - 1], t[j - 1]))
        
        # If the score for the pair (s[i-1], t[j-1]) is not found (i.e., it's None)..
        if cost == None:
            # .. I try to get the score for the reverse pair (t[j-1], s[i-1]), since the matrix might be asymmetric.
            cost = blosum62.get((t[j - 1], s[i - 1]))
        
        # I calculate the possible scores for the 3 potential moves in the dynamic programming grid:
        
        # d -> diagonal move, meaning both s[i-1] and t[j-1] are aligned, so I add the substitution score cost to the value from the diagonal cell (i-1, j-1).
        d = m[(i - 1, j - 1)][0] + cost
        
        # l -> left move, meaning s[i-1] is aligned to a gap in t, so I subtract 5 for the gap penalty.
        l = m[(i - 1, j)][0] - 5
        
        # u -> up move, meaning t[j-1] is aligned to a gap in s, with a gap penalty of 5.
        u = m[(i, j - 1)][0] - 5
        
        # b stores the best score from the 3 possibilities.
        b = max(d, l, u)
        
        # I update the dynamic programming grid "m" by storing the best score "b" and the position of the move that resulted in this score 
        # (the direction from where the value came).
        if d == b:
            m[(i, j)] = (b, (i - 1, j - 1))
        elif l == b:
            m[(i, j)] = (b, (i - 1, j))
        elif u == b:
            m[(i, j)] = (b, (i, j - 1))
            
    # I return the maximum alignment score stored at the bottom-right cell of the grid (i, j), which is the last cell processed
    # The score is converted to an integer to avoid any decimal part.
    return int(m[(i, j)][0])

    
if __name__ == '__main__':
    
    # I initialize an empty list "results" to store the sequences read from the input file and an empty string "protein" to build each protein sequence line by line
    results = []
    protein = ""
    
    # I read the input file line by line
    with open('./bs-datasets/rosalind_glob.txt') as file:
        for line in file:
            line = line.rstrip()

            # If a line starts with ">", it marks the start of a new sequence, and the previous sequence (stored in "protein") is added to the results list.
            if line.startswith(">"):
                if(protein != ""):
                    results.append(protein)
                    protein = ""
            else:
                # Otherwise, the line is part of a sequence and is appended to "protein"
                protein += line
                
    # I append the last sequence (in case it’s not followed by a ">" marker) to the results list.
    results.append(protein)
    
    # I call the function with the first two sequences from the results list, and the resulting maximum alignment score is printed.
    print (MaxAlignment(results[0], results[1]))


2629


In [2]:
from random import randint, choice

# Map Reduce

In the part of the assignment you are requested to use Map Reduce paradigm to solve the following exercises.

**NOTE THAT**: **A solution that does not use map reduce is not valid!**

# Exercise 1

You have a list of dictionaries, each representing a student with the following properties: a name and an array of test scores. Your task is to use map, filter, and reduce to calculate the average test score for each student, and then return a list of dictionaries containing only the students whose average score is above 90.

In [None]:
students = [
    {"name": "Alice", "scores": [95, 92, 88, 100]},
    {"name": "Bob", "scores": [78, 81, 85, 80]},
    {"name": "Charlie", "scores": [99, 91, 94, 96]},
    {"name": "Diana", "scores": [85, 87, 89, 83]}
]

Use `map`, `reduce` and `filter` that produce an output like:

In [None]:
[
    {"name": "Alice", "average_score": 93.75},
    {"name": "Charlie", "average_score": 95.0}
]

[{'name': 'Alice', 'average_score': 93.75},
 {'name': 'Charlie', 'average_score': 95.0}]

### Test
Test your solution using the dataset generated by the following function.

In [None]:
def generate_random_student_dataset(num_students=50):
    names = [f"Student {i}" for i in range(1, num_students + 1)]
    dataset = [
        {
            "name": name,
            "scores": [randint(50, 100) for _ in range(randint(3, 6))]  # Random scores between 50 and 100
        }
        for name in names
    ]
    return dataset

random_student_dataset = generate_random_student_dataset(50)
random_student_dataset[:3]

In [28]:
# your code goes here

# I import the randint function from the random module, which allows me to generate random integers
# I'll use this to create random test scores for each student.
from random import randint

# I import the reduce function from the functools module.
# reduce allows me to perform a rolling computation on a sequence of values (in this case, the test scores) and accumulate them into a single result (the total score)
from functools import reduce

# This function takes one argument (student) which is a dictionary containing the student's name and scores.
# It calculates the average score for each student based on their test scores.
def AverageCalculation(student):
    
    # I use reduce to sum the test scores -> reduce takes a function and a sequence, and applies the function cumulatively to the items in the sequence.
    # In this case, the lambda function lambda x, y: x + y adds the values x and y together. 
    total_score = reduce(lambda x, y: x + y, student['scores'])  
    
    # To calculate the average, I need to know how many scores are in the list, so I can divide the total score by the correct number.
    # so -> I get the number of test scores for the student by calculating the length of the "scores" list.
    scores_number = len(student['scores'])  
    
    # Formula for calculating an average: total score divided by the number of items (scores).
    average_score = total_score / scores_number  
    
    # I round the calculated average to two decimal places using the round() function
    average_score = round(average_score, 2)  
    
    # I return a dictionary containing the student's name and their rounded average score.
    return {'name': student['name'], 'average_score': average_score}

# This function checks if a student has an average score greater than 90. 
# I will use this function later in the filter() function to filter out students who don’t meet the condition of having an average score above 90
def above_90(student):
    
    # I return True if the student's average score is greater than 90, and False otherwise.
    # This is the condition that I’ll use to filter out students. Only those who meet this condition will be included in the final list.
    return student['average_score'] > 90

# This function organizes the process of calculating the average scores and filtering the students. 
def above90_students(students):
    
    # I use the map() function to apply the "AverageCalculation" function to each student in the "students" list. 
    # The result is converted to a list using list(). -> map() returns an iterator, so I need to convert it to a list to work with it later.
    student_avg_scores = list(map(AverageCalculation, students))  
    
    # I use the filter() function to apply the "above_90" function to the list student_avg_scores, returning only students whose average score is above 90.
    stud_above90 = filter(above_90, student_avg_scores)
    
    # Since filter() returns an iterator, I need to convert it to a list so I can return it in the correct format.
    return list(stud_above90)

# This function generates a list of students with random names and random scores.
def generate_random_student_dataset(num_students=50):
    
    # I create a list of student names in the format Student 1, Student 2, ..., up to the specified number of students (num_students)
    names = [f"Student {i}" for i in range(1, num_students + 1)]
    
    # I create a list of dictionaries where each dictionary represents a student with a name and a list of random scores.
    # The number of scores is also random, between 3 and 6.
    dataset = [
        {
            "name": name,
            "scores": [randint(50, 100) for _ in range(randint(3, 6))]  # Punteggi casuali tra 50 e 100
        }
        for name in names
    ]
    
    # I return the generated list of students.
    return dataset

# I generate a random dataset with 50 students.
random_student_dataset = generate_random_student_dataset(50)

# I pass the random student dataset to the above90_students function, which calculates the average scores and filters out the students with average scores > 90.
result = above90_students(random_student_dataset)

# I print the final list of students whose average score is greater than 90.
for element in result:
   print(f"{element} \n")


{'name': 'Student 11', 'average_score': 91.0} 

{'name': 'Student 12', 'average_score': 90.8} 

{'name': 'Student 49', 'average_score': 90.67} 



## Exercise 2

You have a list of dictionaries, each representing a product with the following properties: name, price, and category. Using the functions `map`, `filter`, and `reduce`, calculate the average price of the products in each category and return a list of dictionaries containing only the categories where the average price exceeds 50.

Example input:

In [None]:
products = [
    {"name": "Product A", "price": 60, "category": "Electronics"},
    {"name": "Product B", "price": 40, "category": "Electronics"},
    {"name": "Product C", "price": 70, "category": "Home"},
    {"name": "Product D", "price": 30, "category": "Home"},
    {"name": "Product E", "price": 90, "category": "Sports"}
]

Use `map`, `reduce` and `filter` that produce an output like:

In [None]:
[
    {"category": "Electronics", "average_price": 50.0},
    {"category": "Sports", "average_price": 90.0}
]

[{'category': 'Electronics', 'average_price': 50.0},
 {'category': 'Sports', 'average_price': 90.0}]

### Test
Test your solution using the dataset generated by the following function.

In [4]:
def generate_random_product_dataset(num_products=100):
    categories = ["Electronics", "Home", "Sports", "Books", "Clothing", "Toys"]
    dataset = [
        {
            "name": f"Product {i}",
            "price": randint(10, 200),  # Random price between 10 and 200
            "category": choice(categories),  # Randomly choose a category
        }
        for i in range(1, num_products + 1)
    ]
    return dataset

# Example of using the function
random_dataset = generate_random_product_dataset(100)
random_dataset[:5]  # Display the first 5 entries to check the dataset structure


[{'name': 'Product 1', 'price': 112, 'category': 'Home'},
 {'name': 'Product 2', 'price': 139, 'category': 'Clothing'},
 {'name': 'Product 3', 'price': 168, 'category': 'Electronics'},
 {'name': 'Product 4', 'price': 137, 'category': 'Books'},
 {'name': 'Product 5', 'price': 97, 'category': 'Clothing'}]

In [29]:
# your code goes here
# hints: 1) Group products by category (you don't need to use map reduce for this part), then 2) use map reduce paradigm to
# calculate the average price for each category and filter categories with an average price > 50

# I import randint and choice from the random module to generate random data for product prices and categories.
from random import randint, choice

# I import reduce from the functools module, which will help me calculate the sum of prices in a list of products.
from functools import reduce

# I define a function that takes a list of products (in the same category) as input and calculates the average price for the products in that list.
def AveragePrice(products):
    
    # I use reduce to sum up the price of each product in the list
    # The lambda x, y: x + y['price'] is a function that adds the price from each dictionary (y) in the list to the accumulator (x), starting from 0.
    total_price = reduce(lambda x, y: x + y['price'], products, 0)
    
    # After getting the total price, I calculate the average by dividing the total price by the length of the list (len(products))
    average_price = total_price / len(products)
    
    # I then return a dictionary containing the category (from the first product in the list) and the average_price, rounded to 2 decimal places using "round"
    return {"category": products[0]['category'], "average_price": round(average_price, 2)}

# I define a function that groups products by their category
def GroupbyCategory(products):
    
    # I create an empty dictionary "grouped" to hold the products organized by their categories.
    grouped = {}
    
    # I loop through each product in the list: 
    for product in products:
        
        # I check if the category of the current product is not already in the grouped dictionary:
        if product['category'] not in grouped:
            
            # If the category is not in the dictionary, I add an entry for that category with an empty list.
            grouped[product['category']] = []
        
        # I append the current product to the list corresponding to its category in the grouped dictionary.
        grouped[product['category']].append(product)
    
    # After processing all the products, I return the grouped dictionary, which contains categories as keys and lists of products as values.
    return grouped

# I define a function that filters out categories where the average_price is less than or equal to 50.
def FilterCategories(categories):
    
    # I use the filter function with a lambda function to iterate through each dictionary in the categories list
    # The lambda checks if the average_price of each category is greater than 50: If it is, the category is kept; otherwise, it is discarded.
    # The filter function returns an iterator, so I will later convert it into a list.
    return filter(lambda x: x['average_price'] > 50, categories)

# I define a function, which processes the list of products to filter out categories with average prices greater than 50.
def AvgPrices_above50(products):
    
    # I call GroupbyCategory to group the products by their category and store the result in grouped_products
    grouped_products = GroupbyCategory(products)
    
    # I use map to apply the "AveragePrice" function to each group of products (i.e., each category).
    # This produces a list of dictionaries where each dictionary contains a category and its average price.
    category_avg_prices = list(map(AveragePrice, grouped_products.values()))
    
    # I pass the result of map to "FilterCategories" to filter out categories where the average price is not greater than 50.
    filtered_categories = list(FilterCategories(category_avg_prices))
    
    # I return the filtered list of categories with their average prices above 50.
    return filtered_categories

# I define a function to generate a random list of products.
# The function takes a parameter num_products to determine how many products to generate.
def generate_random_product_dataset(num_products=100):
    
    # I create a list of categories (Electronics, Home, Sports, etc.), and for each product, I randomly assign a price (using randint(10, 200)) and a category (using choice(categories) to pick randomly).
    categories = ["Electronics", "Home", "Sports", "Books", "Clothing", "Toys"]
    
    # I use a list comprehension to create a list of dictionaries, where each dictionary represents a product
    # As said, each product has a random price between 10 and 200, and a random category chosen from the categories list.
    dataset = [
        {
            "name": f"Product {i}",
            "price": randint(10, 200),  # Random price between 10 and 200
            "category": choice(categories),  # Randomly choose a category
        }
        for i in range(1, num_products + 1)
    ]
    
    # I return the generated dataset.
    return dataset

# I call generate_random_product_dataset with the argument 100 to generate a dataset of 100 random products.
# I store the result in random_dataset.
random_dataset = generate_random_product_dataset(100)

# I call the "AvgPrices_above50" function with the random_dataset to process the products: grouping them by category, calculating their average prices, 
# and filtering out categories with average prices greater than 50.
# I store the result in the variable "result".
result = AvgPrices_above50(random_dataset)

# I use a for loop to iterate over each item (category) in the result list.
# For each category, I print it on a new line, which results in each dictionary being displayed separately.
for category in result:
    print(category)


{'category': 'Electronics', 'average_price': 101.68}
{'category': 'Books', 'average_price': 112.7}
{'category': 'Toys', 'average_price': 127.38}
{'category': 'Home', 'average_price': 106.0}
{'category': 'Sports', 'average_price': 90.0}
{'category': 'Clothing', 'average_price': 112.68}


# Exercise 3

You have a list of dictionaries, each representing an employee with the following properties: name, salary, and department. Your task is to use `map`, `filter`, and `reduce` to calculate the average salary for each department and return a list of dictionaries containing only the departments where the average salary is above 65,000.

**Example Input**

In [5]:
employees = [
    {"name": "John", "salary": 70000, "department": "Engineering"},
    {"name": "Jane", "salary": 75000, "department": "Engineering"},
    {"name": "Alice", "salary": 60000, "department": "HR"},
    {"name": "Bob", "salary": 68000, "department": "HR"},
    {"name": "Charlie", "salary": 90000, "department": "Marketing"},
    {"name": "Diana", "salary": 50000, "department": "Marketing"}
]

Use `map`, `reduce` and `filter` that produce an output like:

In [None]:
[
    {"department": "Engineering", "average_salary": 72500.0},
    {"department": "Marketing", "average_salary": 70000.0}
]

[{'department': 'Engineering', 'average_salary': 72500.0},
 {'department': 'Marketing', 'average_salary': 70000.0}]

### Test

Test your solution using the dataset generated by the following function.

In [6]:
def generate_random_employee_dataset(num_employees=50):
    departments = ["Engineering", "HR", "Marketing", "Sales", "Finance", "IT"]
    dataset = [
        {
            "name": f"Employee {i}",
            "salary": randint(40000, 120000),  # Random salary between 40,000 and 120,000
            "department": choice(departments)  # Randomly choose a department
        }
        for i in range(1, num_employees + 1)
    ]
    return dataset

random_employee_dataset = generate_random_employee_dataset(50)

random_employee_dataset[:3]  # Display the first 3 entries of each dataset for checking


[{'name': 'Employee 1', 'salary': 40737, 'department': 'Engineering'},
 {'name': 'Employee 2', 'salary': 77275, 'department': 'Engineering'},
 {'name': 'Employee 3', 'salary': 107774, 'department': 'IT'}]

In [30]:
# your code goes here
# hints: 1) Group employees' salaries by department (you don't need to use map reduce for this part), then 2) use map reduce paradigm to
# calculate the average salary for each department and filter departments with an average salary > threshold

# I import randint to generate random numbers for salaries -> It allows me to define a range (ex: 40,000 to 120,000) and pick numbers randomly within that range.
# I import choice to randomly assign a department to each employee from a predefined list of departments.
from random import randint, choice

# I import reduce from functools. This function helps me repeatedly apply an operation (like summing salaries) across a sequence of data.
# I use it to calculate the total salary for employees in each department.
from functools import reduce

# I define this function to calculate the average salary for a specific department. The input is a list of employees, each represented as a dictionary.
def AverageSalary(employees):
    
    # I use reduce to iterate over the list of employees and sum their salaries:
    # - The lambda function takes two arguments: x (the current total) and y (the current employee dictionary).
    # - For each employee, I extract the salary field and add it to x
    # - The starting value of x is 0.
    total_salary = reduce(lambda x, y: x + y['salary'], employees, 0)
    
    # I divide the total salary by the number of employees in the department to get the average.
    average_salary = total_salary / len(employees)
    
    # I create a dictionary containing the department name (extracted from the first employee in the list) and the average salary.
    # I use "round" to limit the average salary to 2 decimal places for clarity.
    return {"department": employees[0]['department'], "average_salary": round(average_salary, 2)}

# I define this function to group employees by their departments.
# The input is a list of employee dictionaries.
def GROUPbyDEPARTMENT(employees):
    
    # I create an empty dictionary to hold departments names as keys and lists of employees (in that department) as values.
    grouped = {}
    
    # I loop through each employee in the input list.
    for employee in employees:
        
        # If the employee's department doesn’t already exist in the dictionary, ..
        if employee['department'] not in grouped:
            
            # .. -> I add it as a key and initialize its value to an empty list.
            grouped[employee['department']] = []
        
        # I append the current employee to the list corresponding to their department.
        grouped[employee['department']].append(employee)
    
    # I return the dictionary, which groups all employees by their department.
    return grouped

# I define this function to filter out departments with an average salary of 65,000 or less.
def FilterDepartments(departments):
    
    # I use the filter function to keep only the departments where the average salary exceeds 65,000:
    # - I use the filter function to iterate over the list of department dictionaries
    # - For each department, the lambda function checks if the average_salary is greater than 65,000.
    # - The function returns an iterator containing only the departments that pass this condition.
    return filter(lambda x: x['average_salary'] > 65000, departments)

def departments_above_salary_threshold(employees):
    
    # I call GROUPbyDEPARTMENT to organize employees into groups based on their department.
    grouped_employees = GROUPbyDEPARTMENT(employees)
    
    # I use map to apply AverageSalary to each group of employees (values of the grouped dictionary).
    # This generates a list of dictionaries, each containing the department name and its average salary.
    department_avg_salaries = list(map(AverageSalary, grouped_employees.values()))
    
    # I use FilterDepartments to keep only the departments with an average salary above 65,000.
    filtered_departments = list(FilterDepartments(department_avg_salaries))
    
    # I return the filtered list of departments.
    return filtered_departments

# I define this function to generate a random dataset of employees.
def generate_random_employee_dataset(num_employees=50):
    
    # I list the possible departments to which employees can belong.
    departments = ["Engineering", "HR", "Marketing", "Sales", "Finance", "IT"]
    
    # I use a list comprehension to create a list of dictionaries for "num_employees" employees.
    # Each dictionary includes: - A unique name (ex:"Employee 1"); - A randomly generated salary between 40,000 and 120,000 using randint;
    # - A randomly assigned department using choice 
    dataset = [
        {
            "name": f"Employee {i}",
            "salary": randint(40000, 120000),
            "department": choice(departments),
        }
        for i in range(1, num_employees + 1)
    ]
    
    # I return the list of generated employee dictionaries.
    return dataset

# I generate a random dataset of 50 employees.
random_employee_dataset = generate_random_employee_dataset(50)

# I call "departments_above_salary_threshold" to calculate average salaries by department and filter based on the salary threshold.
result = departments_above_salary_threshold(random_employee_dataset)

# I iterate over the filtered list of departments and print each department's name and average salary.
for department in result:
    print(department)


{'department': 'Finance', 'average_salary': 77205.0}
{'department': 'Engineering', 'average_salary': 83699.5}
{'department': 'Sales', 'average_salary': 79646.8}
{'department': 'Marketing', 'average_salary': 75670.0}
{'department': 'IT', 'average_salary': 81591.82}
{'department': 'HR', 'average_salary': 76998.25}


# Biopython

Write the following five functions to analyze global alignments between two sequences using Biopython's `pairwise2` module:

1. **countMatches(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment (pairwise2.globalxx) of the same length. It returns the number of positions where the elements of both sequences match.

2. **countMismatches(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment of the same length. It returns the number of positions where the elements of the two sequences are different (i.e., they are not gaps, and the characters do not match).

3. **countGapOpens(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment of the same length. It returns the number of gap openings in the alignment (a gap is opened when a '-' appears in the sequence).

4. **countGapExtensions(s1, s2)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment of the same length. It returns the number of gap extensions (where '-' continues in the alignment after an initial gap is opened).

5. **getScore(s1, s2, matchScore, mismatchPenalty, gapOpenPenalty, gapExtensionPenalty)**  
   This function takes two sequences (`s1`, `s2`) aligned using global alignment and returns the alignment score based on the provided scoring scheme: `matchScore` for matches, `mismatchPenalty` for mismatches, `gapOpenPenalty` for opening a gap, and `gapExtensionPenalty` for extending a gap.

In [31]:
# Add your functions here

# I import the pairwise2 module from Biopython, which provides tools for pairwise sequence alignments, both local and global.
from Bio import pairwise2

# I import the SeqIO module from the Biopython library -> it helps handling sequence input and output
from Bio import SeqIO

# I define this function to count matching positions between s1 and s2 where neither character is a gap ('-'):
# - Using zip, I pair corresponding characters from the two sequences.
# - I use a generator expression to count cases where the characters are identical (a == b) and not gaps (a != '-')
# - I return the total count of matches.
def countMatches(s1, s2):

    # zip(s1, s2) -> The zip function takes two sequences (in this case, s1 and s2, which are strings) and pairs up the corresponding elements from each sequence into tuples
    # So, for each position in both strings, zip combines the characters at that position into a tuple.

    # for a, b in zip(s1, s2) -> This part is a generator expression that loops over each tuple produced by zip; 
    # It unpacks each tuple into the variables a and b, where a is the character from s1 and b is the character from s2

    # if a == b and a != '-' -> this condition checks 2 things: a == b -> This checks if the characters from s1 and s2 are the same at the current position.
    # a != '-' -> This ensures that the character "a" is not a gap (-); This is important because we don't want to count matches where one or both sequences have gaps.
    
    # If both conditions (a == b and a != '-') are met, the expression returns 1 for that pair of characters.
    # So this counts how many times the characters in s1 and s2 are the same and neither of them is a gap.

    # sum(...) -> The sum function takes the generator expression (the sequence of 1s) and adds them together.
    # so it gives the total count of matches between s1 and s2, excluding gaps.
    return sum(1 for a, b in zip(s1, s2) if a == b and a != '-')


# I use this function to count positions where characters differ (excluding gap characters)
# This function returns the mismatch count
def countMismatches(s1, s2):
    
    # I iterate over paired characters and count cases where they differ (a != b) but are not gaps (a != '-' and b != '-')
    return sum(1 for a, b in zip(s1, s2) if a != b and a != '-' and b != '-')


# I use this function to count the number of new gaps introduced in the alignment.
def countGapOpens(s1, s2):
    gap_opens = 0
    
    # For each position i ..
    for i in range(len(s1)):
        
        # .. I check if there is a gap in either sequence (s1[i] == '-' or s2[i] == '-').
        # A gap is counted as an "opening" if it is at the start of the sequence (i == 0) or follows a non-gap position (s1[i - 1] != '-' and s2[i - 1] != '-').
        if (s1[i] == '-' or s2[i] == '-') and (i == 0 or s1[i - 1] != '-' and s2[i - 1] != '-'):
            
            # I increment gap_opens for each new gap opening
            gap_opens += 1
    
    # I return the total
    return gap_opens


# I use this function to count positions where a gap continues beyond its opening.
def countGapExtensions(s1, s2):
    
    gap_extensions = 0
    
    # Starting from position i = 1, ..
    for i in range(1, len(s1)):
        
        # .. I check if there is a gap (s1[i] == '-' or s2[i] == '-'
        # and If the previous position also contains a gap (s1[i - 1] == '-' or s2[i - 1] == '-')..
        if (s1[i] == '-' or s2[i] == '-') and (s1[i - 1] == '-' or s2[i - 1] == '-'):
            
            # ..  it is counted as an extension.
            gap_extensions += 1
    
    # The function returns the total number of gap extensions.
    return gap_extensions


# In this function, I'm bringing all the previous functions together: I calculate matches, mismatches, gap opens, and gap extensions -> in order to compute a score for a global alignment between the two sequences (s1 and s2)
# Each of these factors is weighted by predefined scoring parameters
# The function takes 6 arguments: 
# 1/2)s1, s2: The two aligned sequences for comparison; 3)matchScore: The score to be added for each match between s1 and s2; 
# 4)mismatchPenalty: The penalty to subtract for each mismatch; 5)gapOpenPenalty: The penalty to subtract for opening a gap (= when a gap is introduced into the sequence alignment).
# 6)gapExtensionPenalty: The penalty to subtract for extending an existing gap (i.e., when a gap continues in the alignment).
def getScore(s1, s2, matchScore, mismatchPenalty, gapOpenPenalty, gapExtensionPenalty):
    
    # I call the previously defined functions to calculate the relevant quantities:

    # This returns the number of positions where s1 and s2 match, excluding gaps.
    matches = countMatches(s1, s2)
    
    # This returns the number of positions where s1 and s2 are mismatched (both characters are not equal and not gaps)
    mismatches = countMismatches(s1, s2)
    
    # This returns the number of times a gap is opened in the alignment.
    gap_opens = countGapOpens(s1, s2)
    
    # This returns the number of times a gap is extended (i.e., it continues after the first gap is opened).
    gap_extensions = countGapExtensions(s1, s2)
    
    # I calculate the score by multiplying the number of matches by the matchScore, the number of mismatches by the mismatchPenalty, 
    # the number of gap openings by the gapOpenPenalty, and the number of gap extensions by the gapExtensionPenalty
    score = (

        # The formula then adds or subtracts these weighted values: - Positive for matches (because a match increases the score);
        # - Negative for mismatches, gap openings, and gap extensions (because these reduce the alignment quality).
        matches * matchScore -
        mismatches * mismatchPenalty -
        gap_opens * gapOpenPenalty -
        gap_extensions * gapExtensionPenalty
    )
    
    # I return the computed score.
    return score


# TEST
def main():
    
    # SeqIO.read reads the fasta file and returns an object SeqRecord
    # The sequence is accessible via .seq
    s1_record = SeqIO.read("./biopython-datasets/IL12A.fasta", "fasta")
    s2_record = SeqIO.read("./biopython-datasets/IL12B.fasta", "fasta")
    
    # To work with the sequence, I convert Seq into a string with str()
    s1 = str(s1_record.seq)
    s2 = str(s2_record.seq)
    
    # I define scoring parameters: - matchScore: Reward for matching characters; mismatchPenalty: Penalty for mismatched characters; 
    # gapOpenPenalty: Penalty for introducing a new gap; gapExtensionPenalty: Penalty for continuing a gap
    matchScore = 1
    mismatchPenalty = 0
    gapOpenPenalty = 0
    gapExtensionPenalty = 0
    
    # I use pairwise2.align.globalms to perform a global alignment of s1 and s2
    alignments = pairwise2.align.globalms(
        s1, s2, 
        
        # I pass the scoring parameters, converting penalties into negative values.
        # This generates a list of possible alignments.
        matchScore, -mismatchPenalty, -gapOpenPenalty, -gapExtensionPenalty
    )
    
    # I select the first alignment from the list (alignments[0]) and unpack the relevant details:
    # - aligned_s1 and aligned_s2: Aligned versions of the sequences.
    # - pairwise2_score: The alignment score calculated by pairwise2.
    aligned_s1, aligned_s2, pairwise2_score, _, _ = alignments[0]
    
    # I calculate the alignment score using the getScore function
    calculated_score = getScore(
        aligned_s1, aligned_s2, 
        matchScore, mismatchPenalty, gapOpenPenalty, gapExtensionPenalty
    )
    
    # I use :.1f in an f-string in order to format the number as a floating-point number with one decimal place. 
    # This means that even if the number is an integer, it will be displayed with one decimal, ensuring uniformity in the output format.
    print(f"Calculated Score: {calculated_score:.1f}")
    print(f"Pairwise2 Score: {pairwise2_score:.1f}")

# I call the main function to execute the program.
main()



Calculated Score: 103.0
Pairwise2 Score: 103.0


### Test
Align the sequences of the [Interleukin-12](https://en.wikipedia.org/wiki/Interleukin_12) chain A (denoted as `s1`) from the file [`IL12A.fasta`](https://qcbsciprolab2020.readthedocs.io/en/latest/file_samples/IL12A.fasta) and the Interleukin-12 chain B (denoted as `s2`) from the file [`IL12B.fasta`](https://qcbsciprolab2020.readthedocs.io/en/latest/file_samples/IL12B.fasta) and check the score as computed from pairwise2 and from your functions.

In [None]:
# add the output of the test here

"""""

Calculated Score: 103.0
Pairwise2 Score: 103.0

"""""