# Redacting a PDF using Amazon Textract, Amazon Comprehend Medical, and PyMuPDF

Begin by installing the PyMuPDF library.

In [None]:
# Install (or upgrade) PyMuPDF; it is imported below as `fitz`
! pip install PyMuPDF --upgrade

Load the libraries we will be using and set up the initial variables.

In [None]:
import fitz
import boto3
import time
import json
import sagemaker
from IPython.display import IFrame

# Set to False when this notebook is not running on a SageMaker notebook instance
ON_SAGEMAKER_NOTEBOOK = True

# Current AWS Region, as configured for the default boto3 session
mySession = boto3.session.Session()
awsRegion = mySession.region_name

# Service clients: Amazon S3 and Amazon Textract
s3 = boto3.client('s3')
textract = boto3.client('textract')

# On SageMaker use the notebook's execution role; otherwise fill in the placeholder by hand
role = sagemaker.get_execution_role() if ON_SAGEMAKER_NOTEBOOK else "[YOUR ROLE]"

In [None]:
# Display the role resolved in the setup cell
role

In [None]:
# Local path of the sample PDF that will be processed
filename = 'data/sample_doctors_report.pdf'

# Render the original document inline in the notebook
IFrame(src=filename, width=900, height=400)

In [None]:
# Open the sample PDF with PyMuPDF (the document is opened again in the redaction cell)
doc = fitz.open(filename)

## Create an S3 bucket
Then replace **your-s3-bucket-name** in the cell below with the name of your bucket.

In [None]:
# Name of the S3 bucket used for the source document and all job output
bucket = 'your-s3-bucket-name'
# Key prefix under which this notebook stores its objects in the bucket
prefix = 'your-s3-prefix'

In [None]:
# S3 object key for the uploaded source document (keeps the local relative path)
file_path = "/".join([prefix, "source_doc", filename])
# Full s3:// URI of that object
doc_uri = "s3://" + bucket + "/" + file_path

In [None]:
# Upload the local PDF to S3 with the AWS CLI
! aws s3 cp {filename} {doc_uri}

In [None]:
def startJob(s3BucketName, objectName):
    """Start a Textract document-analysis job for an object stored in S3.

    The job is requested with the FORMS and TABLES feature types.

    Args:
        s3BucketName: name of the bucket that holds the document.
        objectName: key of the document inside that bucket.

    Returns:
        The "JobId" value from the service response.
    """
    s3_object = {'Bucket': s3BucketName, 'Name': objectName}
    analysis = textract.start_document_analysis(
        DocumentLocation={'S3Object': s3_object},
        FeatureTypes=['FORMS', 'TABLES'],
    )
    return analysis["JobId"]

def isJobComplete(jobId):
    """Block until the Textract job is no longer IN_PROGRESS.

    The status is fetched and printed once per poll, with a 5 second pause
    between polls.

    Args:
        jobId: id of the document-analysis job to wait for.

    Returns:
        The first status string that is not "IN_PROGRESS". Note that this is
        the status itself, not a boolean.
    """
    while True:
        status = textract.get_document_analysis(JobId=jobId)["JobStatus"]
        print("Job status: {}".format(status))
        if status != "IN_PROGRESS":
            return status
        time.sleep(5)

def getJobResults(jobId):
    """Fetch every result page of a Textract document-analysis job.

    Keeps requesting pages for as long as a response carries a non-empty
    "NextToken", printing a running page count after each one.

    Args:
        jobId: id of the document-analysis job.

    Returns:
        A list of the raw service responses, in the order they were received.
    """
    pages = []
    request = {"JobId": jobId}

    while True:
        response = textract.get_document_analysis(**request)
        pages.append(response)
        print("Resultset page recieved: {}".format(len(pages)))

        nextToken = response.get('NextToken')
        if not nextToken:
            break
        # Ask for the page that follows the one just received
        request["NextToken"] = nextToken

    return pages

In [None]:
# Start the Textract analysis of the uploaded document and keep the job id
jobId = startJob(bucket, file_path)

