# Transforming Unstructured Data from an AWS S3 bucket into RAG-Ready Data in Singdata Lakehouse

In [1]:
# Replaces cell f3af592c - switch to the local development build
import sys
import importlib

# Uninstall the installed package, then install the local checkout in editable mode.
# The two "!" lines are IPython shell escapes; the path is specific to the author's machine.
print("🔄 切换到本地开发版本...")
!pip uninstall unstructured-ingest -y -q
!pip install -e /Users/liangmo/Documents/GitHub/unstructured-ingest-clickzetta/ -q

# Purge cached unstructured_ingest modules so the import below loads them afresh
modules_to_remove = [module for module in sys.modules.keys() if module.startswith('unstructured_ingest')]
for module in modules_to_remove:
    if module in sys.modules:
        del sys.modules[module]

# Re-import and show which file the package was loaded from
import unstructured_ingest
print(f"✅ 切换到本地开发版本: {unstructured_ingest.__file__}")

# Verify DashScope support
try:
    from unstructured_ingest.processes.embedder import EmbedderConfig
    # Check that "dashscope" is accepted as an embedding provider (dummy API key)
    test_config = EmbedderConfig(
        embedding_provider="dashscope",
        embedding_model_name="text-embedding-v4",
        embedding_api_key="test"
    )
    print("✅ DashScope 支持已成功添加到 EmbedderConfig")
    
    # Check the get_embedder method; a failure here is reported but does not abort the cell
    try:
        embedder = test_config.get_embedder()
        print("✅ DashScope get_embedder 方法正常工作")
    except Exception as e:
        print(f"⚠️  get_embedder 方法需要修复: {e}")
        
except Exception as e:
    print(f"❌ DashScope 支持检查失败: {e}")
    import traceback
    traceback.print_exc()

🔄 切换到本地开发版本...
✅ 切换到本地开发版本: /Users/liangmo/Documents/GitHub/unstructured-ingest-clickzetta/unstructured_ingest/__init__.py
✅ DashScope 支持已成功添加到 EmbedderConfig
✅ DashScope get_embedder 方法正常工作


In [2]:
import json
import pandas as pd
import logging
import warnings

# Only show ERROR-level (and above) log records; force=True replaces any
# handlers already attached to the root logger.
logging.basicConfig(level=logging.ERROR, force=True)
# Suppress warnings of category UserWarning.
warnings.filterwarnings("ignore", category=UserWarning)

# Set drop_tables to True to drop the tables before writing data.
drop_tables = True

In [3]:
# 安装必需的依赖
# !pip install "dashscope"

In [4]:
import os
import dotenv

# Load environment variables from the .env file, then echo the configured
# input directory as the cell's output.
dotenv.load_dotenv('.env') # replace with the path to your .env file
os.getenv("LOCAL_FILE_INPUT_DIR")

'/Users/liangmo/yunqidoc/cn_markdown_20250526'

In [10]:
# Configuration section (originally notebook cell 6501bbf9).
import os

# Define the table names to use for storing the data in Lakehouse.
# The prefix reflects the use of DashScope v4 and is shared by tables and indexes.
index_and_table_prefix = "dashscope_v4_1024_2048_20250611_"
raw_table_name = f"{index_and_table_prefix}yunqi_raw_elements"
silver_table_name = f"{index_and_table_prefix}yunqi_elements"
embeddings_dimensions = 1024  # vector dimension used with DashScope text-embedding-v4
chunk_max_characters = 2048
chunk_overlap = 512
embedding_provider = "dashscope"
embedding_model_name = "text-embedding-v4"

# SECURITY: the DashScope API key used to be hard-coded on this line. Secrets must
# not live in the notebook source, so the key is now read from the environment
# (for example from the .env file loaded earlier in this notebook). The key that
# was previously committed should be treated as leaked and revoked.
api_key = os.getenv("DASHSCOPE_API_KEY", "")
if not api_key:
    print("⚠️  DASHSCOPE_API_KEY is not set; define it in your environment or .env file before running the embedding cells")

print("✅ 配置更新:")
print("  使用 DashScope text-embedding-v4")
print(f"  模型: {embedding_model_name}")
print(f"  维度: {embeddings_dimensions}")

✅ 配置更新:
  使用 DashScope text-embedding-v4
  模型: text-embedding-v4
  维度: 1024


