Scraping with requests + Beautiful Soup (we cannot proceed with this method: the webpage is rendered dynamically, so the main content is not present in the fetched HTML)

In [20]:
import requests
from bs4 import BeautifulSoup

# Probe a single plenary-session page with plain `requests` and print the
# HTTP status code of the response.
url = 'https://www.parliament.bg/bg/plenaryst/ns/55/ID/10940'
# Certificate bundle passed to `verify` for TLS verification of the request.
cert_path = r"C:\Users\ivank\Desktop\Parliament certs\parliament.pem"
headers = {'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'}

# Without an explicit timeout `requests` waits indefinitely for a server that
# never answers, so bound the wait (seconds).
response = requests.get(url, verify=cert_path, headers=headers, timeout=30)
# Parsing is left disabled on purpose (see the heading of this section):
#soup = BeautifulSoup(response.text, 'html')


print(response.status_code)

200


Scraping with selenium

Importing Packages

In [2]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import re
import pandas as pd

Create the list of URLs covering the ID range between two URLs (both ends included)

In [3]:
def url_list(url1,url2):
    """Return every hearing URL whose numeric ID lies between two URLs.

    Both URLs are split on '/' and the element at index 8 is read as the
    integer ID (for 'https://www.parliament.bg/bg/plenaryst/ns/55/ID/10958'
    that element is '10958'). The result contains one URL per ID from the
    ID of ``url1`` up to and including the ID of ``url2``.

    The part of ``url1`` in front of the ID is reused as the prefix of every
    generated URL, so a range taken from a different assembly (another
    ``ns/<number>`` segment) stays on that assembly instead of being
    rewritten to ``ns/55``.

    Args:
        url1: URL holding the first ID of the range.
        url2: URL holding the last ID of the range (inclusive).

    Returns:
        list[str]: the generated URLs; empty when the ID of ``url1`` is
        greater than the ID of ``url2``.

    Raises:
        IndexError: a URL has fewer than nine '/'-separated parts.
        ValueError: the ID part of a URL is not an integer.
    """
    # Index of the numeric ID once a URL is split on '/'.
    id_index = 8

    first_parts = url1.split('/')
    first_id = int(first_parts[id_index])
    last_id = int(url2.split('/')[id_index])

    # Everything before the ID, with the trailing slash restored.
    prefix = '/'.join(first_parts[:id_index]) + '/'

    urls = [prefix + str(hearing_id) for hearing_id in range(first_id, last_id + 1)]

    print('Number of URLs to scrape: ' + str(len(urls)))
    return urls

Regular Expressions to parse different Elements from Corpus

In [4]:
# Marker used to detect the end of a hearing: two digits, a comma, two digits,
# one whitespace character, 'ч', any single character, ')', two to four
# newlines and then a capitalised Cyrillic word (e.g. '14,05 ч.)\n\nПредседател').
# NOTE(review): the '.' after 'ч' is unescaped, so it matches any character,
# not only a literal dot. Left unchanged to keep matching behaviour identical.
_END_OF_HEARING_PATTERN = re.compile(r'\d\d,\d\d\sч.\)\n{2,4}[А-Я][а-я]+')


def end_of_hearing(text):
    """Return the first end-of-hearing marker match in ``text``.

    Args:
        text: text to search.

    Returns:
        re.Match | None: the first match of the end-of-hearing pattern, or
        ``None`` when the text contains no such marker.
    """
    return _END_OF_HEARING_PATTERN.search(text)


def end_position_hearing(text):
    """Return the index in ``text`` where the end-of-hearing marker starts.

    Args:
        text: text to search.

    Returns:
        int: start offset of the first end-of-hearing marker.

    Raises:
        ValueError: ``text`` contains no end-of-hearing marker.
    """
    match = end_of_hearing(text)
    if match is None:
        # Fail with a message that names the problem instead of the
        # AttributeError that calling .start() on None would produce.
        raise ValueError('No end-of-hearing marker found in text')
    return match.start()

Load each parliament.bg page and extract its text corpus.
The crawler uses an explicit wait: it waits until the XPath that contains the text corpus is present, and only then fetches the corpus.

