In [1]:
%load_ext autoreload
%autoreload 2
import parse_uniprot_xml_taxonomy_with_error_handling
import polars as pl

## Parse UniProtKB/TREMBL (unreviewed, computationally-derived annotations)

### Test with a single batch to make sure the output is okay before running on everything

In [14]:
input_file = "/Users/olga/Downloads/uniprot_trembl.xml.gz"
output_file = "uniprot_trembl_taxonomy_summary_first_batch.parquet"
parse_uniprot_xml_taxonomy_with_error_handling.process_uniprot_file(
    input_file, output_file, n_batches=1, force_restart=True
)

pl.scan_parquet(output_file).head().collect()

2025-02-20 17:04:51,334 - INFO - Starting to process /Users/olga/Downloads/uniprot_trembl.xml.gz
2025-02-20 17:04:54,730 - INFO - Processing complete. Final results written to uniprot_trembl_taxonomy_summary_first_batch.parquet


organism,reviewed_count,unreviewed_count,pdb_structures_count,organism_id,lineage,type
str,u32,u32,u32,i64,str,str
"""Rotaria magnacalcarata""",0,981,0,392030,"""Bdelloidea; Eukaryota; Eurotat…","""Animal"""
"""Helianthus annuus""",0,344,0,4232,"""Asteraceae; Asterales; Asteroi…","""Plant"""
"""Setaria italica""",0,263,1,4555,"""Cenchrinae; Embryophyta; Eukar…","""Plant"""
"""Aphanomyces astaci""",0,245,0,112090,"""Aphanomyces; Eukaryota; Oomyco…","""other Eukaryota"""
"""Xenopus laevis""",0,228,1,8355,"""Amphibia; Anura; Batrachia; Ch…","""Animal"""


In [15]:
pl.scan_parquet(output_file).describe()

statistic,organism,reviewed_count,unreviewed_count,pdb_structures_count,organism_id,lineage,type
str,str,f64,f64,f64,f64,str,str
"""count""","""1055""",1055.0,1055.0,1055.0,1055.0,"""1055""","""1055"""
"""null_count""","""0""",0.0,0.0,0.0,0.0,"""0""","""0"""
"""mean""",,0.0,9.478673,0.004739,865397.759242,,
"""std""",,0.0,41.169471,0.081357,874180.522606,,
"""min""","""Abia inflata""",0.0,1.0,0.0,294.0,"""50 kb inversion clade; Embryop…","""Animal"""
"""25%""",,0.0,1.0,0.0,105302.0,,
"""50%""",,0.0,1.0,0.0,473390.0,,
"""75%""",,0.0,2.0,0.0,1608996.0,,
"""max""","""uncultured marine bacterium""",0.0,981.0,2.0,3151515.0,"""Viruses""","""other Eukaryota"""


## Run on all data

### Use custom code for processing

In [16]:
import parse_uniprot_xml_taxonomy_with_error_handling

input_file = "/Users/olga/Downloads/uniprot_trembl.xml.gz"
output_file = "uniprot_trembl_taxonomy_summary.parquet"
parse_uniprot_xml_taxonomy_with_error_handling.process_uniprot_file(
    input_file, output_file, force_restart=True
)

2025-02-20 17:04:54,761 - INFO - Starting to process /Users/olga/Downloads/uniprot_trembl.xml.gz
2025-02-20 17:05:10,745 - INFO - Processed 100,000 entries
2025-02-20 17:05:27,432 - INFO - Processed 200,000 entries
2025-02-20 17:05:43,893 - INFO - Processed 300,000 entries
2025-02-20 17:06:04,606 - INFO - Processed 400,000 entries
2025-02-20 17:06:20,291 - INFO - Processed 500,000 entries
2025-02-20 17:06:37,615 - INFO - Processed 600,000 entries
2025-02-20 17:06:49,529 - INFO - Processed 700,000 entries
2025-02-20 17:07:06,634 - INFO - Processed 800,000 entries
2025-02-20 17:07:16,206 - INFO - Processed 900,000 entries
2025-02-20 17:07:28,376 - INFO - Saved intermediate results for 1,000,000 entries to uniprot_trembl_taxonomy_summary_part_0.parquet
2025-02-20 17:07:28,377 - INFO - Processed 1,000,000 entries
2025-02-20 17:07:39,713 - INFO - Processed 1,100,000 entries
2025-02-20 17:07:52,660 - INFO - Processed 1,200,000 entries
2025-02-20 17:08:03,711 - INFO - Processed 1,300,000 entr

KeyboardInterrupt: 

### Should have processed 252,633,201 entries

Latest TREMBL release is https://www.uniprot.org/uniprotkb/statistics

- ~252m entries
- 1,318,803 species

In [17]:
pl.scan_parquet(output_file).describe()

statistic,organism,reviewed_count,unreviewed_count,pdb_structures_count,organism_id,lineage,type
str,str,f64,f64,f64,f64,str,str
"""count""","""1055""",1055.0,1055.0,1055.0,1055.0,"""1055""","""1055"""
"""null_count""","""0""",0.0,0.0,0.0,0.0,"""0""","""0"""
"""mean""",,0.0,9.478673,0.004739,865410.98673,,
"""std""",,0.0,41.169471,0.081357,874192.779307,,
"""min""","""Abia inflata""",0.0,1.0,0.0,294.0,"""50 kb inversion clade; Embryop…","""Animal"""
"""25%""",,0.0,1.0,0.0,105302.0,,
"""50%""",,0.0,1.0,0.0,473390.0,,
"""75%""",,0.0,2.0,0.0,1608996.0,,
"""max""","""uncultured marine bacterium""",0.0,981.0,2.0,3151515.0,"""Viruses""","""other Eukaryota"""


# Try parallel processing

In [3]:
import parse_uniprot_xml_taxonomy_with_error_handling_parallelized

input_file = "/Users/olga/Downloads/uniprot_trembl.xml.gz"
output_file = "uniprot_trembl_taxonomy_summary.parquet"
parse_uniprot_xml_taxonomy_with_error_handling_parallelized.parallel_process_uniprot(
    input_file, output_file, num_processes=12  # force_restart=True
)

KeyboardInterrupt: 

In [7]:
from concurrent.futures import ProcessPoolExecutor


# Add this to test multiprocessing
def test_parallel():
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(lambda x: x * x, range(10)))
    print(results)


test_parallel()

AttributeError: Can't get local object 'test_parallel.<locals>.<lambda>'

In [10]:
# Example usage
def worker(x):
    """Worker function that must be defined at module level."""
    import os
    import time

    pid = os.getpid()
    time.sleep(1)  # Simulate work
    return f"Process {pid} processed {x}"


def simple_parallel_test():
    """Simple test to verify multiprocessing is working."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(worker, range(8)))
    for result in results:
        print(result)


