# Data Conversion: MSP to JSONL for Mass Spectrometry Prediction

This notebook implements the conversion pipeline from raw MSP format data to JSONL format compatible with the mass spectrometry prediction pipeline.

## Overview:

Mass Spectrometry Prediction (MSP) files contain spectral data in a text-based format. Each entry includes:
- Metadata fields (Name, Comments, etc.)
- Peak data as m/z and intensity pairs

The conversion process extracts:
$$\text{SMILES} \rightarrow \text{Molecular Structure}$$
$$\text{Peaks} \rightarrow [(m/z_1, I_1), (m/z_2, I_2), ..., (m/z_n, I_n)]$$

Where:
- $m/z$ = mass-to-charge ratio
- $I$ = intensity value

**Key Features:**
- Robust SMILES extraction from various MSP formats
- Peak data validation and normalization
- Error tracking for corrupted records
- Compatibility with multiple MSP sources (GNPS, MoNA, etc.)

**Input**: `data/raw/{dataset_name}/*.msp`  
**Output**: `data/input/{dataset_name}/spectral_data.jsonl`  
**Format**: `{"smiles": "...", "peaks": [[mz, intensity], ...]}`

## 1. Environment Setup

Import required libraries and configure the conversion environment.

In [2]:
# Standard library imports
import json
import re
import os
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from datetime import datetime

# Data science imports
import numpy as np
from tqdm import tqdm

# Configuration
print("DATA CONVERSION: MSP TO JSONL")
print("=" * 80)
print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\nEnvironment setup complete")

DATA CONVERSION: MSP TO JSONL
Timestamp: 2025-07-17 17:06:48

Environment setup complete


## 2. Configuration and Path Setup

Define dataset configuration and directory structure.

### Directory Structure:
```
data/
├── raw/
│   └── {dataset_name}/
│       └── *.msp
└── input/
    └── {dataset_name}/
        ├── spectral_data.jsonl
        └── corrupted_records.jsonl
```

In [3]:
# Conversion configuration
CONVERSION_CONFIG = {
    'dataset': {
        'name': 'GNPS',  # Change to any dataset name in data/raw/
        'msp_filename': None,  # Set to specific filename, or None to auto-detect
    },
    'paths': {
        'base_dir': Path('../'),
        'raw_data_dir': lambda base: base / 'data' / 'raw',
        'input_data_dir': lambda base: base / 'data' / 'input',
    },
    'validation': {
        'min_peaks': 1,
        'max_mz': 5000,
        'max_intensity': 1e10,
        'min_smiles_length': 2,
    },
    'processing': {
        'verbose': True,
        'save_corrupted': True,
    }
}

# Set up paths
BASE_DIR = CONVERSION_CONFIG['paths']['base_dir']
RAW_DATA_DIR = CONVERSION_CONFIG['paths']['raw_data_dir'](BASE_DIR)
INPUT_DATA_DIR = CONVERSION_CONFIG['paths']['input_data_dir'](BASE_DIR)

DATASET_NAME = CONVERSION_CONFIG['dataset']['name']
DATASET_RAW_DIR = RAW_DATA_DIR / DATASET_NAME
DATASET_INPUT_DIR = INPUT_DATA_DIR / DATASET_NAME

# Create output directory
DATASET_INPUT_DIR.mkdir(parents=True, exist_ok=True)

print("\nCONFIGURATION")
print("-" * 40)
print(f"Dataset: {DATASET_NAME}")
print(f"Raw data directory: {DATASET_RAW_DIR}")
print(f"Output directory: {DATASET_INPUT_DIR}")
print(f"\nValidation parameters:")
print(f"  - Minimum peaks: {CONVERSION_CONFIG['validation']['min_peaks']}")
print(f"  - Maximum m/z: {CONVERSION_CONFIG['validation']['max_mz']}")
print(f"  - Maximum intensity: {CONVERSION_CONFIG['validation']['max_intensity']:.0e}")


CONFIGURATION
----------------------------------------
Dataset: GNPS
Raw data directory: ../data/raw/GNPS
Output directory: ../data/input/GNPS

