In [None]:
# Upload all JSON documents from an Azure Blob container to Azure Search, skipping those already indexed

In [1]:
import globals

import json
import requests
from pprint import pprint

from azure.storage.blob import BlockBlobService

from joblib import Parallel, delayed

In [2]:
def processFile(blob_name):
    """Download one paper JSON blob and convert it to an Azure Search "upload" action.

    Args:
        blob_name: Path of the blob inside ``globals.blob_container_name``.

    Returns:
        A dict with keys ``@search.action`` ("upload"), ``docID`` (the JSON's
        ``paper_id``), ``title``, ``abstractContent`` and ``body`` (paragraph texts
        joined with single spaces), and ``contributors`` (author full names).
        Returns the string ``"Error"`` if the blob could not be downloaded or parsed.
        Errors are printed, not raised, so one bad file does not stop a batch.
    """
    try:
        # Download inside the try so a missing/unreadable blob is reported like a parse error
        # instead of aborting the whole indexing run.
        content = block_blob_service.get_blob_to_text(globals.blob_container_name, blob_name).content
        json_content = json.loads(content)
        docID = json_content["paper_id"]
        metadata = json_content["metadata"]
        title = metadata["title"]

        # "abstract" and "body_text" are lists of paragraph objects; either may be absent or null.
        abstractContent = ' '.join(p["text"] for p in (json_content.get("abstract") or [])).strip()
        body = ' '.join(p["text"] for p in (json_content.get("body_text") or [])).strip()

        contributors = []
        for author in metadata["authors"]:
            name_parts = [author["first"], *author["middle"], author["last"]]
            # Join only non-empty parts so missing first/middle names don't leave double spaces.
            full_name = ' '.join(part for part in name_parts if part).strip()
            # Skip names of two characters or fewer.
            # NOTE(review): this also drops genuine short names such as a lone surname "Li".
            if len(full_name) > 2:
                contributors.append(full_name)

        return {"@search.action": "upload", "docID": docID, "title": title,
                "abstractContent": abstractContent, "body": body, "contributors": contributors}

    except Exception as ex:
        print (blob_name, " - Error:", str(ex))
        return "Error"

def checkIfDocExistsInIndex(blob_batch):
    """Ask the search index which of the given docIDs it already contains.

    Args:
        blob_batch: Comma-separated docIDs. IDs must not themselves contain commas
            (the comma is the ``search.in`` delimiter). Send at most 1000 IDs, because
            the query returns at most ``top`` = 1000 hits and extra hits would be missed.

    Returns:
        The parsed search response, whose ``"value"`` list holds one ``{"docID": ...}``
        entry per ID already in the index. Returns ``None`` if the request failed,
        the service returned an HTTP error status, or the body was not JSON; the
        reason is printed.
    """
    # OData string literals escape an embedded single quote by doubling it.
    id_list = blob_batch.replace("'", "''")
    data = {
        "select": "docID",
        "top": 1000,
        "filter": "search.in(docID, '" + id_list + "', ',')",
    }
    url = globals.endpoint + "indexes/" + globals.indexName + "/docs/search" + globals.api_version
    try:
        response = requests.post(url, headers=globals.headers, json=data)
        # Without this, an error payload (no "value" key) would be returned as if it were a result.
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as ex:
        print ("Index lookup failed:", str(ex))
        return None

In [3]:
# Get all files to be processed
block_blob_service = BlockBlobService(account_name=globals.blob_account_name, account_key=globals.blob_account_key)
# globals.blob_container_path is passed as the listing prefix, so only blobs under that path are returned.
generator = block_blob_service.list_blobs(globals.blob_container_name, globals.blob_container_path)

# Keep just the blob names (their full paths within the container)
blobs = [listed_blob.name for listed_blob in generator]

In [4]:
# Find blobs whose docIDs are not already in the index and upload those, in batches.
# Re-running this cell is safe: documents found in the index are skipped.
BATCH_SIZE = 100  # docIDs per lookup/upload; keep <= 1000 (the lookup's "top" limit)


def _doc_id_from_blob(blob_name):
    """Return the docID encoded in a blob path: its file name minus a trailing '.json'."""
    # rsplit also handles blobs at the container root (no '/'), where rindex would raise.
    file_name = blob_name.rsplit('/', 1)[-1]
    if file_name.endswith('.json'):
        file_name = file_name[:-len('.json')]
    return file_name


def _index_new_docs(batch_paths):
    """Upload the blobs in ``batch_paths`` whose docIDs are not yet in the index.

    Returns the number of documents sent in the upload request (0 if none were sent).
    """
    # NOTE(review): the lookup uses the file name, while the uploaded docID is the JSON's
    # paper_id. This assumes the two match; if they don't, files are re-uploaded every run.
    batch_ids = [_doc_id_from_blob(path) for path in batch_paths]
    json_resp = checkIfDocExistsInIndex(','.join(batch_ids))
    if isinstance(json_resp, dict) and 'value' in json_resp:
        existing_docs = {doc['docID'] for doc in json_resp['value']}
    else:
        # The lookup failed. The "upload" action inserts-or-replaces, so re-sending documents
        # that may already exist is harmless and better than silently dropping the batch.
        print ("  Index lookup failed for this batch; treating all of it as new.")
        existing_docs = set()

    documents = {"value": []}
    for doc_id, path in zip(batch_ids, batch_paths):
        if doc_id in existing_docs:
            continue
        doc_json = processFile(path)
        if doc_json != "Error":
            documents["value"].append(doc_json)

    if not documents["value"]:
        return 0

    print ("Applying docs...")
    url = globals.endpoint + "indexes/" + globals.indexName + "/docs/index" + globals.api_version
    try:
        response = requests.post(url, headers=globals.headers, json=documents)
    except requests.RequestException as ex:
        # Don't let one transient network error abort the whole run.
        print ("  Index upload failed:", str(ex))
        return 0
    # 200 = every document indexed; 207 = some documents failed (details in the body).
    if response.status_code != 200:
        print ("  Index upload returned", response.status_code, "-", response.text[:500])
    return len(documents["value"])


print ("Finding documents not already in index...")
total_blobs = len(blobs)
counter = 0
for start in range(0, total_blobs, BATCH_SIZE):
    batch_paths = blobs[start:start + BATCH_SIZE]
    counter += len(batch_paths)
    print ("Checking", counter, "of", total_blobs)
    _index_new_docs(batch_paths)

print ("Checked", counter, "of", total_blobs)


Finding documents not already in index...
Checking 100 of 106289
Applying docs...
Checking 200 of 106289
Applying docs...
Checking 300 of 106289
Applying docs...
Checking 400 of 106289


KeyboardInterrupt: 