This Notebook contains information on how to extract content from an Azure Cognitive Search index

If you have not already done so, please install the Azure Cognitive Search Python SDK:
    !pip install azure-search-documents

Important - Please Read
Search indexes are different from other datastores in that it is really hard to extract all content from the store. Because a search index is constantly ranking and scoring results, paging through search results, or even using continuation tokens as this tool does, can miss data during extraction. As an example, let's say you search for all documents, and the document with ID 101 is part of page 5 of the search results. While you are extracting data page by page, it is possible that, as you move from page 4 to page 5, ID 101 has shifted onto page 4. When you then read page 5, the document is no longer there and you have just missed it.

For that reason, this tool keeps a count of the documents it extracts and compares that count to the number of documents in the Azure Search index to make sure they match. Although this is not a perfect solution, it does help reduce the chance of missing data.

Also, as an extra precaution, it is best if there are no changes being made and the search index is in a steady state during this extraction phase.

In [None]:
import os
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes import SearchIndexClient 
from azure.search.documents import SearchClient
from azure.search.documents.indexes.models import (
    ComplexField,
    CorsOptions,
    SearchIndex,
    ScoringProfile,
    SearchFieldDataType,
    SimpleField,
    SearchableField
)
from pathlib import Path
from shutil import rmtree

import math
import base64

from joblib import Parallel, delayed
import multiprocessing
import threading

import pickle
import json


In [None]:
# This sample uses version: 11.1.0b3
!pip show azure-search-documents

In [None]:
# Set the search service name (the name only, not the full URL), the admin API key and the index name - uncomment and fill in

# service_name = "SEARCH_ENDPOINT"
# admin_key = "SEARCH_API_KEY"
# index_name = "SEARCH_INDEX_NAME_TO_BE_RESTORED"

# Set the location where data will be backed up - this will be deleted and re-created
# output_dir = "/datadrive2/search-backup"

# Set the facet field - this should be a facetable field where no one single value has more than
# 100K documents associated with it
# facet_field = "FACET_FIELD"


In [None]:
# REST API version that both SDK clients are pinned to
api_version = '2020-06-30'

# Field types accepted for the facet field that partitions the export
valid_facet_types = ['Edm.String']

# Build the service URL from the service name
endpoint = f"https://{service_name}.search.windows.net/"

# Client for index-level operations
admin_client = SearchIndexClient(
    endpoint=endpoint,
    credential=AzureKeyCredential(admin_key),
    index_name=index_name,
    api_version=api_version,
)

# Client for querying documents in the index
search_client = SearchClient(
    endpoint=endpoint,
    credential=AzureKeyCredential(admin_key),
    index_name=index_name,
    api_version=api_version,
)


In [None]:
# Start from a clean slate: delete any previous backup directory, then
# re-create it together with its 'data' sub-directory
output_path = Path(output_dir)
if output_path.exists():
    rmtree(output_path)

output_path_data = output_path / 'data'
output_path_data.mkdir(parents=True)

In [None]:
# Fetch the index definition and collect the name of every field it declares
index_schema = admin_client.get_index(index_name)
fields = [field.name for field in index_schema.fields]

print ('Found Fields:', fields)

In [None]:
# Check that the chosen facet field exists in the schema and has an accepted type
facet_values = {}
facet_too_large = False
large_facet_str = ''

valid_facet = False
for field in index_schema.fields:
    if field.name != facet_field:
        continue
    if field.type in valid_facet_types:
        valid_facet = True
        break

if valid_facet:
    print ('Validated facet field is of correct type')
else:
    print ('Error: Please choose a facet field that is one of', valid_facet_types)


In [None]:
# Pickle the index schema object into <output_dir>/schema.pkl
schema_file = os.path.join(output_dir, 'schema.pkl')
with open(schema_file, 'wb') as schema_out:
    pickle.dump(index_schema, schema_out)

In [None]:
# Query the facet buckets for the chosen field (top=0: no documents are needed,
# only the facet values and their document counts)
results = search_client.search(search_text="*", facets=[facet_field + ",count:0"], top=0)

facet_values = {}
facet_too_large = False
large_facet_str = ''

# Record value -> document count, stopping at the first value that is too large
for facet in results.get_facets()[facet_field]:
    if facet['count'] <= 100000:
        facet_values[facet['value']] = facet['count']
        continue
    facet_too_large = True
    large_facet_str = '"' + facet['value'] + '" has ' + str(facet['count']) + ' documents'
    break

if facet_too_large:
    print ('Error - Facet has to many documents:', large_facet_str)
else:
    print ('Found', len(facet_values), 'facet values')

In [None]:
# Extract Data to data output dir
def process_facet(f):
    estimated_docs_to_extract = facet_values[f]
    total_pages = math.ceil(estimated_docs_to_extract / page_size)
    print ('Extracting', estimated_docs_to_extract, 'values for facet:', f, 'with', total_pages, 'total page(s)...')
    for page in range(total_pages):
        results =  search_client.search(search_text="*",filter=facet_field + " eq '" + f + "'", top=page_size)
        jsonobj = []
        for result in results:
            del result['@search.score']
            del result['@search.highlights']
            jsonobj.append(result)
            file_name = base64.urlsafe_b64encode(f.encode()).decode()
            with open(os.path.join(output_path_data, file_name + '-' + str(page) + '.json'), 'w') as data_out:
                json.dump(jsonobj, data_out)


In [None]:
# Number of documents requested per search call
page_size = 1000

# Use one worker per CPU core and submit one export job per facet value
num_processes = multiprocessing.cpu_count()
facet_jobs = (delayed(process_facet)(f) for f in facet_values)
results = Parallel(n_jobs=num_processes, verbose=50)(facet_jobs)

# for f in facet_values:
#     process_facet(f)


In [None]:
# Go through the extracted content and validate they can be loaded, as well as to get counts
files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(os.path.join(output_dir, 'data')) for f in filenames]
data_files = []
error_files = []
for file in files:
    # Classify on the file name only: matching against the whole path would
    # flag every file as an error whenever output_dir itself contains 'error-'.
    if 'error-' in os.path.basename(file):
        error_files.append(file)
    else:
        data_files.append(file)

# Get count of data extracted
doc_counter = 0
error_counter = len(error_files)

for file in data_files:
    # Each data file holds a JSON array with one element per exported document
    with open(file, "r") as f_in:
        data = json.load(f_in)
        doc_counter += len(data)

print ('Total Documents Exported:', doc_counter)
print ('Total Documents Failed to Export:', error_counter)

# Compare the exported count with the number of documents the index reports,
# so missed or duplicated documents are noticed.
index_doc_count = search_client.get_document_count()
print ('Total Documents in Index:', index_doc_count)
if doc_counter == index_doc_count:
    print ('Exported document count matches the index document count')
else:
    print ('Warning: exported document count does not match the index document count')
