In [None]:

query = '/mnt/869990e7-a61f-469f-99fe-a48d24ac44ca/git/ebi/query.fa'
target = '/mnt/869990e7-a61f-469f-99fe-a48d24ac44ca/git/ebi/gencode.v46.transcripts.200bp.fa'

import sys
import os.path as op
import ctypes as ct

class CAlignRes(ct.Structure):
    _fields_ = [('nScore', ct.c_uint16), 
                ('nScore2', ct.c_uint16), 
                ('nRefBeg', ct.c_int32), 
                ('nRefEnd', ct.c_int32), 
                ('nQryBeg', ct.c_int32), 
                ('nQryEnd', ct.c_int32), 
                ('nRefEnd2', ct.c_int32), 
                ('sCigar', ct.POINTER(ct.c_uint32)), 
                ('nCigarLen', ct.c_int32)] 

class CProfile(ct.Structure):
    _fields_ = [('pByte', ct.POINTER(ct.c_int32)),
                ('pWord', ct.POINTER(ct.c_int32)),
                ('pRead', ct.POINTER(ct.c_int8)),
                ('pMat', ct.POINTER(ct.c_int8)),
                ('nReadLen', ct.c_int32),
                ('nN', ct.c_int32),
                ('nBias', ct.c_uint8)]

class CSsw(object):
    def __init__(self, sLibPath):

        sLibName = 'libssw.so'
        if sLibPath:
            if not op.exists(op.join(sLibPath, sLibName)):
                sys.stderr.write('libssw.so does not exist in the input path.')
                sys.exit()
            self.ssw = ct.cdll.LoadLibrary(op.join(sLibPath,sLibName))
        else:
            bFound = False
            for s in sys.path:
                if op.exists(op.join(s,sLibName)):
                    bFound = True
                    self.ssw = ct.cdll.LoadLibrary(op.join(s,sLibName))
            if bFound == False:
                sys.stderr.write('libssw.so does not exist in PATH')
                sys.exit()

        self.ssw_init = self.ssw.ssw_init
        self.ssw_init.argtypes = [ct.POINTER(ct.c_int8), ct.c_int32, ct.POINTER(ct.c_int8), ct.c_int32, ct.c_int8]
        self.ssw_init.restype = ct.POINTER(CProfile)
        self.init_destroy = self.ssw.init_destroy
        self.init_destroy.argtypes = [ct.POINTER(CProfile)]
        self.init_destroy.restype = None
        self.ssw_align = self.ssw.ssw_align
        self.ssw_align.argtypes = [ct.c_void_p, ct.POINTER(ct.c_int8), ct.c_int32, ct.c_uint8, ct.c_uint8, ct.c_uint8, ct.c_uint16, ct.c_int32, ct.c_int32]
        self.ssw_align.restype = ct.POINTER(CAlignRes)
        self.align_destroy = self.ssw.align_destroy
        self.align_destroy.argtypes = [ct.POINTER(CAlignRes)]
        self.align_destroy.restype = None

def read_matrix(sFile):
    with open(args.sMatrix, 'r') as f:
        for l in f:
            if not l.startswith('#'):
                break
        lEle = l.strip().split()
        dEle2Int = {}
        dInt2Ele = {}
        for i,ele in enumerate(lEle):
            dEle2Int[ele] = i
            dEle2Int[ele.lower()] = i
            dInt2Ele[i] = ele
        nEleNum = len(lEle)
        lScore = []
        for l in f:
            lScore.extend([int(x) for x in l.strip().split()[1:]])

        return lEle, dEle2Int, dInt2Ele, lScore



import sys
import os.path as op
import argparse as ap
import ctypes as ct
import timeit as ti
import gzip
import math
import ssw_lib


