In [1]:
import argparse
import os

import apache_beam as beam
import tensorflow as tf
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.runners.interactive.interactive_beam as ib
import apache_beam.transforms.sql

import sys
sys.path.append('api')

import beam__common
import fidscs_globals

import data_extractor
import preprocessor

# from tensorflow.keras.preprocessing.image import img_to_array, load_img
import tensorflow.keras.preprocessing.image

import subprocess

%load_ext autoreload
%autoreload 2

In [2]:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] # set by bash profile
print(f"GOOGLE_APPLICATION_CREDENTIALS: {os.environ['GOOGLE_APPLICATION_CREDENTIALS']}")
os.environ['BEAM_GCP_PROJECT'] = 'sc-fids-capstone'
print(f"BEAM_GCP_PROJECT: {os.environ['BEAM_GCP_PROJECT']}")
# os.environ['FIDS_CAPSTONE_WRK_DIR'] = f"gs://{os.environ['BEAM_GCP_PROJECT']}-bucket-{os.environ['BEAM_GCP_PROJECT']}"
os.environ['FIDS_CAPSTONE_WRK_DIR'] = f"/tmp"
print(f"FIDS_CAPSTONE_WRK_DIR: {os.environ['FIDS_CAPSTONE_WRK_DIR']}")
os.environ['FIDS_CAPSTONE_MAX_TARGET_VIDEOS'] = '500'
print(f"FIDS_CAPSTONE_MAX_TARGET_VIDEOS: {os.environ['FIDS_CAPSTONE_MAX_TARGET_VIDEOS']}")
os.environ['FIDS_CAPSTONE_GCP_REGION'] = 'us-west2'
print(f"FIDS_CAPSTONE_GCP_REGION: {os.environ['FIDS_CAPSTONE_GCP_REGION']}")

GOOGLE_APPLICATION_CREDENTIALS: /Users/stevencontreras/.secret/SC-FIDS-Capstone-4b4567e0e308.json
BEAM_GCP_PROJECT: sc-fids-capstone
FIDS_CAPSTONE_WRK_DIR: /tmp
FIDS_CAPSTONE_MAX_TARGET_VIDEOS: 500
FIDS_CAPSTONE_GCP_REGION: us-west2
FIDS_CAPSTONE_GCP_REGION: us-west2


In [3]:
# ensure we have already extracted data to preprocess
    # note that this is normally done via the commandline (outside of Jupyter)
    # but running this confirms data exists in the working directory and will download/extract it if not
    # also note that this will take a fair bit of time if it needs to do so

!python ./run_local__etl.py \
  --work-dir $FIDS_CAPSTONE_WRK_DIR \
  --max-target-videos $FIDS_CAPSTONE_MAX_TARGET_VIDEOS \
  --use-beam 1 \
  --beam-gcp-project $BEAM_GCP_PROJECT \
  --beam-gcp-region $FIDS_CAPSTONE_GCP_REGION \
  --beam-gcp-dataflow-job-name $BEAM_GCP_PROJECT-etl \
  --beam-gcp-dataflow-setup-file ./setup.py

execvP: bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;library/bin;bin;condabin;libr

In [5]:
preprocessor.run(
    work_dir=os.environ['FIDS_CAPSTONE_WRK_DIR'],
    use_beam=True,
    beam_runner='DirectRunner',
    beam_gcp_project=os.environ['BEAM_GCP_PROJECT'],
    beam_gcp_region=os.environ['FIDS_CAPSTONE_GCP_REGION'],
    beam_gcp_dataflow_job_name=f"{os.environ['BEAM_GCP_PROJECT']}-etl",
    beam_gcp_dataflow_setup_file='./setup.py'
)

TypeError: run() got an unexpected keyword argument 'use_beam'

In [None]:
options = None

