## Welcome to the Futures Lesson!

### What are futures?

Futures are objects that stand in for the result of a task that will finish at a later time. The task runs asynchronously, for example on a thread, and the future adds an extra layer of helper functions for checking on it and collecting its return value.

Let's take a look at the following function...

In [1]:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_3_secs(message):
    sleep(3)
    return message

# There are three threads here!
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_3_secs, 'fin')
print(future.done())
sleep(3)
print(future.done())
print(future.result())

False
True
fin


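You can see that the first **```future.done()```** call returns False. That is not because the future hasn't started: **```pool.submit()```** schedules the function on a worker thread right away and immediately hands back a future, which is still sleeping when we first check it. After we sleep for 3 seconds ourselves, the task has had time to finish, so the second **```future.done()```** returns True. **```future.result()```** does not start anything; it returns the function's return value, blocking until the future completes if it hasn't already. The 3-second delay comes from the line **```sleep(3)```** inside the function.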
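As a small sketch of how `submit()`, `done()`, and `result()` relate in time, the snippet below records when each call happens. The helper `return_after` and the 0.2-second delay are assumptions made for illustration, chosen only so the example runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from time import monotonic, sleep

def return_after(delay, message):  # hypothetical helper for this sketch
    sleep(delay)
    return message

pool = ThreadPoolExecutor(max_workers=1)
start = monotonic()
future = pool.submit(return_after, 0.2, 'fin')  # work is scheduled right away
done_at_submit = future.done()    # the task is still sleeping at this point
value = future.result()           # blocks until the task returns
waited = monotonic() - start
done_after_result = future.done()
pool.shutdown()

print(done_at_submit, done_after_result, value)
print('waited about %.1f seconds' % waited)
```

The time is spent inside `result()`, which is the call that waits; `submit()` itself returns almost immediately.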

Let's take a look at our second example. The **```with```** statement creates a pool of up to five worker threads that we are calling **```executor```** and shuts it down cleanly when the block ends. We submit one future per URL, and **```concurrent.futures.as_completed(future_to_url)```** yields those futures as they finish. **```data = future.result()```** fetches each future's result, or re-raises any exception the function raised, which is why it sits inside a try/except statement.

If you are having trouble understanding how the **```with```** statement works, check out the explanation [here](http://preshing.com/20110920/the-python-with-statement-by-example/). 
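To see what the **```with```** block does for an executor specifically, here is a minimal sketch. It assumes a made-up task, `slow_square`, in place of a real download. Leaving the block calls `executor.shutdown(wait=True)`, so every submitted future has finished by the next line, and the executor refuses any further work.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_square(n):  # hypothetical stand-in for a slow task
    sleep(0.1)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_square, n) for n in range(3)]

# The with block has exited, so the pool has been shut down and waited on.
all_done = all(f.done() for f in futures)
results = [f.result() for f in futures]

try:
    executor.submit(slow_square, 3)
    rejected = False
except RuntimeError:
    rejected = True  # a shut-down executor does not accept new futures

print(all_done, results, rejected)
```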

In [4]:
import concurrent.futures
import requests
    
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://google.com/']
 
# Retrieve a single page and return its contents
def load_url(url):
    response = requests.get(url)
    return response.text
 
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

'http://www.cnn.com/' page is 130297 bytes
'http://www.foxnews.com/' page is 70367 bytes
'http://google.com/' page is 11519 bytes
'http://www.bbc.co.uk/' page is 181448 bytes
'http://europe.wsj.com/' page is 891076 bytes


## Here's an example of the ProcessPoolExecutor()

What do you think the advantages are of running multiple queries on different processes vs. different threads?
What do you think the advantages are of using a ProcessPoolExecutor vs. ThreadPoolExecutor?

In [5]:
import concurrent.futures
import math
 
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]
 
def is_prime(n):
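    # Handle the small cases the loop below does not cover
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False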
 
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
 
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
 
main()

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False


### Why?

You can run the exact same code with a **```ThreadPoolExecutor```**, so why even bother? Well, threads all live inside the process that created them and share its memory, while each process is a separate program managed by your operating system, with its own Python interpreter. In standard CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound work like our prime check, whereas separate processes can run on separate cores. This comes with a price: processes are slower to start, use more memory, and every argument and result has to be pickled and copied between them. So there is a trade-off. Generally you will want threads for I/O-bound work (such as waiting on web requests) and processes for CPU-bound calculations.
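One way to explore the trade-off yourself is to time the same CPU-bound job under both executors. The sketch below is an illustration under stated assumptions: `count_primes_below`, `timed_run`, the limit of 20000, and the four workers are all made up for this example, and the timings you see will depend on your machine. The `__main__` guard is there because process pools may re-import the main module on some platforms.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def count_primes_below(limit):
    """CPU-bound toy workload: count primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

def timed_run(executor_class, limits):
    """Run the workload on the given executor class; return results and seconds."""
    start = time.perf_counter()
    with executor_class(max_workers=4) as executor:
        results = list(executor.map(count_primes_below, limits))
    return results, time.perf_counter() - start

if __name__ == '__main__':
    limits = [20000] * 4
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, seconds = timed_run(executor_class, limits)
        print(executor_class.__name__, results, round(seconds, 2))
```

Because both classes share the same interface, switching between them is a one-word change, which makes this kind of comparison cheap to try.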

## as_completed

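The next useful feature we are going to learn today is as_completed. It takes a collection of futures and yields each one as soon as it completes, so results arrive in completion order rather than submission order. The loop ends once every future has been yielded, so you don't need to count them yourself to make sure they all come back.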
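To make the ordering concrete, here is a small sketch with fixed delays instead of random ones. The helper `wait_then_return` and the delay values are assumptions for illustration. It collects results once via `as_completed` and once by walking the list in the order the futures were submitted.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def wait_then_return(delay):  # hypothetical helper for this sketch
    sleep(delay)
    return delay

delays = [0.5, 0.1, 0.3]
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(wait_then_return, d) for d in delays]
    # Yields each future as it finishes, shortest delay first here
    completion_order = [f.result() for f in as_completed(futures)]
    # Walking the list directly keeps the order of submission
    submission_order = [f.result() for f in futures]

print('completed:', completion_order)
print('submitted:', submission_order)
```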

In [11]:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
 
def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
 
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))
 
for x in as_completed(futures):
    print(x.result())

Return of 4
Return of 1
Return of 0
Return of 2
Return of 3


## Map

The last feature is the map function. The code below applies the function func to each of the values 0-9 and yields the results in the same order as the inputs. Try to use this function as much as possible. It will help you out immensely when you need to run one function over a large dataset.
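A quick sketch of the ordering guarantee: `executor.map` returns results in input order even when later inputs finish first. The function `slow_add_five` and its delays are assumptions made up for this illustration, and a thread pool is used here only to keep the example light.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_add_five(num):  # hypothetical variant of func with a delay
    sleep(0.05 * (3 - num))  # smaller inputs sleep longer, so they finish last
    return num + 5

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(slow_add_five, range(3)))

print(results)  # [5, 6, 7]
```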

In [12]:
import concurrent.futures

def func(num):
    return num + 5

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(func, range(10))
    for x in results:
        print(x)

5
6
7
8
9
10
11
12
13
14


As a final example I have included the IMDb Top 250 query done with futures. Try changing max_workers and switching between the thread and process pools. Do you notice any difference in how quickly things are returned? What are the tradeoffs of the different settings?
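If you want to experiment with max_workers without hitting a real website, a rough offline sketch like the one below can help. Here `fake_request` simply sleeps to imitate network latency, and both it and `time_with_workers` are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep

def fake_request(item):  # hypothetical stand-in for a network call
    sleep(0.05)
    return item

def time_with_workers(max_workers, items):
    """Run fake_request over items and return (results, elapsed seconds)."""
    start = perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(fake_request, items))
    return results, perf_counter() - start

items = list(range(10))
for workers in (1, 5, 10):
    results, seconds = time_with_workers(workers, items)
    print('%2d workers: %.2f seconds' % (workers, seconds))
```

With work that mostly waits, more workers should shorten the total time until the number of workers reaches the number of tasks.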

In [22]:
import concurrent.futures
import requests
import re
import pandas as pd
import sys

def get_top_250():
    response = requests.get('http://www.imdb.com/chart/top')
    html = response.text
    entries = re.findall("<a href.*?/title/(.*?)/", html) #Wrong regex
    return list(set(entries))

def queryOMDB(id):
    res = requests.get('http://www.omdbapi.com/?i='+id)
    return {'id':id,'data':res.text,'type':'omdb'}

def queryGross(id):
    try:
        res = requests.get('http://www.imdb.com/title/'+id)
        gross_list = re.findall(r"Gross:</h4>[ ]*\$([^ ]*)", res.text)
        gross = int(gross_list[0].replace(',', ''))
        return {'id':id,'data':gross,'type':'gross'}
    except Exception as ex:
        return {'id':id,'data':'','type':'gross'}

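def checkAllDone(futures):
    # done() is True once a future has finished or been cancelled;
    # running() would also be False for futures that have not started yet
    for x in futures:
        if not x.done():
            return False
    return True

executors_list = []
future_to_id = {}  # remember which movie id each future belongs to
pre_df = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit two queries per movie: one to OMDb and one to the IMDb title page
    for movie_id in get_top_250():
        for query in (queryOMDB, queryGross):
            future = executor.submit(query, movie_id)
            executors_list.append(future)
            future_to_id[future] = movie_id
    # result() blocks until each future finishes, in submission order
    for future in executors_list:
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (future_to_id[future], exc))
        else:
            if data['id'] not in pre_df:
                pre_df[data['id']] = {}
                # Progress: each new id accounts for two of the submitted futures
                print(str(100 * (len(pre_df) * 2) / float(len(executors_list))) + '%')
            if data['type'] == 'gross':
                pre_df[data['id']]['gross'] = data['data']
            if data['type'] == 'omdb':
                pre_df[data['id']]['omdb'] = data['data']
    # Build the DataFrame once, after every future has been collected
    if checkAllDone(executors_list):
        print('fin')
        new_df = pd.DataFrame(pre_df).T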

0.4%
0.8%
1.2%
1.6%
2.0%
2.4%
2.8%
3.2%
3.6%
4.0%
4.4%
4.8%
5.2%
5.6%
6.0%
6.4%
6.8%
7.2%
7.6%
8.0%
8.4%
8.8%
9.2%
9.6%
10.0%
10.4%
10.8%
11.2%
11.6%
12.0%
12.4%
12.8%
13.2%
13.6%
14.0%
14.4%
14.8%
15.2%
15.6%
16.0%
16.4%
16.8%
17.2%
17.6%
18.0%
18.4%
18.8%
19.2%
19.6%
20.0%
20.4%
20.8%
21.2%
21.6%
22.0%
22.4%
22.8%
23.2%
23.6%
24.0%
24.4%
24.8%
25.2%
25.6%
26.0%
26.4%
26.8%
27.2%
27.6%
28.0%
28.4%
28.8%
29.2%
29.6%
30.0%
30.4%
30.8%
31.2%
31.6%
32.0%
32.4%
32.8%
33.2%
33.6%
34.0%
34.4%
34.8%
35.2%
35.6%
36.0%
36.4%
36.8%
37.2%
37.6%
38.0%
38.4%
38.8%
39.2%
39.6%
40.0%
40.4%
40.8%
41.2%
41.6%
42.0%
42.4%
42.8%
43.2%
43.6%
44.0%
44.4%
44.8%
45.2%
45.6%
46.0%
46.4%
46.8%
47.2%
47.6%
48.0%
48.4%
48.8%
49.2%
49.6%
50.0%
50.4%
50.8%
51.2%
51.6%
52.0%
52.4%
52.8%
53.2%
53.6%
54.0%
54.4%
54.8%
55.2%
55.6%
56.0%
56.4%
56.8%
57.2%
57.6%
58.0%
58.4%
58.8%
59.2%
59.6%
60.0%
60.4%
60.8%
61.2%
61.6%
62.0%
62.4%
62.8%
63.2%
63.6%
64.0%
64.4%
64.8%
65.2%
65.6%
66.0%
66.4%
66.8%
67.2%
67.6%
68.0%
68.4

In [24]:
# Total amount of vals
print("DataFrame Size: ", new_df.size)
print()
# Some sample ids
print("Sample id: ")
print(pd.Series(new_df.index.values).value_counts()[:5])
print()
# The head
new_df.head()

DataFrame Size:  500

Sample id: 
tt0116231    1
tt1375666    1
tt0075686    1
tt0078748    1
tt0993846    1
dtype: int64



|  | gross | omdb |
|---|---|---|
| tt0012349 | 2500000.0 | {"Title":"The Kid","Year":"1921","Rated":"NOT ... |
| tt0015864 |  | {"Title":"The Gold Rush","Year":"1925","Rated"... |
| tt0017136 | 26435.0 | {"Title":"Metropolis","Year":"1927","Rated":"N... |
| tt0017925 |  | {"Title":"The General","Year":"1926","Rated":"... |
| tt0018455 |  | {"Title":"Sunrise","Year":"1927","Rated":"NOT ... |
