# Goals

* Test connecting and updating a GCP Cloud SQL database

In [34]:
import os
import warnings
from importlib import resources
import psycopg2
from dynaconf import Dynaconf
from psycopg2.extensions import connection
import pandas as pd
from pypika import Query, Table, Field, Column, Table

In [35]:
from dotenv import load_dotenv
load_dotenv()
os.environ["DYNACONF"] = "test"
warnings.filterwarnings("ignore", message="pandas only supports SQLAlchemy connectable")

# Connect

In [36]:
def db_connect() -> connection:
    """Connect to the sql database"""
    s_path = None
    with resources.path("SRAgent", "settings.yml") as settings_path:
        s_path = str(settings_path)
    settings = Dynaconf(
        settings_files=["settings.yml", s_path], 
        environments=True, 
        env_switcher="DYNACONF"
    )
    # connect to db
    db_params = {
        'host': settings.db_host,
        'database': settings.db_name,
        'user': settings.db_user,
        'password': os.environ["GCP_SQL_DB_PASSWORD"],
        'port': settings.db_port,
        'connect_timeout': settings.db_timeout
    }
    return psycopg2.connect(**db_params)

conn = db_connect()

  with resources.path("SRAgent", "settings.yml") as settings_path:


In [37]:
def execute_query(stmt, conn):
    try:
        with conn.cursor() as cur:
            cur.execute(str(stmt))
            conn.commit() 
    except psycopg2.errors.DuplicateTable as e:
        print(f"Table already exists: {e}")

In [38]:
# list tables in pypika
def list_tables_pypika():
    tables = Table('tables', schema='information_schema')
    query = Query.from_(tables).select('table_name').where(tables.table_schema == 'public')
    with conn.cursor() as cur:
        cur.execute(str(query))
        tables = cur.fetchall()
        return tables
list_tables_pypika()

[('srx_srr',), ('screcounter',), ('srx_metadata',), ('eval',)]

# Create tables

In [None]:
# SRX_metadata
stmt = Query \
    .create_table("srx_metadata") \
    .columns(
        Column("id", "SERIAL", nullable=False),
        Column("database", "VARCHAR(20)", nullable=False),
        Column("entrez_id", "INT", nullable=False),
        Column("srx_accession", "VARCHAR(20)"),
        Column("is_illumina", "VARCHAR(10)"),
        Column("is_single_cell", "VARCHAR(10)"),
        Column("is_paired_end", "VARCHAR(10)"),
        Column("lib_prep", "VARCHAR(30)"),
        Column("tech_10x", "VARCHAR(30)"),
        Column("cell_prep", "VARCHAR(30)"),
        Column("organism", "VARCHAR(80)"),
        Column("tissue", "VARCHAR(80)"),
        Column("disease", "VARCHAR(100)"),
        Column("purturbation", "VARCHAR(100)"),
        Column("cell_line", "VARCHAR(100)"),
        Column("notes", "TEXT"),
    ) \
    .unique("database", "entrez_id") \
    .primary_key("id")

execute_query(stmt, conn)

In [14]:
# SRX_SRR
stmt = Query \
    .create_table("srx_srr") \
    .columns(
        Column("id", "SERIAL", nullable=False),
        Column("srx_accession", "VARCHAR(20)", nullable=False),
        Column("srr_accession", "VARCHAR(20)", nullable=False)
    ) \
    .unique("srx_accession", "srr_accession") \
    .primary_key("id")

execute_query(stmt, conn)

InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block


In [None]:
# scRecounter log
stmt = Query \
    .create_table("screcounter") \
    .columns(
        Column("id", "SERIAL", nullable=False),
        Column("database", "VARCHAR(20)", nullable=False),
        Column("entrez_id", "INT", nullable=False),
        Column("sample", "VARCHAR(20)", nullable=False),
        Column("accession", "VARCHAR(20)", nullable=False),
        Column("pipeline_version", "VARCHAR(10)", nullable=False),
        Column("run_id", "VARCHAR(30)", nullable=False),
        Column("task_name", "VARCHAR(20)", nullable=False),
        Column("task_exit_status", "VARCHAR(10)"),
        Column("log", "TEXT", nullable=False)
    ) \
    .primary_key("id")

execute_query(stmt, conn)