In [11]:
# Connection parameters for Singdata Lakehouse, one per cz_* environment
# variable (each is None when its variable is unset).
(
    _username,
    _password,
    _service,
    _instance,
    _workspace,
    _schema,
    _vcluster,
) = (
    os.getenv(f"cz_{_field}")
    for _field in (
        "username",
        "password",
        "service",
        "instance",
        "workspace",
        "schema",
        "vcluster",
    )
)

In [12]:
# Define the schema to use for storing the data in Singdata Lakehouse.
# raw_table_ddl creates {_schema}.{raw_table_name} if it does not exist yet; the
# embeddings column is a VECTOR sized by embeddings_dimensions.
# NOTE(review): the column list ends with a trailing comma before ")" — confirm
# the Lakehouse SQL dialect accepts that.
# NOTE(review): the inline SQL comment describes `id` as an auto-increment
# sequence, but the column is declared STRING — confirm which is intended.
raw_table_ddl = f"""
CREATE TABLE IF NOT EXISTS {_schema}.{raw_table_name} (
    id STRING, -- Auto-increment sequence
    record_locator STRING,
    type STRING,
    record_id STRING, -- Record identifier from the data source (e.g., record locator in connector metadata)
    element_id STRING, -- Unique identifier for the element (SHA-256 or UUID)
    filetype STRING, -- File type (e.g., PDF, DOCX, EML, etc.)
    file_directory STRING, -- Directory where the file is located
    filename STRING, -- File name
    last_modified TIMESTAMP, -- Last modified time of the file
    languages STRING, -- Document language, supports a list of multiple languages
    page_number STRING, -- Page number (applicable for PDF, DOCX, etc.)
    text STRING, -- Extracted text content
    embeddings VECTOR({embeddings_dimensions}), -- Vector data
    parent_id STRING, -- Parent element ID, used to represent element hierarchy
    is_continuation BOOLEAN, -- Whether it is a continuation of the previous element (used in chunking)
    orig_elements STRING, -- Original element in JSON format (used to store the complete element structure)
    element_type STRING, -- Element type (e.g., NarrativeText, Title, Table, etc.)
    coordinates STRING, -- Element coordinates (stored in JSONB format)
    link_texts STRING, -- Added field: Link text
    link_urls STRING, -- Added field: Link URL
    email_message_id STRING, -- Added field: Email message ID
    sent_from STRING, -- Added field: Sender
    sent_to STRING, -- Added field: Recipient
    subject STRING, -- Added field: Subject
    url STRING, -- Added field: URL
    version STRING, -- Added field: Version
    date_created TIMESTAMP, -- Added field: Creation date
    date_modified TIMESTAMP, -- Added field: Modification date
    date_processed TIMESTAMP, -- Added field: Processing date
    text_as_html STRING, -- Added field: Text in HTML format
    emphasized_text_contents STRING,
    emphasized_text_tags STRING,
    documents_original_source STRING, -- Added field: Document source
);
"""

