In [10]:
import json
import logging
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from itertools import islice
from google.cloud import storage, bigquery

# Note: you'll need a functioning Google Cloud account to be able to use this notebook
from secrets.secrets import PROJECT_ID, BUCKET_URI, GS_FOLDER, BQ_DATASET, BQ_TABLE
from helpers import (
    n_fibonacci, calc_lengths,
    calc_length, print_stuff,
    create_bigquery_client,
    stream_data, bq_data_insert
)

# Named logger for this notebook, with its threshold lowered to DEBUG.
logger = logging.getLogger("notebook")
logger.setLevel(logging.DEBUG)

In [11]:
def get_blobs():
    """Lazily yield a blob handle for each object listed under the "data" prefix.

    Listing entries whose name ends with "/" are skipped. Each yielded handle
    is created from the entry's name via ``bucket.blob``. Because this is a
    generator, the storage client and bucket lookup only happen once the
    first item is requested.
    """
    gcs_client = storage.Client(project=PROJECT_ID)
    bucket = gcs_client.get_bucket(BUCKET_URI)

    for listed in bucket.list_blobs(prefix="data"):
        if listed.name.endswith("/"):
            continue
        yield bucket.blob(listed.name)

def download_as_string(myblob):
    """Return the content of ``myblob`` as text.

    The payload is fetched with ``myblob.download_as_string()`` and the
    resulting bytes are decoded with the default codec.
    """
    payload = myblob.download_as_string()
    return payload.decode()

def blobs_slice(limit=16):
    """Return a lazy iterator over at most ``limit`` blobs from ``get_blobs``.

    Args:
        limit: Maximum number of blobs to yield. Defaults to 16, the sample
            size used when the function is called without arguments.

    Returns:
        An ``itertools.islice`` iterator; nothing is listed or downloaded
        until it is consumed.
    """
    return islice(get_blobs(), limit)

In [12]:
%%time
# Sequential code
seq_sizes = [
    download_as_string(b) for b in blobs_slice()
]
seq_sizes = map(int, seq_sizes)
fibonaccis = map(n_fibonacci, seq_sizes)
sizes = map(lambda x: len(str(x)), fibonaccis)
# number_size = calc_length(fibonacci)
for a, b in zip(seq_sizes, sizes):
    data = json.dumps(
        dict(
            col1=a,
            col2=b,
        )
    )
    print(data)
    stream_data(data, BQ_DATASET, BQ_TABLE)
# print(sizes)

{"col1": 200006, "col2": 41799}
{"col1": 200003, "col2": 41798}
{"col1": 200006, "col2": 41798}
{"col1": 200001, "col2": 41799}
{"col1": 200004, "col2": 41798}
{"col1": 200009, "col2": 41799}
{"col1": 200002, "col2": 41799}
{"col1": 200008, "col2": 41799}
CPU times: user 4.29 s, sys: 80 ms, total: 4.37 s
Wall time: 18.6 s


In [13]:
from functools import partial
# Pre-bind the destination dataset and table so the resulting callable only
# needs the payload argument; this is the shape ``Executor.map`` and
# ``Executor.submit`` are given in the concurrent cells.
upstream_mytable = partial(
    stream_data,
    dataset_name=BQ_DATASET,
    table_name=BQ_TABLE,
)

def cpu_bound_work(input_number):
    """Return the length of ``str(n_fibonacci(int(input_number)))``.

    ``input_number`` may be anything ``int()`` accepts (for example the text
    downloaded from a blob).
    """
    index = int(input_number)
    fib_value = n_fibonacci(index)
    rendered = str(fib_value)
    return len(rendered)

In [14]:
%%time
# Concurrent code 1
with ProcessPoolExecutor() as pp:
    with ThreadPoolExecutor() as tp:
        # This needs to stay in memory
        seq_sizes = list(
            tp.map(
                download_as_string, blobs_slice()
            )
        )
        sizes = pp.map(
            cpu_bound_work, seq_sizes
        )
        data = [
            json.dumps(
                dict(
                    col1=a,
                    col2=b,
                )
            ) for a, b in zip(seq_sizes, sizes)
        ]
#         print(list(data))
        stream = tp.map(
            upstream_mytable, data
        )
        print(list(stream))

[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]
CPU times: user 1.26 s, sys: 212 ms, total: 1.48 s
Wall time: 8.19 s


In [15]:
%%time
# Concurrent code 2
with ProcessPoolExecutor() as pp:
    with ThreadPoolExecutor() as tp:
        futures = [
            tp.submit(
                    download_as_string, b
                ) for b in blobs_slice()
        ]
        length_futures = {
            pp.submit(
                cpu_bound_work, f.result()
            ): f.result() for f in as_completed(futures)
        }
        for f in as_completed(length_futures):
            data = json.dumps(
                dict(
                    col1=length_futures[f],
                    col2=f.result(),
                )
            )
#             print(data)
            tp.submit(upstream_mytable, data)

        print(length_futures.values())

dict_values(['200006', '200006', '200003', '200006', '200001', '200005', '200002', '200004', '200001', '200001', '200005', '200009', '200008', '200007', '200002', '200006'])
CPU times: user 720 ms, sys: 176 ms, total: 896 ms
Wall time: 7.66 s