In [5]:
def scraper (urls,explicit_wait_seconds,poll_frequency,
             chrome_driver_path=r"C:\Program Files (x86)\chromedriver-win64\chromedriver.exe"):
    """Load every URL in Chrome and collect the text of the '#app' element.

    For each URL the function waits for a primary XPath to be present; if
    that wait times out it waits for a fallback XPath instead. Once either
    is present, the text of the element with id 'app' is read. A page only
    counts as successful when ``end_of_hearing`` finds a match in that text.

    A page that cannot be loaded, or on which neither XPath appears in time,
    is recorded as unsuccessful and the run continues with the next URL.
    The browser is always closed when the function exits.

    Args:
        urls: list of page URLs to load.
        explicit_wait_seconds: timeout passed to ``WebDriverWait``.
        poll_frequency: poll interval passed to ``WebDriverWait``.
        chrome_driver_path: path of the chromedriver executable.

    Returns:
        dict: with the keys 'texts' and 'successful_urls' (parallel lists)
        and 'unsuccessful_urls' and 'unsuccessful_messages' (parallel lists).
    """
    # Local import: these exception classes are not among the notebook-level
    # selenium imports.
    from selenium.common.exceptions import TimeoutException, WebDriverException

    primary_locator = (By.XPATH, '/html/body/div/main/div/div/div[2]/div[1]/div/div[3]')
    fallback_locator = (By.XPATH, '//*[@id="app"]/main/div/div/div[2]/div[1]/div/div[2]')

    texts = []
    successful_urls = []
    unsuccessful_urls = []
    unsuccessful_messages = []

    cService = webdriver.ChromeService(chrome_driver_path)
    driver = webdriver.Chrome(service = cService)

    try:
        wait = WebDriverWait(driver, explicit_wait_seconds, poll_frequency)

        for url in urls:

            try:
                driver.get(url)

                try:
                    wait.until(EC.presence_of_element_located(primary_locator))
                    empty_message = 'Initial Xpath located for Url but corpus is empty'
                except TimeoutException:
                    # Primary layout not found in time: try the fallback one.
                    wait.until(EC.presence_of_element_located(fallback_locator))
                    empty_message = 'Error triggered: Initial Xpath was not located and no corpus was found'

                # Read the text once so the check and the stored value agree.
                corpus_text = driver.find_element(By.ID, 'app').text

            except WebDriverException as error:
                # Covers a failed page load and a timeout of the fallback
                # wait; keep going so earlier results are not lost.
                unsuccessful_urls.append(url)
                unsuccessful_messages.append(
                    'Error triggered: page could not be loaded or no Xpath was located ('
                    + type(error).__name__ + ')'
                )
                continue

            if end_of_hearing(corpus_text) is None:
                unsuccessful_urls.append(url)
                unsuccessful_messages.append(empty_message)
            else:
                successful_urls.append(url)
                texts.append(corpus_text)

    finally:
        # Always release the browser process, even when an error escapes.
        driver.quit()

    scraper_dict = {
        'texts' : texts,
        'successful_urls' : successful_urls,
        'unsuccessful_urls' : unsuccessful_urls,
        'unsuccessful_messages' : unsuccessful_messages
    }

    # Guard against an empty URL list, which would otherwise divide by zero.
    success_rate = round(100*len(successful_urls)/len(urls),2) if urls else 0.0
    print('Number of scraped URLs: ' + str(len(successful_urls)) + ' (' + str(success_rate) +'% Success)' )

    return scraper_dict

Get Basic Attributes, which will be added to each statement (Assembly Number, Hearing Number, Date)

In [6]:
def general_parser (text):
    """Extract the assembly title, hearing title and date from a corpus.

    The assembly title is the first '... НАРОДНО СЪБРАНИЕ' match. The hearing
    title ('... ЗАСЕДАНИЕ') is searched after the end of the assembly title.
    When a session title ('... СЕСИЯ') starts before the end of that first
    hearing match, the hearing is searched again after the session title, so
    the word 'СЕСИЯ' cannot be absorbed into the hearing title. A session
    match located after the hearing title is ignored.

    The date is the first occurrence of two digits, any character, two
    digits, any character, four digits; the groups are read as day, month
    and year and returned as 'YYYY.MM.DD'.

    Args:
        text: the page corpus.

    Returns:
        dict: with the keys 'assembly' and 'hearing' (title-cased matches)
        and 'date'.

    Raises:
        ValueError: no assembly title, hearing title or date is found.
    """
    assembly_pattern = re.compile(r'[А-Я]+\sИ\s[А-Я]+\sНАРОДНО\sСЪБРАНИЕ|[А-Я]+\sНАРОДНО\sСЪБРАНИЕ')
    session_pattern = re.compile(r'[А-Я]+\sИ\s[А-Я]+\sСЕСИЯ|[А-Я]+\sСЕСИЯ')
    hearing_pattern = re.compile(r'([А-Я]+\sИ\s[А-Я]+\s[А-Я]+\sЗАСЕДАНИЕ|[А-Я]+\sИ\s[А-Я]+\sЗАСЕДАНИЕ|[А-Я]+\s[А-Я]+\sЗАСЕДАНИЕ|[А-Я]+\sЗАСЕДАНИЕ)')
    # The separators are deliberately left as '.', which matches any character.
    pattern_date = re.compile(r'(\d{2}).(\d{2}).(\d{4})') # find the date of the session

    assembly_matches = assembly_pattern.search(text)
    if assembly_matches is None:
        raise ValueError('No assembly title found in text')

    header_end = assembly_matches.end()
    hearing_matches = hearing_pattern.search(text, header_end)
    # Pattern.search returns None when nothing matches; it does not raise.
    session_matches = session_pattern.search(text, header_end)

    if (session_matches is not None and hearing_matches is not None
            and session_matches.start() < hearing_matches.end()):
        # The session title sits in front of, or inside, the hearing match:
        # restart the hearing search right after the session title.
        hearing_matches = hearing_pattern.search(text, session_matches.end())

    if hearing_matches is None:
        raise ValueError('No hearing title found in text')

    matches_date = pattern_date.search(text)
    if matches_date is None:
        raise ValueError('No date found in text')

    assembly = assembly_matches.group().title()
    hearing = hearing_matches.group().title()

    day, month, year = matches_date.group(1, 2, 3)
    date = year + '.' + month + '.' + day

    general_info_dict = {
        'assembly': assembly,
        'hearing': hearing,
        'date': date
    }

    return general_info_dict

