In [44]:
# Install the notebook's dependencies into the kernel's environment.
# Each distribution is listed once: `langchain_community` is the same
# distribution as `langchain-community`, and `pathlib` is not installed because
# it ships with Python 3 (the PyPI package of that name is an old backport).
%pip install -Uqqq pip --progress-bar off
%pip install -qqq ollama --progress-bar off
%pip install -qqq pandas --progress-bar off
%pip install -qqq pdfplumber --progress-bar off
%pip install -qqq owlready2 --progress-bar off
%pip install -qqq rdflib --progress-bar off
%pip install -qqq langchain-ollama --progress-bar off
%pip install -qqq langchain-community --progress-bar off
%pip install -qqq langchain-chroma --progress-bar off
%pip install -qqq rank_bm25 --progress-bar off

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [45]:
import json
import re
import pdfplumber
from typing import List, Dict
from ollama import Client
from collections import defaultdict


In [46]:
# ---------------------------
# 1. PDF Processing Layer
# ---------------------------

def extract_text_with_tables(pdf_path: str) -> List[Dict]:
    """Extract a PDF into section-level chunks, keeping tables as pipe-delimited rows.

    Pages are read in order. Whenever a page contains a line that looks like a
    numbered section header (e.g. ``2. Related Work``), the content accumulated
    so far is emitted as one chunk and a new section starts with that page.
    Only the first header on a page is considered, and the whole page is
    attributed to the section that starts on it.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"content": str, "metadata": {"page": int, "section": str}}``
        dicts. ``page`` is the 1-based page on which the chunk starts and
        ``section`` is the detected header, or ``"Document Header"`` for
        content that precedes the first detected header.
    """
    TABLE_START = "<<<TABLE>>>"
    TABLE_END = "<</TABLE>>>"
    # MULTILINE anchors let a header be recognised on the first or last line of
    # a page as well as between two newlines.
    header_pattern = re.compile(
        r'^(\d+\.\s+[A-Z][a-z]+(?: [A-Z][a-z]+)*)$', re.MULTILINE
    )

    text_chunks = []
    section_pages = []                      # per-page content of the open section
    current_section_header = "Document Header"
    section_start_page = 1

    def build_chunk() -> Dict:
        return {
            "content": "\n".join(section_pages).strip(),
            "metadata": {
                "page": section_start_page,
                "section": current_section_header,
            },
        }

    with pdfplumber.open(pdf_path) as pdf:
        for page_number, page in enumerate(pdf.pages, 1):
            text = page.extract_text() or ""
            tables = page.extract_tables() or []

            # Render each table row as "| a | b |"; empty cells become "" rather
            # than the string "None", and newlines inside a cell are flattened
            # so a row always stays on one line.
            table_lines = []
            for table in tables:
                for row in table:
                    cells = (
                        "" if cell is None else str(cell).replace("\n", " ")
                        for cell in row
                    )
                    table_lines.append("| " + " | ".join(cells) + " |\n")
                table_lines.append("\n")
            table_text = "".join(table_lines)

            # A header on this page closes the previous section.
            section_match = header_pattern.search(text)
            if section_match:
                if section_pages:
                    text_chunks.append(build_chunk())
                current_section_header = section_match.group(1)
                section_pages = []
                section_start_page = page_number

            section_pages.append(f"{text}\n{TABLE_START}\n{table_text}{TABLE_END}")

    # Emit whatever is left after the last page.
    if section_pages:
        text_chunks.append(build_chunk())

    return text_chunks

In [55]:
# ---------------------------
# 2. Chunk Processing Engine
# ---------------------------

