In [2]:
import requests
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class UniProtConfig:
    """Configuration for UniProt API client

    Attributes:
        base_url: Root URL of the UniProt REST API. A trailing slash is
            stripped so endpoint paths can be appended with a single "/".
        polling_interval: Seconds to sleep between job status checks.
        max_retries: Total number of retries configured on the HTTP session.

    Raises:
        ValueError: If ``polling_interval`` or ``max_retries`` is negative.
    """
    base_url: str = "https://rest.uniprot.org"
    polling_interval: int = 3
    max_retries: int = 5

    def __post_init__(self) -> None:
        """Normalize and validate the settings once, at construction time."""
        # Endpoint URLs are built as f"{base_url}/path"; a trailing slash here
        # would put "//" into every request URL.
        self.base_url = self.base_url.rstrip("/")
        # time.sleep() rejects negative values, which would otherwise only
        # surface in the middle of polling, after a job has been submitted.
        if self.polling_interval < 0:
            raise ValueError("polling_interval must be non-negative")
        if self.max_retries < 0:
            raise ValueError("max_retries must be non-negative")

class UniProtMappingError(Exception):
    """Raised when a UniProt ID mapping request or job does not succeed."""

class UniProtMapper:
    """Client for UniProt ID mapping service

    The workflow is: submit a job (``submit_mapping_job``), poll its status
    (``check_job_status``) and download the results
    (``get_mapping_results``). ``map_ids`` chains the three steps.
    """

    # Seconds to wait for the server on each HTTP request. requests applies no
    # timeout by default, so a stalled connection would block forever.
    # Override on the class or on an instance to change it.
    request_timeout: float = 60.0

    def __init__(self, config: Optional["UniProtConfig"] = None):
        """Initialize the UniProt mapping client

        Args:
            config: Optional configuration object; defaults are used when
                omitted.
        """
        self.config = config or UniProtConfig()
        self.session = self._create_session()

    def _create_session(self) -> "requests.Session":
        """Create requests session with retries

        Returns:
            A session whose HTTPS adapter retries responses with status
            500, 502, 503 or 504, up to ``config.max_retries`` times.
        """
        session = requests.Session()
        retries = requests.adapters.Retry(
            total=self.config.max_retries,
            backoff_factor=0.25,
            status_forcelist=[500, 502, 503, 504],
        )
        session.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))
        return session

    def _check_response(self, response: "requests.Response") -> None:
        """Check response for errors

        Raises:
            UniProtMappingError: If the response carries an HTTP error status.
        """
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            # Chain explicitly so the original HTTP error stays attached.
            raise UniProtMappingError(f"Request failed: {response.text}") from exc

    @staticmethod
    def _merge_page(combined: Dict[str, Any], page: Dict[str, Any]) -> None:
        """Fold one page of a results payload into ``combined`` in place."""
        for key, value in page.items():
            if key not in combined:
                # First occurrence: copy lists so later pages can be appended
                # without mutating the decoded page itself.
                combined[key] = list(value) if isinstance(value, list) else value
            elif not (isinstance(value, list) and isinstance(combined[key], list)):
                continue  # scalar metadata: keep the first page's value
            elif key == "results":
                combined[key].extend(value)
            elif key == "failedIds":
                # Later pages may repeat the failed identifiers; keep each once.
                # Assumes the identifiers are hashable (strings).
                known = set(combined[key])
                combined[key].extend(v for v in value if v not in known)

    def _results_url_from_details(self, job_id: str) -> str:
        """Look up the results URL of a finished job via the details endpoint."""
        response = self.session.get(
            f"{self.config.base_url}/idmapping/details/{job_id}",
            timeout=self.request_timeout,
        )
        self._check_response(response)
        details = response.json()
        if not isinstance(details, dict) or not details.get("redirectURL"):
            raise UniProtMappingError("Unexpected response format")
        return details["redirectURL"]

    def submit_mapping_job(self, from_db: str, to_db: str, ids: List[str]) -> str:
        """Submit an ID mapping job

        Args:
            from_db: Source database (e.g., "UniProtKB_AC-ID")
            to_db: Target database (e.g., "Ensembl")
            ids: List of identifiers to map. A single identifier given as a
                plain string is accepted as well.

        Returns:
            str: Job ID for checking results

        Raises:
            UniProtMappingError: If no identifiers are given or the request
                is rejected.
        """
        # A bare string would otherwise be joined character by character.
        id_list = [ids] if isinstance(ids, str) else list(ids)
        if not id_list:
            raise UniProtMappingError("No identifiers supplied for mapping")

        response = self.session.post(
            f"{self.config.base_url}/idmapping/run",
            data={
                "from": from_db,
                "to": to_db,
                "ids": ",".join(id_list),
            },
            timeout=self.request_timeout,
        )
        self._check_response(response)
        return response.json()["jobId"]

    def check_job_status(self, job_id: str) -> Dict[str, Any]:
        """Check if mapping job is complete

        Args:
            job_id: Job ID from submit_mapping_job

        Returns:
            ``{"redirect": url}`` when the server answers 303 with a
            ``Location`` header, otherwise the decoded JSON body.

        Raises:
            UniProtMappingError: If the response carries an HTTP error status.
        """
        response = self.session.get(
            f"{self.config.base_url}/idmapping/status/{job_id}",
            allow_redirects=False,  # the redirect target is returned, not followed
            timeout=self.request_timeout,
        )

        # Handle 303 redirect which indicates results are ready
        if response.status_code == 303:
            redirect_url = response.headers.get("Location")
            if redirect_url:
                return {"redirect": redirect_url}

        self._check_response(response)
        return response.json()

    def get_mapping_results(self, results_url: str) -> Dict[str, Any]:
        """Get mapping results from URL, following pagination

        The results endpoint returns one page at a time and advertises the
        next page in a ``Link`` header with ``rel="next"``. Every page is
        fetched and merged, so the returned payload is complete.

        Args:
            results_url: URL for retrieving results

        Returns:
            Dict containing mapping results with 'results' and 'failedIds' keys

        Raises:
            UniProtMappingError: If a page request fails or a page is not a
                JSON object.
        """
        combined: Dict[str, Any] = {}
        visited = set()
        page_url: Optional[str] = results_url

        # ``visited`` guards against a server that links a page to itself.
        while page_url and page_url not in visited:
            visited.add(page_url)
            response = self.session.get(page_url, timeout=self.request_timeout)
            self._check_response(response)
            page = response.json()
            if not isinstance(page, dict):
                raise UniProtMappingError("Unexpected response format")
            self._merge_page(combined, page)
            page_url = response.links.get("next", {}).get("url")

        return combined

    def map_ids(self, from_db: str, to_db: str, ids: List[str]) -> Dict[str, Any]:
        """Complete ID mapping workflow

        Args:
            from_db: Source database (e.g., "UniProtKB_AC-ID")
            to_db: Target database (e.g., "Ensembl")
            ids: List of identifiers to map

        Returns:
            Dict containing mapping results

        Raises:
            UniProtMappingError: If the job fails or a response has an
                unexpected shape.
        """
        job_id = self.submit_mapping_job(from_db, to_db, ids)

        # Poll until complete
        while True:
            status = self.check_job_status(job_id)

            # If we get a redirect URL, results are ready
            if "redirect" in status:
                return self.get_mapping_results(status["redirect"])

            if "jobStatus" in status:
                job_status = status["jobStatus"]
                # If still running, wait and try again
                if job_status in ("NEW", "RUNNING"):
                    time.sleep(self.config.polling_interval)
                    continue
                # A finished job reported without a redirect is a success,
                # not a failure: resolve its results URL explicitly.
                if job_status == "FINISHED":
                    return self.get_mapping_results(
                        self._results_url_from_details(job_id)
                    )
                raise UniProtMappingError(f"Job failed: {job_status}")

            # If we get results directly, return them
            if "results" in status or "failedIds" in status:
                return status

            raise UniProtMappingError("Unexpected response format")

