Connected to rework (Python 3.10.14)

In [None]:
from itertools import islice
from ma_mapper import extract_maf
import importlib
importlib.reload(extract_maf)
from Bio.AlignIO import MafIO
try:
    from sqlite3 import dbapi2
except ImportError:
    dbapi2 = None
MAFINDEX_VERSION = 2
import time

In [None]:
import subprocess
gztool_path = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/dev/installation/gztool/gztool'

In [None]:
import os
class gzMafIndex(MafIO.MafIndex):
    def _MafIndex__check_existing_db(self):
        try:
            idx_version = int(
                self._con.execute(
                    "SELECT value FROM meta_data WHERE key = 'version'"
                ).fetchone()[0]
            )
            if idx_version != MAFINDEX_VERSION:
                msg = "\n".join(
                    [
                        "Index version (%s) incompatible with this version "
                        "of MafIndex" % idx_version,
                        "You might erase the existing index %s "
                        "for it to be rebuilt." % self._index_filename,
                    ]
                )
                raise ValueError(msg)

            filename = self._con.execute(
                "SELECT value FROM meta_data WHERE key = 'filename'"
            ).fetchone()[0]
            if os.path.isabs(filename):
                tmp_mafpath = filename
            else:
                tmp_mafpath = os.path.join(
                    self._relative_path, filename.replace("/", os.path.sep)
                )
            #amend .gz filetype
            if os.path.splitext(self._maf_file)[1] == ".gz":
                normalized_path = os.path.splitext(self._maf_file)[0]
            else:
                normalized_path = self._maf_file
            if tmp_mafpath != os.path.abspath(normalized_path):
                raise ValueError(
                    f"Index uses a different file ({filename} != {normalized_path})"
                )

            db_target = self._con.execute(
                "SELECT value FROM meta_data WHERE key = 'target_seqname'"
            ).fetchone()[0]
            if db_target != self._target_seqname:
                raise ValueError(
                    "Provided database indexed for %s, expected %s"
                    % (db_target, self._target_seqname)
                )

            record_count = int(
                self._con.execute(
                    "SELECT value FROM meta_data WHERE key = 'record_count'"
                ).fetchone()[0]
            )
            if record_count == -1:
                raise ValueError("Unfinished/partial database provided")

            records_found = int(
                self._con.execute("SELECT COUNT(*) FROM offset_data").fetchone()[0]
            )
            if records_found != record_count:
                raise ValueError(
                    "Expected %s records, found %s.  Corrupt index?"
                    % (record_count, records_found)
                )

            return records_found

        except (dbapi2.OperationalError, dbapi2.DatabaseError) as err:
            raise ValueError(f"Problem with SQLite database: {err}") from None

    def _MafIndex__make_new_index(self):
        """Read MAF file and generate SQLite index (PRIVATE)."""
        # make the tables
        self._con.execute("CREATE TABLE meta_data (key TEXT, value TEXT);")
        self._con.execute(
            "INSERT INTO meta_data (key, value) VALUES (?, ?);",
            ("version", MAFINDEX_VERSION),
        )
        self._con.execute(
            "INSERT INTO meta_data (key, value) VALUES ('record_count', -1);"
        )
        self._con.execute(
            "INSERT INTO meta_data (key, value) VALUES (?, ?);",
            ("target_seqname", self._target_seqname),
        )
        # Determine whether to store maf file as relative to the index or absolute
        if not os.path.isabs(self._maf_file) and not os.path.isabs(self._index_filename):
            mafpath = os.path.relpath(self._maf_file, self._relative_path).replace(
                os.path.sep, "/"
            )
        elif (
            os.path.dirname(os.path.abspath(self._maf_file)) + os.path.sep
        ).startswith(self._relative_path + os.path.sep):
            mafpath = os.path.relpath(self._maf_file, self._relative_path).replace(
                os.path.sep, "/"
            )
        else:
            mafpath = os.path.abspath(self._maf_file)
        self._con.execute(
            "INSERT INTO meta_data (key, value) VALUES (?, ?);",
            ("filename", mafpath),
        )
        
        # Add length_to_next column for precomputed lengths (initialized as NULL)
        self._con.execute(
            "CREATE TABLE offset_data (bin INTEGER, start INTEGER, end INTEGER, offset INTEGER, length_to_next INTEGER DEFAULT NULL);"
        )

        insert_count = 0

        # iterate over the entire file and insert in batches
        mafindex_func = self._MafIndex__maf_indexer()

        while True:
            batch = list(islice(mafindex_func, 100))
            if not batch:
                break

            # Insert batch data without length_to_next for now (defaults to NULL)
            self._con.executemany(
                "INSERT INTO offset_data (bin, start, end, offset) VALUES (?,?,?,?);",
                batch,
            )
            self._con.commit()
            insert_count += len(batch)

        # Update length_to_next column for precomputed lengths
        self._con.execute(
            """
            WITH cte AS (
                SELECT offset, 
                    LEAD(offset) OVER (ORDER BY offset ASC) - offset AS length_to_next
                FROM offset_data
            )
            UPDATE offset_data
            SET length_to_next = cte.length_to_next
            FROM cte
            WHERE offset_data.offset = cte.offset;
            """
        )

        # then make indexes on the relevant fields
        self._con.execute("CREATE INDEX IF NOT EXISTS bin_index ON offset_data(bin);")
        self._con.execute(
            "CREATE INDEX IF NOT EXISTS start_index ON offset_data(start);"
        )
        self._con.execute("CREATE INDEX IF NOT EXISTS end_index ON offset_data(end);")

        self._con.execute(
            f"UPDATE meta_data SET value = '{insert_count}' WHERE key = 'record_count'"
        )

        self._con.commit()

        return insert_count

    def seek_and_read(self, offset, length_to_next):

        #start_seek = time.time()    
        """Seek to a specific offset in the compressed file and read the block."""
        # Query the next offset from SQLite
        #start_sql = time.time()    
        #cursor = self._con.cursor()
        #cursor.execute(
        #    "SELECT offset FROM offset_data WHERE offset > ? ORDER BY offset ASC LIMIT 1;",
        #    (offset,),
        #)
        #next_offset_row = cursor.fetchone()

        # Determine the length of the block
        #if next_offset_row:
        #    next_offset = next_offset_row[0]
        #    print(f'next offset: {next_offset}')
        #    length = next_offset - offset
        #else:
        #   # If no next offset, read until EOF
        #    length = None  # Indicates to read to the end or a large chunk
        #print("sql time:", time.time() - start_sql)
        #start_gz = time.time()    
        data = b""
        if length_to_next is not None:
            cmd = [
                gztool_path,
                "-b", str(offset),  # Offset to seek
                "-r", str(length_to_next),  # Byte length to read
                self._maf_file,
            ]
        else:
            cmd = [
                gztool_path,
                "-b", str(offset),  # Offset to seek
                self._maf_file,
            ]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        data += result.stdout
        #print("gztool time:", time.time() - start_gz)
        start_parse = time.time()  
        from io import StringIO
        handle = StringIO(data.decode())
        try:
            record = next(MafIO.MafIterator(handle))
            #print("parse time:", time.time() - start_parse)
            #print("seek time:", time.time() - start_seek)
            return record
        except ValueError:
            raise RuntimeError("Failed to parse block.")
        
        
    def search(self, starts, ends):
        if len(starts) != len(ends):
            raise ValueError("Every position in starts must have a match in ends")
        for exonstart, exonend in zip(starts, ends):
            exonlen = exonend - exonstart
            if exonlen < 1:
                raise ValueError(
                    "Exon coordinates (%d, %d) invalid: exon length (%d) < 1"
                    % (exonstart, exonend, exonlen)
                )
        con = self._con
        yielded_rec_coords = set()
        for exonstart, exonend in zip(starts, ends):
            try:
                possible_bins = ", ".join(
                    map(str, self._region2bin(exonstart, exonend))
                )
            except TypeError:
                raise TypeError(
                    "Exon coordinates must be integers "
                    "(start=%d, end=%d)" % (exonstart, exonend)
                ) from None

            result = con.execute(
                """
                SELECT start, 
                   end, 
                   offset, 
                   length_to_next
                FROM offset_data
                WHERE bin IN (%s)
                AND (end BETWEEN %s AND %s OR %s BETWEEN start AND end)
                ORDER BY start, end, offset ASC;
                """ % (possible_bins, exonstart, exonend - 1, exonend - 1)
            )

            rows = result.fetchall()
            for rec_start, rec_end, offset, length_to_next in rows:
                print(f'recs info: {rec_start}\t{rec_end}')
                print(f'offsets: {offset},{length_to_next}')
                if (rec_start, rec_end) in yielded_rec_coords:
                    continue
                else:
                    yielded_rec_coords.add((rec_start, rec_end))
                
                if os.path.splitext(self._maf_file)[1] == ".gz":
                    fetched = self._get_record_gz(int(offset), int(length_to_next))
                else:
                    fetched = self._get_record(int(offset))

                for record in fetched:
                    if record.id == self._target_seqname:
                        start = record.annotations["start"]
                        end = start + record.annotations["size"] - 1

                        if not (start == rec_start and end == rec_end):
                            raise ValueError(
                                "Expected %s-%s @ offset %s, found %s-%s"
                                % (rec_start, rec_end, offset, start, end)
                            )

                yield fetched
    def _get_record_gz(self, offset, length_to_next):
        record = self.seek_and_read(offset, length_to_next)
        return record

    def _get_record(self, offset):
        self._maf_fp.seek(offset)
        record = next(self._mafiter)
        return record

