## This Notebook scrapes company data from the Bundesanzeiger / Federal Gazette

In [1]:
#only for jupyter
import nest_asyncio
nest_asyncio.apply()
#AWS native
import asyncio
import csv
import datetime
import json
from concurrent.futures import ThreadPoolExecutor
import time
from io import BytesIO
import dateparser
import numpy as np
import requests
from bs4 import BeautifulSoup
import hashlib
import json
import deutschland.bundesanzeiger.model
from deutschland.config import Config, module_config
from validateND import validate_north_data



In [2]:
# *****************************************************************************/
# *    The code below is a modified version of the Bundesanzeiger module from the deutschland package
# *    Authors: Nico Duldhardt and Friedrich Schöne
# *    Date: 2020
# *    Availability: https://github.com/bundesAPI/deutschland / https://av.tib.eu/media/52366
# ****************************************************************************/

class Report:
    """A single publication found in the search results.

    Holds the publication date, the report title, the link to the full
    text, the company name and (once fetched) the report body.
    """

    __slots__ = ["date", "name", "content_url", "company", "report"]

    def __init__(self, date, name, content_url, company, report=None):
        self.date, self.name = date, name
        self.content_url = content_url
        self.company, self.report = company, report

    def to_dict(self):
        """Return the exported fields as a dict (``content_url`` is left out)."""
        exported_fields = ("date", "name", "company", "report")
        return {field: getattr(self, field) for field in exported_fields}

    def to_hash(self):
        """MD5 hash of the report."""
        # Same fields as to_dict(), but with the date rendered as an ISO
        # string so that the payload is JSON serialisable.
        payload = self.to_dict()
        payload["date"] = self.date.isoformat()

        serialised = json.dumps(payload, sort_keys=True)
        return hashlib.md5(serialised.encode('utf-8')).hexdigest()