# Example usage:
if __name__ == "__main__":
    # Initialize client
    mapper = UniProtMapper()

    # Example: Map UniProt accessions to Ensembl IDs
    try:
        results = mapper.map_ids(
            from_db="UniProtKB_AC-ID",
            to_db="Ensembl",
            ids=["P05067", "P12345"]
        )
        print("Successful mappings:")
        for mapping in results.get("results", []):
            print(f"From: {mapping['from']} -> To: {mapping['to']}")
        print("\nFailed IDs:")
        for failed_id in results.get("failedIds", []):
            print(failed_id)
    except (UniProtMappingError, requests.RequestException) as e:
        # Transport-level failures (connection errors, timeouts, exhausted
        # retries) are raised by requests itself rather than as
        # UniProtMappingError; report them the same way instead of letting
        # them escape as a traceback.
        print(f"Mapping failed: {e}")

Successful mappings:
From: P05067 -> To: ENSG00000142192.22

Failed IDs:
P12345


In [3]:
import requests
import time
import json
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

@dataclass
class UniProtConfig:
    """Configuration for UniProt API client

    Attributes:
        base_url: Root URL of the UniProt REST API. A trailing slash is
            stripped so endpoint paths can be appended with a single "/".
        polling_interval: Seconds to sleep between job status checks.
        max_retries: Total number of retries configured on the HTTP session.
        max_workers: Size of the thread pool used for parallel mapping
            requests.

    Raises:
        ValueError: If ``polling_interval`` or ``max_retries`` is negative,
            or ``max_workers`` is smaller than 1.
    """
    base_url: str = "https://rest.uniprot.org"
    polling_interval: int = 3
    max_retries: int = 5
    max_workers: int = 3  # For parallel mapping requests

    def __post_init__(self) -> None:
        """Normalize and validate the settings once, at construction time."""
        # Endpoint URLs are built as f"{base_url}/path"; a trailing slash here
        # would put "//" into every request URL.
        self.base_url = self.base_url.rstrip("/")
        # time.sleep() rejects negative values, which would otherwise only
        # surface in the middle of polling, after a job has been submitted.
        if self.polling_interval < 0:
            raise ValueError("polling_interval must be non-negative")
        if self.max_retries < 0:
            raise ValueError("max_retries must be non-negative")
        # ThreadPoolExecutor refuses max_workers <= 0; fail here rather than
        # when the first parallel mapping is attempted.
        if self.max_workers < 1:
            raise ValueError("max_workers must be at least 1")