In [20]:
# ground truth
stmt = Query \
    .create_table("eval") \
    .columns(
        Column("id", "SERIAL", nullable=False),
        Column("dataset_id", "VARCHAR(30)", nullable=False),
        Column("database", "VARCHAR(20)", nullable=False),
        Column("entrez_id", "INT", nullable=False),
        Column("srx_accession", "VARCHAR(20)"),
        Column("is_illumina", "VARCHAR(10)"),
        Column("is_single_cell", "VARCHAR(10)"),
        Column("is_paired_end", "VARCHAR(10)"),
        Column("lib_prep", "VARCHAR(30)"),
        Column("tech_10x", "VARCHAR(30)"),
        Column("organism", "VARCHAR(80)"),
        Column("cell_prep", "VARCHAR(30)"),
    ) \
    .unique("dataset_id", "database", "entrez_id") \
    .primary_key("id")

execute_query(stmt, conn)

# Delete tables

> WARNING

In [None]:
# for table in ["srx_metadata", "srx_srr", "screcounter"]:
for table in ["srx_metadata"]:
#for table in ["ground_truth"]:
    stmt = Query.drop_table(table)
    print(str(stmt))
    execute_query(stmt, conn)

DROP TABLE "srx_metadata"


# Insert data

In [33]:
srx_metadata = Table("srx_metadata")

q = Query.into(srx_metadata) \
    .columns('database', 'entrez_id', 'srx_accession', 'is_illumina', 'is_single_cell', 'is_paired_end', 'is_10x', 'tech_10x', 'organism') \
    .insert('sra', 35087715, 'SRX25994842', 'yes', 'yes', 'yes', 'yes', '3_prime_gex', 'human')

execute_query(q, conn)

# Query data

In [40]:
tbl = Table("srx_metadata")
#tbl = Table("srx_srr")
stmt = Query \
    .from_(tbl) \
    .select("*") 
df = pd.read_sql(str(stmt), conn)
df.head()

