In [1]:
import os
from presidio_analyzer import AnalyzerEngine, PatternRecognizer
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import glob
import pandas as pd
import docx2txt
import textract
from pathlib import Path
import stat
import time
import datetime
import easyocr
from PIL import Image, ImageOps, ImageEnhance
from csv import reader


In [2]:
import copy
import logging
from abc import abstractmethod
from typing import List, Dict

from presidio_analyzer import RecognizerResult
from presidio_analyzer.nlp_engine import NlpArtifacts

logger = logging.getLogger("presidio-analyzer")


class EntityRecognizer:
    """
    A class representing an abstract PII entity recognizer.
    EntityRecognizer is an abstract class to be inherited by
    Recognizers which hold the logic for recognizing specific PII entities.

    NOTE(review): a later cell in this notebook runs
    ``from presidio_analyzer import EntityRecognizer``, which rebinds this
    name; classes defined after that cell subclass the library version,
    not this local copy.

    :param supported_entities: the entities supported by this recognizer
    (for example, phone number, address, etc.)
    :param supported_language: the language supported by this recognizer.
    The supported language code is iso6391Name
    :param name: the name of this recognizer (optional)
    :param version: the recognizer current version
    """

    # Score bounds; MAX_SCORE caps a context-boosted score in
    # enhance_using_context. MIN_SCORE is not referenced inside this class.
    MIN_SCORE = 0
    MAX_SCORE = 1.0
    # Not referenced inside this class.
    CONTEXT_SIMILARITY_THRESHOLD = 0.65
    # Amount added to a result's score when a supportive context word is found.
    CONTEXT_SIMILARITY_FACTOR = 0.35
    # Floor applied to a result's score once context support was found.
    MIN_SCORE_WITH_CONTEXT_SIMILARITY = 0.4
    # Number of context words collected before / after the matched token.
    CONTEXT_PREFIX_COUNT = 5
    CONTEXT_SUFFIX_COUNT = 0

    def __init__(
        self,
        supported_entities: List[str],
        name: str = None,
        supported_language: str = "en",
        version: str = "0.0.1",
    ):

        self.supported_entities = supported_entities

        if name is None:
            self.name = self.__class__.__name__  # assign class name as name
        else:
            self.name = name

        self.supported_language = supported_language
        self.version = version
        self.is_loaded = False

        # load() runs during construction; is_loaded flips to True only
        # after it returns without raising
        self.load()
        logger.info("Loaded recognizer: %s", self.name)
        self.is_loaded = True

    # NOTE: this class does not use an ABC metaclass, so @abstractmethod is
    # documentation only here - instantiation is not blocked.
    @abstractmethod
    def load(self):
        """
        Initialize the recognizer assets if needed.
        (e.g. machine learning models)
        Called once from ``__init__``.
        """

    @abstractmethod
    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ):# -> List[RecognizerResult]
        """
        Analyze text to identify entities.
        :param text: The text to be analyzed
        :param entities: The list of entities this recognizer is able to detect
        :param nlp_artifacts: A group of attributes which are the result of
        an NLP process over the input text.
        :return: List of results detected by this recognizer.
        (this base implementation returns None)
        """
        return None

    def get_supported_entities(self): # -> List[str]
        """
        Return the list of entities this recognizer can identify.
        :return: A list of the supported entities by this recognizer
        """
        return self.supported_entities

    def get_supported_language(self): # -> str
        """
        Return the language this recognizer can support.
        :return: The language code supported by this recognizer
        """
        return self.supported_language

    def get_version(self): # -> str
        """
        Return the version of this recognizer.
        :return: The current version of this recognizer
        """
        return self.version

    def to_dict(self): # -> Dict
        """
        Serialize self to dictionary.
        :return: a dictionary with the keys supported_entities,
        supported_language, name and version
        """
        return_dict = {
            "supported_entities": self.supported_entities,
            "supported_language": self.supported_language,
            "name": self.name,
            "version": self.version,
        }
        return return_dict

    @classmethod
    def from_dict(cls, entity_recognizer_dict: Dict): # -> "EntityRecognizer"
        """
        Create EntityRecognizer from a dict input.
        :param entity_recognizer_dict: Dict containing keys and values for
        instantiation; it is unpacked as keyword arguments of ``cls``
        :return: a new instance of ``cls``
        """
        return cls(**entity_recognizer_dict)

    def enhance_using_context(
        self,
        text: str,
        raw_results: List[RecognizerResult],
        nlp_artifacts: NlpArtifacts,
        recognizer_context_words: List[str],
    ): # -> List[RecognizerResult]
        """
        Update results in case surrounding words are relevant to the context words.
        Using the surrounding words of the actual word matches, look
        for specific strings that if found contribute to the score
        of the result, improving the confidence that the match is
        indeed of that PII entity type
        :param text: The actual text that was analyzed
        :param raw_results: Recognizer results which didn't take
                            context into consideration
        :param nlp_artifacts: The nlp artifacts contains elements
                              such as lemmatized tokens for better
                              accuracy of the context enhancement process
        :param recognizer_context_words: The words the current recognizer
                                         supports (words to lookup)
        :return: A deep copy of raw_results with scores raised where a
                 supportive context word was found; raw_results itself
                 is not modified
        """
        # create a deep copy of the results object so we can manipulate it
        results = copy.deepcopy(raw_results)

        # Sanity
        if nlp_artifacts is None:
            logger.warning("[%s]. NLP artifacts were not provided", self.name)
            return results
        if recognizer_context_words is None or recognizer_context_words == []:
            logger.info(
                "recognizer '%s' does not support context " "enhancement", self.name
            )
            return results

        for result in results:
            # extract lemmatized context from the surrounding of the match

            word = text[result.start : result.end]

            surrounding_words = self.__extract_surrounding_words(
                nlp_artifacts=nlp_artifacts, word=word, start=result.start
            )

            supportive_context_word = self.__find_supportive_word_in_context(
                surrounding_words, recognizer_context_words
            )
            if supportive_context_word != "":
                # boost the score, then clamp it into
                # [MIN_SCORE_WITH_CONTEXT_SIMILARITY, MAX_SCORE]
                result.score += self.CONTEXT_SIMILARITY_FACTOR
                result.score = max(result.score, self.MIN_SCORE_WITH_CONTEXT_SIMILARITY)
                result.score = min(result.score, EntityRecognizer.MAX_SCORE)

                # Update the explainability object with the context word
                # that helped improve the score
                result.analysis_explanation.set_supportive_context_word(
                    supportive_context_word
                )
                result.analysis_explanation.set_improved_score(result.score)
        return results

    @staticmethod
    def __context_to_keywords(context: str): # -> List[str]
        """Split a context string on single spaces (not called within this class)."""
        return context.split(" ")

    @staticmethod
    def __find_supportive_word_in_context(
        context_list: List[str], recognizer_context_list: List[str]
    ): # -> str
        """
        Find words in the text which are relevant for context evaluation.
        A word is considered a supportive context word if a keyword from
        recognizer_context_list appears, exactly or as a substring, in any
        of the words in context_list.
        :param context_list words before and after the matched entity within
               a specified window size
        :param recognizer_context_list a list of words considered as
                context keywords manually specified by the recognizer's author
        :return: the first matching word of recognizer_context_list,
                 or an empty string when nothing matches
        """
        word = ""
        # If either list is missing, no need to continue
        if context_list is None or recognizer_context_list is None:
            return word

        for predefined_context_word in recognizer_context_list:
            # result == true only if any of the predefined context words
            # is found exactly or as a substring in any of the collected
            # context words
            result = next(
                (
                    True
                    for keyword in context_list
                    if predefined_context_word in keyword
                ),
                False,
            )
            if result:
                logger.debug("Found context keyword '%s'", predefined_context_word)
                word = predefined_context_word
                break

        return word

    @staticmethod
    def __add_n_words(
        index: int,
        n_words: int,
        lemmas: List[str],
        lemmatized_filtered_keywords: List[str],
        is_backward: bool,
    ): # -> List[str]
        """
        Prepare a list of context words.
        Return a list of words which surrounds a lemma at a given index.
        The words will be collected only if they exist in the filtered array
        :param index: index of the lemma that its surrounding words we want
        :param n_words: number of words to take
        :param lemmas: array of lemmas
        :param lemmatized_filtered_keywords: the array of filtered
               lemmas from the original sentence,
        :param is_backward: if true take the preceding words, if false,
                            take the succeeding words
        :return: at most n_words + 1 lower-cased lemmas, in the order visited
        """
        i = index
        context_words = []
        # The entity itself is of no interest to us...however we want to
        # consider it anyway for cases where it is attached with no spaces
        # to an interesting context word, so we allow it and add 1 to
        # the number of collected words

        # collect at most n words (in lower case)
        remaining = n_words + 1
        while 0 <= i < len(lemmas) and remaining > 0:
            lower_lemma = lemmas[i].lower()
            if lower_lemma in lemmatized_filtered_keywords:
                context_words.append(lower_lemma)
                remaining -= 1
            i = i - 1 if is_backward else i + 1
        return context_words

    def __add_n_words_forward(
        self,
        index: int,
        n_words: int,
        lemmas: List[str],
        lemmatized_filtered_keywords: List[str],
    ): # -> List[str]
        """Collect context words walking forward from ``index``."""
        return self.__add_n_words(
            index, n_words, lemmas, lemmatized_filtered_keywords, False
        )

    def __add_n_words_backward(
        self,
        index: int,
        n_words: int,
        lemmas: List[str],
        lemmatized_filtered_keywords: List[str],
    ): # -> List[str]
        """Collect context words walking backward from ``index``."""
        return self.__add_n_words(
            index, n_words, lemmas, lemmatized_filtered_keywords, True
        )

    @staticmethod
    def _find_index_of_match_token(
        word: str, start: int, tokens, tokens_indices: List[int]  # noqa ANN001
    ): # -> int
        """
        Find the position of the token that corresponds to a match.
        :param word: the matched text (only used in the error message)
        :param start: character offset of the match in the original text
        :param tokens: the tokens of the analyzed text
        :param tokens_indices: start character offset of each token
        :return: index of the first token that starts at ``start`` or whose
                 end offset lies beyond ``start``
        :raises ValueError: if no such token exists
        """
        found = False
        # we use the known start index of the original word to find the actual
        # token at that index, we are not checking for equivalence since the
        # token might be just a substring of that word (e.g. for phone number
        # 555-124564 the first token might be just '555' or for a match like '
        # rocket' the actual token will just be 'rocket' hence the misalignment
        # of indices)
        # Note: we are iterating over the original tokens (not the lemmatized)
        i = -1
        for i, token in enumerate(tokens, 0):
            # Either we found a token with the exact location, or
            # we take a token which its characters indices covers
            # the index we are looking for.
            if (tokens_indices[i] == start) or (start < tokens_indices[i] + len(token)):
                # found the interesting token, the one that around it
                # we take n words, we save the matching lemma
                found = True
                break

        if not found:
            raise ValueError(
                "Did not find word '" + word + "' "
                "in the list of tokens although it "
                "is expected to be found"
            )
        return i

    def __extract_surrounding_words(
        self, nlp_artifacts: NlpArtifacts, word: str, start: int
    ): # -> List[str]
        """Extract words surrounding another given word.
        The text from which the context is extracted is given in the nlp
        doc.
        :param nlp_artifacts: An abstraction layer which holds different
                              items which are the result of a NLP pipeline
                              execution on a given text
        :param word: The word to look for context around
        :param start: The start index of the word in the original text
        :return: De-duplicated list of lower-cased context lemmas, or
                 ``[""]`` when nlp_artifacts holds no tokens
        """
        if not nlp_artifacts.tokens:
            logger.info("Skipping context extraction due to " "lack of NLP artifacts")
            # if there are no nlp artifacts, this is ok: we cannot
            # extract context, so we return a valid, yet empty
            # context
            return [""]

        # Get the already prepared words in the given text, in their
        # LEMMATIZED version
        lemmatized_keywords = nlp_artifacts.keywords

        # since the list of tokens is not necessarily aligned
        # with the actual index of the match, we look for the
        # token index which corresponds to the match
        token_index = EntityRecognizer._find_index_of_match_token(
            word, start, nlp_artifacts.tokens, nlp_artifacts.tokens_indices
        )

        # index i belongs to the PII entity, take the preceding n words
        # and the succeeding m words into a context list

        backward_context = self.__add_n_words_backward(
            token_index,
            EntityRecognizer.CONTEXT_PREFIX_COUNT,
            nlp_artifacts.lemmas,
            lemmatized_keywords,
        )
        forward_context = self.__add_n_words_forward(
            token_index,
            EntityRecognizer.CONTEXT_SUFFIX_COUNT,
            nlp_artifacts.lemmas,
            lemmatized_keywords,
        )

        context_list = []
        context_list.extend(backward_context)
        context_list.extend(forward_context)
        # de-duplicate; note that going through a set discards word order
        context_list = list(set(context_list))
        logger.debug("Context list is: %s", " ".join(context_list))
        return context_list

    @staticmethod
    def remove_duplicates(results: List[RecognizerResult]): # -> List[RecognizerResult]
        """
        Remove duplicate results.
        Equal results are collapsed first; the remainder is ordered by
        descending score, then start offset, then descending length.
        Results with a score of 0 are dropped, as is any result that is
        contained in an already kept result of the same entity type.
        :param results: List[RecognizerResult]
        :return: List[RecognizerResult]
        """
        results = list(set(results))
        results = sorted(results, key=lambda x: (-x.score, x.start, -(x.end - x.start)))
        filtered_results = []

        for result in results:
            if result.score == 0:
                continue

            to_keep = result not in filtered_results  # equals based comparison
            if to_keep:
                for filtered in filtered_results:
                    # If result is contained in one of the other results
                    if (
                        result.contained_in(filtered)
                        and result.entity_type == filtered.entity_type
                    ):
                        to_keep = False
                        break

            if to_keep:
                filtered_results.append(result)

        return filtered_results

