<a href="https://colab.research.google.com/github/rumen-cholakov/SemanticWeb/blob/master/grao_tables_parsing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Imports

In [0]:
import pandas as pd
import requests
import pickle
import regex
import enum
import os

from typing import TypeVar, Callable, Sequence, List, Optional, Tuple
from collections import namedtuple
from bs4 import BeautifulSoup
from functools import reduce

## Type Declarations

In [0]:
@enum.unique
class HeaderEnum(enum.Enum):
  '''Which region/municipality header layout a table uses.'''
  # Layout whose headers are located with the 'ОБЛАСТ:' / 'ОБЩИНА:' patterns.
  Old = 0
  # Layout whose region and municipality are matched by one combined pattern.
  New = 1

class TableTypeEnum(enum.Enum):
  '''Column layout of a table: quarterly or yearly.

  ``Qarterly`` is a historical misspelling that existing code refers to, so
  it stays the canonical member. ``Quarterly`` is an alias for the same
  member (same value), which lets new code use the correct spelling.
  '''
  Qarterly = 0
  # Alias: an Enum member that repeats an earlier value resolves to that
  # earlier member, so TableTypeEnum.Quarterly is TableTypeEnum.Qarterly.
  Quarterly = 0
  Yearly = 1

# Envelope passed between pipeline stages: a payload plus the two layout tags.
DataTuple = namedtuple('DataTuple', ['data', 'header_type', 'table_type'])
# Region and municipality names taken from a table header.
MunicipalityIdentifier = namedtuple('MunicipalityIdentifier', ['region', 'municipality'])
# One settlement row: its name and the two population figures.
SettlementInfo = namedtuple(
  'SettlementInfo',
  ['name', 'permanent_residents', 'current_residents'],
)
# A settlement row combined with the region and municipality it belongs to.
FullSettlementInfo = namedtuple(
  'FullSettlementInfo',
  ['region', 'municipality', 'settlement', 'permanent_residents', 'current_residents'],
)
PopulationInfo = namedtuple('PopulationInfo', ['permanent', 'current'])
# Result of line parsing: two dicts keyed by line number.
ParsedLines = namedtuple('ParsedLines', ['municipality_ids', 'settlements_info'])

# Type variable for the value threaded through a pipeline.
T = TypeVar('T')

## Definition of Data Source

In [0]:
# All tables are served from the same directory; only the file name varies.
_GRAO_TABLES_URL = "https://www.grao.bg/tna"

# Newest first. The order matters: tables are merged in this order.
data_source: List[DataTuple] = [
  # The single quarterly table.
  DataTuple(f"{_GRAO_TABLES_URL}/t41nm-15-03-2020_2.txt", HeaderEnum.New, TableTypeEnum.Qarterly),
  # 2019..2017: file names have no dash before the year.
  *(
    DataTuple(f"{_GRAO_TABLES_URL}/tadr{year}.txt", HeaderEnum.New, TableTypeEnum.Yearly)
    for year in range(2019, 2016, -1)
  ),
  # 2016..2006: dashed file names, new header layout.
  *(
    DataTuple(f"{_GRAO_TABLES_URL}/tadr-{year}.txt", HeaderEnum.New, TableTypeEnum.Yearly)
    for year in range(2016, 2005, -1)
  ),
  # 2005..1998: dashed file names, old header layout.
  *(
    DataTuple(f"{_GRAO_TABLES_URL}/tadr-{year}.txt", HeaderEnum.Old, TableTypeEnum.Yearly)
    for year in range(2005, 1997, -1)
  ),
]

## Regular Expressions Construction

In [4]:
# Building regex strings
#
# Every fragment is a raw string: sequences such as \p, \s and \d are not
# valid Python string escapes, so non-raw literals trigger invalid-escape
# warnings. The resulting pattern text is unchanged.
cap_letter = r'\p{Lu}'   # any uppercase letter (Unicode property class)
low_letter = r'\p{Ll}'   # any lowercase letter
separator = r'[\||\!]\s*'  # a '|' or '!' character, then optional whitespace
number = r'\d+'

year_group = r'(\d{4})'  # captures a run of four digits

# Optional continuation of a multi-word uppercase name. The "old" variant
# requires exactly one whitespace character before each further word.
name_part = rf'\s*{cap_letter}*'
name_part_old = rf'\s{cap_letter}*'
type_abbr = rf'{cap_letter}+\.'  # uppercase letters followed by a dot
name = f'{cap_letter}+{name_part * 3}'
name_old = f'{cap_letter}+{name_part_old * 3}'
word = f'{low_letter}+'
number_group = rf'{separator}({number})\s*'  # one separator-delimited numeric column

