## Import Python dependencies

In [17]:
from absl import app
from absl import flags
import apache_beam as beam
from proto.stu3 import google_extensions_pb2
from proto.stu3 import resources_pb2
from py.google.fhir.labels import encounter
from py.google.fhir.labels import label


## Set the input and output GCS paths

In [20]:
# Glob matching the input TFRecord files; the pipeline below decodes each
# record as a resources_pb2.Bundle proto.
input_path = 'gs://hdlcluster3/synthea/bundles/bundles*'
# Filename prefix for the output TFRecord files; the writer below appends a
# shard suffix and '.tfrecords' (e.g. label-00000-of-00001.tfrecords).
output_path = 'gs://hdlcluster3/synthea/labels/label'

In [19]:
%%bash
# Ensure the output location is empty so that shards left over from a
# previous run cannot be mistaken for this run's results.
# The wildcard is quoted so the local shell never tries to expand it, and a
# "matched no objects" failure is tolerated: gsutil exits non-zero when there
# is nothing to delete, which would otherwise make this cell fail on a fresh
# (or already clean) output location.
gsutil rm 'gs://hdlcluster3/synthea/labels/*' \
  || echo 'No existing label files were removed (see any gsutil message above).'
# List the contents of the output location. After a successful cleanup there
# is nothing to list and gsutil exits non-zero, so that case is tolerated too.
gsutil ls gs://hdlcluster3/synthea/labels \
  || echo 'Output location is empty.'

Removing gs://hdlcluster3/synthea/labels/label-00000-of-00001.tfrecords...
/ [1 objects]                                                                   
Operation completed over 1 objects.                                              
CommandException: One or more URLs matched no objects.


## Helper DoFn: convert a Bundle into length-of-stay labels

In [21]:
@beam.typehints.with_input_types(resources_pb2.Bundle)
@beam.typehints.with_output_types(google_extensions_pb2.EventLabel)
class LengthOfStayRangeLabelAt24HoursFn(beam.DoFn):
  """Emits length-of-stay range labels for a Bundle, triggered at 24 hours.

  Cohort: inpatient encounters that last longer than 24 hours.
  Trigger point: 24 hours after admission.
  Label: multi-label over length-of-stay ranges; see label.py for detail.
  """

  def __init__(self, for_synthea=False):
    """Remembers the flag that is forwarded to the cohort selection.

    Args:
      for_synthea: passed through unchanged to
        encounter.Inpatient24HrEncounters. Presumably switches on handling
        specific to Synthea-generated bundles -- confirm in encounter.py.
    """
    self._for_synthea = for_synthea

  def process(self, bundle):
    """Walks the bundle's qualifying encounters and yields their labels.

    Args:
      bundle: input stu3.Bundle proto.

    Yields:
      One EventLabel proto per label produced for each qualifying encounter.
      Nothing is yielded when the bundle has no patient.
    """
    patient = encounter.GetPatient(bundle)
    if patient is None:
      # Without a patient there is nothing to attach labels to.
      return

    # Cohort: inpatient encounters > 24 hours.
    cohort = encounter.Inpatient24HrEncounters(bundle, self._for_synthea)
    for inpatient_enc in cohort:
      for event_label in label.LengthOfStayRangeAt24Hours(
          patient, inpatient_enc):
        yield event_label



## Initialize pipeline variables

In [22]:
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import PipelineOptions

from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter

import apache_beam as beam
import re

# Single options object shared by the pipeline; the typed views below read
# and write the same underlying settings.
options = PipelineOptions()

# Select the runner that executes the pipeline.
standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DirectRunner'

# Google Cloud settings: project, job name, and the GCS staging/temp
# locations.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'dp-workspace'
google_cloud_options.job_name = 'job1'
google_cloud_options.staging_location = 'gs://hdlcluster3/staging'
google_cloud_options.temp_location = 'gs://hdlcluster3/temp'

## Initialize the Beam job

In [23]:
import logging

# Raise the root logger to INFO so Beam's progress messages appear in the
# cell output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Coders that (de)serialize the protos stored in the TFRecord files.
bundle_coder = beam.coders.ProtoCoder(resources_pb2.Bundle)
label_coder = beam.coders.ProtoCoder(google_extensions_pb2.EventLabel)

