In [0]:
from pyspark import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,DateType
from pyspark.sql.functions import col,rank
from pyspark.sql.window import Window
from datetime import datetime

# Build the notebook's Spark session, or pick up one that is already active.
# "local[3]" is the master URL handed to the builder; "app" is the application name.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("app")
    .getOrCreate()
)

In [0]:
# Price-change log: one row per (product, date) recording the price that took
# effect on that date. All three columns are declared non-nullable.
schema = StructType(
    [
        StructField("product_id", IntegerType(), nullable=False),
        StructField("new_price", IntegerType(), nullable=False),
        StructField("change_date", DateType(), nullable=False),
    ]
)

# Sample rows written as (product_id, new_price, day of August 2019) and
# expanded into full datetime values for the DateType column.
data = [
    (product_id, new_price, datetime(2019, 8, day))
    for product_id, new_price, day in [
        (1, 20, 14),
        (2, 50, 14),
        (1, 30, 15),
        (1, 35, 16),
        (2, 65, 17),
        (3, 20, 18),
    ]
]

products = spark.createDataFrame(data, schema)
products.show()

+----------+---------+-----------+
|product_id|new_price|change_date|
+----------+---------+-----------+
|         1|       20| 2019-08-14|
|         2|       50| 2019-08-14|
|         1|       30| 2019-08-15|
|         1|       35| 2019-08-16|
|         2|       65| 2019-08-17|
|         3|       20| 2019-08-18|
+----------+---------+-----------+



In [0]:
# Goal: report every product's price as of TARGET_DATE. A product whose first
# recorded change is after TARGET_DATE is reported at DEFAULT_PRICE.
TARGET_DATE = "2019-08-16"
DEFAULT_PRICE = 10

# Every product that appears in the change log, including those with no
# change on or before TARGET_DATE.
unique_products = products.select("product_id").distinct()

# Newest change first within each product, so rank 1 is the latest change.
window_spec = Window.partitionBy("product_id").orderBy(col("change_date").desc())

# Latest change per product among the rows dated on or before TARGET_DATE.
# NOTE(review): rank() gives rank 1 to every row tied on change_date, so this
# assumes at most one change per product per date - confirm against the data.
price_on_16 = (
    products
    .filter(col("change_date") <= TARGET_DATE)
    .select(
        "product_id",
        col("new_price").alias("price"),
        rank().over(window_spec).alias("rnk"),
    )
    .filter(col("rnk") == 1)
)

# The left join keeps products that have no row in price_on_16; their price is
# null after the join and is replaced by DEFAULT_PRICE.
# Joining on the column name yields a single product_id column, so the fill
# can name the result column ("price") directly instead of depending on an
# alias-qualified reference being resolved.
(
    unique_products
    .join(price_on_16, on="product_id", how="left")
    .select("product_id", "price")
    .fillna(DEFAULT_PRICE, subset=["price"])
    .orderBy("product_id")  # make the displayed row order deterministic
    .show()
)

+----------+-----+
|product_id|price|
+----------+-----+
|         1|   35|
|         2|   50|
|         3|   10|
+----------+-----+



In [0]:
# Register the DataFrame as a temp view so it can be queried with Spark SQL.
# The query below uses the same lowercase name the view is registered under,
# so it does not depend on case-insensitive identifier resolution.
products.createOrReplaceTempView("products")

# Same result as the DataFrame version: each product's price as of 2019-08-16,
# with 10 reported for products that have no change on or before that date.
#   latest_change - changes up to the cutoff, ranked newest-first per product.
#   main query    - left join so products without a qualifying change are
#                   kept; their null price is replaced by 10.
# The cutoff is a typed DATE literal rather than a bare string, and the final
# ORDER BY makes the displayed row order deterministic.
price_on_date_sql = """
    WITH latest_change AS (
        SELECT DISTINCT
            product_id,
            new_price AS price,
            RANK() OVER (PARTITION BY product_id ORDER BY change_date DESC) AS rnk
        FROM products
        WHERE change_date <= DATE '2019-08-16'
    )
    SELECT DISTINCT
        p.product_id,
        COALESCE(lc.price, 10) AS price
    FROM products p
    LEFT JOIN latest_change lc
        ON p.product_id = lc.product_id
       AND lc.rnk = 1
    ORDER BY product_id
"""
spark.sql(price_on_date_sql).show()

+----------+-----+
|product_id|price|
+----------+-----+
|         1|   35|
|         2|   50|
|         3|   10|
+----------+-----+



In [0]:
# Stop the Spark session created in the first cell now that the notebook is done.
# NOTE(review): cells that use `spark` presumably need the session recreated
# before they can be re-run after this point.
spark.stop()