In [3]:
import copy
import logging
from typing import Optional, List, Iterable, Union, Type

from presidio_analyzer import EntityRecognizer
from presidio_analyzer.nlp_engine import NlpEngine, SpacyNlpEngine, StanzaNlpEngine
from presidio_analyzer.predefined_recognizers import (
    CreditCardRecognizer,
    CryptoRecognizer,
    DateRecognizer,
    DomainRecognizer,
    EmailRecognizer,
    IbanRecognizer,
    IpRecognizer,
    MedicalLicenseRecognizer,
    NhsRecognizer,
    PhoneRecognizer,
    UsBankRecognizer,
    UsLicenseRecognizer,
    UsItinRecognizer,
    UsPassportRecognizer,
    UsSsnRecognizer,
    SgFinRecognizer,
    SpacyRecognizer,
    EsNifRecognizer,
    StanzaRecognizer,
    AuAbnRecognizer,
    AuAcnRecognizer,
    AuTfnRecognizer,
    AuMedicareRecognizer,
)

logger = logging.getLogger("presidio-analyzer")


class RecognizerRegistry:
    """
    Detect, register and hold all recognizers to be used by the analyzer.

    :param recognizers: An optional iterable of recognizers,
    that will be available instead of the predefined recognizers
    """

    def __init__(self, recognizers: Optional[Iterable[EntityRecognizer]] = None):
        """
        Store the initial recognizers as a list.

        A non-empty ``list`` argument is stored as-is (same object, as before).
        Any other iterable (tuple, generator, set, ...) is materialized into a
        new list, because the other methods rely on ``append``/``extend``.
        """
        if isinstance(recognizers, list):
            self.recognizers = recognizers if recognizers else []
        elif recognizers is None:
            self.recognizers = []
        else:
            self.recognizers = list(recognizers)

    def load_predefined_recognizers(
        self, languages: Optional[List[str]] = None, nlp_engine: NlpEngine = None
    ): # -> None
        """
        Load the existing recognizers into memory.

        For every requested language, the language-specific recognizers are
        instantiated with their defaults, and the language-independent ones
        ("ALL") are instantiated once per language with
        ``supported_language`` set to that language.

        :param languages: List of languages for which to load recognizers
        (defaults to ``["en"]``)
        :param nlp_engine: The NLP engine to use.
        :return: None
        """
        if not languages:
            languages = ["en"]

        nlp_recognizer = self._get_nlp_recognizer(nlp_engine)
        recognizers_map = {
            "en": [
                UsBankRecognizer,
                UsLicenseRecognizer,
                UsItinRecognizer,
                UsPassportRecognizer,
                UsSsnRecognizer,
                NhsRecognizer,
                SgFinRecognizer,
                AuAbnRecognizer,
                AuAcnRecognizer,
                AuTfnRecognizer,
                AuMedicareRecognizer,
            ],
            "es": [EsNifRecognizer],
            "ALL": [
                CreditCardRecognizer,
                CryptoRecognizer,
                DateRecognizer,
                DomainRecognizer,
                EmailRecognizer,
                IbanRecognizer,
                IpRecognizer,
                MedicalLicenseRecognizer,
                nlp_recognizer,
                PhoneRecognizer,
            ],
        }
        for lang in languages:
            lang_recognizers = [rc() for rc in recognizers_map.get(lang, [])]
            self.recognizers.extend(lang_recognizers)
            all_recognizers = [
                rc(supported_language=lang) for rc in recognizers_map.get("ALL", [])
            ]
            self.recognizers.extend(all_recognizers)

    @staticmethod
    def _get_nlp_recognizer(
        nlp_engine: NlpEngine,
    ): # -> Union[Type[SpacyRecognizer], Type[StanzaRecognizer]]
        """
        Return the recognizer class leveraging the selected NLP Engine.

        Falls back to ``SpacyRecognizer`` (with a warning) for unknown engines.
        """
        # NOTE(review): the exact-type comparison (instead of isinstance) is
        # kept on purpose - presumably so that an engine subclassing
        # SpacyNlpEngine is still routed through the checks below; confirm
        # before changing it.
        if not nlp_engine or type(nlp_engine) == SpacyNlpEngine:
            return SpacyRecognizer
        if isinstance(nlp_engine, StanzaNlpEngine):
            return StanzaRecognizer
        else:
            logger.warning(
                "nlp engine should be either SpacyNlpEngine or StanzaNlpEngine"
            )
            # Returning default
            return SpacyRecognizer

    def get_recognizers(
        self,
        language: str,
        entities: Optional[List[str]] = None,
        all_fields: bool = False,
        ad_hoc_recognizers: Optional[List[EntityRecognizer]] = None,
    ):  #-> List[EntityRecognizer]
        """
        Return a list of recognizers which supports the specified name and language.

        :param entities: the requested entities
        :param language: the requested language
        :param all_fields: a flag to return all fields of a requested language.
        :param ad_hoc_recognizers: Additional recognizers provided by the user
        as part of the request
        :return: A list of the recognizers which supports the supplied entities
        and language, in registration order (ad-hoc recognizers last)
        :raises ValueError: if no language is given, if neither entities nor
        all_fields is given, or if no recognizer matches the request
        """
        if language is None:
            raise ValueError("No language provided")

        # `not all_fields` (rather than `is False`) so that falsy values such
        # as 0 or None also produce this error instead of a TypeError below
        if entities is None and not all_fields:
            raise ValueError("No entities provided")

        # work on a private list so ad-hoc recognizers never leak into the
        # registry, whatever sequence type self.recognizers currently is
        all_possible_recognizers = list(self.recognizers)
        if ad_hoc_recognizers:
            all_possible_recognizers.extend(ad_hoc_recognizers)

        # filter out unwanted recognizers
        if all_fields:
            to_return = [
                rec
                for rec in all_possible_recognizers
                if language == rec.supported_language
            ]
        else:
            # a dict is used as an insertion-ordered set: recognizers serving
            # several requested entities appear once, and the order of the
            # result is reproducible between runs
            selected = {}
            for entity in entities:
                subset = [
                    rec
                    for rec in all_possible_recognizers
                    if entity in rec.supported_entities
                    and language == rec.supported_language
                ]

                if not subset:
                    logger.warning(
                        "Entity %s doesn't have the corresponding"
                        " recognizer in language : %s",
                        entity,
                        language,
                    )
                else:
                    selected.update(dict.fromkeys(subset))
            to_return = list(selected)

        logger.debug(
            "Returning a total of %s recognizers",
            str(len(to_return)),
        )

        if not to_return:
            raise ValueError("No matching recognizers were found to serve the request.")

        return list(to_return)

    def add_recognizer(self, recognizer: EntityRecognizer): # -> None
        """
        Add a new recognizer to the list of recognizers.

        :param recognizer: Recognizer to add
        :raises ValueError: if the input is not an EntityRecognizer
        """
        if not isinstance(recognizer, EntityRecognizer):
            raise ValueError("Input is not of type EntityRecognizer")

        self.recognizers.append(recognizer)

    def remove_recognizer(self, recognizer_name: str): # -> None
        """
        Remove a recognizer based on its name.

        Every recognizer carrying that name is removed; removing an unknown
        name is a no-op (the log line then reports 0 removed).

        :param recognizer_name: Name of recognizer to remove
        """
        new_recognizers = [
            rec for rec in self.recognizers if rec.name != recognizer_name
        ]
        logger.info(
            "Removed %s recognizers which had the name %s",
            str(len(self.recognizers) - len(new_recognizers)),
            recognizer_name,
        )
        self.recognizers = new_recognizers


