In [0]:
# Create the sample CSV used by the rest of this notebook.
# Passing True as the third argument (overwrite) lets put() replace any
# existing copy of the file, so the cell is re-runnable with a single
# filesystem call instead of a separate rm followed by put.
dbutils.fs.put("/scenerios/duplicates.csv","""id,name,loc,updated_date
1,ravi,banglore,2021-01-01
2,ravi,chennai,2022-02-02
3,ravi,Hyderabad,2022-06-10
4,Raj,banglore,2021-01-01
5,Raj,chennai,2022-02-02
6,Raj,Hyderabad,2022-06-10
7,Mahesh,banglore,2021-01-01
8,Prasad,chennai,2022-02-02
9,Prasad,Hyderabad,2022-06-10

""", True)

Wrote 272 bytes.
Out[80]: True

In [0]:
# Load the sample file: the first line is treated as the header and the
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/scenerios/duplicates.csv")
)
df.printSchema()
display(df)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- loc: string (nullable = true)
 |-- updated_date: date (nullable = true)



id,name,loc,updated_date
1,ravi,banglore,2021-01-01
2,ravi,chennai,2022-02-02
3,ravi,Hyderabad,2022-06-10
4,Raj,banglore,2021-01-01
5,Raj,chennai,2022-02-02
6,Raj,Hyderabad,2022-06-10
7,Mahesh,banglore,2021-01-01
8,Prasad,chennai,2022-02-02
9,Prasad,Hyderabad,2022-06-10


# Using distinct() won't help here: it compares entire rows, and every row has a different id, so nothing is removed

In [0]:
from pyspark.sql.functions import col

# distinct() de-duplicates on the full row, then the result is sorted by id
# in ascending order for display.
df_distinct = df.distinct().sort(col("id").asc())
display(df_distinct)

id,name,loc,updated_date
1,ravi,banglore,2021-01-01
2,ravi,chennai,2022-02-02
3,ravi,Hyderabad,2022-06-10
4,Raj,banglore,2021-01-01
5,Raj,chennai,2022-02-02
6,Raj,Hyderabad,2022-06-10
7,Mahesh,banglore,2021-01-01
8,Prasad,chennai,2022-02-02
9,Prasad,Hyderabad,2022-06-10


# Using dropDuplicates() on a subset of columns removes the duplicates and keeps the first row it encounters for each key

In [0]:
# Show the input first, then the de-duplicated result for comparison.
display(df)

# Sort newest-first, then keep a single row per name.
# NOTE(review): this relies on dropDuplicates() retaining the first row in
# sort order; that ordering guarantee does not appear to be documented --
# confirm, or prefer a row_number() window when the kept row matters.
df_drop_duplicates = (
    df
    .orderBy("updated_date", ascending=False)
    .dropDuplicates(["name"])
)
display(df_drop_duplicates)

id,name,loc,updated_date
1,ravi,banglore,2021-01-01
2,ravi,chennai,2022-02-02
3,ravi,Hyderabad,2022-06-10
4,Raj,banglore,2021-01-01
5,Raj,chennai,2022-02-02
6,Raj,Hyderabad,2022-06-10
7,Mahesh,banglore,2021-01-01
8,Prasad,chennai,2022-02-02
9,Prasad,Hyderabad,2022-06-10


id,name,loc,updated_date
7,Mahesh,banglore,2021-01-01
9,Prasad,Hyderabad,2022-06-10
6,Raj,Hyderabad,2022-06-10
3,ravi,Hyderabad,2022-06-10


In [0]:
# Same de-duplication on name, but without any explicit ordering beforehand.
df2 = df.drop_duplicates(["name"])
display(df2)


id,name,loc,updated_date
7,Mahesh,banglore,2021-01-01
8,Prasad,chennai,2022-02-02
4,Raj,banglore,2021-01-01
1,ravi,banglore,2021-01-01


# Window function with row_number()

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# One window per name, newest updated_date first.
newest_first_per_name = Window.partitionBy("name").orderBy(col("updated_date").desc())

# rowid numbers the rows inside each name group; 1 is the most recent record.
df_window = df.withColumn("rowid", row_number().over(newest_first_per_name))
display(df_window)

id,name,loc,updated_date,rowid
7,Mahesh,banglore,2021-01-01,1
9,Prasad,Hyderabad,2022-06-10,1
8,Prasad,chennai,2022-02-02,2
6,Raj,Hyderabad,2022-06-10,1
5,Raj,chennai,2022-02-02,2
4,Raj,banglore,2021-01-01,3
3,ravi,Hyderabad,2022-06-10,1
2,ravi,chennai,2022-02-02,2
1,ravi,banglore,2021-01-01,3


In [0]:
# Keep only the top-ranked (most recent) row of every name and project the
# original four columns, sorted by id.
df_final_recent_without_duplicates = (
    df_window
    .where(col("rowid") == 1)
    .select("id", "name", "loc", "updated_date")
    .sort("id")
)
display(df_final_recent_without_duplicates)

id,name,loc,updated_date
3,ravi,Hyderabad,2022-06-10
6,Raj,Hyderabad,2022-06-10
7,Mahesh,banglore,2021-01-01
9,Prasad,Hyderabad,2022-06-10


In [0]:
# Everything ranked below the first row of its name group, i.e. the older
# records that the previous step discards; projected to the original columns.
df_with_duplicates = (
    df_window
    .where(col("rowid") > 1)
    .select("id", "name", "loc", "updated_date")
    .sort("id")
)
display(df_with_duplicates)

id,name,loc,updated_date
1,ravi,banglore,2021-01-01
2,ravi,chennai,2022-02-02
4,Raj,banglore,2021-01-01
5,Raj,chennai,2022-02-02
8,Prasad,chennai,2022-02-02


# Define the window expression separately, then use it in the DataFrame statement

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Reusable column expression: row_number() evaluated over a window that is
# partitioned by name and ordered by updated_date descending.
partition_window = row_number().over(
    Window.partitionBy("name").orderBy(col("updated_date").desc())
)

# Attach the expression as the rowid column.
df_window = df.withColumn("rowid", partition_window)
display(df_window)

id,name,loc,updated_date,rowid
7,Mahesh,banglore,2021-01-01,1
9,Prasad,Hyderabad,2022-06-10,1
8,Prasad,chennai,2022-02-02,2
6,Raj,Hyderabad,2022-06-10,1
5,Raj,chennai,2022-02-02,2
4,Raj,banglore,2021-01-01,3
3,ravi,Hyderabad,2022-06-10,1
2,ravi,chennai,2022-02-02,2
1,ravi,banglore,2021-01-01,3
