Skip to content

Commit

Permalink
Enhance classes with fixes for potential bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeqfu committed Sep 15, 2023
1 parent 806665b commit df087f2
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 387 deletions.
307 changes: 148 additions & 159 deletions pyrcs/line_data/elec.py
Expand Up @@ -21,165 +21,10 @@
init_data_dir, print_collect_msg, print_conn_err, print_inst_conn_err, save_data_to_file


def _collect_notes(h3):
notes_ = []

next_p = h3.find_next('p')
if next_p is not None:
h3_ = next_p.find_previous('h3')
while h3_ == h3:
notes_2 = get_hypertext(hypertext_tag=next_p, hyperlink_tag_name='a')
if notes_2:
notes_2 = notes_2. \
replace(' Section codes known at present are:', ''). \
replace('Known prefixes are:', ' ')
notes_.append(notes_2)

next_p = next_p.find_next('p')
if next_p is None:
break
else:
h3_ = next_p.find_previous('h3')

notes = ' '.join(notes_).replace(' ', ' ')

if not notes:
notes = None

# if isinstance(notes, list):
# notes = ' '.join(notes)

# ex_note_tag = note_tag.find_next('ol')
# if ex_note_tag:
# previous_h3 = ex_note_tag.find_previous('h3')
# if previous_h3 == h3:
# dat = list(
# re.sub(r'[()]', '', x.text).split(' ', 1) for x in ex_note_tag.find_all('li'))
# li = pd.DataFrame(data=dat, columns=['Initial', 'Code'])
# notes = [notes, li]

return notes


def _collect_codes_without_list(h3):
"""
h3 = h3.find_next('h3')
"""
sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')

notes = _collect_notes(h3=h3)

data = {sub_heading: {'Codes': None, 'Notes': notes}}

return data


def _collect_list_only(h3, ul):
sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')

list_data = [get_hypertext(x) for x in ul.findChildren('li')]
notes = _collect_notes(h3).strip().replace(' were:', '.')

codes_with_list = {sub_heading: {'Codes': list_data, 'Notes': notes}}

return codes_with_list


def _collect_codes_with_list(h3, ul):
"""
'Blackpool Tramway',
"""

sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')

data_1_key, data_2_key = 'Section codes', 'Known prefixes'
table_1, table_2 = {data_1_key: None}, {data_2_key: None}

# data_1
table_1[data_1_key] = pd.DataFrame(
data=[re.sub(r'[()]', '', x.text).split(' ', 1) for x in ul.findChildren('li')],
columns=['Code', 'Area'])

# data_2
thead, tbody = h3.find_next('thead'), h3.find_next('tbody')
ths, trs = [th.text for th in thead.find_all('th')], tbody.find_all('tr')
dat_2 = parse_tr(trs=trs, ths=ths, as_dataframe=True)

table_2[data_2_key] = dat_2

# Notes
notes = _collect_notes(h3)

codes_with_list = {sub_heading: {**table_1, **table_2, 'Notes': notes}}

return codes_with_list


def _collect_codes_and_notes(h3):
"""
elec = Electrification()
url = elec.catalogue[elec.KEY_TO_INDEPENDENT_LINES]
source = requests.get(url=url, headers=fake_requests_headers())
soup = bs4.BeautifulSoup(markup=source.content, features='html.parser')
h3 = soup.find('h3')
h3 = h3.find_next('h3')
"""

codes_and_notes = None

_, ul, table = h3.find_next(name='p'), h3.find_next(name='ul'), h3.find_next(name='table')

if ul is not None:
if ul.find_previous('h3') == h3:
if table is not None:
codes_and_notes = _collect_codes_with_list(h3=h3, ul=ul)
else:
codes_and_notes = _collect_list_only(h3=h3, ul=ul)

if table is not None and codes_and_notes is None:
if table.find_previous(name='h3') == h3:
h3_ = table.find_previous(name='h3')
codes = None
thead, tbody = table.find_next(name='thead'), table.find_next(name='tbody')

while h3_ == h3:
ths, trs = [x.text for x in thead.find_all('th')], tbody.find_all('tr')
dat = parse_tr(trs=trs, ths=ths, as_dataframe=True)
codes_ = dat.applymap(
lambda x: re.sub(
pattern=r'\']\)?', repl=']',
string=re.sub(r'\(?\[\'', '[', x)).replace(
'\\xa0', '').replace('\r ', ' ').strip())

codes = codes_ if codes is None else [codes, codes_]

thead, tbody = thead.find_next(name='thead'), tbody.find_next(name='tbody')

if tbody is None:
break
else:
h3_ = tbody.find_previous(name='h3')

notes = _collect_notes(h3=h3)

sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')
codes_and_notes = {sub_heading: {'Codes': codes, 'Notes': notes}}

if codes_and_notes is None:
codes_and_notes = _collect_codes_without_list(h3=h3)

return codes_and_notes


class Electrification:
"""
A class for collecting `section codes for overhead line electrification (OLE) installations`_.
.. _`section codes for overhead line electrification (OLE) installations`:
http://www.railwaycodes.org.uk/electrification/mast_prefix0.shtm
A class for collecting `section codes for overhead line electrification (OLE) installations
<http://www.railwaycodes.org.uk/electrification/mast_prefix0.shtm>`_.
"""