In [4]:
import io

# Cache of EasyOCR readers keyed by language tuple. Constructing a reader
# loads its models, so it is built once per language set and then reused.
_OCR_READERS = {}


def _get_ocr_reader(languages):
    """Return a cached ``easyocr.Reader`` for the given language codes."""
    key = tuple(languages)
    if key not in _OCR_READERS:
        _OCR_READERS[key] = easyocr.Reader(list(key))
    return _OCR_READERS[key]


def ocr_pii(img, languages=('en',), contrast_factor=1):
    """
    Extract the text of an image with EasyOCR.

    :param img: a PIL image
    :param languages: language codes passed to the EasyOCR reader
    :param contrast_factor: factor handed to ``ImageEnhance.Contrast.enhance``;
                            the default of 1 leaves the contrast unchanged
                            (same as before), values above 1 increase it
    :return: the result of ``readtext`` with ``detail=0`` and ``paragraph=True``
             (text only - positioning data is not needed here)
    """
    img_gray = ImageOps.grayscale(img)
    img_gray_contrast = ImageEnhance.Contrast(img_gray).enhance(contrast_factor)

    # EasyOCR is fed the PNG-encoded bytes of the pre-processed image
    img_byte_arr = io.BytesIO()
    img_gray_contrast.save(img_byte_arr, format='png')

    ocr_reader = _get_ocr_reader(languages)
    return ocr_reader.readtext(img_byte_arr.getvalue(), detail= 0, paragraph=True)
  