In [None]:
start_total = time.time()  
#target_species = 'hg38'
target_species = 'Homo_sapiens'
chrom = 'chr1'
#maf_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/cactus447/chr1.maf.gz'
#mafindex_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/cactus447/chr1.mafindex'
maf_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/zoonomia_241_species/241-mammalian-2020v2b.maf.chr1.gz'
mafindex_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/zoonomia_241_species/241-mammalian-2020v2b.maf.chr1.mafindex'

strand = '-'
start = 4381114
end = 4382190
maf_id = f'{target_species}.{chrom}'
    
index_maf = gzMafIndex(mafindex_file, maf_file, maf_id) 
n_strand = -1 if strand == '-' else 1
print([start],[end],n_strand)
results =index_maf.get_spliced([start],[end],n_strand)
print("total time:", time.time() - start_total)

[4381114] [4382190] -1
recs info: 4381109	4381295
offsets: 9207960614,30004
recs info: 4381296	4381369
offsets: 9207990618,2861
recs info: 4381370	4381457
offsets: 9207993479,3025
recs info: 4381458	4381486
offsets: 9207996504,1786
recs info: 4381487	4381581
offsets: 9207998290,3172
recs info: 4381582	4381651
offsets: 9208001462,2647
recs info: 4381652	4381729
offsets: 9208004109,2815
recs info: 4381730	4381827
offsets: 9208006924,3392
recs info: 4381828	4381828
offsets: 9208010316,1177
recs info: 4381829	4381840
offsets: 9208011493,1567
recs info: 4381841	4381841
offsets: 9208013060,1233
recs info: 4381842	4381895
offsets: 9208014293,2534
recs info: 4381896	4381896
offsets: 9208016827,1233
recs info: 4381897	4381909
offsets: 9208018060,1591
recs info: 4381910	4381917
offsets: 9208019651,1453
recs info: 4381918	4381918
offsets: 9208021104,1075
recs info: 4381919	4382115
offsets: 9208022179,5846
recs info: 4382116	4382194
offsets: 9208028025,2971
total time: 3.901312828063965


