In [None]:
#!pip install requests
#!pip install pandas
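#!pip install kafka-python  # provides the `kafka` module; the PyPI package named "kafka" is a stale, unmaintained release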
#!pip install beautifulsoup4
#!pip install requests-cache
#!pip install retry-requests
#!pip install atpbar
#!pip install ipywidgets

In [None]:
#imports
import threading
import time
import requests
import re
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import openmeteo_requests
import requests_cache
import pandas as pd
from retry_requests import retry
import json
from atpbar import atpbar
from atpbar import flush

In [None]:
# set parameters
# Base URL of the archive listing; the page number is appended directly to it.
source_url = "https://öffi.at/?archive=1&text=&types=2%2C3&page="
# Kafka bootstrap server the producer connects to.
kafka_server = "localhost:9092"
# Number of scraper threads started in parallel.
thread_count = 4

In [None]:
# get number of total pages
# The listing shows a pager text of the form "Aktuelle Seite: <current>/<total>";
# the total is read from the first page returned for the bare base URL.
pager_pattern = re.compile(r'Aktuelle Seite: \d+/(\d+)')

# A timeout keeps the notebook from hanging forever on a stalled connection,
# and raise_for_status() turns an HTTP error page into an explicit failure.
response = requests.get(source_url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')

pager_text = soup.find(string=pager_pattern)
if pager_text is None:
    raise RuntimeError(f'pager text "Aktuelle Seite: x/y" not found on {source_url}')

# Take the total straight from the capture group instead of slicing off
# whatever character happens to follow the number.
pages = int(pager_pattern.search(pager_text).group(1))
print(f'{pages} pages found in total')

In [None]:
# init producer
def _encode_json(value):
    """Serialize a message value to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode('utf-8')


# Every value passed to producer.send() goes through _encode_json first.
producer = KafkaProducer(
    bootstrap_servers=kafka_server,
    value_serializer=_encode_json,
)

In [None]:
#get data
# Matches the two "...: dd.mm.yyyy hh:mm" text nodes that carry start and end time.
_TIMESTAMP_RE = re.compile(r': \d{2}\.\d{2}\.\d{4} \d{2}:\d{2}')


def _parse_interruption(delay):
    """Turn one disruption ``<li>`` element into a JSON-serializable dict.

    Args:
        delay: BeautifulSoup tag of a single ``li.disruption`` entry.

    Returns:
        dict with the keys ``id``, ``title``, ``behoben``, ``lines``,
        ``stations``, ``start`` and ``end``.

    Raises:
        IndexError: if the entry does not contain two timestamp strings.
    """
    content = delay.find('div', {'class': 'uk-accordion-content'})

    # The first <ul> lists the affected lines, the optional second one the stations.
    lines = []
    stations = []
    lists = content.find_all('ul')
    if lists:
        lines = [line.text for line in lists[0].find_all('li')]
        if len(lists) > 1:  # some interruptions do not have stations see page 3041 N24
            stations = [station.text for station in lists[1].find_all('li')]

    timestamps = content.find_all(string=_TIMESTAMP_RE)

    return {
        'id': delay.attrs['id'],
        # Keep only the part of the heading after the last colon.
        'title': delay.find('h2', {'class': 'disruption-title'}).text.split(':')[-1].strip(),
        # True when the accordion content contains a <p> element.
        'behoben': content.find('p') is not None,
        'lines': lines,
        'stations': stations,
        'start': timestamps[0].split(': ')[1],
        'end': timestamps[1].split(': ')[1],
    }


def scrape_oeffi_to_kafka(min,max):
    """Scrape archive pages ``min`` .. ``max - 1`` and send every entry to Kafka.

    The range is clamped to the valid pages: a lower bound of 0 becomes 1 and
    an upper bound beyond ``pages + 1`` becomes ``pages + 1``. Each disruption
    found on a page is sent to the ``delays`` topic through the module-level
    ``producer``. The function sleeps 5 seconds after every page.

    Args:
        min: first page to scrape (inclusive).
        max: page to stop at (exclusive).
    """
    cur_thread = threading.current_thread().name
    # Work on local copies so the builtins min()/max() are not rebound further.
    first_page = 1 if min == 0 else min
    stop_page = pages + 1 if max > pages + 1 else max
    print(f'thread {cur_thread} working on pages {first_page} to {stop_page}')

    for n in range(first_page, stop_page):
        # Get page source html for page n (previously a fixed page was requested
        # on every iteration). The timeout prevents a stalled connection from
        # blocking this thread, and the later join(), forever.
        response = requests.get(f'{source_url}{n}', timeout=30)

        # Parse html into beautifulSoup
        soup = BeautifulSoup(response.content, 'html.parser')
        interruption_list_raw = soup.find('ul', {'class': 'category-filter'})

        if interruption_list_raw is None:
            # A page without the list (e.g. an error page) must not abort the
            # remaining pages assigned to this thread.
            print(f'thread {cur_thread}: no interruption list on page {n}, skipping')
        else:
            entries = interruption_list_raw.findChildren(
                'li', attrs={'class': 'disruption uk-padding-small'}, recursive=False)
            for delay in entries:
                # Build the dict and send it to kafka
                producer.send('delays', _parse_interruption(delay))
                producer.flush()

        # wait 5 secs before next request
        time.sleep(5)

In [None]:
#init threads
# Split the pages into thread_count contiguous chunks and start one worker per chunk.
threads = []
# Ceiling of (pages + 1) / thread_count. Rounding down left the last
# (pages + 1) % thread_count pages unassigned; rounding up is safe because
# scrape_oeffi_to_kafka clamps its upper bound to pages + 1.
pages_per_thread = (pages + thread_count) // thread_count
for n in range(1, thread_count + 1):
    # Thread n covers pages_per_thread*(n-1) (inclusive) up to pages_per_thread*n (exclusive).
    thread = threading.Thread(
        target=scrape_oeffi_to_kafka,
        args=(pages_per_thread * (n - 1), pages_per_thread * n),
        name='t' + str(n),
    )
    threads.append(thread)
    thread.start()

In [None]:
# Block until every scraper thread has finished before shutting anything down.
for thread in threads:  
    thread.join()
# flush() is the function imported from atpbar.
flush()
# Close the shared Kafka producer only after all threads are done sending.
producer.close()

In [None]:
# NOTE(review): repeats the close() from the previous cell — presumably kept so the
# producer can be closed by hand after an interrupted run; confirm a repeated close() is harmless.
producer.close()