### Path to fetch files

In [5]:
#Change this path to your folder containing files to detect PII from.
# NOTE(review): hard-coded absolute local path - consider moving it to a
# configuration cell. The trailing slash is required: file names are appended
# to `path` with plain string concatenation here and in later cells.
path = r"/Users/nirav/Documents/SPU/DS-630 Machine Learning/ProjectInputFiles/"

# Change the directory so the relative glob patterns below resolve inside `path`
os.chdir(path)

# One list of matching file names per supported type. The lists always exist
# (possibly empty) and are sorted so that "the first file" is reproducible.
txtfiles = sorted(glob.glob("*.txt"))
csvfiles = sorted(glob.glob("*.csv"))
docfiles = sorted(glob.glob("*.docx"))
pdffiles = sorted(glob.glob("*.pdf"))
jpgfiles = sorted(glob.glob("*.jpg"))
pngfiles = sorted(glob.glob("*.png"))

# Only the first file of each type is loaded. The *_f variables are
# deliberately left undefined when no file of that type exists: the analysis
# cells treat a missing variable as "nothing to analyze".

if txtfiles:
    # context manager so the handle is closed once the text has been read
    with open(path + txtfiles[0]) as ff:
        txt_f = ff.read()

if csvfiles:
    ff2 = pd.read_csv(path + csvfiles[0])
    # str(DataFrame) is the truncated display repr (rows/columns replaced by
    # "..."), which would hide data from the analyzer; to_string() renders
    # the complete frame
    csv_f = ff2.to_string()

