In [1]:
from pathlib import Path
import pandas as pd
import regex as re
import json

In [2]:
from playhouse.migrate import migrate, PostgresqlMigrator
from playhouse.postgres_ext import ForeignKeyField

In [3]:
from covid19app.data import *

In [88]:
# FileType, Language, Tag, Jurisdiction, DocumentType
# is_terminating_version

In [None]:

from uuid import uuid5, NAMESPACE_URL, UUID
from tempfile import NamedTemporaryFile
from pathlib import Path
from typing import Any
import pandas as pd
import regex as re
import subprocess


# Folder whose files powershell_search scans (handed to Get-ChildItem as an
# absolute path). The result parser only recognises output lines of the form
# `single_texts\<digits>.txt:<n>:<text>`, so the folder name must stay in sync
# with that pattern.
texts_folder = Path('single_texts/')

def hash_args(*args: Any) -> UUID:
    """Derive a deterministic UUID from an arbitrary list of arguments.

    Every argument is converted with ``str`` and the pieces are joined with
    ``'.'``; the joined string is then hashed into a version-5 UUID inside
    the URL namespace. Equal argument lists always give the same UUID.
    """
    joined = '.'.join(map(str, args))
    return uuid5(NAMESPACE_URL, joined)

def _quote_for_powershell(value: str) -> str:
    """Return ``value`` as a PowerShell single-quoted string literal."""
    # Inside single quotes PowerShell takes every character literally; the
    # quote character itself is written by doubling it.
    return "'" + value.replace("'", "''") + "'"


def _parse_select_string_output(raw_lines: list, context_lines: int) -> list:
    """Build one ``(id, first_line, last_line, text)`` tuple per matching line."""
    records = []

    for i, line in enumerate(raw_lines):
        # Only lines flagged with a leading '>' are matches; every other
        # line is context (or blank padding).
        if not re.match('^ *>.+$', line):
            continue

        # Clamp the window start: a negative slice start would count from the
        # END of the list and pick the wrong lines for a match that sits
        # fewer than `context_lines` lines into the output.
        window_start = max(0, i - context_lines)
        window = raw_lines[window_start:i + context_lines + 1]
        # Position of the matching line inside the (possibly shortened) window.
        match_offset = i - window_start

        document_version_id = None
        first_line_number = None
        last_line_number = None
        text = ''

        for j, window_line in enumerate(window):
            match = re.search(
                r'\s*>*\s*single_texts\\([0-9]+)\.txt:([0-9]+):(.+)',
                window_line
            )

            if not match:
                continue

            file_id, _, line_text = match.groups()

            if j == match_offset:
                # Line numbers are indexed from 1, not 0.
                # NOTE(review): these are positions in the raw output, not
                # the file line number captured (and discarded) by the regex
                # above - confirm this is intended.
                first_line_number = i - context_lines + 1
                last_line_number = i + context_lines + 1
                document_version_id = int(file_id)

            # The text of the matching line and of its context lines is
            # concatenated without a separator.
            text += line_text

        records.append((
            document_version_id, first_line_number,
            last_line_number, text
        ))

    return records