Unnamed: 0,id,database,entrez_id,srx_accession,is_illumina,is_single_cell,is_paired_end,lib_prep,tech_10x,cell_prep,organism,tissue,disease,purturbation,cell_line
0,11,sra,35087715,SRX25994842,yes,yes,yes,10x_Genomics,3_prime_gex,single_cell,human,subcutaneous adipose tissue,breast cancer-related lymphedema,injection of adipose-derived regenerative cell...,adipose derived regenerative cells/stromal vas...
1,14,sra,36178506,ERX11887200,yes,yes,yes,10x_Genomics,3_prime_gex,single_cell,mouse,lung tumor,murine lung cancer,murine recombinant coronavirus vector (mCOV) t...,Lewis lung carcinoma (LLC) expressing LCMV gly...
2,15,sra,30749595,SRX22716300,yes,yes,yes,10x_Genomics,3_prime_gex,single_cell,human,bone marrow,no,no treatment,primary cells (CD34+ cells)
3,20,sra,18060880,SRX13201194,yes,no,no,not_applicable,not_applicable,not_applicable,mouse,bone marrow,not specified,Notch1+/- genotype,Common Lymphoid Progenitor (CLP)
4,21,sra,36106630,SRX26727599,yes,no,yes,not_applicable,not_applicable,not_applicable,mouse,spinal cord,experimental autoimmune encephalomyelitis (EAE...,EAE_vehicle (control treatment),other


In [9]:
# write to db directory
outfile = "../db/2024-12-16_srx_metadata.csv"
df.to_csv(outfile, index=False)

In [30]:
srx_metadata = Table("srx_metadata")
stmt = Query \
    .from_(srx_metadata) \
    .where(srx_metadata.entrez_id == 35537624) \
    .select("*") 
pd.read_sql(str(stmt), conn)

Unnamed: 0,id,database,entrez_id,srx_accession,is_illumina,is_single_cell,is_paired_end,lib_prep,tech_10x,cell_prep,organism,tissue,disease,purturbation,cell_line
0,27,sra,35537624,SRX26314714,no,yes,no,other,not_applicable,single_cell,human,pancreatic ductal adenocarcinoma cell line,pancreatic ductal adenocarcinoma,"neoadjuvant therapy, tumor regression grade 2",HG008-T


In [10]:
srx_srr = Table("srx_srr")
stmt = Query \
    .from_(srx_srr) \
    .select("*") 
pd.read_sql(str(stmt), conn)

Unnamed: 0,id,srx_accession,srr_accession
0,1,SRX26727599,SRR31350667
1,2,SRX23538581,SRR27876733
2,3,SRX23261451,SRR27592690
3,4,SRX23261451,SRR27592688
4,5,SRX23261451,SRR27592689
5,6,SRX23261451,SRR27592687
6,8,SRX25994842,SRR30571763
7,9,ERX11887200,ERR12511670
8,10,ERX11887200,ERR12511661


In [None]:
tbl = Table("screcounter")
stmt = Query \
    .from_(tbl) \
    .select("*") 
pd.read_sql(str(stmt), conn)

  pd.read_sql(str(stmt), conn)


Unnamed: 0,id,sample_id,pipeline_version,run_id,task_name,task_exit_status,log


In [23]:
tbl = Table("eval")
stmt = Query \
    .from_(tbl) \
    .select("*") 
pd.read_sql(str(stmt), conn)

Unnamed: 0,id,dataset_id,database,entrez_id,srx_accession,is_illumina,is_single_cell,is_paired_end,lib_prep,tech_10x,organism,cell_prep
0,1,eval1,sra,35087715,SRX25994842,yes,yes,yes,10X_Genomics,3_prime_gex,human,single_cell
1,2,eval1,sra,36178506,ERX11887200,yes,yes,yes,10X_Genomics,3_prime_gex,mouse,single_cell
2,3,eval1,sra,30749595,SRX22716300,yes,yes,yes,10X_Genomics,3_prime_gex,human,single_cell
3,4,eval1,sra,18060880,SRX13201194,yes,no,no,not_applicable,not_applicable,mouse,not_applicable
4,5,eval1,sra,36106630,SRX26727599,yes,no,yes,not_applicable,not_applicable,mouse,not_applicable
5,6,eval1,sra,35979902,SRX26636208,yes,no,yes,not_applicable,not_applicable,human,not_applicable
6,7,eval1,sra,35536066,SRX26313156,yes,no,yes,not_applicable,not_applicable,human,not_applicable
7,8,eval1,sra,31054998,SRX22985325,yes,no,yes,not_applicable,not_applicable,human,not_applicable
8,9,eval1,sra,35200088,SRX26085693,yes,yes,yes,other,not_applicable,chicken,
9,10,eval1,sra,34439895,ERX9692805,yes,yes,yes,other,3_prime_gex,chicken,


In [12]:
# filter to get unprocessed records
srx_metadata = Table("srx_metadata")
stmt = Query \
    .from_(srx_metadata) \
    .select("*") \
    .where((srx_metadata.processed != "complete") | (srx_metadata.processed.isnull())) \
    .where(srx_metadata.database == "sra")
pd.read_sql(str(stmt), conn)

  pd.read_sql(str(stmt), conn)


Unnamed: 0,id,database,entrez_id,srx_accession,is_illumina,is_single_cell,is_paired_end,is_10x,tech_10x,organism,processed


# Remove records

In [25]:
# remove from "srx_metadata" based on id column
record_id = 19
tbl = Table("srx_metadata")
stmt = Query \
    .from_(tbl) \
    .where(tbl.id == record_id) \
    .delete()
execute_query(stmt, conn)

# Add ground truth

In [17]:
df = pd.read_csv("../data/ground_truth1.csv")
df["dataset_id"] = "ground_truth1"
df.head()

Unnamed: 0,database,entrez_id,SRX,is_illumina,is_single_cell,is_paired_end,is_10x,tech_10x,organism,dataset_id
0,sra,35087715,SRX25994842,yes,yes,yes,yes,3_prime_gex,human,ground_truth1
1,sra,36178506,ERX11887200,yes,yes,yes,yes,3_prime_gex,mouse,ground_truth1
2,sra,30749595,SRX22716300,yes,yes,yes,yes,3_prime_gex,human,ground_truth1
3,sra,18060880,SRX13201194,yes,no,no,no,other,mouse,ground_truth1
4,sra,36106630,SRX26727599,yes,no,yes,no,other,mouse,ground_truth1


# Find blocking locks

In [32]:
query = """SELECT
    pid,
    usename,
    pg_blocking_pids(pid) AS blocked_by,
    state,
    query,
    NOW() - query_start AS duration
FROM pg_stat_activity
WHERE state != 'idle';
"""

PIDs = []
with conn.cursor() as cur:
    cur.execute(query)
    for x in cur.fetchall():
        PIDs.append(x[0])
PIDs

[244880]

In [33]:
query = "SELECT pg_terminate_backend({pid})"
with conn.cursor() as cur:
    for pid in PIDs:
        cur.execute(query.format(pid=pid))
        conn.commit()

OperationalError: SSL connection has been closed unexpectedly
