In [None]:
!pip install PyMuPDF
# Trial 계정에서는 동작하지 않습니다.  

In [None]:
import os
import sys
import argparse
import time
from pathlib import Path
from typing import List, Optional
import fitz  # PyMuPDF

from snowflake.snowpark.context import get_active_session
session = get_active_session() 

In [None]:
create or replace stage demo.magi_handson.docs_stg 
encryption = (type = 'SNOWFLAKE_SSE')
directory = (enable = true AUTO_REFRESH = TRUE);

create or replace stage demo.magi_handson.image_stg 
encryption = (type = 'SNOWFLAKE_SSE') 
directory = (enable = true AUTO_REFRESH = TRUE);

In [None]:
// demo.magi_handson.docs_stg 에 문서업로드 
ls @demo.magi_handson.docs_stg ;

In [None]:
"""
1. 클래스 역할 : PDF 파일을 페이지 단위로 Image(흑백) 파일로 각각 생성하는 
2. 수정할 부분 : 사용자 환경에 맞춰 main() 함수의 input/output Stage 경로 수정
              - input_stage, output_stage
"""

class PyMuPDFGrayscaleConverter:
    def __init__(self, default_zoom: float = 4.17, image_format: str = "PNG"):
        self.default_zoom = default_zoom
        self.image_format = image_format.upper()

    def convert_pdf(self, pdf_path: str, output_folder: str) -> List[str]:
        pdf_basename = Path(pdf_path).stem
        os.makedirs(output_folder, exist_ok=True)
        doc = fitz.open(pdf_path)
        total_pages = len(doc)
        image_paths = []
        for page_index in range(total_pages):
            page = doc.load_page(page_index)
            mat = fitz.Matrix(self.default_zoom, self.default_zoom)
            pix = page.get_pixmap(matrix=mat, colorspace=fitz.csGRAY, alpha=False)
            image_path = os.path.join(
                output_folder, f"{pdf_basename}_page_{page_index + 1:03d}_grayscale.{self.image_format.lower()}"
            )
            pix.save(image_path)
            image_paths.append(image_path)
            pix = None
            page = None
        doc.close()
        return image_paths

def main():

    # 사용자 환경에 맞게 Stage 경로 수정해야 함
    input_stage = "@demo.magi_handson.docs_stg"
    output_stage = "@demo.magi_handson.image_stg"

    local_pdf_dir = "pdf_input"
    local_image_dir = "image_output"
    os.makedirs(local_pdf_dir, exist_ok=True)
    os.makedirs(local_image_dir, exist_ok=True)

    # Step 1: Download all PDFs
    pdf_stage_list = session.sql(f"LIST {input_stage}").collect()
    pdf_files = []
    for item in pdf_stage_list:
        filename = item['name']
        print(f"[filename] == {filename}")
        session.file.get(input_stage, local_pdf_dir)
        local_pdf_path = os.path.join(local_pdf_dir, os.path.basename(filename))
        pdf_files.append(local_pdf_path)

    # Step 2: Convert PDFs to images
    converter = PyMuPDFGrayscaleConverter()
    for pdf_file in pdf_files:
        converter.convert_pdf(pdf_file, local_image_dir)

    # Step 3: Upload images to IMAGE_INTERNAL
    for image_file in Path(local_image_dir).glob("*.png"):
        session.file.put(str(image_file), output_stage, overwrite=True, auto_compress= False)

    print(f"All PDFs processed. Images uploaded to {output_stage}")


main()

In [None]:
ls @demo.magi_handson.image_stg;

In [None]:
-- CREATE DATABASE IF NOT EXISTS demo;
-- CREATE SCHEMA IF NOT EXISTS demo.magi_handson;

-- =================================================================
-- STEP 1: 소스 테이블 생성
-- =================================================================

-- 이미지에서 추출된 원본 텍스트 저장
CREATE OR REPLACE TABLE demo.magi_handson.IMAGE_DATA_RAW (
    RELATIVE_PATH STRING,
    EXTRACTED_CONTENT STRING,
    PROCESSING_TIMESTAMP TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    AI_MODEL STRING DEFAULT 'llama4-maverick'
);