class Bundesanzeiger:
    """Scraper for company reports published on www.bundesanzeiger.de.

    A ``requests`` session is kept per instance. Captchas shown before a
    report are solved either by a user supplied callback or by the model
    shipped with ``deutschland.bundesanzeiger.model``.
    """

    __slots__ = ["session", "model", "captcha_callback", "_config"]

    def __init__(self, on_captach_callback=None, config: Config = None):
        """Create the HTTP session and select the captcha solver.

        :param on_captach_callback: optional callable that receives the
            captcha image bytes and returns the solution; when omitted the
            bundled model is loaded and used instead.
        :param config: optional ``Config``; ``module_config`` is used when
            it is ``None``.
        """
        if config is None:
            self._config = module_config
        else:
            self._config = config

        self.session = requests.Session()
        if self._config.proxy_config is not None:
            self.session.proxies.update(self._config.proxy_config)
        if on_captach_callback:
            # The callback has to be stored under the slot name that
            # __generate_result reads; "callback" is not a declared slot
            # and assigning it raised AttributeError.
            self.model = None
            self.captcha_callback = on_captach_callback
        else:
            import deutschland.bundesanzeiger.model

            self.model = deutschland.bundesanzeiger.model.load_model()
            self.captcha_callback = self.__solve_captcha

    def __solve_captcha(self, image_data: bytes):
        """Run the loaded model on the captcha image and return its text."""
        image = BytesIO(image_data)
        image_arr = deutschland.bundesanzeiger.model.load_image_arr(image)
        # The model is fed a single-image batch of shape (1, 50, 250, 1).
        image_arr = image_arr.reshape((1, 50, 250, 1)).astype(np.float32)

        prediction = self.model.run(None, {"captcha": image_arr})[0][0]
        prediction_str = deutschland.bundesanzeiger.model.prediction_to_str(prediction)

        return prediction_str

    def __is_captcha_needed(self, entry_content: str):
        """Return True when the page does not contain the publication itself."""
        soup = BeautifulSoup(entry_content, "html.parser")
        return not bool(soup.find("div", {"class": "publication_container"}))

    def __find_all_entries_on_page(self, page_content: str, search_name: str):
        """Yield a ``Report`` for every matching row of a result page.

        A row is yielded only when it has a link, a parseable date and a
        company name, the company name equals ``search_name`` (ignoring
        case) and the date is not before 2019-01-01.
        """
        soup = BeautifulSoup(page_content, "html.parser")
        wrapper = soup.find("div", {"class": "result_container"})
        if wrapper is None:
            # Page without a result list: nothing to yield.
            return

        for row in wrapper.find_all("div", {"class": "row"}):
            info_element = row.find("div", {"class": "info"})
            if not info_element:
                continue

            link_element = info_element.find("a")
            if not link_element:
                continue

            entry_link = link_element.get("href")
            entry_name = link_element.contents[0].strip()

            date_element = row.find("div", {"class": "date"})
            if not date_element:
                continue

            date = dateparser.parse(date_element.contents[0], languages=["de"])
            if date is None:
                # dateparser could not interpret the text; skip the row
                # instead of failing on the comparison below.
                continue

            company_name_element = row.find("div", {"class": "first"})
            if not company_name_element:
                continue

            company_name = company_name_element.contents[0].strip()

            # match report name with the search name
            if company_name.lower() != search_name.lower():
                continue

            # only store reports that were uploaded after 2018
            if date < datetime.datetime(2019, 1, 1, 0, 0):
                continue

            yield Report(date, entry_name, entry_link, company_name)

    def __generate_result(self, content: str, company_name: str):
        """Iterate through all results and try to fetch the single reports.

        :return: dict mapping the report name to ``Report.to_dict()``.
            Reports that share the same name overwrite each other.
        """
        result = {}
        for element in self.__find_all_entries_on_page(content, company_name):
            get_element_response = self.session.get(element.content_url)
            if self.__is_captcha_needed(get_element_response.text):
                soup = BeautifulSoup(get_element_response.text, "html.parser")
                captcha_image_src = soup.find("div", {"class": "captcha_wrapper"}).find(
                    "img"
                )["src"]
                img_response = self.session.get(captcha_image_src)
                captcha_result = self.captcha_callback(img_response.content)
                # The solution is posted to the action of the second form on the page.
                captcha_endpoint_url = soup.find_all("form")[1]["action"]
                get_element_response = self.session.post(
                    captcha_endpoint_url,
                    data={"solution": captcha_result, "confirm-button": "OK"},
                )

            content_soup = BeautifulSoup(get_element_response.text, "html.parser")
            content_element = content_soup.find(
                "div", {"class": "publication_container"}
            )

            if not content_element:
                # Still no publication (e.g. wrong captcha solution): skip it.
                continue

            element.report = str(content_element)

            result[element.name] = element.to_dict()

        return result

    def get_reports(self, company_name: str):
        """
        Fetch all reports for this company name.

        :param company_name: name that is searched for; only results whose
            company name equals it (ignoring case) are returned
        :return: dict of all reports, keyed by report name
        """
        from urllib.parse import quote

        self.session.cookies["cc"] = "1628606977-805e172265bfdbde-10"
        self.session.headers.update(
            {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "Accept-Encoding": "gzip, deflate, br",
                "Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7,et;q=0.6,pl;q=0.5",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "DNT": "1",
                "Host": "www.bundesanzeiger.de",
                "Pragma": "no-cache",
                "Referer": "https://www.bundesanzeiger.de/",
                "sec-ch-ua-mobile": "?0",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "same-origin",
                "Sec-Fetch-User": "?1",
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36",
                "From": "217614@mds.hertie-school.org"
            }
        )
        # get the jsessionid cookie
        self.session.get("https://www.bundesanzeiger.de")
        # go to the start page
        self.session.get("https://www.bundesanzeiger.de/pub/de/start?0")
        # Percent-encode the name so that characters such as "&" or "#"
        # (e.g. "GmbH & Co. KG") stay part of the fulltext parameter instead
        # of being read as query-string syntax.
        search_term = quote(company_name, safe="")
        # perform the search
        response = self.session.get(
            f"https://www.bundesanzeiger.de/pub/de/start?0-2.-top%7Econtent%7Epanel-left%7Ecard-form=&fulltext={search_term}&area_select=22&search_button=Suchen"
        )
        return self.__generate_result(response.text, company_name)


