In [0]:
%run ../../../../../../includes/main/python/global_operations

In [0]:
%run ../../schemas

In [0]:
%run ./system_to_vehicle_mapping

In [0]:
# Name of the extra date column that the bronze transform adds as a copy of 'Date'.
# NOTE(review): presumably the column the target Delta table is partitioned by — confirm against the table definition.
PARTITION_COLUMN = 'p_Date'


In [0]:
# sample payload as per 12/10/2022

# {
#   "events": "[]", 
#   "type": "GPSData", 
#   "sentAt": "2022/07/31 11:13 ", 
#   "supplier": "Icomera", 
#   "system_ID": "1450350259",
#   "dataPoint": [
#     {
#       "system_name": "350259", 
#       "mode": 3, 
#       "position": {
#         "altitude": 60.9, 
#         "longitude": -2.432628, 
#         "latitude": 53.088512, 
#         "type": "GeoPosition"
#       }, 
#       "speed": 0.005, 
#       "type": "UnitPoint", 
#       "deviceCount": 23, 
#       "system": "1450350259", 
#       "age": "1", 
#       "numberOfSatellites": 10, 
#       "time": "1659262386", 
#       "cmg": 345.56, 
#       "system_ID": "1450350259"
#     }, 
#     {
#       "system_name": "350259", 
#       "mode": 0, 
#       "position": {
#         "altitude": 60.9, 
#         "longitude": -2.432628, 
#         "latitude": 53.088512, 
#         "type": "GeoPosition"
#       }, 
#       "speed": 0.005, 
#       "type": "UnitPoint", 
#       "deviceCount": 23, 
#       "system": "1450350259", 
#       "age": "2", 
#       "numberOfSatellites": 10, 
#       "time": "1659262386", 
#       "cmg": 345.56, 
#       "system_ID": "1450350259"
#     }
#   ]
# }

In [0]:
def read_stream_raw(spark: SparkSession, ehConf: dict) -> DataFrame:
    """Open a streaming read against the "eventhubs" source.

    Args:
        spark: Session whose ``readStream`` is used to build the reader.
        ehConf: Connector options, passed through unchanged as keyword options.

    Returns:
        The streaming DataFrame produced by ``load()``, using the explicit
        schema declared below instead of an inferred one.
    """
    # DDL for the columns expected from the source; `body` holds the raw message bytes.
    event_hub_ddl = """
      `body` BINARY,`partition` STRING,`offset` STRING,`sequenceNumber` BIGINT,`enqueuedTime` TIMESTAMP,`publisher` STRING,`partitionKey` STRING,`properties` MAP<STRING, STRING>,`systemProperties` MAP<STRING, STRING>
     """
    reader = spark.readStream.format("eventhubs")
    reader = reader.schema(event_hub_ddl)
    reader = reader.options(**ehConf)
    return reader.load()


In [0]:
def transform_raw(raw: DataFrame) -> DataFrame:
    """Append the decoded body and ingest bookkeeping columns to a raw batch.

    Every incoming column is kept. The columns added are:
      * ``Body_Decoded``      - ``body`` cast to string
      * ``_Data_Source``      - fixed literal naming the source
      * ``_Ingest_Timestamp`` - copy of ``enqueuedTime``
      * ``_Status``           - the literal "new"
      * ``p_Ingest_Date``     - ``enqueuedTime`` cast to date

    Args:
        raw: DataFrame that has at least ``body`` and ``enqueuedTime`` columns.

    Returns:
        ``raw`` with the five columns above appended, in that order.
    """
    enqueued_at = F.col("enqueuedTime")

    body_decoded = F.col("body").cast("string").alias("Body_Decoded")
    data_source = F.lit("evhns-gps-prod-ne.evh-gps-prod-ne").alias("_Data_Source")  # TODO: Find way to parameterise the $env
    ingest_timestamp = enqueued_at.alias("_Ingest_Timestamp")
    status = F.lit("new").alias("_Status")
    ingest_date = enqueued_at.cast("date").alias("p_Ingest_Date")

    appended_columns = [body_decoded, data_source, ingest_timestamp, status, ingest_date]
    return raw.select(F.col("*"), *appended_columns)