-- 검색용 청크 테이블
CREATE OR REPLACE TABLE demo.magi_handson.IMAGE_DATA_CHUNK (
    RELATIVE_PATH STRING,
    FILE_URL STRING,
    CHUNK_INDEX INT,
    CHUNK STRING,
    CREATED_TIMESTAMP TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

In [None]:
-- =================================================================
-- STEP 2: 데이터 청킹
-- =================================================================

CREATE OR REPLACE PROCEDURE DEMO.magi_handson.EXTRACT_DATA_FROM_IMAGES()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    new_images_count NUMBER := 0;
    processed_count NUMBER := 0;
    result_message STRING := '';
BEGIN
    -- 1. 새로 추가된 이미지 파일 확인
    CREATE OR REPLACE TEMPORARY TABLE DEMO.magi_handson.temp_new_images AS
    SELECT DISTINCT RELATIVE_PATH
      FROM DIRECTORY('@demo.magi_handson.image_stg')
     WHERE (RELATIVE_PATH LIKE '%.png' 
            OR RELATIVE_PATH LIKE '%.jpg'
            OR RELATIVE_PATH LIKE '%.jpeg'
            OR RELATIVE_PATH LIKE '%.PNG'
            OR RELATIVE_PATH LIKE '%.JPG'
            OR RELATIVE_PATH LIKE '%.JPEG')
       AND RELATIVE_PATH NOT IN (
              SELECT RELATIVE_PATH
                FROM DEMO.magi_handson.IMAGE_DATA_RAW
           );
    
    -- 새 이미지 개수 확인
    SELECT COUNT(*) INTO new_images_count FROM DEMO.magi_handson.temp_new_images;
    
    IF (new_images_count > 0) THEN
        -- 2. 각 이미지를 개별적으로 처리 (단순한 INSERT 방식)
        INSERT INTO DEMO.magi_handson.IMAGE_DATA_RAW (RELATIVE_PATH, EXTRACTED_CONTENT)
        SELECT 
               RELATIVE_PATH,
               AI_COMPLETE(
                'llama4-maverick',
                '첨부 이미지의 표 데이터를 JSON 형식으로 변환해줘. 모든 행과 열의 데이터를 누락 없이 추출하고, 다음 규칙을 반드시 따라야 해.
1. 원본 이미지의 표, 차트, 그래프, 텍스트 등 모든 컨텐츠를 JSON에 포함시켜야 해.
2. 데이터의 순서는 원본 이미지와 동일하게 유지해야 해.
3. 병합된 셀의 정보는 해당 병합된 셀에 속한 모든 하위 항목에 동일하게 반복해서 기재해야 해.
   (예를들어, 세부 항목이 2개 이상인 경우, 병합된 셀에 속한 모든 하위 항목을 반복할때는 기준이 되는 마스터 값은 모두에 포함되어야 함).
4. 표의 제목, 행/열 이름을 키(Key)로 사용하고, 모든 키와 값을 빠짐없이 채워야 해.
5. 추측성 답변은 하지 말고, 오직 이미지에 있는 내용으로만 JSON 데이터를 작성해야 해.',
                TO_FILE('@demo.magi_handson.image_stg', RELATIVE_PATH)
               ) as extracted_content
          FROM DEMO.magi_handson.temp_new_images
         ORDER BY RELATIVE_PATH;
        
        -- 3. 성공적으로 처리된 이미지 개수 확인
        SELECT COUNT(*) INTO processed_count 
          FROM DEMO.magi_handson.IMAGE_DATA_RAW 
         WHERE RELATIVE_PATH IN (SELECT RELATIVE_PATH FROM DEMO.magi_handson.temp_new_images)
           AND EXTRACTED_CONTENT IS NOT NULL
           AND EXTRACTED_CONTENT NOT LIKE 'ERROR:%';
        
        -- 4. 추출된 텍스트를 청킹하여 IMAGE_DATA_CHUNK에 저장
        INSERT INTO DEMO.magi_handson.IMAGE_DATA_CHUNK (
            RELATIVE_PATH, FILE_URL, CHUNK_INDEX, CHUNK
        )
        SELECT
               rt.RELATIVE_PATH,
               -- BUILD_SCOPED_FILE_URL('@demo.magi_handson.image_stg', rt.RELATIVE_PATH) AS file_url,
               GET_PRESIGNED_URL('@demo.magi_handson.image_stg', rt.RELATIVE_PATH, 7200)  AS file_url,
               c.index AS chunk_index,
               CONCAT(rt.RELATIVE_PATH, ': ', c.value::TEXT) AS chunk
          FROM
               DEMO.magi_handson.IMAGE_DATA_RAW rt,
               LATERAL FLATTEN(SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
                   rt.EXTRACTED_CONTENT,
                   'none',
                   1000,  -- chunk size
                   200    -- overlap size
               )) c
         WHERE rt.RELATIVE_PATH IN (SELECT RELATIVE_PATH FROM DEMO.magi_handson.temp_new_images)
           AND rt.EXTRACTED_CONTENT IS NOT NULL
           AND rt.EXTRACTED_CONTENT NOT LIKE 'ERROR:%'
           AND LENGTH(rt.EXTRACTED_CONTENT) > 10
         ORDER BY rt.RELATIVE_PATH;
        
        result_message := ' ' || new_images_count || '개의 새로운 이미지 중 ' || processed_count || '개가 성공적으로 처리되었습니다. Cortex Search 서비스가 자동으로 업데이트됩니다.';
    ELSE
        result_message := ' 처리할 새로운 이미지 파일이 없습니다.';
    END IF;
    
    RETURN result_message;
END;
$$;

CALL DEMO.magi_handson.EXTRACT_DATA_FROM_IMAGES();

In [None]:
select * from demo.magi_handson.IMAGE_DATA_CHUNK;