## From Documents to Data Pipelines

<img src="https://media.licdn.com/dms/image/v2/D560BAQGfWyKpeTmnWQ/company-logo_200_200/company-logo_200_200/0/1696531256111/unstructuredio_logo?e=2147483647&v=beta&t=elR2hMshiUs7scMPNfrWRfT_vQ07cEr45qY19nnCYbo" alt="Unstructured" width="200"/>
<img src="https://mms.businesswire.com/media/20230228005627/en/1724877/22/datastax-logo_%282%29.jpg" alt="DataStax" width="400"/>

### Leveraging Unstructured.io in an Astra DB AI Workflow

We've seen how [Unstructured.io](https://unstructured.io) turns real-world data formats, messy as they may be, into something suitable for building RAG pipelines, AI applications, and other tools that typically require clean, structured data. It's time to see it in action.

#### Unstructured Open Source

Let's get our feet wet by using Unstructured Open Source to parse a PDF. The open-source library needs a few system dependencies to parse files, PDFs in particular. The cell below installs them, along with some Astra packages we'll need later.

In [None]:
!apt-get install -y poppler-utils tesseract-ocr
%pip install -U python-dotenv nltk "unstructured[astradb]" "unstructured[pdf]" "unstructured-ingest[astradb]" langchain-astradb langchain-openai

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  tesseract-ocr-eng tesseract-ocr-osd
The following NEW packages will be installed:
  poppler-utils tesseract-ocr tesseract-ocr-eng tesseract-ocr-osd
0 upgraded, 4 newly installed, 0 to remove and 49 not upgraded.
Need to get 5,002 kB of archives.
After this operation, 16.3 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 poppler-utils amd64 22.02.0-2ubuntu0.5 [186 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tesseract-ocr-eng all 1:4.00~git30-7274cfa-1.1 [1,591 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tesseract-ocr-osd all 1:4.00~git30-7274cfa-1.1 [2,990 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tesseract-ocr amd64 4.1.1-2.1build1 [236 kB]
Fetched 5,002 kB in 1s (6,170 kB/s)
Selecting previously unselected package popp

Now we can use the `partition` function, which accepts... lots of things! Here, we pass in the arXiv URL of a paper that was pivotal in the development of LLMs.

In [None]:
from unstructured.partition.auto import partition

elements = partition(
    url="https://arxiv.org/pdf/1706.03762",
)

elements

[<unstructured.documents.elements.Text at 0x7a466abaf5e0>,
 <unstructured.documents.elements.NarrativeText at 0x7a4716790a60>,
 <unstructured.documents.elements.Title at 0x7a466abde050>,
 <unstructured.documents.elements.Text at 0x7a466abdee00>,
 <unstructured.documents.elements.NarrativeText at 0x7a466abde1a0>,
 <unstructured.documents.elements.Title at 0x7a466abde230>,
 <unstructured.documents.elements.Title at 0x7a466abde350>,
 <unstructured.documents.elements.Title at 0x7a466abde410>,
 <unstructured.documents.elements.Title at 0x7a466abde530>,
 <unstructured.documents.elements.Title at 0x7a466abde650>,
 <unstructured.documents.elements.Title at 0x7a466abde770>,
 <unstructured.documents.elements.Title at 0x7a466abdec80>,
 <unstructured.documents.elements.Title at 0x7a466abde950>,
 <unstructured.documents.elements.Title at 0x7a466abdea70>,
 <unstructured.documents.elements.Title at 0x7a466abdeb90>,
 <unstructured.documents.elements.NarrativeText at 0x7a466abdedd0>,
 <unstructured.doc

In [None]:
print(elements[4].text)
elements[5].text

Provided proper attribution is provided, Google hereby grants permission to reproduce the tables and figures in this paper solely for use in journalistic or scholarly works.


'Attention Is All You Need'
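
The default repr of the element list is hard to scan. As a minimal sketch, the helper below summarizes a list of elements by type and previews their text. It assumes only what the output above shows: each element has a class name such as `Title` or `NarrativeText` and a `.text` attribute. The `summarize` helper is hypothetical, and the two dataclasses are stand-ins (not the library's classes) so that the snippet runs on its own.

```python
from collections import Counter
from dataclasses import dataclass


def summarize(elements, preview_chars=60):
    """Return (type counts, preview lines) for a list of parsed elements.

    Only assumes each element has a `.text` attribute; the element type is
    read from the class name (e.g. Title, NarrativeText).
    """
    counts = Counter(type(el).__name__ for el in elements)
    previews = [
        f"{i:>3} {type(el).__name__:<14} {el.text[:preview_chars]!r}"
        for i, el in enumerate(elements)
    ]
    return counts, previews


# Hypothetical stand-ins for the library's element classes, used only so
# this sketch is runnable without installing anything.
@dataclass
class Title:
    text: str


@dataclass
class NarrativeText:
    text: str


sample = [
    NarrativeText(
        "Provided proper attribution is provided, Google hereby grants "
        "permission to reproduce the tables and figures in this paper "
        "solely for use in journalistic or scholarly works."
    ),
    Title("Attention Is All You Need"),
]

counts, previews = summarize(sample)
print(dict(counts))
print("\n".join(previews))
```

In the notebook, calling `summarize(elements)` on the list returned by `partition` should work the same way, since it touches nothing beyond the class name and `.text`.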

#### Unstructured with Astra DB: Destination Connector

We've now seen how simple it is to parse a document into structured text with Unstructured. With [destination connectors](https://docs.unstructured.io/open-source/ingest/destination-connectors/astradb), we can quickly build a pipeline that takes a document, or a set of documents, and writes the results into a brand new Astra DB Collection, suitable for AI workflows.

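First, we need to set some environment variables to *connect to Astra DB* and to OpenAI, which provides the embeddings:

- ASTRA_DB_API_ENDPOINT
- ASTRA_DB_APPLICATION_TOKEN
- ASTRA_DB_COLLECTION
- ASTRA_DB_KEYSPACE (read by the code below; if unset, `os.getenv` returns `None` and no explicit keyspace is passed)
- ASTRA_DB_EMBEDDING_DIMENSIONS
- OPENAI_API_KEY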

In [None]:
import os
from dotenv import load_dotenv
from google.colab import drive

drive.mount('/content/drive')

load_dotenv('/content/drive/MyDrive/.env')

print(os.getenv("ASTRA_DB_EMBEDDING_DIMENSIONS"))
os.getenv("ASTRA_DB_COLLECTION")

Mounted at /content/drive
1536


'unstructured_stream'
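
Before running the pipeline, it can help to confirm that the settings actually loaded, because `os.getenv` silently returns `None` for anything missing. The following is a minimal sketch using only the standard library. The variable names come from the list above, while `missing_vars`, `embedding_dimension`, and the demo dictionary are hypothetical helpers and data introduced for illustration.

```python
import os

# Variable names taken from the list above.
REQUIRED_VARS = [
    "ASTRA_DB_API_ENDPOINT",
    "ASTRA_DB_APPLICATION_TOKEN",
    "ASTRA_DB_COLLECTION",
    "ASTRA_DB_EMBEDDING_DIMENSIONS",
    "OPENAI_API_KEY",
]


def missing_vars(names, env=None):
    """Return the names that are unset or empty in `env` (default: os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


def embedding_dimension(env=None):
    """Parse ASTRA_DB_EMBEDDING_DIMENSIONS as an int; env values are strings."""
    env = os.environ if env is None else env
    return int(env["ASTRA_DB_EMBEDDING_DIMENSIONS"])


# Demo with a made-up environment dict instead of real credentials.
demo_env = {
    "ASTRA_DB_COLLECTION": "unstructured_stream",
    "ASTRA_DB_EMBEDDING_DIMENSIONS": "1536",
}
print(missing_vars(REQUIRED_VARS, demo_env))
print(embedding_dimension(demo_env))
```

In the notebook, `missing_vars(REQUIRED_VARS)` with no second argument checks the real environment after `load_dotenv` has run.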

Now we can build a full Unstructured pipeline to ingest data directly into our Astra DB Collection! Note that there are LOTS of configuration options, covering the document partitioning stage, the chunking strategy, and the embedding configuration. We don't have time to cover them all here, but the [ingest configuration overview](https://docs.unstructured.io/api-reference/ingest/ingest-configuration/overview) documents each of them.

In [None]:
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig

from unstructured_ingest.v2.processes.connectors.astradb import (
    AstraDBConnectionConfig,
    AstraDBAccessConfig,
    AstraDBUploadStagerConfig,
    AstraDBUploaderConfig
)
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.embedder import EmbedderConfig

# Chunking and embedding are optional.

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=LocalIndexerConfig(input_path="/content/drive/MyDrive/Colab Notebooks/unstructured_stream_data"),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(
            partition_by_api=False,
            strategy="hi_res",
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),
        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
        embedder_config=EmbedderConfig(
            embedding_provider="langchain-openai",
            embedding_api_key=os.getenv("OPENAI_API_KEY")
        ),
        destination_connection_config=AstraDBConnectionConfig(
            access_config=AstraDBAccessConfig(
                api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT"),
                token=os.getenv("ASTRA_DB_APPLICATION_TOKEN")
            )
        ),
        stager_config=AstraDBUploadStagerConfig(),
        uploader_config=AstraDBUploaderConfig(
            namespace=os.getenv("ASTRA_DB_KEYSPACE"),
            collection_name=os.getenv("ASTRA_DB_COLLECTION"),
            embedding_dimension=int(os.environ["ASTRA_DB_EMBEDDING_DIMENSIONS"])  # env values are strings; the dimension is a number
        )
    ).run()

2024-09-18 21:43:48,598 MainProcess INFO     created index with configs: {"input_path": "/content/drive/MyDrive/Colab Notebooks/unstructured_stream_data", "recursive": false}, connection configs: {"access_config": "**********"}
2024-09-18 21:43:48,601 MainProcess INFO     Created download with configs: {"download_dir": null}, connection configs: {"access_config": "**********"}
2024-09-18 21:43:48,603 MainProcess INFO     created partition with configs: {"strategy": "hi_res", "ocr_languages": null, "encoding": null, "additional_partition_args": {"split_pdf_page": true, "split_pdf_allow_failed": true, "split_pdf_concurrency_level": 15}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructured.io/general/v0/general", "partition_by_api": false, "api_key": null, "hi_res_model_name": null}
2024-09-18 21:43:48,607 MainPro

yolox_l0.05.onnx:   0%|          | 0.00/217M [00:00<?, ?B/s]

2024-09-18 21:44:45,132 MainProcess INFO     partition finished in 39.745754546s, attributes: file_id=e2415d376588
2024-09-18 21:44:45,136 MainProcess INFO     partition finished in 39.761778784s, attributes: file_id=e2415d376588
2024-09-18 21:44:45,138 MainProcess INFO     partition step finished in 39.769300818s
2024-09-18 21:44:45,140 MainProcess INFO     calling ChunkStep with 1 docs
2024-09-18 21:44:45,142 MainProcess INFO     processing content across processes
2024-09-18 21:44:45,144 MainProcess INFO     processing content serially
2024-09-18 21:44:45,201 MainProcess INFO     chunk finished in 0.045958318s, attributes: file_id=e2415d376588
2024-09-18 21:44:45,206 MainProcess INFO     chunk finished in 0.058943921s, attributes: file_id=e2415d376588
2024-09-18 21:44:45,207 MainProcess INFO     chunk step finished in 0.067279292s
2024-09-18 21:44:45,209 MainProcess INFO     calling EmbedStep with 1 docs
2024-09-18 21:44:45,210 MainProcess INFO     processing content across processe
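
To build some intuition for the `by_title` chunking strategy configured above, here is a deliberately simplified sketch of the idea: start a new chunk whenever a title element appears, and also close a chunk when it would grow past a size limit. This is an illustration under stated assumptions, not Unstructured's implementation. The `chunk_by_title` function, the `max_characters` default, and the placeholder document are all hypothetical, and unlike a real chunker this sketch never splits a single oversized element.

```python
def chunk_by_title(elements, max_characters=500):
    """Group (kind, text) pairs into chunks, opening a new chunk at each Title.

    A chunk is also closed when adding the next element would push its joined
    length past `max_characters`. Oversized single elements are kept whole.
    """
    chunks, current = [], []

    def flush():
        # Emit the chunk collected so far, if any, and start a fresh one.
        if current:
            chunks.append("\n".join(current))
            current.clear()

    for kind, text in elements:
        projected = len("\n".join(current + [text]))
        if kind == "Title" or projected > max_characters:
            flush()
        current.append(text)
    flush()
    return chunks


# Placeholder document: the text is made up purely to show the grouping.
doc = [
    ("Title", "Abstract"),
    ("NarrativeText", "Body text A."),
    ("Title", "1 Introduction"),
    ("NarrativeText", "Body text B."),
    ("NarrativeText", "Body text C."),
]

for chunk in chunk_by_title(doc):
    print(repr(chunk))
```

Each section heading stays attached to the text beneath it, which is the property that makes title-based chunks convenient for retrieval.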

Now we have everything we need to build a Retrieval Augmented Generation (RAG) pipeline based on the ingested document(s)!

In [None]:
from langchain_astradb import AstraDBVectorStore
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vector_store = AstraDBVectorStore(
    collection_name=os.getenv("ASTRA_DB_COLLECTION"),
    embedding=embeddings,
    api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT"),
    token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
    namespace=os.getenv("ASTRA_DB_KEYSPACE"),
)

results = vector_store.similarity_search_with_score(
    "German Translation", k=1,
)

for res, score in results:
    print(f"* [SIM={score:3f}] {res.page_content} [{res.metadata}]")

* [SIM=0.524952] more parallelizable and requiring signiﬁcantly less time to train. Our model achieves 28.4 BLEU on the WMT 2014 English- to-German translation task, improving over the existing best results, including ensembles, by over 2 BLEU. On the WMT 2014 English-to-French translation task, our model establishes a new single-model state-of-the-art BLEU score of 41.8 after training for 3.5 days on eight GPUs, a small fraction of the training costs of the best models from the literature. We show that the [{'type': 'CompositeElement', 'element_id': '8d30a910be883ee4ed1773a224e2fd77', 'metadata': {'data_source': {'record_locator': {'path': '/content/drive/MyDrive/Colab Notebooks/unstructured_stream_data/1706.03762v7_sub.pdf'}, 'date_modified': '1726674414.0', 'date_processed': '1726695845.3220367', 'permissions_data': [{'mode': 33152}]}, 'file_directory': '/content/drive/MyDrive/Colab Notebooks/unstructured_stream_data', 'filename': '1706.03762v7_sub.pdf', 'filetype': 'application/pdf
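
The search above covers the retrieval half of RAG. The generation half starts by packing the retrieved text into a prompt for a language model. The following is a minimal sketch of that step using plain `(text, score)` pairs. The `build_rag_prompt` function, its prompt wording, and the `max_context_chars` budget are assumptions made for illustration; the sample passage is abridged from the output above.

```python
def build_rag_prompt(question, retrieved, max_context_chars=2000):
    """Format retrieved (text, score) pairs into a grounded prompt string."""
    context_parts, used = [], 0
    for rank, (text, score) in enumerate(retrieved, start=1):
        snippet = f"[{rank}] (similarity {score:.3f}) {text.strip()}"
        if used + len(snippet) > max_context_chars:
            break  # stop before the context budget is exceeded
        context_parts.append(snippet)
        used += len(snippet)
    context = "\n".join(context_parts)
    return (
        "Answer the question using only the context below. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
    )


# Sample passage abridged from the similarity search output above.
retrieved = [
    (
        "Our model achieves 28.4 BLEU on the WMT 2014 English-to-German "
        "translation task",
        0.524952,
    ),
]
prompt = build_rag_prompt(
    "What BLEU score was reported for German translation?", retrieved
)
print(prompt)
```

In the notebook, the pairs could be produced from the search results with `[(doc.page_content, score) for doc, score in results]`, and the resulting string passed to whichever chat model you prefer.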

### Putting it all Together: Unstructured in DataStax Langflow!

<img src="https://drive.google.com/uc?export=view&id=19kDUW2QFuQJYxTrfLeJH1ckMCfWr0pat" alt="Unstructured in Langflow" width="1500"/>