# Rust spanner regex
> This module contains implementations of regex IE functions that use the Rust package `enum-spanner-rs`

In [1]:
#| default_exp src.rgxlog_interpreter.src.rgxlog.stdlib.rust_spanner_regex

In [2]:
#| hide
from __future__ import annotations

In [3]:
#| hide
from nbdev.showdoc import *

In [4]:
#| export
import logging
import re
import tempfile
from pathlib import Path
from subprocess import Popen, PIPE
from sys import platform
from typing import Tuple, List, Union, Iterable, Sequence, no_type_check, Callable, Optional
import os

from spanner_workbench.src.rgxlog_interpreter.src.rgxlog.engine.datatypes.primitive_types import DataTypes, Span
from spanner_workbench.src.rgxlog_interpreter.src.rgxlog.stdlib.utils import run_cli_command

In [5]:
#| hide
#| exec_doc
import inspect

In [6]:
#| hide
#| exec_doc
# Collect every class currently bound in this notebook's global namespace,
# skipping objects for which `inspect.getmodule` cannot find a module.
# `list(...)` snapshots the globals first, because the loop itself binds new
# global names (`name`, `obj`) while it runs.
imported_classes = []
for name, obj in list(globals().items()):
    if inspect.isclass(obj) and inspect.getmodule(obj) is not None:
        imported_classes.append(obj)
# For classes that come from the `spanner_workbench` package, replace the dotted
# module path with its last component only.
# NOTE(review): presumably this shortens the module names shown in rendered docs - confirm.
for cls in imported_classes:
    if 'spanner_workbench' in cls.__module__:
        cls.__module__ = cls.__module__.split('.')[-1]

#| hide
# types

In [7]:
#| export
#| hide
# input schema shared by every IE function registered in this module: two string terms
# (the text or text-file path, followed by the regex pattern)
RUST_RGX_IN_TYPES = [DataTypes.string, DataTypes.string]

#| hide
# rust

In [8]:
#| export
# shown in the error message raised when `cargo` or `rustup` cannot be found
DOWNLOAD_RUST_URL = "https://rustup.rs/"

#| hide
# package info
@niv: I use my fork here because it's more stable than the original.

In [9]:
#| export
# git repository that cargo builds the package from
PACKAGE_GIT_URL = "https://github.com/NNRepos/enum-spanner-rs"
# package name; also the file name of the built executable on non-Windows platforms
PACKAGE_NAME = "enum-spanner-rs"
# file name of the built executable on Windows
PACKAGE_WIN_FILENAME = PACKAGE_NAME + ".exe"
# name of the folder that serves as the cargo installation root
REGEX_FOLDER_NAME = "enum_spanner_regex"

#| hide
# installation paths

In [10]:
#| export
# cargo installation root, located under the stdlib folder of the package.
# NOTE(review): the base directory is the parent of `os.getcwd()` evaluated at import time, so the
# resulting path depends on where the interpreter was started - confirm this is intended.
REGEX_FOLDER_PATH = Path(os.path.join(os.path.dirname(os.getcwd()),'spanner_workbench','src','rgxlog_interpreter','src','rgxlog','stdlib')) / REGEX_FOLDER_NAME
# path template with a `{}` placeholder; not referenced in the visible part of this file
REGEX_TEMP_PATH = Path(REGEX_FOLDER_PATH) / "temp{}.txt"
# expected location of the built executable on each platform family (inside the root's `bin` folder)
REGEX_EXE_PATH_POSIX = Path(REGEX_FOLDER_PATH) / "bin" / PACKAGE_NAME
REGEX_EXE_PATH_WIN = Path(REGEX_FOLDER_PATH) / "bin" / PACKAGE_WIN_FILENAME

#| hide
# commands

In [11]:
#| export
# rust toolchain that is installed through rustup and selected for the cargo build (`cargo +<toolchain>`)
RUSTUP_TOOLCHAIN = "1.34"
# builds the package from `PACKAGE_GIT_URL` and installs it with `REGEX_FOLDER_PATH` as the root
CARGO_CMD_ARGS: Sequence[Union[Path, str]] = ["cargo", "+" + RUSTUP_TOOLCHAIN, "install", "--root", REGEX_FOLDER_PATH, "--git", PACKAGE_GIT_URL]
# installs the pinned toolchain
RUSTUP_CMD_ARGS = ["rustup", "toolchain", "install", RUSTUP_TOOLCHAIN]
# timeouts, in seconds, handed to the subprocess waits during installation
SHORT_TIMEOUT = 3
CARGO_TIMEOUT = 300
RUSTUP_TIMEOUT = 300
# combined installation budget in whole minutes; only used in the message logged to the user
TIMEOUT_MINUTES = (CARGO_TIMEOUT + RUSTUP_TIMEOUT) // 60

#| hide
# os-dependent variables