# silver_table_ddl creates {_schema}.{silver_table_name} if it does not exist yet.
# Its columns mirror the raw table, except that the last column is named
# documents_source (the raw table calls it documents_original_source).
# It also declares two indexes, both named with index_and_table_prefix:
#   - an inverted index on `text` using the 'unicode' analyzer
#   - a vector index on `embeddings` with f32 scalars and cosine_distance
silver_table_ddl = f"""
CREATE TABLE IF NOT EXISTS {_schema}.{silver_table_name} (
    id STRING, -- Auto-increment sequence
    record_locator STRING,
    type STRING,
    record_id STRING, -- Record identifier from the data source (e.g., record locator in connector metadata)
    element_id STRING, -- Unique identifier for the element (SHA-256 or UUID)
    filetype STRING, -- File type (e.g., PDF, DOCX, EML, etc.)
    file_directory STRING, -- Directory where the file is located
    filename STRING, -- File name
    last_modified TIMESTAMP, -- Last modified time of the file
    languages STRING, -- Document language, supports a list of multiple languages
    page_number STRING, -- Page number (applicable for PDF, DOCX, etc.)
    text STRING, -- Extracted text content
    embeddings vector({embeddings_dimensions}), -- Vector data
    parent_id STRING, -- Parent element ID, used to represent element hierarchy
    is_continuation BOOLEAN, -- Whether it is a continuation of the previous element (used in chunking)
    orig_elements STRING, -- Original element in JSON format (used to store the complete element structure)
    element_type STRING, -- Element type (e.g., NarrativeText, Title, Table, etc.)
    coordinates STRING, -- Element coordinates (stored in JSONB format)
    link_texts STRING, -- Added field: Link text
    link_urls STRING, -- Added field: Link URL
    email_message_id STRING, -- Added field: Email message ID
    sent_from STRING, -- Added field: Sender
    sent_to STRING, -- Added field: Recipient
    subject STRING, -- Added field: Subject
    url STRING, -- Added field: URL
    version STRING, -- Added field: Version
    date_created TIMESTAMP, -- Added field: Creation date
    date_modified TIMESTAMP, -- Added field: Modification date
    date_processed TIMESTAMP, -- Added field: Processing date
    text_as_html STRING, -- Added field: Text in HTML format
    emphasized_text_contents STRING,
    emphasized_text_tags STRING,
    documents_source STRING, -- Added field: Document source
    INDEX {index_and_table_prefix}inverted_text_index_yunqi_cn (text) INVERTED  PROPERTIES('analyzer'='unicode'),
    INDEX {index_and_table_prefix}embeddings_vec_index_yunqi_cn(embeddings) USING vector properties (
        "scalar.type" = "f32",
        "distance.function" = "cosine_distance")
);
"""

# clean_transformation_data_sql overwrites the silver table with every row of
# the raw table. The columns are copied as-is, except that `embeddings` is cast
# to VECTOR(embeddings_dimensions) and `documents_source` is filled with a
# constant URL instead of being read from the raw table.
clean_transformation_data_sql = f"""
INSERT overwrite {_schema}.{silver_table_name}
SELECT 
    id, 
    record_locator, 
    type, 
    record_id, 
    element_id, 
    filetype, 
    file_directory, 
    filename, 
    last_modified, 
    languages, 
    page_number, 
    text, 
    CAST(embeddings AS VECTOR({embeddings_dimensions})) AS embeddings, 
    parent_id, 
    is_continuation, 
    orig_elements, 
    element_type, 
    coordinates, 
    link_texts, 
    link_urls, 
    email_message_id, 
    sent_from, 
    sent_to, 
    subject, 
    url, 
    version, 
    date_created, 
    date_modified, 
    date_processed, 
    text_as_html,
    emphasized_text_contents, 
    emphasized_text_tags,
    "https://yunqi.tech/documents" as documents_source
FROM {_schema}.{raw_table_name};
"""

In [13]:
# Define the function to create the connection to Singdata Lakehouse.
from clickzetta.connector import connect
import pandas as pd
def get_connection(password, username, service, instance, workspace, schema, vcluster):
    """Open and return a Singdata Lakehouse connection.

    Every argument is forwarded unchanged, by keyword, to
    ``clickzetta.connector.connect``; whatever that call returns is handed
    back to the caller as-is.
    """
    connect_kwargs = {
        "password": password,
        "username": username,
        "service": service,
        "instance": instance,
        "workspace": workspace,
        "schema": schema,
        "vcluster": vcluster,
    }
    return connect(**connect_kwargs)

In [14]:
# Open the Lakehouse connection from the cz_* settings read earlier.
conn = get_connection(
    password=_password,
    username=_username,
    service=_service,
    instance=_instance,
    workspace=_workspace,
    schema=_schema,
    vcluster=_vcluster,
)

In [15]:
# Function to execute SQL statements
def excute_sql(conn, sql_statement: str):
    """Run one SQL statement on ``conn`` and return all rows it yields.

    Args:
        conn: Connection object whose ``cursor()`` result works as a
            context manager.
        sql_statement: SQL text, passed to the cursor unchanged.

    Returns:
        The value of ``fetchall()`` on the cursor after execution.
    """
    with conn.cursor() as cursor:
        cursor.execute(sql_statement)
        return cursor.fetchall()

In [16]:
# When drop_tables is set, remove the raw table and then the silver table
# (if they exist) so the run starts from a clean slate.
if drop_tables:
    for _table in (raw_table_name, silver_table_name):
        excute_sql(conn, f"DROP TABLE IF EXISTS {_schema}.{_table}")