def powershell_search(
    search_string: str,
    case_sensitive: bool=False,
    context_lines: int=5,
    use_regex: bool=True
) -> pd.DataFrame:
    """Search the files under ``texts_folder`` with PowerShell's Select-String.

    The search is run through ``powershell.exe``; its output is written to a
    temporary file in the current working directory, parsed, and the file is
    removed again.

    Parameters
    ----------
    search_string
        Pattern to look for. Treated as a regular expression unless
        ``use_regex`` is false, in which case it is escaped first.
    case_sensitive
        When true, ``-CaseSensitive`` is passed to Select-String and the
        Python-side re-matching is case sensitive as well.
    context_lines
        Number of lines requested before and after every match.
    use_regex
        Whether ``search_string`` is a regular expression.

    Returns
    -------
    pd.DataFrame
        Columns ``id`` (the number in the matched file's name),
        ``start_line_number``, ``end_line_number``, ``text`` (the matching
        line plus its context) and ``text_matches`` (distinct matches of the
        pattern found in ``text``). Rows without any match in ``text`` and
        rows judged to be overlapping duplicates are removed.

    Raises
    ------
    FileNotFoundError
        If PowerShell did not produce the output file.
    """
    # Hash the arguments as given (before escaping) to name the temp file.
    args_hash = hash_args(search_string, case_sensitive, context_lines, use_regex)

    temp_output_file = Path(f'.{args_hash}.txt')

    if not use_regex:
        search_string = re.escape(search_string)

    # -CaseSensitive belongs to Select-String; it has to be part of this
    # pipeline stage, not of the trailing Out-File stage.
    select_string_stage = [
        'Select-String', '-Pattern', _quote_for_powershell(search_string),
        '-AllMatches', '-Context', str(context_lines)
    ]

    if case_sensitive:
        select_string_stage.append('-CaseSensitive')

    powershell_command = [
        'powershell.exe', 'Get-ChildItem',
        '-Path', _quote_for_powershell(str(texts_folder.absolute())),
        '-Recurse', '|', *select_string_stage, '|',
        # UTF-8 instead of ASCII so non-ASCII characters in the searched
        # texts are not lost in the output.
        'Out-File', '-Encoding', 'utf8', str(temp_output_file)
    ]

    try:
        subprocess.run(powershell_command)

        # 'utf-8-sig' copes with the output both with and without a BOM.
        raw_search_results = temp_output_file \
            .read_text(encoding='utf-8-sig') \
            .splitlines()
    finally:
        # Never leave the temp file behind, even when reading it failed.
        if temp_output_file.exists():
            temp_output_file.unlink()

    search_results = pd.DataFrame(
        _parse_select_string_output(raw_search_results, context_lines),
        columns=['id', 'start_line_number', 'end_line_number', 'text']
    )

    search_results = search_results.sort_values('start_line_number')

    regex_flag = 0 if case_sensitive else re.IGNORECASE

    # Re-run the pattern over the collected text and keep the distinct hits.
    search_results['text_matches'] = search_results['text'] \
        .str.findall(search_string, flags=regex_flag) \
        .map(set) \
        .map(list)

    search_results = search_results.loc[
        search_results['text_matches'].str.len().gt(0)
    ]

    # Within rows sharing the same text and id, drop every row whose window
    # starts strictly inside the window of another row of that group.
    duplicates = []

    for _, group in search_results.groupby(['text', 'id']):
        if group.shape[0] > 1:
            for row_label, row in group.iterrows():
                starts_inside_other = (
                    (row.start_line_number > group.start_line_number) &
                    (row.start_line_number < group.end_line_number)
                )

                if starts_inside_other.any():
                    duplicates.append(row_label)

    return search_results.drop(duplicates)

In [None]:
# Case-insensitive regex search for "cohort", keeping 5 lines of context
# on each side of every hit.
search_results = powershell_search(
    'cohort',
    case_sensitive=False,
    context_lines=5,
    use_regex=True,
)

In [None]:
class PowershellSearchResult(BaseModel):
    """Model describing one ``powershell_search`` call together with its results.

    The first four fields carry the same names and defaults as the parameters
    of ``powershell_search``. ``BaseModel`` and the field classes are not
    defined in this notebook; presumably they come from the star import of
    ``covid19app.data`` - TODO confirm.
    """
    # The pattern that was searched for.
    search_string = TextField()
    # Same defaults as the corresponding powershell_search parameters.
    case_sensitive = BooleanField(default=False)
    context_lines = SmallIntegerField(default=5)
    use_regex = BooleanField(default=True)
    # NOTE(review): presumably a JSON serialisation of the search results;
    # no code visible here populates it - confirm the expected shape.
    results = BinaryJSONField()

In [None]:
# Enrich the search hits with metadata of the matched document version and of
# its parent document, then add the first/last effective date per document.
#
# NOTE: this cell rebinds `search_results` (its `id` column becomes the index
# and new columns are joined on), so it cannot be re-run on its own; re-run
# the cell that calls powershell_search first.

# Ids of every document version that produced at least one hit.
document_version_ids = set(
    search_results['id']
)

document_versions = (
    DocumentVersion
    .select(
        DocumentVersion.id,
        DocumentVersion.title,
        DocumentVersion.slug,
        DocumentVersion.effective_date,
        DocumentVersion.termination_date,
        Document.id.alias('document_id'),
        Document.title.alias('document_title'),
        Document.slug.alias('document_slug')
    )
    .join(Document)
    .where(DocumentVersion.id.in_(document_version_ids))
    .dicts()
)

document_versions = (
    pd
    .DataFrame(document_versions)
    .set_index('id')
)

# Both frames are indexed by document version id, so a plain index join
# attaches the metadata to every hit.
search_results = (
    search_results
    .set_index('id')
    .join(document_versions)
)

# Named aggregation with the string names 'min' / 'max': passing the builtin
# functions to .agg is deprecated in recent pandas, and naming the outputs
# here removes the need to relabel the columns afterwards.
documents_dates = (
    search_results
    .groupby('document_id')['effective_date']
    .agg(
        document_first_effective_date='min',
        document_last_effective_date='max',
    )
)

search_results = search_results \
    .join(documents_dates, on='document_id')

In [None]:
# NOTE(review): bare literal with no visible use; presumably a document
# version id kept for a manual lookup - confirm, or delete this cell.
8519