In [12]:
#| export
# value of `sys.platform` that identifies Windows
WINDOWS_OS = "win32"
# command used to look up an executable in $PATH: `where` on Windows, `which` elsewhere
WHICH_WORD = "where" if platform == WINDOWS_OS else "which"
# location of the regex executable for the current platform
REGEX_EXE_PATH = REGEX_EXE_PATH_WIN if platform == WINDOWS_OS else REGEX_EXE_PATH_POSIX

#| hide
# patterns
taken from https://stackoverflow.com/questions/5452655/

In [13]:
#| export
#| hide
# matches a double-quoted string that may contain backslash-escaped characters and captures the text
# between the quotes (escapes are kept as-is); DOTALL lets an escaped newline match as well
ESCAPED_STRINGS_PATTERN = re.compile(r'"([^"\\]*(?:\\.[^"\\]*)*)"', re.DOTALL)
# matches two non-negative integers separated by a comma and an optional space, captured as `start` and `end`
SPAN_PATTERN = re.compile(r"(?P<start>\d+), ?(?P<end>\d+)")

In [14]:
#| export
#| hide
# etc
# name of the file, created inside a temporary directory, that holds the text passed to `rgx`
TEMP_FILE_NAME = "temp"

# module-level logger; used to report installation progress
logger = logging.getLogger(__name__)

In [15]:
#| export
#| hide
def _download_and_install_rust_regex() -> None:
    """
    Install the pinned rust toolchain with rustup and then build and install the regex package with cargo.

    Raises `IOError` when `cargo` or `rustup` cannot be located, `subprocess.TimeoutExpired` when one of
    the steps exceeds its timeout, and `RuntimeError` when the executable is still missing afterwards.
    """
    # imported locally so this function does not rely on a change to the module-level imports
    from subprocess import run

    # don't use "cargo -V" because it starts downloading stuff sometimes.
    # `run` drains the pipes while waiting and kills the child when the timeout expires; a `Popen`
    # context manager would instead wait for the child without any timeout on exit.
    errcode = 0
    for tool in ("cargo", "rustup"):
        errcode |= run([WHICH_WORD, tool], stdout=PIPE, stderr=PIPE, timeout=SHORT_TIMEOUT).returncode

    if errcode:
        raise IOError(f"cargo or rustup are not installed in $PATH. please install rust: {DOWNLOAD_RUST_URL}")

    logger.warning("%s was not found on your system", PACKAGE_NAME)
    logger.warning("installing package. this might take up to %s minutes...", TIMEOUT_MINUTES)

    # there's no pipe here to let the user to see the output.
    # the exit codes are deliberately not checked: success is decided by the file check below
    run(RUSTUP_CMD_ARGS, timeout=RUSTUP_TIMEOUT)
    run(CARGO_CMD_ARGS, timeout=CARGO_TIMEOUT)

    if not _is_installed_package():
        raise RuntimeError("installation failed - check the output")

    logger.warning("installation completed")

In [16]:
#| export
#| hide
def _is_installed_package() -> bool:
    """Tell whether the regex executable already exists as a file at `REGEX_EXE_PATH`."""
    executable = Path(REGEX_EXE_PATH)
    return executable.is_file()

In [17]:
#| export
#| hide
@no_type_check
def rgx_span_out_type(output_arity: int) -> Tuple[DataTypes]:
    """Return the output schema of a span-producing regex call: `output_arity` span types."""
    return (DataTypes.span,) * output_arity

In [18]:
#| export
#| hide
@no_type_check
def rgx_string_out_type(output_arity: int) -> Tuple[DataTypes]:
    """Return the output schema of a string-producing regex call: `output_arity` string types."""
    return (DataTypes.string,) * output_arity

In [19]:
#| export
#| hide
def _format_spanner_string_output(output: Iterable[str]) -> List[List[str]]:
    """
    Turn each output line into the list of double-quoted strings found in it.

    The pattern keeps the backslash in front of an escaped quote, so every `\\"` is reduced to `"`.
    """
    return [
        [quoted.replace('\\"', '"') for quoted in ESCAPED_STRINGS_PATTERN.findall(line)]
        for line in output
    ]

In [20]:
#| export
#| hide
def _format_spanner_span_output(output: Iterable[str]) -> List[List[Span]]:
    """Turn each output line into the list of `Span`s built from the `start, end` pairs found in it."""
    return [
        [Span(int(found.group("start")), int(found.group("end"))) for found in SPAN_PATTERN.finditer(line)]
        for line in output
    ]

