In [None]:
# Install the following packages: pandas and python-pptx

In [None]:
# ── extract_pptx.py ─────────────────────────────────────────────────────────────
"""
Native PPTX extractor for RAG pipelines.
• Text → one row per shape (or per nested shape) with (x, y, w, h) in EMUs
• Pictures → one row with placeholder text '[IMAGE]' and image metadata
• Saves the rows to a Snowflake table; displays the first few rows so you can eyeball the result
"""

import os
import tempfile
import shutil
import pandas as pd
from snowflake.snowpark.context import get_active_session
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
from collections import deque

# Bind the notebook's active Snowpark session to `session`; the functions in the
# cells below use it for stage downloads/uploads and for writing the result table.
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
# Number of English Metric Units (EMUs) in one inch. The bbox values extracted
# below are expressed in EMUs, so dividing by this constant converts them to inches.
EMU_PER_INCH = 914_400  # constant from Office spec

In [None]:
def extract_from_stage(stage_name, file_name, image_stage_name):
    """Extract text and picture rows from a PPTX file stored in a Snowflake stage.

    The file is downloaded into a temporary directory, every slide is handed to
    ``process_slide_shapes`` and the collected rows are returned sorted by
    slide, then by horizontal band, then by vertical position.

    Args:
        stage_name: Name of the stage holding the presentation, without the
            leading ``@``.
        file_name: Path of the PPTX file inside that stage.
        image_stage_name: Name of the stage that extracted pictures are
            uploaded to, without the leading ``@``.

    Returns:
        pandas.DataFrame with one row per extracted shape and the columns
        ``slide``, ``shape_id``, ``type``, ``content`` and ``bbox`` (plus
        ``file`` when at least one picture was found). When nothing could be
        extracted the frame is empty but still carries the five base columns,
        so downstream code that selects ``bbox`` keeps working.

    Raises:
        FileNotFoundError: If the download produced a directory that contains
            no files.
    """
    import os
    import shutil
    import tempfile
    from pptx import Presentation

    temp_dir = tempfile.mkdtemp()

    try:
        # Download the file from the stage into the temporary directory.
        session.file.get(f"@{stage_name}/{file_name}", temp_dir)

        actual_file_path = os.path.join(temp_dir, file_name)

        # If file_name contains a stage sub-path, the download may have been
        # written flat into temp_dir under its base name only — TODO confirm
        # against the Snowpark GET behaviour. Fall back to that location only
        # when the expected path does not exist.
        if not os.path.exists(actual_file_path):
            flat_path = os.path.join(temp_dir, os.path.basename(file_name))
            if os.path.exists(flat_path):
                actual_file_path = flat_path

        # The downloaded item can be a directory; use the first file inside it.
        if os.path.isdir(actual_file_path):
            files = os.listdir(actual_file_path)
            if not files:
                raise FileNotFoundError(f"No files found in {actual_file_path}")
            actual_file_path = os.path.join(actual_file_path, files[0])

        prs = Presentation(actual_file_path)

        rows = []
        for idx, slide in enumerate(prs.slides, start=1):
            rows.extend(process_slide_shapes(slide, idx, image_stage_name))

        if not rows:
            # Keep the schema on the empty result so callers can still
            # reference the expected columns.
            return pd.DataFrame(
                columns=["slide", "shape_id", "type", "content", "bbox"]
            )

        df = pd.DataFrame(rows)

        # bbox is [left, top, width, height]; pull out the sort keys.
        df["top"] = df["bbox"].apply(lambda box: box[1])
        df["left"] = df["bbox"].apply(lambda box: box[0])

        # Bucket shapes into 10 equal-width horizontal bands. The band edges
        # are derived from the `left` values of the whole deck, not per slide.
        df["column_group"] = pd.cut(df["left"], bins=10, labels=False)

        # Order: slide, then band (left to right), then top to bottom.
        df = df.sort_values(by=["slide", "column_group", "top"])

        # The helper columns are only needed for sorting.
        return df.drop(columns=["top", "left", "column_group"])

    finally:
        # Remove the temporary directory and everything downloaded into it.
        shutil.rmtree(temp_dir, ignore_errors=True)


