# TFMA Visualization

In [None]:
# To enable TFMA visulization, please enable the Notebook Extention and restart kernel.
!jupyter nbextension enable --py widgetsnbextension
!jupyter nbextension enable --py tensorflow_model_analysis

In [None]:
import tensorflow_model_analysis as tfma
import tensorflow as tf
import os
import json

eval_result_folder = "/var/tmp/santander/keras-tft/Evaluator/evaluation/46"
# replace your evaluation result path which should contains a "eval_config.json"
eval_result = tfma.load_eval_result(eval_result_folder)
print('eval result loaded')

In [None]:
def get_slicing_spec(eval_result_folder):
    config_file=tf.io.gfile.GFile(os.path.join(eval_result_folder, 'eval_config.json'), 'r')
    config=json.loads(config_file.read())
    feature_keys=list(filter(lambda x: 'featureKeys' in x, config['evalConfig']['slicingSpecs']))
    columns=[] if len(feature_keys) == 0 else feature_keys[0]['featureKeys']
    return tfma.slicer.SingleSliceSpec(columns=columns)

# Or hardcode one
# tfma.view.render_slicing_metrics(eval_result, slicing_column='var_0')
slicing_spec = get_slicing_spec(eval_result_folder)
tfma.view.render_slicing_metrics(eval_result, slicing_spec=slicing_spec)

# TensorBoard Visualization on TFT


```shell
curl https://raw.githubusercontent.com/tensorflow/tensorflow/master/tensorflow/python/tools/import_pb_to_tensorboard.py > import_pb_to_tensorboard.py

python import_pb_to_tensorboard.py --model_dir /var/tmp/santander/pusher/1587532679/ --log_dir /var/tmp/santander/tmp/

tensorboard --logdir=/var/tmp/santander/tmp/
```



# Beam

In [14]:
import apache_beam as beam

data = []
with beam.Pipeline() as pipeline:
  intrim = pipeline | 'Data' >> beam.Create([
          ('p', 1),
          ('a', 2),
          ('z', 3),
          ('m', 2),])
  intrim = intrim | 'Sink' >> beam.Map(lambda item: data.append(item))

print(data)
data.sort(key = lambda item: item[0] )
print(data)

with open('/var/tmp/tmp.csv', 'w') as file:
  file.write('ID_code,target\n')
  for item in data:
    file.write('{0},{1}\n'.format(item[0], item[1])) 
    
!cat /var/tmp/tmp.csv

[('p', 1), ('a', 2), ('z', 3), ('m', 2)]
[('a', 2), ('m', 2), ('p', 1), ('z', 3)]
ID_code,target
a,2
m,2
p,1
z,3


In [24]:
import apache_beam as beam
from apache_beam.transforms.userstate import CombiningValueStateSpec
from apache_beam.transforms.core import GroupByKey

class IndexAssigningStatefulDoFn(beam.DoFn):
  # per-key-and-window, first in element is key
  index_state = beam.DoFn.StateParam(CombiningValueStateSpec('index', sum))
  
  def __init__(self):
    # unmanaged state only can work in single-processor multi-thread env
    self.unmanaged_state = 10
  
  def process(self, element, index=index_state):
    current_index = index.read()
    index.add(1)
    self.unmanaged_state = self.unmanaged_state + 1
    print(self.unmanaged_state)
    yield (element, current_index)

def sort_data(data):
  result = data.copy()
  result.sort(key=lambda item: item[0])
  return result

with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(direct_num_workers=4)) as pipeline:
  intrim = pipeline | 'Data' >> beam.Create([
          ('p', 1),
          ('a', 2),
          ('p', 3),
          ('m', 2),])
  intrim = intrim | beam.Map(lambda it: (0, it)) # same key
  intrim = intrim | 'window' >> beam.WindowInto(beam.window.GlobalWindows()) # same window
  intrim = intrim | GroupByKey() # all to one
  # intrim = intrim | beam.ParDo(IndexAssigningStatefulDoFn())
  intrim = intrim | beam.Map(lambda item: item[1]) # remove dummy key
  intrim = intrim | beam.Map(sort_data) # sort all-in-one
  intrim = intrim | beam.Map(print)


[('a', 2), ('m', 2), ('p', 1), ('p', 3)]


In [3]:
def compare(a, b):
  ka = a[0]
  kb = b[0]
  return int(ka[5:]) <= int(kb[5:])

a = [('test_0', 1), ('test_10', 2), ('test_2', 3)]
a.sortAll(cmp=compare)
a

AttributeError: 'list' object has no attribute 'sortAll'