In [4]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [5]:
spark = SparkSession.builder \
            .master("local[3]") \
            .appName("MiscDemo") \
            .getOrCreate()

data_list = [("Ravi", "28", "1", "2002"),
             ("Abdul", "23", "5", "81"), # 1981
             ("John", "12", "12", "6"), # 2006
             ("Rosy", "7", "8", "63"), # 1963
             ("Abdul", "23", "5", "81") # 1981
            ]
raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year").repartition(3)
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [6]:
df1 = raw_df.withColumn("id", monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|  81|          0|
| Rosy|  7|    8|  63|          1|
| Ravi| 28|    1|2002| 8589934592|
| John| 12|   12|   6|17179869184|
|Abdul| 23|    5|  81|17179869185|
+-----+---+-----+----+-----------+



In [7]:
df2 = df1.withColumn("year", expr("""
         case when year < 21 then year + 2000
         when year < 100 then year + 1900
         else year
         end"""))
df2.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         id|
+-----+---+-----+------+-----------+
|Abdul| 23|    5|1981.0|          0|
| Rosy|  7|    8|1963.0|          1|
| Ravi| 28|    1|  2002| 8589934592|
| John| 12|   12|2006.0|17179869184|
|Abdul| 23|    5|1981.0|17179869185|
+-----+---+-----+------+-----------+



In [8]:
df3 = df1.withColumn("year", expr("""
         case when year < 21 then cast(year as int) + 2000
         when year < 100 then cast(year as int) + 1900
         else year
         end"""))
df3.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Rosy|  7|    8|1963|          1|
| Ravi| 28|    1|2002| 8589934592|
| John| 12|   12|2006|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [9]:
df4 = df1.withColumn("year", expr("""
         case when year < 21 then year + 2000
         when year < 100 then year + 1900
         else year
         end""").cast(IntegerType()))
df4.show()
df4.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Rosy|  7|    8|1963|          1|
| Ravi| 28|    1|2002| 8589934592|
| John| 12|   12|2006|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [10]:
df5 = df1.withColumn("day", col("day").cast(IntegerType())) \
         .withColumn("month", col("month").cast(IntegerType())) \
         .withColumn("year", col("year").cast(IntegerType())) 

df6 = df5.withColumn("year", expr("""
          case when year < 21 then year + 2000
          when year < 100 then year + 1900
          else year
          end"""))
df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Rosy|  7|    8|1963|          1|
| Ravi| 28|    1|2002| 8589934592|
| John| 12|   12|2006|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [11]:
df7 = df5.withColumn("year", \
                    when(col("year") < 21, col("year") + 2000) \
                    .when(col("year") < 100, col("year") + 1900) \
                    .otherwise(col("year")))
df7.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Rosy|  7|    8|1963|          1|
| Ravi| 28|    1|2002| 8589934592|
| John| 12|   12|2006|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [12]:
df8 = df7.withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')"))
df8.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
|Abdul| 23|    5|1981|          0|1981-05-23|
| Rosy|  7|    8|1963|          1|1963-08-07|
| Ravi| 28|    1|2002| 8589934592|2002-01-28|
| John| 12|   12|2006|17179869184|2006-12-12|
|Abdul| 23|    5|1981|17179869185|1981-05-23|
+-----+---+-----+----+-----------+----------+



In [13]:
df9 = df7.withColumn("dob", to_date(expr("concat(day,'/',month,'/',year)"), 'd/M/y')) \
         .drop("day", "month", "year") \
         .dropDuplicates(["name", "dob"]) \
         .sort(expr("dob desc"))
df9.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Rosy|          1|1963-08-07|
|Abdul|          0|1981-05-23|
| Ravi| 8589934592|2002-01-28|
| John|17179869184|2006-12-12|
+-----+-----------+----------+