if docfiles:
    ff4 = docx2txt.process(path + docfiles[0])
    doc_f = ff4

if pdffiles:
    ff5 = textract.process(path + pdffiles[0], method='pdfminer')
    # textract returns encoded bytes; decode them instead of str(bytes), which
    # produces a "b'...'" repr with escaped newlines
    pdf_f = ff5.decode("utf-8", errors="replace") if isinstance(ff5, bytes) else str(ff5)

if jpgfiles:
    with Image.open(path + jpgfiles[0]) as ff6:
        jpg_f = ocr_pii(ff6)

if pngfiles:
    with Image.open(path + pngfiles[0]) as ff7:
        png_f = ocr_pii(ff7)


CUDA not available - defaulting to CPU. Note: This module is much faster with a GPU.
CUDA not available - defaulting to CPU. Note: This module is much faster with a GPU.


In [6]:
class NumbersRecognizer(EntityRecognizer):
    """Recognizer that reports every number-like token as a NUMBER entity."""

    # score attached to every result produced by this recognizer
    expected_confidence_level = 0.7

    def load(self): # -> None
        """Nothing to load for this recognizer."""

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ): # -> List[RecognizerResult]
        """
        Find tokens which represent numbers (either 123 or One Two Three).

        The decision is delegated to the ``like_num`` flag of each token in
        ``nlp_artifacts.tokens``; ``text`` and ``entities`` are not consulted.

        :return: one RecognizerResult per number-like token, in token order
        """
        return [
            RecognizerResult(
                entity_type="NUMBER",
                start=tok.idx,
                end=tok.idx + len(tok),
                score=self.expected_confidence_level
            )
            for tok in nlp_artifacts.tokens
            if tok.like_num
        ]

In [7]:
# Detect PII in the TXT input.
# `txt_f` only exists when a .txt file was loaded, so probe for exactly that
# name instead of wrapping the whole cell in `except NameError`, which would
# also hide genuine mistakes (e.g. a misspelled variable) further down.
try:
    txt_text = txt_f
except NameError:
    txt_text = None  # no TXT file was loaded - nothing to analyze

if txt_text is not None:
    analyzer = AnalyzerEngine()

    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr.", "Mrs.", "Miss"])
    new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
    analyzer.registry.add_recognizer(new_numbers_recognizer)

    # First pass: all entity types known to the registry, plus NUMBER
    numbers_results_txt = analyzer.analyze(text=txt_text, language="en")

    # deny_list has to be a list of strings. The former
    # `"1234567890" or "123-456-7890"` evaluated to the single string
    # "1234567890", i.e. a deny list made of its individual characters.
    phone_recognizer = PatternRecognizer(supported_entity="PHONE_NUMBER",
                                         deny_list=["1234567890", "123-456-7890"])

    analyzer.registry.add_recognizer(titles_recognizer)
    analyzer.registry.add_recognizer(phone_recognizer)

    # Second pass: restricted to the custom TITLE / PHONE_NUMBER entities
    analyzer_results_txt = analyzer.analyze(text=txt_text,
                                            entities=["TITLE", "PHONE_NUMBER"],
                                            language='en')

    analyzer_dict_txt = analyzer_results_txt

    # Report the file when either pass found something. The previous
    # `a and b is not None` parsed as `a and (b is not None)` and therefore
    # ignored the findings of the first pass.
    if analyzer_results_txt or numbers_results_txt:
        print("File name " + txtfiles[0] + " contains PII data." )

