Skip to content

Commit

Permalink
Adding function annotations and type-hints.
Browse files Browse the repository at this point in the history
  • Loading branch information
regentsai committed Mar 23, 2023
1 parent 0095a6f commit f3de1ea
Show file tree
Hide file tree
Showing 152 changed files with 2,161 additions and 1,406 deletions.
23 changes: 12 additions & 11 deletions bin/ant_thony.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import logging
from multiprocessing import Process, cpu_count

from lightdock.util.typing import StrOrPath, List

logging.basicConfig(
format="[\U0001F41C-Thony] %(levelname)s: %(message)s", level=logging.DEBUG
)
Expand All @@ -15,11 +17,11 @@
class Task:
"""A task class"""

def __init__(self, command, path="."):
def __init__(self, command:StrOrPath, path:StrOrPath=".") -> None:
self.command = command
self.path = path

def run(self):
def run(self) -> None:
"""Runs a command in the given path"""
os.chdir(self.path)
os.system(self.command)
Expand All @@ -28,15 +30,15 @@ def run(self):
class Ant(Process):
"""Ant-Thony's buddies"""

created = 0
created: int = 0

def __init__(self, tasks):
def __init__(self, tasks: List[Task]) -> None:
super().__init__(name=f"\U0001F41C-{Ant.created+1}")
self.tasks = tasks
logging.info(f"{self.name} ready with {len(self.tasks)} tasks")
Ant.created += 1

def run(self):
def run(self) -> None:
"""Runs all the assigned tasks"""
for task in self.tasks:
task.run()
Expand All @@ -46,7 +48,7 @@ def run(self):
class Ant_Thony:
"""Our buddy Ant-Thony"""

def __init__(self, tasks, num_cpus=0):
def __init__(self, tasks: List[Task], num_cpus: int = 0) -> None:
try:
self.num_processes = int(num_cpus)
if self.num_processes < 1:
Expand All @@ -61,7 +63,7 @@ def __init__(self, tasks, num_cpus=0):

self.tasks = tasks
self.num_tasks = len(tasks)
self.workers = []
self.workers: List[Ant] = []
workers_tasks = [
tasks[i :: self.num_processes] for i in range(self.num_processes)
]
Expand All @@ -70,7 +72,7 @@ def __init__(self, tasks, num_cpus=0):
worker = Ant(workers_tasks[i])
self.workers.append(worker)

def release(self):
def release(self) -> None:
logging.info("Swarming!")
for ant in self.workers:
ant.start()
Expand All @@ -80,14 +82,13 @@ def release(self):

logging.info(f"{self.num_tasks} tasks done")

def go_home(self):
def go_home(self) -> None:
for ant in self.workers:
ant.terminate()
logging.info("All \U0001F41C back to the nest")


if __name__ == "__main__":

