In [None]:
from pathlib import Path
from collections import defaultdict
from typing import Any, NamedTuple

import numpy as np
import json
import re
import string

In [None]:
# Root directory that is searched recursively for annotation files.
DATA_PATH = Path('/data/Datasets/usg-kaggle/train/')

# The ASCII digit characters "0123456789"; used for per-character digit checks.
DIGITS = string.digits

In [None]:
# Every file named "lower_right_annotation.json" found anywhere below DATA_PATH.
annotations = list(DATA_PATH.rglob("lower_right_annotation.json"))
# Bare expression so the notebook cell displays how many files were found.
len(annotations)

In [None]:
class Rule:
    """A predicate over a single value that can be combined with other rules.

    Subclasses override :meth:`compute`. Calling a rule instance delegates to
    ``compute``. The ``or_``, ``and_`` and ``not_`` combinators each return a
    new ``Rule`` that evaluates its operands lazily, at call time.
    """

    def compute(self, value: Any) -> bool:
        """Evaluate the rule for ``value``; the base class has no implementation."""
        raise NotImplementedError

    def __call__(self, value: Any) -> bool:
        """Evaluate the rule by forwarding to :meth:`compute`."""
        return self.compute(value)

    @staticmethod
    def _from_lambda(a_lambda) -> "Rule":
        """Wrap a plain one-argument callable in a ``Rule`` instance."""
        wrapped = Rule()
        # Stored on the instance, so it shadows the class-level ``compute``.
        wrapped.compute = a_lambda
        return wrapped

    def or_(self, other: "Rule") -> "Rule":
        """Return a rule that holds when this rule or ``other`` holds."""
        def either(value):
            return self(value) or other(value)

        return Rule._from_lambda(either)

    def and_(self, other: "Rule") -> "Rule":
        """Return a rule that holds when this rule and ``other`` both hold."""
        def both(value):
            return self(value) and other(value)

        return Rule._from_lambda(both)

    def not_(self) -> "Rule":
        """Return a rule that holds exactly when this rule does not."""
        def negated(value):
            return not self(value)

        return Rule._from_lambda(negated)
    
# One parsed measurement: ``attribute`` is its name, ``value`` its number.
Entry = NamedTuple("Entry", [("attribute", str), ("value", float)])
    
class IsToShort(Rule):
    """Rule that holds for values whose length is at most four."""

    def compute(self, value: Any) -> bool:
        """Return True when ``value`` has four items/characters or fewer."""
        return len(value) < 5
    
class HasTextInside(Rule):
    """Rule that holds when a fixed piece of text occurs in the value."""

    def __init__(self, text: str):
        # The text that is looked for in every value passed to ``compute``.
        self._text = text

    def compute(self, value: Any) -> bool:
        """Return True when the configured text is contained in ``value``."""
        needle = self._text
        return needle in value

class DoesNotHaveGivenStrings(Rule):
    """Rule that holds when none of the configured strings occur in the value."""

    def __init__(self, t: list):
        # The strings that must all be absent for the rule to hold.
        self._t = t

    def compute(self, value: Any) -> bool:
        """Return True when ``value`` contains none of the configured strings.

        An empty configuration makes the rule hold for every value.
        """
        # A generator lets ``all`` stop at the first string that is present
        # instead of testing every string up front.
        return all(val not in value for val in self._t)
    
class CheckTexts:
    """Filters a list of texts, discarding those matched by a drop rule."""

    def __init__(self, drop_condition: Rule):
        # Rule whose ``compute`` decides which samples are removed.
        self.drop_condition = drop_condition

    def __call__(self, texts: list) -> list:
        """Return the samples of ``texts`` for which the drop rule is falsy.

        The input list is not modified and the original order is kept.
        """
        should_drop = self.drop_condition.compute
        return [text for text in texts if not should_drop(text)]
    
class ExtractText:
    """Flattens multi-line texts into a single list of lines."""

    def __call__(self, all_texts: list) -> list:
        """Split every text on ``'\\n'`` and return all pieces in order.

        Empty pieces (for example after a trailing newline) are kept.
        """
        return [piece for text in all_texts for piece in text.split('\n')]
    
class SingleLineProcessor:
    """Interface for a transformation that is applied to one line at a time."""

    def transform_single_line(self, line: str) -> str:
        """Return the transformed ``line``; subclasses must override this."""
        raise NotImplementedError
        