class UniProtMappingError(Exception):
    """Raised when a UniProt ID mapping request or job does not succeed."""

class UniProtMapper:
    """Enhanced client for UniProt ID mapping service

    Besides the submit / poll / download workflow (``map_ids``), the client
    can list the target databases available for each source database
    (``get_mapping_options``) and map to all of them in parallel
    (``map_to_all``).
    """

    # Seconds to wait for the server on each HTTP request. requests applies no
    # timeout by default, so a stalled connection would block forever.
    # Override on the class or on an instance to change it.
    request_timeout: float = 60.0

    def __init__(self, config: Optional["UniProtConfig"] = None):
        """Initialize the client

        Args:
            config: Optional configuration object; defaults are used when
                omitted.
        """
        self.config = config or UniProtConfig()
        self.session = self._create_session()
        # Cache filled by the first get_mapping_options() call.
        self._mapping_options = None

    def _create_session(self) -> "requests.Session":
        """Create requests session with retries

        Returns:
            A session whose HTTPS adapter retries responses with status
            500, 502, 503 or 504, up to ``config.max_retries`` times.
        """
        session = requests.Session()
        retries = requests.adapters.Retry(
            total=self.config.max_retries,
            backoff_factor=0.25,
            status_forcelist=[500, 502, 503, 504],
        )
        session.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))
        return session

    def _check_response(self, response: "requests.Response") -> None:
        """Check response for errors

        Raises:
            UniProtMappingError: If the response carries an HTTP error status.
        """
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            # Chain explicitly so the original HTTP error stays attached.
            raise UniProtMappingError(f"Request failed: {response.text}") from exc

    @staticmethod
    def _merge_page(combined: Dict[str, Any], page: Dict[str, Any]) -> None:
        """Fold one page of a results payload into ``combined`` in place."""
        for key, value in page.items():
            if key not in combined:
                # First occurrence: copy lists so later pages can be appended
                # without mutating the decoded page itself.
                combined[key] = list(value) if isinstance(value, list) else value
            elif not (isinstance(value, list) and isinstance(combined[key], list)):
                continue  # scalar metadata: keep the first page's value
            elif key == "results":
                combined[key].extend(value)
            elif key == "failedIds":
                # Later pages may repeat the failed identifiers; keep each once.
                # Assumes the identifiers are hashable (strings).
                known = set(combined[key])
                combined[key].extend(v for v in value if v not in known)

    def _results_url_from_details(self, job_id: str) -> str:
        """Look up the results URL of a finished job via the details endpoint."""
        response = self.session.get(
            f"{self.config.base_url}/idmapping/details/{job_id}",
            timeout=self.request_timeout,
        )
        self._check_response(response)
        details = response.json()
        if not isinstance(details, dict) or not details.get("redirectURL"):
            raise UniProtMappingError("Unexpected response format")
        return details["redirectURL"]

    def get_mapping_options(self) -> Dict[str, Set[str]]:
        """Get available mapping options for each source database

        The answer is fetched once and cached on the instance; later calls
        return the cached dictionary.

        Returns:
            Dict mapping source databases to sets of possible target databases

        Raises:
            UniProtMappingError: If the configuration request fails.
        """
        if self._mapping_options is not None:
            return self._mapping_options

        # Get available fields from UniProt API
        response = self.session.get(
            f"{self.config.base_url}/configure/idmapping/fields",
            timeout=self.request_timeout,
        )
        self._check_response(response)
        data = response.json()

        # Each rule lists the targets ("tos") allowed for the items using it.
        rules = {rule["ruleId"]: rule for rule in data.get("rules", [])}

        options: Dict[str, Set[str]] = {}
        for group in data.get("groups", []):
            for item in group.get("items", []):
                if not item.get("from"):  # not usable as a source database
                    continue
                # Look the rule up by key rather than testing the id for
                # truthiness, so a rule whose id is 0 is not skipped.
                rule = rules.get(item.get("ruleId"))
                if rule is not None:
                    options[item["name"]] = set(rule["tos"])

        self._mapping_options = options
        return options

    def submit_mapping_job(self, from_db: str, to_db: str, ids: List[str]) -> str:
        """Submit an ID mapping job

        Args:
            from_db: Source database
            to_db: Target database
            ids: List of identifiers to map. A single identifier given as a
                plain string is accepted as well.

        Returns:
            str: Job ID for checking results

        Raises:
            UniProtMappingError: If no identifiers are given or the request
                is rejected.
        """
        # A bare string would otherwise be joined character by character.
        id_list = [ids] if isinstance(ids, str) else list(ids)
        if not id_list:
            raise UniProtMappingError("No identifiers supplied for mapping")

        response = self.session.post(
            f"{self.config.base_url}/idmapping/run",
            data={
                "from": from_db,
                "to": to_db,
                "ids": ",".join(id_list),
            },
            timeout=self.request_timeout,
        )
        self._check_response(response)
        return response.json()["jobId"]

    def check_job_status(self, job_id: str) -> Dict[str, Any]:
        """Check if mapping job is complete

        Returns:
            ``{"redirect": url}`` when the server answers 303 with a
            ``Location`` header, otherwise the decoded JSON body.

        Raises:
            UniProtMappingError: If the response carries an HTTP error status.
        """
        response = self.session.get(
            f"{self.config.base_url}/idmapping/status/{job_id}",
            allow_redirects=False,  # the redirect target is returned, not followed
            timeout=self.request_timeout,
        )

        if response.status_code == 303:
            redirect_url = response.headers.get("Location")
            if redirect_url:
                return {"redirect": redirect_url}

        self._check_response(response)
        return response.json()

    def get_mapping_results(self, results_url: str) -> Dict[str, Any]:
        """Get mapping results from URL, following pagination

        The results endpoint returns one page at a time and advertises the
        next page in a ``Link`` header with ``rel="next"``. Every page is
        fetched and merged, so the returned payload is complete.

        Args:
            results_url: URL for retrieving results

        Returns:
            Dict containing the merged 'results' and 'failedIds'.

        Raises:
            UniProtMappingError: If a page request fails or a page is not a
                JSON object.
        """
        combined: Dict[str, Any] = {}
        visited = set()
        page_url: Optional[str] = results_url

        # ``visited`` guards against a server that links a page to itself.
        while page_url and page_url not in visited:
            visited.add(page_url)
            response = self.session.get(page_url, timeout=self.request_timeout)
            self._check_response(response)
            page = response.json()
            if not isinstance(page, dict):
                raise UniProtMappingError("Unexpected response format")
            self._merge_page(combined, page)
            page_url = response.links.get("next", {}).get("url")

        return combined

    def map_ids(self, from_db: str, to_db: str, ids: List[str]) -> Dict[str, Any]:
        """Map IDs between databases

        Submits a job, polls it every ``config.polling_interval`` seconds
        and returns the results once they are available.

        Raises:
            UniProtMappingError: If the job fails or a response has an
                unexpected shape.
        """
        job_id = self.submit_mapping_job(from_db, to_db, ids)

        while True:
            status = self.check_job_status(job_id)

            if "redirect" in status:
                return self.get_mapping_results(status["redirect"])

            if "jobStatus" in status:
                job_status = status["jobStatus"]
                if job_status in ("NEW", "RUNNING"):
                    time.sleep(self.config.polling_interval)
                    continue
                # A finished job reported without a redirect is a success,
                # not a failure: resolve its results URL explicitly.
                if job_status == "FINISHED":
                    return self.get_mapping_results(
                        self._results_url_from_details(job_id)
                    )
                raise UniProtMappingError(f"Job failed: {job_status}")

            if "results" in status or "failedIds" in status:
                return status

            raise UniProtMappingError("Unexpected response format")

    def map_to_all(self, from_db: str, ids: List[str]) -> Dict[str, Dict[str, Any]]:
        """Map IDs to all possible target databases

        One mapping job per target database is run on a thread pool of
        ``config.max_workers`` threads. A target whose job fails is reported
        with a printed warning and left out of the result.

        Args:
            from_db: Source database
            ids: List of identifiers to map

        Returns:
            Dict mapping target databases to their respective mapping results,
            ordered by target database name.

        Raises:
            UniProtMappingError: If ``from_db`` is not a known source database.
        """
        options = self.get_mapping_options()
        if from_db not in options:
            raise UniProtMappingError(f"Invalid source database: {from_db}")

        # Materialize once: every job needs the full list, and a one-shot
        # iterable would be exhausted by the first job.
        id_list = [ids] if isinstance(ids, str) else list(ids)
        targets = sorted(options[from_db])
        completed: Dict[str, Dict[str, Any]] = {}

        # Use ThreadPoolExecutor for parallel mapping requests.
        # NOTE(review): all worker threads share self.session - confirm this
        # is acceptable for the requests version in use.
        with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
            future_to_db = {
                executor.submit(self.map_ids, from_db, to_db, id_list): to_db
                for to_db in targets
            }

            for future in as_completed(future_to_db):
                to_db = future_to_db[future]
                try:
                    completed[to_db] = future.result()
                except Exception as e:
                    # Deliberately best-effort: one failing target must not
                    # discard the mappings that did succeed.
                    print(f"Warning: Mapping to {to_db} failed: {str(e)}")

        # Jobs complete in arbitrary order; return a stable, sorted view.
        return {to_db: completed[to_db] for to_db in targets if to_db in completed}

    def print_mapping_summary(self, mapping_results: Dict[str, Dict[str, Any]], indent: int = 2) -> None:
        """Print a readable summary of mapping results

        Args:
            mapping_results: Results from map_to_all
            indent: Number of spaces to indent output
        """
        indent_str = " " * indent
        # When the target is returned as a full entry object, the identifier
        # may be stored under one of several keys; try them in this order.
        id_keys = ("id", "primaryAccession", "uniParcId")

        for to_db, result in mapping_results.items():
            print(f"\n{indent_str}=== Mappings to {to_db} ===")

            if "results" in result:
                print(f"{indent_str}Successful mappings:")
                for mapping in result["results"]:
                    from_id = mapping.get("from", "N/A")
                    to_id = mapping.get("to", "N/A")
                    if isinstance(to_id, dict):
                        to_id = next(
                            (to_id[key] for key in id_keys if to_id.get(key)),
                            "N/A",
                        )
                    print(f"{indent_str * 2}{from_id} -> {to_id}")

            if "failedIds" in result and result["failedIds"]:
                print(f"{indent_str}Failed IDs:")
                for failed_id in result["failedIds"]:
                    print(f"{indent_str * 2}{failed_id}")