if beam_runner != 'DirectRunner':
    if beam_gcp_dataflow_setup_file != './setup.py':
        print(f"*** FATAL ERROR!!! ***  beam_gcp_setup_file=={beam_gcp_dataflow_setup_file} but it should be ./setup.py")
        return

    logging.getLogger().setLevel(logging.INFO) # enable logging only for DataflowRunner

    options = {
        'runner': beam_runner,
        'streaming': False, # set to True if data source is unbounded (e.g. GCP PubSub),
        'max_num_workers': 8,
        'autoscaling_algorithm': 'THROUGHPUT_BASED',
        'num_workers': 4,
        'disk_size_gb': 250,
        'save_main_session': True,
        'enable_streaming_engine': False,

        # GCP options
        'project': beam_gcp_project,
        'region': beam_gcp_region,
        'worker_region': beam_gcp_region,
        'service_account_email': 'fids-capstone-beam-pl-gcs@sc-fids-capstone.iam.gserviceaccount.com',    # REDACTED
        'staging_location': beam_gcs_staging_bucket,
        'temp_location': beam_gcs_temp_location,
        'setup_file': beam_gcp_dataflow_setup_file,
        'job_name': beam_gcp_dataflow_job_name,
    }
else:
    options = {
        'runner': 'DirectRunner',
        'environment_type': 'DOCKER',
        'direct_num_workers': 0, # 0 is use all available cores
        'direct_running_mode': 'multi_processing', # ['in_memory', 'multi_threading', 'multi_processing']
        'streaming': False # set to True if data source is unbounded (e.g. GCP PubSub),
    }

options.update(beam__common.make_fids_options_dict(work_dir, beam_gcp_project=beam_gcp_project))

In [None]:
pl = beam.Pipeline(options=pipeline_options)

def train_val_index_schemad_pcoll_rows__to__ordered_tuples(train_val_index_schemad_pcoll_row):
    """
    """
    return (
        # SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX = [
        #     'TokenID',
        #     'CameraPerspective',
        #     'ASLConsultantID',
        #     'TargetVideoFilename',
        #     'UtteranceSequence',
        #     'TokenSequence',
        #     'FrameSequence'
        # ]
        train_val_index_schemad_pcoll_row.TokenID,
        train_val_index_schemad_pcoll_row.CameraPerspective,
        train_val_index_schemad_pcoll_row.ASLConsultantID,
        train_val_index_schemad_pcoll_row.TargetVideoFilename,
        train_val_index_schemad_pcoll_row.UtteranceSequence,
        train_val_index_schemad_pcoll_row.TokenSequence,
        train_val_index_schemad_pcoll_row.FrameSequence
    )

train_frame_sequences__assoc_index_schemad_pcoll = beam__common.pl__1__read_train_frame_sequences__assoc_index_csv(pl)
train_frame_sequences__assoc_index = (
    train_frame_sequences__assoc_index_schemad_pcoll
    | "Beam PL: transform train_frame_sequences__assoc_index_schemad_pcoll rows to ordered tuples (according to schema)" >> beam.Map(train_val_index_schemad_pcoll_rows__to__ordered_tuples)
)

val_frame_sequences_index_schemad_pcoll = beam__common.pl__1__read_val_frame_sequences__index_csv(pl)
val_frame_sequences_index = (
    val_frame_sequences_index_schemad_pcoll
    | "Beam PL: transform val_frame_sequences_index_schemad_pcoll rows to ordered tuples (according to schema)" >> beam.Map(train_val_index_schemad_pcoll_rows__to__ordered_tuples)
)

train_frame_sequences_index_schemad_pcoll = beam__common.pl__1__read_train_frame_sequences_index_csv(pl)
train_frame_sequences_index = (
    train_frame_sequences_index_schemad_pcoll
    | "Beam PL: transform train_frame_sequences_index_schemad_pcoll rows to ordered tuples (according to schema)" >> beam.Map(train_val_index_schemad_pcoll_rows__to__ordered_tuples)
)