In [0]:
def compute_date_range(df:DataFrame, date_field:str) -> (str, str):
    """Return the smallest and largest value of ``date_field`` in ``df`` as strings.

    Both bounds are computed by a single aggregation, so only one Spark job
    is triggered per call.

    Args:
        df: DataFrame to scan.
        date_field: Column name (or SQL expression) to aggregate. It is
            interpolated into a SQL expression string, so it must come from
            trusted code, never from user input.

    Returns:
        ``(min, max)`` with each bound passed through ``str()``. For an empty
        ``df`` the aggregates are NULL, so the result is ``("None", "None")``.
    """
    # A global aggregate (no GROUP BY) always yields exactly one row.
    bounds = df.selectExpr(f"min({date_field})", f"max({date_field})").collect()[0]
    return str(bounds[0]), str(bounds[1])

In [0]:
def load_rake_hist_by_unit_df(min_date:str, max_date:str) -> DataFrame:
    """Load rake history for a date window, de-duplicated per unit and day.

    Args:
        min_date: Inclusive lower bound compared against the ``Date`` column.
        max_date: Inclusive upper bound compared against the ``Date`` column.

    Returns:
        A DataFrame with ``Rake_Date`` (renamed from ``Date``), ``Vehicle``,
        ``Unit``, ``TOC``, ``Fleet_Name``, ``Vehicle_Class``,
        ``Owning_Company`` and ``Vehicle_Sub_Class``, holding one row per
        (``Unit``, ``Rake_Date``). Which of several duplicate rows is kept is
        not controlled here.
    """
    history = load_rake_history()
    within_window = (history.Date >= min_date) & (history.Date <= max_date)
    wanted_columns = [
        "Date", "Vehicle", "Unit", "TOC", "Fleet_Name",
        "Vehicle_Class", "Owning_Company", "Vehicle_Sub_Class",
    ]

    windowed = history.filter(within_window).select(*wanted_columns)
    renamed = windowed.withColumnRenamed("Date", "Rake_Date")
    return renamed.dropDuplicates(['Unit', 'Rake_Date'])


In [0]:
def load_rake_hist_by_vehicle_df(min_date:str, max_date:str) -> DataFrame:
    """Load rake history for a date window, de-duplicated per vehicle and day.

    Args:
        min_date: Inclusive lower bound compared against the ``Date`` column.
        max_date: Inclusive upper bound compared against the ``Date`` column.

    Returns:
        A DataFrame with ``Rake_Date`` (renamed from ``Date``), ``Vehicle``,
        ``Unit``, ``TOC``, ``Fleet_Name``, ``Vehicle_Class``,
        ``Owning_Company`` and ``Vehicle_Sub_Class``, holding one row per
        (``Vehicle``, ``Rake_Date``). Which of several duplicate rows is kept
        is not controlled here.
    """
    history = load_rake_history()
    within_window = (history.Date >= min_date) & (history.Date <= max_date)
    wanted_columns = [
        "Date", "Vehicle", "Unit", "TOC", "Fleet_Name",
        "Vehicle_Class", "Owning_Company", "Vehicle_Sub_Class",
    ]

    windowed = history.filter(within_window).select(*wanted_columns)
    renamed = windowed.withColumnRenamed("Date", "Rake_Date")
    return renamed.dropDuplicates(['Vehicle', 'Rake_Date'])


In [0]:
def add_rake_metadata(df: DataFrame) -> DataFrame:
    """Left-join rake-history attributes onto ``df``, first by vehicle, then by unit.

    The join key on the ``df`` side is the first run of digits found in
    ``System_Name`` (any letters directly in front of it are ignored),
    together with ``Date``. Rake history is loaded only for the date range
    actually present in ``df``.

    Note that computing that date range runs a Spark action on ``df``.

    NOTE(review): both rake DataFrames are joined in without renaming or
    dropping columns; if they expose the same column names the result holds
    each of those names twice — confirm downstream consumers cope with that.

    Args:
        df: DataFrame with ``System_Name`` and ``Date`` columns.

    Returns:
        ``df`` with the vehicle-keyed and then the unit-keyed rake columns
        appended; rows without a match keep NULLs in those columns.
    """
    first_date, last_date = compute_date_range(df, 'Date')
    rake_by_unit = load_rake_hist_by_unit_df(first_date, last_date)
    rake_by_vehicle = load_rake_hist_by_vehicle_df(first_date, last_date)

    # Group 2 is the digit run; group 1 is an optional alphabetic prefix.
    number_regex = r'([a-zA-Z]+)?(\d+)'
    asset_number = F.regexp_extract(df.System_Name, number_regex, 2)

    matches_vehicle = (
        (asset_number == rake_by_vehicle.Vehicle) &
        (df.Date == rake_by_vehicle.Rake_Date)
    )
    matches_unit = (
        (asset_number == rake_by_unit.Unit) &
        (df.Date == rake_by_unit.Rake_Date)
    )

    with_vehicle_rake = df.join(rake_by_vehicle, matches_vehicle, "left")
    return with_vehicle_rake.join(rake_by_unit, matches_unit, "left")