In [17]:
# Create the raw and silver tables in Lakehouse from the DDL strings defined earlier.
# The second call is the cell's last expression, so its result is what the notebook displays.
excute_sql(conn, raw_table_ddl)
excute_sql(conn, silver_table_ddl)

[['OPERATION SUCCEED']]

### PDFs/Images/Emails ingestion and preprocessing pipeline

In [18]:
from unstructured_ingest.interfaces import ProcessorConfig
from unstructured_ingest.pipeline.pipeline import Pipeline
from unstructured_ingest.processes.chunker import ChunkerConfig
from unstructured_ingest.processes.connectors.fsspec.s3 import (
    S3ConnectionConfig,
    S3DownloaderConfig,
    S3IndexerConfig,
    S3AccessConfig,
)
from unstructured_ingest.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig
)
from unstructured_ingest.processes.embedder import EmbedderConfig
from unstructured_ingest.processes.partitioner import PartitionerConfig
from unstructured_ingest.embed.dashscope import DashScopeEmbeddingConfig, DashScopeEmbeddingEncoder


from unstructured_ingest.processes.connectors.sql.clickzetta import (
    ClickzettaConnectionConfig,
    ClickzettaAccessConfig,
    ClickzettaUploadStagerConfig,
    ClickzettaUploaderConfig
)

In [19]:
# !rm -rf /Users/liangmo/.cache/unstructured/ingest/pipeline/*

In [None]:
# Updated pipeline configuration - cell bac33d0b
# Pre-flight checks: each component is exercised in its own try/except, so a
# failure is printed and the remaining checks still run.
import os
import logging
import dashscope

# Set the DashScope API key
dashscope.api_key = api_key

# Switch to a more verbose log level (force=True replaces the handlers configured earlier)
logging.basicConfig(level=logging.DEBUG, force=True)

# NOTE(review): the first 10 characters of the API key are printed below — consider
# whether that much of the secret should appear in notebook output.
print(f"🔍 配置检查:")
print(f"  DashScope API Key: {api_key[:10]}...")
print(f"  Embedding Provider: {embedding_provider}")
print(f"  Embedding Model: {embedding_model_name}")
print(f"  Embedding Dimensions: {embeddings_dimensions}")
print(f"  LOCAL_FILE_INPUT_DIR: {os.getenv('LOCAL_FILE_INPUT_DIR')}")

# Test the DashScope connection by embedding a short sample string
try:
    print("\n🔍 测试 DashScope 连接...")
    from dashscope import TextEmbedding
    
    response = TextEmbedding.call(
        model=embedding_model_name,
        input="测试连接"
    )
    
    if response.status_code == 200:
        embedding = response.output['embeddings'][0]['embedding']
        print(f"✅ DashScope 连接成功！嵌入维度: {len(embedding)}")
    else:
        print(f"❌ DashScope 连接失败: {response.message}")
        
except Exception as e:
    print(f"❌ DashScope 连接失败: {e}")

# First check that each pipeline component can be constructed
try:
    print("\n🔍 测试 EmbedderConfig...")
    embedder_config = EmbedderConfig(
        embedding_provider="dashscope",
        embedding_model_name=embedding_model_name,
        embedding_api_key=api_key,
    )
    print("✅ EmbedderConfig 创建成功")
except Exception as e:
    print(f"❌ EmbedderConfig 创建失败: {e}")

try:
    print("\n🔍 测试 ClickzettaConnectionConfig...")
    clickzetta_config = ClickzettaConnectionConfig(
        access_config=ClickzettaAccessConfig(password=_password),
        username=_username,
        service=_service,
        instance=_instance,
        workspace=_workspace,
        schema=_schema,
        vcluster=_vcluster,
    )
    print("✅ ClickzettaConnectionConfig 创建成功")
except Exception as e:
    print(f"❌ ClickzettaConnectionConfig 创建失败: {e}")

try:
    print("\n🔍 测试 LocalIndexerConfig...")
    local_config = LocalIndexerConfig(
        input_path=os.getenv("LOCAL_FILE_INPUT_DIR"),
        file_glob="**/*", 
        recursive=True
    )
    print("✅ LocalIndexerConfig 创建成功")
except Exception as e:
    print(f"❌ LocalIndexerConfig 创建失败: {e}")