class ConvertOto0(SingleLineProcessor):
    """Replaces every lowercase letter ``o`` with the digit ``0``."""

    # Single-character translation table: 'o' -> '0'.
    _O_TO_ZERO = str.maketrans('o', '0')

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` with each ``'o'`` turned into ``'0'``."""
        return line.translate(self._O_TO_ZERO)
    
class RemoveL(SingleLineProcessor):
    """Deletes every lowercase letter ``l`` from a line."""

    # Translation table that maps 'l' to "delete".
    _DROP_L = str.maketrans('', '', 'l')

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` with all ``'l'`` characters removed."""
        return line.translate(self._DROP_L)
    
class RemoveUnits(SingleLineProcessor):
    """Strips the substrings ``kpa``, ``k`` and ``mm`` from a line."""

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` with the unit substrings removed.

        The removals run in a fixed order: ``'kpa'`` is deleted before the
        bare ``'k'``, so a full ``kpa`` never leaves a stray ``pa`` behind.
        """
        for unit in ('kpa', 'k', 'mm'):
            line = line.replace(unit, '')
        return line
    
class ReplaceChar(SingleLineProcessor):
    """Replaces every occurrence of one substring with another."""

    def __init__(self, from_, to_):
        # ``from_`` is the text to search for, ``to_`` its replacement.
        self._from_ = from_
        self._to_ = to_

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` with each occurrence of ``from_`` replaced by ``to_``."""
        old, new = self._from_, self._to_
        return line.replace(old, new)
    
class RemoveFirstCharIfDIgit(SingleLineProcessor):
    """Drops the first character of a line when that character is a digit."""

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` without its leading character if it is in ``DIGITS``.

        Lines that are empty or start with a non-digit are returned unchanged.
        """
        # The truthiness check keeps an empty line from raising IndexError
        # on ``line[0]``.
        if line and line[0] in DIGITS:
            return line[1:]
        return line
    
class RemoveUnnecessaryForwardSlash(SingleLineProcessor):
    """Rewrites every ``/`` in a line as either the digit ``7`` or nothing.

    A slash becomes ``"7"`` when it is not directly followed by a digit and
    one of the following holds:

    * the character just before it is ``'.'``; or
    * it is preceded by a single digit that itself follows a space, and the
      line contains the text ``"max"``.

    Every other slash is removed. All checks look at the original line, not
    at the partially rewritten output.
    """

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` with each ``/`` replaced by ``"7"`` or dropped."""
        has_max = "max" in line
        last_index = len(line) - 1
        pieces = []
        for i, char in enumerate(line):
            if char != '/':
                pieces.append(char)
                continue

            # End of line counts as "not followed by a digit"; the bounds
            # check also keeps a trailing slash from raising IndexError.
            followed_by_digit = i < last_index and line[i + 1] in DIGITS
            # ``i >= 1`` stops a leading slash from wrapping around to the
            # last character of the line through index -1.
            after_dot = i >= 1 and line[i - 1] == '.'
            after_lone_digit = (
                i >= 2
                and line[i - 1] in DIGITS
                and line[i - 2] == " "
            )

            if not followed_by_digit and (after_dot or (after_lone_digit and has_max)):
                pieces.append("7")
            # Otherwise the slash is simply dropped.
        return ''.join(pieces)
    