In [0]:
def filter_icomera_assets(df: DataFrame) -> DataFrame:
    """Keep rows for known assets dated after 2021-01-01.

    A row is kept when its ``Unit`` or its ``Vehicle`` is one of the values
    of ``SYSTEM_ID_TO_ASSET_MAP`` and its ``Date`` is strictly greater than
    '2021-01-01'.

    Args:
        df: DataFrame with ``Unit``, ``Vehicle`` and ``Date`` columns.

    Returns:
        The filtered DataFrame.
    """
    known_assets = list(SYSTEM_ID_TO_ASSET_MAP.values())

    is_known_asset = F.col("Unit").isin(known_assets) | (F.col("Vehicle").isin(known_assets))
    is_after_cutoff = F.col("Date") > '2021-01-01'

    return df.filter(is_known_asset & is_after_cutoff)


In [0]:

class Upsert:
    """Micro-batch handlers that transform a batch and merge it into a Delta table.

    ``upsert_to_silver`` and ``upsert_to_silver_unified`` take
    ``(micro_batch_df, batch)``, the shape expected by Structured Streaming's
    ``foreachBatch``. Despite the class name, the merge only inserts rows
    that do not match ``update_condition_string``; matched rows are left
    untouched.

    Attributes are set by ``__init__``:
      * ``update_condition_string`` - merge condition between the aliases
        ``table`` (target) and ``updates`` (incoming batch).
      * ``delta_table_path`` - path of the target Delta table.
    """

    # Column renames applied after flattening, as (current name, governed name),
    # in the order they are applied. "type" here is the one contributed by
    # `position.*`; the payload-level and datapoint-level "type" columns are
    # renamed earlier, while flattening.
    _COLUMN_RENAMES = (
        ("partition", "Partition"),
        ("offset", "Offset"),
        ("sequenceNumber", "Sequence_Number"),
        ("enqueuedTime", "Enqueued_Time"),
        ("publisher", "Publisher"),
        ("partitionKey", "Partition_Key"),
        ("properties", "Properties"),
        ("systemProperties", "System_Properties"),
        ("events", "Events"),
        ("supplier", "Supplier"),
        ("sentAt", "Sent_At"),
        ("mode", "Mode"),
        ("time", "Unix_Time"),
        ("numberOfSatellites", "Number_Of_Satellites"),
        ("system", "System"),
        ("system_name", "System_Name"),
        ("cmg", "CMG"),
        ("speed", "Speed"),
        ("age", "Age"),
        ("system_ID", "System_ID"),
        ("altitude", "Altitude"),
        ("latitude", "Latitude"),
        ("longitude", "Longitude"),
        ("type", "Position_Type"),
        ("deviceCount", "Device_Count"),
    )

    # Numeric casts applied after renaming, as (column, Spark type name).
    _NUMERIC_CASTS = (
        ("Unix_Time", "integer"),
        ("CMG", "float"),
        ("Device_Count", "integer"),
        ("Speed", "float"),
        ("Age", "integer"),
        ("Altitude", "float"),
        ("Latitude", "float"),
        ("Longitude", "float"),
    )

    def __init__(self, update_condition_string=None, delta_table_path=None):
        """Store the merge condition and target table path (both optional).

        While either value is ``None`` the upsert methods write nothing.
        """
        self.update_condition_string = update_condition_string
        self.delta_table_path = delta_table_path

    def _transform_bronze(self, df: DataFrame) -> DataFrame:
        """Flatten a bronze micro-batch into one typed row per data point.

        Steps: parse ``Body_Decoded`` with ``JSON_SCHEMA``, explode the
        ``dataPoint`` array, flatten the data point and its ``position``
        struct, de-duplicate on (``system_ID``, ``time``), rename columns,
        cast types, derive ``Timestamp`` / ``Date`` / ``PARTITION_COLUMN``
        and finally attach rake metadata via ``add_rake_metadata``.

        Args:
            df: Bronze rows holding the JSON payload in ``Body_Decoded``.

        Returns:
            The flattened, typed DataFrame enriched with rake metadata.
        """
        flattened = (
            # Convert JSON body string into struct
            df.withColumn(
                "Body_JSON", F.from_json(F.col("Body_Decoded"), JSON_SCHEMA)
            )
            .drop("body", "Body_Decoded")  # raw bytes and JSON string are no longer needed
            .select(
                F.col("*"),
                F.col("Body_JSON.*"),
                # unpack each element of the array dataPoint into its own row
                F.explode("Body_JSON.dataPoint").alias("Data_Point_Exploded"),
            )
            # the payload-level system_ID is dropped; the data point supplies its own
            .drop("dataPoint", "Body_JSON", "system_ID")
            .withColumnRenamed("p_Ingest_Date", "Ingest_Date")
            # rename before the next expansion introduces another "type" column
            .withColumnRenamed("type", "Payload_Type")
            .select(
                F.col("*"),
                F.col("Data_Point_Exploded.*")
            )
            .drop("Data_Point_Exploded")
            # same again: free the name "type" before `position.*` is expanded
            .withColumnRenamed("type", "Datapoint_Type")
            .select(
                F.col("*"),
                F.col("position.*")
            )
            .drop("position")
            .dropDuplicates(["system_ID", "time"])
        )

        # rename fields as per Governance
        renamed = flattened
        for current_name, governed_name in self._COLUMN_RENAMES:
            renamed = renamed.withColumnRenamed(current_name, governed_name)

        # casting
        # NOTE(review): if Enqueued_Time already arrives as a timestamp, the
        # to_timestamp call below is probably a no-op — confirm and simplify.
        typed = (
            renamed
            .withColumn("Sent_At", F.to_timestamp(F.trim(F.col("Sent_At")), "yyyy/MM/dd HH:mm"))
            .withColumn("Enqueued_Time", F.to_timestamp(F.col("Enqueued_Time"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
        )
        for column_name, spark_type in self._NUMERIC_CASTS:
            typed = typed.withColumn(column_name, F.col(column_name).cast(spark_type))

        # derived columns
        enriched = (
            typed
            .withColumn("Timestamp",
                        F.to_timestamp(F.from_unixtime(F.col("Unix_Time"), "yyyy-MM-dd HH:mm:ss"),
                                       "yyyy-MM-dd HH:mm:ss"))
            .withColumn('Date', F.to_date(F.from_unixtime(F.col("Unix_Time"))))
            .withColumn(PARTITION_COLUMN, F.col('Date'))
        )
        return add_rake_metadata(enriched)

    def _transform_silver(self, df: DataFrame) -> DataFrame:
        """Return ``df`` unchanged (silver rows need no further transformation)."""
        return df

    def _upsert_to_delta(self, micro_batch_df, batch, func: Callable[[DataFrame], DataFrame]):
        """Transform a micro-batch with ``func`` and insert its unmatched rows.

        Args:
            micro_batch_df: The micro-batch to write.
            batch: Batch identifier; only used in the warning message.
            func: Transformation applied to ``micro_batch_df`` before merging.

        Returns:
            ``False`` when ``update_condition_string`` or ``delta_table_path``
            is missing (nothing is written), otherwise ``None``.

        Relies on the notebook-global ``spark`` session to open the table.
        """
        # Check configuration before importing delta or doing any work, and make
        # a skipped batch visible in the logs instead of dropping it silently.
        if (self.update_condition_string is None) or (self.delta_table_path is None):
            import logging

            logging.getLogger(__name__).warning(
                "Upsert has no update_condition_string and/or delta_table_path; "
                "micro-batch %s was NOT written.",
                batch,
            )
            return False

        from delta.tables import DeltaTable

        target_table = DeltaTable.forPath(spark, self.delta_table_path)
        updates_df = func(micro_batch_df)

        # merge microbatch into target table: insert-only, matched rows are kept as they are
        (
            target_table.alias('table')
            .merge(updates_df.alias('updates'), self.update_condition_string)
            .whenNotMatchedInsertAll()
            .execute()
        )

    def upsert_to_silver(self, micro_batch_df, batch):
        """Flatten a bronze micro-batch and merge it into the target table."""
        self._upsert_to_delta(micro_batch_df, batch, self._transform_bronze)

    def upsert_to_silver_unified(self, micro_batch_df, batch):
        """Merge an already-flattened micro-batch into the target table as-is."""
        self._upsert_to_delta(micro_batch_df, batch, self._transform_silver)