# Data Sinks, Document Stores, and 

Sinks and document stores capture Kodexa documents as they pass through a pipeline. They can hold the processed documents in memory or save them to disk.

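When a process results in multiple documents, a sink or store is needed to capture all of them, because the pipeline's context only holds the last document processed.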


## Set up our imports
1. We'll be building pipelines to process our documents, so we'll import Kodexa's Pipeline module.
2. Import all the sink and store types that we plan to try out.
3. Some of our parsing will need to happen in the cloud, so we'll import the KodexaPlatform and KodexaAction modules.
4. All files that have been processed/parsed in Kodexa become Kodexa Documents, so we'll import that module as well.

We're also setting the CLOUD_URL value to the platform environment on which we want to perform our processing.

In [1]:
from kodexa import Document, Pipeline, KodexaPlatform, KodexaAction, InMemoryDocumentSink, JsonDocumentStore, TableDataStore, DictDataStore

CLOUD_URL = 'https://platform.kodexa.com' 

## Set Platform Environment and Access Token Credential

In the next cell, you'll be prompted to enter your access token that you've created in the environment specified by the CLOUD_URL.
If you haven't created a token already, follow the steps in our [Getting Started](https://developer.kodexa.com/org-management/manage-access-token) guide.

* Note:  The text you enter in the prompt field will be masked.  Once you're done entering the access token value, hit enter to complete the action in the cell.

In [2]:
import getpass

ACCESS_TOKEN = getpass.getpass("Enter access token:")

KodexaPlatform.set_url(CLOUD_URL)
KodexaPlatform.set_access_token(ACCESS_TOKEN)

Enter access token: ································
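
When the notebook is run unattended, an interactive prompt is not always practical. The sketch below is one possible alternative: it looks for the token in an environment variable first and only falls back to the masked prompt when the variable is missing. The variable name KODEXA_ACCESS_TOKEN and the helper read_access_token are assumptions made for this illustration; they are not part of Kodexa.

```python
import getpass
import os


def read_access_token(env_var="KODEXA_ACCESS_TOKEN", prompt="Enter access token:"):
    """Return the access token from the environment, or prompt for it.

    The environment variable name is a hypothetical choice for this sketch.
    """
    token = os.environ.get(env_var)
    if token and token.strip():
        # Token was supplied by the environment; no prompt needed
        return token.strip()
    # Fall back to the masked prompt used in the cell above
    return getpass.getpass(prompt).strip()
```

The returned value could then be passed to KodexaPlatform.set_access_token in the same way as ACCESS_TOKEN above.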


In [3]:
import os

# Setting up location of data folders and files
DATA_FOLDER = '_data'
TEXT_FOLDER = 'texts'
JSON_STORE_FOLDER = 'json_doc_stores'
TEXT_DATA_FILE = 'tongue_twister.txt'

TEXT_FOLDER_PATH = os.path.join(os.getcwd(), '..', DATA_FOLDER, TEXT_FOLDER)
FULL_PATH = os.path.join(TEXT_FOLDER_PATH, TEXT_DATA_FILE)
JSON_STORE_PATH = os.path.join(os.getcwd(), '..', DATA_FOLDER, JSON_STORE_FOLDER, 'text_json_store')
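
The paths above are relative to the current working directory and contain a '..' segment, so a wrong starting folder silently produces a path with no files in it. As a rough sketch, the same layout can be built with pathlib, normalized, and checked before any pipeline is run. The helper names resolve_data_paths and list_text_files are hypothetical and only mirror the folder names used in this notebook.

```python
from pathlib import Path


def resolve_data_paths(base_dir, data_folder="_data", text_folder="texts",
                       store_folder="json_doc_stores", store_name="text_json_store"):
    """Build normalized paths for the text folder and the JSON store."""
    # resolve() collapses the '..' segment into an absolute path
    data_root = (Path(base_dir) / ".." / data_folder).resolve()
    return {
        "text_folder": data_root / text_folder,
        "json_store": data_root / store_folder / store_name,
    }


def list_text_files(folder):
    """Return the sorted names of the *.txt files in folder (empty if missing)."""
    folder = Path(folder)
    if not folder.is_dir():
        return []
    return sorted(p.name for p in folder.glob("*.txt"))


paths = resolve_data_paths(Path.cwd())
print(paths["text_folder"])
print(list_text_files(paths["text_folder"]))
```

An empty list from list_text_files is a quick hint that the notebook was started from an unexpected folder.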

## Using an InMemoryDocumentSink

In many of our other examples where we're only processing one document, we don't provide a sink, but instead
retrieve the last processed document from the pipeline's context (pipeline.context.output_document).  If you're only
processing one document and choose to provide a sink, the document available from that sink and the document available
on the pipeline's context are the same.


In [4]:

sink = InMemoryDocumentSink()

pipeline = Pipeline.from_text("Well hello there!  It's so nice to see you!")
pipeline.set_sink(sink)
pipeline.run()

kodexa_doc = sink.get_document(0)
same_doc = pipeline.context.output_document

print('Is the document on the pipeline\'s context the same as that in the sink?\n')

if kodexa_doc.uuid == same_doc.uuid:
    print('Yes!  They are the same!')
else:
    print('Nope - totally different documents.\n')
    
print(f'\t{kodexa_doc.uuid}')
print(f'\t{same_doc.uuid}\n')


Is the document on the pipeline's context the same as that in the sink?

Yes!  They are the same!
	faae1970-ba4f-5436-b4e4-88dfc001d971
	faae1970-ba4f-5436-b4e4-88dfc001d971



### Using the InMemoryDocumentSink to hold multiple documents

In most of the Getting Started examples, we're processing only one document at a time.  This example
demonstrates that you can read all of the files from a folder, process them with the same pipeline actions,
and return all of them in the pipeline's sink.

Here, we read all of the *.txt files from our sample data folder and verify that they are both available on the
sink.  We can also see that the pipeline's context only contains the last document processed.


In [5]:
from kodexa import FolderConnector

sink = InMemoryDocumentSink()
pipeline = Pipeline(FolderConnector(path=TEXT_FOLDER_PATH, file_filter='*.txt'))
pipeline.add_step(KodexaAction(slug='kodexa/text-parser', options={}, attach_source=True))
pipeline.set_sink(sink)

# Run the pipeline and get the pipeline's context.  We'll then get the last processed document from the context
pipeline.run()
context_kodexa_doc = pipeline.context.output_document

# check the number of documents returned on the sink
print(f'There are {len(sink.documents)} documents in the sink.')

print('Is the document on the pipeline\'s context the same as the last one in the sink?\n')

if context_kodexa_doc.uuid == sink.get_document(1).uuid:
    print('Yes!  They are the same!')
else:
    print('Nope - totally different documents.\n')
    
print(f'\t{context_kodexa_doc.uuid}')
print(f'\t{sink.get_document(1).uuid}\n')


There are 2 documents in the sink.
Is the document on the pipeline's context the same as the last one in the sink?

Yes!  They are the same!
	8228120f-666a-4255-ab91-e37c702b07d1
	8228120f-666a-4255-ab91-e37c702b07d1
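
The relationship between the sink and the context can be pictured with a small stand-alone model. The classes below (ToyDocument, ToySink, ToyPipeline) are hypothetical stand-ins written for this illustration; they are not Kodexa's implementation and only mimic the behaviour observed above, where the sink keeps every document and the context keeps the last one.

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class ToyDocument:
    """Hypothetical stand-in for a processed document."""
    text: str
    uuid: str = field(default_factory=lambda: str(uuid4()))


class ToySink:
    """Keeps every document it is handed, in arrival order."""

    def __init__(self):
        self.documents = []

    def accept(self, document):
        self.documents.append(document)

    def get_document(self, index):
        return self.documents[index]


class ToyPipeline:
    """Turns each input text into a document and remembers only the last one."""

    def __init__(self, texts):
        self.texts = list(texts)
        self.sink = None
        self.output_document = None

    def run(self):
        for text in self.texts:
            document = ToyDocument(text)
            # The "context" slot is overwritten on every iteration
            self.output_document = document
            if self.sink is not None:
                # The sink accumulates instead of overwriting
                self.sink.accept(document)


toy_sink = ToySink()
toy_pipeline = ToyPipeline(["first text", "second text"])
toy_pipeline.sink = toy_sink
toy_pipeline.run()

print(len(toy_sink.documents))
print(toy_pipeline.output_document is toy_sink.get_document(-1))
```

In this model the first document is reachable only through the sink, which is why a sink is needed whenever more than one document is processed.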



## Using the JsonDocumentStore

The JsonDocumentStore allows you to store parsed or processed documents and can be used as a connector, a sink, or as a stand-alone saved file.  When persisted to disk, the JsonDocumentStore is saved in JSON format.

If you want to delete the documents in an existing, populated JsonDocumentStore, pass force_initialize=True when you create the store.  This removes all of the documents within the store and clears out the index of document ids.

### The JsonDocumentStore as a sink

In this example, we'll use the JsonDocumentStore in the same way we used the InMemoryDocumentSink. 
We'll read all of the text (*.txt) files from our sample data and verify the store contains our documents.


In [6]:

# Instantiate the store and provide the path to which we want to persist the file.
# Passing force_initialize=True ensures any old data is overwritten
json_doc_store = JsonDocumentStore(store_path=JSON_STORE_PATH, force_initialize=True)

pipeline = Pipeline(FolderConnector(path=TEXT_FOLDER_PATH, file_filter='*.txt'))
pipeline.add_step(KodexaAction(slug='kodexa/text-parser', options={}, attach_source=True))
pipeline.set_sink(json_doc_store)

# Run the pipeline and get the pipeline's context.  We'll then get the last processed document from the context
pipeline.run()
context_kodexa_doc = pipeline.context.output_document

# check the number of documents returned on the sink
print(f'There are {json_doc_store.count()} documents in the store.')

print('Is the document on the pipeline\'s context the same as the last one in the sink?\n')

if context_kodexa_doc.uuid == json_doc_store.get_document(1).uuid:
    print('Yes!  They are the same!')
else:
    print('Nope - totally different documents.\n')

print(f'\t{context_kodexa_doc.uuid}')
print(f'\t{json_doc_store.get_document(1).uuid}\n')



There are 2 documents in the store.
Is the document on the pipeline's context the same as the last one in the sink?

Yes!  They are the same!
	dac2ba48-f28d-4e9f-b786-14c950cc7d37
	dac2ba48-f28d-4e9f-b786-14c950cc7d37



### View JsonDocumentStore output

Once you've processed the pipeline above, you will be able to view the folder containing the JsonDocumentStore and inspect the output.  You'll find a file named index.json which contains the uuids for each of the documents in the store, as well as a file for each processed Kodexa document.
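
A quick way to inspect that folder from Python is sketched below. It assumes that the store path is a folder holding index.json alongside the per-document files, as described above. The internal layout of index.json is not specified here, so the sketch simply loads whatever JSON it contains. The helper name describe_store is hypothetical.

```python
import json
from pathlib import Path


def describe_store(store_path):
    """Return (parsed index.json or None, sorted names of the other files)."""
    store = Path(store_path)
    if not store.is_dir():
        # Nothing has been persisted at this location yet
        return None, []
    index_file = store / "index.json"
    index = json.loads(index_file.read_text()) if index_file.is_file() else None
    other_files = sorted(
        p.name for p in store.iterdir()
        if p.is_file() and p.name != "index.json"
    )
    return index, other_files
```

Calling describe_store(JSON_STORE_PATH) after the pipeline has run should return the index contents and one file name per stored document, if the assumed layout holds.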

### The JsonDocumentStore as a connector

With our text files parsed and saved in the JSON_STORE_PATH, we can now construct a new pipeline and read those documents.
Since the documents were parsed prior to them being stored in the JsonDocumentStore, we won't have to parse them again.

In [10]:

# Creating a connector using the JsonDocumentStore with the path to which we've already written our output
connector = JsonDocumentStore(store_path=JSON_STORE_PATH)

# how many documents are in the store?
print(f'There are {connector.count()} documents in the connector.')

# Since our JsonDocumentStore contains more than one document, we'll need a sink to access all of them.
sink = InMemoryDocumentSink()


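pipeline = Pipeline(connector)
pipeline.set_sink(sink)
pipeline.run()

# Get the last processed document from this pipeline's context (not the one left over from the earlier cell)
context_kodexa_doc = pipeline.context.output_document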

# check the number of documents returned on the sink
print(f'There are {len(sink.documents)} documents in the sink.')

print('Is the document on the pipeline\'s context the same as the last one in the sink?\n')

if context_kodexa_doc.uuid == sink.get_document(1).uuid:
    print('Yes!  They are the same!')
else:
    print('Nope - totally different documents.\n')
    
print(f'\t{context_kodexa_doc.uuid}')
print(f'\t{sink.get_document(1).uuid}\n')


There are 2 documents in the connector.
There are 2 documents in the sink.
Is the document on the pipeline's context the same as the last one in the sink?

Yes!  They are the same!
	dac2ba48-f28d-4e9f-b786-14c950cc7d37
	dac2ba48-f28d-4e9f-b786-14c950cc7d37
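
The check above compares only the last document. A fuller round-trip check would compare every UUID in the store with every UUID read back through the connector. The helper below is a minimal sketch of that idea. It assumes only that each document exposes a uuid attribute, as used throughout this notebook, and it is demonstrated with SimpleNamespace stand-ins instead of real Kodexa documents.

```python
from types import SimpleNamespace


def same_documents(first, second):
    """Return True when both collections hold exactly the same document UUIDs."""
    # Sorting makes the comparison independent of the order of arrival
    return sorted(d.uuid for d in first) == sorted(d.uuid for d in second)


# Stand-in documents; real ones would come from a sink or a store
stored = [SimpleNamespace(uuid="a"), SimpleNamespace(uuid="b")]
reloaded = [SimpleNamespace(uuid="b"), SimpleNamespace(uuid="a")]
incomplete = [SimpleNamespace(uuid="a")]

print(same_documents(stored, reloaded))
print(same_documents(stored, incomplete))
```

With real objects, the two arguments might be the documents held by two sinks, provided those collections can be iterated.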



## Using a TableDataStore


In [1]:
## Demo coming soon!

## Using a DictDataStore

In [2]:
## Demo coming soon!