
Commit

Merge pull request #515 from microsoft/geearl/7078-delete-residual-chunks

deletion of chunks on upload
georearl committed Feb 29, 2024
2 parents 1ff167f + bb96dc0 commit e9085af
Showing 2 changed files with 48 additions and 6 deletions.
8 changes: 6 additions & 2 deletions app/backend/app.py
@@ -560,19 +560,23 @@ async def get_all_tags():

@app.post("/retryFile")
async def retryFile(request: Request):
"""
Retries submission of a file
Returns:
dict: A dictionary containing the status of all tags
"""
json_body = await request.json()
filePath = json_body.get("filePath")

try:

file_path_parsed = filePath.replace(ENV["AZURE_BLOB_STORAGE_UPLOAD_CONTAINER"] + "/", "")
blob = blob_client.get_blob_client(ENV["AZURE_BLOB_STORAGE_UPLOAD_CONTAINER"], file_path_parsed)

if blob.exists():
raw_file = blob.download_blob().readall()
# Overwrite the existing blob with new data
blob.upload_blob(raw_file, overwrite=True)

statusLog.upsert_document(document_path=filePath,
status='Resubmitted to the processing pipeline',
status_classification=StatusClassification.INFO,
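As a hedged illustration of how this endpoint is driven, the following minimal client call is a sketch only: the base URL and the filePath value (including the upload-container prefix that the handler strips) are assumptions for illustration, not values from this repository.

import requests

# Hypothetical client call to the /retryFile endpoint patched above.
# Host, port, and file path are illustrative assumptions.
resp = requests.post(
    "http://localhost:8000/retryFile",
    json={"filePath": "upload/reports/annual-report.pdf"},
)
resp.raise_for_status()
print(resp.json())  # expected to describe the file's resubmission status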
46 changes: 42 additions & 4 deletions functions/FileUploadedFunc/__init__.py
@@ -8,8 +8,11 @@
import time
from shared_code.status_log import StatusLog, State, StatusClassification
import azure.functions as func
from azure.storage.blob import generate_blob_sas
from azure.storage.blob import BlobServiceClient, generate_blob_sas
from azure.storage.queue import QueueClient, TextBase64EncodePolicy
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential


azure_blob_connection_string = os.environ["BLOB_CONNECTION_STRING"]
cosmosdb_url = os.environ["COSMOSDB_URL"]
@@ -22,10 +25,14 @@
media_submit_queue = os.environ["MEDIA_SUBMIT_QUEUE"]
image_enrichment_queue = os.environ["IMAGE_ENRICHMENT_QUEUE"]
max_seconds_hide_on_upload = int(os.environ["MAX_SECONDS_HIDE_ON_UPLOAD"])
function_name = "FileUploadedFunc"


azure_blob_content_container = os.environ["BLOB_STORAGE_ACCOUNT_OUTPUT_CONTAINER_NAME"]
azure_blob_endpoint = os.environ["BLOB_STORAGE_ACCOUNT_ENDPOINT"]
azure_blob_key = os.environ["AZURE_BLOB_STORAGE_KEY"]
azure_search_service_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
azure_search_service_index = os.environ["AZURE_SEARCH_INDEX"]
azure_search_service_key = os.environ["AZURE_SEARCH_SERVICE_KEY"]

function_name = "FileUploadedFunc"

def main(myblob: func.InputStream):
""" Function to read supported file types and pass to the correct queue for processing"""
@@ -70,6 +77,37 @@ def main(myblob: func.InputStream):
}
message_string = json.dumps(message)

# If this is an update to an existing blob, we need to delete any residual
# chunks: reprocessing will overwrite the new chunks, but if the new file
# version produces fewer chunks than the old one, the leftover old chunks
# would remain. The following code handles this for PDF and non-PDF files.
blob_client = BlobServiceClient(
account_url=azure_blob_endpoint,
credential=azure_blob_key,
)
blob_container = blob_client.get_container_client(azure_blob_content_container)
# List all blobs in the container that start with the name of the blob being processed
# first remove the container prefix
myblob_filename = myblob.name.split("/", 1)[1]
blobs = blob_container.list_blobs(name_starts_with=myblob_filename)

# instantiate the search sdk elements
search_client = SearchClient(azure_search_service_endpoint,
azure_search_service_index,
AzureKeyCredential(azure_search_service_key))
search_id_list_to_delete = []

# Iterate through the blobs and delete each one from blob storage and from the search index
for blob in blobs:
blob_client.get_blob_client(container=azure_blob_content_container, blob=blob.name).delete_blob()
search_id_list_to_delete.append({"id": blob.name})

if len(search_id_list_to_delete) > 0:
search_client.delete_documents(documents=search_id_list_to_delete)
logging.debug("Succesfully deleted items from AI Search index.")
else:
logging.debug("No items to delete from AI Search index.")

# Queue the message with a random backoff so as not to put the next function under unnecessary load
queue_client = QueueClient.from_connection_string(azure_blob_connection_string, queue_name, message_encode_policy=TextBase64EncodePolicy())
backoff = random.randint(1, max_seconds_hide_on_upload)
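For readers skimming the diff, here is a self-contained sketch of the cleanup pattern this commit introduces. The helper function and its parameters are illustrative assumptions, not part of the repository; like the code above, it assumes chunk blobs share the source file's path prefix and that the search index key equals the chunk blob name.

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.storage.blob import BlobServiceClient

# Illustrative standalone version of the residual-chunk cleanup; the helper
# name and its parameters are assumptions for this sketch, not repo code.
def delete_residual_chunks(blob_endpoint, blob_key, container_name, file_prefix,
                           search_endpoint, search_key, index_name):
    blob_service = BlobServiceClient(account_url=blob_endpoint, credential=blob_key)
    container = blob_service.get_container_client(container_name)

    ids_to_delete = []
    for blob in container.list_blobs(name_starts_with=file_prefix):
        container.delete_blob(blob.name)          # remove the stale chunk blob
        ids_to_delete.append({"id": blob.name})   # index key assumed == blob name

    if ids_to_delete:
        search = SearchClient(search_endpoint, index_name,
                              AzureKeyCredential(search_key))
        search.delete_documents(documents=ids_to_delete)

Deleting before re-enqueueing keeps blob storage and the index consistent: the subsequent processing run recreates exactly the chunks for the new file version, so no chunks left over from a longer previous version survive. The diff above is truncated just after the backoff is computed; assuming the hidden lines send the message with that delay, the azure-storage-queue SDK supports this directly via the visibility_timeout parameter of QueueClient.send_message, along the lines of:

# Hedged sketch of the truncated call site: hide the message for `backoff`
# seconds so the downstream function is not hit immediately on upload.
queue_client.send_message(message_string, visibility_timeout=backoff)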
