Script that will merge all the channel NPZ files together for a single time step on Google's GCP storage buckets to efficiently fetch a single time step in one go.

In [1]:
# TODO: Move this into an actual script rather than a notebook.

#!env python
"""
Script that will merge all the channel NPZ files together for a single time step on Google's GCP storage buckets
to efficiently fetch a single time step in one go.

Before running this, run the following command on your remote GCP compute instance:
gcloud auth application-default login

Note that you must also have the correct GCP access scopes set for your service provider
account for this script to work:
* Compute Engine - Read Write
* Storage - Full

Make sure the project wide service account also has the right cloud storage roles. Run
this on your local laptop:
export SERVICE_ACCOUNT=524456442905-compute@developer.gserviceaccount.com
export BUCKET_NAME=fdl-sdo-data
gsutil iam ch serviceAccount:$SERVICE_ACCOUNT:roles/storage.objectAdmin gs://$BUCKET_NAME
gsutil iam ch serviceAccount:$SERVICE_ACCOUNT:roles/storage.objectCreator gs://$BUCKET_NAME
gsutil iam ch serviceAccount:$SERVICE_ACCOUNT:roles/storage.objectViewer gs://$BUCKET_NAME

When you run this script, ensure you send its output to a log:
python ./combine_channels.py > /tmp/combine_output.log 2>&1 & tail -f /tmp/combine_output.log
Grep the log to make sure no "FINAL ERROR!!!!!!!!" messages are in there; if they are, they indicate
holes in your processed dataset due to un-fixable retries.


Example of how you'd reload one of these compressed all-in-one channel files after this script
is finished pre-processing everything:

client = storage.Client()
bucket = client.get_bucket('fdl-sdo-data')
filename = 'SDOMLnpz/2010/05/01/HMI20100501_0036_all.pklz'
blob = bucket.get_blob(filename)
results = pickle.loads(bz2.decompress(blob.download_as_string()))

'results' is an array of tuples, where each tuple contains the string of the channel name,
such as 'bx', with the numpy array of its image results at 512x512 resolution.
"""


import bz2
import concurrent
import itertools
from io import BytesIO
import multiprocessing as mp
import re
import os
import pickle
import time

from google.cloud import storage

import numpy as np
import pandas as pd


# Pointer to where 'gcloud auth login' put credentials.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.expanduser(
    '~/expanding-sdo-capabilities/config/space_weather_sdo.json')

# The number of processes we want to spawn in parallel to handle converting timestamp chunks.
NUM_CONSUMERS = mp.cpu_count() - 1

# The number of timestamp channel groups we want to download and convert in one shot.
NUM_TIMESTAMPS_AT_ONCE_PER_PROCESS = 10

# Bucket name of where we store our SDO data.
GCP_BUCKET_NAME = 'fdl-sdo-data'

# Path to NPZ inventory for quick look ups.
GCP_INVENTORY_IN_PATH = 'SDOMLnpz/inventory.pkl'

# Set to a number like 20 to test processing a subset of data during testing.
# TODO!!! Set back to None when we are done debugging.
MAX_ITER = 20
#MAX_ITER = None

# Lock to safely have threads and processes print to the screen.
global console_lock
console_lock = mp.Lock()


def safeprint(msg):
    """
    Make sure multiple processes don't step on each others feet when printing out to the
    console.
    """
    with console_lock:
        print(msg)

        
def enumerate_timestamp_channels(df, max_iter):
    """
    Group each of the timestamps into all of their channels, returning
    the results chunked together to handle as a group of channels per
    timestamp. max_iter is useful for debugging to limit the number of
    results we iterate and produce.
    """
    indexes = ['year', 'month', 'day', 'hour', 'min']
    counter = 0
    for _, timestamp_group in df.groupby(indexes):
        channels = [
            {'channel': row['channel'], 'file': row['file']}
            for _, row in timestamp_group.iterrows()
        ]
        yield channels
        
        counter += 1
        if max_iter is not None and counter >= max_iter:
            break

            