In [21]:
#| export
def rgx(regex_pattern: str, # the pattern to run
        out_type: str, # string/span - decides which one will be returned
        text: Optional[str] = None, # the string on which regex is run
        text_file: Optional[str] = None # use text from this file instead of `text`. default: None
        ) -> Iterable[Iterable[Union[str, Span]]]: # a tuple of strings/spans
    """
    An IE function which runs regex using rust's `enum-spanner-rs` and yields tuples of strings/spans (not both).

    This is a generator, so nothing (including argument validation) happens until it is iterated.
    Raises `ValueError` if `out_type` is not "string"/"span", or if neither `text` nor `text_file` is given.
    """
    # explicit raises rather than `assert`, which is stripped when python runs with -O
    if out_type not in ("string", "span"):
        raise ValueError("illegal out_type")
    if not text_file and text is None:
        raise ValueError("at least one of text/text_file must have a value")

    with tempfile.TemporaryDirectory() as temp_dir:
        if text_file:
            rgx_temp_file_name = Path(text_file)
        else:
            rgx_temp_file_name = Path(temp_dir) / TEMP_FILE_NAME
            # write UTF-8 explicitly instead of the locale-dependent default encoding.
            # NOTE(review): assumes the rust executable reads its input as UTF-8 - confirm.
            with open(rgx_temp_file_name, "w", encoding="utf-8") as f:
                f.write(text)

        # NOTE(review): the command is a single space-joined string; how `run_cli_command` splits it
        # (and so whether patterns or paths containing spaces survive) is not visible here.
        rust_regex_args = f"{REGEX_EXE_PATH} {regex_pattern} {rgx_temp_file_name}"
        format_function: Callable
        if out_type == "string":
            format_function = _format_spanner_string_output
        else:
            # spans are requested with the extra flag and parsed from the `start, end` pairs printed
            rust_regex_args += " --bytes-offset"
            format_function = _format_spanner_span_output

        # the output is fully parsed before the first tuple is yielded
        yield from format_function(run_cli_command(rust_regex_args, stderr=True))

In [22]:
#| export
def rgx_span(text: str, # The input text for the regex operation
             regex_pattern: str # The pattern of the regex operation
             ) -> Iterable[Iterable[Union[str, Span]]]: # tuples of spans that represents the results
    """Run `regex_pattern` over `text` through `rgx` and return the matches as spans."""
    return rgx(regex_pattern=regex_pattern, out_type="span", text=text)

In [23]:
#| export
#| hide
# registration record for `rgx_span`: the callable, its name, and its input/output schemas
RGX = {
    'ie_function': rgx_span,
    'ie_function_name': 'rgx_span',
    'in_rel': RUST_RGX_IN_TYPES,
    'out_rel': rgx_span_out_type,
}

In [24]:
#| export
def rgx_string(text: str, # The input text for the regex operation
               regex_pattern: str # The pattern of the regex operation
               ) -> Iterable[Iterable[Union[str, Span]]]: # tuples of strings that represents the results
    """Run `regex_pattern` over `text` through `rgx` and return the matches as strings."""
    return rgx(regex_pattern=regex_pattern, out_type="string", text=text)

In [25]:
#| export
#| hide
# registration record for `rgx_string`: the callable, its name, and its input/output schemas
RGX_STRING = {
    'ie_function': rgx_string,
    'ie_function_name': 'rgx_string',
    'in_rel': RUST_RGX_IN_TYPES,
    'out_rel': rgx_string_out_type,
}

In [26]:
#| export
def rgx_span_from_file(text_file: str, # The input file for the regex operation
                       regex_pattern: str # The pattern of the regex operation
                       ) -> Iterable[Iterable[Union[str, Span]]]: # tuples of spans that represents the results
    """Run `regex_pattern` over the contents of `text_file` through `rgx` and return the matches as spans."""
    return rgx(regex_pattern=regex_pattern, out_type="span", text_file=text_file)

In [27]:
#| export
#| hide
# registration record for `rgx_span_from_file`: the callable, its name, and its input/output schemas
RGX_FROM_FILE = {
    'ie_function': rgx_span_from_file,
    'ie_function_name': 'rgx_span_from_file',
    'in_rel': RUST_RGX_IN_TYPES,
    'out_rel': rgx_span_out_type,
}

In [28]:
#| export
def rgx_string_from_file(text_file: str, # The input file for the regex operation
                         regex_pattern: str # The pattern of the regex operation
                         ) -> Iterable[Iterable[Union[str, Span]]]: # tuples of strings that represents the results
    """Run `regex_pattern` over the contents of `text_file` through `rgx` and return the matches as strings."""
    return rgx(regex_pattern=regex_pattern, out_type="string", text_file=text_file)

In [29]:
#| export
#| hide
# registration record for `rgx_string_from_file`: the callable, its name, and its input/output schemas
RGX_STRING_FROM_FILE = {
    'ie_function': rgx_string_from_file,
    'ie_function_name': 'rgx_string_from_file',
    'in_rel': RUST_RGX_IN_TYPES,
    'out_rel': rgx_string_out_type,
}

In [30]:
#| hide
#| export

# the package is installed when this module is imported
# (import-time side effect: if the executable is missing, importing this module runs rustup and cargo,
# and any error raised by the installation propagates out of the import)
if not _is_installed_package():
    _download_and_install_rust_regex()