In [4]:
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that writes ``date``/``datetime`` values as ISO 8601 strings."""

    # Override the default method
    def default(self, obj):
        """Return the ISO string for dates; defer everything else to the base class.

        :raises TypeError: for objects that are neither dates nor natively
            serialisable (raised by ``json.JSONEncoder.default``).
        """
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        # Without this call the method returned None and unsupported
        # objects were silently written as ``null``.
        return super().default(obj)
            
def save_data(data,item):
    """Store the reports of one company as an indented JSON file.

    The file is written to ``data/financial_information/<item>.json`` in
    UTF-8; date values are serialised through ``DateTimeEncoder``.
    """
    target_path = "data/financial_information/" + item + ".json"
    with open(target_path, "w", encoding='UTF-8') as json_file:
        json.dump(data, json_file, cls=DateTimeEncoder, indent=4)

def scrape(item):
    """Scrape and store the reports for one CSV row.

    ``item`` holds: 0) index of the aid beneficiary (used as file name),
    1) search name, 2) register number, 3) status (empty when the company
    has not been scraped yet).

    The disclosed name is tried first. If that yields nothing or fails, the
    name is validated externally via ``validate_north_data`` and the search
    is repeated with the validated name. Scraping stays best effort:
    failures are logged instead of raised, and nothing is returned.
    """
    import logging

    logger = logging.getLogger(__name__)

    if len(item) < 4:
        # Blank or truncated CSV rows would otherwise raise IndexError and
        # abort the whole run.
        logger.warning("Skipping malformed row: %r", item)
        return

    if item[3] != '':  # item has already been scraped
        return

    try:
        data = Bundesanzeiger().get_reports(item[1])  # search with the disclosed name
        if data:
            save_data(data, item[0])
            return
    except Exception:
        logger.warning(
            "Scraping %r with the disclosed name failed", item[1], exc_info=True
        )

    try:
        # Try to validate the company name externally with the register number
        name = validate_north_data(item[1], item[2])
        if name:
            data = Bundesanzeiger().get_reports(str(name))  # search with the validated name
            if data:
                save_data(data, item[0])
                return
    except Exception:
        logger.warning(
            "Scraping %r with the validated name failed", item[1], exc_info=True
        )



async def get_data_asynchronous(path='data/main/companies.csv', limit=10, max_workers=3):
    """Scrape the companies listed in a ``$``-delimited CSV file in worker threads.

    :param path: CSV file containing all beneficiaries of government aid.
    :param limit: number of leading rows to process (the default of 10 is
        meant for test runs); ``None`` processes the whole file.
    :param max_workers: number of threads that run ``scrape`` concurrently.

    Prints the elapsed wall-clock time when all rows have been handled.
    """
    from itertools import islice

    start_time = time.time()

    # newline='' lets the csv module handle line endings itself, as the
    # csv documentation requires for file objects.
    with open(path, newline='') as file:
        # each row contains: 0.) index from all aid beneficiaries; 1.) search name;
        # 2.) register number of company; 3.) status to prevent repetitions
        rows = list(islice(csv.reader(file, delimiter='$'), limit))

    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [loop.run_in_executor(executor, scrape, item) for item in rows]
        # scrape() returns nothing; only wait until every row is done.
        await asyncio.gather(*tasks)

    time_difference = time.time() - start_time
    print(f'Scraping time: {time_difference:.2f} seconds.')

def main():
    """Run ``get_data_asynchronous`` to completion on the current event loop."""
    event_loop = asyncio.get_event_loop()
    scraping_job = asyncio.ensure_future(get_data_asynchronous())
    event_loop.run_until_complete(scraping_job)

main() # run the scraper: main() drives the event loop until all asynchronous calls have finished

Scraping time: 3340.04 seconds.
