In [64]:
import json
# DictMapper expects its input as a list of JSON-decoded objects (dicts).
import re
import typing

class MapperLocation:
    """Where to find one output column inside a nested input record.

    Args:
        location: path into the record, elements joined by ``delimiter``, e.g.
            ``'root::tripUpdate::trip::tripId'``. ``name[0]`` selects a fixed list
            index; ``name[]`` marks a list to fan out over (one output row per element).
        column_name: name of the output column.
        coalesce_value: value used when the location cannot be resolved.
        fail_if_missing: if True, an unresolvable location is an error instead of
            falling back to ``coalesce_value``.
        attempt_json_serializing: if True, a string met while walking the path may be
            JSON-decoded so the walk can continue into it.
        delimiter: separator between path elements in ``location``.
        convert_func: optional callable intended to transform the extracted value; it only
            has an effect if the consuming mapper applies it.
    """

    def __init__(self, location, column_name, coalesce_value=None, fail_if_missing=False, attempt_json_serializing=True, delimiter='::', convert_func=None):
        self.location = location
        self.column_name = column_name
        self.coalesce_value = coalesce_value
        self.fail_if_missing = fail_if_missing
        # Previously `= True,` which stored the tuple (True,) and ignored the argument.
        self.attempt_json_serializing = attempt_json_serializing
        self.delimiter = delimiter
        self.convert_func = convert_func
        
class RequiredElementMissingError(AttributeError):
    """Raised by DictMapper when a location marked ``fail_if_missing`` cannot be resolved."""