old_reg = f'ОБЛАСТ:({name_old})'
print(old_reg)
old_mun = f'ОБЩИНА:({name_old})'
print(old_mun)

# <lowercase word> <NAME> <lowercase word> <NAME>; the two names are captured.
region_name_new_re = f'{word} ({name}) {word} ({name})'
print(region_name_new_re)
# Quarterly: the name followed by three numeric columns
settlement_info_quarterly_re = rf'({type_abbr}{name})\s*{number_group * 3}'
print(settlement_info_quarterly_re)
# Yearly: the name followed by six numeric columns
settlement_info_yearly_re = rf'({type_abbr}{name})\s*{number_group * 6}'
print(settlement_info_yearly_re)

ОБЛАСТ:(\p{Lu}+\s\p{Lu}*\s\p{Lu}*\s\p{Lu}*)
ОБЩИНА:(\p{Lu}+\s\p{Lu}*\s\p{Lu}*\s\p{Lu}*)
\p{Ll}+ (\p{Lu}+\s*\p{Lu}*\s*\p{Lu}*\s*\p{Lu}*) \p{Ll}+ (\p{Lu}+\s*\p{Lu}*\s*\p{Lu}*\s*\p{Lu}*)
(\p{Lu}+\.\p{Lu}+\s*\p{Lu}*\s*\p{Lu}*\s*\p{Lu}*)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*
(\p{Lu}+\.\p{Lu}+\s*\p{Lu}*\s*\p{Lu}*\s*\p{Lu}*)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*[\||\!]\s*(\d+)\s*


## Helper Functions

In [0]:
def pipeline(
        value: T,
        function_pipeline: Sequence[Callable[[T], T]],
) -> T:
    '''Thread a value through a sequence of functions, left to right.

    :param value: the initial value fed to the first function
    :param function_pipeline: the functions to apply, in order; each one
        receives the result of the previous one
    :return: the result of the last function, or ``value`` itself when
        ``function_pipeline`` is empty
    '''
    result = value
    for stage in function_pipeline:
        result = stage(result)
    return result

def build_pipline(functions: Sequence[Callable[[T], T]]) -> Callable[[T], T]:
  '''Bundle an ordered sequence of functions into one callable.

  :param functions: the stages to apply, in order
  :return: a one-argument callable that runs its argument through
      ``pipeline`` with the given stages
  '''
  def run(value: T) -> T:
    return pipeline(value, function_pipeline=functions)

  return run

def execute_pipeline(value: T, pipeline: Callable[[T], T]) -> T:
  '''Apply an already-built pipeline callable to a value.

  :param value: the input handed to the pipeline
  :param pipeline: a one-argument callable (this parameter shadows the
      module-level ``pipeline`` function inside this body)
  :return: whatever the callable returns
  '''
  outcome = pipeline(value)
  return outcome



def static_vars_funktion(**kwargs):
  '''Decorator factory that sets attributes on the decorated function.

  Each keyword argument becomes an attribute of the function object, set to
  the given value. The function is returned unwrapped.
  '''
  def decorate(func):
    for attr_name, attr_value in kwargs.items():
      setattr(func, attr_name, attr_value)
    return func

  return decorate

## Parsing Pipeline Definitions

In [0]:
def fetch_raw_table(data_tuple: DataTuple) -> DataTuple:
  '''Download the table whose URL is stored in ``data_tuple.data``.

  :param data_tuple: envelope whose ``data`` field is the URL to fetch
  :return: a new DataTuple whose ``data`` is the ``requests`` response, with
      its text encoding forced to windows-1251; the header and table type
      tags are carried over unchanged
  '''
  headers = requests.utils.default_headers()
  headers.update({ 
      'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0'
  })

  url = data_tuple.data
  # The headers must be passed by keyword: the second positional parameter of
  # requests.get is `params`, so a positional dict ends up in the URL query
  # string and the custom User-Agent is never sent as a header.
  req = requests.get(url, headers=headers)
  # Set the encoding explicitly so that req.text decodes as windows-1251.
  req.encoding = 'windows-1251'

  return DataTuple(req, data_tuple.header_type, data_tuple.table_type)

