In [None]:
# @hidden_cell
# The project token is an authorization token used to access project resources such as data sources and connections; platform APIs use it as well.
# To insert your project token, click the vertical ellipsis in the notebook and choose "Insert project token".
from project_lib import Project

project = Project(project_access_token='{project-access-token}', project_id='{project-id}')
pc = project.project_context

# Watson Discovery Service Document Reingestion
By [Morgan Langlais](https://github.com/modlanglais/)


In [None]:
%%capture
# Pin ibm-watson below 4.0: this notebook constructs DiscoveryV1 with the
# `iam_apikey` keyword, which the 4.x SDK replaced with authenticator objects.
!pip install "ibm-watson<4"
!pip install bs4
!pip install pandas

In [None]:
from ibm_watson import DiscoveryV1

import json
import csv
import json
import threading
from bs4 import BeautifulSoup
import pandas
import time
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
import http.client
import socket
from ibm_watson import ApiException

beginTime = time.time()

# @hidden_cell
# NOTE(review): Watson Studio hides a cell only when `@hidden_cell` is the cell's
# first line; here it sits mid-cell, so the API key below is probably NOT hidden
# when the notebook is shared. TODO: move the credentials into their own cell.
# Insert the appropriate Discovery credentials here.
environmentId = "{environment-id}"
collectionId = "{collection-id}"
discovery = DiscoveryV1('2019-04-30', iam_apikey="{api-key}")

# Fetch the collection once to report how many documents it currently holds.
collection = discovery.get_collection(environmentId, collectionId).get_result()
totalDocuments = collection['document_counts']['available']
print("**Total number of documents in collection {}: {}".format(collectionId, totalDocuments))

## This section gets a list of all the document IDs in a given collection

In [None]:
def pmap_helper(fn, output_list, input_list, i):
    """Thread body for `pmap`: compute fn(input_list[i]) and store it in output_list[i]."""
    item = input_list[i]
    output_list[i] = fn(item)

def pmap(fn, input):
    """
    Apply `fn` to every element of `input` concurrently, using one thread per
    element, and return the results as a list in input order.

    If any call to `fn` raises, the first such exception (in input order) is
    re-raised in the calling thread once all threads have finished. Without
    this, the error would only be printed by the thread and the result slot
    would silently stay `None`.
    """
    input_list = list(input)
    output_list = [None] * len(input_list)
    errors = [None] * len(input_list)

    def run(i):
        # Each thread writes only its own slot, so no lock is needed.
        try:
            output_list[i] = fn(input_list[i])
        except Exception as exc:  # captured here, re-raised by the caller below
            errors[i] = exc

    threads = [threading.Thread(target=run, args=(i,), daemon=True)
               for i in range(len(input_list))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    for exc in errors:
        if exc is not None:
            raise exc
    return output_list

def all_document_ids(discovery,
                     environmentId,
                     collectionId,
                     chunk_size=10000):
    """
    Return a list of all of the document ids found in a
    Watson Discovery collection.

    Each query requests at most `chunk_size` results (passed as `count`), so
    documents are fetched in groups selected by a prefix of their
    `extracted_metadata.sha1`. Any group whose `matching_results` exceeds
    `chunk_size` is split into 16 longer hex prefixes and queried again.

    The arguments to this function are:
    discovery     - an instance of DiscoveryV1
    environmentId - an environment id found in your Discovery instance
    collectionId  - a collection id found in the environment above
    chunk_size    - maximum number of results requested per query (default 10000)

    A failed query is retried until it succeeds, pausing between attempts
    (doubling from 1 second up to 60 seconds).

    NOTE(review): documents that lack an `extracted_metadata.sha1` value are
    presumably never matched by these filters and so would not be returned.
    """
    doc_ids = []
    alphabet = "0123456789abcdef"   # Hexadecimal digits, lowercase
    max_backoff = 60  # seconds; upper bound on the pause between retries

    def maybe_some_ids(prefix):
        """
        A helper function that does the query and returns either:
        1) A list of document ids
        2) The `prefix` that needs to be subdivided into more focused queries
        """
        delay = 1
        while True:
            try:
                response = discovery.query(environmentId,
                                           collectionId,
                                           count=chunk_size,
                                           filter="extracted_metadata.sha1::"
                                           + prefix + "*",
                                           return_fields="extracted_metadata.sha1").get_result()
                break
            except Exception as e:
                print("will retry after error", e)
                # Pause before retrying so persistent or rate-limit errors
                # don't turn into a tight loop of requests against the service.
                time.sleep(delay)
                delay = min(delay * 2, max_backoff)

        if response["matching_results"] > chunk_size:
            return prefix
        return [item["id"] for item in response["results"]]

    prefixes_to_process = [""]
    while prefixes_to_process:
        prefix = prefixes_to_process.pop(0)
        prefixes = [prefix + letter for letter in alphabet]
        # `pmap` issues the 16 prefix queries to Discovery concurrently to save time.
        results = pmap(maybe_some_ids, prefixes)
        for result in results:
            if isinstance(result, list):
                doc_ids += result
            else:
                prefixes_to_process.append(result)

    return doc_ids


allDocIds = all_document_ids(discovery, environmentId, collectionId)

# Store the ids as a headerless, index-free CSV project asset (overwriting any previous copy).
df = pandas.DataFrame(allDocIds)
projectFileName = 'allDocIds-{}.csv'.format(collectionId)
project.save_data(projectFileName,
                  df.to_csv(header=None, index=None),
                  set_project_asset=True,
                  overwrite=True)

## Iterate through each document in the collection and transform it with the code block below

In [None]:
# To process every collection in an environment, uncomment the loop below and move the code that follows into it:
# allCollections = discovery.list_collections(environmentId).get_result()['collections']
# for collection in allCollections:
    # collectionId = collection['collection_id']
    # Move the code block below here, indented so that it runs inside this loop.
    # Note: allDocIds must also be recomputed for each collection (with all_document_ids above); otherwise the ids from the first collection are reused.
    
#################################################
# Run-wide state updated by the worker function defined below.
startTime = time.time()
counter = 0  # documents started so far; used only for the progress message
filesNotAdded = [
    'These documents encountered an error during the document transformation process. Note there may be duplicates. See notebook for more information.'
]

# Guards `counter`: doStuff runs on many threads at once and `counter += 1`
# is not atomic, so without a lock the progress numbers can repeat or skip.
counterLock = threading.Lock()

def doStuff(documentId):
    """
    Re-ingest one document. Fetch it from Discovery by id, apply the
    transform below, and upload the result over the original with
    update_document.

    Query errors, a query that finds no document, and update errors are
    printed and recorded in `filesNotAdded` rather than raised, so they do
    not stop the rest of the batch.
    """
    import io  # local import: only used to hand the JSON payload to the SDK in memory

    global counter
    with counterLock:
        counter = counter + 1
        position = counter
    print("Starting (" + str(position) + "/" + str(len(allDocIds)) + ")...")
    filterId = '_id:' + documentId

    # 1.) Get document from Discovery collection
    try:
        results = discovery.query(environmentId, collectionId, filter=filterId).get_result()['results']
    except ApiException as ex:
        print("Some error occured on document #" + str(documentId))
        print("Query failed with status code " + str(ex.code) + ": " + str(ex.message))
        filesNotAdded.append(documentId + ": " + str(ex.code) + " " + str(ex.message))
        return  # without the document there is nothing to transform
    if not results:
        print("Some error occured on document #" + str(documentId))
        print("Query returned no results for filter " + filterId)
        filesNotAdded.append(documentId + ": query returned no results")
        return
    discQuery = results[0]

    filename = discQuery['extracted_metadata']['filename']

    #****************TRANSFORM HERE****************#
    # Make modifications to change your document however you would like.
    transformedDoc = discQuery
    # In this example, every field inside `metadata` is copied to the top level
    # of the document, and then the `metadata` field itself is removed.
    if 'metadata' in transformedDoc:
        metadatafield = transformedDoc['metadata']
        for field in metadatafield:
            transformedDoc[field] = metadatafield[field]
        del transformedDoc['metadata']

    #******************END TRANSFORM******************#

    # 2.) Serialize the transformed document into a buffer owned by this call.
    # Many threads run concurrently, so a single shared file would let one
    # document's payload be overwritten by another before it is uploaded.
    payload = io.BytesIO(json.dumps(transformedDoc).encode('utf-8'))

    # 3.) Push updated document to Discovery.
    try:
        discovery.update_document(environmentId, collectionId, documentId, file=payload, filename=filename, accept_json=True).get_result()
    except ApiException as ex:
        print("Some error occured on document #" + str(documentId))
        print("Update Document failed with status code " + str(ex.code) + ": " + str(ex.message))
        filesNotAdded.append(documentId + ": " + str(ex.code) + " " + str(ex.message))
        
# Process up to 32 documents concurrently; doStuff records API failures in filesNotAdded.
try:
    with PoolExecutor(max_workers=32) as executor:
        for _ in executor.map(doStuff, allDocIds):
            pass
finally:
    # Save the failure report even if an unexpected exception aborts the batch,
    # so a partial run still leaves a record of which documents failed.
    dataf = pandas.DataFrame(filesNotAdded)
    projectNotAddedFileName = 'FilesNotAdded' + collectionId + '.csv'
    project.save_data(projectNotAddedFileName, dataf.to_csv(header=None, index=None), set_project_asset=True, overwrite=True)
#################################################

endTime = time.time()
print("Updating " + str(len(allDocIds)) + " documents took " + str(endTime - startTime) + " seconds")

## Check the ingestion status of each document
#### Note: a document's filename may appear as `null` in the results. This is normal and expected, because Discovery takes some time to finish processing each `update_document` request.

In [None]:
# Each list begins with a label that becomes the first line of its CSV report.
documentSuccesses, documentFailures = ['Successfully updated'], ['Failed to update']

def getStatus(documentId):
    """
    Look up the status of one document and record its id. The id goes into
    `documentFailures` when the status is 'failed' and into
    `documentSuccesses` for any other status.

    If the status lookup itself fails, the error is printed and the id is
    recorded as a failure instead of raising, so one bad lookup does not
    abort the whole run.

    NOTE(review): any non-'failed' status is counted as a success. That
    includes any status the service may report while a document is still
    being processed.
    """
    try:
        discQueryStatus = discovery.get_document_status(environmentId, collectionId, documentId).get_result()
    except ApiException as ex:
        print("Status check failed for document #" + str(documentId))
        print("Query failed with status code " + str(ex.code) + ": " + str(ex.message))
        documentFailures.append(documentId)
        return

    if discQueryStatus['status'] == 'failed':
        print(documentId + " failure. See Discovery UI for details.")
        documentFailures.append(documentId)
    else:
        documentSuccesses.append(documentId)
        
# Check statuses 16 documents at a time; getStatus sorts each id into a list.
with PoolExecutor(max_workers=16) as executor:
    for _ in executor.map(getStatus, allDocIds):
        pass

# Write one headerless CSV per outcome; the first row of each is the list's label.
datas = pandas.DataFrame(documentSuccesses)
dataf = pandas.DataFrame(documentFailures)
projectSuccFileName = 'DocumentSuccess' + collectionId + '.csv'
projectFailFileName = 'DocumentFail' + collectionId + '.csv'
for assetName, assetFrame in ((projectSuccFileName, datas), (projectFailFileName, dataf)):
    project.save_data(assetName, assetFrame.to_csv(header=None, index=None), set_project_asset=True, overwrite=True)