In [1]:
# Load the lab_black IPython extension (auto-formats code cells with Black).
%load_ext lab_black

In [2]:
import pandas as pd
import numpy as np
import json
import uuid
from typing import Tuple, List, Optional
from dataclasses import dataclass

from joblib import Parallel, delayed

In [3]:
@dataclass
class Drop:
    """A single stop on a route, built from one decoded JSON drop object.

    ``from_json`` understands two ``drop_type`` values: ``"DEPOT_VISIT"`` and
    ``"CUSTOMER_DROP"``. The customer id and delivery-window fields are only
    filled in for customer drops and stay ``None`` for depot visits.

    Values are stored exactly as they appear in the decoded JSON; this class
    performs no parsing or conversion (e.g. of timestamps).
    """

    drop_type: str
    latitude: float
    longitude: float
    leg_distance: int
    estimated_arrival: pd.Timestamp
    customer_id: Optional[str] = None
    delivery_window_start: Optional[pd.Timestamp] = None
    delivery_window_end: Optional[pd.Timestamp] = None

    def to_pandas(self) -> pd.Series:
        """Return the drop as a ``pd.Series`` with one entry per field."""
        return pd.Series(
            data={
                "drop_type": self.drop_type,
                "latitude": self.latitude,
                "longitude": self.longitude,
                "leg_distance": self.leg_distance,
                "estimated_arrival": self.estimated_arrival,
                "customer_id": self.customer_id,
                "delivery_window_start": self.delivery_window_start,
                "delivery_window_end": self.delivery_window_end,
            }
        )

    @classmethod
    def from_json(cls, input_obj: dict) -> "Drop":
        """Build a ``Drop`` from a decoded JSON object.

        Args:
            input_obj: Mapping with a ``"drop_type"`` key. Depot visits carry
                their coordinates at the top level; customer drops carry them
                under ``"customer"`` and additionally provide a
                ``"delivery_window"`` with ``"start"`` and ``"end"``.

        Returns:
            The decoded ``Drop``.

        Raises:
            KeyError: If a key required for the given drop type is missing.
            NotImplementedError: If ``drop_type`` is not one of the supported
                values.
        """
        drop_type = input_obj["drop_type"]

        if drop_type == "DEPOT_VISIT":
            return cls(
                drop_type=drop_type,
                latitude=input_obj["coordinates"]["latitude"],
                longitude=input_obj["coordinates"]["longitude"],
                leg_distance=input_obj["leg_distance"],
                estimated_arrival=input_obj["estimated_arrival"],
            )

        if drop_type == "CUSTOMER_DROP":
            return cls(
                drop_type=drop_type,
                latitude=input_obj["customer"]["coordinates"]["latitude"],
                longitude=input_obj["customer"]["coordinates"]["longitude"],
                leg_distance=input_obj["leg_distance"],
                estimated_arrival=input_obj["estimated_arrival"],
                customer_id=input_obj["customer"]["id"],
                delivery_window_start=input_obj["delivery_window"]["start"],
                delivery_window_end=input_obj["delivery_window"]["end"],
            )

        # Format the offending value into the message; concatenating a str with
        # a set literal would raise TypeError and mask the real problem.
        raise NotImplementedError(f"No such decoder for drop_type: {drop_type}")

In [4]:
@dataclass
class Route:
    """One vehicle route: its identifiers plus loaded and unloaded drops."""

    route_id: str
    vehicle_id: str
    loaded_drops: List[Drop]
    unloaded_drops: List[Drop]

    @classmethod
    def from_json(cls, input_obj: dict):
        """Build a ``Route`` from a decoded JSON object.

        Reads the ``"id"``, ``"vehicle_id"``, ``"loaded_drops"`` and
        ``"unloaded_drops"`` keys; every drop entry is decoded with
        ``Drop.from_json``.
        """
        route_id = input_obj["id"]
        vehicle_id = input_obj["vehicle_id"]
        loaded = [Drop.from_json(raw) for raw in input_obj["loaded_drops"]]
        unloaded = [Drop.from_json(raw) for raw in input_obj["unloaded_drops"]]

        return cls(
            route_id=route_id,
            vehicle_id=vehicle_id,
            loaded_drops=loaded,
            unloaded_drops=unloaded,
        )

    def to_pandas(self):
        """Return one row per drop, loaded drops first, with a fresh index.

        Each row carries the drop's own fields plus ``type`` (either
        ``"loaded_drop"`` or ``"unloaded_drop"``), ``route_id`` and
        ``vehicle_id``.
        """
        labelled_groups = (
            ("loaded_drop", self.loaded_drops),
            ("unloaded_drop", self.unloaded_drops),
        )

        frames = []
        for label, group in labelled_groups:
            frame = pd.DataFrame([drop.to_pandas() for drop in group])
            frame["type"] = label
            frames.append(frame)

        combined = pd.concat(frames, axis=0)
        combined["route_id"] = self.route_id
        combined["vehicle_id"] = self.vehicle_id

        return combined.reset_index(drop=True)

