## Save IHC case statement in a Dictionary

Two case statements in `IHC - 2021 v19 20221014` have been hand-edited:
- the second appearance of Y has been recoded as Y1 because it has different logic
- the second appearance of BH has been recoded as BH1 because it has different logic

"DL: Other_Work_Sites" was commented out of the IHC on 20190213

In [1]:
import os
# set data directory location (published through the DATA_DIR environment
# variable and read back into data_path, which the later cells build paths from)
os.environ["DATA_DIR"] = 'data'
data_path = os.environ.get("DATA_DIR")
# create data directory if it does not exist
try:
    os.makedirs(data_path, exist_ok=False)
    print(f"Directory {data_path} created successfully")
except FileExistsError:
    # Only a pre-existing path is reported as "already exists"; any other
    # OSError (e.g. permission denied) now propagates instead of being
    # misreported with this message.
    print(f"Directory {data_path} already exists")

Directory data already exists


In [2]:
import re
infile = f"{data_path}/IHC - 2021 v19 20221014.txt"
outfile = f'{data_path}/scratch1.txt'

# a case statement opens on a line beginning with WHEN ...
case_start = re.compile(r"^WHEN")
# ... and closes on the line holding its THEN '<CODE>: <description>' result
case_stop = re.compile(r"THEN\s+'[A-Z1]{1,3}:\s+[\w\s/$&-]+'")

extract_on = False
case_statement_lines = []

# strip non-case statements: keep only the lines from a WHEN line up to and
# including the matching THEN line
# (the handle is named in_fh so that `infile` keeps referring to the path)
with open(infile, 'r') as in_fh:
    for line in in_fh:
        if case_start.match(line):
            extract_on = True
        if extract_on:
            case_statement_lines.append(line.rstrip('\n'))
        if case_stop.search(line):
            extract_on = False

# save case statement lines to file
# NOTE: a per-line re.sub(r"WHEN\s*\n+", "WHEN ", line) used to run here, but
# the newline had already been stripped from every line, so it could never
# match.  It has been dropped; the file written is unchanged.  WHEN clauses
# that are split across lines are handled on the whole text in the later
# "Standardise format of Teradata RegExp_Similar statements" step.
with open(outfile, 'w') as out_fh:
    out_fh.writelines(f"{line}\n" for line in case_statement_lines)


### Strip Comments (inline /\*..\*/ and -- as well as multiline /\*..\*/)

In [3]:
# ---- pass 1: comments that begin and end on a single line ----
infile = f'{data_path}/scratch1.txt'
outfile = f'{data_path}/scratch2.txt'
comment_match1 = re.compile(r"/\*.*?\*/")  # inline /* ... */
comment_match2 = re.compile(r"--+.*$")     # -- through to the end of the line

# the output file is opened first, then the input, and rewritten line by line
with open(outfile, 'w') as out_fh, open(infile, 'r') as in_fh:
    for line in in_fh:
        line = comment_match2.sub(r'', comment_match1.sub(r'', line))
        out_fh.write(line)

# ---- pass 2: /* ... */ comments that span several lines ----
infile = f'{data_path}/scratch2.txt'
outfile = f'{data_path}/scratch3.txt'
# anchored to the start of a line; [\s\S] lets the match run across newlines
multiline_comment_match = re.compile(r"^\/\*[\s\S]+?\*\/", re.MULTILINE)

with open(infile, 'r') as in_fh:
    case_statements = in_fh.read()

# The substitution is applied twice -- presumably so that a comment which only
# ends up at the start of a line once the first pass has run is removed too.
case_statements = multiline_comment_match.sub(
    r'', multiline_comment_match.sub(r'', case_statements))

with open(outfile, 'w') as out_fh:
    out_fh.write(case_statements)



### Standardise format of Teradata RegExp_Similar statements

In [4]:
infile = f'{data_path}/scratch3.txt'
outfile = f'{data_path}/scratch4.txt'

with open(infile, 'r') as in_fh:
    case_statements = in_fh.read()

