In [15]:
import os
import json
from typing import Dict, List, Optional, Tuple
from notion_client import Client
import pandas as pd
from datetime import datetime

class NotionProcessor:
    def __init__(self, auth_token: str):
        """Initialize the Notion client with authentication token."""
        self.notion = Client(auth=auth_token)
    
    def get_database_pages(self, database_id: str, parent_properties: Dict = None) -> List[Dict]:
        """
        Retrieve all pages from a Notion database.
        If parent_properties provided, merge them with each page's properties.
        """
        pages = []
        cursor = None
        
        while True:
            response = self.notion.databases.query(
                database_id=database_id,
                start_cursor=cursor
            )
            
            # If parent properties exist, merge them with each page
            if parent_properties:
                for page in response['results']:
                    self._merge_parent_properties(page, parent_properties)
            
            pages.extend(response['results'])
            
            if not response.get('has_more'):
                break
                
            cursor = response['next_cursor']
            
        return pages
    
    def _merge_parent_properties(self, page: Dict, parent_properties: Dict):
        """Merge parent properties into page properties, avoiding duplicates."""
        for key, value in parent_properties.items():
            if key not in page['properties']:
                # Create a new property of type 'rich_text' to store inherited values
                page['properties'][f'parent_{key}'] = {
                    'type': 'rich_text',
                    'rich_text': [{
                        'type': 'text',
                        'text': {'content': str(value)},
                        'plain_text': str(value)
                    }]
                }
    
    def extract_properties(self, page: Dict) -> Dict:
        """Extract properties from a page."""
        properties = {}
        
        for prop_name, prop_data in page['properties'].items():
            prop_type = prop_data['type']
            
            if prop_type == 'title':
                properties[prop_name] = self._get_rich_text_content(prop_data['title'])
            elif prop_type == 'rich_text':
                properties[prop_name] = self._get_rich_text_content(prop_data['rich_text'])
            elif prop_type == 'select':
                if prop_data['select']:
                    properties[prop_name] = prop_data['select']['name']
            elif prop_type == 'multi_select':
                properties[prop_name] = [item['name'] for item in prop_data['multi_select']]
            elif prop_type == 'date':
                if prop_data['date']:
                    properties[prop_name] = prop_data['date']['start']
            elif prop_type in ['number', 'checkbox']:
                properties[prop_name] = prop_data[prop_type]
                
        return properties
    
    def _get_rich_text_content(self, rich_text: List) -> str:
        """Extract text content from rich text array."""
        return ' '.join([text['plain_text'] for text in rich_text if text.get('plain_text')])
    
    def get_block_children(self, block_id: str, level: int = 0) -> List[Tuple[Dict, int]]:
        """Retrieve all child blocks of a given block with their nesting level."""
        blocks = []
        cursor = None
        
        while True:
            response = self.notion.blocks.children.list(
                block_id=block_id,
                start_cursor=cursor
            )
            
            for block in response['results']:
                blocks.append((block, level))
                
                if block.get('has_children'):
                    # For child database blocks, don't process them here
                    if block['type'] != 'child_database':
                        child_blocks = self.get_block_children(block['id'], level + 1)
                        blocks.extend(child_blocks)
            
            if not response.get('has_more'):
                break
                
            cursor = response['next_cursor']
            
        return blocks
    
    def process_blocks(self, blocks: List[Tuple[Dict, int]]) -> Tuple[Dict, List[str]]:
        """
        Process blocks to extract headers (with sub-headers) and content.
        Returns tuple of (headers_dict, content_sections).
        """
        current_main_header = None
        current_sub_headers = []
        current_content = []
        headers = {}
        content_sections = []
        current_bullet_group = []
        
        def save_current_section():
            """Helper function to save current section's content."""
            nonlocal current_content, current_bullet_group, content_sections, current_main_header, current_sub_headers, headers
            
            # Save any remaining bullet group
            if current_bullet_group:
                current_content.append(self._merge_bullet_group(current_bullet_group))
                current_bullet_group = []
            
            # Save content if exists and we have a header
            if current_main_header is not None and current_content:
                content_sections.append('\n'.join(filter(None, current_content)))
                full_header = current_main_header
                if current_sub_headers:
                    full_header += ' - ' + ' - '.join(current_sub_headers)
                headers[full_header] = len(content_sections) - 1  # Point to the section we just added
        
        for block, level in blocks:
            block_type = block['type']
            
            # Handle headers
            if block_type.startswith('heading_'):
                # Save current section before starting new one
                save_current_section()
                
                # Reset content collection for new section
                current_content = []
                current_bullet_group = []
                
                # Process new header
                header_text = self._get_rich_text_content(block[block_type]['rich_text'])
                header_level = int(block_type[-1])
                
                if header_level == 1:
                    current_main_header = header_text
                    current_sub_headers = []
                else:
                    current_sub_headers.append(header_text)
            
            # Handle child database
            elif block_type == 'child_database':
                if current_bullet_group:
                    current_content.append(self._merge_bullet_group(current_bullet_group))
                    current_bullet_group = []
                current_content.append(f"[Database: {block['id']}]")
            
            # Handle bullet points and numbered lists
            elif block_type in ['bulleted_list_item', 'numbered_list_item']:
                text_content = self._get_rich_text_content(block[block_type]['rich_text'])
                
                if level == 0:
                    if current_bullet_group:
                        current_content.append(self._merge_bullet_group(current_bullet_group))
                        current_bullet_group = []
                    current_bullet_group = [(text_content, level)]
                else:
                    current_bullet_group.append((text_content, level))
            
            # Handle regular paragraphs
            elif block_type == 'paragraph':
                if current_bullet_group:
                    current_content.append(self._merge_bullet_group(current_bullet_group))
                    current_bullet_group = []
                
                text_content = self._get_rich_text_content(block[block_type]['rich_text'])
                if text_content:
                    current_content.append(text_content)
        
        # Save final section
        save_current_section()
        
        return headers, content_sections
    
    def _merge_bullet_group(self, bullet_group: List[Tuple[str, int]]) -> str:
        """Merge a group of bullets into a single line, with sub-bullets inline."""
        if not bullet_group:
            return ""
        
        result = []
        current_main_bullet = []
        
        for text, level in bullet_group:
            if level == 0:
                if current_main_bullet:
                    result.append(' '.join(current_main_bullet))
                current_main_bullet = [text]
            else:
                current_main_bullet.append(text)
        
        if current_main_bullet:
            result.append(' '.join(current_main_bullet))
        
        return '\n'.join(result)
    
    def process_page(self, page: Dict, parent_properties: Dict = None) -> List[Dict]:
        """Process a single page and its nested databases."""
        results = []
        properties = self.extract_properties(page)
        
        # Merge parent properties if they exist
        if parent_properties:
            properties.update({f'parent_{k}': v for k, v in parent_properties.items()})
        
        # Process page blocks
        blocks = self.get_block_children(page['id'])
        headers, content_sections = self.process_blocks(blocks)
        
        # Add headers to properties and create entries
        for header, section_index in headers.items():
            section_properties = properties.copy()
            section_properties['header'] = header
            
            # Create entry for this section
            if 0 <= section_index < len(content_sections):
                results.append({
                    'properties': section_properties,
                    'content': content_sections[section_index]
                })
        
        # Process any child databases found in the blocks
        for block, _ in blocks:
            if block['type'] == 'child_database':
                child_pages = self.get_database_pages(block['id'], properties)
                for child_page in child_pages:
                    results.extend(self.process_page(child_page, properties))
        
        return results
    
    def process_database(self, database_id: str) -> List[Dict]:
        """Process entire database and return structured data."""
        processed_data = []
        
        # Get all pages from the database
        pages = self.get_database_pages(database_id)
        
        # Process each page and its nested databases
        for page in pages:
            processed_data.extend(self.process_page(page))
            
        return processed_data