In [5]:
@dataclass
class Episode:
    """One decoded episode: shift/order metadata, current routes and an update.

    ``routes`` holds the existing routes, while ``route_update`` is a *single*
    route (``to_pandas`` is called directly on it). Route types are written as
    forward references so the class can be defined independently of the cell
    that defines ``Route``.
    """

    uuid: str
    episode_index: int
    shift_start: pd.Timestamp
    shift_end: pd.Timestamp
    order_timestamp: pd.Timestamp
    routes: List["Route"]
    route_update: "Route"
    revenue: float

    def to_pandas(self) -> Tuple[pd.Series, pd.DataFrame]:
        """Return the episode as ``(episode_data, routes_data)``.

        Returns:
            A tuple of

            * a ``pd.Series`` with the episode-level fields (``uuid``,
              ``episode_index``, ``shift_start``, ``shift_end``,
              ``order_timestamp``, ``revenue``), and
            * a ``pd.DataFrame`` with the rows of every route and of the route
              update (see ``__get_routes``).
        """
        episode_data = pd.Series(
            {
                "uuid": self.uuid,
                "episode_index": self.episode_index,
                "shift_start": self.shift_start,
                "shift_end": self.shift_end,
                "order_timestamp": self.order_timestamp,
                "revenue": self.revenue,
            }
        )
        routes_data = self.__get_routes()

        return episode_data, routes_data

    def __get_routes(self) -> pd.DataFrame:
        """Stack the rows of all routes and of the route update.

        Rows coming from ``routes`` get ``update=False``; rows coming from
        ``route_update`` get ``update=True``. Every row is tagged with
        ``episode_index`` and ``episode_uuid``. The per-route indices are kept
        as produced by ``Route.to_pandas`` (no re-indexing happens here).
        """
        route_update = self.route_update.to_pandas()
        route_update["update"] = True

        if self.routes:
            routes = pd.concat([r.to_pandas() for r in self.routes], axis=0)
            routes["update"] = False
            routes_data = pd.concat([routes, route_update], axis=0)
        else:
            # pd.concat raises ValueError on an empty list, so an episode
            # without existing routes consists of the update rows only.
            routes_data = route_update

        routes_data["episode_index"] = self.episode_index
        routes_data["episode_uuid"] = self.uuid

        return routes_data

In [6]:
class EpisodeDecoder(json.JSONDecoder):
    """JSON decoder that converts one serialized episode into pandas objects.

    Intended to be passed as ``cls=`` to ``json.loads``; the decoded value is
    whatever ``Episode.to_pandas`` returns rather than a plain dict.
    """

    def decode(self, input_string: str):
        """Parse ``input_string`` as JSON and return ``Episode.to_pandas()``."""
        raw_episode = super().decode(input_string)
        episode = self.__do_decode(raw_episode)

        return episode.to_pandas()

    @staticmethod
    def __do_decode(input_object):
        """Build an ``Episode`` from the parsed JSON mapping.

        A fresh random UUID (``uuid.uuid4``) is assigned to every episode;
        all other fields are read from ``input_object``.
        """
        episode_fields = {
            "uuid": str(uuid.uuid4()),
            "episode_index": input_object["episode_index"],
            "shift_start": input_object["shift_start"],
            "shift_end": input_object["shift_end"],
            "order_timestamp": input_object["order_timestamp"],
            "routes": [Route.from_json(raw) for raw in input_object["routes"]],
            "route_update": Route.from_json(input_object["route_update"]),
            "revenue": input_object["revenue"],
        }

        return Episode(**episode_fields)

In [7]:
# Read the whole JSONL file into memory, keeping one raw line per list entry.
with open("real_experiences_m_ch.jsonl") as f:
    lines = list(f)

In [8]:
# Number of raw lines read from the JSONL file.
len(lines)

6000

In [9]:
%%time

episodes, routes = zip(*Parallel(n_jobs=4)(
    delayed(json.loads)(l, cls=EpisodeDecoder) for l in lines
))

Wall time: 1min 38s


In [10]:
# Collapse the per-episode results into two frames: one row per episode and
# one row per drop. NOTE: both names are rebound from tuples to DataFrames, so
# re-running this cell requires re-running the decoding cell first
# (pd.concat rejects a bare DataFrame argument).
episodes = pd.DataFrame(episodes)
routes = pd.concat(routes, axis=0, ignore_index=True)

In [11]:
# (rows, columns) of the episode-level frame.
episodes.shape

(6000, 6)

In [12]:
# (rows, columns) of the combined routes frame (one row per drop).
routes.shape

(269387, 14)

In [13]:
# Count the distinct episode UUIDs present in the routes frame; expected to
# equal episodes.shape[0] when every episode contributed at least one drop row.
routes["episode_uuid"].nunique()

6000