In [None]:
def train_val_index__load_frame_img(train_val_index_tpl):
    """
    SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX = [
        'TokenID',
        'CameraPerspective',
        'ASLConsultantID',
        'TargetVideoFilename',
        'UtteranceSequence',
        'TokenSequence',
        'FrameSequence'
    ]
    """
    target_video_frames_dir = os.path.join(
        fidscs_globals.STICHED_VIDEO_FRAMES_DIR, 
        train_val_index_tpl[3].split('.')[0]
    )
    target_video_frame_fname = f"{train_val_index_tpl[6]}.jpg"
    target_video_frame_path = os.path.join(target_video_frames_dir, target_video_frame_fname)
    target_video_frame_img = tf.keras.preprocessing.image.load_img(
        target_video_frame_path, 
        target_size=fidscs_globals.FRAME_IMG_INPUT_SHAPE
    )  # this is a PIL image
    target_video_frame_img_array = tf.keras.preprocessing.image.img_to_array(target_video_frame_img)
    target_video_frame_img_array = target_video_frame_img_array.reshape((1,) + target_video_frame_img_array.shape)
    target_video_frame_img_array /= 255.0 # Rescale by 1/255

    return (
        train_val_index_tpl[0],
        train_val_index_tpl[1],
        train_val_index_tpl[2],
        train_val_index_tpl[3],
        train_val_index_tpl[4],
        train_val_index_tpl[5],
        train_val_index_tpl[6],
        target_video_frame_img_array
    )

In [None]:
def pl__X__sample_train_val_frame_sequences_index__with_frame_tensors(train_val_frame_sequences_index, name_train_val_frame_sequences_index, n_sample_size=50, rand=False):
    """
    """

    sample = (
        train_val_frame_sequences_index
        | f"Beam PL: get random sample of {n_sample_size} frames from {name_train_val_frame_sequences_index}" >> beam.combiners.Sample.FixedSizeGlobally(n_sample_size)
    ) if rand else (
        train_val_frame_sequences_index
        | f"Beam PL: get top {n_sample_size} frames from {name_train_val_frame_sequences_index}" >> beam.combiners.Top.Of(n_sample_size)
    )
    return (
        sample
        | f"Beam PL: flatten sample of {n_sample_size} frames from {name_train_val_frame_sequences_index}" >> beam.FlatMap(lambda lst_tpl: lst_tpl)
        | f"Beam PL: load frame images of {name_train_val_frame_sequences_index}" >> beam.Map(train_val_index__load_frame_img)
    )

train_frame_sequences__assoc_index__sample__with_frame_tensors = pl__X__sample_train_val_frame_sequences_index__with_frame_tensors(
    train_frame_sequences__assoc_index, 
    "train_frame_sequences__assoc_index"
)
val_frame_sequences_index__sample__with_frame_tensors = pl__X__sample_train_val_frame_sequences_index__with_frame_tensors(
    val_frame_sequences_index, 
    "val_frame_sequences_index"
)
train_frame_sequences_index__sample__with_frame_tensors = pl__X__sample_train_val_frame_sequences_index__with_frame_tensors(
    train_frame_sequences_index, 
    "train_frame_sequences_index"
)

In [None]:
# we require this in order to make use of ib.show() (which provides visualization of the pcolls specified) or ib.collect() (which creates a pandas dataframe from a pcoll)
    # but all pcolls we wish to visualize must be created prior to executing the following line
ib.watch(locals())

<p><br>

##### Show base train/val frame datasets

In [None]:
df_train_frame_sequences__assoc_index = ib.collect(train_frame_sequences__assoc_index)

In [None]:
df_train_frame_sequences__assoc_index.columns = fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX
df_train_frame_sequences__assoc_index.set_index(fidscs_globals.SCHEMA_PK__TRAIN_OR_VAL_INDEX, inplace=True)
df_train_frame_sequences__assoc_index.sort_values(axis=0, by=[fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX[6]], ignore_index=False, inplace=True)
df_train_frame_sequences__assoc_index.sort_index(inplace=True)