Validation parameters:
  - Minimum peaks: 1
  - Maximum m/z: 5000
  - Maximum intensity: 1e+10


## 3. MSP File Detection

Auto-detect MSP files in the dataset directory.

In [4]:
def find_msp_file(dataset_dir: Path, filename: Optional[str] = None) -> Path:
    """Find MSP file in dataset directory.
    
    Args:
        dataset_dir: Directory containing MSP files
        filename: Specific filename to use, or None for auto-detection
        
    Returns:
        Path to MSP file
        
    Raises:
        FileNotFoundError: If no MSP file is found
    """
    if filename:
        msp_path = dataset_dir / filename
        if msp_path.exists():
            return msp_path
        else:
            raise FileNotFoundError(f"Specified MSP file not found: {msp_path}")
    
    # Auto-detect MSP files
    msp_files = list(dataset_dir.glob('*.msp'))
    
    if not msp_files:
        raise FileNotFoundError(f"No MSP files found in {dataset_dir}")
    
    if len(msp_files) == 1:
        return msp_files[0]
    
    # Multiple MSP files found
    print(f"\nMultiple MSP files found in {dataset_dir}:")
    for i, f in enumerate(msp_files):
        print(f"  {i+1}. {f.name}")
    
    # Use the first one or the one matching dataset name
    for f in msp_files:
        if DATASET_NAME.lower() in f.name.lower():
            print(f"\nAuto-selected: {f.name}")
            return f
    
    print(f"\nUsing first file: {msp_files[0].name}")
    return msp_files[0]

# Find the MSP file to process
msp_file_path = find_msp_file(DATASET_RAW_DIR, CONVERSION_CONFIG['dataset']['msp_filename'])
print(f"\nProcessing MSP file: {msp_file_path}")


Processing MSP file: ../data/raw/GNPS/GNPS.msp


## 4. MSP Parsing Functions

Functions for parsing MSP format and extracting spectral data.

### MSP Format Structure:
```
Name: Compound Name
Comments: "SMILES=C1CC1" "InChI=..." ...
Num Peaks: N
mz1 intensity1
mz2 intensity2
...
```

### Parsing Strategy:
1. Split entries by double newlines
2. Extract metadata from key-value pairs
3. Parse peak data after "Num Peaks:" line
4. Extract SMILES from Comments field using regex patterns