In [None]:
# Replaces cell 0f194520 - single-process debug mode
# Builds the ingest pipeline (local files -> partition -> chunk -> embed ->
# Lakehouse raw table) and runs it. Any exception is caught and printed with a
# traceback, so this cell itself does not raise when the pipeline fails.
try:
    print("\n🔍 创建 Pipeline...")
    pipeline = Pipeline.from_configs(
        context=ProcessorConfig(
            verbose=True,
            tqdm=True,
            num_processes=1,  # single process, for debugging
        ),

        # Source: files under LOCAL_FILE_INPUT_DIR
        indexer_config=LocalIndexerConfig(
            input_path=os.getenv("LOCAL_FILE_INPUT_DIR"),
            file_glob="**/*", 
            recursive=True
        ),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),

        # NOTE(review): partition_by_api is False while an API key and endpoint are
        # still passed — presumably unused in that mode; confirm.
        partitioner_config=PartitionerConfig(
            partition_by_api=False,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_URL"),
            strategy="hi_res",
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 1  # reduced concurrency
            }
        ),

        chunker_config=ChunkerConfig(
            chunking_strategy="by_title",
            chunk_max_characters=chunk_max_characters,
            chunk_overlap=chunk_overlap,
            chunk_combine_text_under_n_chars=200,
        ),

        # Use the DashScope embedder
        embedder_config=EmbedderConfig(
            embedding_provider="dashscope",
            embedding_model_name=embedding_model_name,
            embedding_api_key=api_key,
        ),

        # Destination: the raw table in Lakehouse
        destination_connection_config=ClickzettaConnectionConfig(
            access_config=ClickzettaAccessConfig(password=_password),
            username=_username,
            service=_service,
            instance=_instance,
            workspace=_workspace,
            schema=_schema,
            vcluster=_vcluster,
        ),
        stager_config=ClickzettaUploadStagerConfig(),
        uploader_config=ClickzettaUploaderConfig(
            table_name=raw_table_name, 
            documents_original_source="https://yunqi.tech/documents"
        ),
    )
    print("✅ Pipeline 创建成功")
    
    # Run the pipeline
    print("\n🚀 运行 Pipeline...")
    pipeline.run()
    
except Exception as e:
    print(f"❌ Pipeline 创建或运行失败: {e}")
    import traceback
    traceback.print_exc()

### Clean/Transformation RAW table and Insert into Silver table

In [None]:
# You can execute additional SQL here to clean and transform the data before inserting it into the silver table.
excute_sql(conn, clean_transformation_data_sql)

### Retrieve relevant documents from Singdata Lakehouse


In [None]:
# Updated dimension check in the retrieval functions - cell de07d5d7
import dashscope
from dashscope import TextEmbedding
import json

# Configure the DashScope SDK with the API key
dashscope.api_key = api_key

def get_embedding(query):
    """Return the DashScope embedding vector for ``query``.

    Args:
        query: Text to embed. It is sent as ``input`` to
            ``TextEmbedding.call`` with the module-level
            ``embedding_model_name``.

    Returns:
        The first embedding of the response, exactly as DashScope returns it.

    Raises:
        RuntimeError: If the request itself fails or DashScope answers with
            a non-200 status. This function used to print the error and
            return an all-zero vector instead; a zero vector has no defined
            cosine distance, so searches and inserts built on it silently
            produced meaningless data. Failing loudly is the safer contract.
    """
    try:
        response = TextEmbedding.call(
            model=embedding_model_name,
            input=query,
        )
    except Exception as err:
        raise RuntimeError(f"DashScope embedding request failed: {err}") from err

    if response.status_code != 200:
        raise RuntimeError(f"DashScope API error: {response.message}")

    embedding = response.output['embeddings'][0]['embedding']
    # A dimension mismatch is only reported, not rejected: the caller decides
    # what to do with a vector of unexpected size.
    if len(embedding) != embeddings_dimensions:
        print(f"⚠️  警告: 实际嵌入维度 {len(embedding)} 与配置维度 {embeddings_dimensions} 不匹配")
    return embedding

