# Load CSVs (one-to-many) to Azure Cognitive Search

In this Jupyter Notebook, we create and run steps to index a CSV file in which each row is an indivual and independent record/document. Each row then becomes searchable in Azure Cognitive Search. 
The reference documentation can be found at [Indexing blobs and files to produce multiple search documents](https://learn.microsoft.com/en-us/azure/search/search-howto-index-one-to-many-blobs).

By default, an indexer will treat the contents of a blob or file as a single search document. If you want a more granular representation in a search index, you can set parsingMode values to create multiple search documents from one blob or file.

We are going to be using the same private Blob Storage account but a different container that has abstracts of 90k Medical publications about COVID-19 published in 2020-2022. This file is a subset of a much bigger dataset (770k articles) called CORD19 [HERE](https://github.com/allenai/cord19)

In [1]:
import os
import json
import requests
from dotenv import load_dotenv
load_dotenv("credentials.env")

# Name of the container in your Blob Storage Datasource ( in credentials.env)
BLOB_CONTAINER_NAME = "cord19"

In [2]:
# Define the names for the data source, index and indexer
datasource_name = "cogsrch-datasource-csv"
skillset_name = "cogsrch-skillset-csv"
index_name = "cogsrch-index-csv"
indexer_name = "cogsrch-indexer-csv"

In [3]:
# Setup the Payloads header
headers = {'Content-Type': 'application/json','api-key': os.environ['AZURE_SEARCH_KEY']}
params = {'api-version': os.environ['AZURE_SEARCH_API_VERSION']}

## Create Data Source (Blob container with the Litcovid CSV data file)

In [4]:
# Create a data source

datasource_payload = {
    "name": datasource_name,
    "description": "Demo files to demonstrate cognitive search capabilities of one-to-many.",
    "type": "azureblob",
    "credentials": {
        "connectionString": os.environ['BLOB_CONNECTION_STRING']
    },
    "container": {
        "name": BLOB_CONTAINER_NAME
    }
}
r = requests.put(os.environ['AZURE_SEARCH_ENDPOINT'] + "/datasources/" + datasource_name,
                 data=json.dumps(datasource_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)

204
True


In [5]:
datasource_payload

{'name': 'cogsrch-datasource-csv',
 'description': 'Demo files to demonstrate cognitive search capabilities of one-to-many.',
 'type': 'azureblob',
 'credentials': {'connectionString': 'DefaultEndpointsProtocol=https;AccountName=neudeveastus2azgptstg;AccountKey=NcA5+AOuB9jq2pWFGAJ6qQJZ0UQHAOWhUTCajerQTc1gulD/QWB1OYyjz+BA92CHkcv74RPqXDgq+AStAdfuOw==;EndpointSuffix=core.windows.net'},
 'container': {'name': 'cord19'}}

## Inspect CSV file so we can understand the column types before creating the Index

In our private dataset we have place a smaller version of the original the metadata.csv file in the cord19 dataset. 
Let's see what the file looks like:

In [9]:
#Download the csv files to disk and inspect using pandas
import pandas as pd
remote_file_path =  "https://" + os.environ["BLOB_STORAGE_ACCOUNT_NAME"] + ".blob.core.windows.net/cord19/metadata.csv"

In [10]:
print(remote_file_path + os.environ['BLOB_SAS_TOKEN'])

https://neudeveastus2azgptstg.blob.core.windows.net/cord19/metadata.csv?sv=2015-04-05&ss=bfqt&srt=sco&sp=rwdlacup&se=2025-01-01T00%3A00%3A00.0000000Z&spr=https,http&sig=0QeWjR3O%2FS6FQSLejCaYSXI5j7u2Q8cahy5yzTgsYGY%3D


In [11]:
metadata = pd.read_csv(remote_file_path + os.environ['BLOB_SAS_TOKEN'])
print("No. of lines:",metadata.shape[0])

simple_schema = ['cord_uid', 'source_x', 'title', 'abstract', 'authors', 'url']

def make_clickable(address):
    '''Make the url clickable'''
    return '<a href="{0}">{0}</a>'.format(address)

def preview(text):
    '''Show only a preview of the text data.'''
    return text[:30] + '...'

format_ = {'title': preview, 'abstract': preview, 'authors': preview, 'url': make_clickable}

metadata[simple_schema].head().style.format(format_)

No. of lines: 51078


Unnamed: 0,cord_uid,source_x,title,abstract,authors,url
0,xqhn0vbp,PMC,Airborne rhinovirus detection ...,"BACKGROUND: Rhinovirus, the mo...","Myatt, Theodore A; Johnston, S...",https://www.ncbi.nlm.nih.gov/pmc/articles/PMC140314/
1,gi6uaa83,PMC,Discovering human history from...,Recent analyses of human patho...,"Disotell, Todd R...",https://www.ncbi.nlm.nih.gov/pmc/articles/PMC156578/
2,le0ogx1s,PMC,A new recruit for the army of ...,"The army of the men of death, ...","Petsko, Gregory A...",https://www.ncbi.nlm.nih.gov/pmc/articles/PMC193621/
3,fy4w7xz8,PMC,Association of HLA class I wit...,BACKGROUND: The human leukocyt...,"Lin, Marie; Tseng, Hsiang-Kuan...",https://www.ncbi.nlm.nih.gov/pmc/articles/PMC212558/
4,0qaoam29,PMC,A double epidemic model for th...,BACKGROUND: An epidemic of a S...,"Ng, Tuen Wai; Turinici, Gabrie...",https://www.ncbi.nlm.nih.gov/pmc/articles/PMC222908/


In [None]:
remote_file_path + os.environ['BLOB_SAS_TOKEN']

'https://blobstoragengdmjylk4biva.blob.core.windows.net/cord19/metadata.csv?sp=r&st=2023-08-18T17:06:35Z&se=2023-08-19T01:06:35Z&spr=https&sv=2022-11-02&sr=c&sig=%2FCROMTNwhiVrDR7egZEhimqBzN4Idjb6qudFm59oXSQ%3D'

## Create Skillset - Text Splitter, Language Detection
We will use cognitive services enrichment for spliting the text of each content field into chunks (pages) and for language detection. We should always split the text since we don't know how big the content of each row might be.

In [12]:
# Create a skillset
skillset_payload = {
    "name": skillset_name,
    "description": "Splits Text and detect language",
    "skills":
    [
        {
            "@odata.type": "#Microsoft.Skills.Text.LanguageDetectionSkill",
            "description": "If you have multilingual content, adding a language code is useful for filtering",
            "context": "/document",
            "inputs": [
                {
                  "name": "text",
                  "source": "/document/abstract"
                }
            ],
            "outputs": [
                {
                  "name": "languageCode",
                  "targetName": "language"
                }
            ]
        },
        {
            "@odata.type": "#Microsoft.Skills.Text.SplitSkill",
            "context": "/document",
            "textSplitMode": "pages",
            "maximumPageLength": 5000, # 5000 is default
            "defaultLanguageCode": "en",
            "inputs": [
                {
                    "name": "text",
                    "source": "/document/abstract"
                },
                {
                    "name": "languageCode",
                    "source": "/document/language"
                }
            ],
            "outputs": [
                {
                    "name": "textItems",
                    "targetName": "pages"
                }
            ]
        }
    ],
    "cognitiveServices": {
        "@odata.type": "#Microsoft.Azure.Search.CognitiveServicesByKey",
        "description": os.environ['COG_SERVICES_NAME'],
        "key": os.environ['COG_SERVICES_KEY']
    }
}

r = requests.put(os.environ['AZURE_SEARCH_ENDPOINT'] + "/skillsets/" + skillset_name,
                 data=json.dumps(skillset_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)

201
True


## Create the Index
In Azure Cognitive Search, both blob indexers and file indexers support a delimitedText parsing mode for CSV files that treats each line in the CSV as a separate search document.

### **Important**:
As you can see below and from the prior Notebook, there are 7 mandatory fields in the schema: `id, title, content, chunks, language, name, location`. These fields must exist in any index that you create regardles of the datasource. Any additional fields are good to add so the engine can search relevant documents, however the mandatory fields must exist for all the code downstream work with no issues.

In [13]:
index_payload = {
    "name": index_name,  
    "fields": [
        {"name": "id", "type": "Edm.String", "key": "true", "searchable": "false", "retrievable": "true", "facetable": "false", "filterable": "false", "sortable": "false"},
        {"name": "title", "type": "Edm.String", "searchable": "true", "retrievable": "true", "facetable": "false", "filterable": "true", "sortable": "false"},
        {"name": "content", "type": "Edm.String", "searchable": "true", "retrievable": "true", "facetable": "false", "filterable": "false", "sortable": "false"},
        {"name": "chunks","type": "Collection(Edm.String)", "searchable": "false", "retrievable": "true", "sortable": "false", "filterable": "false", "facetable": "false"},
        {"name": "language", "type": "Edm.String", "searchable": "false", "retrievable": "true", "sortable": "true", "filterable": "true", "facetable": "true"},
        {"name": "name", "type": "Edm.String", "searchable": "true", "retrievable": "true", "sortable": "false", "filterable": "false", "facetable": "false"},
        {"name": "location", "type": "Edm.String", "searchable": "false", "retrievable": "true", "sortable": "false", "filterable": "false", "facetable": "false"},
        {"name": "authors", "type": "Edm.String", "searchable": "true", "retrievable": "true", "facetable": "false", "filterable": "false", "sortable": "false"},
        {"name": "metadata_storage_name", "type": "Edm.String", "searchable": "true", "retrievable": "true", "sortable": "false", "filterable": "false", "facetable": "false"},
        {"name": "metadata_storage_path", "type":"Edm.String", "searchable": "false", "retrievable": "true", "filterable": "false", "sortable": "false"},
        {"name": "metadata_storage_last_modified", "type":"Edm.DateTimeOffset", "searchable": "false", "retrievable": "false", "filterable": "false", "sortable": "false"}
    ],
    "semantic": {
        "configurations": [
            {
                "name": "my-semantic-config",
                "prioritizedFields": {
                    "titleField": 
                        {
                            "fieldName": "title"
                        },
                    "prioritizedContentFields": [
                        { 
                            "fieldName":"content" 
                        }
                    ]
                }
            }
        ]
    }
}

r = requests.put(os.environ['AZURE_SEARCH_ENDPOINT'] + "/indexes/" + index_name,
                 data=json.dumps(index_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)

201
True


## Create and Run the Indexer - (runs the pipeline)
To create one-to-many indexers with CSV blobs, create or update an indexer definition with the delimitedText parsing mode

In [14]:
indexer_payload = {
    "name": indexer_name,
    "dataSourceName": datasource_name,
    "targetIndexName": index_name,
    "skillsetName": skillset_name,
    "schedule" : { "interval" : "PT2H"},
    "fieldMappings": [
        {
          "sourceFieldName" : "cord_uid",
          "targetFieldName" : "id"
        },
        {
          "sourceFieldName" : "abstract",
          "targetFieldName" : "content"
        },
        {
          "sourceFieldName" : "metadata_storage_name",
          "targetFieldName" : "name"
        },
        {
          "sourceFieldName" : "url",
          "targetFieldName" : "location"
        }
    ],
    "outputFieldMappings":
    [
        {
            "sourceFieldName": "/document/language",
            "targetFieldName": "language"
        },
        {
            "sourceFieldName": "/document/pages/*",
            "targetFieldName": "chunks"
        }
    ],
    "parameters" : { 
        "configuration" : { 
            "dataToExtract": "contentAndMetadata",
            "parsingMode" : "delimitedText", 
            "firstLineContainsHeaders" : True,
            "delimitedTextDelimiter": ","
        } 
    }
}
r = requests.put(os.environ['AZURE_SEARCH_ENDPOINT'] + "/indexers/" + indexer_name,
                 data=json.dumps(indexer_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)

201
True


In [16]:
# Uncomment if you find an error
r.text



In [17]:
# Optionally, get indexer status to confirm that it's running
r = requests.get(os.environ['AZURE_SEARCH_ENDPOINT'] + "/indexers/" + indexer_name +
                 "/status", headers=headers, params=params)
print(json.dumps(r.json(), indent=1))
print(r.status_code)
print("Status:",r.json().get('lastResult').get('status'))
print("Items Processed:",r.json().get('lastResult').get('itemsProcessed'))
print(r.ok)

{
 "@odata.context": "https://neu-dev-eastus2-azgptsrch-srch.search.windows.net/$metadata#Microsoft.Azure.Search.V2023_07_01_Preview.IndexerExecutionInfo",
 "name": "cogsrch-indexer-csv",
 "status": "running",
 "lastResult": {
  "status": "inProgress",
  "statusDetail": null,
  "errorMessage": null,
  "startTime": "2023-09-21T19:06:21.109Z",
  "endTime": null,
  "itemsProcessed": 35000,
  "itemsFailed": 0,
  "initialTrackingState": null,
  "finalTrackingState": "{\r\n  \"lastFullEnumerationStartTime\": \"0001-01-01T00:00:00Z\",\r\n  \"lastAttemptedEnumerationStartTime\": \"2023-09-21T19:06:21.187Z\",\r\n  \"nameHighWaterMark\": \"https://neudeveastus2azgptstg.blob.core.windows.net/cord19/metadata.csv\",\r\n  \"rowOrdinal\": 35000\r\n}",
  "mode": "indexingAllDocs",
  "errors": [],
   {
    "key": "localId=metadata.csv&documentPosition=2219&documentKey=jvhiafg9",
    "name": "Projection.SearchIndex.OutputFieldMapping.language",
    "message": "Could not map output field 'language' to se

**When the indexer finishes running we will have all 90,000 rows indexed properly as separate documents in our Search Engine!.**

# Reference

- https://learn.microsoft.com/en-us/azure/search/search-howto-index-csv-blobs
- https://learn.microsoft.com/en-us/azure/search/knowledge-store-create-rest



# NEXT
Now that we have two separete indexes loaded with two different types of information, In the next notebook 3, we will do a Multi-Index query, sort the results based on the reranker semantic score of Azure Search, and then use OpenAI to understand this results and give the best answer possible