In [5]:
def parse_msp_file(file_path: str) -> List[Dict]:
    """Parse MSP file and extract entries as dictionaries.
    
    Args:
        file_path: Path to MSP file
        
    Returns:
        List of parsed entries
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    
    raw_entries = content.split('\n\n')
    entries = []
    
    print(f"\nParsing {len(raw_entries)} potential entries...")
    
    for raw_entry in tqdm(raw_entries, desc="Parsing MSP entries"):
        if not raw_entry.strip():
            continue
        entry = parse_single_msp_entry(raw_entry)
        if entry:
            entries.append(entry)
    
    return entries

def parse_single_msp_entry(raw_entry: str) -> Optional[Dict]:
    """Parse a single MSP entry.
    
    Args:
        raw_entry: Raw text of single MSP entry
        
    Returns:
        Parsed entry dictionary or None if parsing fails
    """
    lines = raw_entry.strip().split('\n')
    entry = {}
    peak_section = False
    peaks = []
    
    for line in lines:
        line = line.strip()
        if not line:
            continue
            
        if line.startswith('Num Peaks:'):
            try:
                entry['num_peaks'] = int(line.split(':', 1)[1].strip())
                peak_section = True
                continue
            except ValueError:
                continue
        
        if peak_section:
            try:
                parts = line.split()
                if len(parts) >= 2:
                    mz = float(parts[0])
                    intensity = float(parts[1])
                    peaks.append([mz, intensity])
            except ValueError:
                continue
        else:
            if ':' in line:
                key, value = line.split(':', 1)
                key = key.strip().lower().replace(' ', '_')
                value = value.strip()
                
                if key == 'comments':
                    entry['comments'] = value
                elif key == 'name':
                    entry['name'] = value
    
    entry['peaks'] = peaks
    return entry if peaks else None

## 5. SMILES Extraction and Validation

Functions for extracting and validating SMILES strings from MSP comments.

### SMILES Extraction Strategy:

Multiple regex patterns are used to handle various MSP formats:
- GNPS format: `"SMILES=..."`
- MoNA format: `"computed SMILES=..."`
- Variations in capitalization and spacing

### Validation Criteria:

Valid SMILES strings must:
1. Have length ≥ 2 characters
2. Contain only valid SMILES characters
3. Not be placeholder values (N/A, NA)

In [6]:
def extract_smiles_from_comments(comments: str) -> Optional[str]:
    """Extract SMILES string from MSP Comments field.
    
    Args:
        comments: Comments field from MSP entry
        
    Returns:
        Extracted SMILES string or None
    """
    if not comments:
        return None
    
    # Common patterns across different MSP sources
    patterns = [
        r'"SMILES=([^"]+)"',
        r'"computed SMILES=([^"]+)"',
        r'"SMILES\s*=\s*([^"]+)"',
        r'"computed\s+SMILES\s*=\s*([^"]+)"',
        r'SMILES=([^\s;]+)',  # Without quotes
        r'Smiles=([^\s;]+)',  # Case variation
        r'"Smiles=([^"]+)"'  # Case variation with quotes
    ]
    
    for pattern in patterns:
        match = re.search(pattern, comments, re.IGNORECASE)
        if match:
            smiles = match.group(1).strip()
            if smiles and smiles != 'N/A' and smiles != 'NA':
                return smiles
    
    return None

def validate_smiles_basic(smiles: str) -> bool:
    """Basic SMILES validation.
    
    Args:
        smiles: SMILES string to validate
        
    Returns:
        True if SMILES appears valid
    """
    if not smiles or len(smiles) < CONVERSION_CONFIG['validation']['min_smiles_length']:
        return False
    
    # Extended character set for broader SMILES compatibility
    valid_chars = set('CNOSFPB[]()=#+\\/-@123456789.%cnosfpb')
    return all(c in valid_chars or c.isupper() or c.islower() for c in smiles)

def validate_peaks(peaks: List[List[float]]) -> bool:
    """Validate peak data.
    
    Args:
        peaks: List of [m/z, intensity] pairs
        
    Returns:
        True if peaks are valid
    """
    if not peaks or len(peaks) < CONVERSION_CONFIG['validation']['min_peaks']:
        return False
    
    for peak in peaks:
        if len(peak) != 2:
            return False
        mz, intensity = peak
        # Validation ranges from config
        if not (0 <= mz <= CONVERSION_CONFIG['validation']['max_mz']) or \
           not (0 <= intensity <= CONVERSION_CONFIG['validation']['max_intensity']):
            return False
    
    return True

## 6. Data Conversion Pipeline

Convert parsed MSP entries to JSONL format with error tracking.

### Conversion Process:

For each MSP entry:
1. Extract SMILES from comments
2. Validate SMILES structure
3. Validate peak data
4. Convert to JSONL format
5. Track corrupted records

### Error Tracking:

Corrupted records are saved with:
- Entry index and name
- Error description
- Relevant context for debugging

In [7]:
def convert_msp_to_jsonl(entries: List[Dict]) -> Tuple[List[Dict], Dict]:
    """Convert parsed MSP entries to JSONL format.
    
    Args:
        entries: List of parsed MSP entries
        
    Returns:
        Tuple of (jsonl_entries, statistics)
    """
    jsonl_entries = []
    stats = {
        'total_entries': len(entries),
        'successful': 0,
        'corrupted_records': []
    }
    
    print(f"\nConverting {len(entries)} entries to JSONL format...")
    
    for i, entry in enumerate(tqdm(entries, desc="Converting to JSONL")):
        try:
            # Extract and validate SMILES
            smiles = extract_smiles_from_comments(entry.get('comments', ''))
            if not smiles or not validate_smiles_basic(smiles):
                stats['corrupted_records'].append({
                    'index': i,
                    'name': entry.get('name', 'Unknown'),
                    'error': 'Invalid or missing SMILES',
                    'comments': entry.get('comments', '')[:200]  # First 200 chars for debugging
                })
                continue
            
            # Validate peaks
            peaks = entry.get('peaks', [])
            if not validate_peaks(peaks):
                stats['corrupted_records'].append({
                    'index': i,
                    'name': entry.get('name', 'Unknown'),
                    'error': 'Invalid peaks',
                    'num_peaks': len(peaks)
                })
                continue
            
            # Create JSONL entry
            jsonl_entries.append({
                'smiles': smiles,
                'peaks': peaks
            })
            stats['successful'] += 1
            
        except Exception as e:
            stats['corrupted_records'].append({
                'index': i,
                'name': entry.get('name', 'Unknown'),
                'error': f'Conversion error: {str(e)}'
            })
    
    return jsonl_entries, stats

def save_jsonl(data: List[Dict], output_path: str) -> None:
    """Save data to JSONL format.
    
    Args:
        data: List of dictionaries to save
        output_path: Output file path
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item) + '\n')