def raw_table_to_lines(data_tuple: DataTuple) -> DataTuple:
  '''Turn a fetched response into a list of text lines.

  The response text is parsed with BeautifulSoup (lxml parser), re-serialised
  with ``prettify()`` and split on CRLF.

  :param data_tuple: envelope whose ``data`` field is the response object
  :return: a new DataTuple whose ``data`` is the list of lines
  '''
  response = data_tuple.data
  pretty_markup = BeautifulSoup(response.text, 'lxml').prettify()
  lines = pretty_markup.split('\r\n')

  return DataTuple(lines, data_tuple.header_type, data_tuple.table_type)

def parse_lines(data_tuple: DataTuple) -> DataTuple:
  '''Scan the table lines for municipality headers and settlement rows.

  :param data_tuple: envelope whose ``data`` field is the list of lines
  :return: a new DataTuple whose ``data`` is a ParsedLines holding two dicts
      keyed by line number: the recognised MunicipalityIdentifier headers and
      the recognised SettlementInfo rows
  '''
  # For each table layout: the row pattern and the capture groups holding the
  # permanent and the current population figures.
  row_layouts = {
    TableTypeEnum.Qarterly: (settlement_info_quarterly_re, 2, 3),
    TableTypeEnum.Yearly: (settlement_info_yearly_re, 2, 6),
  }
  # Fallback for an unrecognised table type (empty pattern, invalid groups).
  unknown_layout = ('', -1, -1)

  def parse_data_line(line: str, table_type: TableTypeEnum) -> Optional[SettlementInfo]:
    pattern, permanent_group, current_group = row_layouts.get(table_type, unknown_layout)
    row_match = regex.search(pattern, line)
    if not row_match:
      return row_match

    name, permanent, current = row_match.group(1, permanent_group, current_group)
    return SettlementInfo(name.strip(), permanent, current)

  # Old-layout state: the region match that is still waiting for its
  # municipality line.
  pending_region = None

  def parse_header_line(line: str, header_type: HeaderEnum) -> Optional[MunicipalityIdentifier]:
    nonlocal pending_region

    if header_type == HeaderEnum.New:
      # New layout: region and municipality are matched on the same line.
      header_match = regex.search(region_name_new_re, line)
      if not header_match:
        return None
      region, municipality = header_match.group(1, 2)
      return MunicipalityIdentifier(region.strip(), municipality.strip())

    if header_type != HeaderEnum.Old:
      return None

    if not pending_region:
      # No region remembered yet: look for one, but never emit an identifier
      # from this line.
      pending_region = regex.search(old_reg, line)
      return None

    municipality_match = regex.search(old_mun, line)
    if not municipality_match:
      return None

    identifier = MunicipalityIdentifier(pending_region.group(1).strip(),
                                        municipality_match.group(1).strip())
    # The pair is complete; the next header starts with a fresh region search.
    pending_region = None
    return identifier

  municipality_ids = {}
  settlements_info = {}

  for line_num, line in enumerate(data_tuple.data):
    header = parse_header_line(line, data_tuple.header_type)
    if header:
      municipality_ids[line_num] = header

    row = parse_data_line(line, data_tuple.table_type)
    if row:
      settlements_info[line_num] = row

  parsed = ParsedLines(municipality_ids, settlements_info)
  return DataTuple(parsed, data_tuple.header_type, data_tuple.table_type)

def parssed_lines_to_full_info_list(data_tuple: DataTuple) -> DataTuple:
  '''Attach each settlement row to the municipality header that precedes it.

  A settlement belongs to the closest header found on an earlier line.
  Settlements that appear before the first header have no municipality and
  are skipped. Settlements after the last header are attached to that last
  header.

  :param data_tuple: envelope whose ``data`` field is a ParsedLines
  :return: a new DataTuple whose ``data`` is a list of FullSettlementInfo in
      line order (empty when there are no headers or no settlements)
  '''
  regions = data_tuple.data.municipality_ids
  settlements_info = data_tuple.data.settlements_info

  # Both dicts are keyed by line number; sorting makes the line order explicit
  # instead of relying on insertion order.
  header_lines = sorted(regions)
  full_name_settlement_infos = []

  # Index into header_lines of the latest header located before the current
  # settlement line; -1 while no header has been passed yet.
  header_index = -1

  for line_num in sorted(settlements_info):
    # Advance to the last header that lies strictly before this line.
    while (header_index + 1 < len(header_lines)
           and header_lines[header_index + 1] < line_num):
      header_index += 1

    if header_index < 0:
      continue

    reg = regions[header_lines[header_index]]
    set_info = settlements_info[line_num]
    full_name_settlement_infos.append(
      FullSettlementInfo(reg.region,
                         reg.municipality,
                         set_info.name,
                         set_info.permanent_residents,
                         set_info.current_residents))

  return DataTuple(full_name_settlement_infos, data_tuple.header_type, data_tuple.table_type)