# Pull a RegExp_Similar call (optionally preceded by an opening bracket on a
# line of its own) up onto the same line as its WHEN keyword.
when_layout_fixes = (
    (r"WHEN\s+\nRegExp_Similar", r"WHEN RegExp_Similar"),
    (r"WHEN\s+\n\(\nRegExp_Similar", r"WHEN (RegExp_Similar"),
)
for when_rgx, when_fix in when_layout_fixes:
    case_statements = re.sub(when_rgx, when_fix, case_statements)

with open(outfile, 'w') as out_fh:
    out_fh.write(case_statements)

### Generate Raw Case Statements (minus comments, but with uniform formatting)

In [5]:
infile = f'{data_path}/scratch4.txt'
outfile = f'{data_path}/raw_case_statements.txt'

# copy the file across, leaving out every line that is nothing but whitespace
with open(outfile, 'w') as out_fh, open(infile, 'r') as in_fh:
    out_fh.writelines(line for line in in_fh if line.strip())


## Get Key Counts

In [6]:
import re
from collections import Counter
infile = f'{data_path}/raw_case_statements.txt'
outfile = f'{data_path}/wre_code_counts.txt'

with open(infile, 'r') as in_fh:
    case_statements = in_fh.read()

# only the code between THEN ' and the colon is captured; the surrounding
# pieces are non-capturing groups, so findall() returns bare code strings
code_pattern = r"(?:THEN\s+')([A-Z1]{1,3}):(?:\s+[\w\s/$&-]+')"

ptrn = re.compile(code_pattern, re.MULTILINE)
codes = ptrn.findall(case_statements)

# tally how many times each code occurs
code_map = Counter(codes)

# report each count on screen and mirror it into the counts file
with open(outfile, 'w') as out_fh:
    for key in code_map:
        value = code_map[key]
        count_line = f"{key}:{value}"
        print(count_line)
        out_fh.write(f"{count_line}\n")


DV:4
BG:4
A:4
B:4
E:4
CI:4
BS:4
DK:4
C:4
DI:4
F:4
G:4
H:4
J:4
I:4
V:4
D:4
BH:4
K:4
L:4
M:4
N:4
O:4
P:4
Q:4
R:4
U:4
S:4
T:4
CK:4
W:4
X:4
Y:4
Y1:4
Z:4
BA:4
BB:4
BC:4
BF:4
BI:4
BJ:4
BK:4
BL:4
BM:4
BN:4
BO:4
BQ:4
BR:4
AM:4
BT:4
BU:4
BV:4
BW:4
BX:4
BY:4
BZ:4
CA:4
CB:4
CC:4
CD:4
CE:4
CF:4
CG:4
CH:4
CJ:4
CM:4
CN:4
CO:4
CP:4
CQ:4
CR:4
CS:4
CT:4
CU:4
CV:4
CW:4
CX:4
CY:4
CZ:4
DA:4
DB:4
DC:4
DD:4
DE:4
DF:4
DG:4
DH:4
DJ:4
DT:4
CL:4
BD:4
BP:4
DM:4
BH1:4
ZZZ:4
DO:4
DN:4
DP:4
DQ:4
DR:4
DS:4


## Parse Case Statements into clauses and store in a dictionary

In [7]:
infile = f'{data_path}/raw_case_statements.txt'

# only the first NUM_CASES matches are catalogued
NUM_CASES = 101  # includes Y, Y1, BH and BH1

# --- whole case statement: WHEN <logic> THEN '<code>: <description>' ---
case_statement_pattern = r"(?P<case_statement>^WHEN(?P<logic>[\s\S\n]+?)THEN\s+'(?P<wre_code>[A-Z1]{1,3}):\s+(?P<code_desc>[\w\s/$&-]+)')"
case_ptrn = re.compile(case_statement_pattern, flags=re.MULTILINE)

# --- wre code on a line that starts with THEN ---
code_pattern = r"^THEN\s+'(?P<wre_code>[A-Z1]{1,3}):\s+[\w\s/$&-]+'"
code_ptrn = re.compile(code_pattern, flags=re.MULTILINE)

# --- description that follows the wre code ---
code_descr_rgx = r"THEN\s+'[A-Z1]{1,3}:\s+(?P<code_desc>[\w\s/$&-]+)'"
code_descr_ptrn = re.compile(code_descr_rgx)