parser = argparse.ArgumentParser(prog="ant_thony")
parser.add_argument(
"tasks_file_name",
Expand All @@ -107,7 +108,7 @@ def go_home(self):
args = parser.parse_args()

with open(args.tasks_file_name) as handle:
all_tasks = []
all_tasks: List[Task] = []
for line in handle:
if line and not line.startswith("#"):
all_tasks.append(Task(line.rstrip(os.linesep)))
Expand Down
2 changes: 1 addition & 1 deletion bin/lgd_calculate_diameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
log = LoggingManager.get_logger("diameter")


def parse_command_line():
def parse_command_line() -> argparse.Namespace:
parser = argparse.ArgumentParser(prog="calculate_diameter")
parser.add_argument(
"pdb", help="PDB file for structure to calculate maximum diameter"
Expand Down
21 changes: 11 additions & 10 deletions bin/lgd_calculate_reference_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@

import os
import argparse
from lightdock.util.typing import List, NDArrayFloat1D, StrOrPath, Sequence, Union
from lightdock.constants import (
DEFAULT_LIST_EXTENSION,
DEFAULT_REFERENCE_POINTS_EXTENSION,
)
from lightdock.error.lightdock_errors import MinimumVolumeEllipsoidError
from lightdock.mathutil.ellipsoid import MinimumVolumeEllipsoid
from lightdock.pdbutil.PDBIO import parse_complex_from_file, create_pdb_from_points
from lightdock.structure.complex import Complex
from lightdock.structure.complex import Complex, Structure_
from lightdock.util.logger import LoggingManager


script_name = "reference_points"
log = LoggingManager.get_logger(script_name)


def parse_command_line():
def parse_command_line() -> argparse.Namespace:
parser = argparse.ArgumentParser(prog=script_name)
parser.add_argument(
"structure", help="structure to calculate reference points", metavar="structure"
Expand All @@ -34,9 +35,9 @@ def parse_command_line():
return parser.parse_args()


def get_pdb_files(input_file_name):
def get_pdb_files(input_file_name:StrOrPath) -> List[str]:
"""Get a list of the PDB files in the input_file_name"""
structure_file_names = []
structure_file_names:List[str] = []
with open(input_file_name) as input_file:
for line in input_file:
structure_file_name = line.rstrip(os.linesep)
Expand All @@ -45,7 +46,7 @@ def get_pdb_files(input_file_name):
return structure_file_names


def get_point_respresentation(point):
def get_point_respresentation(point:Union[Sequence[float],NDArrayFloat1D]) -> str:
return "%8.5f %8.5f %8.5f" % (point[0], point[1], point[2])


Expand All @@ -54,13 +55,13 @@ def get_point_respresentation(point):
# Parse command line
args = parse_command_line()

atoms_to_ignore = []
atoms_to_ignore:List[str] = []
if args.noxt:
atoms_to_ignore.append("OXT")

structures = []
file_names = []
file_name, file_extension = os.path.splitext(args.structure)
structures:List[Structure_] = []
file_names:List[str] = []
file_name, file_extension = os.path.splitext(args._)
if file_extension == DEFAULT_LIST_EXTENSION:
file_names.extend(get_pdb_files(args.structure))
else:
Expand Down Expand Up @@ -98,7 +99,7 @@ def get_point_respresentation(point):
output.write(get_point_respresentation(ellipsoid.center) + os.linesep)
log.info("Points written to %s" % output_file_name)

points = [point for point in ellipsoid.poles]
points: List[Union[List[float], NDArrayFloat1D]] = [point for point in ellipsoid.poles]
points.append(ellipsoid.center)
pdb_file_name = output_file_name + ".pdb"
create_pdb_from_points(pdb_file_name, points)
Expand Down
13 changes: 7 additions & 6 deletions bin/lgd_calculate_scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
import argparse
import importlib
from lightdock.pdbutil.PDBIO import parse_complex_from_file
from lightdock.scoring.functions import ModelAdapter, ScoringFunction
from lightdock.structure.complex import Complex
from lightdock.util.logger import LoggingManager


log = LoggingManager.get_logger("calculate_scoring")


def parse_command_line():
def parse_command_line() -> argparse.Namespace:
parser = argparse.ArgumentParser(prog="calculate_scoring")
parser.add_argument("scoring_function", help="scoring function")
parser.add_argument("receptor", help="PDB receptor")
Expand All @@ -25,7 +26,7 @@ def parse_command_line():
args = parse_command_line()

try:
scoring_function_module = "lightdock.scoring.%s.driver" % args.scoring_function
scoring_function_module:str = "lightdock.scoring.%s.driver" % args.scoring_function
module = importlib.import_module(scoring_function_module)
except ImportError:
raise SystemExit("Scoring function not found or not available")
Expand All @@ -35,11 +36,11 @@ def parse_command_line():
atoms, residues, chains = parse_complex_from_file(args.ligand)
ligand = Complex(chains, atoms, structure_file_name=args.ligand)

CurrentScoringFunction = getattr(module, "DefinedScoringFunction")
CurrentModelAdapter = getattr(module, "DefinedModelAdapter")
CurrentScoringFunction:ScoringFunction = getattr(module, "DefinedScoringFunction")
CurrentModelAdapter:ModelAdapter = getattr(module, "DefinedModelAdapter")

adapter = CurrentModelAdapter(receptor, ligand)
scoring_function = CurrentScoringFunction()
adapter: ModelAdapter = CurrentModelAdapter(receptor, ligand)
scoring_function: ScoringFunction = CurrentScoringFunction()

energy = scoring_function(
adapter.receptor_model,
Expand Down
24 changes: 12 additions & 12 deletions bin/lgd_cluster_bsas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@

import argparse
from pathlib import Path
from prody import parsePDB, confProDy, calcRMSD
from lightdock.util.analysis import read_lightdock_output
from prody import AtomGroup, parsePDB, confProDy, calcRMSD
from lightdock.util.analysis import DockingResult, read_lightdock_output
from lightdock.util.logger import LoggingManager
from lightdock.constants import CLUSTER_REPRESENTATIVES_FILE
from lightdock.util.typing import List, Dict, Any

# Disable ProDy output
confProDy(verbosity="info")

log = LoggingManager.get_logger("lgd_cluster_bsas")


def parse_command_line():
def parse_command_line() -> argparse.Namespace:
"""Parses command line arguments"""
parser = argparse.ArgumentParser(prog="lgd_cluster_bsas")

Expand All @@ -26,17 +27,17 @@ def parse_command_line():
return parser.parse_args()


def get_backbone_atoms(ids_list, swarm_path):
def get_backbone_atoms(ids_list: List[int], swarm_path: Path) -> Dict[int, Any]: # FIXME:ensure ca_atoms' type
"""Get all backbone atoms (CA or P) of the PDB files specified by the ids_list.
PDB files follow the format lightdock_ID.pdb where ID is in ids_list
"""
ca_atoms = {}
ca_atoms: Dict[int, Any] = {}
try:
for struct_id in ids_list:
pdb_file = swarm_path / f"lightdock_{struct_id}.pdb"
log.info(f"Reading CA from {pdb_file}")
structure = parsePDB(str(pdb_file))
structure: AtomGroup = parsePDB(str(pdb_file))
selection = structure.select("name CA P")
ca_atoms[struct_id] = selection
except IOError as e:
Expand All @@ -48,22 +49,22 @@ def get_backbone_atoms(ids_list, swarm_path):
return ca_atoms


def clusterize(sorted_ids, swarm_path):
def clusterize(sorted_ids: List[int], swarm_path: Path) -> Dict[int, List[int]]:
"""Clusters the structures identified by the IDS inside sorted_ids list"""

clusters_found = 0
clusters = {clusters_found: [sorted_ids[0]]}
clusters: Dict[int, List[int]] = {clusters_found: [sorted_ids[0]]}

# Read all structures backbone atoms
backbone_atoms = get_backbone_atoms(sorted_ids, swarm_path)

for j in sorted_ids[1:]:
log.info("Glowworm %d with pdb lightdock_%d.pdb" % (j, j))
in_cluster = False
in_cluster: bool = False
for cluster_id in list(clusters.keys()):
# For each cluster representative
representative_id = clusters[cluster_id][0]
rmsd = calcRMSD(backbone_atoms[representative_id], backbone_atoms[j]).round(
rmsd: float = calcRMSD(backbone_atoms[representative_id], backbone_atoms[j]).round(
4
)
log.info("RMSD between %d and %d is %5.3f" % (representative_id, j, rmsd))
Expand All @@ -80,7 +81,7 @@ def clusterize(sorted_ids, swarm_path):
return clusters


def write_cluster_info(clusters, gso_data, swarm_path):
def write_cluster_info(clusters: Dict[int, List[int]], gso_data: List[DockingResult], swarm_path: Path) -> None:
"""Writes the clustering result"""
file_name = swarm_path / CLUSTER_REPRESENTATIVES_FILE
with open(file_name, "w") as output:
Expand All @@ -99,7 +100,6 @@ def write_cluster_info(clusters, gso_data, swarm_path):


if __name__ == "__main__":

try:
# Parse command line
args = parse_command_line()
Expand Down
23 changes: 2 additions & 21 deletions bin/lgd_copy_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,16 @@
import shutil
import re
from lightdock.util.logger import LoggingManager
from lightdock.util.analysis import read_ranking_file
from lightdock.util.analysis import read_ranking_file, get_structures


clustered_folder = "clustered"

log = LoggingManager.get_logger("lgd_copy_structures")


def get_structures(ranking, base_path="."):
structures = []
for rank in ranking:
swarm_id = rank.id_swarm
glowworm_id = rank.id_glowworm
score = rank.scoring
structures.append(
[
os.path.join(
base_path,
"swarm_{}".format(swarm_id),
"lightdock_{}.pdb".format(glowworm_id),
),
score,
]
)
return structures


def parse_command_line():
def parse_command_line() -> argparse.Namespace:
"""Parses command line arguments"""
parser = argparse.ArgumentParser(prog="lgd_copy_structures")

Expand All @@ -46,7 +28,6 @@ def parse_command_line():


if __name__ == "__main__":

# Parse command line
args = parse_command_line()

Expand Down
Loading

0 comments on commit f3de1ea

Please sign in to comment.