In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 44.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3fe5a3e47bddd2da3e60b4cc58a729d94491aa346ca7f2cf578923e19ed3faa8
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, when , expr
from pyspark.sql.types import * 


In [3]:
# Sample rows as (name, day, month, year); every field is a string.
# Years come in both four-digit and abbreviated form, and one row is repeated.
data_list = [
    ("Ravi", "28", "1", "2002"),
    ("Abdul", "23", "5", "81"),  # stands for 1981
    ("John", "12", "12", "6"),  # stands for 2006
    ("Rosy", "7", "8", "63"),  # stands for 1963
    ("Abdul", "23", "5", "81"),  # exact repeat of the Abdul row above
]

In [7]:
# Start (or reuse) a Spark session on a local master with two threads.
spark = (
    SparkSession.builder
    .appName('Misc Demo')
    .master('local[2]')
    .getOrCreate()
)

In [8]:
# Turn the in-memory rows into a DataFrame, name the four columns,
# and spread the rows over three partitions.
column_names = ("name", 'day', 'month', 'year')
raw_df = (
    spark.createDataFrame(data_list)
    .toDF(*column_names)
    .repartition(3)
)

In [9]:
# Show the inferred column types before any casting is applied.
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [14]:
# Print the rows exactly as they were loaded.
raw_df.show()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| John| 12|   12|   6|
| Ravi| 28|    1|2002|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
|Abdul| 23|    5|  81|
+-----+---+-----+----+



In [11]:
# Clean the raw rows into (name, id, dob):
#   1. tag every row with a generated id,
#   2. cast the string day/month/year columns to integers,
#   3. expand abbreviated years to four digits,
#   4. assemble a date column, drop the parts, de-duplicate and sort.
# The two-digit-year cutoff is 21 so that this cell agrees with the
# step-by-step cells later in the notebook, which all test `year < 21`.
final_df = (
    raw_df.withColumn("id", monotonically_increasing_id())
    .withColumn("day", col("day").cast(IntegerType()))
    .withColumn("month", col("month").cast(IntegerType()))
    .withColumn("year", col("year").cast(IntegerType()))
    .withColumn(
        "year",
        when(col("year") < 21, col("year") + 2000)  # values below 21 land in the 2000s
        .when(col("year") < 100, col("year") + 1900)  # remaining two-digit values land in the 1900s
        .otherwise(col("year")),  # anything else is kept unchanged
    )
    .withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')"))
    .drop("day", "month", "year")
    # Duplicates are judged on name + dob only, so differing generated ids
    # do not keep repeated people alive.
    .dropDuplicates(["name", "dob"])
    .sort(col("dob").desc())  # most recent date of birth first
)

In [12]:
# A bare DataFrame expression displays only its column names and types.
final_df

DataFrame[name: string, id: bigint, dob: date]

In [13]:
# Print the cleaned, de-duplicated rows.
final_df.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| John|          0|2006-12-12|
| Ravi| 8589934592|2002-01-28|
|Abdul|17179869184|1981-05-23|
| Rosy| 8589934593|1963-08-07|
+-----+-----------+----------+



In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [17]:
# Ask for a session named 'MiscDemo' on a local master with three threads.
# NOTE(review): if a session created by an earlier cell is still alive,
# getOrCreate() presumably returns that one instead of a new one -- confirm.
spark = (
    SparkSession.builder
    .master('local[3]')
    .appName('MiscDemo')
    .getOrCreate()
)

# Sample rows as (name, day, month, year), all held as strings.
# Some years are abbreviated; the trailing comment gives the intended year.
data_list = [
    ("Ravi", "28", "1", "2002"),
    ("Abdul", "23", "5", "81"),  # 1981
    ("John", "12", "12", "6"),  # 2006
    ("Rosy", "7", "8", "63"),  # 1963
    ("Abdul", "23", "5", "81"),  # 1981 (repeated row)
]

In [18]:
# Load the sample rows, label the columns, and split the data into three partitions.
raw_df = (
    spark.createDataFrame(data_list)
    .toDF('name', 'day', 'month', 'year')
    .repartition(3)
)

In [19]:
# Check the inferred schema: all four columns are reported as strings.
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [20]:
# Print the rows as loaded, before any transformation.
raw_df.show()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| John| 12|   12|   6|
| Ravi| 28|    1|2002|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
|Abdul| 23|    5|  81|
+-----+---+-----+----+



In [21]:
# Append a generated long id to every row. As the output shows, the ids are
# unique and increasing but not consecutive; the large jumps presumably
# correspond to rows sitting in different partitions.
df1 = raw_df.withColumn('id', monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|   6|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|  63| 8589934593|
|Abdul| 23|    5|  81|17179869184|
|Abdul| 23|    5|  81|17179869185|
+-----+---+-----+----+-----------+



In [22]:
# First attempt: expand abbreviated years with a SQL CASE applied directly to
# the string column. The arithmetic branches come back as 2006.0 / 1963.0 /
# 1981.0 while the untouched 2002 stays as-is, so the column ends up
# inconsistently formatted.
year_case_on_strings = """
case when year < 21 then year + 2000
when year < 100 then year + 1900
else year
end
"""
df2 = df1.withColumn('year', expr(year_case_on_strings))

df2.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         id|
+-----+---+-----+------+-----------+
| John| 12|   12|2006.0|          0|
| Ravi| 28|    1|  2002| 8589934592|
| Rosy|  7|    8|1963.0| 8589934593|
|Abdul| 23|    5|1981.0|17179869184|
|Abdul| 23|    5|1981.0|17179869185|
+-----+---+-----+------+-----------+



In [23]:
# Second attempt: the same CASE, but each arithmetic branch casts year to int
# before adding the century, so the results display as whole numbers.
year_case_with_inner_casts = """
case when year<21 then cast(year as int) + 2000
when year < 100 then cast (year as int) + 1900
else year
end"""
df3 = df1.withColumn("year", expr(year_case_with_inner_casts))

df3.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [24]:
# Third attempt: leave the CASE branches uncast and instead cast the result of
# the whole expression to an integer. printSchema() confirms that year is now
# an integer column while day and month are still strings.
year_case = expr("""

case when year < 21 then year + 2000
when year < 100 then year + 1900
else year
end""")
df4 = df1.withColumn('year', year_case.cast(IntegerType()))

df4.show()
df4.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [28]:
# Rebuilds df4 with the same approach as the previous df4 cell (cast applied
# to the whole CASE expression); only the whitespace inside the SQL differs.
year_case_sql = """
         case when year < 21 then year + 2000
         when year < 100 then year + 1900
         else year
         end"""
year_as_int = expr(year_case_sql).cast(IntegerType())
df4 = df1.withColumn("year", year_as_int)
df4.show()
df4.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [33]:
# Fourth attempt: cast day, month and year to integers up front ...
df5 = df1
for part_name in ("day", "month", "year"):
    df5 = df5.withColumn(part_name, col(part_name).cast(IntegerType()))

# ... so the CASE expression can work on an integer year without any casts.
century_case_sql = """
          case when year < 21 then year + 2000
          when year < 100 then year + 1900
          else year
          end"""
df6 = df5.withColumn("year", expr(century_case_sql))
df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [34]:
# Same year expansion as the SQL CASE, written with column objects:
# when(...).when(...).otherwise(...) applied to the integer year column.
year_column = col("year")
century_fixed_year = (
    when(year_column < 21, year_column + 2000)
    .when(year_column < 100, year_column + 1900)
    .otherwise(year_column)
)
df7 = df5.withColumn("year", century_fixed_year)
df7.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+