class LidarProcessor:
    """Extract LiDAR sensor descriptions from text with an Ollama-served LLM.

    Attributes:
        client: Ollama client used for generation.
        model: Name of the model passed to every generate call.
        extraction_prompt: ``str.format`` template with a single ``{text}``
            slot; literal JSON braces in it are escaped as ``{{`` / ``}}``.
    """

    def __init__(self, model_name="deepseek-r1:8b-llama-distill-q8_0",
                 host="http://localhost:11434"):
        """Create the client and the prompt template.

        Args:
            model_name: Ollama model tag to query.
            host: Base URL of the Ollama server.
        """
        self.client = Client(host=host)
        self.model = model_name
        self.extraction_prompt = """As a LiDAR sensor expert, analyze this technical text to extract:

**Target Entities**:
1. Sensor Models: Manufacturer-branded names (Velodyne HDL-64E, Livox Horizon)
2. Components: Physical/software parts (e.g., MEMS mirror, FPGA processor)
3. Technical Specs: Quantified values with units (120m range, 0.08° resolution)
4. Implementations: Standards/protocols (IEEE 802.11p, ROS2)

**Extraction Rules**:
- Preserve contextual relationships: 
  "The Velodyne VLP-32C's rotating assembly enables 360° coverage" → 
  {{"parts": ["rotating assembly"], "properties": ["field of view: 360°"]}}
- Capture implied properties from comparisons:
  "Outperforms Ouster OS2 in range" → 
  {{"properties": ["comparative_range: > Ouster OS2"]}}
- Retain partial information with "[INFERRED]" tags


**Enhanced Format**:
```json
{{
  "sensors": [
    {{
      "name": "Livox Horizon",
      "category": "Automotive Lidar",
      "parts": ["MEMS mirror", "905nm laser array"],
      "properties": [
        "horizontal_fov: 81.7°", 
        "range: 260m @ 10% reflectivity",
        "scan_pattern: non-repetitive"
      ],
      "implements": ["ROS2", "AutoSAR"]
    }}
  ]
}}
```
**Critical Instructions**:
1. If no sensors are found, return: {{"sensors": []}}
2. Always maintain valid JSON structure
3. Never add extra text outside the JSON
4. Use exact values from tables when available

<example>
Input: "The Velodyne VLP-32C has a 200m range and 0.2° resolution"
Output:
{{
  "sensors": [
    {{
      "name": "Velodyne VLP-32C",
      "category": "Automotive LiDAR",
      "parts": [],
      "properties": ["range: 200m", "resolution: 0.2°"],
      "implements": []
    }}
  ]
}}
</example>


<text>
{text}
</text>
"""

    def process_chunk(self, text: str) -> List[Dict]:
        """Run the extraction prompt on one chunk and return the sensors found.

        This is best-effort: every failure (transport error, empty reply,
        invalid JSON, unexpected structure) is reported with ``print`` and
        results in an empty list instead of an exception.

        Args:
            text: Chunk content to analyse.

        Returns:
            The dict entries of the reply's ``"sensors"`` list; ``[]`` when
            nothing usable was returned.
        """
        try:
            response = self.client.generate(
                model=self.model,
                # Fill the {text} slot; this also collapses the escaped
                # braces so the model sees well-formed JSON examples.
                prompt=self.extraction_prompt.format(text=text),
                format="json",
                options={
                    "temperature": 0,
                    "top_p": 0.5,
                    "num_ctx": 4096,
                    "num_predict": 1024
                }
            )
            # Handle empty responses
            raw_response = (response.get('response') or '').strip()
            if not raw_response or raw_response.count('{') == 0:
                print(f"Empty response for chunk: {text[:50]}...")
                return []

            try:
                parsed = json.loads(raw_response)
            except json.JSONDecodeError as e:
                print(f"Failed to parse JSON: {e}\nRaw response: {raw_response}")
                return []

            # The top level must be an object carrying a "sensors" list.
            if not isinstance(parsed, dict) or "sensors" not in parsed:
                print(f"Missing 'sensors' key in response: {raw_response}")
                return []

            if not isinstance(parsed["sensors"], list):
                print(f"Invalid sensors format: {type(parsed['sensors'])}")
                return []

            # Callers index each sensor by key, so drop non-object entries.
            return [item for item in parsed["sensors"] if isinstance(item, dict)]

        except Exception as e:
            print(f"Unexpected error: {str(e)}")
            return []