File name pii.txt contains PII data.


In [8]:
# Detect PII in the CSV input.
# `csv_f` only exists when a .csv file was loaded, so probe for exactly that
# name instead of wrapping the whole cell in `except NameError`, which would
# also hide genuine mistakes (e.g. a misspelled variable) further down.
try:
    csv_text = csv_f
except NameError:
    csv_text = None  # no CSV file was loaded - nothing to analyze

if csv_text is not None:
    analyzer = AnalyzerEngine()

    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr.", "Mrs.", "Miss"])
    new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
    analyzer.registry.add_recognizer(new_numbers_recognizer)

    # First pass: all entity types known to the registry, plus NUMBER
    numbers_results_csv = analyzer.analyze(text=csv_text, language="en")

    # deny_list has to be a list of strings. The former
    # `"1234567890" or "123-456-7890"` evaluated to the single string
    # "1234567890", i.e. a deny list made of its individual characters.
    phone_recognizer = PatternRecognizer(supported_entity="PHONE_NUMBER",
                                         deny_list=["1234567890", "123-456-7890"])

    analyzer.registry.add_recognizer(titles_recognizer)
    analyzer.registry.add_recognizer(phone_recognizer)

    # Second pass: restricted to the custom TITLE / PHONE_NUMBER entities
    analyzer_results_csv = analyzer.analyze(text=csv_text,
                                            entities=["TITLE", "PHONE_NUMBER"],
                                            language='en')

    # Report the file when either pass found something. The previous
    # `a and b is not None` parsed as `a and (b is not None)` and therefore
    # ignored the findings of the first pass.
    if analyzer_results_csv or numbers_results_csv:
        print("File name " + csvfiles[0] + " contains PII data." )

In [9]:
# Detect PII in the DOCX input.
# `doc_f` only exists when a .docx file was loaded, so probe for exactly that
# name instead of wrapping the whole cell in `except NameError`, which would
# also hide genuine mistakes (e.g. a misspelled variable) further down.
try:
    doc_text = doc_f
except NameError:
    doc_text = None  # no DOCX file was loaded - nothing to analyze

if doc_text is not None:
    analyzer = AnalyzerEngine()

    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr.", "Mrs.", "Miss"])
    new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
    analyzer.registry.add_recognizer(new_numbers_recognizer)

    # First pass: all entity types known to the registry, plus NUMBER
    numbers_results_doc = analyzer.analyze(text=doc_text, language="en")

    # deny_list has to be a list of strings. The former
    # `"1234567890" or "123-456-7890"` evaluated to the single string
    # "1234567890", i.e. a deny list made of its individual characters.
    phone_recognizer = PatternRecognizer(supported_entity="PHONE_NUMBER",
                                         deny_list=["1234567890", "123-456-7890"])

    analyzer.registry.add_recognizer(titles_recognizer)
    analyzer.registry.add_recognizer(phone_recognizer)

    # Second pass: restricted to the custom TITLE / PHONE_NUMBER entities
    analyzer_results_doc = analyzer.analyze(text=doc_text,
                                            entities=["TITLE", "PHONE_NUMBER"],
                                            language='en')

    # Report the file when either pass found something. The previous
    # `a and b is not None` parsed as `a and (b is not None)` and therefore
    # ignored the findings of the first pass.
    if analyzer_results_doc or numbers_results_doc:
        print("File name " + docfiles[0] + " contains PII data." )

File name pii.docx contains PII data.


In [10]:
# Detect PII in the PDF input.
# `pdf_f` only exists when a .pdf file was loaded, so probe for exactly that
# name instead of wrapping the whole cell in `except NameError`, which would
# also hide genuine mistakes (e.g. a misspelled variable) further down.
try:
    pdf_text = pdf_f
except NameError:
    pdf_text = None  # no PDF file was loaded - nothing to analyze

if pdf_text is not None:
    analyzer = AnalyzerEngine()

    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr.", "Mrs.", "Miss"])
    new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
    analyzer.registry.add_recognizer(new_numbers_recognizer)

    # First pass: all entity types known to the registry, plus NUMBER
    numbers_results_pdf = analyzer.analyze(text=pdf_text, language="en")

    # deny_list has to be a list of strings. The former
    # `"1234567890" or "123-456-7890"` evaluated to the single string
    # "1234567890", i.e. a deny list made of its individual characters.
    phone_recognizer = PatternRecognizer(supported_entity="PHONE_NUMBER",
                                         deny_list=["1234567890", "123-456-7890"])

    analyzer.registry.add_recognizer(titles_recognizer)
    analyzer.registry.add_recognizer(phone_recognizer)

    # Second pass: restricted to the custom TITLE / PHONE_NUMBER entities
    analyzer_results_pdf = analyzer.analyze(text=pdf_text,
                                            entities=["TITLE", "PHONE_NUMBER"],
                                            language='en')

    # Report the file when either pass found something. The previous
    # `a and b is not None` parsed as `a and (b is not None)` and therefore
    # ignored the findings of the first pass.
    if analyzer_results_pdf or numbers_results_pdf:
        print("File name " + pdffiles[0] + " contains PII data." )

In [11]:
# Detect PII in the text recognized from the JPG input.
# `jpg_f` only exists when a .jpg file was loaded, so probe for exactly that
# name instead of wrapping the whole cell in `except NameError`, which would
# also hide genuine mistakes (e.g. a misspelled variable) further down.
try:
    jpg_ocr = jpg_f
except NameError:
    jpg_ocr = None  # no JPG file was loaded - nothing to analyze

