Направивме споредба помеѓу двете верзии на податоците (англиската и
македонската) и воочивме дека нема никаква разлика помеѓу собраните податоци.

In [None]:
# Notebook shell commands: install the third-party packages imported below.
!pip install aiohttp
!pip install pandas
!pip install lxml

In [53]:
import re
import random
import aiohttp
import asyncio
import pandas as pd
from lxml import etree
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

In [54]:
class Pipeline:
  """Ordered chain of asynchronous filters.

  The first filter produces the initial items, the filters in between are
  applied to each item independently, and the last filter receives the list
  of all per-item results.
  """

  def __init__(self):
    self.filters = []

  def connect(self, filter):
    """Append ``filter`` to the chain and return the pipeline for chaining."""
    self.filters.append(filter)
    return self

  async def combine_data(self, session, url, data):
    """Pass ``data`` to the last registered filter and return its result."""
    sink = self.filters[-1]
    return await sink.process(session, url, data)

  async def process_data(self, session, url, data):
    """Run ``data`` through every filter except the first and the last."""
    result = data
    for stage in self.filters[1:-1]:
      result = await stage.process(session, url, result)
    return result

  async def execute(self, url, issuer_urls):
    """Run the whole chain inside one shared ``aiohttp`` session.

    ``issuer_urls`` is given to the first filter; ``url`` is given to every
    later filter. Returns whatever the last filter returns.
    """
    async with aiohttp.ClientSession() as session:
      source = self.filters[0]
      seeds = await source.process(session, issuer_urls, [])

      # One coroutine per seed item; they run concurrently.
      per_item_results = await asyncio.gather(
        *(self.process_data(session, url, seed) for seed in seeds)
      )

      combined = await self.combine_data(session, url, per_item_results)

    return combined