def read(sFile):
    """
    read a sequence file
    @param  sFile   sequence file
    """
    def read_one_fasta(f):
        """
        read a fasta file
        @param  f   file handler
        """
        sId = ''
        sSeq = ''
        for l in f:
            if l.startswith('>'):
                if sSeq:
                    yield sId, sSeq, ''
                sId = l.strip()[1:].split()[0]
                sSeq = ''
            else:
                sSeq += l.strip()

        yield sId, sSeq, ''

    bFasta = True
    with open(sFile, 'r') as f:
        l = f.readline()
        if l.startswith('>'):
            bFasta = True
        elif l.startswith('@'):
            bFasta = False
        else:
            sys.stderr.write('file format cannot be recognized\n')
            sys.exit()

    with open(sFile, 'r') as f:
        if bFasta == True:
            for sId,sSeq,sQual in read_one_fasta(f):
                yield sId, sSeq, sQual


def to_int(seq, lEle, dEle2Int):
    """
    translate a sequence into numbers
    @param  seq   a sequence
    """
    num_decl = len(seq) * ct.c_int8
    num = num_decl()
    for i,ele in enumerate(seq):
        try:
            n = dEle2Int[ele]
        except KeyError:
            n = dEle2Int[lEle[-1]]
        finally:
            num[i] = n

    return num


def align_one(ssw, qProfile, rNum, nRLen, nOpen, nExt, nFlag, nMaskLen):
    """
    align one pair of sequences
    @param  qProfile   query profile
    @param  rNum   number array for reference
    @param  nRLen   length of reference sequence
    @param  nFlag   alignment flag
    @param  nMaskLen   mask length
    """
    res = ssw.ssw_align(qProfile, rNum, ct.c_int32(nRLen), nOpen, nExt, nFlag, 0, 0, nMaskLen)

    nScore = res.contents.nScore
    nScore2 = res.contents.nScore2
    nRefBeg = res.contents.nRefBeg
    nRefEnd = res.contents.nRefEnd
    nQryBeg = res.contents.nQryBeg
    nQryEnd = res.contents.nQryEnd
    nRefEnd2 = res.contents.nRefEnd2
    lCigar = [res.contents.sCigar[idx] for idx in range(res.contents.nCigarLen)]
    nCigarLen = res.contents.nCigarLen
    ssw.align_destroy(res)

    return (nScore, nScore2, nRefBeg, nRefEnd, nQryBeg, nQryEnd, nRefEnd2, nCigarLen, lCigar)


def buildPath(q, r, nQryBeg, nRefBeg, lCigar):
    """
    build cigar string and align path based on cigar array returned by ssw_align
    @param  q   query sequence
    @param  r   reference sequence
    @param  nQryBeg   begin position of query sequence
    @param  nRefBeg   begin position of reference sequence
    @param  lCigar   cigar array
    """
    sCigarInfo = 'MIDNSHP=X'
    sCigar = ''
    sQ = ''
    sA = ''
    sR = ''
    nQOff = nQryBeg
    nROff = nRefBeg
    for x in lCigar:
        n = x >> 4
        m = x & 15
        if m > 8:
            c = 'M'
        else:
            c = sCigarInfo[m]
        sCigar += str(n) + c

        if c == 'M':
            sQ += q[nQOff : nQOff+n]
            sA += ''.join(['|' if q[nQOff+j] == r[nROff+j] else '*' for j in range(n)])
            sR += r[nROff : nROff+n]
            nQOff += n
            nROff += n
        elif c == 'I':
            sQ += q[nQOff : nQOff+n]
            sA += ' ' * n
            sR += '-' * n
            nQOff += n
        elif c == 'D':
            sQ += '-' * n
            sA += ' ' * n
            sR += r[nROff : nROff+n]
            nROff += n
    return sCigar, sQ, sA, sR

