# Processes versus Threads
In computer science, each process running on an operating system can have multiple
threads. Each process has its own allocated memory: all of the threads within a process
can access that same memory, while separate processes cannot access each other's
memory and must communicate information explicitly.
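To make the difference concrete, here is a minimal sketch (not from the original notebook) in which a thread and a child process each try to append to the same module-level list. It assumes a POSIX system where the `'fork'` start method is available, such as Linux. `append_item` and `demo` are hypothetical names.

```python
import multiprocessing
import threading

# Module-level list: every thread in this process sees the same object
shared = []


def append_item(item):
    """Append item to the module-level list in whichever process runs this."""
    shared.append(item)


def demo():
    shared.clear()

    # A thread runs inside the current process, so its change is visible here
    t = threading.Thread(target=append_item, args=('from thread',))
    t.start()
    t.join()

    # A child process works on its own copy of memory, so its change is not.
    # Assumes the 'fork' start method (available on Linux and macOS, not Windows)
    ctx = multiprocessing.get_context('fork')
    p = ctx.Process(target=append_item, args=('from process',))
    p.start()
    p.join()

    return list(shared)


if __name__ == '__main__':
    print(demo())  # ['from thread']
```

The child really does append `'from process'`, but only to its own copy of `shared`, which disappears when the child exits.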

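# Multithreaded Crawling
Python's low-level `_thread` module can start new threads directly. The following example starts three threads that print at different intervals: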

In [0]:
import _thread
import time

In [0]:
def print_time(threadName, delay, iterations):
  start = int(time.time())
  for i in range(0, iterations):
    time.sleep(delay)
    seconds_elapsed = str(int(time.time()) - start)
    print('{} {}'.format(seconds_elapsed, threadName))

In [3]:
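try:
  _thread.start_new_thread(print_time, ('Fizz', 3, 33))
  _thread.start_new_thread(print_time, ('Buzz', 5, 20))
  _thread.start_new_thread(print_time, ('Counter', 1, 100))
except RuntimeError:
  # _thread raises RuntimeError when a new thread cannot be started
  print('Error: unable to start thread')

# Keep the main thread alive so the worker threads keep running;
# sleeping avoids burning CPU the way `pass` does. Interrupt the kernel to stop.
while True:
  time.sleep(1)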

1 Counter
2 Counter
3 Fizz
3 Counter
4 Counter
5 Buzz
5 Counter
6 Fizz
6 Counter
7 Counter


KeyboardInterrupt: ignored

The thread names are a reference to the classic FizzBuzz programming test, with somewhat more verbose output: `Fizz` prints every 3 seconds, `Buzz` every 5 seconds, and `Counter` every second.
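The main thread above never learns when the workers finish; it simply loops until the kernel is interrupted. One alternative, sketched here under the assumption that you want the main thread to wait for the workers, is to give each worker a lock from `_thread.allocate_lock()` and have the worker release it when done. `run_and_wait` is a hypothetical helper, and the delays are scaled down so the sketch finishes in about a second.

```python
import _thread
import time


def print_time(thread_name, delay, iterations, done_lock):
    """Print elapsed time every `delay` seconds, then release done_lock."""
    start = time.time()
    try:
        for _ in range(iterations):
            time.sleep(delay)
            print('{:.1f} {}'.format(time.time() - start, thread_name))
    finally:
        # Signal the main thread that this worker has finished
        done_lock.release()


def run_and_wait(jobs):
    """Start one _thread per (name, delay, iterations) job and wait for all."""
    locks = []
    for name, delay, iterations in jobs:
        lock = _thread.allocate_lock()
        lock.acquire()  # held until the worker releases it
        locks.append(lock)
        _thread.start_new_thread(print_time, (name, delay, iterations, lock))
    # Each acquire() blocks until the matching worker calls release()
    for lock in locks:
        lock.acquire()
    return len(locks)


if __name__ == '__main__':
    run_and_wait([('Fizz', 0.3, 3), ('Buzz', 0.5, 2), ('Counter', 0.1, 10)])
```

This works because a `_thread` lock may be released by a different thread from the one that acquired it. The higher-level `threading` module covered later offers `join()` for the same purpose.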

Threads can also perform more useful tasks, such as crawling a website:

In [0]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random

import _thread
import time

In [0]:
visited = []

def getLinks(thread_name, bsObj):
  print('Getting links in {}'.format(thread_name))
  links = bsObj.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
  # visited holds paths (strings), so compare each link's href, not the Tag itself
  return [link for link in links if link.attrs['href'] not in visited]

def scrape_article(thread_name, path):
  visited.append(path)
  html = urlopen('http://en.wikipedia.org{}'.format(path))
  time.sleep(5)
  bsObj = BeautifulSoup(html, 'html.parser')
  title = bsObj.find('h1').get_text()
  print('Scraping {} in thread {}'.format(title, thread_name))
  links = getLinks(thread_name, bsObj)
  if len(links) > 0:
    newArticle = links[random.randint(0, len(links)-1)].attrs['href']
    print(newArticle)
    scrape_article(thread_name, newArticle)

In [6]:
try:
  _thread.start_new_thread(scrape_article, ('Thread 1', '/wiki/Kevin_Bacon',))
  _thread.start_new_thread(scrape_article, ('Thread 2', '/wiki/Monty_Python',))
except RuntimeError:
  print('Error: unable to start threads')

# Keep the main thread alive (interrupt the kernel to stop)
while True:
  time.sleep(1)

Scraping Kevin Bacon in thread Thread 1
Getting links in Thread 1
/wiki/Stacy_Keach
Scraping Monty Python in thread Thread 2
Getting links in Thread 2
/wiki/Razzies
Scraping Stacy Keach in thread Thread 1
Getting links in Thread 1
/wiki/Planes_(film)
Scraping Golden Raspberry Awards in thread Thread 2
Getting links in Thread 2
/wiki/25th_Golden_Raspberry_Awards
Scraping 25th Golden Raspberry Awards in thread Thread 2
Getting links in Thread 2
/wiki/Golden_Raspberry_Awards
Scraping Planes (film) in thread Thread 1
Getting links in Thread 1
/wiki/Finding_Nemo
Scraping Golden Raspberry Awards in thread Thread 2
Getting links in Thread 2
/wiki/Dean_Devlin
Scraping Finding Nemo in thread Thread 1
Getting links in Thread 1
/wiki/Gore_Verbinski


KeyboardInterrupt: ignored

Because two threads crawl Wikipedia almost twice as fast as a single thread, each request is followed by `time.sleep(5)` to keep the script from putting too much load on Wikipedia's servers. In practice, when running against a server where the number of requests is not an issue, this line should be removed.

To keep track of the articles the threads have collectively seen, both threads share a single global list, `visited`, which stores the paths of the pages already crawled.
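Sharing `visited` this way leaves a gap: `getLinks` filters against `visited` some time before the next call to `scrape_article` appends the chosen path, so two threads can both pick the same unvisited page. A minimal sketch of one fix, assuming a `threading.Lock` around a combined check-and-add step; `claim`, `worker`, and `crawl_all` are hypothetical helpers, and recording a path stands in for actually scraping it.

```python
import threading

visited = set()
visited_lock = threading.Lock()


def claim(path):
    """Mark path as visited; return True only for the first thread to do so."""
    # The lock makes the membership test and the add one atomic step,
    # so two threads can never both see the same path as unvisited
    with visited_lock:
        if path in visited:
            return False
        visited.add(path)
        return True


def worker(paths, claimed):
    """Claim each path in turn and record the ones this thread won."""
    for path in paths:
        if claim(path):
            claimed.append(path)  # a real crawler would scrape the page here


def crawl_all(path_lists):
    visited.clear()
    results = [[] for _ in path_lists]  # one result list per thread
    threads = [threading.Thread(target=worker, args=(paths, out))
               for paths, out in zip(path_lists, results)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == '__main__':
    print(crawl_all([['/wiki/Kevin_Bacon', '/wiki/Monty_Python'],
                     ['/wiki/Monty_Python', '/wiki/Stacy_Keach']]))
```

Which thread wins a shared path varies from run to run, but each path is claimed exactly once.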

## Race Conditions and Queues
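Lists are great for appending to or reading from, but not so great for removing items
at arbitrary points, especially from the beginning of the list. Removing items is also
risky when several threads share one list: another thread can remove an item between
one thread's read and its own removal. Pass messages to threads using something other
than a list.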

Queues are list-like objects that operate on either a First In First Out (FIFO)
approach or a Last In First Out (LIFO) approach. A queue receives messages from
any thread via `queue.put('My message')` and can transmit the message to any
thread that calls `queue.get()`.

Queues are not designed to store static data, but to transmit it in a thread-safe way.
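Both orderings, and a queue carrying messages from one thread to another, appear in this short sketch using the standard `queue` module, where `queue.Queue` is FIFO and `queue.LifoQueue` is LIFO. The message strings and the `producer` function are placeholders.

```python
import queue
import threading

# FIFO vs LIFO ordering, shown single-threaded for clarity
fifo = queue.Queue()      # First In, First Out
lifo = queue.LifoQueue()  # Last In, First Out
for message in ['first', 'second', 'third']:
    fifo.put(message)
    lifo.put(message)

fifo_order = [fifo.get() for _ in range(3)]
lifo_order = [lifo.get() for _ in range(3)]
print(fifo_order)  # ['first', 'second', 'third']
print(lifo_order)  # ['third', 'second', 'first']


def producer(q, messages):
    """Runs in its own thread and puts each message on the shared queue."""
    for m in messages:
        q.put(m)


# Passing messages from a worker thread to the main thread
q = queue.Queue()
t = threading.Thread(target=producer, args=(q, ['My message', 'Another message']))
t.start()
received = [q.get() for _ in range(2)]  # get() blocks until an item arrives
t.join()
print(received)  # ['My message', 'Another message']
```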

Rather than giving every scraper thread its own database connection, you can run a smaller number of database threads, each with its own connection, that take items from a queue and store them. This provides a much more manageable set of database connections.
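A minimal sketch of that pattern, with two substitutions to keep it self-contained: an in-memory `sqlite3` database stands in for MySQL, and a hypothetical `fake_scraper` puts ready-made article dicts on the queue instead of crawling. The storage thread blocks on `get()` and stops when it receives a `None` sentinel. The sentinel is an assumption of this sketch, not part of the original code.

```python
import queue
import sqlite3
import threading

STOP = None  # sentinel that tells the storage thread to exit


def storage(q, stored_titles):
    """Single database thread: owns the only connection and drains the queue."""
    # In-memory sqlite3 stands in for the MySQL database used in the notebook
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE pages (title TEXT, path TEXT)')
    while True:
        article = q.get()  # blocks until a scraper puts an article
        if article is STOP:
            break
        row = conn.execute('SELECT 1 FROM pages WHERE path = ?',
                           (article['path'],)).fetchone()
        if row is None:
            conn.execute('INSERT INTO pages (title, path) VALUES (?, ?)',
                         (article['title'], article['path']))
            conn.commit()
            stored_titles.append(article['title'])
    conn.close()


def fake_scraper(q, articles):
    """Hypothetical scraper: puts pre-built article dicts instead of crawling."""
    for article in articles:
        q.put(article)


def run(batches):
    q = queue.Queue()
    stored = []
    db_thread = threading.Thread(target=storage, args=(q, stored))
    db_thread.start()
    scrapers = [threading.Thread(target=fake_scraper, args=(q, batch))
                for batch in batches]
    for t in scrapers:
        t.start()
    for t in scrapers:
        t.join()
    q.put(STOP)  # every article is already queued, so stop the storage thread
    db_thread.join()
    return stored
```

For example, `run([[kevin_bacon, monty_python], [monty_python]])` stores each article once, even though two scrapers queued Monty Python. Creating the connection inside the storage thread also satisfies `sqlite3`'s default rule that a connection is used only by the thread that created it.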

In [0]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
import _thread
from queue import Queue
import time
import pymysql

In [0]:
def storage(queue):
  conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock', user='root', password='', database='mysql', charset='utf8')
  cur = conn.cursor()
  cur.execute('USE wiki_threads')
  while True:
    # queue.get() blocks until a scraper thread puts an article on the queue,
    # so this thread no longer spins on queue.empty()
    article = queue.get()
    # Query parameters go in a tuple, hence the trailing comma
    cur.execute('SELECT * FROM pages WHERE path = %s', (article['path'],))
    if cur.rowcount == 0:
      print('Storing article {}'.format(article['title']))
      cur.execute('INSERT INTO pages (title, path) VALUES (%s, %s)', (article['title'], article['path']))
      conn.commit()
    else:
      print('Article already exists: {}'.format(article['title']))


In [0]:
visited = []
def getLinks(thread_name, bsObj):
  print('Getting links in {}'.format(thread_name))
  links = bsObj.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
  # visited holds paths (strings), so compare each link's href, not the Tag itself
  return [link for link in links if link.attrs['href'] not in visited]

def scrape_article(thread_name, path, queue):
  visited.append(path)
  html = urlopen('http://en.wikipedia.org{}'.format(path))
  time.sleep(5)
  bsObj = BeautifulSoup(html, 'html.parser')
  title = bsObj.find('h1').get_text()
  print('Added {} for storage in thread {}'.format(title, thread_name))
  queue.put({"title":title, "path":path})
  links = getLinks(thread_name, bsObj)
  if len(links) > 0:
    newArticle = links[random.randint(0, len(links)-1)].attrs['href']
    scrape_article(thread_name, newArticle, queue)

In [0]:
queue = Queue()
try:
  _thread.start_new_thread(scrape_article, ('Thread 1', '/wiki/Kevin_Bacon', queue,))
  _thread.start_new_thread(scrape_article, ('Thread 2', '/wiki/Monty_Python', queue,))
  _thread.start_new_thread(storage, (queue,))
except RuntimeError:
  print('Error: unable to start threads')

# Keep the main thread alive (interrupt the kernel to stop)
while True:
  time.sleep(1)

This script creates three threads: two to scrape pages from Wikipedia in a random
walk, and a third to store the collected data in a MySQL database.

## The `threading` Module
The Python `_thread` module lets you micro-manage threads but does not provide many of the higher-level functions that make life easier. The `threading` module is a higher-level interface that lets you use threads cleanly while still exposing all of the features of the underlying `_thread` module.

Here is the FizzBuzz example rewritten with `threading`:

In [0]:
import threading
import time

In [8]:
def print_time(threadName, delay, iterations):
  start = int(time.time())
  for i in range(0, iterations):
    time.sleep(delay)
    seconds_elapsed = str(int(time.time()) - start)
    print('{} {}'.format(seconds_elapsed, threadName))



In [0]:
threading.Thread(target=print_time, args=('Fizz', 3, 33)).start()
threading.Thread(target=print_time, args=('Buzz', 5, 20)).start()
threading.Thread(target=print_time, args=('Counter', 1, 100)).start()

This produces the same "FizzBuzz" output as the previous simple `_thread` example.
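Because `threading.Thread` objects can be joined, the main thread can also wait for the workers to finish, which the `_thread` version had no direct way to do. This sketch (delays scaled down; the structure is an assumption rather than the original code) names each thread with `Thread(name=...)`, reads the name back with `threading.current_thread().name`, and waits for all of the threads with `join()`.

```python
import threading
import time


def print_time(delay, iterations):
    """Print elapsed time, labelled with the name given to Thread(name=...)."""
    name = threading.current_thread().name
    start = time.time()
    for _ in range(iterations):
        time.sleep(delay)
        print('{:.1f} {}'.format(time.time() - start, name))


# Delays scaled down from the notebook so the sketch finishes quickly
threads = [
    threading.Thread(target=print_time, args=(0.3, 3), name='Fizz'),
    threading.Thread(target=print_time, args=(0.5, 2), name='Buzz'),
    threading.Thread(target=print_time, args=(0.1, 10), name='Counter'),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until this thread has finished
print('Program complete')
```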

The `threading` module can also create local thread data that is unavailable to the other threads. This local data can be created at any point within the thread function by calling `threading.local()`. Because no other thread can see this data, it is not exposed to the race conditions that affect objects shared between threads.

In [0]:
import threading

In [0]:
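def crawler(url):
  data = threading.local()
  data.visited = []
  # Crawl site (omitted); this thread would record its pages in data.visited

# args must be a tuple, hence the trailing comma after the URL
threading.Thread(target=crawler, args=('http://brookings.edu',)).start()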
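`threading.local()` is more often created once at module level, so that a single object holds a separate set of attributes for every thread. A minimal sketch, assuming placeholder `example.com`/`example.org` URLs and a hypothetical `crawler` that only records fake page names: each thread sees its own `data.visited`, and the main thread, which never set the attribute, sees none at all.

```python
import threading

# One module-level object; each thread gets its own attributes on it
data = threading.local()


def crawler(start_url, results):
    """Hypothetical crawler that records fake page URLs in thread-local data."""
    data.visited = []  # this list belongs to the current thread only
    for i in range(3):
        data.visited.append('{}/page{}'.format(start_url, i))
    results[start_url] = list(data.visited)


results = {}
threads = [threading.Thread(target=crawler, args=(url, results))
           for url in ['http://example.com', 'http://example.org']]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results['http://example.org'])
print(hasattr(data, 'visited'))  # False: the main thread never set it
```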

The `is_alive()` method checks whether a thread is still active. (Older code uses the name `isAlive()`, which was removed in Python 3.9.) It returns `True` until the thread finishes crawling (or crashes). Crawlers are designed to run for a very long time, so a supervising loop can call `is_alive()` periodically and restart any thread that has crashed.

In [0]:
import threading
import time

In [0]:
class Crawler(threading.Thread):
  
  def __init__(self):
    threading.Thread.__init__(self)
    self.done = False
    
  def isDone(self):
    return self.done
  
  def run(self):
    time.sleep(5)
    self.done = True
    raise Exception('Something bad happened!')

In [0]:
t = Crawler()
t.start()

while True:
  time.sleep(1)
  if t.isDone():
    print('Done')
    break
    
  if not t.is_alive():  # isAlive() was removed in Python 3.9
    t = Crawler()
    t.start()

The `Crawler` class contains an `isDone` method that can be used to check whether the
crawler has finished crawling.

If `Crawler.run` raises an exception, the thread stops and is no longer alive, so the
loop starts a new `Crawler`. This repeats until `isDone()` returns `True` and the program exits.
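The loop above will restart a crashing `Crawler` indefinitely. Here is a sketch of a slightly more defensive supervisor. Its choices are assumptions of this sketch rather than the original's: the hypothetical `FlakyCrawler` catches its own exception and stores it in `self.error` instead of letting the thread print a traceback, and `supervise` gives up after `max_restarts` restarts. The supervisor checks `is_alive()` before `done`, because `done` can only be trusted once the thread has ended.

```python
import threading


class FlakyCrawler(threading.Thread):
    """Hypothetical crawler that crashes on its first `fail_times` runs."""
    attempts = 0  # class attribute shared by every instance

    def __init__(self, fail_times):
        super().__init__()
        self.fail_times = fail_times
        self.done = False
        self.error = None

    def crawl(self):
        FlakyCrawler.attempts += 1
        if FlakyCrawler.attempts <= self.fail_times:
            raise RuntimeError('simulated crash')

    def run(self):
        try:
            self.crawl()
            self.done = True
        except Exception as exc:
            # Keep the error for the supervisor instead of printing a traceback
            self.error = exc


def supervise(make_crawler, max_restarts=5, poll=0.05):
    """Restart crashed crawlers until one finishes; return the restart count."""
    t = make_crawler()
    t.start()
    restarts = 0
    while True:
        t.join(poll)  # wait up to `poll` seconds for the thread to end
        if t.is_alive():
            continue
        # The thread has ended, so `done` can no longer change
        if t.done:
            return restarts
        if restarts == max_restarts:
            raise RuntimeError('giving up after {} restarts'.format(restarts))
        restarts += 1
        t = make_crawler()
        t.start()


if __name__ == '__main__':
    print(supervise(lambda: FlakyCrawler(fail_times=2)))  # 2
```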

# Multiprocess Crawling
The `Process` class from Python's `multiprocessing` module creates new process objects that can be started and joined from the main process. The following example revisits FizzBuzz:

In [0]:
from multiprocessing import Process
import time

In [0]:
def print_time(threadName, delay, iterations):
  start = int(time.time())
  for i in range(0, iterations):
    time.sleep(delay)
    seconds_elapsed = str(int(time.time()) - start)
    print(threadName if threadName else seconds_elapsed)

In [15]:
processes = []
processes.append(Process(target=print_time, args=(None, 1, 100)))
processes.append(Process(target=print_time, args=("Fizz", 3, 33)))
processes.append(Process(target=print_time, args=("Buzz", 5, 20)))

for p in processes:
  p.start()
  
for p in processes:
  p.join()
  
print('Program complete')

1
2
3
Fizz
4
5
Buzz
6
Fizz
7
8
9
Fizz
10
Buzz
11
12
Fizz
13
14
15
Fizz
Buzz
16




KeyboardInterrupt: ignored

The OS treats each process as an individual, independent program with its own process ID (PID). The PIDs can be found using the `os` module:

```
import os
...
# PID of the current process (a child's own PID when called inside a child)
os.getpid()
# PID of the current process's parent
os.getppid()
```

Each process in your program should print a different PID for the line `os.getpid()`,
but will print the same parent PID on `os.getppid()`.

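The `join` calls make the main process wait until each child process has finished, so any code after them (here, `print('Program complete')`) runs only once all of the child processes have completed.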
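A minimal sketch that checks these claims, assuming a POSIX system with the `'fork'` start method (such as Linux): each child sends its PID and parent PID back over a `multiprocessing` queue. The parent reads the results before calling `join()`, because the `multiprocessing` documentation warns that joining a process that still has buffered queue items can deadlock. `report_ids` and `collect_ids` are hypothetical names.

```python
import multiprocessing
import os


def report_ids(q):
    """Runs in a child process and sends back its PID and its parent's PID."""
    q.put((os.getpid(), os.getppid()))


def collect_ids(n):
    # Assumes the 'fork' start method (available on Linux and macOS, not Windows)
    ctx = multiprocessing.get_context('fork')
    q = ctx.Queue()
    procs = [ctx.Process(target=report_ids, args=(q,)) for _ in range(n)]
    for p in procs:
        p.start()
    # Read every result before joining (see the deadlock note above)
    ids = [q.get() for _ in procs]
    for p in procs:
        p.join()  # wait for each child to exit
    return ids


if __name__ == '__main__':
    print('parent', os.getpid())
    for pid, ppid in collect_ids(3):
        print('child', pid, 'parent', ppid)
```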

## Multiprocess Crawling

In [0]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random

from multiprocessing import Process, Queue
import os
import time
from threading import Thread

In [0]:
visited = []

def getLinks(bsObj):
  print('Getting links in {}'.format(os.getpid()))
  links = bsObj.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
  # visited holds paths (strings), so compare each link's href
  return [link for link in links if link.attrs['href'] not in visited]

In [0]:
def scrape_article(path):
  # Each process appends to its own copy of visited; the copies are not shared
  visited.append(path)
  print('Process {} list is now: {}'.format(os.getpid(), visited))
  html = urlopen('http://en.wikipedia.org{}'.format(path))
  time.sleep(5)
  bsObj = BeautifulSoup(html, 'html.parser')
  title = bsObj.find('h1').get_text()
  print('Scraping {} in process {}'.format(title, os.getpid()))
  links = getLinks(bsObj)
  if len(links) > 0:
    newArticle = links[random.randint(0, len(links)-1)].attrs['href']
    print(newArticle)
    scrape_article(newArticle)

In [0]:
processes = []
processes.append(Process(target=scrape_article, args=('/wiki/Kevin_Bacon',)))
processes.append(Process(target=scrape_article, args=('/wiki/Monty_Python',)))

for p in processes:
  p.start()


Here the process is artificially slowed by including a `time.sleep(5)` so that this can be used for example purposes without placing an unreasonably high load on Wikipedia’s servers.

Crawling in separate processes is, in theory, slightly faster than crawling in separate
threads for two major reasons:
* Processes are not subject to locking by the GIL and can execute the same lines of
code and modify the same (really, separate instantiations of the same) object at
the same time.
* Processes can run on multiple CPU cores, which may provide speed advantages if
each of your processes or threads is processor intensive.
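Here is a rough illustration of the second point. It assumes a CPU-bound task (counting primes by trial division) and the `'fork'` start method: `concurrent.futures.ProcessPoolExecutor` spreads chunks of the work over several processes, each with its own interpreter and GIL. Whether this is actually faster than a single process depends on the number of cores and the amount of work, so the sketch checks only the result, not the timing. `count_primes` and `parallel_count` are hypothetical names.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def count_primes(bounds):
    """CPU-bound work: count primes in [lo, hi) by trial division."""
    lo, hi = bounds
    count = 0
    for n in range(max(lo, 2), hi):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def parallel_count(limit, workers=4):
    """Split [0, limit) into chunks and count primes in separate processes."""
    step = limit // workers
    chunks = [(i * step, limit if i == workers - 1 else (i + 1) * step)
              for i in range(workers)]
    # Assumes the 'fork' start method (available on Linux and macOS, not Windows)
    ctx = multiprocessing.get_context('fork')
    with ProcessPoolExecutor(max_workers=workers, mp_context=ctx) as pool:
        return sum(pool.map(count_primes, chunks))


if __name__ == '__main__':
    print(parallel_count(100))  # 25
```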

## Communicating between Processes
Processes operate in their own independent memory, which can cause problems if
they are expected to share information.

Instead of giving each process a fixed starting page and its own list of visited pages, you could use a scraping delegator. The scrapers take a task from one queue in the form of a path to scrape (for example, */wiki/Monty_Python*) and, in return, put a list of "found URLs" onto a separate queue. The delegator processes that second queue so that only new URLs are added to the task queue:

In [0]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
from multiprocessing import Process, Queue
import os
import time

In [0]:
def task_delegator(taskQueue, foundUrlsQueue):
  #Initialize with a task for each process
  visited = ['/wiki/Kevin_Bacon', '/wiki/Monty_Python']
  taskQueue.put('/wiki/Kevin_Bacon')
  taskQueue.put('/wiki/Monty_Python')

  while True:
    #Block until a scraper sends back a list of links, instead of
    #spinning on foundUrlsQueue.empty()
    for link in foundUrlsQueue.get():
      #Check each link individually so duplicates within one page are skipped too
      if link not in visited:
        #Add new link to the taskQueue
        taskQueue.put(link)
        #Add new link to the visited list
        visited.append(link)

In [0]:
def get_links(bsObj):
  links = bsObj.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
  return [link.attrs['href'] for link in links]

In [0]:
def scrape_article(taskQueue, foundUrlsQueue):
  while 1:
    while taskQueue.empty():
      #Sleep 100 ms while waiting for the task queue 
      #This should be rare
      time.sleep(.1)
    path = taskQueue.get()
    html = urlopen('http://en.wikipedia.org{}'.format(path))
    time.sleep(5)
    bsObj = BeautifulSoup(html, 'html.parser')
    title = bsObj.find('h1').get_text()
    print('Scraping {} in process {}'.format(title, os.getpid()))
    links = get_links(bsObj)
    #Send these to the delegator for processing
    foundUrlsQueue.put(links)

In [0]:
processes = []
taskQueue = Queue()
foundUrlsQueue = Queue()
processes.append(Process(target=task_delegator, args=(taskQueue, foundUrlsQueue,)))
processes.append(Process(target=scrape_article, args=(taskQueue, foundUrlsQueue,)))
processes.append(Process(target=scrape_article, args=(taskQueue, foundUrlsQueue,)))

for p in processes:
  p.start()

Rather than each process or thread following its own random walk from the starting
point they were assigned, they work together to do a complete coverage crawl of the
website. Each process can pull any “task” from the queue, not just links that they have
found themselves.
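To experiment with this design without sending requests to Wikipedia, here is an offline sketch. It assumes a hypothetical `LINK_GRAPH` dict in place of real pages and departs from the code above in three deliberate ways. First, the delegator runs in the main process. Second, scrapers block on `task_queue.get()` instead of polling `empty()`. Third, a `pending` counter plus `None` sentinels let the crawl stop once every reachable page has been scraped. Like the other process examples here, it assumes the `'fork'` start method.

```python
import multiprocessing
import os

# Hypothetical stand-in for Wikipedia: each path maps to the links found on it
LINK_GRAPH = {
    '/wiki/Kevin_Bacon': ['/wiki/Footloose', '/wiki/Monty_Python'],
    '/wiki/Monty_Python': ['/wiki/Flying_Circus', '/wiki/Kevin_Bacon'],
    '/wiki/Footloose': ['/wiki/Kevin_Bacon'],
    '/wiki/Flying_Circus': ['/wiki/Monty_Python', '/wiki/Footloose'],
}
STOP = None  # sentinel that tells a scraper process to exit


def scraper(task_queue, found_queue):
    while True:
        path = task_queue.get()  # blocks instead of polling empty()
        if path is STOP:
            break
        # A real scraper would fetch and parse the page here
        found_queue.put((path, os.getpid(), LINK_GRAPH.get(path, [])))


def delegate(start_paths, n_scrapers=2):
    """Hand out tasks, queue only unseen links, and stop when no work is left."""
    # Assumes the 'fork' start method (available on Linux and macOS, not Windows)
    ctx = multiprocessing.get_context('fork')
    task_queue, found_queue = ctx.Queue(), ctx.Queue()
    workers = [ctx.Process(target=scraper, args=(task_queue, found_queue))
               for _ in range(n_scrapers)]
    for w in workers:
        w.start()

    visited = set(start_paths)
    for path in start_paths:
        task_queue.put(path)
    pending = len(start_paths)  # tasks handed out but not yet reported back
    crawled = []
    while pending:
        path, pid, links = found_queue.get()
        pending -= 1
        crawled.append((path, pid))
        for link in links:
            if link not in visited:  # only new URLs become tasks
                visited.add(link)
                task_queue.put(link)
                pending += 1

    for _ in workers:
        task_queue.put(STOP)  # one sentinel per scraper
    for w in workers:
        w.join()
    return crawled


if __name__ == '__main__':
    for path, pid in delegate(['/wiki/Kevin_Bacon', '/wiki/Monty_Python']):
        print('Scraped {} in process {}'.format(path, pid))
```

Every page reachable from the starting paths is scraped exactly once. Which process scrapes which page varies from run to run.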