# Example usage:
if __name__ == "__main__":
    # Initialize client
    mapper = UniProtMapper()

    try:
        # Get all available mappings for UniProt accessions
        print("\nAvailable mapping options:")
        options = mapper.get_mapping_options()
        for from_db, to_dbs in options.items():
            print(f"\n{from_db} can be mapped to:")
            for to_db in sorted(to_dbs):
                print(f"  - {to_db}")

        # Example: Map a UniProt ID to all possible target databases
        print("\nMapping example protein to all databases:")
        results = mapper.map_to_all(
            from_db="UniProtKB_AC-ID",
            ids=["P05067"]  # APP protein
        )

        # Print results summary
        mapper.print_mapping_summary(results)

    except (UniProtMappingError, requests.RequestException) as e:
        # Transport-level failures (connection errors, timeouts, exhausted
        # retries) are raised by requests itself rather than as
        # UniProtMappingError; report them the same way instead of letting
        # them escape as a traceback.
        print(f"Mapping failed: {e}")


Available mapping options:

UniProtKB_AC-ID can be mapped to:
  - Allergome
  - ArachnoServer
  - Araport
  - BioCyc
  - BioGRID
  - BioMuta
  - CCDS
  - CGD
  - CPTAC
  - CRC64
  - ChEMBL
  - ChiTaRS
  - CollecTF
  - ComplexPortal
  - ConoServer
  - DIP
  - DMDM
  - DNASU
  - DisProt
  - DrugBank
  - EMBL-GenBank-DDBJ
  - EMBL-GenBank-DDBJ_CDS
  - ESTHER
  - EchoBASE
  - Ensembl
  - Ensembl_Genomes
  - Ensembl_Genomes_Protein
  - Ensembl_Genomes_Transcript
  - Ensembl_Protein
  - Ensembl_Transcript
  - FlyBase
  - GI_number
  - GeneCards
  - GeneID
  - GeneReviews
  - GeneTree
  - GeneWiki
  - Gene_Name
  - GenomeRNAi
  - GlyConnect
  - GuidetoPHARMACOLOGY
  - HGNC
  - HOGENOM
  - IDEAL
  - KEGG
  - LegioList
  - Leproma
  - MEROPS
  - MGI
  - MIM
  - MaizeGDB
  - OMA
  - OpenTargets
  - Orphanet
  - OrthoDB
  - PATRIC
  - PDB
  - PHI-base
  - PIR
  - PeroxiBase
  - PharmGKB
  - PlantReactome
  - PomBase
  - ProteomicsDB
  - PseudoCAP
  - REBASE
  - RGD
  - Reactome
  - RefSeq_Nucleo