In [55]:
class Filter:
  """Base class for pipeline stages; subclasses must override ``process``."""

  async def process(self, session, url, data):
    """Transform ``data`` and return the result.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

In [56]:
class ValidIssuersFilter(Filter):
  """First pipeline stage: scrape issuer names and keep those without digits."""

  # Matches strings that contain no digit at all; compiled once for all calls.
  _NO_DIGITS = re.compile(r"^[^\d]*$")

  async def get_issuers(self, session, urls):
    """Download every page in ``urls`` concurrently and collect link texts.

    Args:
      session: open ``aiohttp`` client session used for the GET requests.
      urls: iterable of page URLs to scrape.

    Returns:
      A flat list with the stripped, non-empty text of every ``<a>`` found
      under ``tbody/tr/td`` on each page (duplicates are kept).

    Raises:
      aiohttp.ClientResponseError: if any page answers with an error status.
    """
    async def fetch_issuers(url):
      async with session.get(url) as response:
        response.raise_for_status()
        html = await response.text()

      tree = etree.HTML(html)
      if tree is None:
        # The parser can yield no root for a document without usable content.
        return []

      names = []
      for link in tree.xpath("//tbody//tr//td//a"):
        # ``text`` is None when the anchor has no direct text node.
        text = (link.text or "").strip()
        if text:
          names.append(text)
      return names

    pages = await asyncio.gather(*(fetch_issuers(url) for url in urls))
    return [name for page in pages for name in page]

  async def process(self, session, url, data):
    """Return the unique digit-free issuer names found on the given pages.

    Args:
      session: open ``aiohttp`` client session.
      url: iterable of page URLs (the pipeline passes its ``issuer_urls`` here).
      data: unused by this stage.

    Returns:
      A sorted list of unique names, so the order is stable between runs.
    """
    issuers = await self.get_issuers(session, url)
    return sorted({issuer for issuer in issuers if self._NO_DIGITS.search(issuer)})


In [57]:
class IssuerDatesFilter(Filter):
  """Pipeline stage that works out which date ranges are still missing for an issuer."""

  # Format of the values stored in the 'Date' column.
  DATE_FORMAT = '%m/%d/%Y'
  # How far back to go when nothing usable is stored for an issuer.
  HISTORY_YEARS = 10
  # Largest distance, in days, between the start and end of one range.
  MAX_SPAN_DAYS = 364

  def __init__(self, file_path=None):
    """Load the previously saved data.

    Args:
      file_path: optional CSV path; when omitted, ``load_data`` is called
        with its own default path.
    """
    self.prev_data = load_data() if file_path is None else load_data(file_path)

  def _default_start(self):
    """Return midnight of the day ``HISTORY_YEARS`` years before now."""
    return (pd.Timestamp.now() - pd.DateOffset(years=self.HISTORY_YEARS)).normalize()

  def get_latest_date(self, df, issuer):
    """Return the newest stored date for ``issuer``.

    Falls back to the default start date when the issuer has no rows or none
    of its dates can be parsed, so that a ``NaT`` never reaches
    ``generate_date_pairs`` (where it would silently produce no ranges).
    """
    issuer_data = df[df['Issuer'] == issuer]
    dates = pd.to_datetime(issuer_data['Date'], format=self.DATE_FORMAT, errors='coerce')
    latest = dates.max()
    return self._default_start() if pd.isna(latest) else latest

  def generate_date_pairs(self, latest_date):
    """Split the span from the day after ``latest_date`` up to today.

    Returns:
      A list of ``(start, end)`` timestamp tuples, consecutive and
      non-overlapping, each covering at most ``MAX_SPAN_DAYS + 1`` days.
      The list is empty when ``latest_date`` is today or later.
    """
    now = pd.Timestamp.today().normalize()
    pairs = []

    start_date = latest_date + pd.Timedelta(days=1)
    while start_date <= now:
      end_date = min(start_date + pd.Timedelta(days=self.MAX_SPAN_DAYS), now)
      pairs.append((start_date, end_date))
      start_date = end_date + pd.Timedelta(days=1)
    return pairs

  async def process(self, session, url, data):
    """Map one issuer to the date ranges that still have to be downloaded.

    Args:
      session: unused by this stage.
      url: unused by this stage.
      data: the issuer identifier, compared against the 'Issuer' column.

    Returns:
      A single-entry dict ``{data: [(start, end), ...]}``.
    """
    prev = self.prev_data
    has_history = (
      not prev.empty
      and 'Issuer' in prev.columns
      and 'Date' in prev.columns
    )
    if has_history:
      # Also covers issuers absent from the stored data (falls back inside).
      latest_date = self.get_latest_date(prev, data)
    else:
      latest_date = self._default_start()

    return {data: self.generate_date_pairs(latest_date)}

In [58]:
class FillInIssuerDataFilter(Filter):
  """Pipeline stage that downloads and parses the history tables of one issuer."""

  def __init__(self):
    # Caps the number of downloads in flight at the same time.
    self.semaphore = asyncio.Semaphore(30)

  def parse_row(self, row, issuer):
    """Strip every cell in ``row`` and prepend ``issuer``.

    Returns an empty list when ``row`` has no cells.
    """
    data = [ele.strip() for ele in row]
    if data:
      data.insert(0, issuer)
    return data

  def parse_table(self, html_content, issuer):
    """Extract the ``tbody`` rows of ``html_content`` as lists of cell texts.

    Each returned row starts with ``issuer``. Rows without any ``<td>`` are
    skipped, and an unparsable document yields an empty list.
    """
    tree = etree.HTML(html_content)
    if tree is None:
      return []

    parsed = []
    for row in tree.xpath("//tbody//tr"):
      # ``text`` is None for cells without a direct text node.
      cells = [cell.text or '' for cell in row.xpath(".//td")]
      record = self.parse_row(cells, issuer)
      if record:
        parsed.append(record)
    return parsed

  async def fetch_data(self, session, url, data):
    """POST ``data`` to ``url`` and return the response body.

    Timeouts and client errors are retried with exponential backoff plus
    jitter, up to 8 attempts in total. A response with a status other than
    200 is not retried.

    Returns:
      The response text, or ``None`` on a non-200 status or when every
      attempt failed.
    """
    timeout = aiohttp.ClientTimeout(total=20)
    retries = 8

    for attempt in range(retries):
      try:
        async with session.post(url, data=data, timeout=timeout) as response:
          if response.status != 200:
            return None
          return await response.text()
      except (asyncio.TimeoutError, aiohttp.ClientError):
        if attempt == retries - 1:
          # No attempt follows, so waiting here would only delay the caller.
          break
      await asyncio.sleep(2 ** attempt + random.uniform(0, 1))
    return None

  async def download_data(self, session, issuer, from_date, to_date):
    """Download the history page of ``issuer`` for one date range.

    Returns the page HTML, or ``None`` when the download failed.
    """
    url = 'https://www.mse.mk/en/stats/symbolhistory/' + issuer
    # NOTE(review): the date values are sent as given by the caller; confirm
    # that their serialized form is what the endpoint expects.
    payload = {
      'Code': issuer,
      'FromDate': from_date,
      'ToDate': to_date
    }
    # The semaphore is held for the whole call, retries included.
    async with self.semaphore:
      return await self.fetch_data(session, url, payload)

  async def download_latest_issuer_data(self, session, url, issuer, date_pairs):
    """Download every range in ``date_pairs`` and return all parsed rows.

    Args:
      session: open ``aiohttp`` client session.
      url: unused; the request URL is built from ``issuer``.
      issuer: issuer identifier.
      date_pairs: iterable of ``(from_date, to_date)`` tuples.

    Returns:
      A flat list of rows (lists of strings), each starting with ``issuer``.
    """
    tasks = [
      self.download_data(session, issuer, from_date, to_date)
      for from_date, to_date in date_pairs
    ]
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    # ``return_exceptions=True`` puts exception objects in the result list;
    # only non-empty page bodies may be handed to the HTML parser.
    pages = [page for page in responses if isinstance(page, str) and page]
    if not pages:
      return []

    loop = asyncio.get_running_loop()
    # Parsing runs in worker threads so it does not block the event loop.
    with ThreadPoolExecutor() as executor:
      parsed_tables = await asyncio.gather(*(
        loop.run_in_executor(executor, self.parse_table, page, issuer)
        for page in pages
      ))

    return [row for table in parsed_tables for row in table]

  async def process(self, session, url, data):
    """Fetch the rows for the single ``{issuer: date_pairs}`` entry in ``data``."""
    issuer, date_pairs = next(iter(data.items()))
    return await self.download_latest_issuer_data(session, url, issuer, date_pairs)

In [65]:
class CombineAndSaveFilter(Filter):
  """Final pipeline stage: merge newly scraped rows into the CSV file."""

  # Columns that identify a record; they are excluded from number reformatting.
  KEY_COLUMNS = ("Issuer", "Date")
  # Translation table that swaps '.' and ',' in one pass.
  _SEPARATOR_SWAP = str.maketrans(".,", ",.")

  def __init__(self, file_path):
    """Remember ``file_path``, the CSV that is both read and rewritten."""
    self.file_path = file_path

  def parse_col_names(self, tree):
    """Return ``"Issuer"`` followed by the stripped text of every ``<th>``."""
    return ["Issuer"] + [th.text.strip() for th in tree.xpath("//th") if th.text]

  async def get_col_names(self, session, url):
    """Download ``url`` and derive the column names from its table headers.

    Raises:
      aiohttp.ClientResponseError: if the page answers with an error status.
    """
    async with session.get(url) as response:
      response.raise_for_status()
      data = await response.text()
      parser = etree.HTMLParser()
      tree = etree.fromstring(data, parser)
      return self.parse_col_names(tree)

  def _swap_separators(self, column):
    """Swap '.' and ',' in every non-null value of ``column`` (as text)."""
    swapped = column.astype(str).str.translate(self._SEPARATOR_SWAP)
    # Keep missing values missing instead of the text 'nan' / 'None'.
    return swapped.where(column.notna(), column)

  async def process(self, session, url, data):
    """Merge the scraped rows with the stored ones and rewrite the CSV.

    Args:
      session: open ``aiohttp`` client session.
      url: page whose ``<th>`` cells provide the column names.
      data: iterable of row groups (one per issuer); ``None`` groups and
        empty rows are ignored.

    The separator swap is applied to the new rows only: rows loaded from the
    file were already converted when they were first saved, and converting
    them again would flip them back. The file is left untouched when there
    is nothing to write.
    """
    col_names = await self.get_col_names(session, url)
    prev_data = load_data(self.file_path)

    rows = [row for group in data if group is not None for row in group if row]
    new_data = pd.DataFrame(rows, columns=col_names).dropna(axis=1, how='all')

    for col in new_data.columns:
      if col not in self.KEY_COLUMNS:
        new_data[col] = self._swap_separators(new_data[col])

    combined_data = pd.concat([prev_data, new_data], ignore_index=True)
    if combined_data.empty:
      return

    # For a repeated (Issuer, Date) pair the most recently scraped row wins.
    combined_data = combined_data.drop_duplicates(
      subset=list(self.KEY_COLUMNS), keep="last"
    )
    combined_data.to_csv(self.file_path, index=False, chunksize=5000)


In [66]:
def load_data(file_path="issuers_data_en.csv"):
  """Read the saved CSV into a DataFrame, or return an empty one.

  Every column is read as text so that stored values come back exactly as
  they were written; type inference would turn a value such as "2.500" into
  the float 2.5. Missing cells stay NaN.

  Args:
    file_path: path of the CSV file.

  Returns:
    The file contents, or an empty DataFrame when the file is missing,
    unreadable, empty or malformed.
  """
  try:
    chunks = pd.read_csv(file_path, chunksize=5000, dtype=str)
    return pd.concat(chunks, ignore_index=True)
  except (OSError, ValueError):
    # OSError: missing/unreadable file. ValueError: empty or malformed CSV
    # (pandas' EmptyDataError and ParserError derive from it) or no chunks.
    return pd.DataFrame()

In [67]:
async def main():
  """Build the scraping pipeline, run it once and print the elapsed time."""
  file_path = "issuers_data_en.csv"
  url = "https://www.mse.mk/en/stats/symbolhistory/ADIN"
  # A URL fragment ('#...') is never sent to the server, so addresses that
  # differ only in their fragment all download the same document. One
  # request is enough.
  issuer_urls = ["https://www.mse.mk/en/stats/current-schedule"]

  pipeline = Pipeline()
  pipeline.connect(ValidIssuersFilter()) \
    .connect(IssuerDatesFilter()) \
    .connect(FillInIssuerDataFilter()) \
    .connect(CombineAndSaveFilter(file_path=file_path))

  start_time = datetime.now()
  await pipeline.execute(url, issuer_urls)
  elapsed_time = datetime.now() - start_time

  print(f"Total execution time: {elapsed_time}")

In [None]:
# Run the scraper. Top-level `await` presumably relies on the notebook's
# already-running event loop; a plain script would need `asyncio.run(main())`.
await main()