# --- alternation embedded in a Teradata RegExp_Similar clause ---
regexp_rgx = r"\('\s*\|\|\s*'(?P<regexp>[\w|\n]+)'\s*\|\|\s*'\)"
regexp_ptrn = re.compile(regexp_rgx, flags=re.MULTILINE)

# --- bracketed list of a LIKE ANY clause ---
like_any_rgx = r"LIKE ANY\s*(?P<like_any>\([\w\W\n]+?\))\s*\n(AND|OR|THEN)"
like_any_ptrn = re.compile(like_any_rgx, flags=re.MULTILINE)

# --- bracketed list of a NOT LIKE ALL clause ---
not_like_all_rgx = r"NOT LIKE ALL\s*(?P<not_like_all>\([\w\W\n]+?\))[\n]*?\s*?(THEN|OR)"
not_like_all_ptrn = re.compile(not_like_all_rgx, flags=re.MULTILINE)

with open(infile, 'r') as in_fh:
    case_statements_string = in_fh.read()

#########################################################################
# define IHC_dict dictionary to catalogue IHC case statements
#########################################################################
# one dict of named groups per case statement found in the text
case_statement_list = [found.groupdict()
                       for found in case_ptrn.finditer(case_statements_string)]
# wre code -> full text of its case statement (a later duplicate code within
# the first NUM_CASES entries replaces the earlier one)
IHC_dict = dict((entry['wre_code'], entry['case_statement'])
                for entry in case_statement_list[:NUM_CASES])

# One or more '%' wildcards sitting directly between two word characters.
# Lookarounds are used so the neighbouring characters are not consumed: with
# the earlier (\w+)(%+)(\w+) form the right-hand word was swallowed by the
# match, so in 'A%B%C' the second '%' was never spaced and the result was
# 'A BC' instead of 'A B C'.
preprocess1 = re.compile(r"(?<=\w)%+(?=\w)")
# quoting, bracket, newline and any remaining wildcard characters
preprocess2 = re.compile(r"['\n()%]")


def strip_char(text):
    """Reduce one quoted SQL LIKE term to its plain search text.

    Args:
        text: a single term as it appears in the clause, e.g. "'%HOME%OFFICE%'".

    Returns:
        The term with every run of '%' between two word characters replaced
        by a single space, all remaining quote, newline, bracket and '%'
        characters removed, and surrounding whitespace stripped.
    """
    text = preprocess1.sub(' ', text)
    text = preprocess2.sub('', text)
    return text.strip()


def assemble_clause(search_term):
    """Turn a comma-separated clause into a '|'-separated string of terms.

    The text is split on every comma, each piece is cleaned with strip_char,
    and the cleaned pieces are joined back together with '|'.
    """
    return "|".join(strip_char(piece) for piece in search_term.split(','))


# break every catalogued case statement down into its component clauses
for code, case_statement in IHC_dict.items():
    # description that follows the wre code
    descr_match = code_descr_ptrn.search(case_statement)
    if descr_match:
        code_descr = descr_match.group('code_desc')
    else:
        code_descr = None

    # LIKE ANY list, flattened to a '|'-separated string
    like_any_match = like_any_ptrn.search(case_statement)
    if like_any_match:
        like_any = assemble_clause(like_any_match.group('like_any'))
    else:
        like_any = None

    # RegExp_Similar alternation, kept exactly as captured
    regexp_match = regexp_ptrn.search(case_statement)
    if regexp_match:
        regexp = regexp_match.group('regexp')
    else:
        regexp = None

    # NOT LIKE ALL list, flattened to a '|'-separated string
    not_like_all_match = not_like_all_ptrn.search(case_statement)
    if not_like_all_match:
        not_like_all = assemble_clause(not_like_all_match.group('not_like_all'))
    else:
        not_like_all = None

    #########################################################################
    # redefine IHC_dict dictionary fields
    #########################################################################
    # the raw statement text stored under this code is replaced by its clauses
    # (an existing key is reassigned, so the dict does not change size
    # while it is being iterated)
    IHC_dict[code] = dict(code_descr=code_descr,
                          like_any=like_any,
                          regexp=regexp,
                          not_like_all=not_like_all)