In [None]:
# isJobComplete() returns the final status string, which is always truthy,
# so the status has to be compared explicitly: only fetch results when the
# job actually produced some, and fail loudly otherwise.
job_status = isJobComplete(jobId)
if job_status in ("SUCCEEDED", "PARTIAL_SUCCESS"):
    textract_response = getJobResults(jobId)
else:
    raise RuntimeError(f"Textract job {jobId} ended with status {job_status}")

In [None]:
#textract_response

In [None]:
# Persist the raw Textract result pages so they can be reloaded later
with open("txtract_output.json", "w") as out_file:
    json.dump(textract_response, fp=out_file)

In [None]:
# Write detected text
# Gather the text of every LINE block, in the order Textract returned them
line_texts = []
for resultPage in textract_response:
    for item in resultPage["Blocks"]:
        if item["BlockType"] == "LINE":
            line_texts.append(item["Text"])

# Each line is followed by a single space. txtbuf must match the file content
# character for character, because entity offsets are later used to slice it.
txtbuf = "".join(line + ' ' for line in line_texts)

# Write explicitly as UTF-8 so the file does not depend on the platform's default encoding
with open("textract_text.txt", "w", encoding="utf-8") as wfd:
    wfd.write(txtbuf)

In [None]:
# S3 location for the extracted text; this prefix is the input of the Comprehend Medical job
textract_txt_uri = f"s3://{bucket}/{prefix}/textract_txt/textract_text.txt"
# Upload the text file with the AWS CLI
! aws s3 cp "textract_text.txt" {textract_txt_uri}

In [None]:
import uuid

# Amazon Comprehend Medical client, created for the region of the boto3 session
comprehend = boto3.client(service_name='comprehendmedical', region_name=awsRegion)

# Give the batch job a unique, recognizable name
job_uuid = uuid.uuid1()
job_name = "comprehend-medical-job-{}".format(job_uuid)
print(f"job_name = {job_name}")

#### myDataAccessRole
In the cell below, replace the string **your-data-access-role-arn** with the ARN of the IAM role you created.

In [None]:
# Input location (the prefix the Textract text was uploaded to) and output
# location of the batch job, both in the same bucket
input_config = {'S3Bucket': bucket, 'S3Key': f'{prefix}/textract_txt'}
output_config = {'S3Bucket': bucket, 'S3Key': f'{prefix}/results'}

# Start the asynchronous entity-detection job; the role ARN is a placeholder to be replaced
response = comprehend.start_entities_detection_v2_job(
    InputDataConfig=input_config,
    OutputDataConfig=output_config,
    DataAccessRoleArn="your-data-access-role-arn",
    JobName=job_name,
    LanguageCode='en',
)


In [None]:
# Keep the job id so the job can be polled in the next cell
events_job_id = response['JobId']
print(f"events_job_id = {events_job_id}")

# Show the job description as currently reported by the service
job = comprehend.describe_entities_detection_v2_job(JobId=events_job_id)
print(job)

In [None]:
# Poll the Comprehend Medical job every 10 seconds until it reaches a final state
import datetime
from time import sleep

# Every status after which the job makes no further progress. Waiting only
# for COMPLETED/FAILED would loop forever on a stopped or partially
# successful job.
TERMINAL_STATUSES = {'COMPLETED', 'PARTIAL_SUCCESS', 'FAILED', 'STOPPED'}

while True:
    job = comprehend.describe_entities_detection_v2_job(JobId=events_job_id)
    status = job['ComprehendMedicalAsyncJobProperties']['JobStatus']
    if status in TERMINAL_STATUSES:
        break
    sleep(10)
    # ct stores current time
    ct = datetime.datetime.now()
    print("-- still processing --> " + str(ct))
print("-- done --")
# Report how the job ended, since "done" alone does not mean it succeeded
print("final job status: " + status)

In [None]:
# Display the last job description fetched by the polling loop
job