In [None]:
start_total = time.time()  
start_list = [start]
end_list = [end]
start_flanked=[min(start_list)-5000] + start_list + [max(end_list)]
end_flanked = [min(start_list)] + end_list + [max(end_list)+5000]
results =index_maf.get_spliced(start_flanked,end_flanked,n_strand)
print("total time:", time.time() - start_total)

recs info: 4376114	4376114
offsets: 9195034037,6801
recs info: 4376115	4376115
offsets: 9195040838,5528
recs info: 4376116	4376117
offsets: 9195046366,6892
recs info: 4376118	4376118
offsets: 9195053258,6629
recs info: 4376119	4376119
offsets: 9195059887,5841
recs info: 4376120	4376131
offsets: 9195065728,9790
recs info: 4376132	4376132
offsets: 9195075518,6727
recs info: 4376133	4376136
offsets: 9195082245,8529
recs info: 4376137	4376137
offsets: 9195090774,7769
recs info: 4376138	4376138
offsets: 9195098543,8030
recs info: 4376139	4376139
offsets: 9195106573,8030
recs info: 4376140	4376140
offsets: 9195114603,8030
recs info: 4376141	4376141
offsets: 9195122633,7963
recs info: 4376142	4376142
offsets: 9195130596,7903
recs info: 4376143	4376143
offsets: 9195138499,8030
recs info: 4376144	4376146
offsets: 9195146529,8244
recs info: 4376147	4376147
offsets: 9195154773,6787
recs info: 4376148	4376150
offsets: 9195161560,8244
recs info: 4376151	4376154
offsets: 9195169804,8061
recs info: 4

In [None]:
#target_species = 'hg38'
target_species = 'Homo_sapiens'
chrom = 'chr1'
#maf_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/cactus447/chr1.maf.gz'
#mafindex_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/cactus447/chr1.mafindex'
maf_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/zoonomia_241_species/241-mammalian-2020v2b.maf.chr1'
mafindex_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/zoonomia_241_species/241-mammalian-2020v2b.maf.chr1.mafindex'

strand = '-'
start = 4381114
end = 4382190
maf_id = f'{target_species}.{chrom}'
    
index_maf = MafIO.MafIndex(mafindex_file, maf_file, maf_id) 
n_strand = -1 if strand == '-' else 1
results_old =index_maf.get_spliced([start],[end],n_strand)

In [None]:
#target_species = 'hg38'
target_species = 'Homo_sapiens'
chrom = 'chr1'
#maf_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/cactus447/chr1.maf.gz'
#mafindex_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/cactus447/chr1.mafindex'
maf_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/zoonomia_241_species/241-mammalian-2020v2b.maf.chr1'
mafindex_file = '/rds/project/rds-XrHDlpCeVDg/users/pakkanan/data/resource/multi_species_multiple_alignment_maf/zoonomia_241_species/241-mammalian-2020v2b.maf.chr1.mafindex'

strand = '-'
start = 4381114
end = 4382190
maf_id = f'{target_species}.{chrom}'
    
index_maf = MafIO.MafIndex(mafindex_file, maf_file, maf_id) 
n_strand = -1 if strand == '-' else 1
results_old =index_maf.get_spliced([start],[end],n_strand)

In [None]:
start_list = [start]
end_list = [end]
start_flanked=[min(start_list)-5000] + start_list + [max(end_list)]
end_flanked = [min(start_list)] + end_list + [max(end_list)+5000]
results =index_maf.get_spliced(start_flanked,end_flanked,n_strand)