In [108]:
import pysam
import pandas as pd
from Bio.Seq import Seq
import Levenshtein as lev

def load_primers(primer_file):
    """Load and prepare primers dataframe"""
    primers_df = pd.read_csv(primer_file, sep="\t")
    primers_df = primers_df.dropna(subset=['Forward', 'Reverse'])
    longest_primer_length = max(
        primers_df['Forward'].apply(len).max(), 
        primers_df['Reverse'].apply(len).max()
    )
    return primers_df, longest_primer_length

def is_match(seq1, seq2, max_distance=2):
    """
    Check for approx match using Levenshtein distance.
    seq1: longer sequence to search in
    seq2: primer sequence to find
    max_distance: maximum allowed edit distance
    """
    # Handle potential None or empty sequences
    if not seq1 or not seq2:
        return False
    
    try:
        # Slide the primer (seq2) across the sequence (seq1)
        for i in range(len(seq1) - len(seq2) + 1):
            window = seq1[i:i+len(seq2)]
            if len(window) == len(seq2):  # Ensure we have a full window
                distance = lev.distance(str(window), str(seq2))
                if distance <= max_distance:
                    return True
    except:
        return False
    return False

def find_primers_in_region(sequence, primers_df, window_size=100, max_distance=2):
    """Find primers in a given sequence region"""
    primers_found = []
    
    for _, primer in primers_df.iterrows():
        forward_primer = primer['Forward']
        reverse_primer = primer['Reverse']
        reverse_complement_forward = str(Seq(forward_primer).reverse_complement())
        reverse_complement_reverse = str(Seq(reverse_primer).reverse_complement())
        
        # Check each primer against the entire sequence
        if is_match(sequence, forward_primer, max_distance):
            primers_found.append(f"{primer['Name']}_Forward")
            
        if is_match(sequence, reverse_primer, max_distance):
            primers_found.append(f"{primer['Name']}_Reverse")
            
        if is_match(sequence, reverse_complement_forward, max_distance):
            primers_found.append(f"{primer['Name']}_ForwardComp")
            
        if is_match(sequence, reverse_complement_reverse, max_distance):
            primers_found.append(f"{primer['Name']}_ReverseComp")
    
    return list(set(primers_found))  # Remove duplicates
    
def bam_to_fasta(bam_path, primer_file, unaligned_only=False, max_reads=200):
    """Process BAM file and find primers in reads"""
    # Load primers
    primers_df, longest_primer_length = load_primers(primer_file)
    
    # Open BAM file
    try:
        bam_file = pysam.AlignmentFile(bam_path, "rb")
    except Exception as e:
        print(f"Error opening BAM file: {e}")
        return pd.DataFrame()

    data = []
    search_window = 100 + longest_primer_length  # Increased search window size
    
    reads_processed = 0
    
    for read in bam_file.fetch(until_eof=True):
        if unaligned_only and not read.is_unmapped:
            continue
        if read.query_sequence is None:
            continue

        reads_processed += 1
        
        if max_reads > 0:
            if reads_processed > max_reads:
                break
        
        read_sequence = read.query_sequence
        read_length = len(read_sequence)
        
        # print(f"\nProcessing read {reads_processed}:")
        # print(f"Read name: {read.query_name}")
        # print(f"Read length: {read_length}")
        
        # Define search regions with bounds checking
        start_region = read_sequence[:min(search_window, read_length)]
        end_region = read_sequence[max(0, read_length - search_window):]
        
        # Find primers in both regions
        start_primers_found = find_primers_in_region(start_region, primers_df, window_size=search_window, max_distance=2)
        end_primers_found = find_primers_in_region(end_region, primers_df, window_size=search_window, max_distance=2)
        
        data.append({
            'Read_Name': read.query_name,
            'Start_Primers': ', '.join(start_primers_found) if start_primers_found else 'None',
            'End_Primers': ', '.join(end_primers_found) if end_primers_found else 'None',
            'Read_Length': read_length
        })
    
    bam_file.close()
    
    result_df = pd.DataFrame(data)
    return result_df


