In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/09 18:53:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.functions import date_format
from pyspark.sql.functions import to_date

23/12/09 18:53:52 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
23/12/09 18:53:52 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("new_price", IntegerType(), True),
    StructField("change_date", StringType(), True)
])

# Data to be loaded into the DataFrame
data = [
    (1, 20, '2019-08-14'),
    (2, 50, '2019-08-14'),
    (1, 30, '2019-08-15'),
    (1, 35, '2019-08-16'),
    (2, 65, '2019-08-17'),
    (3, 20, '2019-08-18')
]

df = spark.createDataFrame(data, schema)

df.show()
print(df.dtypes)

                                                                                

+----------+---------+-----------+
|product_id|new_price|change_date|
+----------+---------+-----------+
|         1|       20| 2019-08-14|
|         2|       50| 2019-08-14|
|         1|       30| 2019-08-15|
|         1|       35| 2019-08-16|
|         2|       65| 2019-08-17|
|         3|       20| 2019-08-18|
+----------+---------+-----------+

[('product_id', 'int'), ('new_price', 'int'), ('change_date', 'string')]


In [7]:
df.createOrReplaceTempView("Products")

In [8]:
result = spark.sql(
    """
    -- using UNION ALL --
    select product_id, 10 as price from Products group by 1 having min(change_date) > '2019-08-16' 
union all
select p.product_id, new_price as price
from Products p inner join (select product_id, max(change_date) as change_date
        from Products where change_date <= '2019-08-16'
        group by 1
        ) r on p.product_id = r.product_id and p.change_date = r.change_date

    """
)
result.show()

[Stage 5:>                                                          (0 + 8) / 8]

+----------+-----+
|product_id|price|
+----------+-----+
|         3|   10|
|         2|   50|
|         1|   35|
+----------+-----+



                                                                                

In [12]:
result = spark.sql(
    """
    -- using WINDOW functions
    -- without distinct, the window function result is repeated every row
    select DISTINCT product_id, first_value(new_price) over(partition by product_id order by change_date desc) as price
    from Products
    where change_date <= '2019-08-16'
    """
)
result.show()

+----------+-----+
|product_id|price|
+----------+-----+
|         1|   35|
|         2|   50|
+----------+-----+



In [19]:
result = spark.sql(
    """
    -- using WINDOW functions, solution --
    select p.product_id, ifnull(price,10) as price
    from (select distinct product_id from Products) p left join
    (select DISTINCT product_id, first_value(new_price) over(partition by product_id order by change_date desc) as price
    from Products
    where change_date <= '2019-08-16') r on p.product_id = r.product_id

    """
)
result.show()



+----------+-----+
|product_id|price|
+----------+-----+
|         1|   35|
|         2|   50|
|         3|   10|
+----------+-----+



                                                                                

In [23]:
result = spark.sql(
    """
    -- using JOIN --
    select p.product_id, ifnull(price,10) as price
    from (select distinct product_id from Products) p 
    left join (select Products.product_id, new_price as price
                from Products join 
                    (select product_id, max(change_date) as change_date 
                        from Products where change_date <= '2019-08-16' group by 1) d
                        on Products.product_id = d.product_id and Products.change_date = d.change_date) r 
            on P.product_id = r.product_id

    """
)
result.show()

[Stage 34:>                                                         (0 + 8) / 8]

+----------+-----+
|product_id|price|
+----------+-----+
|         1|   35|
|         2|   50|
|         3|   10|
+----------+-----+



                                                                                

In [26]:
result = spark.sql(
    """
    -- using JOIN, (id, date IN) --
    select p.product_id, ifnull(price,10) as price
    from (select distinct product_id from Products) p 
    left join (select Products.product_id, new_price as price
                from Products where (product_id, change_date) IN
                    (select product_id, max(change_date) as change_date 
                        from Products where change_date <= '2019-08-16' group by 1) ) r 
            on P.product_id = r.product_id

    """
)
result.show()

                                                                                

+----------+-----+
|product_id|price|
+----------+-----+
|         1|   35|
|         2|   50|
|         3|   10|
+----------+-----+

