In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder \
    .master("local[3]") \
        .appName("MiscDemo") \
            .getOrCreate()
        

# Creating a dataframe quickly

In [4]:
data_list = [("Ravi", 28, 1, 2002),
                 ("Abdul", 23, 5, 81),  
                 ("John", 12, 12, 6),  
                 ("Rosy", 7, 8, 63),  
                 ("Abdul", 23, 5, 81)] 

In [5]:
raw_df = spark.createDataFrame(data_list)
# since the schema is not specified int he above line, the schema is inferred and thus its a bit meaningless
raw_df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)



In [6]:
# below is an approach which can be used for unit testing/exploration purposes.
# in the below approach, we skipped the following: paralllalizing our data, creating RDD, creating schema definition etc....
raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year")
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: long (nullable = true)
 |-- month: long (nullable = true)
 |-- year: long (nullable = true)



In [7]:
# if we want to adjust the data types, you can add "" to the data.
data_list = [("Ravi", "28", "1", "2002"),
                 ("Abdul", "23", "5", "81"),  
                 ("John", "12", "12", "6"),  
                 ("Rosy", "7", "8", "63"),  
                 ("Abdul", "23", "5", "81")] 

In [8]:
# below is an approach which can be used for unit testing/exploration purposes.
# in the below approach, we skipped the following: paralllalizing our data, creating RDD, creating schema definition etc....
raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year")
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



# How to add monotonically increasing id

## This is a function which generates nonotonically increasing integer number which is guaranteed to be unique across all partitions
## However do not expect this to be consecutive

In [9]:
# below is an approach which can be used for unit testing/exploration purposes.
# in the below approach, we skipped the following: paralllalizing our data, creating RDD, creating schema definition etc....
# Here we also added repartition to ensure that we have more than 1 partition. (this is imp to ensure while we are working on local machine)
# But ensure that the unnecesary repartitioning parts are removed from the final code version.
raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year").repartition(3)
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [10]:
# We previously saw that the withcolumn method is used to transform a single column
# In this case since ID do not exist, it will create that column
df1 = raw_df.withColumn("id", monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|  81|          1|
|Abdul| 23|    5|  81| 8589934592|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+



# How to use Case When Then

In [11]:
df2 = df1.withColumn("year", expr("""
                                 case when year < 21 then year + 2000
                                 when year < 100 then year + 1900
                                 else year
                                 end"""))
df2.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         id|
+-----+---+-----+------+-----------+
| Ravi| 28|    1|  2002|          0|
|Abdul| 23|    5|1981.0|          1|
|Abdul| 23|    5|1981.0| 8589934592|
| John| 12|   12|2006.0|17179869184|
| Rosy|  7|    8|1963.0|17179869185|
+-----+---+-----+------+-----------+



In [12]:
# In the above output you can see that the year automatically got converted to decimal, this happened due to the following:
# Automatic Type Promotion & Schema inference incorrect data type.
# In the above command since we did an arithametic operation the spark engine converted it into decimal did the operation
# and then converted back to string, since the inferschema had set the dataframe schema as string.

# How to cast the fields

In [13]:
# Method 1: Inline Casting
df3 = df1.withColumn("year", expr("""
                                 case when year < 21 then cast(year as int) + 2000
                                 when year < 100 then cast(year as int) + 1900
                                 else year
                                 end"""))
df3.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



In [14]:
# Method 2: Change the Schema
df3 = df1.withColumn("year", expr("""
                                 case when year < 21 then year + 2000
                                 when year < 100 then year + 1900
                                 else year
                                 end""").cast(IntegerType()))
df3.show()
df3.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [15]:
df1.show()
df1.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|  81|          1|
|Abdul| 23|    5|  81| 8589934592|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- id: long (nullable = false)



In [16]:
df4 = df1.withColumn("day", col("day").cast(IntegerType())) \
    .withColumn("month", col("month").cast(IntegerType())) \
        .withColumn("year", col("year").cast(IntegerType()))
        
df5 = df4.withColumn("year", expr("""
                                 case when year < 21 then year + 2000
                                 when year < 100 then year + 1900
                                 else year
                                 end""").cast(IntegerType()))

df5.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



# Alternate way to write CASE WHEN, using column objects

In [17]:
df6 = df4.withColumn("year", \
    when(col("year") < 21, col("year") + 2000) \
        .when(col("year") < 100, col("year") + 1900) \
            .otherwise(col("year")))

df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981|          1|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



# Addding and removing columns and duplicates

In [18]:
df7 = df6

In [19]:
df8 = df7.withColumn("dob", expr("to_date(concat(day,'/',month,'/',year),'d/M/y')"))
df8.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
| Ravi| 28|    1|2002|          0|2002-01-28|
|Abdul| 23|    5|1981|          1|1981-05-23|
|Abdul| 23|    5|1981| 8589934592|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
| Rosy|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+



In [21]:
df9 = df7.withColumn("dob", to_date(expr("concat(day, '/', month, '/', year)"),'d/M/y'))
df9.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
| Ravi| 28|    1|2002|          0|2002-01-28|
|Abdul| 23|    5|1981|          1|1981-05-23|
|Abdul| 23|    5|1981| 8589934592|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
| Rosy|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+



In [22]:
df9 = df7.withColumn("dob", to_date(expr("concat(day, '/', month, '/', year)"),'d/M/y')) \
    .drop("day", "month", "year")
df9.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Ravi|          0|2002-01-28|
|Abdul|          1|1981-05-23|
|Abdul| 8589934592|1981-05-23|
| John|17179869184|2006-12-12|
| Rosy|17179869185|1963-08-07|
+-----+-----------+----------+



# Dropping Duplicate Rows

In [23]:
df9 = df7.withColumn("dob", to_date(expr("concat(day, '/', month, '/', year)"),'d/M/y')) \
    .drop("day", "month", "year") \
        .dropDuplicates(['name', "dob"])
df9.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Ravi|          0|2002-01-28|
|Abdul|          1|1981-05-23|
| Rosy|17179869185|1963-08-07|
| John|17179869184|2006-12-12|
+-----+-----------+----------+



In [24]:
df9 = df7.withColumn("dob", to_date(expr("concat(day, '/', month, '/', year)"),'d/M/y')) \
    .drop("day", "month", "year") \
        .dropDuplicates(['name', "dob"]) \
            .sort(expr("dob desc"))
df9.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Rosy|17179869185|1963-08-07|
|Abdul|          1|1981-05-23|
| Ravi|          0|2002-01-28|
| John|17179869184|2006-12-12|
+-----+-----------+----------+