if jpg_ocr is not None:
    # The OCR step yields a list of text paragraphs; join them into plain
    # text rather than analyzing the list's repr (brackets, quotes, escapes).
    if isinstance(jpg_ocr, (list, tuple)):
        jpg_text = " ".join(map(str, jpg_ocr))
    else:
        jpg_text = str(jpg_ocr)

    analyzer = AnalyzerEngine()

    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr.", "Mrs.", "Miss"])
    new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
    analyzer.registry.add_recognizer(new_numbers_recognizer)

    # First pass: all entity types known to the registry, plus NUMBER
    numbers_results_jpg = analyzer.analyze(text=jpg_text, language="en")

    # deny_list has to be a list of strings. The former
    # `"1234567890" or "123-456-7890"` evaluated to the single string
    # "1234567890", i.e. a deny list made of its individual characters.
    phone_recognizer = PatternRecognizer(supported_entity="PHONE_NUMBER",
                                         deny_list=["1234567890", "123-456-7890"])

    analyzer.registry.add_recognizer(titles_recognizer)
    analyzer.registry.add_recognizer(phone_recognizer)

    # Second pass: restricted to the custom TITLE / PHONE_NUMBER entities
    analyzer_results_jpg = analyzer.analyze(text=jpg_text,
                                            entities=["TITLE", "PHONE_NUMBER"],
                                            language='en')

    # Report the file when either pass found something. The previous
    # `a and b is not None` parsed as `a and (b is not None)` and therefore
    # ignored the findings of the first pass.
    if analyzer_results_jpg or numbers_results_jpg:
        print("File name " + jpgfiles[0] + " contains PII data." )

File name license.jpg contains PII data.


In [12]:
# Detect PII in the text recognized from the PNG input.
# `png_f` only exists when a .png file was loaded, so probe for exactly that
# name instead of wrapping the whole cell in `except NameError`, which would
# also hide genuine mistakes (e.g. a misspelled variable) further down.
try:
    png_ocr = png_f
except NameError:
    png_ocr = None  # no PNG file was loaded - nothing to analyze

if png_ocr is not None:
    # The OCR step yields a list of text paragraphs; join them into plain
    # text rather than analyzing the list's repr (brackets, quotes, escapes).
    if isinstance(png_ocr, (list, tuple)):
        png_text = " ".join(map(str, png_ocr))
    else:
        png_text = str(png_ocr)

    analyzer = AnalyzerEngine()

    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr.", "Mrs.", "Miss"])
    new_numbers_recognizer = NumbersRecognizer(supported_entities=["NUMBER"])
    analyzer.registry.add_recognizer(new_numbers_recognizer)

    # First pass: all entity types known to the registry, plus NUMBER
    numbers_results_png = analyzer.analyze(text=png_text, language="en")

    # deny_list has to be a list of strings. The former
    # `"1234567890" or "123-456-7890"` evaluated to the single string
    # "1234567890", i.e. a deny list made of its individual characters.
    phone_recognizer = PatternRecognizer(supported_entity="PHONE_NUMBER",
                                         deny_list=["1234567890", "123-456-7890"])

    analyzer.registry.add_recognizer(titles_recognizer)
    analyzer.registry.add_recognizer(phone_recognizer)

    # Second pass: restricted to the custom TITLE / PHONE_NUMBER entities
    analyzer_results_png = analyzer.analyze(text=png_text,
                                            entities=["TITLE", "PHONE_NUMBER"],
                                            language='en')

    # Report the file when either pass found something. The previous
    # `a and b is not None` parsed as `a and (b is not None)` and therefore
    # ignored the findings of the first pass.
    if analyzer_results_png or numbers_results_png:
        print("File name " + pngfiles[0] + " contains PII data." )

File name license copy.png contains PII data.


In [13]:
# Gather name, owner and last-modified time of the first TXT file for the report.
try:

    path_for_txt = Path(path + txtfiles[0])
    owner_for_txt = path_for_txt.owner()
    filePath_for_txt = (path + txtfiles[0])
    fileStatsObj_for_txt = os.stat(filePath_for_txt)
    # Index the stat result with ST_MTIME to read the modification timestamp.
    modificationTime_for_txt = time.ctime(fileStatsObj_for_txt[stat.ST_MTIME])
    txt_info = [str(path_for_txt.name) + ' is owned by ' + owner_for_txt + ', Last modified on: ' + str(modificationTime_for_txt)]

except IndexError:
    # `txtfiles[0]` does not exist: blank the metadata and emit a placeholder.
    path_for_txt = ''
    owner_for_txt = ''
    filePath_for_txt = ''
    fileStatsObj_for_txt = ''
    modificationTime_for_txt = ''
    txt_nofile = ['Section for TXT output - No TXT file found.']
    # The report cell reads `txt_info`; previously only `txt_nofile` was set
    # here, which left `txt_info` undefined whenever no TXT file was present.
    txt_info = list(txt_nofile)
    numbers_results_txt = ['']

In [14]:
# Gather name, owner and last-modified time of the first CSV file for the report.
try:
    path_for_csv = Path(path + csvfiles[0])
    owner_for_csv = path_for_csv.owner()
    filePath_for_csv = (path + csvfiles[0])
    fileStatsObj_for_csv = os.stat(filePath_for_csv)
    # Index the stat result with ST_MTIME to read the modification timestamp.
    modificationTime_for_csv = time.ctime(fileStatsObj_for_csv[stat.ST_MTIME])
    csv_info = [str(path_for_csv.name) + ' is owned by ' + owner_for_csv + ', Last modified on: ' + str(modificationTime_for_csv)]

