In [1]:
import os
import json
import requests
from pprint import pprint
from app.credentials import (
    DATASOURCE_CONNECTION_STRING,
    AZURE_SEARCH_API_VERSION,
    AZURE_SEARCH_ENDPOINT,
    AZURE_SEARCH_KEY,
    COG_SERVICES_NAME,
    COG_SERVICES_KEY
)

In [2]:
# Blob container (must already exist) that holds the documents to index.
blob_container_name = "snowflake-data"

# Names of the search-service resources created by the cells below.
# They are listed from the end of the pipeline back to its source.
indexer_name = "cogsrch-snowflake-indexer-files"
index_name = "cogsrch-snowflake-index-files"
skillset_name = "cogsrch-snowflake-skillset-files"
datasource_name = "cogsrch-snowflake-datasource-files"

In [3]:
# Request settings shared by every REST call in this notebook:
# the API version travels in the query string, the key in the `api-key` header.
params = {"api-version": AZURE_SEARCH_API_VERSION}
headers = {
    "Content-Type": "application/json",
    "api-key": AZURE_SEARCH_KEY,
}

## Create Data Source (Blob container with the Arxiv CS pdfs)

In [4]:
# Create a data source
# The blob container with the sample data must already exist; its connection
# string is read from app/credentials.py.
datasource_url = AZURE_SEARCH_ENDPOINT + "/datasources/" + datasource_name

blob_credentials = {"connectionString": DATASOURCE_CONNECTION_STRING}
blob_container = {"name": blob_container_name}

datasource_payload = {
    "name": datasource_name,
    "description": "Demo files to demonstrate cognitive search capabilities.",
    "type": "azureblob",
    "credentials": blob_credentials,
    "container": blob_container,
}

# Send the definition to the search service and show the HTTP outcome.
r = requests.put(
    datasource_url,
    data=json.dumps(datasource_payload),
    headers=headers,
    params=params,
)
print(r.status_code)
print(r.ok)

201
True


In [5]:
# If the previous request returned a 403 status code, the endpoint or key is probably wrong.
# Uncomment the line below to inspect the body of the last response (`r`) while debugging.
# r.text

## Create Skillset - OCR, Text Splitter, Language Detection, KeyPhrase extraction, Entity Recognition

Now we need to create the skillset. This is a set of steps in which we use several Cognitive Services to enrich the documents by extracting information, applying OCR, translating, etc.

https://learn.microsoft.com/en-us/azure/search/cognitive-search-working-with-skillsets

https://learn.microsoft.com/en-us/azure/search/cognitive-search-predefined-skills


In [6]:
# Create a skillset
# Each skill is defined separately so the enrichment pipeline can be read in order:
# OCR -> merge OCR text into the content -> detect language -> split into pages
# -> key phrases and entities per page.

# Runs once per normalized image and stores the recognized text as `images_text`.
ocr_skill = {
    "@odata.type": "#Microsoft.Skills.Vision.OcrSkill",
    "description": "Extract text (plain and structured) from image.",
    "context": "/document/normalized_images/*",
    "defaultLanguageCode": "en",
    "detectOrientation": True,
    "inputs": [
        {
            "name": "image",
            "source": "/document/normalized_images/*"
        }
    ],
    "outputs": [
        {
            "name": "text",
            "targetName": "images_text"
        }
    ]
}

# Inserts each image's OCR text into the document content at the image offset,
# producing `/document/merged_text`.
merge_skill = {
    "@odata.type": "#Microsoft.Skills.Text.MergeSkill",
    "description": "Create merged_text, which includes all the textual representation of each image inserted at the right location in the content field. This is useful for PDF and other file formats that supported embedded images.",
    "context": "/document",
    "insertPreTag": " ",
    "insertPostTag": " ",
    "inputs": [
        {
            "name": "text", "source": "/document/content"
        },
        {
            "name": "itemsToInsert", "source": "/document/normalized_images/*/images_text"
        },
        {
            "name": "offsets", "source": "/document/normalized_images/*/contentOffset"
        }
    ],
    "outputs": [
        {
            "name": "mergedText",
            "targetName": "merged_text"
        }
    ]
}

# Downstream text skills read `/document/merged_text`, not `/document/content`.
# Previously nothing consumed the merge output, so text recognized by OCR never
# reached the pages, key phrases or entities.
language_detection_skill = {
    "@odata.type": "#Microsoft.Skills.Text.LanguageDetectionSkill",
    "context": "/document",
    "description": "If you have multilingual content, adding a language code is useful for filtering",
    "inputs": [
        {
            "name": "text",
            "source": "/document/merged_text"
        }
    ],
    "outputs": [
        {
            "name": "languageCode",
            "targetName": "language"
        }
    ]
}

