From 68a8ae4ac202af90a0181fcc11371f45b8869c5a Mon Sep 17 00:00:00 2001
From: mikeqfu
Date: Tue, 15 Nov 2022 09:54:47 +0000
Subject: [PATCH] Fix bugs in the class `LocationIdentifiers`

---
 pyrcs/line_data/loc_id.py | 545 +++++++++++++++++++++++++++-----------
 1 file changed, 392 insertions(+), 153 deletions(-)

diff --git a/pyrcs/line_data/loc_id.py b/pyrcs/line_data/loc_id.py
index e9d5753..ef59c7d 100644
--- a/pyrcs/line_data/loc_id.py
+++ b/pyrcs/line_data/loc_id.py
@@ -14,7 +14,7 @@
 from pyhelpers.store import load_data, save_data
 
 from ..parser import get_catalogue, get_hypertext, get_last_updated_date, get_page_catalogue, \
-    parse_date, parse_location_name, parse_tr
+    parse_date, parse_tr
 from ..utils import collect_in_fetch_verbose, confirm_msg, fetch_data_from_file, home_page_url, \
     init_data_dir, is_home_connectable, print_collect_msg, print_conn_err, print_inst_conn_err, \
     print_void_msg, save_data_to_file, validate_initial
@@ -185,11 +185,11 @@ def __init__(self, data_dir=None, update=False, verbose=True):
 
             >>> lid = LocationIdentifiers()
 
-            >>> print(lid.NAME)
-            CRS, NLC, TIPLOC and STANOX codes
+            >>> lid.NAME
+            'CRS, NLC, TIPLOC and STANOX codes'
 
-            >>> print(lid.URL)
-            http://www.railwaycodes.org.uk/crs/crs0.shtm
+            >>> lid.URL
+            'http://www.railwaycodes.org.uk/crs/crs0.shtm'
         """
 
         print_conn_err(verbose=verbose)
@@ -299,25 +299,17 @@ def collect_explanatory_note(self, confirmation_required=True, verbose=False):
             >>> lid.KEY_TO_MSCEN
             'Multiple station codes explanatory note'
 
-            >>> exp_note_dat = exp_note[lid.KEY_TO_MSCEN]
+            >>> exp_note_dat = exp_note[lid.KEY_TO_MSCEN]
             >>> type(exp_note_dat)
             pandas.core.frame.DataFrame
-            >>> exp_note_dat
-                               Location CRS CRS_alt1 CRS_alt2
-            0           Glasgow Central GLC      GCL
-            1      Glasgow Queen Street GLQ      GQL
-            2                   Heworth HEW      HEZ
-            3      Highbury & Islington HHY      HII      XHZ
-            4    Lichfield Trent Valley LTV      LIF
-            5     Liverpool Lime Street LIV      LVL
-            6   Liverpool South Parkway LPY      ALE
-            7         London St Pancras STP      SPL      SPX
-            8                   Retford RET      XRO
-            9   Smethwick Galton Bridge SGB      GTI
-            10                 Tamworth TAM      TAH
-            11       Willesden Junction WIJ      WJH      WJL
-            12   Worcestershire Parkway WOP      WPH
+            >>> exp_note_dat.head()
+                             Location CRS CRS_alt1 CRS_alt2
+            0         Glasgow Central GLC      GCL
+            1    Glasgow Queen Street GLQ      GQL
+            2                 Heworth HEW      HEZ
+            3    Highbury & Islington HHY      HII      XHZ
+            4  Lichfield Trent Valley LTV      LIF
         """
 
         cfm_msg = confirm_msg(data_name=self.KEY_TO_MSCEN)
@@ -367,7 +359,7 @@ def collect_explanatory_note(self, confirmation_required=True, verbose=False):
                         verbose=verbose)
 
                 except Exception as e:
-                    print("Failed. {}.".format(e))
+                    print(f"Failed. {e}")
                     explanatory_note = None
 
         return explanatory_note
{e}") explanatory_note = None return explanatory_note @@ -400,24 +392,17 @@ def fetch_explanatory_note(self, update=False, dump_dir=None, verbose=False): >>> lid.KEY_TO_MSCEN 'Multiple station codes explanatory note' + >>> exp_note_dat = exp_note[lid.KEY_TO_MSCEN] >>> type(exp_note_dat) pandas.core.frame.DataFrame - >>> exp_note_dat - Location CRS CRS_alt1 CRS_alt2 - 0 Glasgow Central GLC GCL - 1 Glasgow Queen Street GLQ GQL - 2 Heworth HEW HEZ - 3 Highbury & Islington HHY HII XHZ - 4 Lichfield Trent Valley LTV LIF - 5 Liverpool Lime Street LIV LVL - 6 Liverpool South Parkway LPY ALE - 7 London St Pancras STP SPL SPX - 8 Retford RET XRO - 9 Smethwick Galton Bridge SGB GTI - 10 Tamworth TAM TAH - 11 Willesden Junction WIJ WJH WJL - 12 Worcestershire Parkway WOP WPH + >>> exp_note_dat.head() + Location CRS CRS_alt1 CRS_alt2 + 0 Glasgow Central GLC GCL + 1 Glasgow Queen Street GLQ GQL + 2 Heworth HEW HEZ + 3 Highbury & Islington HHY HII XHZ + 4 Lichfield Trent Valley LTV LIF """ explanatory_note = fetch_data_from_file( @@ -426,50 +411,337 @@ def fetch_explanatory_note(self, update=False, dump_dir=None, verbose=False): return explanatory_note + # -- CRS, NLC, TIPLOC and STANOX --------------------------------------------------------------- + @staticmethod - def _collect_others_note(other_note_x): - """ Collect notes about the code columns """ - if other_note_x is not None: - # Search for notes - n1 = re.search(r'(?<=[\[(\'])[\w,? ]+(?=[)\]\'])', other_note_x) - note = n1.group(0) if n1 is not None else '' - - # Strip redundant characters - n2 = re.search(r'[\w ,]+(?= [\[(\'])', note) - if n2 is not None: - note = n2.group(0) + def _parse_location_name(x): + """ + Parse location name (and its associated note). + + :param x: location name (in raw data) + :type x: str or None + :return: location name and note (if any) + :rtype: tuple + + **Examples**:: + + >>> from pyrcs.line_data import LocationIdentifiers + >>> # from pyrcs import LocationIdentifiers + + >>> lid = LocationIdentifiers() + + >>> dat = lid._parse_location_name(None) + >>> dat + ('', '') + + >>> dat = lid._parse_location_name('Abbey Wood') + >>> dat + ('Abbey Wood', '') + + >>> dat = lid._parse_location_name('Abercynon (formerly Abercynon South)') + >>> dat + ('Abercynon', 'formerly Abercynon South') + + >>> dat = lid._parse_location_name('Allerton (reopened as Liverpool South Parkway)') + >>> dat + ('Allerton', 'reopened as Liverpool South Parkway') + + >>> dat = lid._parse_location_name('Ashford International [domestic portion]') + >>> dat + ('Ashford International', 'domestic portion') + + >>> dat = lid._parse_location_name('Ayr [unknown feature]') + >>> dat + ('Ayr', 'unknown feature') + + >>> dat = lid._parse_location_name('Birkenhead Hamilton Square [see Hamilton Square]') + >>> dat + ('Birkenhead Hamilton Square', 'see Hamilton Square') + """ + + if not x: + x_, note = '', '' else: - note = '' + # Location name + d = re.search(r'.*(?= \[[\"\']\()', x) + if d is not None: + x_ = d.group() + elif ' [unknown feature' in x: # ' [unknown feature, labelled "do not use"]' in x + x_ = re.search(r'\w.*(?= \[unknown feature(, )?)', x).group(0) + elif ') [formerly' in x: + x_ = re.search(r'.*(?= \[formerly)', x).group(0) + else: + x_pat = re.compile( + r'[Oo]riginally |' + r'[Ff]ormerly |' + r'[Ll]ater |' + r'[Pp]resumed |' + r' \(was |' + r' \(in |' + r' \(at |' + r' \(also |' + r' \(second code |' + r'\?|' + r'\n|' + r' \(\[\'|' + r' \(definition unknown\)|' + r' \(reopened |' + r'( portion])$|' + r'[Ss]ee ' + ) + x_tmp = 
+                x_tmp = x_tmp.group(0) if x_tmp is not None else x
+                x_ = ' '.join(x.replace(x_tmp, '').split()) if re.search(x_pat, x) else x
+
+            # Note
+            y_ = x.replace(x_, '', 1).strip()
+            if y_ == '':
+                note = ''
+            else:
+                note_ = re.search(r'(?<=[\[(])[\w ,?]+(?=[])])', y_)
+                if note_ is None:
+                    note_ = re.search(
+                        r'(?<=(\[[\'\"]\()|(\([\'\"]\[)|(\) \[)).*(?=(\)[\'\"]])|(][\'\"]\))|])', y_)
+                elif '"now deleted"' in y_ and y_.startswith('(') and y_.endswith(')'):
+                    note_ = re.search(r'(?<=\().*(?=\))', y_)
 
-        return note
+                note = note_.group(0) if note_ is not None else ''
+                if note.endswith('\'') or note.endswith('"'):
+                    note = note[:-1]
+
+            if 'STANOX ' in x_ and 'STANOX ' in x and note == '':
+                x_ = x[0:x.find('STANOX')].strip()
+                note = x[x.find('STANOX'):]
+
+        return x_, note
+
+    def parse_location_name(self, data):
+        """
+        Parse the location names of the preprocessed data.
+
+        :param data: preprocessed data of the location codes
+        :type data: pandas.DataFrame
+        """
+
+        # Collect additional information as note
+        data[['Location', 'Location_Note']] = pd.DataFrame(
+            data['Location'].map(self._parse_location_name).to_list())
+
+        # # Debugging
+        # for i, x in enumerate(data['Location']):
+        #     try:
+        #         _parse_location_name(x)
+        #     except Exception:
+        #         print(i)
+        #         break
+
+        # Regulate location names
+        data.replace(_amendment_to_location_names(), regex=True, inplace=True)
+
+    @staticmethod
+    def cleanse_mult_alt_codes(data):
+        """
+        Cleanse multiple alternatives for every code column.
+
+        :param data: preprocessed data of the location codes
+        :type data: pandas.DataFrame
+        :return: cleansed data of the location codes, in which multiple alternatives are split
+            into separate rows
+        :rtype: pandas.DataFrame
+        """
+
+        data_ = data.copy()
+
+        code_col_names = ['CRS', 'NLC', 'TIPLOC', 'STANME', 'STANOX']
+
+        def _count_sep(x):
+            if '\r\n' in x:
+                y = x.count('\r\n')
+            elif '\r' in x:
+                y = x.count('\r')
+            else:
+                y = x.count('\n')
+            return y
+
+        r_n_counts = data_[code_col_names].applymap(_count_sep)
+        # r_n_counts = pd.concat([data[c].str.count(r'\r(\n)?') for c in code_col_names], axis=1)
+        r_n_counts_ = r_n_counts.mul(-1).add(r_n_counts.max(axis=1), axis='index')
+
+        for col in code_col_names:
+            for i in data_.index:
+                d = r_n_counts_.loc[i, col]
+                if d > 0:
+                    dat = data_.loc[i, col]
+                    if '\r\n' in dat:
+                        data_.loc[i, col] = dat + ''.join(['\r\n'] * d)
+                    elif '\r' in dat:
+                        data_.loc[i, col] = dat + ''.join(['\r'] * d)
+                    else:  # '\n' in dat
+                        data_.loc[i, col] = dat + ''.join(['\n'] * d)
+
+        def _split_dat_and_note(x):
+            if '\r\n' in x:
+                x_ = x.split('\r\n')
+            elif '\r' in x:
+                x_ = x.split('\r')
+            elif '\n' in x:
+                x_ = x.split('\n')
+            else:
+                x_ = x
+            return x_
+
+        data_[code_col_names] = data_[code_col_names].applymap(_split_dat_and_note)
+
+        return data_.explode(code_col_names, ignore_index=True)
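
The pad-then-explode logic of `cleanse_mult_alt_codes` may be easier to see on a toy frame: every code cell is padded with trailing separators until all columns of a row split into lists of equal length, which is what the multi-column `explode` (pandas 1.3+) requires. A minimal sketch, with illustrative data only:

    >>> import pandas as pd
    >>> toy = pd.DataFrame({'CRS': ['GLC\nGCL'], 'NLC': ['043600']})
    >>> seps = toy.applymap(lambda v: v.count('\n'))
    >>> pad = seps.mul(-1).add(seps.max(axis=1), axis='index')  # separators each cell is short of
    >>> for col in toy.columns:
    ...     toy[col] = [v + '\n' * n for v, n in zip(toy[col], pad[col])]
    >>> toy = toy.applymap(lambda v: v.split('\n'))  # every row now holds equal-length lists
    >>> out = toy.explode(['CRS', 'NLC'], ignore_index=True)
    >>> out['CRS'].to_list(), out['NLC'].to_list()
    (['GLC', 'GCL'], ['043600', ''])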
+    @staticmethod
+    def _get_code_note(x):
+        """
+        Get the note (if any) attached to a given code.
+
+        :param x: raw data of a given code
+        :type x: str or None
+        :return: the code and extra information (if any) about it
+        :rtype: tuple
+
+        **Examples**::
+
+            >>> from pyrcs.line_data import LocationIdentifiers
+            >>> # from pyrcs import LocationIdentifiers
+
+            >>> lid = LocationIdentifiers()
+
+            >>> lid._get_code_note('860260✖Earlier code')
+            ('860260', 'Earlier code')
+        """
+
+        if x:
+            if '✖' in x:
+                x_, note = x.split('✖')
+
+            else:  # Search for notes
+                n1 = re.search(r'(?<=[\[(])[\w,? ]+(?=[)\]])', x)
+
+                if n1 is not None:
+                    note = n1.group(0)
+                    x_ = x.replace(note, '').strip('[(\')] ')
+
+                    n2 = re.search(r'[\w ,]+(?= [\[(\'])', note)  # Strip redundant characters
+                    if n2 is not None:
+                        note = n2.group(0)
+
+                else:
+                    x_, note = x, ''
+
+        else:
+            x_, note = x, ''
+
+        return x_, note
+
+    def get_code_notes(self, data):
+        """
+        Get notes for every code column.
+
+        :param data: preprocessed data of the location codes
+        :type data: pandas.DataFrame
+        """
+
+        # drop_pat = re.compile(r'[Ff]ormerly|[Ss]ee[ also]|Also .[\w ,]+')
+        # idx = [data[data['CRS'] == x].index[0] for x in data['CRS'] if re.match(drop_pat, x)]
+        # data.drop(labels=idx, axis=0, inplace=True)
+
+        codes_col_names = ['CRS', 'NLC', 'TIPLOC', 'STANME', 'STANOX']
+        # notes_col_names = [x + '_Note' for x in codes_col_names]
+        # data[notes_col_names] = data[codes_col_names].applymap(self._get_code_note)
+        for col in codes_col_names:
+            data[[col, col + '_Note']] = pd.DataFrame(
+                data[col].map(self._get_code_note).to_list(), index=data.index)
+
+            # # Debugging:
+            # for i, x in enumerate(data[col]):
+            #     try:
+            #         _get_code_note(x)
+            #     except Exception:
+            #         print(i)
+            #         break
 
     @staticmethod
-    def _parse_stanox_note(x):  # Parse STANOX note
+    def _parse_stanox_note(x):
+        """
+        Parse STANOX note.
+
+        :param x: STANOX note
+        :type x: str or None
+        :return: STANOX and its corresponding note
+        :rtype: tuple
+        """
+
         if x in ('-', '') or x is None:
-            data, note = '', ''
+            stanox, note = '', ''
         else:
             if re.match(r'\d{5}$', x):
-                data = x
+                stanox = x
                 note = ''
+
             elif re.match(r'\d{5}\*$', x):
-                data = x.rstrip('*')
+                stanox = x.rstrip('*')
                 note = 'Pseudo STANOX'
+
             elif re.match(r'\d{5} \w.*', x):
-                data = re.search(r'\d{5}', x).group()
+                stanox = re.search(r'\d{5}', x).group()
                 note = re.search(r'(?<= )\w.*', x).group()
+
             else:
                 d = re.search(r'[\w *,]+(?= [\[(\'])', x)
-                data = d.group() if d is not None else x
-                note = 'Pseudo STANOX' if '*' in data else ''
+                stanox = d.group() if d is not None else x
+                note = 'Pseudo STANOX' if '*' in stanox else ''
 
                 n = re.search(r'(?<=[\[(\'])[\w, ]+.(?=[)\]\'])', x)
+
                 if n is not None:
                     note = '; '.join(x for x in [note, n.group()] if x != '')
+
                 if '(' not in note and note.endswith(')'):
                     note = note.rstrip(')')
 
-        return data, note
+        return stanox, note
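
Unlike the other new helpers, `_parse_stanox_note` ships without doctest examples; its behaviour on a few illustrative inputs (hypothetical STANOX strings, derived from the branches above) is:

    >>> from pyrcs.line_data import LocationIdentifiers
    >>> lid = LocationIdentifiers()
    >>> lid._parse_stanox_note('87654')
    ('87654', '')
    >>> lid._parse_stanox_note('87654*')  # a trailing '*' marks a pseudo STANOX
    ('87654', 'Pseudo STANOX')
    >>> lid._parse_stanox_note('-')
    ('', '')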
+
+    def parse_stanox_note(self, data):
+        """
+        Parse the note for STANOX.
+
+        :param data: preprocessed data of the location codes
+        :type data: pandas.DataFrame
+        """
+
+        col_name = 'STANOX'
+        note_col_name = col_name + '_Note'
+
+        if not data.empty:
+            parsed_dat = data[col_name].map(self._parse_stanox_note).to_list()
+            data[[col_name, note_col_name]] = pd.DataFrame(parsed_dat, index=data.index)
+        else:
+            # No data is available on the web page for the given 'key_word'
+            data[note_col_name] = data[col_name]
+
+        data[col_name] = data[col_name].str.replace('-', '')
+
+    def _get_additional_notes(self, data, beginning_with, soup):
+        if any('see note' in crs_note for crs_note in data['CRS_Note']):
+            loc_idx = [i for i, crs_n in enumerate(data['CRS_Note']) if 'see note' in crs_n]
+
+            # web_page_text = bs4.BeautifulSoup(markup=source.text, features='html.parser')
+
+            note_urls = [
+                urllib.parse.urljoin(self.catalogue[beginning_with], x['href'])
+                for x in soup.find_all('a', href=True, string='note')]
+            add_notes = [_parse_note_page(note_url) for note_url in note_urls]
+
+            additional_notes = dict(zip(data['CRS'].iloc[loc_idx], add_notes))
+        else:
+            additional_notes = None
+
+        return additional_notes
 
     def collect_codes_by_initial(self, initial, update=False, verbose=False):
         """
@@ -500,17 +772,15 @@ def collect_codes_by_initial(self, initial, update=False, verbose=False):
             ['A', 'Additional notes', 'Last updated date']
 
             >>> loc_a_codes = loc_a['A']
-
             >>> type(loc_a_codes)
             pandas.core.frame.DataFrame
             >>> loc_a_codes.head()
-                                           Location CRS  ... STANME_Note STANOX_Note
-            0                                Aachen      ...
-            1                    Abbeyhill Junction      ...
-            2                 Abbeyhill Signal E811      ...
-            3            Abbeyhill Turnback Sidings      ...
-            4  Abbey Level Crossing (Staffordshire)      ...
-
+                                              Location CRS  ... STANME_Note STANOX_Note
+            0                                        A1      ...
+            1                            A463 Traded In      ...
+            2       A483 Road Scheme Supervisors Closed      ...
+            3                                    Aachen      ...
+            4                          AA Holidays S524      ...
             [5 rows x 12 columns]
         """
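
The hunk below swaps the hand-rolled `<thead>`/`<td>` scraping for the shared `parse_tr` helper imported at the top of the module. A rough sketch of the new flow, self-contained for one initial (the per-initial page URL is an assumption for illustration):

    >>> import bs4
    >>> import requests
    >>> from pyrcs.parser import parse_tr
    >>> source = requests.get('http://www.railwaycodes.org.uk/crs/crsa.shtm')  # assumed URL for 'a'
    >>> soup = bs4.BeautifulSoup(markup=source.content, features='html.parser')
    >>> ths = [th.text.strip() for th in soup.find('thead').find_all(name='th')]
    >>> trs = soup.find('tbody').find_all(name='tr')
    >>> dat = parse_tr(trs=trs, ths=ths, sep=None, as_dataframe=True)  # raw table as a DataFrame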
@@ -542,93 +812,63 @@ def collect_codes_by_initial(self, initial, update=False, verbose=False):
 
         else:
             try:
+                # Get a raw DataFrame
                 soup = bs4.BeautifulSoup(markup=source.content, features='html.parser')
 
                 thead, tbody = soup.find('thead'), soup.find('tbody')
-
-                column_names = [th.text for th in thead.find_all('th')]
-                len_of_cols = len(column_names)
-                list_of_rows = [[td for td in tr.find_all('td')] for tr in tbody.find_all('tr')]
-
-                list_of_row_data = []
-                for row in list_of_rows:
-                    dat = [x.text for x in row]
-                    list_of_row_data.append(dat[:len_of_cols] if len(row) > len_of_cols else dat)
-
-                # Get a raw DataFrame
-                rep = {'\b-\b': '', '\xa0\xa0': ' ', '½': ' and 1/2'}
-                pat = re.compile("|".join(rep.keys()))
-                tbl = [[pat.sub(lambda x: rep[x.group(0)], z) for z in y] for y in list_of_row_data]
-                location_codes = pd.DataFrame(data=tbl, columns=column_names)
-                location_codes.replace({'\xa0': ''}, regex=True, inplace=True)
-
-                # Collect additional information as note
-                location_codes[['Location', 'Location_Note']] = \
-                    location_codes.Location.map(parse_location_name).apply(pd.Series)
-
-                # CRS, NLC, TIPLOC, STANME
-                drop_pattern = re.compile(r'[Ff]ormerly|[Ss]ee[ also]|Also .[\w ,]+')
-                idx = [
-                    location_codes[location_codes.CRS == x].index[0] for x in location_codes.CRS
-                    if re.match(drop_pattern, x)
-                ]
-                location_codes.drop(labels=idx, axis=0, inplace=True)
-
-                # Collect notes about the code columns
-                codes_col_names = location_codes.columns[1:-1]
-                location_codes[[x + '_Note' for x in codes_col_names]] = \
-                    location_codes[codes_col_names].applymap(self._collect_others_note)
-
-                # Parse STANOX note
-                if not location_codes.empty:
-                    location_codes[['STANOX', 'STANOX_Note']] = location_codes.STANOX.map(
-                        self._parse_stanox_note).apply(pd.Series)
-                else:
-                    # No data is available on the web page for the given 'key_word'
-                    location_codes['STANOX_Note'] = location_codes.STANOX
-
-                if any('see note' in crs_note for crs_note in location_codes.CRS_Note):
-                    loc_idx = [
-                        i for i, crs_note in enumerate(location_codes.CRS_Note)
-                        if 'see note' in crs_note
-                    ]
-
-                    web_page_text = bs4.BeautifulSoup(source.text, 'html.parser')
-
-                    note_urls = [
-                        urllib.parse.urljoin(self.catalogue[beginning_with], x['href'])
-                        for x in web_page_text.find_all('a', href=True, string='note')
-                    ]
-                    add_notes = [_parse_note_page(note_url) for note_url in note_urls]
-
-                    additional_notes = dict(zip(location_codes.CRS.iloc[loc_idx], add_notes))
-
-                else:
-                    additional_notes = None
-
-                location_codes = location_codes.replace(_amendment_to_location_names(), regex=True)
-
-                location_codes.STANOX = location_codes.STANOX.replace({'-': ''})
-
-                location_codes.index = range(len(location_codes))  # Rearrange index
-
-                last_updated_date = get_last_updated_date(url=url)
-
-                parsed_data = {
-                    beginning_with: location_codes,
+                ths = [th.text.strip() for th in thead.find_all(name='th')]
+                trs = tbody.find_all(name='tr')
+
+                # column_names = [th.text for th in thead.find_all('th')]
+                # len_of_cols = len(column_names)
+                # list_of_rows = [[td for td in tr.find_all('td')] for tr in tbody.find_all('tr')]
+                #
+                # list_of_row_data = []
+                # for row in list_of_rows:
+                #     dat = [x.text for x in row]
+                #     list_of_row_data.append(dat[:len_of_cols] if len(row) > len_of_cols else dat)
+                #
+                # rep = {'\b-\b': '', '\xa0\xa0': ' ', '½': ' and 1/2'}
+                # pat = re.compile("|".join(rep.keys()))
+                # tbl = [
+                #     [pat.sub(lambda x: rep[x.group(0)], z) for z in y] for y in list_of_row_data]
+                # data = pd.DataFrame(data=tbl, columns=column_names)
+                # data.replace({'\xa0': ''}, regex=True, inplace=True)
+
+                dat = parse_tr(trs=trs, ths=ths, sep=None, as_dataframe=True)
+                repl = {'\xa0': '', '\b-\b': '', '\xa0\xa0': ' ', '½': ' and 1/2'}
+                data = dat.replace(repl, regex=True)
+
+                # Parse location names and their corresponding notes
+                self.parse_location_name(data=data)
+
+                # Cleanse multiple alternatives for every code column
+                data = self.cleanse_mult_alt_codes(data)
+
+                # Get note for every code column
+                self.get_code_notes(data)
+
+                # Further parse STANOX note
+                self.parse_stanox_note(data)
+
+                additional_notes = self._get_additional_notes(
+                    data=data, beginning_with=beginning_with, soup=soup)
+
+                # data.index = range(len(data))  # Rearrange index
+
+                location_codes_initial = {
+                    beginning_with: data,
                     self.KEY_TO_ADDITIONAL_NOTES: additional_notes,
-                    self.KEY_TO_LAST_UPDATED_DATE: last_updated_date,
+                    self.KEY_TO_LAST_UPDATED_DATE: get_last_updated_date(url=url),
                 }
-                location_codes_initial.update(parsed_data)
 
                 if verbose == 2:
                     print("Done.")
-
                 os.makedirs(os.path.dirname(path_to_pickle), exist_ok=True)
                 save_data(location_codes_initial, path_to_pickle, verbose=verbose)
 
             except Exception as e:
-                print("Failed. {}.".format(e))
+                print(f"Failed. {e}")
 
         return location_codes_initial
@@ -663,6 +903,8 @@ def _parse_code(x):
 
         return tbl
 
+    # -- Other systems -----------------------------------------------------------------------------
+
     def collect_other_systems_codes(self, confirmation_required=True, verbose=False):
         """
         Collect data of `other systems' station codes`_ from source web page.
@@ -761,7 +1003,7 @@ def collect_other_systems_codes(self, confirmation_required=True, verbose=False)
                     ext=".pickle", verbose=verbose)
 
             except Exception as e:
-                print("Failed. {}.".format(e))
+                print(f"Failed. {e}")
 
         return other_systems_codes
@@ -788,7 +1030,6 @@ def fetch_other_systems_codes(self, update=False, dump_dir=None, verbose=False):
             >>> lid = LocationIdentifiers()
 
             >>> os_codes = lid.fetch_other_systems_codes()
-
             >>> type(os_codes)
             dict
             >>> list(os_codes.keys())
@@ -848,18 +1089,16 @@ def fetch_codes(self, update=False, dump_dir=None, verbose=False):
             >>> lid.KEY
             'LocationID'
 
-            >>> loc_codes_dat = loc_codes['LocationID']
-
+            >>> loc_codes_dat = loc_codes[lid.KEY]
             >>> type(loc_codes_dat)
             pandas.core.frame.DataFrame
             >>> loc_codes_dat.head()
-                                           Location CRS  ... STANME_Note STANOX_Note
-            0                                Aachen      ...
-            1                    Abbeyhill Junction      ...
-            2                 Abbeyhill Signal E811      ...
-            3            Abbeyhill Turnback Sidings      ...
-            4  Abbey Level Crossing (Staffordshire)      ...
-
+                                              Location CRS  ... STANME_Note STANOX_Note
+            0                                        A1      ...
+            1                            A463 Traded In      ...
+            2       A483 Road Scheme Supervisors Closed      ...
+            3                                    Aachen      ...
+            4                          AA Holidays S524      ...
             [5 rows x 12 columns]
         """
@@ -994,7 +1233,7 @@ def make_xref_dict(self, keys, initials=None, main_key=None, as_dict=False, drop
         """
 
         valid_keys = {'CRS', 'NLC', 'TIPLOC', 'STANOX', 'STANME'}
-        assert_msg = "`keys` must be one of {}.".format(valid_keys)
+        assert_msg = f"`keys` must be one of {valid_keys}."
 
         if isinstance(keys, str):
            assert keys in valid_keys, assert_msg
@@ -1063,7 +1302,7 @@ def make_xref_dict(self, keys, initials=None, main_key=None, as_dict=False, drop
                     dump_dir=dump_dir_, verbose=verbose)
 
             except Exception as e:
-                print("Failed. {}.".format(e))
+                print(f"Failed. {e}")
                 location_codes_dictionary = None
 
         return location_codes_dictionary
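
With the fixes applied, the updated doctests can be exercised end to end. A quick check (requires a live connection; the `collect_codes_by_initial(initial='a')` call is inferred from the revised docstring, which shows `loc_a` and its keys):

    >>> from pyrcs.line_data import LocationIdentifiers
    >>> lid = LocationIdentifiers()
    >>> loc_a = lid.collect_codes_by_initial(initial='a')
    >>> list(loc_a.keys())
    ['A', 'Additional notes', 'Last updated date']
    >>> loc_codes = lid.fetch_codes()
    >>> loc_codes[lid.KEY].head()  # lid.KEY == 'LocationID'; 12 columns, as in the docstrings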