def main():
    nMatch = 2
    nMismatch = 2

    nOpen = 3
    nExt = 1

    lEle = ['A', 'C', 'G', 'T', 'N']
    dRc = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A', 'a': 'T', 'c': 'G', 'g': 'C', 't': 'A'}
    dEle2Int = {}
    dInt2Ele = {}

    for i, ele in enumerate(lEle):
        dEle2Int[ele] = i
        dEle2Int[ele.lower()] = i
        dInt2Ele[i] = ele

    nEleNum = len(lEle)
    lScore = [0 for _ in range(nEleNum ** 2)]
    for i in range(nEleNum - 1):
        for j in range(nEleNum - 1):
            if lEle[i] == lEle[j]:
                lScore[i * nEleNum + j] = nMatch
            else:
                lScore[i * nEleNum + j] = -nMismatch

    mat = (len(lScore) * ct.c_int8)()
    mat[:] = lScore

    nFlag = 0
    ssw = ssw_lib.CSsw('./')

    for sQId, sQSeq, sQQual in read(query):
        qNum = to_int(sQSeq, lEle, dEle2Int)
        qProfile = ssw.ssw_init(qNum, ct.c_int32(len(sQSeq)), mat, len(lEle), 2)
        nMaskLen = len(sQSeq) // 2

        best_score = -float('inf')
        best_alignment = None
        ref = None

        for sRId, sRSeq, _ in read(target):
            rNum = to_int(sRSeq, lEle, dEle2Int)
            res = align_one(ssw, qProfile, rNum, len(sRSeq), nOpen, nExt, nFlag, nMaskLen)

            if res[0] > best_score:
                ref = sRId
                best_score = res[0]
                best_alignment = res

        if best_alignment is not None:
            print(f"Query ID: {sQId}")
            print(f"Ref ID: {ref}")
            print(f"Best Score: {best_score}")
            print(f"Best : {res}")

main()

In [14]:
from Bio import SeqIO
counter = 0

input_fasta = "/mnt/869990e7-a61f-469f-99fe-a48d24ac44ca/git/ebi/gencode.v46.transcripts.fa"
output_fasta = "/mnt/869990e7-a61f-469f-99fe-a48d24ac44ca/git/ebi/gencode.v46.transcripts.short.fa"
# def extract_last_200_bp(input_file, output_file):
with open(output_fasta, "w") as out_handle:
    for record in SeqIO.parse(input_fasta, "fasta"):
        if len(record.seq) > 15:
            counter = counter + 1
            # Extract the last 200 bp of the sequence
            last_200_bp = record.seq[-100:]
            # Create a new record with the truncated sequence
            new_record = record[:]
            new_record.seq = last_200_bp
            # Write the new record to the output file
            SeqIO.write(new_record, out_handle, "fasta")

# Replace these paths with your actual file paths


# extract_last_200_bp(input_fasta, output_fasta)

print(counter)

254065


In [13]:
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
import random




def extract_last_bases(seq, length=20):
    """Extract the last 'length' bases from a sequence."""
    return seq[-length:]

def process_fasta(input_fasta, output_fasta, min_length=15, num_sequences=512, last_bases_length=50):
    # Read all records from the input FASTA file
    records = [record for record in SeqIO.parse(input_fasta, "fasta") if len(record.seq) > min_length]
    
    # Check if there are enough records to sample
    if len(records) < num_sequences:
        raise ValueError(f"Not enough sequences longer than {min_length} bp in {input_fasta} to sample {num_sequences} sequences.")
    
    # Randomly select the specified number of sequences
    random.seed(123)
    selected_records = random.sample(records, num_sequences)
    
    # Extract the last 'last_bases_length' bases from each selected sequence
    last_bases_records = []
    for record in selected_records:
        last_bases_seq = extract_last_bases(record.seq, last_bases_length)
        last_bases_record = SeqRecord(last_bases_seq, id=record.id, description=record.description)
        last_bases_records.append(last_bases_record)
    
    # Write the extracted sequences to the output FASTA file
    with open(output_fasta, "w") as out_handle:
        SeqIO.write(last_bases_records, out_handle, "fasta")

# Example usage
input_fasta = "/mnt/869990e7-a61f-469f-99fe-a48d24ac44ca/git/ebi/gencode.v46.transcripts.fa"
output_fasta = "/mnt/869990e7-a61f-469f-99fe-a48d24ac44ca/git/ebi/gencode.v46.transcripts.50bp.fa"
process_fasta(input_fasta, output_fasta)