# Splits the merged text into `/document/pages/*`; the per-page skills below
# run on these chunks.
split_skill = {
    "@odata.type": "#Microsoft.Skills.Text.SplitSkill",
    "context": "/document",
    "textSplitMode": "pages",
    "maximumPageLength": 5000,  # 5000 is default
    "defaultLanguageCode": "en",
    "inputs": [
        {
            "name": "text",
            "source": "/document/merged_text"
        },
        {
            "name": "languageCode",
            "source": "/document/language"
        }
    ],
    "outputs": [
        {
            "name": "textItems",
            "targetName": "pages"
        }
    ]
}

# Key phrases are extracted per page (at most `maxKeyPhraseCount` for each page).
key_phrase_skill = {
    "@odata.type": "#Microsoft.Skills.Text.KeyPhraseExtractionSkill",
    "context": "/document/pages/*",
    "maxKeyPhraseCount": 2,
    "defaultLanguageCode": "en",
    "inputs": [
        {
            "name": "text",
            "source": "/document/pages/*"
        },
        {
            "name": "languageCode",
            "source": "/document/language"
        }
    ],
    "outputs": [
        {
            "name": "keyPhrases",
            "targetName": "keyPhrases"
        }
    ]
}

# Entities are recognized per page; each requested category is exposed as its
# own output collection.
entity_recognition_skill = {
    "@odata.type": "#Microsoft.Skills.Text.V3.EntityRecognitionSkill",
    "context": "/document/pages/*",
    "categories": ["Person", "Location", "Organization", "DateTime", "URL", "Email"],
    "minimumPrecision": 0.5,
    "defaultLanguageCode": "en",
    "inputs": [
        {
            "name": "text",
            "source": "/document/pages/*"
        },
        {
            "name": "languageCode",
            "source": "/document/language"
        }
    ],
    "outputs": [
        {
            "name": "persons",
            "targetName": "persons"
        },
        {
            "name": "locations",
            "targetName": "locations"
        },
        {
            "name": "organizations",
            "targetName": "organizations"
        },
        {
            "name": "dateTimes",
            "targetName": "dateTimes"
        },
        {
            "name": "urls",
            "targetName": "urls"
        },
        {
            "name": "emails",
            "targetName": "emails"
        }
    ]
}

skillset_payload = {
    "name": skillset_name,
    "description": "Extract entities, detect language and extract key-phrases",
    "skills": [
        ocr_skill,
        merge_skill,
        language_detection_skill,
        split_skill,
        key_phrase_skill,
        entity_recognition_skill,
    ],
    # Cognitive Services resource (name and key from app/credentials.py)
    # attached to the skillset.
    "cognitiveServices": {
        "@odata.type": "#Microsoft.Azure.Search.CognitiveServicesByKey",
        "description": COG_SERVICES_NAME,
        "key": COG_SERVICES_KEY
    }
}