In [56]:
# ---------------------------
# 3. Knowledge Fusion System
# ---------------------------

class KnowledgeMerger:
    """Merge sensor records that refer to the same sensor.

    Records are grouped by their case-insensitive, whitespace-trimmed name.
    Parts, properties, implemented standards and categories are accumulated as
    sets so repeated mentions do not create duplicates.
    """

    # Placeholder reported when no record supplied a category.
    _NO_CATEGORY = "Uncategorized"

    def __init__(self):
        self.sensor_map = defaultdict(lambda: {
            "parts": set(),
            "properties": set(),
            "implements": set(),
            "categories": set()
        })

    @staticmethod
    def _as_strings(value) -> List[str]:
        """Coerce a loosely-typed field into a list of non-empty strings.

        ``None`` gives ``[]``; a bare string is one item (not a sequence of
        characters); a dict becomes ``"key: value"`` items; other values are
        stringified so they are hashable and sortable.
        """
        if value is None:
            return []
        if isinstance(value, str):
            items = [value]
        elif isinstance(value, dict):
            items = [f"{key}: {val}" for key, val in value.items()]
        elif isinstance(value, (list, tuple, set)):
            items = [item for item in value if item is not None]
        else:
            items = [value]
        stripped = (str(item).strip() for item in items)
        return [item for item in stripped if item]

    def add_sensor(self, sensor: Dict):
        """Merge one sensor record into the map.

        Records without a usable string ``name`` are ignored, since there is
        nothing to key them on.

        Args:
            sensor: Record with ``name`` and optional ``category``, ``parts``,
                ``properties`` and ``implements`` fields.
        """
        name = sensor.get('name') if isinstance(sensor, dict) else None
        if not isinstance(name, str) or not name.strip():
            return
        name = name.strip()
        entry = self.sensor_map[name.lower()]

        # The most recently seen spelling of the name is the one reported.
        entry['name'] = name
        entry['parts'].update(self._as_strings(sensor.get('parts')))
        entry['properties'].update(self._as_strings(sensor.get('properties')))
        entry['implements'].update(self._as_strings(sensor.get('implements')))
        entry['categories'].update(self._as_strings(sensor.get('category')))

    def finalize(self) -> List[Dict]:
        """Convert merged data to final format.

        Returns:
            One dict per sensor with ``name`` and sorted ``categories``,
            ``parts``, ``properties`` and ``implements`` lists. ``categories``
            is ``["Uncategorized"]`` only when no real category was seen.
        """
        merged = []
        for key, entry in self.sensor_map.items():
            real_categories = entry['categories'] - {self._NO_CATEGORY}
            merged.append({
                "name": entry.get('name', key),
                "categories": sorted(real_categories) or [self._NO_CATEGORY],
                "parts": sorted(entry['parts']),
                "properties": sorted(entry['properties']),
                "implements": sorted(entry['implements'])
            })
        return merged


In [57]:
# ---------------------------
# 4. Execution Pipeline
# ---------------------------

def process_pdf_to_kg(pdf_path: str) -> List[Dict]:
    """Run the full pipeline on one PDF and return the merged sensor records.

    The PDF is split into chunks, each chunk's content is sent through the
    LLM extractor, and every sensor it reports is folded into a single merger
    whose finalized output is returned.
    """
    section_chunks = extract_text_with_tables(pdf_path)

    extractor = LidarProcessor()
    fusion = KnowledgeMerger()

    for section in section_chunks:
        for found in extractor.process_chunk(section['content']):
            fusion.add_sensor(found)

    return fusion.finalize()

In [60]:
# Run the pipeline on the paper and save the merged sensors as indented JSON.
results = process_pdf_to_kg("remotesensing-16-04623.pdf")

with open("lidar_kg.json", "w") as f:
    f.write(json.dumps({"sensors": results}, indent=2))