# Example usage in Jupyter notebook (separate cells):

In [16]:
# Example usage in Jupyter notebook (separate cells):

# Cell 1: Initialize processor
NOTION_TOKEN = os.getenv("NOTION_TOKEN")
DATABASE_ID = os.getenv("NOTION_DATABASE_ID")

processor = NotionProcessor(NOTION_TOKEN)
processed_data = processor.process_database(DATABASE_ID)

In [23]:
processed_data[0]['content']


'The  Boundary Diagram Tool (BDT)  is an advanced tool designed for  change impact analysis  in large-scale Simulink models, particularly those used in embedded systems like automotive control units. The tool enables engineers to trace how changes in specific parts of a system propagate through other models and network interfaces, aiding in software maintenance, debugging, and ensuring compliance with safety standards like  ISO 26262 . It was developed as part of a collaboration with  Stellantis  (formerly FCA), where it played a key role in managing changes within complex automotive systems, such as  hybrid electric vehicle control systems .\nAs part of the development team, my work on the BDT Tool involved both using the tool for various critical tasks and improving its functionality to make it more efficient and reliable.'

In [17]:
# Cell 3: Save to file
output_file = f"notion_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(processed_data, f, ensure_ascii=False, indent=2)

print(f"Data exported to {output_file}")

Data exported to notion_export_20241106_215738.json


In [24]:
from llama_index.core import Document

documents = [Document(text=record['content'], metadata=record['properties']) for record in processed_data]


In [25]:
documents[0]

Document(id_='95854c94-20ae-44f9-8540-c8a100de8a15', embedding=None, metadata={'Employer': 'McMaster University - Partnered with Stellantis', 'Description': 'Maintained and improved the  Boundary Diagram Tool  (BDT) for change impact analysis in large-scale Simulink models, specifically for automotive systems at Stellantis   ', 'Project Size': 'Medium', 'When': '2020-06-01', 'Position': 'Research Assistant', 'Tags': ['MATLAB', 'Simulink'], 'Name': 'MATLAB/Simulink - Boundary Diagram Tool', 'header': 'Project Overview:'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, text='The  Boundary Diagram Tool (BDT)  is an advanced tool designed for  change impact analysis  in large-scale Simulink models, particularly those used in embedded systems like automotive control units. The tool enables engineers to trace how changes in specific parts of a system propagate through other models and network interfaces, aiding in software maintenance, debugging, and ensuri