In [12]:
import re
import findspark  
findspark.init()
from pyspark.sql import *
from pyspark.sql.functions import udf, expr
from pyspark.sql.types import *
## Note: Work well in python 3.9.13 env for better result.

In [9]:
spark = SparkSession.builder  \
            .appName("MsicDemo") \
            .master("local[3]") \
            .getOrCreate()


In [13]:
data_list=[("Ravi", "28","1" , 2002),
        ("Abdul","23", "5", "81"), # 1981
        ("John", "12", "12", "6"), # 2006
        ("Rosy", "7", "8", "63"),  # 1963
        ("Abdul", "23", "5", "81")  # 1981
        ]

raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year").repartition(3)
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [14]:
from pyspark.sql.functions import monotonically_increasing_id


df1 = raw_df.withColumn("id", monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|  81|          1|
|Abdul| 23|    5|  81| 8589934592|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+



In [19]:
df2 = df1.withColumn("year", expr(
    """ case when year < 21 then year + 2000
        when year < 100 then  year + 1900 else year
        end """
))
df2.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         id|
+-----+---+-----+------+-----------+
| Ravi| 28|    1|  2002|          0|
|Abdul| 23|    5|1981.0|          1|
|Abdul| 23|    5|1981.0| 8589934592|
| John| 12|   12|2006.0|17179869184|
| Rosy|  7|    8|1963.0|17179869185|
+-----+---+-----+------+-----------+



In [18]:
df3 = df1.withColumn("year", expr(
    """ case when year < 21 then cast(year as int) + 2000
        when year < 100 then cast (year as int) + 1900 else year
        end """
))
df3.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



In [20]:
df4 = df1.withColumn("year", expr(
    """ case when year < 21 then year + 2000
        when year < 100 then year + 1900 else year
        end """).cast(IntegerType()))
df4.show()
df4.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [21]:
df1.show()
df1.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|  81|          1|
|Abdul| 23|    5|  81| 8589934592|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- id: long (nullable = false)



In [22]:
from pyspark.sql.functions import col


df5 = df1.withColumn("day", col("day").cast(IntegerType())) \
    .withColumn("month", col("month").cast(IntegerType())) \
    .withColumn("year", col("year").cast(IntegerType()))

df6 = df5.withColumn("year", expr(
    """ case when year < 21 then year + 2000
        when year < 100 then year + 1900 else year
        end """))

df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



In [23]:
from pyspark.sql.functions import when


df7 = df5.withColumn("year", when(col("year") < 21, col("year") + 2000) \
                    .when(col("year") < 100 , col("year") + 1900) \
                    .otherwise(col("year")))
df7.show()


+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



In [25]:
df8 = df7.withColumn("dob", expr("to_date(concat(day,'/', month, '/', year),'d/M/y')"))
df8.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
| Ravi| 28|    1|2002|          0|2002-01-28|
|Abdul| 23|    5|1981|          1|1981-05-23|
|Abdul| 23|    5|1981| 8589934592|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
| Rosy|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+



In [27]:
from pyspark.sql.functions import to_date


df9 = df7.withColumn("dob", to_date(expr("concat(day,'/', month, '/', year)"),'d/M/y'))
df9.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
| Ravi| 28|    1|2002|          0|2002-01-28|
|Abdul| 23|    5|1981|          1|1981-05-23|
|Abdul| 23|    5|1981| 8589934592|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
| Rosy|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+



In [28]:


df10 = df7.withColumn("dob", to_date(expr("concat(day,'/', month, '/', year)"),'d/M/y')) \
    .drop("day","month","year")
df10.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Ravi|          0|2002-01-28|
|Abdul|          1|1981-05-23|
|Abdul| 8589934592|1981-05-23|
| John|17179869184|2006-12-12|
| Rosy|17179869185|1963-08-07|
+-----+-----------+----------+



In [31]:
df11 = df7.withColumn("dob", to_date(expr("concat(day,'/', month, '/', year)"),'d/M/y')) \
    .drop("day","month","year") \
    .dropDuplicates(["name", "dob"])
df11.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Ravi|          0|2002-01-28|
|Abdul|          1|1981-05-23|
| Rosy|17179869185|1963-08-07|
| John|17179869184|2006-12-12|
+-----+-----------+----------+



In [32]:
df12 = df7.withColumn("dob", to_date(expr("concat(day,'/', month, '/', year)"),'d/M/y')) \
    .drop("day","month","year") \
    .dropDuplicates(["name", "dob"]) \
    .sort(expr("dob desc"))
df12.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Rosy|17179869185|1963-08-07|
|Abdul|          1|1981-05-23|
| Ravi|          0|2002-01-28|
| John|17179869184|2006-12-12|
+-----+-----------+----------+

