# SQL Alchemy : usage of union with order by and relationship




In [7]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from testcontainers.mysql import MySqlContainer

MYSQL_VERSION = '5.7.17'

def run_db():
    """This function will run an instance of mysql db container and yield a testcontainer object
    This object will be used to CRUD data and profile the performance
    """
    with MySqlContainer(f'mysql:{MYSQL_VERSION}') as mysql:
        yield mysql

# Mysql Test container
mysql = run_db()
# Connection string
conn_string = next(mysql).get_connection_url()
# Engine object
engine = create_engine(conn_string)
# Session object
Session = sessionmaker(bind=engine)
print(f"DB ready for connection at URL : {conn_string}")

Pulling image mysql:5.7.17
Container started: e17d6a7465
Waiting to be ready...


DB ready for connection at URL : mysql+pymysql://test:test@localhost:49153/test


## Declare all needed table

In [10]:

import time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
# Declarative base object
Base = declarative_base()


def get_epoch_time_milliseconds() :
    return int(time.time() * 1000)


class Kv(Base):
    __tablename__ = 'kv'
    id = Column(Integer, primary_key=True, autoincrement=True)
    org_id = Column(Integer)
    position = Column(Integer)
    kvis = relationship("Kvi", back_populates='kv')

class Kvi(Base):
    __tablename__ = 'kvi'
    id = Column(Integer, primary_key=True, autoincrement=True)
    vid = Column(Integer, ForeignKey('kv.id'))
    interpretation = Column(Text(65535))
    kv = relationship("Kv", back_populates='kvis')

    
Base.metadata.create_all(engine)


In [13]:
import random
import string

def random_string():
    return ''.join(random.choice(string.ascii_letters) for i in range(10))

ORG_IDS = [1,2,3]

with Session() as session:
    for i in range(1000):
        random_kv = Kv(org_id=random.choice(ORG_IDS), position=random.randint(15000,50000))
        random_kvi = Kvi(kv=random_kv, interpretation=random_string())
        session.add(random_kv)
        session.add(random_kvi)

    # Add some duplicate row by position accross different organization
    SAME_POSITION = 1000
    random_kv_1 = Kv(org_id=1, position=SAME_POSITION)
    random_kv_2 = Kv(org_id=2, position=SAME_POSITION)
    random_kv_3 = Kv(org_id=2, position=SAME_POSITION)
    random_kvi_1 = Kvi(kv=random_kv_1, interpretation=random_string())
    random_kvi_2 = Kvi(kv=random_kv_2, interpretation=random_string())
    random_kvi_3 = Kvi(kv=random_kv_3, interpretation=random_string())

    session.add(random_kv_1)
    session.add(random_kv_2)
    session.add(random_kv_3)
    session.add(random_kvi_1)
    session.add(random_kvi_2)
    session.add(random_kvi_3)

    session.commit()

## Session Query syntax

In [21]:
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import aliased
from sqlalchemy import and_


specific_query = session.query(Kv).join(Kvi).options(contains_eager(Kv.kvis)).filter(Kv.org_id ==2)

with Session() as session :
    result = specific_query.all()
    print(f"Type of result without union : {type(result[0])}")

v1 = aliased(Kv)
v2 = aliased(Kv)
sub_exists = session.query(1).filter(and_(v2.position == v1.position, v2.org_id == 2)).exists()
exclude_query = session.query(Kv).join(Kvi).options(contains_eager(Kv.kvis)).filter(~sub_exists, v1.org_id.in_([1,3]))
with Session() as session :
    my_query = exclude_query.union(specific_query)
    result = my_query.all()
    print(f"Type of result with union : {type(result[0])}")


Type of result without union : <class '__main__.Kv'>
Type of result with union : <class '__main__.Kv'>


## 2.0 Syntax

In [22]:
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import aliased
from sqlalchemy import and_
from sqlalchemy import select


specific_query = select(Kv).join(Kvi).options(contains_eager(Kv.kvis)).filter(Kv.org_id ==2)

with Session() as session :
    result = session.execute(specific_query).unique().scalars().all()
    print(f"Type of result without union : {type(result[0])}")

v1 = aliased(Kv)
v2 = aliased(Kv)
sub_exists = select(1).filter(and_(v2.position == v1.position, v2.org_id == 2)).exists()
exclude_query = select(Kv).join(Kvi).options(contains_eager(Kv.kvis)).filter(~sub_exists, v1.org_id.in_([1,3]))
with Session() as session :
    my_query = exclude_query.union(specific_query)
    result =session.execute(my_query).unique().scalars().all()
    print(f"Type of result with union : {type(result[0])}")

Type of result without union : <class '__main__.Kv'>


  result =session.execute(my_query).unique().scalars().all()


Type of result with union : <class 'int'>


## Create Deletion Variant

Deletions are more simple to represents since it will generally be only one range (one row in location table) and one variant.

In [None]:
# First let's declare the variant with DEL type for "deletion"
variant = Variant(variant_type=VariantTypeEnum.DEL)