r = requests.put(AZURE_SEARCH_ENDPOINT + "/skillsets/" + skillset_name,
                 data=json.dumps(skillset_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)

201
True


## Create Index

In Azure Cognitive Search, a search index is your searchable content, available to the search engine for indexing, full text search, and filtered queries. An index is defined by a schema and saved to the search service. This content exists within your search service, apart from your primary data stores, which is necessary for the millisecond response times expected in modern applications. Except for specific indexing scenarios, the search service will never connect to or query your local data.

The body of the request defines the schema of the search index. A fields collection requires one field to be designated as the key. For blob type, this field is often the "metadata_storage_path" that uniquely identifies each file in the container.

Reference:

https://learn.microsoft.com/en-us/azure/search/search-what-is-an-index

In [7]:
# Create an index
# Queries operate over the searchable fields and filterable fields in the index.
# Field attributes are real booleans (serialized as JSON true/false) rather than
# the strings "true"/"false", so the request does not rely on the service
# coercing strings into booleans.
index_payload = {
    "name": index_name,
    "fields": [
        # `id` is the document key; the indexer fills it from the blob storage path.
        {"name": "id", "type": "Edm.String", "key": True, "searchable": False, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "title", "type": "Edm.String", "searchable": True, "retrievable": True, "facetable": False, "filterable": True, "sortable": False},
        {"name": "content", "type": "Edm.String", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "language", "type": "Edm.String", "searchable": False, "retrievable": True, "sortable": True, "filterable": True, "facetable": True},
        # Enrichment outputs produced by the skillset.
        {"name": "pages", "type": "Collection(Edm.String)", "searchable": False, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "images_text", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "keyPhrases", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": True, "facetable": True},
        {"name": "persons", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "locations", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": True, "facetable": True},
        {"name": "organizations", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": True, "facetable": True},
        {"name": "dateTimes", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "urls", "type": "Collection(Edm.String)", "searchable": False, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "emails", "type": "Collection(Edm.String)", "searchable": True, "retrievable": True, "sortable": False, "filterable": True, "facetable": False},
        # Blob metadata fields.
        {"name": "metadata_storage_name", "type": "Edm.String", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "metadata_storage_path", "type": "Edm.String", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False}
    ],
    # Semantic configuration that prioritizes the `content` field.
    "semantic": {
        "configurations": [
            {
                "name": "my-semantic-config",
                "prioritizedFields": {
                    "prioritizedContentFields": [
                        {
                            "fieldName": "content"
                        }
                    ]
                }
            }
        ]
    }
}

r = requests.put(AZURE_SEARCH_ENDPOINT + "/indexes/" + index_name,
                 data=json.dumps(index_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)

201
True


## Create and Run the Indexer - (runs the pipeline)
This process takes about 120 minutes to load all the Arxiv CS PDFs.

The three components you have created thus far (data source, skillset, index) are inputs to an indexer. Creating the indexer on Azure Cognitive Search is the event that puts the entire pipeline into motion.

In [8]:
# Create an indexer
# The indexer ties together the data source, the skillset and the target index.

# Blob metadata -> index fields. The storage path is passed through the
# base64Encode mapping function before it is stored as the document `id`.
blob_field_mappings = [
    {
        "sourceFieldName": "metadata_storage_path",
        "targetFieldName": "id",
        "mappingFunction": {"name": "base64Encode"},
    },
    {
        "sourceFieldName": "metadata_title",
        "targetFieldName": "title",
    },
]

# (enrichment path, index field) pairs, in the order they are sent to the service.
enrichment_targets = [
    ("/document/content", "content"),
    ("/document/pages/*", "pages"),
    ("/document/normalized_images/*/images_text", "images_text"),
    ("/document/language", "language"),
    ("/document/pages/*/keyPhrases/*", "keyPhrases"),
    ("/document/pages/*/persons/*", "persons"),
    ("/document/pages/*/locations/*", "locations"),
    ("/document/pages/*/organizations/*", "organizations"),
    ("/document/pages/*/dateTimes/*", "dateTimes"),
    ("/document/pages/*/urls/*", "urls"),
    ("/document/pages/*/emails/*", "emails"),
]
enrichment_field_mappings = [
    {"sourceFieldName": enrichment_path, "targetFieldName": index_field}
    for enrichment_path, index_field in enrichment_targets
]

indexer_parameters = {
    "maxFailedItems": -1,
    "maxFailedItemsPerBatch": -1,
    "configuration": {
        "dataToExtract": "contentAndMetadata",
        # Produces the `/document/normalized_images/*` nodes the OCR skill reads.
        "imageAction": "generateNormalizedImages",
    },
}

indexer_payload = {
    "name": indexer_name,
    "dataSourceName": datasource_name,
    "targetIndexName": index_name,
    "skillsetName": skillset_name,
    # How often do you want to check for new content in the data source
    "schedule": {"interval": "PT2H"},
    "fieldMappings": blob_field_mappings,
    "outputFieldMappings": enrichment_field_mappings,
    "parameters": indexer_parameters,
}

indexer_url = AZURE_SEARCH_ENDPOINT + "/indexers/" + indexer_name
r = requests.put(
    indexer_url,
    data=json.dumps(indexer_payload),
    headers=headers,
    params=params,
)
print(r.status_code)
print(r.ok)

201
True


In [9]:
# Optionally, get indexer status to confirm that it's running
r = requests.get(AZURE_SEARCH_ENDPOINT + "/indexers/" + indexer_name +
                 "/status", headers=headers, params=params)
print(r.status_code)
if r.ok:
    # Parse the body once. `lastResult` can be null or missing (for example
    # before any execution has been recorded), so fall back to an empty dict
    # instead of calling `.get` on None.
    indexer_status = r.json()
    last_result = indexer_status.get('lastResult') or {}
    print("Status:", last_result.get('status'))
    print("Items Processed:", last_result.get('itemsProcessed'))
print(r.ok)

200
Status: success
Items Processed: 3
True


**When the indexer finishes running, we will have all 9.8k documents indexed in our search engine!**

# References

https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/search/azure-search-documents/samples

https://learn.microsoft.com/en-us/azure/search/search-get-started-python

https://github.com/Azure-Samples/azure-search-python-samples/blob/main/Tutorial-AI-Enrichment/PythonTutorial-AzureSearch-AIEnrichment.ipynb