# helper function to return case statement clauses
def get_case_statement(wre_code, component=None):
    """Look up the IHC_dict entry stored for a WRE code.

    Args:
        wre_code: key into IHC_dict.
        component: optional key within that entry; when it is omitted (or
            otherwise falsy) the whole entry is returned.

    Returns:
        IHC_dict[wre_code][component] when a component is given, otherwise
        IHC_dict[wre_code].
    """
    entry = IHC_dict[wre_code]
    if component:
        return entry[component]
    return entry


In [8]:
# spot-check: show everything stored in IHC_dict for code 'Y1'
print(IHC_dict['Y1'])

{'code_descr': 'Parking/Tolls', 'like_any': ' ALBERT PARK PARKING| ERSKIN PARK PARKING| EXL CAR PARKS PARKING| MACQUARIE PARK PARKING| DUDLEY PARK PARKING| NATURE PARK PARKING| PARKES PARKING| PARK MANAGEMENT PARKING| PARKVILLE PARKING| ROSNY PARK PARKING| STUART PARK PARKING| SHIFT PARK PARKING| ARIAH PARK PARKING| PRESIDENTS PARK PARKING| ERSKINE PARK PARKING PARKING ALBERT PARK| PARKING ERSKIN PARK| PARKING EXL CAR PARKS| PARKING MACQUARIE PARK| PARKING DUDLEY PARK| PARKING NATURE PARK| PARKING PARKES| PARKING PARK MANAGEMENT| PARKING PARKVILLE| PARKING ROSNY PARK| PARKING STUART PARK| PARKING SHIFT PARK TO| PARKING ARIAH PARK| PARKING PRESIDENTS PARK| PARKING ERSKINE PARK', 'regexp': None, 'not_like_all': None}


In [9]:
# spot-check: fetch only the 'code_descr' component for code 'H'
print(get_case_statement('H', 'code_descr'))

Home Office


In [10]:
# spot-check: fetch only the 'like_any' component for code 'H'
print(get_case_statement('H', 'like_any'))

home| home office| homeoffice|Home Study| home as office| home based office| work from home|working from home| WORKING AT HOME|Work at home| WFH| WORKFROMHOME| STUDY FURNITURE| STUDY ROOM|HLP| HLP| HO| HOE|HOE| H.O.E | H/O| H/O| H O 1000*0.45| H.O.E|H|HO |H/L/P|L & P|LP| H.O.E.| H O E| H O| HO: 10 HRS/WK| HO5X48X0.52| H.O EX| HM OFF| H.O| H.O 2HRS X 48WKS X $0.52| H.O 8HRS X 48WKS X $0.52| H.O.X.| HOM OFF| H.O.X| HO: 15 H/W| HO: 10 H/W| HO: 20 HRS/WK| HOU 5 X 48 X .52| HO: 5 HRS/WK| HOU 4 X 48 X .52| HOU 3 X 48 X .52| H O 100*0.45| H OFFIS| HO: 15 HRS/WK| H.O 10HRS X 48WKS X $0.52| HOU 10 X 48 X .52| HO: 6HRS P/WK*48WKS*.52| H.O. 2HRS X 48WKS X 0.52| H.O 2*5*48W*.52| H.O.| HO: 5 H/W| H.O 3HRS X 48WKS X $0.52| H.O. 15 HRS X 48 WKS @ 52C| H O EXP| H.O 10H*48W*.52| H.O 10HRS X 40WKS X $0.52| HOU 8 X 48 X .52| H.O 5HRS X 48WKS X $0.52| HO: 5HRS*48*45C| H O 1000*0.55| HMOFF| HO10X48X0.52| H.O 1*5*48W*.52| H.O 5HRS X 40WKS X $0.52| HO: 4HRS P/WK*48WKS*.52| H.O 4HRS X 48WKS X $0.52| HO: 3HRS 

In [11]:
# spot-check: fetch only the 'regexp' component for code 'H'
print(get_case_statement('H', 'regexp'))

None


In [12]:
import json
outfile = f'{data_path}/IHC_dict.txt'

# serialise the case statement dictionary to the file as JSON text
with open(outfile, 'w') as case_statement_fh:
    json.dump(IHC_dict, case_statement_fh)

# read the JSON text straight back from the file
with open(outfile, 'r') as f:
    dictionary_as_text = f.read()

# rebuild the case statement dictionary from the saved text
IHC_dict = json.loads(dictionary_as_text)
