<a href="https://colab.research.google.com/github/v-vipriy/data-eng-tech-assessment/blob/Vpriya_assignment/script/Data_Eng_Tech_Assessment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
try:
  from apache_beam.options.pipeline_options import PipelineOptions
  import   apache_beam as beam
  import urllib.request
  import logging
  import json
except:
  ! pip   install apache-beam[interactive]
  from apache_beam.options.pipeline_options import PipelineOptions
  import   apache_beam as beam
  import urllib.request
  import logging
  import json

In [None]:
class load_json_as_dictionary(beam.DoFn):
    """Parse a JSON text element into one dictionary per contained record."""

    def process(self, element):
        """Decode ``element`` and return its records as a list of dicts.

        ``element`` is decoded with ``json.loads`` and iterated; every item
        is copied through ``dict(...)``. The value stored under the key
        ``'locationid'`` is converted with ``int``; all other values are
        passed through untouched.
        """
        records = []
        for raw_record in json.loads(element):
            record = {}
            for field, value in dict(raw_record).items():
                record[field] = int(value) if field == 'locationid' else value
            records.append(record)
        return records

class select_elements_location_data(beam.DoFn):
    """Project a location record onto the fields used later in the pipeline."""

    def process(self, element):
        """Return a one-item list with a filtered copy of ``element``.

        Only the keys ``location_id``, ``sensor_description`` and
        ``sensor_name`` are kept; keys missing from ``element`` are simply
        absent from the result.
        """
        wanted_fields = ('location_id', 'sensor_description', 'sensor_name')
        selected = {}
        for field, value in element.items():
            if field in wanted_fields:
                selected[field] = value
        return [selected]

class drop_redundant_fields(beam.DoFn):
    """Remove the redundant ``location_id`` field from a record."""

    # Keys removed from every record. This must be a real tuple (note the
    # trailing comma): a bare parenthesised string such as ('location_id') is
    # just a str, which turns ``key not in ...`` into a substring test and
    # would also drop keys like 'id' or 'location'.
    REDUNDANT_FIELDS = ('location_id',)

    def process(self, element):
        """Return a one-item list with a copy of ``element`` minus the
        keys listed in ``REDUNDANT_FIELDS``."""
        return [{k: v for k, v in element.items()
                 if k not in self.REDUNDANT_FIELDS}]

class LeftJoin(beam.PTransform):
    """Left-join two PCollections of dictionaries on a (possibly differently
    named) key.

    The transform is applied to a dict of ``{pipeline_name: PCollection}``.
    Every element is keyed by its ``left_key`` value (falling back to
    ``right_key`` when ``left_key`` is absent), the collections are grouped
    with ``CoGroupByKey`` and the groups are flattened by ``UnnestCoGrouped``.

    Args:
        source_pipeline_name: Name (dict key) of the left-hand collection.
        source_data: Left-hand PCollection. Stored on the instance; ``expand``
            itself only reads the collections passed in ``pcolls``.
        join_pipeline_name: Name (dict key) of the right-hand collection.
        join_data: Right-hand PCollection. Stored on the instance; ``expand``
            itself only reads the collections passed in ``pcolls``.
        left_key: Dictionary key holding the join value, tried first.
        right_key: Dictionary key used when ``left_key`` is missing.
    """

    def __init__(self, source_pipeline_name, source_data, join_pipeline_name, join_data, left_key, right_key):
        super().__init__()
        self.join_pipeline_name = join_pipeline_name
        self.source_data = source_data
        self.source_pipeline_name = source_pipeline_name
        self.join_data = join_data
        self.left_key = left_key
        self.right_key = right_key

    def expand(self, pcolls):
        """Key every input collection, co-group them and unnest the groups.

        Args:
            pcolls: Mapping of pipeline name to PCollection of dictionaries.

        Returns:
            The PCollection produced by the ``UnnestCoGrouped`` step.
        """
        def _format_as_common_key_tuple(data_dict, left_key, right_key):
            """Return ``(join_value, data_dict)`` for one record.

            Raises:
                KeyError: if the record has neither ``left_key`` nor
                    ``right_key``.
            """
            try:
                return data_dict[left_key], data_dict
            except KeyError:
                # Only a missing left key selects the right key; any other
                # error (e.g. a non-dict element) is a real failure and must
                # surface instead of being masked.
                return data_dict[right_key], data_dict

        keyed_pcolls = {
            pipeline_name: pcoll
            | 'Convert to ({0}/{1}, object) for {2}'.format(
                self.left_key, self.right_key, pipeline_name)
            >> beam.Map(_format_as_common_key_tuple, self.left_key, self.right_key)
            for pipeline_name, pcoll in pcolls.items()
        }
        return (
            keyed_pcolls
            | 'CoGroupByKey {0}'.format(pcolls.keys()) >> beam.CoGroupByKey()
            | 'Unnest Cogrouped' >> beam.ParDo(UnnestCoGrouped(),
                                               self.source_pipeline_name,
                                               self.join_pipeline_name)
        )