In [None]:
def process_slide_shapes(slide, slide_idx, image_stage_name):
    """Collect one record per non-empty text shape and per picture on a slide.

    Group shapes are expanded so their members are visited individually.
    Each picture is written to a temporary file and uploaded to
    ``@{image_stage_name}/`` through the module-level Snowpark ``session``.

    Args:
        slide: python-pptx slide whose ``shapes`` collection is walked.
        slide_idx: Slide number stored in the ``slide`` field of every record.
        image_stage_name: Name of the stage that pictures are uploaded to,
            without the leading ``@``.

    Returns:
        list of dicts with the keys ``slide``, ``shape_id``, ``type``
        (``"TEXT"`` or ``"IMAGE"``), ``content`` and ``bbox``
        (``[left, top, width, height]`` as ints). IMAGE records additionally
        carry ``file``, the name of the uploaded image file.
    """
    from collections import deque
    from pptx.enum.shapes import MSO_SHAPE_TYPE
    import tempfile
    import os

    def _emu(value):
        # int(None) raises TypeError, so a shape that reports no value for a
        # dimension is recorded as 0 instead of aborting the whole extraction.
        return int(value) if value is not None else 0

    stack = deque(slide.shapes)
    results = []

    while stack:
        shp = stack.pop()

        # Expand group shapes: push the members and handle them individually.
        if shp.shape_type == MSO_SHAPE_TYPE.GROUP:
            stack.extend(shp.shapes)
            continue

        # NOTE(review): members of a group may report their position in the
        # group's own coordinate space rather than slide space — confirm
        # against python-pptx before relying on bbox for grouped shapes.
        bbox = [_emu(shp.left), _emu(shp.top), _emu(shp.width), _emu(shp.height)]

        if shp.has_text_frame and shp.text_frame.text.strip():
            results.append({
                "slide": slide_idx,
                "shape_id": shp.shape_id,
                "type": "TEXT",
                "content": shp.text_frame.text.strip(),
                "bbox": bbox,
            })

        elif shp.shape_type == MSO_SHAPE_TYPE.PICTURE:
            image = shp.image
            # python-pptx's Image.ext has no leading dot (e.g. 'png'), so the
            # separator must be added here to get a usable file name.
            name = f"slide{slide_idx}_img{shp.shape_id}.{image.ext}"

            # The context manager removes the directory and the image file
            # even when writing or uploading fails.
            with tempfile.TemporaryDirectory() as img_dir:
                temp_img_path = os.path.join(img_dir, name)
                with open(temp_img_path, "wb") as f:
                    f.write(image.blob)

                # Upload uncompressed so the staged file keeps the name
                # recorded in the `file` field.
                session.file.put(
                    temp_img_path,
                    f"@{image_stage_name}/",
                    auto_compress=False,
                    overwrite=True,
                )

            # Only reached when the upload succeeded.
            results.append({
                "slide": slide_idx,
                "shape_id": shp.shape_id,
                "type": "IMAGE",
                "content": "[IMAGE]",
                "file": name,
                "bbox": bbox,
            })

    return results

In [None]:
def save_to_snowflake_table(df, table_name):
    """Write the extracted rows to a Snowflake table, replacing its contents.

    Args:
        df: pandas DataFrame of extracted rows; its ``bbox`` column is stored
            as the string form of each value.
        table_name: Name of the target table. It is written in ``overwrite``
            mode through the module-level Snowpark ``session``.

    Returns:
        str: A status message. When ``df`` has no rows nothing is written and
        the message says the table was left unchanged.
    """
    # An empty extraction result may not even have a `bbox` column, and there
    # is nothing to upload, so stop before touching the session.
    if df.empty:
        return f"No records to save; table {table_name} left unchanged"

    # Reset to standard RangeIndex so no warning appears. reset_index returns
    # a new frame, so the caller's DataFrame is not modified below.
    df_copy = df.reset_index(drop=True)
    df_copy["bbox"] = df_copy["bbox"].apply(str)

    snowpark_df = session.create_dataframe(df_copy)
    snowpark_df.write.mode("overwrite").save_as_table(table_name)
    return f"Saved {len(df_copy)} records to table {table_name}"

In [None]:
# Setup stages if they don't exist

# session.sql("""
# CREATE STAGE IF NOT EXISTS PPTX
#   ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')          -- server-side encryption by Snowflake KMS
#   DIRECTORY  = (ENABLE = TRUE)                   -- keeps a manifest of everything you PUT
#   );
# """).collect()

# session.sql("""
# CREATE STAGE IF NOT EXISTS PPTX_IMAGES
#   ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')          -- same SSE envelope encryption
#   DIRECTORY  = (ENABLE = TRUE)
#   );
# """).collect()

# Upload a PPTX file to the stage (can be done through Snowflake UI or code)
# session.file.put("local_file.pptx", "@PPTX/", auto_compress=False, overwrite=True)



In [None]:
# Extract the text and picture rows of sample3.pptx from the PPTX stage;
# pictures found in the deck are uploaded to the PPTX_IMAGES stage.
df = extract_from_stage("PPTX", "sample3.pptx", "PPTX_IMAGES")

# Preview the first 20 extracted rows
df.head(20)

# Saving to Snowflake Table

In [None]:
# Save the extracted rows to the PPTX_EXTRACTED_CONTENT table
# (the table is written in overwrite mode) and print the status message
result = save_to_snowflake_table(df, "PPTX_EXTRACTED_CONTENT")
print(result)

In [None]:
-- Inspect the rows stored in the table written by the previous Python cell
select *
from pptx_extracted_content;

In [None]:
-- drop table pptx_extracted_content;