df_train_frame_sequences__assoc_index

In [None]:
df_val_frame_sequences_index = ib.collect(val_frame_sequences_index)

In [None]:
df_val_frame_sequences_index.columns = fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX
df_val_frame_sequences_index.set_index(fidscs_globals.SCHEMA_PK__TRAIN_OR_VAL_INDEX, inplace=True)
df_val_frame_sequences_index.sort_values(axis=0, by=[fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX[6]], ignore_index=False, inplace=True)
df_val_frame_sequences_index.sort_index(inplace=True)

df_val_frame_sequences_index

In [None]:
df_train_frame_sequences_index = ib.collect(train_frame_sequences_index)

In [None]:
df_train_frame_sequences_index.columns = fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX
df_train_frame_sequences_index.set_index(fidscs_globals.SCHEMA_PK__TRAIN_OR_VAL_INDEX, inplace=True)
df_train_frame_sequences_index.sort_values(axis=0, by=[fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX[6]], ignore_index=False, inplace=True)
df_train_frame_sequences_index.sort_index(inplace=True)

df_train_frame_sequences_index

<p><br>

##### Show sample of train/val frame datasets with loaded tensors (to demonstrate functional frame-to-tensor extraction)

In [None]:
df_train_frame_sequences__assoc_index__sample__with_frame_tensors = ib.collect(train_frame_sequences__assoc_index__sample__with_frame_tensors)

In [None]:
df_train_frame_sequences__assoc_index__sample__with_frame_tensors.columns = fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX + ['FrameTensor']
df_train_frame_sequences__assoc_index__sample__with_frame_tensors.set_index(['ASLConsultantID', 'TargetVideoFilename', 'CameraPerspective', 'UtteranceSequence', 'TokenSequence'], inplace=True)
df_train_frame_sequences__assoc_index__sample__with_frame_tensors.sort_values(axis=0, by=[fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX[6]], ignore_index=False, inplace=True)
df_train_frame_sequences__assoc_index__sample__with_frame_tensors.sort_index(inplace=True)

df_train_frame_sequences__assoc_index__sample__with_frame_tensors

In [None]:
df_val_frame_sequences_index__sample__with_frame_tensors = ib.collect(val_frame_sequences_index__sample__with_frame_tensors)

In [None]:
df_val_frame_sequences_index__sample__with_frame_tensors.columns = fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX + ['FrameTensor']
df_val_frame_sequences_index__sample__with_frame_tensors.set_index(['ASLConsultantID', 'TargetVideoFilename', 'CameraPerspective', 'UtteranceSequence', 'TokenSequence'], inplace=True)
df_val_frame_sequences_index__sample__with_frame_tensors.sort_values(axis=0, by=[fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX[6]], ignore_index=False, inplace=True)
df_val_frame_sequences_index__sample__with_frame_tensors.sort_index(inplace=True)

df_val_frame_sequences_index__sample__with_frame_tensors

In [None]:
df_train_frame_sequences_index__sample__with_frame_tensors = ib.collect(train_frame_sequences_index__sample__with_frame_tensors)

In [None]:
df_train_frame_sequences_index__sample__with_frame_tensors.columns = fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX + ['FrameTensor']
df_train_frame_sequences_index__sample__with_frame_tensors.set_index(['ASLConsultantID', 'TargetVideoFilename', 'CameraPerspective', 'UtteranceSequence', 'TokenSequence'], inplace=True)
df_train_frame_sequences_index__sample__with_frame_tensors.sort_values(axis=0, by=[fidscs_globals.SCHEMA_COL_NAMES__TRAIN_OR_VAL_INDEX[6]], ignore_index=False, inplace=True)
df_train_frame_sequences_index__sample__with_frame_tensors.sort_index(inplace=True)

df_train_frame_sequences_index__sample__with_frame_tensors