# <center> Producer-consumer test </center>

# Introduction

Import modules:

In [1]:
import queue
import csv
import threading
import requests
from bs4 import BeautifulSoup

## The problem

Delivery: Push to a git repository

A Simple Producer/Consumer Web Link Extractor

The Producer
1. The producer receives a list of URLs. The source (a file, the command line, etc.) doesn't matter.
2. It extracts the markup from each of the URLs and places this output onto some form of queue.

The Consumer

1. The consumer reads the queue until it is empty and the producer is no longer extracting markup.
2. It parses the HTML and extracts any hyperlinks into a list. This list is output (to a file or the command line) against each parsed URL.

Requirements

1. The producer and consumer must run concurrently.
2. Error handling should ensure isolation: one bad fetch or parse should not affect processing of others.
3. Some unit tests.
4. Create a GitHub account and put the project there, before sending us a link.

Bonus Points
1. URLs fetched concurrently.
2. Trimming oldest queue entries if queue size balloons.
3. Comprehensive test coverage.
4. Other considerations/enhancements that we have neglected here.

# Components

In this section, we'll gather together some individual components that we'll later use.

Extract the URLs from the first column of a CSV file into a list:

In [2]:
file_urls = 'URLs.csv'
urls = []
with open(file_urls, 'r', encoding='utf-8-sig') as file:
    reader = csv.reader(file)
    for row in reader:
        urls.append(row[0])
urls

['https://www.google.co.uk/', 'https://example.com/']
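
The cell above assumes every row has a non-empty first column; a blank line in the file would raise an `IndexError` on `row[0]`. As a minimal sketch (the helper name `read_urls` and the in-memory sample are illustrative assumptions, not part of the original notebook), the read can skip such rows:

```python
import csv
import io

def read_urls(lines):
    """Return the stripped first column of every non-empty CSV row."""
    urls = []
    for row in csv.reader(lines):
        # csv.reader yields [] for a blank line; skip those and empty first cells
        if row and row[0].strip():
            urls.append(row[0].strip())
    return urls

# In-memory stand-in for URLs.csv (hypothetical contents with a blank line)
sample = io.StringIO("https://www.google.co.uk/\n\nhttps://example.com/\n")
print(read_urls(sample))  # ['https://www.google.co.uk/', 'https://example.com/']
```

The same function accepts an open file object, so it could be called as `read_urls(file)` inside the `with open(...)` block.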

A function to extract all absolute hyperlinks from the page at a given URL:

In [3]:
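def extract_absolute_links(url):
    # Fetch the page; the timeout stops a stalled server from blocking forever
    response = requests.get(url, timeout=10)
    soup = BeautifulSoup(response.text, 'html.parser')
    urls = []
    # Consider only <a> tags that have an href attribute
    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        # Relative links (e.g. '/about') are skipped
        if href.startswith('http'):
            urls.append(href)
    return urls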

url = 'https://example.com/'
extract_absolute_links(url)

['https://www.iana.org/domains/example']
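
Because `extract_absolute_links` fetches and parses in one step, it can only be exercised against a live site. The sketch below separates out the parsing step so it can be checked on a fixed HTML string. It deliberately swaps BeautifulSoup for the standard library's `html.parser` so that it runs without third-party packages; the names `LinkCollector` and `absolute_links_from_html` are assumptions made for illustration.

```python
from html.parser import HTMLParser

class LinkCollector(HTMLParser):
    """Collect href values of <a> tags that start with 'http'."""
    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != 'a':
            return
        for name, value in attrs:
            # value is None for a bare attribute such as <a href>
            if name == 'href' and value and value.startswith('http'):
                self.links.append(value)

def absolute_links_from_html(html):
    collector = LinkCollector()
    collector.feed(html)
    collector.close()
    return collector.links

html = '<a href="https://www.iana.org/domains/example">More</a><a href="/about">About</a>'
print(absolute_links_from_html(html))  # ['https://www.iana.org/domains/example']
```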

# Solution

Extract the URLs from the CSV file (each URL must include its scheme, i.e. begin with 'http', or the fetch will fail):

In [4]:
file_urls = 'URLs.csv'
urls = []
with open(file_urls, 'r', encoding='utf-8-sig') as file:
    reader = csv.reader(file)
    for row in reader:
        urls.append(row[0])
urls

['https://www.google.co.uk/', 'https://example.com/']

## Producer class

The `Producer` class inherits from `threading.Thread`. It takes two inputs: (1) a `Queue` and (2) the list of URLs. The `Queue` is the shared buffer: the producer adds items to it and the consumer takes items from it.

The `run()` method describes what the producer does: for each URL, it fetches the markup, parses it, and puts the URL-markup pair on the queue. If the request raises an exception that the `except` clause catches (for example, a URL with no scheme), nothing is added to the queue for that URL; the error is printed and the loop moves on to the next URL.

In [5]:
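class Producer(threading.Thread):
    def __init__(self, q, urls):
        super().__init__()
        self.q = q
        self.urls = urls
    
    def run(self):
        for url in self.urls:
            try:
                # The timeout stops one stalled server from blocking the loop
                response = requests.get(url, timeout=10)
                soup = BeautifulSoup(response.text, 'html.parser')
                print(f'Producer: Adding {url} to the queue')
                self.q.put([url, soup])
            except requests.exceptions.RequestException as e:
                # RequestException is the base class of MissingSchema,
                # ConnectionError, Timeout, etc., so one bad fetch is
                # reported and skipped rather than ending the thread
                print(f'error: {e}')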
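
The producer above fetches its URLs one after another. For the "URLs fetched concurrently" bonus point, one possible approach is a thread pool from `concurrent.futures`. The sketch below is an illustration under stated assumptions, not the notebook's implementation: `produce_concurrently` is a hypothetical helper, and `fake_fetch` stands in for the network call so the example runs offline.

```python
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed

def produce_concurrently(urls, fetch, q, max_workers=4):
    """Fetch each URL in a worker thread and put (url, markup) on q.

    Returns {url: exception} for the fetches that failed.
    """
    failures = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                markup = future.result()  # re-raises the worker's exception
            except Exception as e:
                failures[url] = e         # isolate the failure and carry on
                continue
            q.put((url, markup))
    return failures

def fake_fetch(url):
    # Hypothetical stand-in for an HTTP request
    if not url.startswith('http'):
        raise ValueError(f'Invalid URL {url!r}')
    return f'<a href="{url}">self</a>'

q = queue.Queue()
failures = produce_concurrently(['https://example.com/', 'bad'], fake_fetch, q)
print(q.qsize(), list(failures))  # 1 ['bad']
```

Passing the fetch function as an argument also makes the producer easy to test without network access.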

## Consumer class

The `Consumer` class also inherits from the `Thread` class. It takes the same `Queue` object as an input. 

Its `run()` method repeatedly removes an item (a URL and its parsed markup) from the queue, extracts all the absolute hyperlinks from the markup, and stores them in a dictionary whose keys are the fetched URLs and whose values are the lists of hyperlinks found at each URL.

The `get()` call waits up to 4 seconds for an item. If none arrives in that time, it raises `queue.Empty`, and the consumer checks whether the producer thread is still alive. If it isn't, nothing more will be added to the queue, so the consumer stops too. Note that the consumer refers to the producer through the module-level name `producer_thread`, which is assigned in the cell that starts the threads.

In [6]:
class Consumer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.q = q
        self.url2urls = {}
    
    def run(self):
        while True:
            try:
                url, soup = self.q.get(timeout=4)
            except queue.Empty:
                # producer_thread is the module-level name assigned below.
                # Also check the queue, in case the producer added a final
                # item and exited just after the timeout expired
                if not producer_thread.is_alive() and self.q.empty():
                    break
                continue
            print(f'Consumer: Extracting hyperlinks in {url}')
            urls = []
            for a_tag in soup.find_all('a', href=True):
                href = a_tag['href']
                if href.startswith('http'):
                    urls.append(href)
            self.url2urls[url] = urls
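
Polling with a timeout means the consumer can wait up to 4 seconds after the producer has finished. A common alternative, shown here only as a sketch and not as what the notebook does, is for the producer to put a sentinel value on the queue when it is done. The names `SENTINEL`, `producer`, and `consumer` are illustrative, and upper-casing a string stands in for the link extraction.

```python
import queue
import threading

SENTINEL = object()  # unique marker meaning "no more items"

def producer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # tell the consumer that production has finished

def consumer(q, results):
    while True:
        item = q.get()  # blocks until an item is available; no timeout needed
        if item is SENTINEL:
            break
        results.append(item.upper())

q = queue.Queue(maxsize=10)
results = []
threads = [
    threading.Thread(target=producer, args=(q, ['a', 'b', 'c'])),
    threading.Thread(target=consumer, args=(q, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # ['A', 'B', 'C']
```

With this design the consumer no longer needs a reference to the producer thread. With several consumers, one sentinel per consumer would be needed.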

The following code creates producer and consumer threads, starts them, and outputs the dictionary of URLs and hyperlinks:

In [7]:
q = queue.Queue(maxsize=10)
producer_thread = Producer(q, urls)
consumer_thread = Consumer(q)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
consumer_thread.url2urls

Producer: Adding https://www.google.co.uk/ to the queue
Consumer: Extracting hyperlinks in https://www.google.co.uk/
Producer: Adding https://example.com/ to the queue
Consumer: Extracting hyperlinks in https://example.com/


{'https://www.google.co.uk/': ['https://www.google.co.uk/imghp?hl=en&tab=wi',
  'https://maps.google.co.uk/maps?hl=en&tab=wl',
  'https://play.google.com/?hl=en&tab=w8',
  'https://www.youtube.com/?tab=w1',
  'https://news.google.com/?tab=wn',
  'https://mail.google.com/mail/?tab=wm',
  'https://drive.google.com/?tab=wo',
  'https://www.google.co.uk/intl/en/about/products?tab=wh',
  'http://www.google.co.uk/history/optout?hl=en',
  'https://accounts.google.com/ServiceLogin?hl=en&passive=true&continue=https://www.google.co.uk/&ec=GAZAAQ',
  'https://www.google.co.uk/setprefdomain?prefdom=US&sig=K_LIwOL7l082BZnfJBg3VE636h4xs%3D'],
 'https://example.com/': ['https://www.iana.org/domains/example']}
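
With `maxsize=10`, `q.put()` blocks the producer whenever the queue is full. The bonus point about trimming the oldest entries asks for a different policy: drop old items to make room for new ones. A minimal sketch of that policy follows, assuming a hypothetical helper `put_dropping_oldest` that the producer would call instead of `q.put()`:

```python
import queue

def put_dropping_oldest(q, item):
    """Put item on q without blocking; if q is full, discard the oldest
    entries until there is room. Returns the list of discarded entries."""
    dropped = []
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                dropped.append(q.get_nowait())  # the oldest entry is at the front
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime; retry the put

q = queue.Queue(maxsize=2)
for n in (1, 2, 3):
    dropped = put_dropping_oldest(q, n)
print(dropped, [q.get_nowait(), q.get_nowait()])  # [1] [2, 3]
```

Whether dropping unprocessed pages is acceptable depends on the application; the sketch returns the dropped entries so that the caller can at least log them.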

## Unit tests

We include a malformed URL (`'bad'`, which has no scheme) to show that the error is reported and the other URLs are still processed:

In [8]:
urls = ['https://www.google.co.uk/', 'https://example.com/', 'bad']
q = queue.Queue(maxsize=10)
producer_thread = Producer(q, urls)
consumer_thread = Consumer(q)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
consumer_thread.url2urls

Producer: Adding https://www.google.co.uk/ to the queue
Consumer: Extracting hyperlinks in https://www.google.co.uk/
Producer: Adding https://example.com/ to the queue
Consumer: Extracting hyperlinks in https://example.com/
error: Invalid URL 'bad': No scheme supplied. Perhaps you meant https://bad?


{'https://www.google.co.uk/': ['https://www.google.co.uk/imghp?hl=en&tab=wi',
  'https://maps.google.co.uk/maps?hl=en&tab=wl',
  'https://play.google.com/?hl=en&tab=w8',
  'https://www.youtube.com/?tab=w1',
  'https://news.google.com/?tab=wn',
  'https://mail.google.com/mail/?tab=wm',
  'https://drive.google.com/?tab=wo',
  'https://www.google.co.uk/intl/en/about/products?tab=wh',
  'http://www.google.co.uk/history/optout?hl=en',
  'https://accounts.google.com/ServiceLogin?hl=en&passive=true&continue=https://www.google.co.uk/&ec=GAZAAQ',
  'https://www.google.co.uk/setprefdomain?prefdom=US&sig=K_cjZ06qbYqJ9t6yguF9R1MjJrcdY%3D'],
 'https://example.com/': ['https://www.iana.org/domains/example']}
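
The run above checks the behaviour by eye and depends on live websites. An automated check could look something like the sketch below, which uses the standard `unittest` module. To stay self-contained and offline, it tests a hypothetical `StubProducer` that receives its fetch function as an argument; this is an assumption made for illustration, since the `Producer` class above calls `requests.get` directly.

```python
import queue
import threading
import unittest

class StubProducer(threading.Thread):
    """Hypothetical Producer variant with an injectable fetch function."""
    def __init__(self, q, urls, fetch):
        super().__init__()
        self.q = q
        self.urls = urls
        self.fetch = fetch
        self.errors = {}

    def run(self):
        for url in self.urls:
            try:
                self.q.put((url, self.fetch(url)))
            except Exception as e:
                self.errors[url] = e  # record the failure and keep going

def stub_fetch(url):
    # Stand-in for a network request: rejects URLs that have no scheme
    if not url.startswith('http'):
        raise ValueError(f'Invalid URL {url!r}')
    return '<html></html>'

class ProducerIsolationTest(unittest.TestCase):
    def test_bad_url_does_not_stop_later_urls(self):
        q = queue.Queue()
        producer = StubProducer(q, ['bad', 'https://example.com/'], stub_fetch)
        producer.start()
        producer.join()
        self.assertEqual(list(producer.errors), ['bad'])
        self.assertEqual(q.get_nowait(), ('https://example.com/', '<html></html>'))
        self.assertTrue(q.empty())

# Run the suite programmatically, which also works inside a notebook cell
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ProducerIsolationTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Placing the bad URL first confirms that a failure does not prevent later URLs from being fetched.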