Missing 'sensors' key in response: {
"LiDAR sensors are widely used in agriculture for various applications such as crop classification, inspection of crop viability, crop mapping, cloud profiling, collision avoidance, obstacle detection in autonomous traveling, and soil resource conservation. They also play a significant role in agricultural field machinery, object detection, and high-throughput crop phenotyping."

: "LiDAR sensors are widely used in agriculture for various applications such as crop classification, inspection of crop viability, crop mapping, cloud profiling, collision avoidance, obstacle detection in autonomous traveling, and soil resource conservation. They also play a significant role in agricultural field machinery, object detection, and high-throughput crop phenotyping."

}
Missing 'sensors' key in response: {
"abstract": {
  "en": "This paper presents a review of recent advancements in LiDAR technology for agricultural applications, focusing on its use in crop mo

In [61]:
import json
import re
import unicodedata

def normalize_properties(properties: List[str]) -> Dict[str, str]:
    """Turn ``"key: value"`` strings into a deduplicated ``{key: value}`` mapping.

    Keys are lower-cased and trimmed; values are trimmed and have metre and
    degree units standardised (``"200 meters"`` -> ``"200m"``,
    ``"360 degrees"`` -> ``"360°"``). When a key occurs more than once the
    last occurrence wins. Entries that are not strings, have no ``:``
    separator, or have an empty key are skipped.

    Args:
        properties: Property strings; ``None`` is treated as empty.

    Returns:
        Mapping from normalised key to normalised value.
    """
    prop_map = {}

    for prop in properties or []:
        if not isinstance(prop, str):
            continue  # Skip malformed properties

        # 1. Map degree look-alikes (masculine ordinal, ring above) to the real
        #    degree sign first: NFKC would otherwise turn them into "o" or
        #    " " + a combining ring.
        prop = prop.replace('\u00ba', '\u00b0').replace('\u02da', '\u00b0')
        normalized = unicodedata.normalize('NFKC', prop)

        # 2. Split on the first colon only, so values may contain colons.
        key, separator, value = normalized.partition(':')
        key = key.strip().lower()
        if not separator or not key:
            continue  # Skip malformed properties
        value = value.strip()

        # 3. Standardize units and values
        value = re.sub(r'(\d+)\s*(m|meters?)\b', r'\1m', value)  # Range
        value = re.sub(r'(\d+)\s*degrees?', '\\1\u00b0', value)  # Angles

        # 4. Conflict resolution (keep last occurrence)
        prop_map[key] = value

    return prop_map

def clean_sensor_data(sensors: List[Dict]) -> List[Dict]:
    """Reshape merged sensor records into the cleaned output schema.

    Args:
        sensors: Records with ``name`` and optional ``categories``,
            ``properties``, ``parts`` and ``implements`` lists.

    Returns:
        One dict per input record with the keys ``name``, ``category``,
        ``specifications``, ``components`` and ``interfaces``. ``components``
        and ``interfaces`` are deduplicated and sorted so the result is the
        same on every run.

    Raises:
        KeyError: If a record has no ``name``.
    """
    cleaned = []
    for sensor in sensors:
        categories = sensor.get("categories") or []
        # Prefer a real category over the "Uncategorized" placeholder; fall
        # back to the first entry, then to "Other" when there is none.
        category = next(
            (cat for cat in categories if cat != "Uncategorized"),
            categories[0] if categories else "Other",
        )
        cleaned.append({
            "name": sensor["name"],
            "category": category,
            "specifications": normalize_properties(sensor.get("properties") or []),
            "components": sorted(set(sensor.get("parts") or [])),
            "interfaces": sorted(set(sensor.get("implements") or []))
        })
    return cleaned

# Usage example: load the merged sensors, clean them, and write the result.
# Both files are opened as UTF-8 explicitly: ensure_ascii=False writes raw
# non-ASCII characters (such as the degree sign), which the platform's default
# text encoding may not be able to encode.
with open("lidar_kg.json", encoding="utf-8") as f:
    data = json.load(f)

cleaned_data = clean_sensor_data(data["sensors"])

with open("cleaned_output.json", "w", encoding="utf-8") as f:
    json.dump({"sensors": cleaned_data}, f, indent=2, ensure_ascii=False)