Get the statements together with each politician's first name, last name and political party

In [7]:
def statements_parser (text,url):
    """Split a hearing corpus into one record per speaker statement.

    A statement starts at every match of the speaker pattern: an optional
    upper-case word (stored as the role), two upper-case words (first and
    last name) and then either ':' or ' (<anything>):'. The statement text
    runs from the end of one speaker match to the start of the next one;
    the last statement runs to the position returned by
    ``end_position_hearing`` and additionally has every '(' removed.

    The assembly, hearing and date returned by ``general_parser`` and the
    given ``url`` are repeated for every statement.

    Args:
        text: the page corpus.
        url: URL the corpus was taken from.

    Returns:
        dict: column name -> list, one entry per statement in every list.
    """
    header = general_parser(text)

    # find  first name + last name + political party + statement
    speaker_pattern = re.compile(r'([А-Я]+\s)?([А-Я]+\s)([А-Я]+)(:|\s\((.+)\):)')
    speaker_matches = list(speaker_pattern.finditer(text))
    statement_count = len(speaker_matches)

    # Groups 1 and 2 include their trailing whitespace character, which
    # .title() leaves in place.
    first_names = [found.group(2).title() for found in speaker_matches]
    last_names = [found.group(3).title() for found in speaker_matches]
    political_parties_raw = [found.group(4) for found in speaker_matches]
    start_positions = [found.start() for found in speaker_matches]
    end_positions = [found.end() for found in speaker_matches]
    assembly_roles = [
        'Политик' if found.group(1) is None else found.group(1).title()
        for found in speaker_matches
    ]

    # Hearing-level attributes are identical for every statement.
    assemblies = [header.get('assembly')] * statement_count
    hearings = [header.get('hearing')] * statement_count
    dates = [header.get('date')] * statement_count
    urls = [url] * statement_count

    hearing_end = end_position_hearing(text)

    statements = []
    for position, body_start in enumerate(end_positions):
        if position == statement_count - 1:
            # Last speaker: cut at the end-of-hearing marker and drop '('.
            body = text[body_start:hearing_end].replace('\n', ' ').strip()
            statements.append(body.replace('(', '').strip())
        else:
            body = text[body_start:start_positions[position + 1]]
            statements.append(body.replace('\n', ' ').strip())

    political_parties = []
    speaking_locations = []
    drop_brackets_and_colon = str.maketrans('', '', '():')

    for raw_party in political_parties_raw:

        if raw_party == ':':
            # A bare ':' means no bracketed part followed the name.
            political_parties.append('Председателски Орган')
            speaking_locations.append('От Трибуната')
            continue

        if ', от' in raw_party:
            pieces = raw_party.translate(drop_brackets_and_colon).strip().split(', ')
            political_parties.append(pieces[0])
            speaking_locations.append(pieces[1].title())
            continue

        if 'встрани от микрофоните' in raw_party:
            speaking_locations.append('От Място')
            political_parties.append('')
            continue

        political_parties.append(raw_party.translate(drop_brackets_and_colon).strip())
        speaking_locations.append('От Трибуната')

    return {
        'Народно Събрание': assemblies,
        'Заседание': hearings,
        'Дата': dates,
        'Позиция в Парламента': assembly_roles,
        'Първо Име': first_names,
        'Фамилно Име': last_names,
        'Партия': political_parties,
        'Говорил От': speaking_locations,
        'Изказване': statements,
        'Начална Позиция на Изказване': start_positions,
        'Крайна Позиция на Изказване': end_positions,
        'Линк към изказване': urls
    }

Create a Pandas df from the gathered Attributes and save it to CSV