class UnnestCoGrouped(beam.DoFn):
    """This DoFn class unnests the CoGroupByKey output and emits one
    dictionary per source record, enriched with the first matching join
    record when there is one (left-join semantics)."""

    def process(self, input_element, source_pipeline_name, join_pipeline_name):
        """Yield the left-joined dictionaries for one co-grouped key.

        Args:
            input_element: ``(key, grouped)`` pair where ``grouped`` maps each
                pipeline name to an iterable of that pipeline's dictionaries.
            source_pipeline_name: Name of the left-hand side in ``grouped``.
            join_pipeline_name: Name of the right-hand side in ``grouped``.

        Yields:
            For every source dictionary: a new dictionary with the first join
            dictionary merged over it (join values win on key clashes), or
            the source dictionary unchanged when the join side is empty.
            Additional join dictionaries for the same key are ignored.
        """
        _group_key, grouped_dict = input_element
        # Take the first join row without indexing: the grouped values are
        # only relied upon to be iterable, not to be lists.
        join_dictionary = next(iter(grouped_dict[join_pipeline_name]), None)
        for source_dictionary in grouped_dict[source_pipeline_name]:
            if join_dictionary is None:  # found no join_dictionary
                yield source_dictionary
            else:
                # Build a new dict instead of calling update() on the input:
                # a DoFn must not modify the elements it receives.
                yield {**source_dictionary, **join_dictionary}

class LogContents(beam.DoFn):
    """Debugging DoFn that logs every element it receives, and its type,
    at INFO level."""

    def process(self, input_element):
        """Log ``input_element`` and ``type(input_element)``; returns None."""
        messages = (
            "Contents: {}".format(input_element),
            "Contents type: {}".format(type(input_element)),
        )
        for message in messages:
            logging.info(message)
        return None

def run(argv=None):
    """Main entry point: download both datasets, left-join them and write
    the joined records to text.

    Steps:
      1. Download the pedestrian-count and sensor-location JSON exports to
         temporary files.
      2. Parse each into dictionaries (location records are reduced to the
         fields selected by ``select_elements_location_data``).
      3. Left-join pedestrian records to location records
         (``locationid`` on the left, ``location_id`` on the right).
      4. Drop redundant fields and write the result with ``WriteToText``.

    Args:
        argv: Optional list of command-line flags forwarded to
            ``PipelineOptions``. ``None`` (the default) leaves the choice of
            flags to ``PipelineOptions``.
    """
    pedestrian_data_url = 'https://data.melbourne.vic.gov.au/api/v2/catalog/datasets/pedestrian-counting-system-monthly-counts-per-hour/exports/json'
    location_data_url = 'https://data.melbourne.vic.gov.au/api/v2/catalog/datasets/pedestrian-counting-system-sensor-locations/exports/json'
    left_key = 'locationid'
    right_key = 'location_id'
    pedestrian_pipeline_name = 'pedestrian_data'
    location_pipeline_name = 'location_data'

    try:
        # With no target filename, urlretrieve stores the payload in a
        # temporary file and returns its path (plus the response headers,
        # which are not needed here).
        pedestrian_data_file, _ = urllib.request.urlretrieve(pedestrian_data_url)
        location_data_file, _ = urllib.request.urlretrieve(location_data_url)

        # Forward argv so callers can actually configure the pipeline.
        pipeline_options = PipelineOptions(argv)
        p = beam.Pipeline(options=pipeline_options)

        pedestrian_data = (
            p
            | 'Read pedestrian data' >> beam.io.ReadFromText(pedestrian_data_file)
            | 'JSON_to_Dict pedestrian data' >> beam.ParDo(load_json_as_dictionary())
        )
        location_data = (
            p
            | 'Read location data' >> beam.io.ReadFromText(location_data_file)
            | 'JSON_to_Dict location data' >> beam.ParDo(load_json_as_dictionary())
            | 'Select fields location data' >> beam.ParDo(select_elements_location_data())
        )

        pipelines_dictionary = {pedestrian_pipeline_name: pedestrian_data,
                                location_pipeline_name: location_data}
        joined_data = (
            pipelines_dictionary
            | 'Left join' >> LeftJoin(
                pedestrian_pipeline_name, pedestrian_data,
                location_pipeline_name, location_data, left_key, right_key)
        )

        # NOTE(review): the output location is at the filesystem root;
        # confirm that this is writable in the target environment.
        (joined_data
         | 'format' >> beam.ParDo(drop_redundant_fields())
         | beam.io.WriteToText('/location_output_data.txt'))

        result = p.run()
        result.wait_until_finish()
    finally:
        # Remove the temporary download files left behind by urlretrieve,
        # whether or not the pipeline succeeded.
        urllib.request.urlcleanup()


# Execute the pipeline only when this file is run directly, not when it is
# imported as a module.
if __name__ == '__main__':
    run()
