<a href="https://colab.research.google.com/github/edponce/DoyleInvestigators2/blob/main/Doyle_Corpus_Extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#$\color{brown}{\rm Team~Members}$
## Jerry Duncan
## Fabian Fallas
## Chris Gropp
## Maofeng Tang
## Quan Zhou
## Eduardo Ponce

#$\color{brown}{\rm Imports}$

In [None]:
import os
import re
import copy
import json
import gensim
import urllib.request
import urllib.parse
from google.colab import files

#$\color{brown}{\rm Corpus~Selection}$

In [184]:
CORPUS_URL = {
    # Doyle
    'The Valley of Fear': 'http://www.gutenberg.org/files/3776/3776.txt',
    'A Study in Scarlet': 'http://www.gutenberg.org/files/244/244.txt',
    'The Sign of the Four': 'http://www.gutenberg.org/files/2097/2097.txt',
    'The Hound of the Baskervilles': 'http://www.gutenberg.org/files/2852/2852.txt',
    # NOTE: These stories are part of a compilation, so include (URL, story # in compilation)
    'The Boscombe Valley Mystery': ('https://www.gutenberg.org/files/1661/1661.txt', 4),
    'The Five Orange Pips': ('https://www.gutenberg.org/files/1661/1661.txt', 5),
    'The Adventure of the Speckled Band': ('https://www.gutenberg.org/files/1661/1661.txt', 8),
    'The Adventure of the Cardboard Box': ('https://www.gutenberg.org/files/834/834-0.txt', 2),
    'The Musgave Ritual': ('https://www.gutenberg.org/files/834/834-0.txt', 6),
    'The Reigate Squires': ('https://www.gutenberg.org/files/834/834-0.txt', 7),
    'The Adventure of the Dancing Men': ('https://www.gutenberg.org/files/221/221.txt', 3),
    'The Adventure of the Second Stain': ('https://www.gutenberg.org/files/221/221.txt', 13),
    # Christie
    'The Secret Adversary': 'https://www.gutenberg.org/files/1155/1155-0.txt',
    'The Man in the Brown Suite': 'https://www.gutenberg.org/files/61168/61168-0.txt',
    'The Murder on the Links': 'https://www.gutenberg.org/files/58866/58866-0.txt',
    'The Mysterious Affair at Styles': 'https://www.gutenberg.org/files/863/863-0.txt',
    # Rinehart
    'The Circular Staircase': 'https://www.gutenberg.org/cache/epub/434/pg434.txt',
    'The Case of Jennie Brice': 'https://www.gutenberg.org/cache/epub/11127/pg11127.txt',
    'The After House': 'https://www.gutenberg.org/cache/epub/2358/pg2358.txt',
    'The Window at the White Cat': 'https://www.gutenberg.org/cache/epub/34020/pg34020.txt',
    'The Man in Lower Ten': 'https://www.gutenberg.org/files/1869/1869-0.txt',
}
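
Each `CORPUS_URL` value is either a plain URL or a `(URL, story number)` tuple. The sketch below shows one way to normalize both shapes into a single pair. `split_corpus_entry` is a hypothetical helper introduced only for illustration; it is not part of the notebook.

```python
def split_corpus_entry(entry):
    """Return (url, story_number); story_number is None for standalone texts."""
    if isinstance(entry, (list, tuple)):
        url, story_number = entry
        return url, story_number
    return entry, None


# Two entries copied from CORPUS_URL above
novel = 'http://www.gutenberg.org/files/3776/3776.txt'
short_story = ('https://www.gutenberg.org/files/1661/1661.txt', 4)

novel_url, novel_number = split_corpus_entry(novel)
story_url, story_number = split_corpus_entry(short_story)
```

Normalizing up front would avoid repeating the `isinstance(..., (list, tuple))` check wherever an entry is consumed.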

#$\color{brown}{\rm Load~Corpus}$
Read a corpus from a local file or a web page (URL) to start processing.

In [87]:
def get_corpus_from_url(url):
    with urllib.request.urlopen(url) as fd:
        text = fd.read()
        try:
            return text.decode('utf-8')
        except UnicodeDecodeError:
            return text.decode('iso-8859-1')


def get_corpus_from_file(file):
    with open(file) as fd:
        return fd.read()


def get_corpus(key):
    def validate_url(url):
        parsed_url = urllib.parse.urlparse(url)
        return all([parsed_url.scheme, parsed_url.netloc, parsed_url.path])

    # Check if a filename was provided
    if os.path.isfile(key):
        return get_corpus_from_file(key)
    else:
        if key in CORPUS_URL:
            fn = CORPUS_URL[key]
            if isinstance(fn, (list, tuple)):
                fn = fn[0]
            # Try the full path first, then only the base filename
            for file in (fn, os.path.basename(fn)):
                if os.path.isfile(file):
                    return get_corpus_from_file(file)

    # Check if a URL was provided
    if validate_url(key):
        return get_corpus_from_url(key)
    else:
        if key in CORPUS_URL:
            url = CORPUS_URL[key]
            if isinstance(url, (list, tuple)):
                url = url[0]
            if validate_url(url):
                return get_corpus_from_url(url)

    raise Exception(f"corpus '{key}' not found")
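
The decoding fallback used in `get_corpus_from_url()` can be tried offline on raw bytes. This is a minimal sketch; `decode_with_fallback` and its `encodings` parameter are hypothetical names used only for this example.

```python
def decode_with_fallback(raw, encodings=('utf-8', 'iso-8859-1')):
    """Decode bytes with the first encoding that does not raise."""
    for encoding in encodings:
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError('none of the encodings could decode the data')


utf8_bytes = 'café'.encode('utf-8')
latin1_bytes = 'café'.encode('iso-8859-1')  # not valid UTF-8

from_utf8 = decode_with_fallback(utf8_bytes)
from_latin1 = decode_with_fallback(latin1_bytes)
```

Because ISO-8859-1 maps every byte to a character, it never raises, so it only makes sense as the last entry in the list.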

#$\color{brown}{\rm Headings~Detection~(Regex)}$
Functions to get spans of headings:
* Gutenberg tags
* Named headings - parts, chapters, adventures
* Numbered headings
* Prologue and epilogue
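
To illustrate the idea behind named-heading detection, here is a deliberately simplified sketch on a lowercased sample. The pattern `heading_re` and the sample text are assumptions made for this example; the notebook's patterns additionally handle indentation, multi-line titles, and number-first labels.

```python
import re

sample = (
    "\n\nchapter i. mr. sherlock holmes\n\n"
    "first paragraph of text.\n\n"
    "chapter ii. the science of deduction\n\n"
    "second paragraph of text.\n\n"
)

# Simplified heading: blank line, name, Arabic/Roman number, optional title,
# blank line. Group 1 captures the heading line itself.
heading_re = re.compile(
    r'(?:^|\n\n)(chapter[ \t]+(?:\d+|[ivxlcd]+)\.?[^\n]*)\n\n'
)

heading_spans = [match.span(1) for match in heading_re.finditer(sample)]
heading_titles = [sample[begin:end] for begin, end in heading_spans]
```

Working with spans rather than extracted strings lets the same offsets index the original (non-lowercased) corpus.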

In [175]:
indented_labels = True
indent = r'[\t ]*' if indented_labels else ''


def get_newline_index(text):
    """Return the index just past the first newline when the text begins
    with optional spaces/tabs followed by a newline, otherwise return 0.
    This is used to skip/correct one newline at beginning of headings.
    """
    match = re.match(r'[ \t\r]*\n', text)
    return match.end() if match else 0


def get_gutenberg_start_heading(text, span=None):
    """Find Gutenberg's start tag (and producer, if available).

    Notes:
        * re.match() only matches at the beginning of the string, whereas
          the Gutenberg START tag usually follows a preamble, so
          re.search() is used to find the tag anywhere in the text.
    """
    if not span:
        span = (0, len(text))

    match = re.search(
        r'(^\s*|(\s*\n){2,})'  # pre-whitespace, no indentation
        r'\*{3}\s*'  # 3 asterisks
        r'start[^\r\n]+'  # tag text
        r'\s*\*{3}'  # 3 asterisks
        r'(\s*\nproduced by.+)?'  # producer line
        r'(\s*\n){2,}',  # post-whitespace
        text[span[0]:span[1]],
    )

    if match:
        span = match.span()
        offs = get_newline_index(text[span[0]:span[1]])
        return span[0] + offs, span[1]


def get_gutenberg_end_heading(text, span=None):
    """Find Gutenberg's end tag (and transcriber's notes, if available).

    Notes:
        * Duplicate/similar Gutenberg end tags.
        * Use a newline before transcriber note to prevent matching similar
          (but indented) notes at beginning of text.
        * Use DOTALL flag to match transcriber's notes across multiple lines.
          But be wary that using DOTALL prevents the use of '.+' for other
          cases, so use '[^\r\n]' instead.
    """
    if not span:
        span = (0, len(text))

    match = re.search(
        r'('
        r'(\s*\n){2,}'  # pre-whitespace, no indentation
        r'(original transcriber.+\s*\n)?'  # transcriber notes
        r'end[^\r\n]+'  # duplicate/similar tag text
        r')?'
        r'(\s*\n){2,}'  # pre-whitespace, no indentation
        + indent +
        r'(the end.*)?'
        r'('
        r'end of the project gutenberg.+'  # tag text
        r'(\s*\n){2,}'  # post-whitespace
        r')?'
        r'\*{3}\s*'  # 3 asterisks
        r"end[^\r\n]+"  # tag text
        r'\s*\*{3}'  # 3 asterisks
        r'(\s*\n){2,}',  # post-whitespace
        text[span[0]:span[1]],
        flags=re.DOTALL,
    )

    if match:
        span = match.span()
        offs = get_newline_index(text[span[0]:span[1]])
        return span[0] + offs, span[1]


def get_named_headings(text, name, span=None):
    """Find named headings with title."""
    if not span:
        span = (0, len(text))

    spans = [
        (match.start() + span[0], match.end() + span[0])
        for match in re.finditer(
            r'(^(\s*)|(\s*\n){2,})'  # pre-whitespace, no indentation
            + indent +
            r'('
            fr'{name}[ \t]+(\d+|[ivxlcd]+)'  # label with Arabic/Roman number
            r'(-+|\.)?'  # label-title delimiter
            r'((\s*\n){2})?'  # whitespace for titles two lines apart
            r'.*(\r?\n.*)?'  # title (multi-line support)
            r'|'  # alternative: number comes first, name appears in the title
            r'(\d+|[ivxlcd]+)'  # label with Arabic or Roman numbering
            r'(-+|\.)?'  # label-title delimiter
            fr'.*{name}.*'  # label with name
            r')'
            r'(\s*\n){2,}',  # post-whitespace
            text[span[0]:span[1]],
        )
    ]

    _spans = []
    for _span in spans:
        offs = get_newline_index(text[_span[0]:_span[1]])
        _spans.append((_span[0] + offs, _span[1]))
    return _spans


def get_numbered_headings(text, span=None):
    """Find numbered headings with no title."""
    if not span:
        span = (0, len(text))

    spans = [
        (match.start() + span[0], match.end() + span[0])
        for match in re.finditer(
            r'(^\s*|(\s*\n){2,})'  # pre-whitespace, no indentation
            + indent +
            r'(\d+|[ivxlcd]+)'  # label with Arabic or Roman numbering
            r'(-+|\.)?'  # label-title delimiter
            r'([ \t]+\w+.*)?'  # optional title
            r'(\s*\n){2,}',  # post-whitespace
            text[span[0]:span[1]],
        )
    ]

    _spans = []
    for _span in spans:
        offs = get_newline_index(text[_span[0]:_span[1]])
        _spans.append((_span[0] + offs, _span[1]))
    return _spans


def get_prologue_heading(text, span=None):
    if not span:
        span = (0, len(text))

    match = re.search(
        r'(^\s*|(\s*\n){2,})'  # pre-whitespace, no indentation
        + indent +
        r'prologue'  # tag text
        r'(\s*\n){2,}',  # post-whitespace
        text[span[0]:span[1]],
    )

    if match:
        span = match.span()
        offs = get_newline_index(text[span[0]:span[1]])
        return span[0] + offs, span[1]


def get_epilogue_heading(text, span=None):
    if not span:
        span = (0, len(text))

    match = re.search(
        r'(^\s*|(\s*\n){2,})'  # pre-whitespace, no indentation
        + indent +
        r'epilogue'  # tag text
        r'(\s*\n){2,}',  # post-whitespace
        text[span[0]:span[1]],
    )

    if match:
        span = match.span()
        offs = get_newline_index(text[span[0]:span[1]])
        return span[0] + offs, span[1]

#$\color{brown}{\rm Regions~of~Interest~(ROI)}$
Functions to get spans of text between headings.
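
The core idea is that a region of interest runs from the end of one heading to the start of the next. A minimal sketch with made-up offsets follows; `rois_between` is a hypothetical name that mirrors the `n is None` branch of `select_rois_spans()`.

```python
def rois_between(heading_spans):
    """Pair the end of each heading with the start of the following one."""
    return [
        (heading_spans[i][1], heading_spans[i + 1][0])
        for i in range(len(heading_spans) - 1)
    ]


# Two headings plus a closing boundary span (illustrative offsets)
heading_spans = [(0, 10), (50, 62), (120, 120)]
rois = rois_between(heading_spans)
```

The trailing boundary span is what allows the last heading to produce a region, which is why `get_headings_map()` appends an end span to each list of headings.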

In [194]:
def get_headings_map(
    text,
    headings=['part', 'chapter', 'adventure', 'prologue', 'epilogue', 'numbered'],
):
    """Create a list of all heading spans, guarantees at least one set
    of bounding spans.

    Args:
        headings (str, List[str]): Heading names to search for.
    """
    if not isinstance(headings, (list, tuple, set)):
        _headings = [headings]
    else:
        _headings = copy.deepcopy(headings)

    headings_map = {}
    _headings_map = {}

    # Always available heading, all text
    text_heading = '_text_'

    # Ensure there is always a begin "span"
    start_span = get_gutenberg_start_heading(text)
    if not start_span:
        start_span = 0, 0

    # Ensure there is always an end "span"
    end_span = get_gutenberg_end_heading(text)
    if not end_span:
        end_span = len(text), len(text)
    headings_map[text_heading] = [start_span, end_span]
    if text_heading in _headings:
        _headings.remove(text_heading)

    # Optional
    span = get_prologue_heading(text)
    if span:
        heading = 'prologue'
        _headings_map[heading] = [span, headings_map[text_heading][1]]
        if heading in _headings:
            headings_map[heading] = _headings_map[heading]
            _headings.remove(heading)

    # Optional
    span = get_epilogue_heading(text)
    if span:
        heading = 'epilogue'
        _headings_map[heading] = [span, headings_map[text_heading][1]]
        if heading in _headings:
            headings_map[heading] = _headings_map[heading]
            _headings.remove(heading)

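    # Optional: numbered headings. Detection is disabled here; to enable it
    # (e.g., for stories split into numbered sections), swap the two
    # assignments below.
    # spans = get_numbered_headings(text)
    spans = None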
    if spans:
        heading = 'numbered'
        _headings_map[heading] = [*spans, headings_map[text_heading][1]]
        if heading in _headings:
            headings_map[heading] = _headings_map[heading]
            _headings.remove(heading)

    # Optional
    for heading in _headings:
        spans = get_named_headings(text, heading)
        if spans:
            headings_map[heading] = spans
            if 'prologue' in _headings_map:
                headings_map[heading].append(_headings_map['prologue'][0])
            elif 'epilogue' in _headings_map:
                headings_map[heading].append(_headings_map['epilogue'][0])
            else:
                headings_map[heading].append(headings_map[text_heading][1])
    return headings_map


def select_rois_spans(spans, n=None):
    if n is None:
        _spans = [
            (spans[i][1], spans[i + 1][0])
            for i in range(len(spans) - 1)
        ]
    else:
        _spans = [
            (spans[i - 1][1], spans[i][0])
            for i in ([n] if isinstance(n, int) else n)
            if i >= 1 and i < (len(spans))
        ]
    return _spans


def remove_embedded_spans(spans):
    non_embedded_spans = copy.deepcopy(spans)
    for i in range(len(spans)):
        span = spans[i]
        for j in range(i + 1, len(spans)):
            _span = spans[j]
            if span[0] >= _span[0] and span[1] <= _span[1]:
                non_embedded_spans.remove(span)
                break
            elif span[1] > _span[1]:
                break
    non_embedded_spans.sort()
    return non_embedded_spans


def get_nonoverlapped_spans(spans, *, join=True):
    """Remove fully embedded spans and join overlapped spans."""
    non_embedded_spans = remove_embedded_spans(spans)
    non_embedded_spans = remove_embedded_spans(non_embedded_spans[::-1])
    if not join:
        return non_embedded_spans

    joined_spans = []
    for span in non_embedded_spans:
        for _span in non_embedded_spans:
            if span != _span:
                joined_span = None
                if span[0] >= _span[0] and span[0] <= _span[1]:
                    joined_span = (_span[0], span[1])
                elif span[1] >= _span[0] and span[1] <= _span[1]:
                    joined_span = (span[0], _span[1])
                if joined_span:
                    if joined_span not in joined_spans:
                        joined_spans.append(joined_span)
                    break
        else:
            joined_spans.append(span)

    nonoverlap_spans = sorted(joined_spans)

    # Recurse until condition is satisfied
    if nonoverlap_spans == spans:
        return nonoverlap_spans
    return get_nonoverlapped_spans(nonoverlap_spans)


def contains_span(spans, span):
    """Validate if a span is contained in a collection of spans."""
    for _span in spans:
        if span[0] >= _span[0] and span[1] <= _span[1]:
            return True
    return False


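def get_rois(text, name=None, *, n=None, headings_map=None):
    """Get the spans bounding one or more ROIs.

    Args:
        name (str): Heading name of the ROI (e.g., 'chapter'). If None,
            the whole text between the Gutenberg tags is used.

        n (int, Iterable[int]): 1-based ROI number(s), in [1,N]. If None,
            all ROIs are returned.
    """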
    if not headings_map:
        headings_map = get_headings_map(text)

    # Always available heading, all text
    text_heading = '_text_'

    rois = []
    if not name:
        rois = [(
            headings_map[text_heading][0][1],
            headings_map[text_heading][1][0],
        )]
    elif name in headings_map:
        rois = select_rois_spans(headings_map[name], n)

    # If necessary, skip last inner heading
    _rois = []
    for roi in rois:
        value = roi[1]
        for spans in headings_map.values():
            for span in spans:
                if roi[1] > span[0] and roi[1] <= span[1]:
                    value = span[0]
        _rois.append((roi[0], value))
    return _rois



def get_roi(text, name, span=None, *, n=None):
    if not span:
        spans = get_rois(text, name, n=n)
    else:
        spans = [
            (_span[0] + span[0], _span[1] + span[0])
            for _span in get_rois(text[span[0]:span[1]], name, n=n)
        ]
    return spans


def get_end_of_roi(text, regex, span=None):
    if not span:
        span = (0, len(text))
    return [
        (match.start() + span[0], match.end() + span[0])
        for match in re.finditer(
            regex,
            text[span[0]:span[1]],
        )
    ]


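def get_text_from_span(text, span=None):
    """Concatenate the text covered by a span or a list of spans."""
    if span is None:
        # No span given, use the whole text
        span = [(0, len(text))]
    elif span and isinstance(span[0], int):
        # Single (begin, end) span
        span = [span]

    # An empty list of spans yields an empty string
    return ''.join(text[_span[0]:_span[1]] for _span in span)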


def get_text(text, span=None, *, n=None):
    # 'n' is unused: there is a single whole-text ROI
    return get_roi(text, None, span)


def get_parts(text, span=None, *, n=None):
    return get_roi(text, 'part', span, n=n)


def get_chapters(text, span=None, *, n=None):
    return get_roi(text, 'chapter', span, n=n)


def get_adventures(text, span=None, *, n=None):
    return get_roi(text, 'adventure', span, n=n)


def get_numbered_sections(text, span=None, *, n=None):
    return get_roi(text, 'numbered', span, n=n)


def get_prologue(text, span=None):
    return get_roi(text, 'prologue', span)


def get_epilogue(text, span=None):
    return get_roi(text, 'epilogue', span)
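
For comparison with `get_nonoverlapped_spans()`, overlapping and embedded spans can also be merged with a single sort-and-sweep pass. This is an alternative sketch, not the notebook's recursive method; `merge_spans` is a hypothetical name.

```python
def merge_spans(spans):
    """Merge overlapping or embedded (begin, end) spans in one pass."""
    merged = []
    for begin, end in sorted(spans):
        if merged and begin <= merged[-1][1]:
            # Overlaps (or touches) the previous span: extend it
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((begin, end))
    return merged


merged = merge_spans([(5, 12), (0, 10), (3, 4), (20, 25)])
```

Sorting first means each span only needs to be compared with the most recently merged one.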

In [None]:
def clean_text_ws(text):
    # Remove newlines and extra whitespaces
    return re.sub(r'\s+', ' ', text)

#$\color{brown}{\rm Drivers}$

# Doyle

## Get text from stories

In [None]:
texts = {}

In [None]:
for story in ('The Valley of Fear', 'A Study in Scarlet'):
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    text = ''
    for part_span in get_parts(corpus_l):
        for chp_span in get_chapters(corpus_l, part_span):
            text += get_text_from_span(corpus, chp_span)
            text += '\n'
    epilogue_span = get_epilogue(corpus_l)
    if epilogue_span:
        text += get_text_from_span(corpus, epilogue_span)
    texts[story] = text

In [None]:
for story in ('The Sign of the Four', 'The Hound of the Baskervilles'):
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    text = ''
    for chp_span in get_chapters(corpus_l):
        text += get_text_from_span(corpus, chp_span)
        text += '\n'
    texts[story] = text

In [None]:
for story in ('The Boscombe Valley Mystery', 'The Five Orange Pips', 'The Adventure of the Speckled Band'):
    corpus = get_corpus(story)
    texts[story] = get_text_from_span(corpus, get_adventures(corpus.lower(), n=CORPUS_URL[story][1]))

In [None]:
for story in ('The Adventure of the Cardboard Box', 'The Musgave Ritual', 'The Reigate Squires'):
    corpus = get_corpus(story)
    texts[story] = get_text_from_span(corpus, get_numbered_sections(corpus.lower(), n=CORPUS_URL[story][1]))

In [None]:
for story in ('The Adventure of the Dancing Men', 'The Adventure of the Second Stain'):
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    spans = get_adventures(corpus_l, n=CORPUS_URL[story][1])
    end_spans = get_end_of_roi(corpus_l, r'(\s*\n){2,}[*]{5,}(\s*\n){2,}', span=spans[0])
    if end_spans:
        spans = [(spans[0][0], end_spans[0][0])]
    texts[story] = get_text_from_span(corpus, spans)
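
The trimming step above cuts a story at the first row of asterisks. A small offline sketch of the same regex on a made-up sample:

```python
import re

# Illustrative sample: story text, a separator row, then unrelated text
sample = "end of the first story.\n\n*****\n\nstart of the next section.\n"

# Same pattern as in the cell above: blank lines, 5+ asterisks, blank lines
separator_re = re.compile(r'(\s*\n){2,}[*]{5,}(\s*\n){2,}')

match = separator_re.search(sample)
trimmed = sample[:match.start()] if match else sample
```

Note that the pattern expects the asterisks to start at the beginning of a line; an indented separator row would not match it as written.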

In [None]:
fn = 'Doyle_novels.json'
with open(fn, 'w') as fd:
    json.dump(texts, fd)
files.download(fn)
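
The saved JSON maps each story title to its extracted text. The sketch below writes and reloads such a file locally, using placeholder data and a temporary directory. The file name `demo_novels.json` is made up for this example, and no Colab download is involved.

```python
import json
import os
import tempfile

# Placeholder data with the same shape as 'texts' (title -> text)
texts_demo = {'Story A': 'First text.', 'Story B': 'Second text.'}

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, 'demo_novels.json')
    with open(path, 'w', encoding='utf-8') as fd:
        json.dump(texts_demo, fd)
    with open(path, encoding='utf-8') as fd:
        loaded = json.load(fd)

titles = list(loaded)
```

Passing an explicit `encoding` avoids depending on the platform default when the texts contain non-ASCII characters such as curly quotes.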

## Get merged text from stories

In [None]:
total_token_count = 0
for idx, (story, text) in enumerate(texts.items(), start=1):
    textc = clean_text_ws(text)
    token_count = len(list(gensim.utils.tokenize(textc)))
    total_token_count += token_count
    print(f'{idx}. {story}')
    print('Token count:', token_count)
    print(f'\t{textc[:80]} ...')
    print(f'\t... {textc[-80:]}')
    print()

merged_text = '\n\n'.join(texts.values())
text = clean_text_ws(merged_text)
print(f'\t{text[:80]} ...')
print(f'\t... {text[-80:]}')
print()
print('Total token count (Gensim):', len(list(gensim.utils.tokenize(text))))

In [None]:
fn = 'Doyle.txt'
with open(fn, 'w') as fd:
    fd.write(text)
files.download(fn)

# Christie

## Get text from stories (not working)

In [183]:
texts = {}

In [None]:
for story in ('The Mysterious Affair at Styles',): 
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    text = ''
    for idx, chp_span in enumerate(get_chapters(corpus_l), start=1):
        _text = get_text_from_span(corpus, chp_span)
        text += _text
        text += '\n'
        print('Chapter', idx)
        print(_text[:100])
        print(_text[-100:])
        print()
    texts[story] = text

In [None]:
for story in ('The Man in the Brown Suite',): 
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    text = ''
    # prologue_span = get_prologue(corpus_l)
    # if prologue_span:
    #     _text = get_text_from_span(corpus, prologue_span)
    #     text += _text
    #     text += '\n'
    #     print(_text[:100])
    #     print(_text[-100:])
    #     print()
    for idx, chp_span in enumerate(get_chapters(corpus_l), start=1):
        _text = get_text_from_span(corpus, chp_span)
        text += _text
        text += '\n'
        print('Chapter', idx)
        print(_text[:100])
        print(_text[-100:])
        print()
    texts[story] = text

In [193]:
for story in ('The Murder on the Links',):
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    text = ''
    for idx, chp_span in enumerate(get_numbered_sections(corpus_l), start=1):
        _text = get_text_from_span(corpus, chp_span)
        text += _text
        text += '\n'
        print('Chapter', idx)
        print(_text[:100])
        print(_text[-100:])
        print()
    texts[story] = text

Chapter 1
I believe that a well-known anecdote exists to the effect that a young
writer, determined to make t
. “He disapproves utterly—of me, and
my sister—which last is unfair, because he hasn’t seen her!”


Chapter 2
“Say no more! Nobody loves me! I shall go into the garden and eat
worms! Boohoo! I am crushed!”


companion
seemed to have an intuitive knowledge of what was in my mind.

“Thinking of the War?”


Chapter 3
“You were through it, I suppose?”

“Pretty well. I was wounded once, and after the Somme they inva
Case?” I asked.

“Let me see, was that the old lady who was poisoned? Somewhere down in
Essex?”


Chapter 4
“That was Poirot’s first big case. Undoubtedly, but for him, the
murderer would have escaped scot-f
lla,” she said, and laughed.

But little did I think when and how I should see Cinderella again.


Chapter 5
It was five minutes past nine when I entered our joint sitting-room for
breakfast on the following 
bidden such a loaf—a
loaf haphazard 

# Rinehart

## Get text from stories

In [180]:
texts = {}

In [181]:
# disable "numbered" from 'get_headings_map()'
for story in ('The Circular Staircase', 'The Case of Jennie Brice', 'The After House', 'The Window at the White Cat', 'The Man in Lower Ten'): 
    corpus = get_corpus(story)
    corpus_l = corpus.lower()
    text = ''
    for idx, chp_span in enumerate(get_chapters(corpus_l), start=1):
        _text = get_text_from_span(corpus, chp_span)
        text += _text
        text += '\n'
        # print('Chapter', idx)
        # print(_text[:100])
        # print(_text[-100:])
        # print()
    texts[story] = text

In [182]:
fn = 'Rinehart_novels.json'
with open(fn, 'w') as fd:
    json.dump(texts, fd)
files.download(fn)

# EOF