def split_primer_results(result_df):
    """
    Split results into three dataframes:
    1. single_primer_df: Exactly one primer at each end
    2. multiple_primer_df: More than one primer at either end
    3. no_primer_df: No primers at either end
    """
    
    def count_primers(row):
        """Helper function to count primers in Start_Primers and End_Primers"""
        start_count = 0 if row['Start_Primers'] == 'None' else row['Start_Primers'].count(',') + 1
        end_count = 0 if row['End_Primers'] == 'None' else row['End_Primers'].count(',') + 1
        return start_count, end_count

    # Create mask for each category
    single_primer_mask = result_df.apply(
        lambda row: count_primers(row) == (1, 1), 
        axis=1
    )
    
    multiple_primer_mask = result_df.apply(
        lambda row: any(count > 1 for count in count_primers(row)), 
        axis=1
    )
    
    no_primer_mask = result_df.apply(
        lambda row: count_primers(row) == (0, 0), 
        axis=1
    )
    
    # Split into three dataframes
    single_primer_df = result_df[single_primer_mask].copy()
    multiple_primer_df = result_df[multiple_primer_mask].copy()
    no_primer_df = result_df[no_primer_mask].copy()
    
    # Add summary information
    print("\nSummary of split results:")
    print(f"Reads with single primer at each end: {len(single_primer_df)}")
    print(f"Reads with multiple primers at either end: {len(multiple_primer_df)}")
    print(f"Reads with no primers: {len(no_primer_df)}")
    
    print("\nSingle primer reads:")
    print(single_primer_df)
    
    print("\nMultiple primer reads:")
    print(multiple_primer_df)
    
    print("\nNo primer reads:")
    print(no_primer_df)
    
    return single_primer_df, multiple_primer_df, no_primer_df
    
def analyze_primer_pairs(single_primer_df, primers_df):
    """
    Analyze single primer matches for expected F/R pairs and size compliance
    """
    # Helper function to extract primer name without orientation
    def get_base_primer_name(primer_str):
        if primer_str == 'None':
            return None
        # Split from the right side to preserve any underscores in the primer name
        parts = primer_str.rsplit('_', 1)
        return parts[0]  # Return everything except the last part (orientation)
    
    # Helper function to get primer orientation
    def get_primer_orientation(primer_str):
        if primer_str == 'None':
            return None
        return primer_str.rsplit('_', 1)[1]  # Get the last part (orientation)
    
    # Rest of the function remains the same
    single_primer_df['Start_Primer_Name'] = single_primer_df['Start_Primers'].apply(get_base_primer_name)
    single_primer_df['End_Primer_Name'] = single_primer_df['End_Primers'].apply(get_base_primer_name)
    single_primer_df['Start_Orientation'] = single_primer_df['Start_Primers'].apply(get_primer_orientation)
    single_primer_df['End_Orientation'] = single_primer_df['End_Primers'].apply(get_primer_orientation)
    
    # Find matching pairs (same primer name at both ends)
    matching_pairs_df = single_primer_df[
        single_primer_df['Start_Primer_Name'] == single_primer_df['End_Primer_Name']
    ].copy()
    
    # Check correct orientation (one Forward, one Reverse)
    correct_orientation_df = matching_pairs_df[
        ((matching_pairs_df['Start_Orientation'].str.contains('Forward') & 
          matching_pairs_df['End_Orientation'].str.contains('Reverse')) |
         (matching_pairs_df['Start_Orientation'].str.contains('Reverse') & 
          matching_pairs_df['End_Orientation'].str.contains('Forward')))
    ].copy()
    
    # Add expected size information from primers_df
    primer_sizes = primers_df.set_index('Name')['Size'].to_dict()
    correct_orientation_df['Expected_Size'] = correct_orientation_df['Start_Primer_Name'].map(primer_sizes)
    
    # Calculate size compliance (within 10% of expected)
    def is_size_compliant(row):
        expected = row['Expected_Size']
        actual = row['Read_Length']
        if pd.isna(expected):
            return False
        tolerance = expected * 0.10  # 10% tolerance
        return abs(actual - expected) <= tolerance
    
    correct_orientation_df['Size_Compliant'] = correct_orientation_df.apply(is_size_compliant, axis=1)
    
    return correct_orientation_df