def retrieve_documents(conn, query: str, num_results: int = 10):
    """Vector-search the silver table for the rows closest to ``query``.

    The query text is embedded with ``get_embedding`` and compared with the
    stored ``embeddings`` column via ``cosine_distance``; rows come back in
    ascending score order.

    Args:
        conn: Open Lakehouse connection.
        query: Natural-language query text.
        num_results: Value interpolated into the SQL ``LIMIT`` clause.

    Returns:
        A pandas DataFrame whose columns are taken from the cursor
        description of the executed statement.
    """
    query_vector = get_embedding(query)

    with conn.cursor() as cursor:
        sql = f"""
            WITH 
            vector_embedding_result AS (
            SELECT
                "vector_embedding" as retrieve_method,
                record_locator,
                type,
                filename,
                text,
                orig_elements,
                cosine_distance(embeddings, cast({query_vector} as vector({embeddings_dimensions}))) AS score
            FROM {silver_table_name}
            ORDER BY score ASC
            LIMIT {num_results} 
            )
            SELECT * FROM vector_embedding_result
            ORDER by score ASC;
        """
        cursor.execute(sql)
        rows = cursor.fetchall()
        column_names = [column[0] for column in cursor.description]

    return pd.DataFrame(rows, columns=column_names)

In [None]:
# Sample query ("What is the syntax for creating an index?"); the resulting
# DataFrame is the cell's last expression, so the notebook displays it.
query_text = "创建索引的语法是什么？"
retrieve_documents_df = retrieve_documents(conn, query_text)
retrieve_documents_df

In [None]:
# Print the text of the first returned row (raises IndexError if the result is empty).
first_row__text = retrieve_documents_df.iloc[0]['text']
print(first_row__text)

In [None]:
# Fetch every silver-table row that belongs to the file of the first search
# result and show them as a DataFrame.
# NOTE(review): filename is interpolated directly into the SQL text; a name
# containing a double quote would break the statement — check whether the
# connector supports bound parameters.
# NOTE(review): the SELECT list ends with a trailing comma before FROM —
# confirm the Lakehouse SQL dialect accepts that.
filename = retrieve_documents_df.iloc[0]['filename']
with conn.cursor() as cur:

        stmt = f"""
            WITH 
            results AS (
            SELECT
                record_locator,
                type,
                filename,
                text,
                orig_elements,
            FROM {silver_table_name}
            WHERE filename = "{filename}"
            )
            SELECT    *  FROM      results;
        """

        cur.execute(stmt)

        results = cur.fetchall()
        columns = [desc[0] for desc in cur.description]  # Get column names from cursor description
        df = pd.DataFrame(results, columns=columns)
df

In [None]:
# Concatenate the text of every row in df (no separator) and print it.
all_text = "".join(df["text"].astype(str).tolist())
print(all_text)

In [None]:
# Build an INSERT that adds one hand-written knowledge-base entry (with its
# embedding) to the silver table.
# The backslashes are written as "\\" so the literal no longer relies on invalid
# escape sequences ("\云", "\S"); the resulting text is unchanged.
# NOTE(review): kb is placed inside a single-quoted SQL literal as-is — confirm
# how the Lakehouse dialect treats the backslashes, and escape quotes if the
# text ever contains them.
kb = "ClickZetta\\云器\\Singdata之间的关系:ClickZetta是云器、Singdata的技术品牌。云器是ClickZetta的中国商业公司，Singdata是ClickZetta的海外商业公司。所以在不区分商业和技术的时候，三者是等同的。另外云器经常也会用到Yunqi的汉语拼音名称。"
embedded_kb = get_embedding(kb)
# get_embedding hands back the DashScope result, which is already a plain list;
# calling .tolist() unconditionally raised AttributeError. Convert only when the
# value is array-like.
if hasattr(embedded_kb, "tolist"):
    embedded_kb = embedded_kb.tolist()
# Target the same {_schema}.{silver_table_name} table that the DDL created,
# instead of a hard-coded workspace and schema.
add_kb_sql = f"""
INSERT INTO {_schema}.{silver_table_name} (
  id, type, record_id, element_id, filetype, last_modified, languages, text, embeddings, date_created, date_modified, date_processed
) VALUES (
  uuid(), 'UserInput', uuid(), uuid(), 'text', CURRENT_TIMESTAMP, '["zh-cn"]',
  '{kb}',
  CAST('{embedded_kb}' AS vector(float,{embeddings_dimensions})), CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
);
"""

In [None]:
# Execute the knowledge-base INSERT statement held in add_kb_sql.
with conn.cursor() as cur:
        cur.execute(add_kb_sql)