def save_corrupted_records(corrupted_records: List[Dict], output_path: str) -> None:
    """Save corrupted records for analysis.
    
    Args:
        corrupted_records: List of corrupted record information
        output_path: Output file path
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        for record in corrupted_records:
            f.write(json.dumps(record) + '\n')

## 7. Execute Conversion Pipeline

Run the complete conversion pipeline from MSP to JSONL.

In [8]:
print("\n" + "=" * 80)
print("EXECUTING CONVERSION PIPELINE")
print("=" * 80)

# Parse MSP file
entries = parse_msp_file(str(msp_file_path))
print(f"\nParsed {len(entries)} entries from {msp_file_path.name}")

# Convert to JSONL format
jsonl_data, conversion_stats = convert_msp_to_jsonl(entries)

print("\nCONVERSION RESULTS")
print("-" * 40)
print(f"Successfully converted: {conversion_stats['successful']} entries")
print(f"Failed conversions: {len(conversion_stats['corrupted_records'])} entries")

# Calculate success rate
success_rate = (conversion_stats['successful'] / conversion_stats['total_entries'] * 100 
                if conversion_stats['total_entries'] > 0 else 0)
print(f"Success rate: {success_rate:.1f}%")

# Save results
output_file = DATASET_INPUT_DIR / 'spectral_data.jsonl'
corrupted_file = DATASET_INPUT_DIR / 'corrupted_records.jsonl'

save_jsonl(jsonl_data, str(output_file))
print(f"\nOutput saved to: {output_file}")

if conversion_stats['corrupted_records'] and CONVERSION_CONFIG['processing']['save_corrupted']:
    save_corrupted_records(conversion_stats['corrupted_records'], str(corrupted_file))
    print(f"Corrupted records saved to: {corrupted_file}")


EXECUTING CONVERSION PIPELINE

Parsing 23802 potential entries...


Parsing MSP entries: 100%|██████████| 23802/23802 [00:11<00:00, 2101.70it/s] 



Parsed 23801 entries from GNPS.msp

Converting 23801 entries to JSONL format...


Converting to JSONL: 100%|██████████| 23801/23801 [00:01<00:00, 12739.09it/s]



CONVERSION RESULTS
----------------------------------------
Successfully converted: 23630 entries
Failed conversions: 171 entries
Success rate: 99.3%

Output saved to: ../data/input/GNPS/spectral_data.jsonl
Corrupted records saved to: ../data/input/GNPS/corrupted_records.jsonl


## 8. Data Quality Analysis

Analyze the converted data to understand its characteristics.

### Quality Metrics:

- **SMILES Length Distribution**: Indicates molecular complexity
- **Peak Count Distribution**: Shows spectrum richness
- **m/z Range**: Mass range coverage
- **Intensity Range**: Dynamic range of measurements

In [9]:
if jsonl_data:
    print("\nDATA QUALITY ANALYSIS")
    print("=" * 80)
    print(f"Dataset: {DATASET_NAME}")
    print(f"Total records: {len(jsonl_data):,}")
    
    # SMILES analysis
    smiles_lengths = [len(entry['smiles']) for entry in jsonl_data]
    print(f"\nSMILES Statistics:")
    print(f"  - Mean length: {np.mean(smiles_lengths):.1f}")
    print(f"  - Min length: {min(smiles_lengths)}")
    print(f"  - Max length: {max(smiles_lengths)}")
    print(f"  - Std deviation: {np.std(smiles_lengths):.1f}")
    
    # Peak statistics
    peak_counts = [len(entry['peaks']) for entry in jsonl_data]
    all_mz_values = [peak[0] for entry in jsonl_data for peak in entry['peaks']]
    all_intensities = [peak[1] for entry in jsonl_data for peak in entry['peaks']]
    
    print(f"\nPeak Statistics:")
    print(f"  - Mean peaks per spectrum: {np.mean(peak_counts):.1f}")
    print(f"  - Min peaks: {min(peak_counts)}")
    print(f"  - Max peaks: {max(peak_counts)}")
    print(f"  - Std deviation: {np.std(peak_counts):.1f}")
    
    print(f"\nm/z Statistics:")
    print(f"  - Range: {min(all_mz_values):.1f} - {max(all_mz_values):.1f}")
    print(f"  - Mean: {np.mean(all_mz_values):.1f}")
    print(f"  - Median: {np.median(all_mz_values):.1f}")
    
    print(f"\nIntensity Statistics:")
    print(f"  - Range: {min(all_intensities):.2e} - {max(all_intensities):.2e}")
    print(f"  - Mean: {np.mean(all_intensities):.2e}")
    print(f"  - Median: {np.median(all_intensities):.2e}")
    
    # Sample entry
    print(f"\nSAMPLE ENTRY")
    print("-" * 40)
    sample = jsonl_data[0]
    print(f"SMILES: {sample['smiles'][:50]}{'...' if len(sample['smiles']) > 50 else ''}")
    print(f"Number of peaks: {len(sample['peaks'])}")
    print(f"First 3 peaks: {sample['peaks'][:3]}")
else:
    print("\nNo data converted successfully")


DATA QUALITY ANALYSIS
Dataset: GNPS
Total records: 23,630

SMILES Statistics:
  - Mean length: 71.8
  - Min length: 3
  - Max length: 642
  - Std deviation: 43.2

Peak Statistics:
  - Mean peaks per spectrum: 878.7
  - Min peaks: 1
  - Max peaks: 361421
  - Std deviation: 5281.7

m/z Statistics:
  - Range: 20.4 - 4971.1
  - Mean: 453.3
  - Median: 263.2

Intensity Statistics:
  - Range: 0.00e+00 - 1.04e+09
  - Mean: 7.61e+02
  - Median: 1.00e-01

SAMPLE ENTRY
----------------------------------------
SMILES: CC(N(O)CCCCCNC(CCC(N(O)CCCCCNC(CCC(N(O)CCCCN)=O)=O...
Number of peaks: 1415
First 3 peaks: [[97.798515, 0.005925], [97.833961, 0.012199], [99.689133, 0.011502]]


## 9. Error Analysis

Analyze conversion failures to identify potential issues.

### Common Error Types:

1. **Invalid or missing SMILES**: Comments field doesn't contain valid SMILES
2. **Invalid peaks**: Peak data doesn't meet validation criteria
3. **Conversion errors**: Unexpected errors during processing

In [10]:
if conversion_stats['corrupted_records']:
    print("\nERROR ANALYSIS")
    print("=" * 80)
    print(f"Total failed conversions: {len(conversion_stats['corrupted_records'])}")
    
    # Analyze error types
    error_types = {}
    for record in conversion_stats['corrupted_records']:
        error = record['error'].split(':')[0]
        error_types[error] = error_types.get(error, 0) + 1
    
    print("\nError Type Breakdown:")
    print("-" * 40)
    for error_type, count in sorted(error_types.items(), key=lambda x: x[1], reverse=True):
        percentage = count / len(conversion_stats['corrupted_records']) * 100
        print(f"  {error_type}: {count} ({percentage:.1f}%)")
    
    # Show sample errors for debugging
    print("\nSample Failed Conversions (First 5):")
    print("-" * 40)
    for i, record in enumerate(conversion_stats['corrupted_records'][:5]):
        print(f"\n{i+1}. Entry {record['index']}:")
        print(f"   Name: {record['name']}")
        print(f"   Error: {record['error']}")
        if 'comments' in record:
            print(f"   Comments excerpt: {record['comments'][:100]}...")
        if 'num_peaks' in record:
            print(f"   Number of peaks: {record['num_peaks']}")
else:
    print("\nNo conversion errors encountered!")


ERROR ANALYSIS
Total failed conversions: 171

Error Type Breakdown:
----------------------------------------
  Invalid or missing SMILES: 171 (100.0%)

Sample Failed Conversions (First 5):
----------------------------------------

1. Entry 394:
   Name: Methylmycofactocinol-9
   Error: Invalid or missing SMILES
   Comments excerpt: "SMILES=OC1=C(O)C(C)(C)C(N1)CC(C=C2)=CC=C2OC(C3O)OC(COC([C@@H]4O)OC(COC(C5O)OC(COC(C6O)OC(COC(C7O)OC...

2. Entry 395:
   Name: GlycylAHDP-8
   Error: Invalid or missing SMILES
   Comments excerpt: "SMILES=OC(C(COC([C@@H]1O)OC(COC(C2O)OC(COC(C3O)OC(COC(C4O)OC(COC(C5O)OC(COC(C6O)OC(COC(C7O)OC(CO)[C...

3. Entry 396:
   Name: Methylmycofactocinol-8
   Error: Invalid or missing SMILES
   Comments excerpt: "SMILES=OC1=C(O)C(C)(C)C(N1)CC(C=C2)=CC=C2OC(C3O)OC(COC([C@@H]4O)OC(COC(C5O)OC(COC(C6O)OC(COC(C7O)OC...

4. Entry 397:
   Name: Methylmycofactocinone-8
   Error: Invalid or missing SMILES
   Comments excerpt: "SMILES=O=C(C1=O)NC(C1(C)C)CC(C=C2)=CC=C2OC(C3O)OC

## 10. Format Validation

Validate that the output format is compatible with the downstream pipeline.

### Pipeline Requirements:

The featurization pipeline expects:
- JSONL format with one record per line
- Each record must have 'smiles' and 'peaks' fields
- 'smiles': string containing valid SMILES notation
- 'peaks': list of [m/z, intensity] pairs

In [None]:
print("\nFORMAT VALIDATION")
print("=" * 80)

# Validate format compatibility with existing pipeline
if jsonl_data:
    # Check sample entry
    sample = jsonl_data[0]
    required_keys = {'smiles', 'peaks'}
    has_required_keys = required_keys.issubset(sample.keys())
    
    # Validate data types
    valid_types = (
        isinstance(sample['smiles'], str) and
        isinstance(sample['peaks'], list) and
        all(isinstance(p, list) and len(p) == 2 for p in sample['peaks'][:5]) and
        all(isinstance(p[0], (int, float)) and isinstance(p[1], (int, float)) 
            for p in sample['peaks'][:5])
    )
    
    print("Validation Results:")
    print(f"  - Required keys present: {'✓' if has_required_keys else '✗'}")
    print(f"  - Data types valid: {'✓' if valid_types else '✗'}")
    print(f"  - Format compatible: {'✓' if has_required_keys and valid_types else '✗'}")
    
    if has_required_keys and valid_types:
        print(f"\n{'='*60}")
        print("✓ CONVERSION SUCCESSFUL")
        print(f"{'='*60}")
        print(f"\nReady for pipeline processing!")
        print(f"Next step: Set dataset_type = '{DATASET_NAME}' in the featurization notebook")
        print(f"Output file: {output_file}")
    else:
        print("\n⚠ WARNING: Format validation failed!")
        print("Please check the conversion process.")
else:
    print("No data to validate")


FORMAT VALIDATION
Validation Results:
  - Required keys present: ✓
  - Data types valid: ✓
  - Format compatible: ✓

✓ CONVERSION SUCCESSFUL

Ready for pipeline processing!
Next step: Set dataset_type = 'GNPS' in the featurization notebook
Output file: ../data/input/GNPS/spectral_data.jsonl


: 