class DictMapper:
    """Flatten a list of nested dict records into flat row dicts using MapperLocation specs.

    ``mapper`` maps a table name to a list of MapperLocation-like objects, each exposing
    ``location``, ``column_name``, ``delimiter``, ``coalesce_value``, ``fail_if_missing``,
    ``attempt_json_serializing`` and (optionally) ``convert_func``.

    Location syntax, shown with the ``::`` delimiter:
      * ``root``            -- the whole input record.
      * ``root::a::b``      -- ``record['a']['b']``.
      * ``root::a[0]::b``   -- ``record['a'][0]['b']`` (a fixed list index).
      * ``root::a[]::b``    -- fan out: one output row per element of ``record['a']``, each
                               carrying the non-fan-out columns of its parent record.
    Only one ``[]`` list base location is supported per table.
    """

    class _MissingElement(Exception):
        """Internal signal raised when one element of a location path cannot be resolved."""

        def __init__(self, kind: str, element: str) -> None:
            super().__init__(element)
            self.kind = kind        # tag used in the warning text, e.g. 'indexerror'
            self.element = element  # the path element that failed, e.g. 'payments[0]'

    def __init__(self, in_dict: list, mapper: dict) -> None:
        """
        Args:
            in_dict: the input records (a list of dicts, despite the name).
            mapper: table name -> list of MapperLocation objects describing its columns.
        """
        self.in_dict = in_dict
        self.mapper = mapper
        self.table_names = list(mapper)

        # \w (letters, digits, underscore) instead of [a-zA-Z] so keys such as
        # `stop_time_update[0]` or `list2[]` are recognised as list accesses.
        self.re_check_list_base_location = r"^.+(?=\[\])"
        self.re_check_list_specific_element = r"\w+\[[0-9]+\]"
        self.re_check_list_unlimited_element = r"\w+(?=\[\])"
        self.re_list_location_name = r"\w+(?=\[)"
        self.re_list_location_index = r"(?<=\[)[0-9]+(?=\])"

    def get_records(self, table_name: str = None, extra_data=None) -> typing.List[dict]:
        """Return the flattened rows for ``table_name`` as a list of dicts.

        Args:
            table_name: a key of the ``mapper`` passed to the constructor.
            extra_data: optional dict of constant columns merged into every output row
                (they override mapped columns of the same name).

        Raises:
            ValueError: if ``table_name`` is not defined in the mapper.
        """
        if table_name not in self.table_names:
            raise ValueError(f'{table_name} is not defined in mapper. We got {self.table_names}')
        return self._dump_to_records(
            in_rows=self.in_dict,
            mapper_set=self.mapper.get(table_name),
            extra_data={} if extra_data is None else extra_data,
        )

    def _resolve_location(self, in_row, location_elements: list, attempt_json_serializing=True):
        """Follow ``location_elements`` from ``in_row`` and return the value found there.

        ``'root'`` elements are skipped. Raises ``_MissingElement`` when an element cannot be
        followed: absent/None key, bad or missing list index, or an intermediate value that
        has no ``.get``.
        """
        current_object = in_row
        for single_location in location_elements:
            if single_location == 'root':
                continue
            try:
                if re.match(self.re_check_list_specific_element, single_location):
                    # Fixed list index, e.g. `payments[0]` -> current_object['payments'][0].
                    location_name = re.search(self.re_list_location_name, single_location).group()
                    location_index = int(re.search(self.re_list_location_index, single_location).group())
                    try:
                        current_object = current_object.get(location_name)[location_index]
                    except (IndexError, KeyError, TypeError):
                        # TypeError covers a missing list (None[0]); KeyError a dict in its place.
                        raise self._MissingElement('indexerror', f'{location_name}[{location_index}]') from None
                else:
                    if attempt_json_serializing and isinstance(current_object, str):
                        # Nested payloads may be JSON-encoded strings; decode them on demand.
                        try:
                            current_object = json.loads(current_object)
                        except ValueError:
                            pass  # not JSON: the .get below fails and is reported as missing
                    current_object = current_object.get(single_location)
                    if current_object is None:
                        raise self._MissingElement('elementnotfound', single_location)
            except AttributeError:
                # An intermediate value (list, number, plain string, ...) has no `.get`.
                raise self._MissingElement('attributeerror', single_location) from None
        return current_object

    def _dump_single_row(self, in_row: dict, mapper_set: list, extra_data: dict = None, ignore_location_str: str = None) -> dict:
        """Build one flat output row from ``in_row``.

        Args:
            in_row: the (nested) record to read from.
            mapper_set: MapperLocation objects, one per output column.
            extra_data: columns merged in last; they override mapped columns of the same name.
            ignore_location_str: prefix stripped from each location before resolving it. Used
                for list fan-out, where ``in_row`` is already an element of the ``...[]`` list.

        Unresolvable values are reported with a printed warning and replaced by the location's
        ``coalesce_value``; a ``convert_func``, if set, is applied only to values actually found.

        Raises:
            RequiredElementMissingError: a location with ``fail_if_missing=True`` was unresolvable.
        """
        extra_data = {} if extra_data is None else extra_data
        output_row = {}
        # The first column's value is quoted in warnings to help identify the offending record.
        first_mapper = next(iter(mapper_set), None)
        first_column = None if first_mapper is None else first_mapper.column_name

        for mapperlocation in mapper_set:
            column = mapperlocation.column_name
            location = mapperlocation.location
            if ignore_location_str and location.startswith(ignore_location_str):
                location = location[len(ignore_location_str):]

            try:
                value = self._resolve_location(
                    in_row,
                    location.split(mapperlocation.delimiter),
                    mapperlocation.attempt_json_serializing,
                )
            except self._MissingElement as missing:
                print(f'Warning: {missing.kind} {missing.element} does not exist in {location}. First column is {first_column}, data is {output_row.get(first_column)}')
                if mapperlocation.fail_if_missing:
                    raise RequiredElementMissingError(f'Required element {missing.element} does not exist in {location}. First column is {first_column}, data is {output_row.get(first_column)}') from None
                output_row[column] = mapperlocation.coalesce_value
                continue

            convert_func = getattr(mapperlocation, 'convert_func', None)
            if convert_func is not None:
                value = convert_func(value)
            output_row[column] = value

        output_row.update(extra_data)
        return output_row

    @staticmethod
    def _walk_to_list(row, location_elements: list) -> list:
        """Follow ``location_elements`` (``'root'`` skipped) from ``row`` to the list to fan out over.

        Returns ``[]`` when the path is absent, reaches a value without ``.get``, or ends on a
        non-list value.
        """
        current = row
        for element in location_elements:
            if element == 'root':
                continue
            if not hasattr(current, 'get'):
                return []
            current = current.get(element)
        return current if isinstance(current, (list, tuple)) else []

    def _dump_to_records(self, in_rows, mapper_set, extra_data=None) -> list:
        """Flatten every record in ``in_rows`` according to ``mapper_set``.

        Mappers whose location contains ``name[]`` are fan-out columns: each record yields one
        output row per element of that list, with the record's other columns merged in, so a
        record whose list is missing or empty yields NO rows. Without any fan-out mapper, each
        record yields exactly one row.

        Raises:
            AttributeError: if the fan-out mappers reference more than one list base location.
        """
        if extra_data is None:
            extra_data = {}

        nested_list_mapper_set = []
        single_mapper_set = []
        nested_list_base_locations = set()
        nested_list_base_location_delimiter = None

        for mapper in mapper_set:
            if re.search(self.re_check_list_unlimited_element, mapper.location):
                nested_list_mapper_set.append(mapper)
                nested_list_base_locations.add(re.match(self.re_check_list_base_location, mapper.location).group())
                nested_list_base_location_delimiter = mapper.delimiter
            else:
                single_mapper_set.append(mapper)

        if len(nested_list_base_locations) > 1:
            raise AttributeError(f"This library does not currently support multiple listed mappers in a single instance. Sorry. We found {nested_list_base_locations}")

        if not nested_list_base_locations:
            # No fan-out: one output row per input record.
            return [
                self._dump_single_row(in_row=row, mapper_set=single_mapper_set, extra_data=extra_data)
                for row in in_rows
            ]

        nested_list_base_location = next(iter(nested_list_base_locations))
        nested_list_base_location_elements = nested_list_base_location.split(nested_list_base_location_delimiter)
        # Prefix stripped from fan-out locations so they resolve relative to each list element.
        ignore_location_str = f'{nested_list_base_location}[]{nested_list_base_location_delimiter}'

        out_rows = []
        for row in in_rows:
            parent_row = self._dump_single_row(in_row=row, mapper_set=single_mapper_set, extra_data=extra_data)
            for nested_row in self._walk_to_list(row, nested_list_base_location_elements):
                out_rows.append(self._dump_single_row(
                    in_row=nested_row,
                    mapper_set=nested_list_mapper_set,
                    extra_data=parent_row,
                    ignore_location_str=ignore_location_str,
                ))
        return out_rows