def full_info_list_to_data_frame(data_tuple: DataTuple) -> DataTuple:
  '''Build a DataFrame indexed by (region, municipality, settlement).

  :param data_tuple: envelope whose ``data`` field is the list of
      FullSettlementInfo records
  :return: a new DataTuple whose ``data`` is the DataFrame; the three index
      fields are removed from the regular columns
  '''
  index_columns = ['region', 'municipality', 'settlement']
  frame = pd.DataFrame(data_tuple.data).set_index(index_columns, drop=True)

  return DataTuple(frame, data_tuple.header_type, data_tuple.table_type)

# Stages that turn one data_source entry into a DataFrame, in execution order.
_parsing_stages = (
  fetch_raw_table,
  raw_table_to_lines,
  parse_lines,
  parssed_lines_to_full_info_list,
  full_info_list_to_data_frame,
)
parsing_pipeline = build_pipline(functions=_parsing_stages)

## Data Processing Pipeline

In [0]:
def process_data(data_source: List[DataTuple]) -> List[DataTuple]:
  '''Parse every source table and merge the results into one frame.

  For each entry, the year is taken from the first four-digit run in its URL
  and used to suffix the two population columns. The frames are merged on
  their index one after another with ``how='right'``, so each merge keeps the
  rows of the frame being merged in.

  :param data_source: the tables to download and parse, in merge order
  :return: a two-element list: a DataTuple holding the merged frame (None
      when ``data_source`` is empty) and a DataTuple holding the list of
      per-table frames; the tag fields of both are set to 0
  '''
  combined = None
  per_table_frames = []

  for source in data_source:
    # Extract the year first, so a URL without one fails before any download.
    year = regex.search(year_group, source.data).group(1)
    renamed_columns = {'permanent_residents':f'permanent_{year}', 
                       'current_residents':f'current_{year}'}
    frame = execute_pipeline(source, parsing_pipeline).data
    frame = frame.rename(columns=renamed_columns)

    per_table_frames.append(frame)
    if not isinstance(combined, pd.DataFrame):
      # The first table seeds the merged result.
      combined = frame
      continue

    combined = combined.merge(frame, sort=False, how='right', left_index=True, right_index=True)

  return [DataTuple(combined,0,0), DataTuple(per_table_frames,0,0)]

def store_data(processed_data: List[DataTuple]) -> List[DataTuple]:
  '''Write the processed frames to the ./grao directory.

  Three files are written: the merged frame as CSV and as a pandas pickle,
  and the list of per-table frames as a standard pickle.

  :param processed_data: two-element list; element 0 holds the merged frame
      and element 1 holds the list of per-table frames
  :return: ``processed_data`` unchanged, so the function can be a pipeline stage
  '''
  directory = './grao'
  # exist_ok=True avoids the race between checking for the directory and
  # creating it.
  os.makedirs(directory, exist_ok=True)

  combined_data = processed_data[0].data
  combined_data.to_csv(f'{directory}/combined_data.csv')
  combined_data.to_pickle(f'{directory}/combined_data.pkl')

  data_list = processed_data[1].data
  with open(f'{directory}/data_frames_list.pkl', 'wb') as f:
    pickle.dump(data_list, f)

  return processed_data

# Top-level stages: build the merged data first, then write it to disk.
_processing_stages = (
    process_data,
    store_data,
)
processing_pipeline = build_pipline(functions=_processing_stages)

## Data Processing

In [0]:
# Run the processing pipeline (process_data, then store_data) on data_source.
processed_data = execute_pipeline(data_source, processing_pipeline)
# Bare expression: the notebook shows its value as the cell output.
processed_data

In [0]:
# Print the combined CSV as an aligned table. The sed expression rewrites each
# empty field (",,") as ", ,"; it runs twice because one global pass leaves
# every second separator of a longer run of commas untouched.
# Presumably the padding keeps `column -t` from collapsing empty fields.
!sed 's/,,/, ,/g;s/,,/, ,/g' ./grao/combined_data.csv | column -s, -t