def create_analysis_summary(result_df, primers_df):
    """
    Create a comprehensive summary of primer analysis results including mismatches and length statistics
    """
    total_reads = len(result_df)
    
    # Helper functions for primer analysis with fixed underscore handling
    def get_base_primer_name(primer_str):
        if primer_str == 'None':
            return None
        return primer_str.rsplit('_', 1)[0]
    
    def get_primer_orientation(primer_str):
        if primer_str == 'None':
            return None
        return primer_str.rsplit('_', 1)[1]
    
    result_df['Start_Primer_Name'] = result_df['Start_Primers'].apply(get_base_primer_name)
    result_df['End_Primer_Name'] = result_df['End_Primers'].apply(get_base_primer_name)
    result_df['Start_Orientation'] = result_df['Start_Primers'].apply(get_primer_orientation)
    result_df['End_Orientation'] = result_df['End_Primers'].apply(get_primer_orientation)
    
    # Identify different categories
    no_primers = result_df[
        (result_df['Start_Primers'] == 'None') & 
        (result_df['End_Primers'] == 'None')
    ]
    
    single_end_only = result_df[
        ((result_df['Start_Primers'] != 'None') & (result_df['End_Primers'] == 'None')) |
        ((result_df['Start_Primers'] == 'None') & (result_df['End_Primers'] != 'None'))
    ]
    
    both_ends = result_df[
        (result_df['Start_Primers'] != 'None') & 
        (result_df['End_Primers'] != 'None')
    ]
    
    # Analyze matched pairs
    matched_pairs = both_ends[
        both_ends['Start_Primer_Name'] == both_ends['End_Primer_Name']
    ]
    
    mismatched_pairs = both_ends[
        both_ends['Start_Primer_Name'] != both_ends['End_Primer_Name']
    ]
    
    # Check orientation and length compliance
    primer_sizes = primers_df.set_index('Name')['Size'].to_dict()
    
    def is_correct_orientation(row):
        return (
            (row['Start_Orientation'].startswith('Forward') and 
             row['End_Orientation'].startswith('Reverse')) or
            (row['Start_Orientation'].startswith('Reverse') and 
             row['End_Orientation'].startswith('Forward'))
        )
    
    def is_size_compliant(row):
        expected = primer_sizes.get(row['Start_Primer_Name'])
        if pd.isna(expected):
            return False
        tolerance = expected * 0.10
        return abs(row['Read_Length'] - expected) <= tolerance
    
    matched_pairs['Correct_Orientation'] = matched_pairs.apply(is_correct_orientation, axis=1)
    matched_pairs['Size_Compliant'] = matched_pairs.apply(is_size_compliant, axis=1)
    
    # Create summary DataFrame
    summary_data = [
        {
            'Category': 'No primers detected',
            'Count': len(no_primers),
            'Percentage': (len(no_primers) / total_reads) * 100
        },
        {
            'Category': 'Single-end primers only',
            'Count': len(single_end_only),
            'Percentage': (len(single_end_only) / total_reads) * 100
        },
        {
            'Category': 'Mismatched primer pairs',
            'Count': len(mismatched_pairs),
            'Percentage': (len(mismatched_pairs) / total_reads) * 100
        },
        {
            'Category': 'Matched pairs - incorrect orientation',
            'Count': len(matched_pairs[~matched_pairs['Correct_Orientation']]),
            'Percentage': (len(matched_pairs[~matched_pairs['Correct_Orientation']]) / total_reads) * 100
        },
        {
            'Category': 'Matched pairs - correct orientation, wrong size',
            'Count': len(matched_pairs[
                matched_pairs['Correct_Orientation'] & 
                ~matched_pairs['Size_Compliant']
            ]),
            'Percentage': (len(matched_pairs[
                matched_pairs['Correct_Orientation'] & 
                ~matched_pairs['Size_Compliant']
            ]) / total_reads) * 100
        },
        {
            'Category': 'Matched pairs - correct orientation and size',
            'Count': len(matched_pairs[
                matched_pairs['Correct_Orientation'] & 
                matched_pairs['Size_Compliant']
            ]),
            'Percentage': (len(matched_pairs[
                matched_pairs['Correct_Orientation'] & 
                matched_pairs['Size_Compliant']
            ]) / total_reads) * 100
        }
    ]
    
    summary_df = pd.DataFrame(summary_data)
    summary_df['Percentage'] = summary_df['Percentage'].round(2)
    
    return summary_df, matched_pairs, mismatched_pairs


In [110]:
def main():
    try:
        result_df = bam_to_fasta(
            bam_path="test.bam",
            primer_file="primers.tsv",
            unaligned_only=False,
            max_reads=0
        )
        
        # Load primers for size information
        primers_df, _ = load_primers("primers.tsv")
        
        # Generate analysis summary
        summary_df, matched_pairs, mismatched_pairs = create_analysis_summary(result_df, primers_df)
        
        # Print results
        print("\nPrimer Analysis Summary:")
        print("=" * 80)
        print(summary_df.to_string(index=False))
        
        # Save detailed results
        matched_pairs.to_csv('matched_primer_pairs.csv', index=False)
        mismatched_pairs.to_csv('mismatched_primer_pairs.csv', index=False)
        # Save pairs with correct orientation but wrong size
        wrong_size_pairs = matched_pairs[
            matched_pairs['Correct_Orientation'] & 
            ~matched_pairs['Size_Compliant']
        ]
        wrong_size_pairs.to_csv('wrong_size_pairs.csv', index=False)
        
    except Exception as e:
        print(f"Error in main execution: {e}")

if __name__ == "__main__":
    main()


Primer Analysis Summary:
                                       Category  Count  Percentage
                            No primers detected  13944       13.63
                        Single-end primers only  51519       50.37
                        Mismatched primer pairs  17072       16.69
          Matched pairs - incorrect orientation   2479        2.42
Matched pairs - correct orientation, wrong size  17244       16.86
   Matched pairs - correct orientation and size     26        0.03


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  matched_pairs['Correct_Orientation'] = matched_pairs.apply(is_correct_orientation, axis=1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  matched_pairs['Size_Compliant'] = matched_pairs.apply(is_size_compliant, axis=1)