def chunked_iterable(iterable, chunk_size):
    """
    Iterate through something in 'chunks', where each chunk is returned as a group.
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, chunk_size))
        if not chunk:
            break
        yield chunk

        
def connect_gcp(gcp_bucket_name):
    """
    Connect to the Google Cloud Provider storage bucket. Note
    that the environment variable GOOGLE_APPLICATION_CREDENTIALS must
    be set to the path to a GCP JSON config file before this is run,
    such as:
    export GOOGLE_APPLICATION_CREDENTIALS=~/expanding-sdo-capabilities/config/space_weather_sdo.json
    """
    client = storage.Client()
    bucket = client.get_bucket(gcp_bucket_name)
    return client, bucket


def get_combined_path(channel_path):
    """
    Given a path to a channel, generates an *_all.pklz version of that path pointing
    to the combined channels file.
    """
    return re.sub(r'_[^._]*?\.npz', '_all.pklz', channel_path)


# Maintain a single producer process and multiple consumer processes.
# Use a single blocking queue to coordinate the work from the producer to the consumer
# processes. The producer will keep putting chunks of timesteps to process, blocking
# when the queue gets too full, and the consumers will read from this queue.
#
# Each consumer process will also spawn threads inside of itself to deal with its
# chunked items in parallel. This means we get to use multiple cores on the system via
# Python processes for the consumer processes, as well as being efficient in terms of
# being network/IO-bound using Python threads.


def producer(df, work_queue, max_iter):
    # For each timestep group all of its channels together, then group these into chunks that
    # we can efficiently process all at once. Put these into the work queue for consumers
    # to process.
    safeprint('Producer setting up iterable chunks...')
    channels = chunked_iterable(enumerate_timestamp_channels(df, max_iter=max_iter),
                                chunk_size=NUM_TIMESTAMPS_AT_ONCE_PER_PROCESS)
    safeprint('Producer finished setting up iterable chunks')
    for chunk in channels:
        work_queue.put(chunk)
    
    # TODO: Get the updated final list of combined paths and update the inventory
    # path, then re-upload that.
    
    for idx in range(NUM_CONSUMERS):
        work_queue.put('DONE')

        
def consumer(process_idx, work_queue):
    """
    Note: this method runs in its own process.
    """
    safeprint('Consumer {} started'.format(process_idx))
    # Note: GCP connections are thread-safe but not multi-process safe.
    client, bucket = connect_gcp(GCP_BUCKET_NAME)
    while True:
        msg = work_queue.get()
        if msg == 'DONE':
            safeprint('Consumer {} DONE'.format(process_idx))
            break
        
        chunks = msg
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(chunks)) as executor:
            executor.map(lambda chunk: process_timestamp(process_idx, chunk, bucket), chunks)
        safeprint('Process {} finished with its threads'.format(process_idx))

        
def process_timestamp(process_idx, channels, bucket, max_retries=5, wait_time_s=5):
    """
    For each timestamp, download all of its channels in parallel then combine them into a single
    file we can re-upload back to GCP.

    Note: this runs in its own thread, managed by the consumer() method.
    """
    
    def download_channel(process_idx, channel_name, channel_path, bucket, max_retries=5, wait_time_s=5):
        """
        Download an individual timestamps channel.
        
        Note: this runs in its own thread, managed by the process_timestamp method().
        """
        for retry in range(max_retries):
            try:
                img_data = bucket.blob(channel_path).download_as_string()
                img = np.load(BytesIO(img_data))['x']
                return (channel_name, img)
            except Exception as e:
                safeprint('Consumer {}: ERROR in download_channel thread for {}: {}, waiting {} seconds for retry {}'
                          .format(process_idx, channel_path, e, wait_time_s, retry))
                time.sleep(wait_time_s)
        
        safeprint('ERROR!!!!!!!! Even after retries unable to fully run download_channel for {}'
                  .format(channel_path))
        
    for retry in range(max_retries):
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=len(channels)) as executor:
                runme = lambda channel: download_channel(process_idx, channel['channel'], channel['file'], bucket)
                results = executor.map(runme, channels)

            # Ensure we always sort the list of channels consistently by the channel name.
            results = list(results)
            results = sorted(results, key=lambda entry: entry[0])
            
            # Pickle, compress, and upload these to GCP as a single combined channel file.
            compressed_out = bz2.compress(pickle.dumps(results))
            combined_path = get_combined_path(channels[0]['file'])
            blob = bucket.blob(combined_path)
            blob.upload_from_string(compressed_out)        

            safeprint('Consumer {}: Finished {}'.format(process_idx, combined_path))
            return
        except Exception as e:
            safeprint('Consumer {}: ERROR in process_timestamp thread for {}: {}, waiting {} seconds for retry {}'
                      .format(process_idx, channels, e, wait_time_s, retry))
            time.sleep(wait_time_s)

        safeprint('FINAL ERROR!!!!!!!! Even after retries unable to fully run process_timestamp for {}'
                  .format(channels))


def update_inventory(df, bucket, max_iter):
    """
    Make sure we record the *_all versions of the combined channels into the inventory for efficient lookups.
    """
    safeprint('Updating paths in inventory file...')
    for channels in enumerate_timestamp_channels(df, max_iter=max_iter):
        pass
        
    safeprint('Finished updating paths in inventory file')
    
def main():
    client, bucket = connect_gcp(GCP_BUCKET_NAME)

    data = bucket.blob(GCP_INVENTORY_IN_PATH).download_as_string()
    df = pd.read_pickle(BytesIO(data), compression='gzip')

    work_queue = mp.Queue(maxsize=NUM_CONSUMERS)
    consumers = [None] * NUM_CONSUMERS
    for idx in range(NUM_CONSUMERS):
        c = mp.Process(target=consumer, args=(idx, work_queue))
        c.daemon = True
        c.start()

    producer(df, work_queue, MAX_ITER)

    update_inventory(df, bucket, MAX_ITER)

    safeprint('Main process DONE')



#main()

In [12]:
client, bucket = connect_gcp(GCP_BUCKET_NAME)

data = bucket.blob(GCP_INVENTORY_IN_PATH).download_as_string()
df = pd.read_pickle(BytesIO(data), compression='gzip')

In [None]:
# TODO: Add a short-circuit where we don't re-add the 'all' channel if its already present
# in the inventory.

In [15]:
max_iter = 5
indexes = ['year', 'month', 'day', 'hour', 'min']
counter = 0
updated_df = df.copy()
for _, timestamp_group in df.groupby(indexes):
    entries = list(timestamp_group.iterrows())
    new_entry = entries[0][1].copy()
    print('0 entry: {}'.format(new_entry))
    new_entry['channel'] = 'all'
    new_entry['file'] = get_combined_path(entries[0][1]['file'])
    updated_df = updated_df.append(new_entry)
    for (_, c) in entries:
        updated_df = updated_df.append(c)
    
    counter += 1
    if max_iter is not None and counter >= max_iter:
        break

0 entry: year                                              2010
month                                                5
day                                                  1
hour                                                 0
min                                                 12
channel                                             by
file       SDOMLnpz/2010/05/01/HMI20100501_0012_by.npz
Name: 1076941452, dtype: object
0 entry: year                                              2010
month                                                5
day                                                  1
hour                                                 0
min                                                 24
channel                                             by
file       SDOMLnpz/2010/05/01/HMI20100501_0024_by.npz
Name: 1076941464, dtype: object
0 entry: year                                              2010
month                                                5
day                          

In [14]:
updated_df

Unnamed: 0_level_0,year,month,day,hour,min,channel,file
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1076941452,2010.0,5,1,0,12,by,SDOMLnpz/2010/05/01/HMI20100501_0012_by.npz
1076941452,2010.0,5,1,0,12,bx,SDOMLnpz/2010/05/01/HMI20100501_0012_bx.npz
1076941452,2010.0,5,1,0,12,bz,SDOMLnpz/2010/05/01/HMI20100501_0012_bz.npz
1076941464,2010.0,5,1,0,24,by,SDOMLnpz/2010/05/01/HMI20100501_0024_by.npz
1076941464,2010.0,5,1,0,24,bx,SDOMLnpz/2010/05/01/HMI20100501_0024_bx.npz
...,...,...,...,...,...,...,...
1076941488,2010.0,5,1,0,48,bx,SDOMLnpz/2010/05/01/HMI20100501_0048_bx.npz
1076941500,2010.0,5,1,1,0,all,SDOMLnpz/2010/05/01/HMI20100501_0100_all.pklz
1076941500,2010.0,5,1,1,0,by,SDOMLnpz/2010/05/01/HMI20100501_0100_by.npz
1076941500,2010.0,5,1,1,0,bz,SDOMLnpz/2010/05/01/HMI20100501_0100_bz.npz


In [None]:
# TODO: Remember to save the resulting inventory pkl with compression on.