In [None]:
# Output location as reported in the job description
job_output_config = job['ComprehendMedicalAsyncJobProperties']['OutputDataConfig']
res_bucket = job_output_config['S3Bucket']
res_key = job_output_config['S3Key']

# The file name is appended without a separator, so the reported key is
# presumably expected to end with "/" -- confirm against a real job
res_file = res_key + 'textract_text.txt.out'


In [None]:
# Display the S3 key of the result file
res_file

In [None]:
# Download the Comprehend Medical output file so it can be processed locally
import boto3
s3 = boto3.client('s3')
s3.download_file(Bucket=res_bucket, Key=res_file, Filename='compmed_output.json')

In [None]:
# Parse the downloaded Comprehend Medical output
with open('compmed_output.json', "r") as compmed_file:
    wom = json.loads(compmed_file.read())

In [None]:
# Unique text spans of every ANATOMY entity, in order of first appearance.
# The text is taken from txtbuf using the entity's begin/end offsets.
red_list = []
for entity in wom['Entities']:
    if entity['Category'] != 'ANATOMY':
        continue
    text = txtbuf[entity['BeginOffset']:entity['EndOffset']]
    if text not in red_list:
        red_list.append(text)

In [None]:
# Display the text spans selected for redaction
red_list

In [None]:
# Reload the Textract result pages that were saved to disk earlier
with open("txtract_output.json", "r") as saved_pages:
    response = json.loads(saved_pages.read())

In [None]:
# First result page of the reloaded Textract output
resultPage = response[0]

In [None]:
import string

# An entry of red_list can contain several words, while each item compared
# against it is a single WORD block, so a multi-word entry could never match
# as a whole. Break every entry into its words, stored both verbatim and
# without surrounding punctuation. A set keeps the lookups cheap.
redact_words = set()
for phrase in red_list:
    for phrase_word in phrase.split():
        redact_words.add(phrase_word)
        redact_words.add(phrase_word.strip(string.punctuation))
redact_words.discard("")

# Textract WORD blocks whose text is selected for redaction
bbox_items = []
for resultPage in textract_response:
    for item in resultPage["Blocks"]:
        if item['BlockType'] != 'WORD':
            continue
        word = item['Text']
        # A detected word may carry attached punctuation (e.g. "knee,"),
        # so also try it with the punctuation stripped
        if word in redact_words or word.strip(string.punctuation) in redact_words:
            bbox_items.append(item)


In [None]:
# Display the WORD blocks that will be covered in the PDF
bbox_items

In [None]:
# Redact every selected word in the PDF and save the result as output.pdf.
#
# Redaction annotations are used instead of plain rectangle annotations: a
# rectangle only draws on top of the page and leaves the covered text in the
# file, whereas applying a redaction removes the content underneath it.

# padding added on every side of a word's box so the fill fully covers it
pad = 1

# RGB fill color of the redacted areas
red = (1, 0, 0)

with fitz.open(filename) as doc:
    # indexes of the pages that received at least one redaction annotation
    redacted_pages = set()

    for item in bbox_items:
        # item['Page'] is 1-based, document indexing is 0-based
        page_index = item['Page'] - 1
        page = doc[page_index]

        # Page mediabox size, in the same units fitz.Rect uses
        mediabox_width, mediabox_height = page.mediabox_size

        # note: each of the bbox values is a ratio of the overall page height or width
        bbox = item['Geometry']['BoundingBox']
        left = bbox['Left'] * mediabox_width
        top = bbox['Top'] * mediabox_height
        right = left + bbox['Width'] * mediabox_width
        bottom = top + bbox['Height'] * mediabox_height

        rect = fitz.Rect(left - pad, top - pad, right + pad, bottom + pad)
        page.add_redact_annot(rect, fill=red)
        redacted_pages.add(page_index)

    # Redaction annotations only mark areas; applying them removes the content
    for page_index in sorted(redacted_pages):
        doc[page_index].apply_redactions()

    doc.save('output.pdf')

In [None]:
# Render the redacted document inline in the notebook
redacted_filename = 'output.pdf'
IFrame(src=redacted_filename, width=900, height=400)