# Building the graph does not execute it; the pipeline runs in a later cell.
p = beam.Pipeline(options=options)

# Step 1: read every Bundle proto matched by the input glob.
bundles = p | 'read' >> beam.io.ReadFromTFRecord(
    input_path, coder=bundle_coder)

# Step 2: turn each Bundle into zero or more length-of-stay EventLabels.
label_fn = LengthOfStayRangeLabelAt24HoursFn(for_synthea=True)
labels = bundles | 'BundleToLabel' >> beam.ParDo(label_fn)

# Step 3: write the labels as sharded '.tfrecords' files under output_path.
_ = labels | beam.io.WriteToTFRecord(
    output_path, coder=label_coder, file_name_suffix='.tfrecords')

I0125 14:07:26.789550 140097622570752 gcsio.py:446] Starting the size estimation of the input
I0125 14:07:26.792119 140097622570752 client.py:614] Attempting refresh to obtain initial access_token
I0125 14:07:26.897506 140097622570752 gcsio.py:460] Finished listing 9 files in 0.107965946198 seconds.


## Run the transform

In [24]:
   # Start the pipeline, then block until it reaches a terminal state; the
   # final state is the cell's displayed value.
   pipeline_result = p.run()
   pipeline_result.wait_until_finish()

I0125 14:07:31.576694 140097622570752 fn_api_runner.py:912] Running ((ref_AppliedPTransform_WriteToTFRecord/Write/WriteImpl/DoOnce/Read_9)+((ref_AppliedPTransform_WriteToTFRecord/Write/WriteImpl/InitializeWrite_10)+(ref_PCollection_PCollection_4/Write)))+(ref_PCollection_PCollection_3/Write)
I0125 14:07:31.585122 140097622570752 bundle_processor.py:291] start <DataOutputOperation ref_PCollection_PCollection_4/Write >
I0125 14:07:31.587593 140097622570752 bundle_processor.py:291] start <DataOutputOperation ref_PCollection_PCollection_3/Write >
I0125 14:07:31.590033 140097622570752 bundle_processor.py:291] start <DoOperation WriteToTFRecord/Write/WriteImpl/InitializeWrite output_tags=['out']>
I0125 14:07:31.594676 140097622570752 bundle_processor.py:291] start <ReadOperation WriteToTFRecord/Write/WriteImpl/DoOnce/Read source=SourceBundle(weight=1.0, source=<apache_beam.transforms.create_source._CreateSource object at 0x7f6ac6973d50>, start_position=None, stop_position=None)>
I0125 14:07:

I0125 14:07:52.807261 140097622570752 bundle_processor.py:303] finish <DataInputOperation WriteToTFRecord/Write/WriteImpl/GroupByKey/Read receivers=[ConsumerSet[WriteToTFRecord/Write/WriteImpl/GroupByKey/Read.out0, coder=WindowedValueCoder[TupleCoder[LengthPrefixCoder[FastPrimitivesCoder], IterableCoder[LengthPrefixCoder[FastPrimitivesCoder]]]], len(consumers)=1]]>
I0125 14:07:52.809660 140097622570752 bundle_processor.py:303] finish <DoOperation WriteToTFRecord/Write/WriteImpl/Extract output_tags=['out'], receivers=[ConsumerSet[WriteToTFRecord/Write/WriteImpl/Extract.out0, coder=WindowedValueCoder[LengthPrefixCoder[FastPrimitivesCoder]], len(consumers)=1]]>
I0125 14:07:52.812208 140097622570752 bundle_processor.py:303] finish <DataOutputOperation ref_PCollection_PCollection_11/Write >
I0125 14:07:52.820162 140097622570752 fn_api_runner.py:912] Running ((ref_PCollection_PCollection_3/Read)+(ref_AppliedPTransform_WriteToTFRecord/Write/WriteImpl/PreFinalize_19))+(ref_PCollection_PCollect

'DONE'

In [25]:
%%bash
# List the contents of the output location to confirm that the pipeline
# wrote its label shard(s).
gsutil ls gs://hdlcluster3/synthea/labels

gs://hdlcluster3/synthea/labels/label-00000-of-00001.tfrecords


YAY!!