class LeaveSingleDot(SingleLineProcessor):
    """Removes dots that start a line or directly follow a dot or a space."""

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` without its leading, repeated or space-preceded dots.

        Each dot is judged against the character before it in the original
        line, so in a run of dots only the first one can survive.
        """
        def keep(index: int, char: str) -> bool:
            if char != '.':
                return True
            if index == 0:
                return False
            return line[index - 1] not in ('.', ' ')

        return ''.join(ch for idx, ch in enumerate(line) if keep(idx, ch))
    
class JoinNumberAfterDot(SingleLineProcessor):
    """Removes a space that sits between a dot and a following digit."""

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` with every ``". <digit>"`` gap closed.

        Only a space that has ``'.'`` immediately before it and a character
        from ``DIGITS`` immediately after it is dropped; all other characters
        are kept.
        """
        last_index = len(line) - 1
        kept = []
        for pos, ch in enumerate(line):
            is_gap = (
                ch == ' '
                and 0 < pos < last_index
                and line[pos - 1] == '.'
                and line[pos + 1] in DIGITS
            )
            if not is_gap:
                kept.append(ch)
        return ''.join(kept)
    
class Strip(SingleLineProcessor):
    """Trims leading and trailing whitespace from a line."""

    def transform_single_line(self, line: str) -> str:
        """Return ``line`` without surrounding whitespace."""
        trimmed = line.strip()
        return trimmed
    
class ExtractEntry(SingleLineProcessor):
    """Parses a cleaned ``"<attribute> <number>"`` line into an ``Entry``."""

    def transform_single_line(self, line: str) -> Entry:
        """Split ``line`` on whitespace and build an ``Entry`` from it.

        When the line has more than two tokens the first one is discarded.
        What remains must be exactly an attribute name followed by a number.

        Raises:
            ValueError: if the remaining tokens are not exactly two, or the
                second token cannot be converted to ``float``.
        """
        tokens = line.split()
        if len(tokens) >= 3:
            # Drop the leading token only; anything still beyond two tokens
            # makes the unpacking below fail.
            tokens = tokens[1:]
        attribute, number_str = tokens
        return Entry(attribute=attribute, value=float(number_str))
        
class LinesProcessor:
    """Runs an ordered chain of single-line processors over many lines."""

    def __init__(self, processors: list):
        # Objects exposing ``transform_single_line``; applied in list order.
        self._processors = processors

    def _run_chain(self, line):
        """Feed one line through every processor, each seeing the previous output."""
        for step in self._processors:
            line = step.transform_single_line(line)
        return line

    def __call__(self, lines: list) -> list:
        """Return the result of the full processor chain for each input line.

        Any exception raised by a processor propagates to the caller.
        """
        return [self._run_chain(line) for line in lines]
    
class EntriesSaver:
    """Writes a collection of entries to a file as a single JSON object."""

    def __call__(self, entries: list, path: str):
        """Serialise ``entries`` as ``{attribute: value}`` JSON into ``path``.

        If several entries share an attribute, the last value wins. The
        target file is created or overwritten.
        """
        mapping = {}
        for entry in entries:
            mapping[entry.attribute] = entry.value

        payload = json.dumps(mapping)
        Path(path).write_text(payload)

# Substrings that mark a text as relevant; a text containing none of them is
# dropped. "digm" is kept as well because the 'g' -> 'a' replacement below
# turns it into "diam".
_MEASUREMENT_KEYWORDS = ("diam", "max", "min", "sd", "mean", "digm")

# Characters that are deleted from every line, in this order.
_NOISE_CHARS = ('»', '_', '<', ')', '"', '«', '*', 'b', '\\', '‘')

# First pass: discard whole texts that contain none of the keywords.
pre_text_clear = CheckTexts(
    DoesNotHaveGivenStrings(list(_MEASUREMENT_KEYWORDS))
)

# Split the surviving texts into individual lines.
additional_text_extractor = ExtractText()

# Second pass: the same keyword filter, now applied per line.
post_text_clear = CheckTexts(
    DoesNotHaveGivenStrings(list(_MEASUREMENT_KEYWORDS))
)

saver = EntriesSaver()

# Per-line clean-up chain; the order matters and the last step turns the
# cleaned string into an Entry.
text_processor = LinesProcessor(
    [
        ConvertOto0(),
        ReplaceChar('g', 'a'),
        *(ReplaceChar(noise, '') for noise in _NOISE_CHARS),
        RemoveUnits(),
        RemoveUnnecessaryForwardSlash(),
        LeaveSingleDot(),
        JoinNumberAfterDot(),
        RemoveL(),
        Strip(),
        ExtractEntry(),
    ]
)

In [None]:
# Convert every annotation file into a "regression_ground_truth.json" file
# written next to it.
for s in annotations:
    # Loading happens outside the try block, so an unreadable or malformed
    # file stops the whole loop instead of being skipped.
    datum = json.loads(s.read_text())
    # Each element of the parsed JSON must provide a "text" item; the text is
    # lower-cased so the keyword filters and replacements can match.
    ts = [d["text"].lower() for d in datum]
    # Filter whole texts, split them into lines, then filter the lines again.
    pre_ts = pre_text_clear(ts)
    int_ts = additional_text_extractor(pre_ts)
    post_ts = post_text_clear(int_ts)
    try:
        # Clean each remaining line and parse it into an Entry.
        final_ts = text_processor(post_ts)
        saver(final_ts, (s.parent / "regression_ground_truth.json").as_posix())
    except Exception as e:
        # Best effort: report the file, the error and the offending lines,
        # then carry on with the next file. Nothing is written for this file.
        print(s, e, post_ts)