simple_parallel_test()

Process SpawnProcess-9:
Process SpawnProcess-10:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/olga/anaconda3/envs/2025-biodata-wealth-inequality/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/Users/olga/anaconda3/envs/2025-biodata-wealth-inequality/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/Users/olga/anaconda3/envs/2025-biodata-wealth-inequality/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/olga/anaconda3/envs/2025-biodata-wealth-inequality/lib/python3.13/concurrent/futures/process.py", line 242, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/olga/anaconda3/envs/2025-biodata-wealth-inequality/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

import polars as pl

pl.scan_parquet(output_file).describe()

In [12]:
%%file test_parallel.py

from concurrent.futures import ProcessPoolExecutor


# Example usage
def worker(x):
    """Worker function that must be defined at module level."""
    import os
    import time

    pid = os.getpid()
    time.sleep(1)  # Simulate work
    return f"Process {pid} processed {x}"


def simple_parallel_test():
    """Simple test to verify multiprocessing is working."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(worker, range(8)))
    for result in results:
        print(result)


if __name__ == "__main__":
    simple_parallel_test()

Overwriting test_parallel.py


In [13]:
! python test_parallel.py

Process 23118 processed 0
Process 23116 processed 1
Process 23117 processed 2
Process 23119 processed 3
Process 23119 processed 4
Process 23118 processed 5
Process 23116 processed 6
Process 23117 processed 7


In [3]:
! python parse_uniprot_xml_taxonomy_with_error_handling_parallelized.py \
    --num-processes 12 \
    "/Users/olga/Downloads/uniprot_trembl.xml.gz" \
    "uniprot_trembl_taxonomy_summary.parquet"

2025-02-21 10:24:13,389 - MainProcess - INFO - Starting processing with 12 processes
2025-02-21 10:24:13,389 - MainProcess - INFO - Decompressing /Users/olga/Downloads/uniprot_trembl.xml.gz to /Users/olga/Downloads/uniprot_trembl.xml
^C

Aborted!
