In [1]:
# Importing the necessary modules
import findspark
import pyspark

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import to_date, col, column, expr, dayofmonth, month, \
    year, monotonically_increasing_id, when, exp, desc
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

In [2]:
# Initialise findspark before the SparkSession is created below
# (presumably so the local Spark installation can be located - TODO confirm;
# note that pyspark itself is already imported in the first cell).
findspark.init()

In [3]:
# Get (or create) the session used by every cell below: master 'local[3]', app name 'MiscDemo'
spark = (
    SparkSession.builder
    .master('local[3]')
    .appName('MiscDemo')
    .getOrCreate()
)

In [4]:
# Sample records as (name, day, month, year) tuples; the 'Abdul' row appears twice on purpose
data_list = [
    ('Ravi', 28, 1, 2002),
    ('Abdul', 23, 5, 81),
    ('John', 12, 12, 6),
    ('Rosy', 7, 8, 63),
    ('Abdul', 23, 5, 81),
]

# No column names are supplied, so the columns come out as _1, _2, _3, _4
raw_df = spark.createDataFrame(data_list)
raw_df.show()
raw_df.printSchema()

+-----+---+---+----+
|   _1| _2| _3|  _4|
+-----+---+---+----+
| Ravi| 28|  1|2002|
|Abdul| 23|  5|  81|
| John| 12| 12|   6|
| Rosy|  7|  8|  63|
|Abdul| 23|  5|  81|
+-----+---+---+----+

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)



In [5]:
# Build the DataFrame again, this time giving the columns explicit names
raw_df = (
    spark.createDataFrame(data_list)
    .toDF('name', 'day', 'month', 'year')
)
raw_df.show()
raw_df.printSchema()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| Ravi| 28|    1|2002|
|Abdul| 23|    5|  81|
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
+-----+---+-----+----+

root
 |-- name: string (nullable = true)
 |-- day: long (nullable = true)
 |-- month: long (nullable = true)
 |-- year: long (nullable = true)



# Let's prepare a new example | Unique identifier

In [6]:
# Same people as before, but every field is now a string
data_list = [  # (name, day, month, year)
    ('Ravi', '28', '1', '2002'),
    ('Abdul', '23', '5', '81'),
    ('John', '12', '12', '6'),
    ('Rosy', '7', '8', '63'),
    ('Abdul', '23', '5', '81'),
]

# Name the columns and repartition the rows into 3 partitions
raw_df = (
    spark.createDataFrame(data_list)
    .toDF('name', 'day', 'month', 'year')
    .repartition(3)
)
raw_df.show()
raw_df.printSchema()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
|Abdul| 23|    5|  81|
| Ravi| 28|    1|2002|
|Abdul| 23|    5|  81|
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
+-----+---+-----+----+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [7]:
# Tag every row with a generated id. As the output below shows, the ids are
# distinct and increasing but not consecutive (presumably one range per
# partition of the repartitioned frame - TODO confirm against the Spark docs).
df1 = raw_df.withColumn('id', monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|  81|          0|
| Ravi| 28|    1|2002| 8589934592|
|Abdul| 23|    5|  81| 8589934593|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+



# Switch case to fix the year format

In [8]:
# Expand two-digit years: values below 21 become 20xx, other values below 100
# become 19xx, anything else is kept unchanged.
# `year` is still a string column at this point, and the output below shows the
# consequence: the adjusted values come back as '1981.0', '2006.0', ... and the
# column remains a string. The following cells show ways to fix the data type.
df2 = df1.withColumn('year', expr('''
            case when year < 21 then year + 2000
            when year < 100 then year + 1900
            else year
            end
        '''))
df2.show()
df2.printSchema()

+-----+---+-----+------+-----------+
| name|day|month|  year|         id|
+-----+---+-----+------+-----------+
|Abdul| 23|    5|1981.0|          0|
| Ravi| 28|    1|  2002| 8589934592|
|Abdul| 23|    5|1981.0| 8589934593|
| John| 12|   12|2006.0|17179869184|
| Rosy|  7|    8|1963.0|17179869185|
+-----+---+-----+------+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- id: long (nullable = false)



## Approaches to fix data type

* Inline Cast
* Change the schema

In [9]:
# Inline cast
df3 = df1.withColumn('year', expr('''
            case when year < 21 then cast(year as int) + 2000
            when year < 100 then cast(year as int) + 1900
            else cast(year as int)
            end
        '''))
df3.show()
df3.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|Abdul| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [10]:
# Changing the schema
df4 = df1.withColumn('year', expr('''
            case when year < 21 then year + 2000
            when year < 100 then year + 1900
            else year
            end
        ''').cast(IntegerType()))
df4.show()
df4.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|Abdul| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [11]:
# Casting all the 3 fields: day, month, year
df5 = df1.withColumn('day', col('day').cast(IntegerType()))\
         .withColumn('month', col('month').cast(IntegerType()))\
         .withColumn('year', col('year').cast(IntegerType()))\

df5.show()
df5.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|  81|          0|
| Ravi| 28|    1|2002| 8589934592|
|Abdul| 23|    5|  81| 8589934593|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [12]:
# Same two-digit-year expansion as before, now applied to df5 where `year` is
# already an integer column: values below 21 become 20xx, other values below 100
# become 19xx. The result stays an integer (see the schema below).
df6 = df5.withColumn('year', expr('''
            case when year < 21 then year + 2000
            when year < 100 then year + 1900
            else year
            end
        '''))
df6.show()
df6.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|Abdul| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [13]:
# The same year expansion written with the column API (when/otherwise)
# instead of a SQL CASE string.
df7 = df5.withColumn(
    'year',
    when(col('year') < 21, col('year') + 2000)
    .when(col('year') < 100, col('year') + 1900)
    .otherwise(col('year')),
)
df7.show()
df7.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|Abdul| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|Abdul| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



# How to add/remove columns and drop duplicates

In [14]:
# Add a `dob` date column: concatenate day, month and year into a 'd/M/y'
# string and parse it with to_date, all inside a single SQL expression.
df8 = df7.withColumn('dob', expr("to_date(concat(day, '/', month, '/', year), 'd/M/y')"))
df8.show()
df8.printSchema()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
|Abdul| 23|    5|1981|          0|1981-05-23|
| Ravi| 28|    1|2002| 8589934592|2002-01-28|
|Abdul| 23|    5|1981| 8589934593|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
| Rosy|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+

root
 |-- name: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)
 |-- dob: date (nullable = true)



In [15]:
# Drop duplicated rows and unnecessary cols
df9 = df7.withColumn('dob', to_date(expr("concat(day, '/', month, '/', year)"), 'd/M/y'))\
         .drop('day', 'month', 'year')\
         .dropDuplicates(['name', 'dob'])\
         .sort(desc('dob'))
df9.show()
df9.printSchema()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| John|17179869184|2006-12-12|
| Ravi| 8589934592|2002-01-28|
|Abdul|          0|1981-05-23|
| Rosy|17179869185|1963-08-07|
+-----+-----------+----------+

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- dob: date (nullable = true)



In [16]:
# Stop the SparkSession created at the top of the notebook
spark.stop()