# IndexError covers `csvfiles[0]` not existing (the case the TXT/DOC/JPG/PNG
# cells handle); NameError is still caught as before.
except (IndexError, NameError):
    path_for_csv = ''
    owner_for_csv = ''
    filePath_for_csv = ''
    modificationTime_for_csv = ''
    fileStatsObj_for_csv = ''
    csv_info = ['Section for CSV output - No CSV file found.']
    numbers_results_csv = ['']


In [15]:
# Describe the first DOCX file (name, owner, last-modified time) for the report.
try:
    path_for_docx = Path(path + docfiles[0])
    owner_for_docx = path_for_docx.owner()
    filePath_for_docx = path + docfiles[0]
    fileStatsObj_for_docx = os.stat(filePath_for_docx)
    # Index the stat result with ST_MTIME to read the modification timestamp.
    modificationTime_for_docx = time.ctime(fileStatsObj_for_docx[stat.ST_MTIME])
    doc_info = [
        f"{path_for_docx.name} is owned by {owner_for_docx}"
        f", Last modified on: {modificationTime_for_docx}"
    ]
except IndexError:
    # `docfiles[0]` does not exist: blank the metadata and emit a placeholder.
    path_for_docx = owner_for_docx = filePath_for_docx = ''
    modificationTime_for_docx = fileStatsObj_for_docx = ''
    doc_info = ['Section for DOC output - No DOC file found.']
    numbers_results_doc = ['']

In [16]:
# Gather name, owner and last-modified time of the first PDF file for the report.
try:

    path_for_pdf = Path(path + pdffiles[0])
    owner_for_pdf = path_for_pdf.owner()
    filePath_for_pdf = (path + pdffiles[0])
    fileStatsObj_for_pdf = os.stat(filePath_for_pdf)
    # Index the stat result with ST_MTIME to read the modification timestamp.
    modificationTime_for_pdf = time.ctime(fileStatsObj_for_pdf[stat.ST_MTIME])
    pdf_info = [str(path_for_pdf.name) + ' is owned by ' + owner_for_pdf + ', Last modified on: ' + str(modificationTime_for_pdf)]

# IndexError covers `pdffiles[0]` not existing (the case the TXT/DOC/JPG/PNG
# cells handle); NameError is still caught as before.
except (IndexError, NameError):
    path_for_pdf = ''
    owner_for_pdf = ''
    filePath_for_pdf = ''
    modificationTime_for_pdf = ''
    fileStatsObj_for_pdf = ''
    pdf_info = ['Section for PDF output - No PDF file found.']
    numbers_results_pdf = ['']
    

In [17]:
# Describe the first JPG file (name, owner, last-modified time) for the report.
try:
    path_for_jpg = Path(path + jpgfiles[0])
    owner_for_jpg = path_for_jpg.owner()
    filePath_for_jpg = path + jpgfiles[0]
    fileStatsObj_for_jpg = os.stat(filePath_for_jpg)
    # Index the stat result with ST_MTIME to read the modification timestamp.
    modificationTime_for_jpg = time.ctime(fileStatsObj_for_jpg[stat.ST_MTIME])
    jpg_info = [
        f"{path_for_jpg.name} is owned by {owner_for_jpg}"
        f", Last modified on: {modificationTime_for_jpg}"
    ]
except IndexError:
    # `jpgfiles[0]` does not exist: blank the metadata and emit a placeholder.
    path_for_jpg = owner_for_jpg = filePath_for_jpg = ''
    modificationTime_for_jpg = fileStatsObj_for_jpg = ''
    jpg_info = ['Section for JPG output - No JPG file found.']
    numbers_results_jpg = ['']
    

In [18]:
# Describe the first PNG file (name, owner, last-modified time) for the report.
try:
    path_for_png = Path(path + pngfiles[0])
    owner_for_png = path_for_png.owner()
    filePath_for_png = path + pngfiles[0]
    fileStatsObj_for_png = os.stat(filePath_for_png)
    # Index the stat result with ST_MTIME to read the modification timestamp.
    modificationTime_for_png = time.ctime(fileStatsObj_for_png[stat.ST_MTIME])
    png_info = [
        f"{path_for_png.name} is owned by {owner_for_png}"
        f", Last modified on: {modificationTime_for_png}"
    ]
except IndexError:
    # `pngfiles[0]` does not exist: blank the metadata and emit a placeholder.
    path_for_png = owner_for_png = filePath_for_png = ''
    modificationTime_for_png = fileStatsObj_for_png = ''
    png_info = ['Section for PNG output - No PNG file found.']
    numbers_results_png = ['']
    

### Output Path

In [19]:
# Build the report rows: for each file type, its info line(s), then its
# analyzer results, then two empty rows as a visual separator.
final_op = [
    row
    for info_rows, pii_rows in (
        (csv_info, numbers_results_csv),
        (txt_info, numbers_results_txt),
        (pdf_info, numbers_results_pdf),
        (doc_info, numbers_results_doc),
        (jpg_info, numbers_results_jpg),
        (png_info, numbers_results_png),
    )
    for row in info_rows + (pii_rows + ["", ""])
]

df = pd.DataFrame(data=final_op)

# Change this path to point at your output folder; the CSV containing the
# detected PII is exported there.
df.to_csv(path_or_buf='/Users/nirav/Documents/SPU/DS-630 Machine Learning/ProjectOutputFiles/pii_output.csv')


In [20]:
# Write a one-row CSV next to the report; presumably a marker that the run
# finished -- confirm with whatever consumes it.
complete = ['Output generated']

df = pd.DataFrame(data=complete)

df.to_csv(path_or_buf='/Users/nirav/Documents/SPU/DS-630 Machine Learning/ProjectOutputFiles/Output_generated.csv')