In [65]:
# Column spec for the flattened realtime_trips table. Locations are '::'-separated paths into each
# message; 'stopTimeUpdate[]' fans each message out into one row per stop-time update, and the
# bracketed '[transit_realtime....]' elements are looked up as literal key names.
mapper = {
    'realtime_trips': [
        MapperLocation(location='root::id', column_name='message_id'),
        MapperLocation(location='root::tripUpdate::trip::tripId', column_name='trip_id'),
        MapperLocation(location='root::tripUpdate::trip::startTime', column_name='start_time'),
        MapperLocation(location='root::tripUpdate::trip::startDate', column_name='start_date'),
        MapperLocation(location='root::tripUpdate::trip::routeId', column_name='route_id'),
        MapperLocation(location='root::tripUpdate::trip::[transit_realtime.nyct_trip_descriptor]::trainId', column_name='train_id'),
        MapperLocation(location='root::tripUpdate::trip::[transit_realtime.nyct_trip_descriptor]::isAssigned', column_name='is_assigned'),
        MapperLocation(location='root::tripUpdate::trip::[transit_realtime.nyct_trip_descriptor]::direction', column_name='direction'),
        MapperLocation(location='root::tripUpdate::stopTimeUpdate[]::arrival::time', column_name='scheduled_train_arrival_time'),
        MapperLocation(location='root::tripUpdate::stopTimeUpdate[]::departure::time', column_name='scheduled_train_departure_time'),
        MapperLocation(location='root::tripUpdate::stopTimeUpdate[]::stopId', column_name='scheduled_train_stop_id'),
        MapperLocation(location='root::tripUpdate::stopTimeUpdate[]::[transit_realtime.nyct_stop_time_update]::scheduledTrack', column_name='scheduled_train_scheduled_track'),
        MapperLocation(location='root::tripUpdate::stopTimeUpdate[]::[transit_realtime.nyct_stop_time_update]::actualTrack', column_name='scheduled_train_actual_track'),
    ]
}

INPUT_JSON_PATH = 'in.json'

# JSON is UTF-8 by spec; don't depend on the platform's default text encoding.
with open(INPUT_JSON_PATH, 'r', encoding='utf-8') as f:
    in_json = json.load(f)

# Fail with a KeyError naming 'messages' rather than an opaque len(None) TypeError below.
realtime_trips_root = in_json['messages']
print(f'Got {len(realtime_trips_root)} realtime trips')

rt = DictMapper(in_dict=realtime_trips_root, mapper=mapper)

realtime_trips_data = rt.get_records(table_name='realtime_trips')


Got 74 realtime trips


In [66]:
import pandas as pd

# Build the table in one go from the list of flat record dicts produced by get_records.
df = pd.DataFrame.from_records(data=realtime_trips_data)
display(df)

Unnamed: 0,scheduled_train_arrival_time,scheduled_train_departure_time,scheduled_train_stop_id,scheduled_train_scheduled_track,scheduled_train_actual_track,message_id,trip_id,start_time,start_date,route_id,train_id,is_assigned,direction
0,1701938910,1701938910,N09S,E1,E1,000001N,013738_N..S,02:17:23,20231207,N,1N 0210+ DIT/STL,True,SOUTH
1,1701939165,1701939165,N10S,E1,E1,000001N,013738_N..S,02:17:23,20231207,N,1N 0210+ DIT/STL,True,SOUTH
2,1701939645,1701939645,D43S,0B,0B,000001N,013738_N..S,02:17:23,20231207,N,1N 0210+ DIT/STL,True,SOUTH
3,1701938900,1701938900,R05N,G2,G2,000003N,013300_N..N,02:13:00,20231207,N,1N 0213 STL/DIT,True,
4,1701938910,1701938910,R04N,G2,G2,000003N,013300_N..N,02:13:00,20231207,N,1N 0213 STL/DIT,True,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
865,1701943800,1701943800,R30N,B2,B2,000011R,028600_R..N,04:46:00,20231207,R,1R 0446 95S/WHL,,
866,1701943890,1701943890,R29N,B2,B2,000011R,028600_R..N,04:46:00,20231207,R,1R 0446 95S/WHL,,
867,1701943980,1701943980,R28N,B2,B2,000011R,028600_R..N,04:46:00,20231207,R,1R 0446 95S/WHL,,
868,1701944130,1701944130,R65N,B2,B2,000011R,028600_R..N,04:46:00,20231207,R,1R 0446 95S/WHL,,


In [67]:
# Persist the flattened table; the default integer index is not written since it carries no data.
df.to_csv(path_or_buf='blah.csv', index=False)