In [1]:
from pyspark.sql import DataFrame, SparkSession, Window, functions as F

In [2]:
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [3]:
from datetime import datetime

In [4]:
# Attach to the already-running SparkSession if there is one; otherwise create a new
# session from the builder's settings (none are customised here).
spark = (
    SparkSession.builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/23 09:14:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Explicit schema for the sample rows; every field is declared nullable.
schema = StructType([
    StructField("location", StringType(), nullable=True),
    StructField("item_code", IntegerType(), nullable=True),
    StructField("colour", StringType(), nullable=True),
    StructField("append_timestamp", TimestampType(), nullable=True),
])

In [6]:
# Sample rows in schema field order: (location, item_code, colour, append_timestamp).
# Item 222222 appears as "blue" on two different dates, so a latest-per-(item_code, colour)
# filter has an older row to discard.
data = [
    ("london",     111111, "yellow", datetime(2024, 1, 1)),
    ("london",     222222, "blue",   datetime(2024, 1, 1)),
    ("not london", 222222, "red",    datetime(2024, 4, 1)),
    ("not london", 222222, "blue",   datetime(2024, 4, 1)),
]

In [7]:
# Build the input DataFrame from the sample rows using the explicit schema above.
df = spark.createDataFrame(data, schema=schema)

In [8]:
# Preview the input rows (show() defaults spelled out: first 20 rows, long values truncated).
df.show(n=20, truncate=True)

                                                                                

+----------+---------+------+-------------------+
|  location|item_code|colour|   append_timestamp|
+----------+---------+------+-------------------+
|    london|   111111|yellow|2024-01-01 00:00:00|
|    london|   222222|  blue|2024-01-01 00:00:00|
|not london|   222222|   red|2024-04-01 00:00:00|
|not london|   222222|  blue|2024-04-01 00:00:00|
+----------+---------+------+-------------------+



In [19]:
# Attach to every row the latest append_timestamp seen for its (item_code, colour) pair.
# The window deliberately has no ORDER BY: without ordering, Spark's default frame is the
# whole partition, so max() is the partition-wide maximum. (The previous empty `.orderBy()`
# was a no-op; adding a real order column there would switch the default frame to
# "unbounded preceding -> current row" and silently turn this into a running max.)
latest_per_item_colour = Window.partitionBy("item_code", "colour")
df2 = df.withColumn("latest_time", F.max("append_timestamp").over(latest_per_item_colour))

In [20]:
# Show each row next to its group's latest_time (first 20 rows, long values truncated).
df2.show(n=20, truncate=True)

+----------+---------+------+-------------------+-------------------+
|  location|item_code|colour|   append_timestamp|        latest_time|
+----------+---------+------+-------------------+-------------------+
|    london|   111111|yellow|2024-01-01 00:00:00|2024-01-01 00:00:00|
|    london|   222222|  blue|2024-01-01 00:00:00|2024-04-01 00:00:00|
|not london|   222222|  blue|2024-04-01 00:00:00|2024-04-01 00:00:00|
|not london|   222222|   red|2024-04-01 00:00:00|2024-04-01 00:00:00|
+----------+---------+------+-------------------+-------------------+



In [21]:
# Keep only the most recent row(s) for each (item_code, colour) pair.
# Built from `df` instead of overwriting `df2` in place, so re-running this cell gives the
# same result rather than failing on the already-dropped "latest_time" column.
# Notes on the equality filter:
#   - every row tied on the latest timestamp is kept (not just one per pair);
#   - a row with a null append_timestamp never compares equal, so it is dropped.
latest_per_item_colour = Window.partitionBy("item_code", "colour")
df2 = (
    df.withColumn("latest_time", F.max("append_timestamp").over(latest_per_item_colour))
    .filter(F.col("append_timestamp") == F.col("latest_time"))
    .drop("latest_time")
)

In [22]:
# Show the rows that survived the latest-per-(item_code, colour) filter.
df2.show(n=20, truncate=True)

+----------+---------+------+-------------------+
|  location|item_code|colour|   append_timestamp|
+----------+---------+------+-------------------+
|    london|   111111|yellow|2024-01-01 00:00:00|
|not london|   222222|  blue|2024-04-01 00:00:00|
|not london|   222222|   red|2024-04-01 00:00:00|
+----------+---------+------+-------------------+