In [8]:
def save_df (statements_dict, output_dir=r'C:\Users\ivank\Desktop\Scraper\Hearings'):
    """Write the statements of one hearing to '<date>_<assembly>.csv'.

    The file name is built from the first entry of the 'Дата' column and the
    first entry of the 'Народно Събрание' column. The file is written with
    the 'utf-8-sig' encoding.

    Args:
        statements_dict: column name -> list of values, as produced by
            ``statements_parser``.
        output_dir: directory the CSV file is written to.

    Raises:
        ValueError: the 'Народно Събрание' or 'Дата' column is missing or
            empty, so no file name can be built.
    """
    from pathlib import Path

    assemblies = statements_dict.get('Народно Събрание')
    dates = statements_dict.get('Дата')
    if not assemblies or not dates:
        # Name the problem instead of failing with an IndexError/TypeError
        # on the [0] lookups below.
        raise ValueError('statements_dict contains no statements to save')

    df = pd.DataFrame.from_dict(statements_dict)
    file_name = '{date}_{assembly}.csv'.format(assembly=assemblies[0], date=dates[0])
    df.to_csv(Path(output_dir) / file_name, encoding='utf-8-sig')

Create a function that iterates over the scraped texts, parses each one into a table and saves it as a CSV file

In [9]:
def parser (scraper_dict, failed_report_path=r'C:\Users\ivank\Desktop\Scraper\Failed Reports\Failed_Report.csv'):
    """Parse and save every scraped text, then write a report of failures.

    Each text is passed to ``statements_parser`` and the result to
    ``save_df``. A text for which either step raises is recorded as a failed
    mapping together with the error that caused it. The report combines
    these parsing failures with the scraping failures already contained in
    ``scraper_dict`` and is written even when there were no texts at all.

    Args:
        scraper_dict: dict with the keys 'texts', 'successful_urls',
            'unsuccessful_urls' and 'unsuccessful_messages'.
        failed_report_path: path of the CSV report of failed URLs.

    Returns:
        None
    """
    texts = scraper_dict.get('texts')
    urls = scraper_dict.get('successful_urls')
    failed_scraping_urls = scraper_dict.get('unsuccessful_urls')
    failed_scraping_messages = scraper_dict.get('unsuccessful_messages')

    failed_mapping_urls = []
    failed_mapping_messages = []

    for text,url in zip(texts,urls):

        try:
            statements_dict = statements_parser(text,url)
            save_df(statements_dict)

        except Exception as error:
            # Keep processing the remaining texts, but record why this one
            # failed so the report can be acted on.
            failed_mapping_urls.append(url)
            failed_mapping_messages.append(
                'The parsing failed (' + type(error).__name__ + ': ' + str(error) + ')'
            )

    done_count = len(urls) - len(failed_mapping_urls)
    # With no texts there is nothing to divide by; report 0.0 rather than
    # crashing before the failure report is written.
    success_rate = str(round(100*done_count / len(texts),2)) if texts else '0.0'

    print('Successfully parsed and saved ' + str(done_count) + ' Texts (' + success_rate + '% Success)' )

    failed_urls = failed_mapping_urls + failed_scraping_urls
    failed_messages = failed_mapping_messages + failed_scraping_messages

    failed_dict = { 'Url': failed_urls,
                    'Message': failed_messages
    }

    df_failed = pd.DataFrame.from_dict(failed_dict)
    df_failed.to_csv(failed_report_path, encoding='utf-8-sig')

    return


Combine all methods

In [10]:
def parliament_scraper (url1,url2, explicit_wait_seconds, poll_frequency):
    """Run the whole pipeline for the URL range between ``url1`` and ``url2``.

    Builds the URL list with ``url_list``, scrapes it with ``scraper`` using
    the given wait settings and hands the scraping result to ``parser``.

    Args:
        url1: URL passed to ``url_list`` as the start of the range.
        url2: URL passed to ``url_list`` as the end of the range.
        explicit_wait_seconds: forwarded unchanged to ``scraper``.
        poll_frequency: forwarded unchanged to ``scraper``.
    """
    scraped = scraper(url_list(url1,url2), explicit_wait_seconds, poll_frequency)
    parser(scraped)


In [15]:
# Run the pipeline for every hearing ID between these two URLs.
url1 = 'https://www.parliament.bg/bg/plenaryst/ns/55/ID/10958'
url2 = 'https://www.parliament.bg/bg/plenaryst/ns/55/ID/10969'

# Wait settings forwarded to the scraper: timeout and poll interval.
explicit_wait_seconds, poll_frequency = 10, 2

parliament_scraper(
    url1=url1,
    url2=url2,
    explicit_wait_seconds=explicit_wait_seconds,
    poll_frequency=poll_frequency,
)

Number of URLs to scrape: 12
Number of scraped URLs: 12 (100.0% Success)
Successfully parsed and saved 12 Texts (100.0% Success)