#: Name of the data
Expand Down Expand Up @@ -265,13 +110,157 @@ def _cfm_msg(key):
return cfm_msg

@staticmethod
def _collect_data(source):
def _collect_notes(h3):
notes_ = []

next_p = h3.find_next('p')
if next_p is not None:
h3_ = next_p.find_previous('h3')
while h3_ == h3:
notes_2 = get_hypertext(hypertext_tag=next_p, hyperlink_tag_name='a')
if notes_2:
notes_2 = notes_2. \
replace(' Section codes known at present are:', ''). \
replace('Known prefixes are:', ' ')
notes_.append(notes_2)

next_p = next_p.find_next('p')
if next_p is None:
break
else:
h3_ = next_p.find_previous('h3')

notes = ' '.join(notes_).replace(' ', ' ')

if not notes:
notes = None

# if isinstance(notes, list):
# notes = ' '.join(notes)

# ex_note_tag = note_tag.find_next('ol')
# if ex_note_tag:
# previous_h3 = ex_note_tag.find_previous('h3')
# if previous_h3 == h3:
# dat = list(
# re.sub(r'[()]', '', x.text).split(' ', 1) for x in ex_note_tag.find_all('li'))
# li = pd.DataFrame(data=dat, columns=['Initial', 'Code'])
# notes = [notes, li]

return notes

def _collect_codes_without_list(self, h3):
"""
h3 = h3.find_next('h3')
"""
sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')

notes = self._collect_notes(h3=h3)

data = {sub_heading: {'Codes': None, 'Notes': notes}}

return data

def _collect_list_only(self, h3, ul):
sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')

list_data = [get_hypertext(x) for x in ul.findChildren('li')]
notes = self._collect_notes(h3).strip().replace(' were:', '.')

codes_with_list = {sub_heading: {'Codes': list_data, 'Notes': notes}}

return codes_with_list

def _collect_codes_with_list(self, h3, ul):
sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')

data_1_key, data_2_key = 'Section codes', 'Known prefixes'
table_1, table_2 = {data_1_key: None}, {data_2_key: None}

# data_1
table_1[data_1_key] = pd.DataFrame(
data=[re.sub(r'[()]', '', x.text).split(' ', 1) for x in ul.findChildren('li')],
columns=['Code', 'Area'])

# data_2
thead, tbody = h3.find_next('thead'), h3.find_next('tbody')
ths, trs = [th.text for th in thead.find_all('th')], tbody.find_all('tr')
dat_2 = parse_tr(trs=trs, ths=ths, as_dataframe=True)

table_2[data_2_key] = dat_2

# Notes
notes = self._collect_notes(h3)

codes_with_list = {sub_heading: {**table_1, **table_2, 'Notes': notes}}

return codes_with_list

def _collect_codes_and_notes(self, h3):
"""
elec = Electrification()
url = elec.catalogue[elec.KEY_TO_INDEPENDENT_LINES]
source = requests.get(url=url, headers=fake_requests_headers())
soup = bs4.BeautifulSoup(markup=source.content, features='html.parser')
h3 = soup.find('h3')
h3 = h3.find_next('h3')
"""

codes_and_notes = None

_, ul, table = h3.find_next(name='p'), h3.find_next(name='ul'), h3.find_next(name='table')

if ul is not None:
if ul.find_previous('h3') == h3:
if table is not None:
codes_and_notes = self._collect_codes_with_list(h3=h3, ul=ul)
else:
codes_and_notes = self._collect_list_only(h3=h3, ul=ul)

if table is not None and codes_and_notes is None:
if table.find_previous(name='h3') == h3:
h3_ = table.find_previous(name='h3')
codes = None
thead, tbody = table.find_next(name='thead'), table.find_next(name='tbody')

while h3_ == h3:
ths, trs = [x.text for x in thead.find_all('th')], tbody.find_all('tr')
dat = parse_tr(trs=trs, ths=ths, as_dataframe=True)
codes_ = dat.map(
lambda x: re.sub(
pattern=r'\']\)?', repl=']',
string=re.sub(r'\(?\[\'', '[', x)).replace(
'\\xa0', '').replace('\r ', ' ').strip())

codes = codes_ if codes is None else [codes, codes_]

thead, tbody = thead.find_next(name='thead'), tbody.find_next(name='tbody')

if tbody is None:
break
else:
h3_ = tbody.find_previous(name='h3')

notes = self._collect_notes(h3=h3)

sub_heading = get_heading_text(heading_tag=h3, elem_tag_name='em')
codes_and_notes = {sub_heading: {'Codes': codes, 'Notes': notes}}

if codes_and_notes is None:
codes_and_notes = self._collect_codes_without_list(h3=h3)

return codes_and_notes

def _collect_data(self, source):
soup = bs4.BeautifulSoup(markup=source.content, features='html.parser')

data = {}
h3 = soup.find('h3')
while h3:
data_ = _collect_codes_and_notes(h3=h3)
data_ = self._collect_codes_and_notes(h3=h3)

if data_ is not None:
data.update(data_)
Expand Down

0 comments on commit df087f2

Please sign in to comment.