# ProtSpace: Interactive Protein Embedding Visualization

Interactive visualization of high-dimensional protein embeddings in 2D/3D space. Supports multiple dimensionality reduction methods (PCA, UMAP, t-SNE, PaCMAP) with annotation-based coloring and integrated structure viewing.

📚 [GitHub](https://github.com/tsenoner/protspace) • [Manuscript](https://www.sciencedirect.com/science/article/pii/S0022283625000063?via%3Dihub)


In [None]:
# @title Install Dependencies and Import Libraries (~1min)
%%capture
# !pip install -q h5py matplotlib scikit-learn umap-learn pacmap numpy pandas bioservices tqdm taxopy pyarrow
# !pip install -q --no-deps "git+https://github.com/tsenoner/protspace.git@8c3f3e9df07a98a2ee0a85d91d592f001feb9139#egg=protspace"
!pip install -q "git+https://github.com/tsenoner/protspace.git@8c3f3e9df07a98a2ee0a85d91d592f001feb9139#egg=protspace"

import gzip
import os
import requests
import subprocess
import sys
from google.colab import files
from ipywidgets import SelectMultiple, Tab, VBox, HBox, Text, Checkbox, Button, Output, HTML, IntSlider, FloatSlider
from IPython.display import display, clear_output
from pathlib import Path
from urllib.parse import urlparse

import h5py
import numpy as np
import pandas as pd

# 📊 Data Collection from UniProt

## Step 1: Search and Generate Embeddings

1. **Go to UniProt:** [https://www.uniprot.org/](https://www.uniprot.org/)

2. **Search for proteins** using UniProt syntax:

   - Example: `(ft_domain:phosphatase) AND (reviewed:true)`

3. **Generate embeddings:**
   - Click **"Customize"** → Select **"Embeddings"**
   - Submit the job (ProtT5 embeddings will be generated)
   - Download results from the [**Jobs Dashboard**](https://www.uniprot.org/tool-dashboard) when ready

**Alternative:** Generate embeddings from any FASTA file using this [Google Colab notebook](https://colab.research.google.com/github/tsenoner/protspace/blob/main/examples/notebook/ClickThrough_GenerateEmbeddings.ipynb)

## Step 2: Upload Your Data

Upload a single embedding file (`.h5`, `.hdf5`, gzip-compressed `.gz`, or an HDF5 file with no extension) using the file upload widget below:


In [None]:
# @title 📂 Upload Embedding File {display-mode: "form"}
# @markdown Upload your embedding file (.h5, .hdf5, .gz, or no extension).<br>
# @markdown Wait until you get feedback about the successful upload.

import ipywidgets as widgets
from IPython.display import display
import h5py
import gzip
from pathlib import Path

def process_and_validate(filename, data):
    """Write an uploaded embedding file to disk and check it is usable HDF5.

    The file is always written into the current working directory under its
    base name (any directory part of ``filename`` is ignored).

    Args:
        filename: Name of the uploaded file. Its suffix (case-insensitive)
            selects the handling: ``.gz`` is decompressed, ``.h5`` / ``.hdf5``
            and extension-less files are written unchanged.
        data: Bytes-like content of the upload.

    Returns:
        The name of the HDF5 file that was written, or ``None`` when the
        upload could not be decompressed, has an unsupported extension, is
        not readable as HDF5, or its first entry is not a non-empty 1-D
        dataset. A message describing the outcome is printed in every case.
    """
    import zlib  # local import: only needed to recognise corrupt gzip streams

    filepath = Path(filename)
    suffix = filepath.suffix.lower()

    # Handle compressed .gz files
    if suffix == '.gz':
        inner_name = filepath.stem
        # "emb.h5.gz" should become "emb.h5", not "emb.h5.h5".
        if Path(inner_name).suffix.lower() in ('.h5', '.hdf5'):
            output_name = inner_name
        else:
            output_name = inner_name + '.h5'
        try:
            decompressed_data = gzip.decompress(data)
            with open(output_name, 'wb') as f:
                f.write(decompressed_data)
        except (OSError, EOFError, zlib.error):
            # BadGzipFile is an OSError; truncated uploads raise EOFError and
            # damaged deflate streams raise zlib.error.
            print(f"❌ {filename} - Failed to decompress")
            return None

    # Handle files with no extension (already decompressed HDF5)
    elif suffix == '':
        output_name = filepath.name + '.h5'
        with open(output_name, 'wb') as f:
            f.write(data)

    # Handle uncompressed HDF5 files
    elif suffix in ('.h5', '.hdf5'):
        output_name = filepath.name
        with open(output_name, 'wb') as f:
            f.write(data)
    else:
        print(f"❌ {filename} - Unsupported format")
        return None

    # Validate HDF5 and check embeddings
    try:
        with h5py.File(output_name, 'r') as f:
            first_key = next(iter(f), None)
            if first_key is None:
                print(f"❌ {filename} - Invalid HDF5 or no embeddings")
                return None

            # Inspect the shape only; groups have no ``shape`` attribute and
            # there is no need to load the embedding values themselves.
            shape = getattr(f[first_key], 'shape', None)
            if shape is not None and len(shape) == 1 and shape[0] > 0:
                print(f"✅ {filename} - {first_key}: {shape[0]} dim")
                return output_name

            print(f"❌ {filename} - Invalid embedding format")
            return None
    except (OSError, KeyError):
        print(f"❌ {filename} - Invalid HDF5 or no embeddings")
        return None

def on_file_upload(change):
    """Handle a change of the FileUpload widget's ``value``.

    Validates the single uploaded file with ``process_and_validate`` and
    stores the resulting file name in the module-level ``embedding_file``
    (``None`` when processing fails or several files were supplied).

    Args:
        change: Traitlets change dict; ``change['new']`` holds the uploads.
            Both layouts are accepted: the ipywidgets 7 mapping
            ``{filename: {'content': ...}}`` and the ipywidgets 8 sequence of
            entries carrying ``'name'`` and ``'content'``.
    """
    global embedding_file

    uploads = change['new']
    if not uploads:
        return

    if len(uploads) > 1:
        print("⚠️ Multiple files uploaded. Please upload only one embedding file.")
        embedding_file = None
        return

    # Process the single uploaded file
    if isinstance(uploads, dict):
        # ipywidgets 7: dict with the filename as key
        filename, uploaded_file = next(iter(uploads.items()))
    else:
        # ipywidgets 8: tuple of entries, the name is stored inside the entry
        uploaded_file = uploads[0]
        filename = uploaded_file['name']

    # ipywidgets 8 hands out a memoryview; normalise to bytes (no-op for bytes).
    data = bytes(uploaded_file['content'])

    print(f"Processing file: {filename}")
    embedding_file = process_and_validate(filename, data)

    if embedding_file:
        print(f"\n🎉 Embedding file ready: {embedding_file}")
    else:
        print("\n❌ Failed to process embedding file")

# Module-level result of the upload step. on_file_upload assigns the validated
# file name here, and the generation cell looks it up via globals(). Re-running
# this cell resets it to None.
embedding_file = None

# Create FileUpload widget
upload_widget = widgets.FileUpload(
    accept='.h5,.hdf5,.gz',  # Extensions offered by the upload widget
    multiple=False,  # Only allow single file upload
    description='Choose File',
    button_style='primary'
)

# Run on_file_upload whenever the widget's `value` trait changes
upload_widget.observe(on_file_upload, names='value')

# Display instructions and widget
print("📁 Upload one embedding file (.h5, .hdf5, .gz, or no extension)")
display(upload_widget)

In [None]:
# @title 🚀 Generate ProtSpace Parquet Bundle {display-mode: "form"}
# @markdown Configure and run protspace-local to generate visualization files

import subprocess
import os
from pathlib import Path
from ipywidgets import SelectMultiple, Tab, VBox, HBox, Text, Checkbox, Button, Output, HTML, IntSlider, FloatSlider
from IPython.display import display, clear_output

# Feature names offered for selection, grouped by category. Each key becomes
# the title of one tab in the feature selector and its list becomes that tab's
# options, in this order. Entries that are commented out are not offered yet.
FEATURES = {
    'UniProt': [
        'annotation_score',
        # 'cc_subcellular_location',  # Future version
        # 'ft_intramem',              # Future version
        # 'ft_signal',                # Future version
        # 'ft_topo_dom',              # Future version
        # 'ft_transmem',              # Future version
        'fragment',
        'length_fixed',
        'length_quantile',
        'protein_existence',
        'protein_families',
        'reviewed'
    ],
    'InterPro': [
        'cath',
        # 'pfam',           # Future version
        'superfamily',
        'signal_peptide'
    ],
    'Taxonomy': [
        # 'superkingdom',   # Future version
        'kingdom',
        'phylum',
        'class',
        'order',
        'family',
        'genus',
        'species'
    ]
}

# Display names of the dimensionality reduction methods shown in the method
# selector; the widget lower-cases them, drops '-' and appends '2' to form the
# value passed to `-m`.
METHODS = ['PCA', 'UMAP', 't-SNE', 'MDS', 'PaCMAP']

# Slider definitions per method: parameter name -> (min, max, default).
# An int default produces an IntSlider, anything else a FloatSlider, and each
# parameter name is passed to the command line as `--<name>`.
# PCA has no entry because it exposes no adjustable parameters here.
METHOD_PARAMS = {
    'UMAP': {
        'n_neighbors': (5, 200, 30),  # (min, max, default)
        'min_dist': (0.0, 1.0, 0.5)
    },
    't-SNE': {
        'perplexity': (5, 50, 30),
        'learning_rate': (10, 1000, 200)
    },
    'PaCMAP': {
        'n_neighbors': (5, 200, 30),
        'mn_ratio': (0.1, 1.0, 0.5),
        'fp_ratio': (1.0, 5.0, 2.0)
    },
    'MDS': {
        'n_init': (1, 10, 4),
        'max_iter': (50, 1000, 300)
    }
}

class ProtSpaceConfigWidget:
    """Form for configuring and launching a ``protspace-local`` run.

    The widget offers feature, method and method-parameter selectors plus a
    few general options. Clicking the run button assembles the command line,
    executes it as a subprocess and mirrors its progress bars as widgets.

    Attributes:
        feature_tabs: ``Tab`` with one ``SelectMultiple`` per feature category.
        methods: ``SelectMultiple`` of dimensionality reduction methods.
        param_widgets: ``{method: {param_name: slider}}``.
        param_tabs: ``Tab`` showing the sliders of each method.
        output_dir, keep_temp, legacy_json: general option widgets.
        run_button: button wired to ``run_protspace``.
        output: ``Output`` area receiving all messages of a run.
        widget: top-level container to pass to ``display``.
    """

    # Feature pre-selection per category; categories not listed start empty.
    _DEFAULT_FEATURES = {
        'UniProt': ['reviewed', 'fragment', 'length_fixed', 'protein_families'],
        'InterPro': [],
        'Taxonomy': ['phylum', 'class'],
    }

    # Substrings of subprocess output lines that are pure noise for the user.
    _SKIP_PATTERNS = (
        'Unable to register cu', 'WARNING: All log messages', 'E0000', 'W0000',
        'This TensorFlow binary', 'Creating directory', 'Welcome to Bioservices',
        'It looks like you do not have', 'We are creating one with default', 'Done',
        'To enable the following instructions',
    )

    # (marker in the progress line, internal key, label shown above the bar)
    _PROGRESS_TASKS = (
        ('UniProt features', 'uniprot', '🔍 UniProt Features'),
        ('taxonomy features', 'taxonomy', '🌿 Taxonomy Features'),
        ('InterPro features', 'interpro', '🧬 InterPro Features'),
    )

    # Number of trailing output lines kept for the failure report.
    _LOG_TAIL_LINES = 20

    def __init__(self):
        self.setup_widgets()
        self.create_layout()

    @staticmethod
    def _float_step(min_val, max_val):
        """Return a slider step that leaves room to move within the range.

        Narrow ranges (span of at most 10) get 0.1, wide ranges get 10. The
        step is derived from the span rather than from ``max_val`` alone so
        that a range such as 1.0-5.0 is not given a step larger than itself.
        """
        return 0.1 if (max_val - min_val) <= 10 else 10

    def _make_slider(self, param_name, min_val, max_val, default_val):
        """Create the slider for one method parameter (int or float)."""
        common = dict(
            value=default_val,
            min=min_val,
            max=max_val,
            description=param_name.replace('_', ' ').title() + ':',
            style={'description_width': '120px'},
            layout={'width': '300px'}
        )
        if isinstance(default_val, int):
            return IntSlider(**common)
        return FloatSlider(step=self._float_step(min_val, max_val), **common)

    def setup_widgets(self):
        """Create every input widget and wire the run button."""
        # Feature selection tabs
        feature_widgets = {}
        for category, features in FEATURES.items():
            feature_widgets[category] = SelectMultiple(
                options=features,
                value=self._DEFAULT_FEATURES.get(category, []),
                description='',
                style={'description_width': 'initial'},
                layout={'height': '120px', 'width': '300px'}
            )

        self.feature_tabs = Tab()
        self.feature_tabs.children = list(feature_widgets.values())
        for i, category in enumerate(FEATURES):
            self.feature_tabs.set_title(i, category)

        # Method selection
        self.methods = SelectMultiple(
            options=METHODS,
            value=['PCA', 'UMAP'],
            description='',
            style={'description_width': 'initial'},
            layout={'height': '120px', 'width': '200px'}
        )

        # Method parameter widgets
        self.param_widgets = {
            method: {
                param_name: self._make_slider(param_name, *bounds)
                for param_name, bounds in params.items()
            }
            for method, params in METHOD_PARAMS.items()
        }

        # Create parameter tabs (one per method, same order as METHOD_PARAMS)
        self.param_tabs = Tab()
        self.param_tabs.children = [
            VBox(list(sliders.values())) for sliders in self.param_widgets.values()
        ]
        for i, title in enumerate(self.param_widgets):
            self.param_tabs.set_title(i, title)

        # Other options
        self.output_dir = Text(
            value='protspace_output',
            description='Output Dir:',
            style={'description_width': '80px'},
            layout={'width': '250px'}
        )

        self.keep_temp = Checkbox(
            value=False,
            description='Keep temp files'
        )

        self.legacy_json = Checkbox(
            value=False,
            description='Legacy JSON format'
        )

        # Action button and output
        self.run_button = Button(
            description='🚀 Generate Bundle',
            button_style='primary',
            layout={'width': '200px'}
        )
        self.run_button.on_click(self.run_protspace)

        self.output = Output()

    def create_layout(self):
        """Arrange the widgets into ``self.widget``."""
        # Features section
        features_section = VBox([
            HTML('<h3>📋 Select Features</h3>'),
            HTML('<p><i>Hold Ctrl (Cmd on Mac) and click to select multiple features from each category</i></p>'),
            HTML('<p><small>Note: Some features are commented out and will be available in future versions</small></p>'),
            self.feature_tabs
        ])

        # Methods section
        methods_section = VBox([
            HTML('<h3>📊 Select 2D Methods</h3>'),
            HTML('<p><i>Hold Ctrl (Cmd on Mac) and click to select multiple methods</i></p>'),
            self.methods
        ])

        # Parameters section
        params_section = VBox([
            HTML('<h3>⚙️ Method Parameters</h3>'),
            HTML('<p><i>Fine-tune parameters for selected methods (PCA has no adjustable parameters)</i></p>'),
            self.param_tabs
        ])

        # Options section
        options_section = VBox([
            HTML('<h3>🔧 General Options</h3>'),
            self.output_dir,
            self.keep_temp,
            self.legacy_json
        ])

        # Main layout
        self.widget = VBox([
            features_section,
            methods_section,
            params_section,
            options_section,
            HBox([self.run_button]),
            self.output
        ])

    def get_selected_features(self):
        """Get all selected features from all tabs."""
        return [
            feature
            for selector in self.feature_tabs.children
            for feature in selector.value
        ]

    def get_method_commands(self):
        """Return the ``-m`` values for the selected methods.

        A display name is lower-cased, stripped of ``-`` and suffixed with
        ``2`` (e.g. ``'t-SNE'`` -> ``'tsne2'``).
        """
        return [method.lower().replace('-', '') + '2' for method in self.methods.value]

    def _build_command(self, embedding_path, output_dir, features, method_commands, selected_methods):
        """Assemble the ``protspace-local`` argument list from the form state."""
        cmd = [
            "protspace-local",
            "-i", embedding_path,
            "-o", output_dir,
            "-m", ','.join(method_commands)
        ]

        # Add features if specified
        if features:
            cmd.extend(["-f", ','.join(features)])

        # Add method parameters
        for method in selected_methods:
            for param_name, slider in self.param_widgets.get(method, {}).items():
                cmd.extend([f'--{param_name}', str(slider.value)])

        # Add optional flags
        if self.keep_temp.value:
            cmd.append("--keep-tmp")

        if self.legacy_json.value:
            cmd.append("--non-binary")

        return cmd

    def _conflicting_params(self, selected_methods):
        """Return ``{param: [(method, value), ...]}`` for flags set inconsistently.

        Only parameters that occur for more than one selected method with
        differing values are reported.
        """
        uses = {}
        for method in selected_methods:
            for param_name, slider in self.param_widgets.get(method, {}).items():
                uses.setdefault(param_name, []).append((method, slider.value))
        return {
            name: entries
            for name, entries in uses.items()
            if len({value for _, value in entries}) > 1
        }

    def _report_success(self, output_dir):
        """Print the generated files and publish ``protspace_output_dir``."""
        print(f"📁 Output directory: {output_dir}")

        # List generated files
        output_path = Path(output_dir)
        if output_path.exists():
            generated = sorted(output_path.glob("*"))
            if generated:
                print("📋 Generated files:")
                for path in generated:
                    print(f"   {path.name}")

        # Store output directory globally so the download cell can find it
        globals()['protspace_output_dir'] = output_dir
        print(f"📋 Available variable: protspace_output_dir = '{output_dir}'")

    def _track_progress(self, line, bars, progress_cls, percent_re):
        """Create or update the progress bar that ``line`` refers to."""
        for marker, task_key, task_name in self._PROGRESS_TASKS:
            if marker in line:
                break
        else:
            return  # progress of a task that has no dedicated bar

        # Create progress bar if not exists
        if task_key not in bars:
            bar = progress_cls(
                value=0, min=0, max=100,
                description='',
                bar_style='info',
                style={'bar_color': '#17a2b8'},
                layout={'width': '400px'}
            )
            label = HTML(value=f"<b>{task_name}</b>")
            bars[task_key] = (bar, label)
            display(VBox([label, bar]))

        # Extract and update progress
        percent_match = percent_re.search(line)
        if percent_match:
            bar, label = bars[task_key]
            percent = int(percent_match.group(1))
            bar.value = percent

            # Update color based on completion
            if percent == 100:
                bar.bar_style = 'success'
                label.value = f"<b>{task_name} ✅</b>"

    def _run_with_progress(self, cmd, output_dir, progress_cls):
        """Run ``cmd``, mirroring its progress lines as widgets.

        Returns ``True`` when the process exits with code 0. On failure the
        last non-progress output lines are printed so the cause is visible.
        """
        import re
        from collections import deque

        status_widget = HTML(value="<b>🚀 Starting ProtSpace processing...</b>")
        display(status_widget)

        bars = {}
        log_tail = deque(maxlen=self._LOG_TAIL_LINES)
        percent_re = re.compile(r'(\d+)%')

        # text=True enables universal newlines, so the carriage returns used
        # by progress bars arrive as separate lines. The context manager
        # closes the pipe and waits for the process.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1
        ) as process:
            for raw_line in process.stdout:
                line = raw_line.strip()

                # Skip empty lines and unwanted technical messages
                if not line or any(pattern in line for pattern in self._SKIP_PATTERNS):
                    continue

                if 'Fetching' in line and '%|' in line:
                    self._track_progress(line, bars, progress_cls, percent_re)
                elif '%|' not in line:
                    log_tail.append(line)

        if process.returncode == 0:
            status_widget.value = "<b>✅ ProtSpace processing completed successfully!</b>"
            self._report_success(output_dir)
            return True

        status_widget.value = f"<b>❌ Process failed with return code: {process.returncode}</b>"
        if log_tail:
            print("📄 Last output lines:")
            for line in log_tail:
                print(f"   {line}")
        return False

    def _run_plain(self, cmd, output_dir):
        """Run ``cmd`` without progress widgets (fallback path)."""
        print("📊 Processing (progress bars not available)...")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print("✅ ProtSpace processing completed successfully!")
            globals()['protspace_output_dir'] = output_dir
            return True

        print(f"❌ Process failed: {result.stderr}")
        return False

    def run_protspace(self, button):
        """Generate protspace command and execute it.

        Args:
            button: The clicked button (unused; required by ``on_click``).

        Returns:
            ``True`` if the run finished successfully, ``False`` otherwise.
            All messages go to ``self.output``.
        """
        with self.output:
            clear_output()

            # Check if embedding file exists
            embedding_path = globals().get('embedding_file')
            if not embedding_path or not os.path.exists(embedding_path):
                print("❌ No valid embedding file found. Please upload an embedding file first.")
                return False

            # Get selections
            features = self.get_selected_features()
            selected_methods = list(self.methods.value)
            method_commands = self.get_method_commands()

            if not method_commands:
                print("❌ Please select at least one method.")
                return False

            output_dir = self.output_dir.value.strip()
            if not output_dir:
                print("❌ Please provide an output directory.")
                return False

            cmd = self._build_command(
                embedding_path, output_dir, features, method_commands, selected_methods
            )

            # Display selections
            print("🔧 Configuration:")
            print(f"   Features: {', '.join(features) if features else 'None'}")
            print(f"   Methods: {', '.join(selected_methods)}")

            # Show parameters
            for method in selected_methods:
                sliders = self.param_widgets.get(method)
                if sliders:
                    params_str = ', '.join(
                        f'{param}={slider.value}' for param, slider in sliders.items()
                    )
                    print(f"   {method} params: {params_str}")

            print(f"   Output: {output_dir}")

            # The same flag may be contributed by several methods.
            for param_name, entries in self._conflicting_params(selected_methods).items():
                settings = ', '.join(f'{method}={value}' for method, value in entries)
                print(f"⚠️ --{param_name} is passed with different values ({settings}); "
                      "protspace-local may honor only one of them.")
            print()

            print("🔧 Running command:")
            print(" ".join(cmd))
            print()

            # Execute command with Jupyter-native progress bars
            try:
                try:
                    from ipywidgets import IntProgress
                except ImportError:
                    # Fallback to simple text output if progress bars are unavailable
                    return self._run_plain(cmd, output_dir)
                return self._run_with_progress(cmd, output_dir, IntProgress)
            except FileNotFoundError:
                print("❌ protspace-local command not found. Please install protspace first.")
                return False
            except Exception as e:
                print(f"❌ Unexpected error: {str(e)}")
                return False

# Build the configuration form and render it below the cell; the instance
# stays available as `config_widget` for inspection from other cells.
config_widget = ProtSpaceConfigWidget()
display(config_widget.widget)

In [None]:
# @title 📥 Download Parquet Bundle {display-mode: "form"}
# @markdown ### Download your generated parquet bundle file
# @markdown
# @markdown **Instructions:**
# @markdown 1. Click the "📥 Download Bundle" button below
# @markdown 2. Go to the ProtSpace Web Viewer: https://tsenoner.github.io/protspace_web/
# @markdown 3. Upload your downloaded `data.parquetbundle` file
# @markdown 4. Explore your protein embeddings with interactive visualizations
# @markdown
# @markdown **Note:** The web viewer works best with Chrome or Firefox browsers.

import os
from pathlib import Path
from google.colab import files
from ipywidgets import Button, Output, VBox

# Create download button (its click handler is attached further below)
download_button = Button(
    description='📥 Download Bundle',
    button_style='success',
    layout={'width': '200px', 'height': '40px'},
    style={'font_weight': 'bold'}
)

# Output area that receives every message printed by the download handler
output_area = Output()

def download_parquet_bundle(button):
    """Send ``data.parquetbundle`` from the last ProtSpace run to the browser.

    The output directory is taken from the module-level
    ``protspace_output_dir``; every message is written to ``output_area``.

    Args:
        button: The clicked button (unused; required by ``on_click``).
    """
    with output_area:
        output_area.clear_output()

        # The generation cell publishes its output directory as a global.
        module_vars = globals()
        if 'protspace_output_dir' not in module_vars:
            print("❌ No output directory found. Please run the ProtSpace generation cell first.")
            return

        bundle_path = Path(module_vars['protspace_output_dir']) / "data.parquetbundle"

        # Nothing to download if the run did not produce a bundle.
        if not bundle_path.exists():
            print(f"❌ Parquet bundle not found at: {bundle_path}")
            return

        # Report the size in MB before starting the download.
        size_mb = bundle_path.stat().st_size / (1024 * 1024)
        print(f"📦 Downloading: {bundle_path.name} ({size_mb:.2f} MB)")

        try:
            files.download(str(bundle_path))
            print("✅ Download completed!")
        except Exception as e:
            print(f"❌ Download failed: {str(e)}")

# Run download_parquet_bundle whenever the button is clicked
download_button.on_click(download_parquet_bundle)

# Stack the button above its message area and render both
widget = VBox([download_button, output_area])
display(widget)