def add_deletion_g_dot(variant, assembly, chromosome, start, end) :
    refseq_version = 10 if assembly == AssemblyEnum.GRCH37 else 11
    with Session() as session:
        session.add(variant)
        location = Posedit(
            variant = variant,
            reference_sequence = f"NC_0000{chromosome}.{refseq_version}",
            assembly = assembly,
            type = PoseditTypeEnum.g,
            chromosome = chromosome,
            ref = None, # here ref would be calculated based on the reference sequence fasta, start and end
            start_base = start,
            end_base = end,
        )
        session.add(location)
        session.commit()

add_deletion_g_dot(variant, AssemblyEnum.GRCH37, 1, 8500, 8650)
add_deletion_g_dot(variant, AssemblyEnum.GRCH38, 1, 7300, 7450)


## Insert n Rows of Deletion variant

In [None]:
for i in range(ROWS_NUMBER):
    variant = Variant(variant_type=VariantTypeEnum.DEL)
    pos_37 = get_fake_pos()
    pos_38 = get_fake_pos()
    add_deletion_g_dot(variant, AssemblyEnum.GRCH37, get_fake_chrom(),  pos_37, pos_37 + random.randint(1,1000))
    add_deletion_g_dot(variant, AssemblyEnum.GRCH38, get_fake_chrom(),  pos_38, pos_38 + random.randint(1,1000))

## Retrieve all deletion variant

In [None]:
Index('vartype_idx1', Variant.variant_type).create(bind=engine)
with Session() as session :
    query = session.query(Variant).join(Variant.locations).options(contains_eager(Variant.locations)).filter(Variant.variant_type == VariantTypeEnum.DEL)
    explain_query(query, session)
    execute_query(query)


              EXPLAIN QUERY                   
SELECT posedit.id, posedit.variant_id, posedit.reference_sequence, posedit.assembly, posedit.type, posedit.uncertain, posedit.chromosome, posedit.ref, posedit.ref_path, posedit.alt, posedit.alt_path, posedit.start_base, posedit.start_offset, posedit.start_uncertain, posedit.end_base, posedit.end_offset, posedit.end_uncertain, posedit.copy_allele_1, posedit.copy_allele_2, variant.id AS id_1, variant.variant_type 
FROM variant INNER JOIN posedit ON variant.id = posedit.variant_id 
WHERE variant.variant_type = 'DEL'
Parameter      Value
-------------  --------------------
id             1
select_type    SIMPLE
table          variant
partitions
type           ref
possible_keys  PRIMARY,vartype_idx1
key            vartype_idx1
key_len        8
ref            const
rows           99973
filtered       100.0
Extra          Using index


Execution time for : execute_query
Time Type             Time (ms)
------------------  -----------
CPU time     

## STR Variant

In [None]:


def add_str_g_dot(variant, assembly, start, end, chromosome, ref, copy_allele_1, copy_allele_2) :
    refseq_version = 10 if assembly == AssemblyEnum.GRCH37 else 11
    with Session() as session:
        session.add(variant)
        location = Posedit(
            variant = variant,
            reference_sequence = f"NC_0000{chromosome}.{refseq_version}",
            assembly = assembly,
            type = PoseditTypeEnum.g,
            chromosome = chromosome,
            ref = ref, 
            start_base= start,
            end_base= end,
            copy_allele_1 = copy_allele_1,
            copy_allele_2 = copy_allele_2
        )
        session.add(location)
        session.commit()

In [None]:
for i in range(ROWS_NUMBER):
    variant = Variant(variant_type=VariantTypeEnum.DUP)
    chrom = get_fake_chrom()
    pos_37 = get_fake_pos()
    pos_38 = get_fake_pos()
    add_str_g_dot(variant, AssemblyEnum.GRCH37,  pos_37, pos_37 + 200, chrom, 'TCC', random.randint(5,30), random.randint(5,30))
    add_str_g_dot(variant, AssemblyEnum.GRCH38,  pos_38, pos_38 + 200, chrom, 'TCC', random.randint(5,30), random.randint(5,30))

KeyboardInterrupt: 

## INSERTIONS

In [None]:
from typing import Union
def add_insertion_g_dot(variant, assembly, start, chromosome, ref, alt) :
    refseq_version = 10 if assembly == AssemblyEnum.GRCH37 else 11
    with Session() as session:
        session.add(variant)
        location = Posedit(
            variant = variant,
            reference_sequence = f"NC_0000{chromosome}.{refseq_version}",
            assembly = assembly,
            type = PoseditTypeEnum.g,
            uncertain = False,
            chromosome = chromosome,
            ref = ref,
            alt = alt,
            start_base = start,
            end_base = start + 1,
        )
        session.add(location)
        session.commit()

for i in range(ROWS_NUMBER):
    variant = Variant(variant_type=VariantTypeEnum.INS)
    add_insertion_g_dot(variant, AssemblyEnum.GRCH37,  get_fake_pos(), get_fake_chrom(), get_fake_ref(), get_fake_alt())
