# Processing Column Data

As part of this module we will explore the functions available under `pyspark.sql.functions` to derive new values from existing column values within a Data Frame.

* Pre-defined Functions
* Create Dummy Data Frame
* Categories of Functions
* Special Functions - col and lit
* String Manipulation Functions - 1
* String Manipulation Functions - 2
* Date and Time Overview
* Date and Time Arithmetic
* Date and Time - trunc and date_trunc
* Date and Time - Extracting Information
* Dealing with Unix Timestamp
* Example - Word Count
* Conclusion

%md
## Pre-defined Functions

We typically process data in the columns using functions in `pyspark.sql.functions`. Let us go through these functions in detail as part of this module.

%md
* Let us recap the functions or APIs used to process Data Frames.
  * Projection - `select` or `withColumn` or `drop` or `selectExpr`
  * Filtering - `filter` or `where`
  * Grouping data by key and performing aggregations - `groupBy`
  * Sorting data - `sort` or `orderBy`
* We can pass column names, literals or expressions to all the Data Frame APIs.
* Expressions include arithmetic operations and transformations using functions from `pyspark.sql.functions`.
* There are roughly 300 functions under `pyspark.sql.functions`; the exact count varies by Spark version.
* We will talk about some of the important functions used for String Manipulation, Date Manipulation etc.


* String Manipulation Functions
  * Case Conversion - `lower`,  `upper`
  * Getting Length -  `length`
  * Extracting substrings - `substring`, `split`
  * Trimming - `trim`, `ltrim`, `rtrim`
  * Padding - `lpad`, `rpad`
  * Concatenating strings - `concat`, `concat_ws`
* Date Manipulation Functions
  * Getting current date and time - `current_date`, `current_timestamp`
  * Date Arithmetic - `date_add`, `date_sub`, `datediff`, `months_between`, `add_months`, `next_day`
  * Beginning and Ending Date or Time - `last_day`, `trunc`, `date_trunc`
  * Formatting Date - `date_format`
  * Extracting Information - `dayofyear`, `dayofmonth`, `dayofweek`, `year`, `month`
* Aggregate Functions
  * `count`, `countDistinct`
  * `sum`, `avg`
  * `min`, `max`
* Other Functions - We will explore depending on the use cases.
  * `CASE` and `WHEN`
  * `CAST` for type casting
  * Functions to manage special types such as `ARRAY`, `MAP`, `STRUCT` type columns
  * Many others

In [0]:
# Reading data 
# note: the public retail_db is 

orders = spark.read.csv(
    '/FileStore/tables/orders.csv',
    schema='order_id INT, order_date STRING, order_customer_id INT, order_status STRING'
)

# Note: /FileStore/tables/orders.csv needs to be uploaded to DBFS before running this cell
orders.show(12)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|             1837|         CLOSED|
|      13|

In [0]:
# Importing functions

from pyspark.sql.functions import date_format

# Function as part of projections
# show() prints the table and returns None, so wrapping it in print() also prints "None"
print(orders.select('*', date_format('order_date', 'yyyyMM').alias('order_month')).show(2))

print(orders.withColumn('order_month', date_format('order_date', 'yyyyMM')).show(2))

# Note: withColumn returns a new Data Frame with the column added; the original Data Frame is unchanged.
# Syntax: withColumn("<new_column_name>", <column expression>)

# Function as part of groupBy
print(orders.groupBy(date_format('order_date', 'yyyyMM').alias('order_month')).count().show(2))



+--------+--------------------+-----------------+---------------+-----------+
|order_id|          order_date|order_customer_id|   order_status|order_month|
+--------+--------------------+-----------------+---------------+-----------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|     201307|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|     201307|
+--------+--------------------+-----------------+---------------+-----------+
only showing top 2 rows

None
+--------+--------------------+-----------------+---------------+-----------+
|order_id|          order_date|order_customer_id|   order_status|order_month|
+--------+--------------------+-----------------+---------------+-----------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|     201307|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|     201307|
+--------+--------------------+-----------------+---------------+-----------+
only showing top 2 rows

None
+---

In [0]:
# Create Dummy Data Frame
# Let us go ahead and create a Data Frame using dummy data to explore Spark functions.
# It mimics Oracle's dual view: a single column named dummy holding one record, "X".
from pyspark.sql.functions import current_date

l = [('X', )]
df = spark.createDataFrame(l, "dummy STRING")
df.printSchema()
df.show(1)
df.select(current_date()).show(1)

#  note : pyspark.sql. functions ie s 


root
 |-- dummy: string (nullable = true)

+-----+
|dummy|
+-----+
|    X|
+-----+

+--------------+
|current_date()|
+--------------+
|    2022-12-12|
+--------------+



In [0]:
employees = [
    (1, "Scott", "Tiger", 1000.0, 
      "united states", "+1 123 456 7890", "123 45 6789"
    ),
     (2, "Henry", "Ford", 1250.0, 
      "India", "+91 234 567 8901", "456 78 9123"
     ),
     (3, "Nick", "Junior", 750.0, 
      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
     ),
     (4, "Bill", "Gomes", 1500.0, 
      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
     )
]

employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, nationality STRING,
                    phone_number STRING, ssn STRING"""
                   )

employeesDF.show(truncate=False)

+-----------+----------+---------+------+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|nationality   |phone_number    |ssn        |
+-----------+----------+---------+------+--------------+----------------+-----------+
|1          |Scott     |Tiger    |1000.0|united states |+1 123 456 7890 |123 45 6789|
|2          |Henry     |Ford     |1250.0|India         |+91 234 567 8901|456 78 9123|
|3          |Nick      |Junior   |750.0 |united KINGDOM|+44 111 111 1111|222 33 4444|
|4          |Bill      |Gomes    |1500.0|AUSTRALIA     |+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+--------------+----------------+-----------+



In [0]:
from pyspark.sql.functions import date_format, col, lit, concat, concat_ws

# Note: col and lit are special functions that produce Column objects.
# * col turns a column name into a Column; lit turns a literal value into a Column.
# * If we do not transform a column, we can pass its name as a string to the Data Frame APIs.
# * If we want to apply transformations that rely on Column methods or operators (such as alias, cast or arithmetic), a string will not suffice. We have to pass a Column, for example by using col.

In [0]:
employees = [(1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]


employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, nationality STRING,
                    phone_number STRING, ssn STRING"""
                   )

from pyspark.sql.functions import col, upper
employeesDF.groupBy(upper(col("nationality"))).count().show()

## here we are 

+------------------+-----+
|upper(nationality)|count|
+------------------+-----+
|     UNITED STATES|    1|
|             INDIA|    1|
|    UNITED KINGDOM|    1|
|         AUSTRALIA|    1|
+------------------+-----+



In [0]:
# Alternative - refer to the column as an attribute of the Data Frame
employeesDF.orderBy(upper(employeesDF.first_name).alias('first_name')).show()
# Alternative - refer to the column using bracket notation on the Data Frame
employeesDF.orderBy(upper(employeesDF['first_name']).alias('first_name')).show()

# Note: both forms refer to the same column as col('first_name')

In [0]:
from pyspark.sql.functions import concat, col, lit

employeesDF.select(concat(col("first_name"),lit(", "),col("last_name")).alias("full_name")).show(truncate=False)

In [0]:
# upper, lower, initcap and length each take a single argument: a column or column expression
from pyspark.sql.functions import lower,upper,length,initcap
employeesDF. \
  select("employee_id", "nationality"). \
  withColumn("nationality_upper", upper(col("nationality"))). \
  withColumn("nationality_lower", lower(col("nationality"))). \
  withColumn("nationality_initcap", initcap(col("nationality"))). \
  withColumn("nationality_length", length(col("nationality"))). \
  show()

+-----------+--------------+-----------------+-----------------+-------------------+------------------+
|employee_id|   nationality|nationality_upper|nationality_lower|nationality_initcap|nationality_length|
+-----------+--------------+-----------------+-----------------+-------------------+------------------+
|          1| united states|    UNITED STATES|    united states|      United States|                13|
|          2|         India|            INDIA|            india|              India|                 5|
|          3|united KINGDOM|   UNITED KINGDOM|   united kingdom|     United Kingdom|                14|
|          4|     AUSTRALIA|        AUSTRALIA|        australia|          Australia|                 9|
+-----------+--------------+-----------------+-----------------+-------------------+------------------+



In [0]:
# * `substring` function takes 3 arguments: **column**, **position**, **length**. The position is 1-based.
# * A negative position counts from the end of the string, so -4 starts at the fourth character from the end.
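The position rules described above can be checked without a cluster. The following is a plain-Python sketch of how `substring(str, pos, len)` behaves for in-range positions; the helper name `substring_sketch` is hypothetical and this is not Spark's own implementation.

```python
def substring_sketch(s: str, pos: int, length: int) -> str:
    """Approximate substring(str, pos, len) on a plain Python string."""
    if pos > 0:
        start = pos - 1          # positive positions are 1-based
    elif pos < 0:
        start = len(s) + pos     # negative positions count from the end
    else:
        start = 0
    end = start + length
    return s[max(start, 0):max(end, 0)]


phone_last4 = substring_sketch("+1 123 456 7890", -4, 4)
ssn_last4 = substring_sketch("123 45 6789", 8, 4)
print(phone_last4, ssn_last4)
```

Both calls pick the last four characters: one by counting back from the end, the other by starting at position 8 of an 11-character string.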


In [0]:
from pyspark.sql.functions import substring, lit

In [0]:
employees = [(1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]

df_emp= spark.createDataFrame(employees,schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, nationality STRING,
                    phone_number STRING, ssn STRING""")

In [0]:
print(df_emp.show(4,truncate=False))

# Get the last 4 digits of the phone number and of the ssn
employeesDF. \
    select("employee_id", "phone_number", "ssn"). \
    withColumn("phone_last4", substring(col("phone_number"), -4, 4).cast("int")). \
    withColumn("ssn_last4", substring(col("ssn"), 8, 4).cast("int")). \
    show()

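help(substring)
# Note: substring(str, pos, len) starts at position pos and takes len characters.
# With pos = -4 it starts at the fourth character from the end, so len = 4 returns the last four characters.
# Each ssn is 11 characters long, so position 8 with length 4 also returns the last four characters.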

+-----------+----------+---------+------+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|nationality   |phone_number    |ssn        |
+-----------+----------+---------+------+--------------+----------------+-----------+
|1          |Scott     |Tiger    |1000.0|united states |+1 123 456 7890 |123 45 6789|
|2          |Henry     |Ford     |1250.0|India         |+91 234 567 8901|456 78 9123|
|3          |Nick      |Junior   |750.0 |united KINGDOM|+44 111 111 1111|222 33 4444|
|4          |Bill      |Gomes    |1500.0|AUSTRALIA     |+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+--------------+----------------+-----------+

None
+-----------+----------------+-----------+-----------+---------+
|employee_id|    phone_number|        ssn|phone_last4|ssn_last4|
+-----------+----------------+-----------+-----------+---------+
|          1| +1 123 456 7890|123 45 6789|       7890|     6789|
|          2|+91 234 567 8901|456 78 9123|    

In [0]:
# split takes 2 required arguments, the column and a delimiter pattern, plus an optional limit.
# split converts each string into an array, and we can access the elements using an index.
# split(colName, pattern, limit): pattern is a regular expression and limit caps how many times it is applied
from pyspark.sql.functions import explode, split
employeesDF.select("employee_id", "phone_number", "ssn"). \
    withColumn("area_code", split("phone_number", " ")[1].cast("int")). \
    withColumn("phone_last4", split("phone_number", " ")[3].cast("int")). \
    withColumn("ssn_last4", split("ssn", " ")[2].cast("int")). \
    show()
# Array elements are accessed with [] and the index starts from 0


+-----------+----------------+-----------+---------+-----------+---------+
|employee_id|    phone_number|        ssn|area_code|phone_last4|ssn_last4|
+-----------+----------------+-----------+---------+-----------+---------+
|          1| +1 123 456 7890|123 45 6789|      123|       7890|     6789|
|          2|+91 234 567 8901|456 78 9123|      234|       8901|     9123|
|          3|+44 111 111 1111|222 33 4444|      111|       1111|     4444|
|          4|+61 987 654 3210|789 12 6118|      987|       3210|     6118|
+-----------+----------------+-----------+---------+-----------+---------+
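The indexing used above can be reproduced on a single string. This is a plain-Python sketch using `re.split`; the helper `split_sketch` is hypothetical, and it assumes the pattern behaves the same in Python's regex dialect as in the Java dialect Spark uses, which holds for a single space.

```python
import re


def split_sketch(s: str, pattern: str, limit: int = -1) -> list:
    """Approximate split(str, pattern, limit) on a plain Python string."""
    if limit == 1:
        return [s]                                       # one element means no split at all
    if limit > 1:
        return re.split(pattern, s, maxsplit=limit - 1)  # at most `limit` elements
    return re.split(pattern, s)                          # limit <= 0: split everywhere


parts = split_sketch("+91 234 567 8901", " ")
area_code = int(parts[1])      # index 0 holds the country code
phone_last4 = int(parts[3])
limited = split_sketch("+91 234 567 8901", " ", 2)
print(parts, area_code, phone_last4, limited)
```

With a limit of 2 the remainder of the string stays in the last element, which is useful when only the leading token matters.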



In [0]:
# * Both lpad and rpad take 3 arguments - column or expression, desired length and the string to pad with.
from pyspark.sql.functions import lpad, rpad, concat


In [0]:
from pyspark.sql.functions import lpad, rpad, concat

employees = [(1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]
employeesDF = spark.createDataFrame(employees). \
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
print(employeesDF.show())

empFixedDF = employeesDF.select(
    concat(
        lpad("employee_id", 5, "0"), 
        rpad("first_name", 10, "-"), 
        rpad("last_name", 10, "-"),
        lpad("salary", 10, "0"), 
        rpad("nationality", 15, "-"), 
        rpad("phone_number", 17, "-"), 
        "ssn"
    ).alias("employee")
)
empFixedDF.show(2, truncate=False)

# Note: padding adds characters at the start (lpad) or the end (rpad) until the value reaches the target length.
# Syntax: lpad(col, len, pad) and rpad(col, len, pad)

+-----------+----------+---------+------+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0| united states| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0|         India|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|united KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+--------------+----------------+-----------+

None
+------------------------------------------------------------------------------+
|employee                                                                      |
+------------------------------------------------------------------------------+
|00001Scott-----Tiger-----00001000.0united states--+1 123 456 7
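To see how each fixed-width field in the record above is built, here is a plain-Python sketch of the padding rules. The helpers `lpad_sketch` and `rpad_sketch` are hypothetical names; they also model the case where a value longer than the target length is shortened to that length.

```python
def lpad_sketch(value, length: int, pad: str) -> str:
    """Left-pad str(value) with `pad` up to `length` characters."""
    s = str(value)
    if len(s) >= length:
        return s[:length]                    # longer values are cut to length
    fill = (pad * length)[: length - len(s)]
    return fill + s


def rpad_sketch(value, length: int, pad: str) -> str:
    """Right-pad str(value) with `pad` up to `length` characters."""
    s = str(value)
    if len(s) >= length:
        return s[:length]
    fill = (pad * length)[: length - len(s)]
    return s + fill


record = (
    lpad_sketch(1, 5, "0")
    + rpad_sketch("Scott", 10, "-")
    + rpad_sketch("Tiger", 10, "-")
    + lpad_sketch(1000.0, 10, "0")
    + rpad_sketch("united states", 15, "-")
)
print(record)
```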

In [0]:
# As of now the trim functions in pyspark.sql.functions take the column as argument and remove leading or trailing spaces.
# To remove other characters, we can use the Spark SQL trim functions through expr or selectExpr.
#     ltrim(str) - removes the leading space characters from `str`
#     rtrim(str) - removes the trailing space characters from `str`
#     trim(str) - removes the leading and trailing space characters from `str`
# If we do not specify trimStr, it defaults to a space.
# SQL forms, used through expr in the next cell:
#     rtrim(trimStr, str) - removes trailing `trimStr` characters from `str`
#     trim(BOTH FROM str) - removes the leading and trailing space characters from `str`
#     trim(LEADING FROM str) - removes the leading space characters from `str`
#     trim(TRAILING FROM str) - removes the trailing space characters from `str`



from pyspark.sql.functions import ltrim,rtrim,trim 
print(employeesDF.show(4))
employeesDF.withColumn('firstnameFirst 2 char',ltrim('first_name'))

+-----------+----------+---------+------+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0| united states| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0|         India|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|united KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+--------------+----------------+-----------+

None
Out[49]: DataFrame[employee_id: bigint, first_name: string, last_name: string, salary: double, nationality: string, phone_number: string, ssn: string, firstnameFirst 2 char: string]

In [0]:
l = [("   Hello.    ",) ]
df = spark.createDataFrame(l).toDF("dummy")
df.show()
from pyspark.sql.functions import col, ltrim, rtrim, trim,expr
df.withColumn("ltrim", ltrim(col("dummy"))).withColumn("rtrim", rtrim(col("dummy"))).withColumn("trim", trim(col("dummy"))).show()

# if we do not specify trimStr, it will be defaulted to space
df.withColumn("ltrim", expr("ltrim(dummy)")). \
  withColumn("rtrim", expr("rtrim('.', rtrim(dummy))")). \
  withColumn("trim", trim(col("dummy"))). \
  show()

# Note: expr was imported above so that SQL-style trim expressions can be used
df.withColumn("ltrim", expr("trim(LEADING ' ' FROM dummy)")). \
  withColumn("rtrim", expr("trim(TRAILING '.' FROM rtrim(dummy))")). \
  withColumn("trim", expr("trim(BOTH ' ' FROM dummy)")). \
  show()

# LEADING ' ' removes leading spaces; TRAILING '.' removes the trailing period once rtrim has stripped the trailing spaces

+-------------+
|        dummy|
+-------------+
|   Hello.    |
+-------------+

+-------------+----------+---------+------+
|        dummy|     ltrim|    rtrim|  trim|
+-------------+----------+---------+------+
|   Hello.    |Hello.    |   Hello.|Hello.|
+-------------+----------+---------+------+

+-------------+----------+--------+------+
|        dummy|     ltrim|   rtrim|  trim|
+-------------+----------+--------+------+
|   Hello.    |Hello.    |   Hello|Hello.|
+-------------+----------+--------+------+

+-------------+----------+--------+------+
|        dummy|     ltrim|   rtrim|  trim|
+-------------+----------+--------+------+
|   Hello.    |Hello.    |   Hello|Hello.|
+-------------+----------+--------+------+
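The three result tables above can be mirrored with Python's own string methods. This is only an analogy in plain Python, not Spark code: `lstrip`, `rstrip` and `strip` stand in for `ltrim`, `rtrim` and `trim`.

```python
dummy = "   Hello.    "

ltrim_result = dummy.lstrip(" ")               # like ltrim(dummy)
rtrim_result = dummy.rstrip(" ")               # like rtrim(dummy)
trim_result = dummy.strip(" ")                 # like trim(dummy)
# like trim(TRAILING '.' FROM rtrim(dummy)): spaces first, then the period
rtrim_dot_result = dummy.rstrip(" ").rstrip(".")

print(repr(ltrim_result), repr(rtrim_result), repr(trim_result), repr(rtrim_dot_result))
```

The order matters in the last line: removing the period only works after the trailing spaces are gone.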



In [0]:
from pyspark.sql.functions import current_date, current_timestamp,lit,to_date
l = [("X", )]
df = spark.createDataFrame(l).toDF("dummy")
df.select(current_timestamp()).show(truncate=False) #yyyy-MM-dd HH:mm:ss.SSS
df.select(to_date(lit('20210228'), 'yyyyMMdd').alias('to_date')).show()

+-----------------------+
|current_timestamp()    |
+-----------------------+
|2022-12-12 03:33:18.953|
+-----------------------+

+----------+
|   to_date|
+----------+
|2021-02-28|
+----------+



In [0]:
# * Adding days to a date or timestamp - `date_add`
# * Subtracting days from a date or timestamp - `date_sub`
# * Getting the difference in days between 2 dates or timestamps - `datediff`
# * Getting the number of months between 2 dates or timestamps - `months_between`
# * Adding months to a date or timestamp - `add_months`
# * Getting the first date after a given date that falls on a given day of the week - `next_day`
# * We can apply these on a standard date or timestamp. `date_add`, `date_sub`, `add_months` and `next_day` return a date even when applied on a timestamp field, while `datediff` returns a number of days and `months_between` a number of months.
datetimes = [("2014-02-28", "2014-02-28 10:00:00.123"),
                     ("2016-02-29", "2016-02-29 08:08:08.999"),
                     ("2017-10-31", "2017-12-31 11:59:59.123"),
                     ("2019-11-30", "2019-08-31 00:00:00.000")
                ]
datetimesDF = spark.createDataFrame(datetimes, schema="date STRING, time STRING")
from pyspark.sql.functions import date_add, date_sub

datetimesDF. \
    withColumn("date_add_date", date_add("date", 10)). \
    withColumn("date_add_time", date_add("time", 10)). \
    withColumn("date_sub_date", date_sub("date", 10)). \
    withColumn("date_sub_time", date_sub("time", 10)). \
    show()


+----------+--------------------+-------------+-------------+-------------+-------------+
|      date|                time|date_add_date|date_add_time|date_sub_date|date_sub_time|
+----------+--------------------+-------------+-------------+-------------+-------------+
|2014-02-28|2014-02-28 10:00:...|   2014-03-10|   2014-03-10|   2014-02-18|   2014-02-18|
|2016-02-29|2016-02-29 08:08:...|   2016-03-10|   2016-03-10|   2016-02-19|   2016-02-19|
|2017-10-31|2017-12-31 11:59:...|   2017-11-10|   2018-01-10|   2017-10-21|   2017-12-21|
|2019-11-30|2019-08-31 00:00:...|   2019-12-10|   2019-09-10|   2019-11-20|   2019-08-21|
+----------+--------------------+-------------+-------------+-------------+-------------+
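The results above, including the dropped time portion, can be reproduced with the standard library. This is a plain-Python sketch; `date_add_sketch` is a hypothetical helper, and `date_sub` is modelled by passing a negative number of days.

```python
from datetime import date, datetime, timedelta


def date_add_sketch(value, days: int) -> date:
    """Add days to a date or datetime and return a date, as date_add does."""
    # datetime is a subclass of date, so check for it first and drop the time part
    d = value.date() if isinstance(value, datetime) else value
    return d + timedelta(days=days)


added_date = date_add_sketch(date(2014, 2, 28), 10)
added_time = date_add_sketch(datetime(2017, 12, 31, 11, 59, 59), 10)
subtracted = date_add_sketch(date(2016, 2, 29), -10)
print(added_date, added_time, subtracted)
```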



In [0]:
from pyspark.sql.functions import months_between, add_months, round
datetimesDF. \
    withColumn("months_between_date", round(months_between(current_date(), "date"), 2)). \
    withColumn("months_between_time", round(months_between(current_timestamp(), "time"), 2)). \
    withColumn("add_months_date", add_months("date", 3)). \
    withColumn("add_months_time", add_months("time", 3)). \
    show(truncate=False)

+----------+-----------------------+-------------------+-------------------+---------------+---------------+
|date      |time                   |months_between_date|months_between_time|add_months_date|add_months_time|
+----------+-----------------------+-------------------+-------------------+---------------+---------------+
|2014-02-28|2014-02-28 10:00:00.123|105.48             |105.48             |2014-05-28     |2014-05-28     |
|2016-02-29|2016-02-29 08:08:08.999|81.45              |81.45              |2016-05-29     |2016-05-29     |
|2017-10-31|2017-12-31 11:59:59.123|61.39              |59.38              |2018-01-31     |2018-03-31     |
|2019-11-30|2019-08-31 00:00:00.000|36.42              |39.39              |2020-02-29     |2019-11-30     |
+----------+-----------------------+-------------------+-------------------+---------------+---------------+
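The `add_months` column above shows 2019-11-30 moving to 2020-02-29, which suggests the day is clamped to the last day of the target month. The sketch below models that rule in plain Python; `add_months_sketch` is a hypothetical helper, and the clamping rule is inferred from the output shown rather than taken from Spark's source.

```python
import calendar
from datetime import date


def add_months_sketch(d: date, months: int) -> date:
    """Add months to a date, clamping the day to the end of the target month."""
    total = d.year * 12 + (d.month - 1) + months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))


results = [
    add_months_sketch(date(2014, 2, 28), 3),
    add_months_sketch(date(2016, 2, 29), 3),
    add_months_sketch(date(2017, 10, 31), 3),
    add_months_sketch(date(2019, 11, 30), 3),
]
print(results)
```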



In [0]:
# * We can use `trunc` or `date_trunc` to get the beginning date of the week, month, year etc. by passing a date or timestamp to it.
# * `trunc` takes the date first and the unit second and returns a date - for example `trunc(current_date(), "MM")` will give the first of the current month.
# * `date_trunc` takes the unit first and the timestamp second and returns a timestamp, so it can also give the beginning time of the day or hour.
#   * Get beginning date based on month - `date_trunc("MM", current_timestamp())`
#   * Get beginning time based on day - `date_trunc("DAY", current_timestamp())`

In [0]:
from pyspark.sql.functions import date_trunc
datetimesDF. \
    withColumn("date_dt", date_trunc("HOUR", "date")). \
    withColumn("time_dt", date_trunc("HOUR", "time")). \
    withColumn("time_dt1", date_trunc("dd", "time")). \
    show(truncate=False)

# date_trunc returns a timestamp truncated to the beginning of the given unit (hour, day, month, year etc.).
# A plain date is treated as midnight, so truncating it to HOUR leaves 00:00:00.

+----------+-----------------------+-------------------+-------------------+-------------------+
|date      |time                   |date_dt            |time_dt            |time_dt1           |
+----------+-----------------------+-------------------+-------------------+-------------------+
|2014-02-28|2014-02-28 10:00:00.123|2014-02-28 00:00:00|2014-02-28 10:00:00|2014-02-28 00:00:00|
|2016-02-29|2016-02-29 08:08:08.999|2016-02-29 00:00:00|2016-02-29 08:00:00|2016-02-29 00:00:00|
|2017-10-31|2017-12-31 11:59:59.123|2017-10-31 00:00:00|2017-12-31 11:00:00|2017-12-31 00:00:00|
|2019-11-30|2019-08-31 00:00:00.000|2019-11-30 00:00:00|2019-08-31 00:00:00|2019-08-31 00:00:00|
+----------+-----------------------+-------------------+-------------------+-------------------+
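Truncation simply resets every field smaller than the chosen unit. The plain-Python sketch below illustrates this for a few units; `date_trunc_sketch` is a hypothetical helper and it covers only the unit names listed in the code, not everything Spark accepts.

```python
from datetime import datetime


def date_trunc_sketch(unit: str, ts: datetime) -> datetime:
    """Reset all fields below `unit` to their smallest value."""
    unit = unit.upper()
    if unit == "HOUR":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit in ("DAY", "DD"):
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit in ("MONTH", "MM"):
        return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if unit in ("YEAR", "YYYY"):
        return ts.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unit not covered by this sketch: {unit}")


ts = datetime(2017, 12, 31, 11, 59, 59, 123000)
by_hour = date_trunc_sketch("HOUR", ts)
by_day = date_trunc_sketch("dd", ts)
by_month = date_trunc_sketch("MM", ts)
print(by_hour, by_day, by_month)
```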



In [0]:
# Functions for extracting information: `year`, `month`, `weekofyear`, `dayofyear`, `dayofmonth`, `dayofweek`, `hour`, `minute`, `second`
l = [("X", )]
df = spark.createDataFrame(l).toDF("dummy")
from pyspark.sql.functions import year, month, weekofyear, dayofmonth,dayofyear, dayofweek, current_date
from pyspark.sql.functions import current_timestamp, hour, minute, second
df.select(
    current_timestamp().alias('current_timestamp'), 
    year(current_timestamp()).alias('year'),
    month(current_timestamp()).alias('month'),
    dayofmonth(current_timestamp()).alias('dayofmonth'),
    hour(current_timestamp()).alias('hour'),
    minute(current_timestamp()).alias('minute'),
    second(current_timestamp()).alias('second')
).show(truncate=False) #yyyy-MM-dd HH:mm:ss.SSS

+-----------------------+----+-----+----------+----+------+------+
|current_timestamp      |year|month|dayofmonth|hour|minute|second|
+-----------------------+----+-----+----------+----+------+------+
|2022-12-12 03:45:22.095|2022|12   |12        |3   |45    |22    |
+-----------------------+----+-----+----------+----+------+------+
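The cell above does not exercise `dayofweek`, `dayofyear` or `weekofyear`, and their numbering is easy to get wrong. The plain-Python sketch below shows the conventions as understood here: `dayofweek` counts Sunday as 1 through Saturday as 7, and `weekofyear` follows ISO 8601 weeks. The helper name is hypothetical.

```python
from datetime import date


def dayofweek_sketch(d: date) -> int:
    """Map Python's ISO weekday (Mon=1..Sun=7) to Sunday=1..Saturday=7."""
    return d.isoweekday() % 7 + 1


d = date(2022, 12, 12)               # the date shown in the output above, a Monday
day_of_week = dayofweek_sketch(d)
day_of_year = d.timetuple().tm_yday
week_of_year = d.isocalendar()[1]    # ISO 8601 week number
print(day_of_week, day_of_year, week_of_year)
```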



In [0]:
# * `yyyy-MM-dd` is the standard date format
# * `yyyy-MM-dd HH:mm:ss.SSS` is the standard timestamp format

from pyspark.sql.functions import col, to_date, to_timestamp
datetimes = [(20140228, "28-Feb-2014 10:00:00.123"),
                     (20160229, "20-Feb-2016 08:08:08.999"),
                     (20171031, "31-Dec-2017 11:59:59.123"),
                     (20191130, "31-Aug-2019 00:00:00.000")
                ]
datetimesDF = spark.createDataFrame(datetimes, schema="date BIGINT, time STRING")
datetimesDF. \
    withColumn('to_date', to_date(col('date').cast('string'), 'yyyyMMdd')). \
    withColumn('to_timestamp', to_timestamp(col('time'), 'dd-MMM-yyyy HH:mm:ss.SSS')). \
    show(truncate=False)

+--------+------------------------+----------+-----------------------+
|date    |time                    |to_date   |to_timestamp           |
+--------+------------------------+----------+-----------------------+
|20140228|28-Feb-2014 10:00:00.123|2014-02-28|2014-02-28 10:00:00.123|
|20160229|20-Feb-2016 08:08:08.999|2016-02-29|2016-02-20 08:08:08.999|
|20171031|31-Dec-2017 11:59:59.123|2017-10-31|2017-12-31 11:59:59.123|
|20191130|31-Aug-2019 00:00:00.000|2019-11-30|2019-08-31 00:00:00    |
+--------+------------------------+----------+-----------------------+
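Parsing a non-standard string needs a pattern that matches it field by field. The plain-Python sketch below parses the first row with `datetime.strptime`; the format codes are Python's, not Spark's (`%d-%b-%Y` here plays the role of `dd-MMM-yyyy`), and `%b` assumes an English locale for month names.

```python
from datetime import datetime

# Integer date in yyyyMMdd form: cast to string first, then parse
parsed_date = datetime.strptime(str(20140228), "%Y%m%d").date()

# Timestamp with an abbreviated month name and milliseconds
parsed_ts = datetime.strptime("28-Feb-2014 10:00:00.123", "%d-%b-%Y %H:%M:%S.%f")

print(parsed_date.isoformat(), parsed_ts.isoformat(sep=" ", timespec="milliseconds"))
```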



In [0]:
# We can use date_format to extract the required information in a desired format from a standard date or timestamp.
# Earlier we explored to_date and to_timestamp to convert non-standard dates and timestamps to standard ones respectively.
from pyspark.sql.functions import date_format


datetimesDF.withColumn("day_name_abbr",to_date(col('date').cast('string'),'yyyyMMdd')).\
            withColumn("day_name_abbr",date_format("day_name_abbr",'EEEE')).show()


# In Spark 3 the pattern "EEEE" returns the full day name, such as Friday, even though the column here is called day_name_abbr


+--------+--------------------+-------------+
|    date|                time|day_name_abbr|
+--------+--------------------+-------------+
|20140228|28-Feb-2014 10:00...|       Friday|
|20160229|20-Feb-2016 08:08...|       Monday|
|20171031|31-Dec-2017 11:59...|      Tuesday|
|20191130|31-Aug-2019 00:00...|     Saturday|
+--------+--------------------+-------------+



In [0]:
# A Unix timestamp is an integer that counts the seconds elapsed since January 1st 1970 Midnight UTC.
# That starting point is known as the epoch, and the value is incremented by 1 every second.
# We can convert a Unix timestamp to a regular date or timestamp and vice versa.
# We can use unix_timestamp to convert a regular date or timestamp to a Unix timestamp value. For example unix_timestamp(lit("2019-11-19 00:00:00"))
# We can use from_unixtime to convert a Unix timestamp to a regular date or timestamp string. For example from_unixtime(lit(1574101800))
# Both conversions are interpreted in the Spark session time zone, so results depend on that setting.

In [0]:
from pyspark.sql.functions import unix_timestamp, col,from_unixtime
unixtimes = [(1393561800, ),
             (1456713488, ),
             (1514701799, ),
             (1567189800, )
            ]
unixtimesDF = spark.createDataFrame(unixtimes).toDF("unixtime")
unixtimesDF.show()

unixtimesDF.withColumn("date", from_unixtime("unixtime", "yyyyMMdd")).withColumn("time", from_unixtime("unixtime")). \
    show()
# from_unixtime converts a Unix timestamp to a date or timestamp string.
# With a format such as yyyyMMdd it returns the value in that format;
# without a format it returns the complete timestamp as yyyy-MM-dd HH:mm:ss.

+----------+
|  unixtime|
+----------+
|1393561800|
|1456713488|
|1514701799|
|1567189800|
+----------+

+----------+--------+-------------------+
|  unixtime|    date|               time|
+----------+--------+-------------------+
|1393561800|20140228|2014-02-28 04:30:00|
|1456713488|20160229|2016-02-29 02:38:08|
|1514701799|20171231|2017-12-31 06:29:59|
|1567189800|20190830|2019-08-30 18:30:00|
+----------+--------+-------------------+
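The values above line up with UTC, which can be confirmed in plain Python. This sketch assumes the session time zone was UTC when the cell ran; both helper names are hypothetical and the format codes are Python's.

```python
from datetime import datetime, timezone


def from_unixtime_utc(seconds: int, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Format epoch seconds as a string, interpreted in UTC."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(fmt)


def unix_timestamp_utc(text: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> int:
    """Parse a timestamp string as UTC and return epoch seconds."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


as_time = from_unixtime_utc(1393561800)
as_date = from_unixtime_utc(1393561800, "%Y%m%d")
round_trip = unix_timestamp_utc(as_time)
print(as_time, as_date, round_trip)
```

Running the same values under a different time zone would shift the formatted strings while the integers stay the same.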



In [0]:
### Dealing with nulls

from pyspark.sql.functions import coalesce, col, expr, lit


employees = [(1, "Scott", "Tiger", 1000.0, 10,
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, None,
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, '',
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 10,
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]

employeesDF = spark.createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, bonus STRING, nationality STRING,
                    phone_number STRING, ssn STRING"""
                   )

# nullif(bonus, '') turns empty strings into null; nvl then replaces every null with 0.
employeesDF.withColumn('bonus', expr("nvl(nullif(bonus, ''), 0)")).show()

# coalesce returns its first non-null argument, so a bonus that is null
# (or that becomes null when cast to int) is treated as 0.
employeesDF. \
    withColumn('payment', col('salary') + (col('salary') * coalesce(col('bonus').cast('int'), lit(0)) / 100)). \
    show()

+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0|   10| united states| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0|    0|         India|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|    0|united KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|   10|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
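
To make the null handling above easier to follow, here is a minimal plain-Python model of `nullif`, `nvl` and the `coalesce`-based payment calculation. It uses only the standard library and is not PySpark. The function `cast_int_or_none` is a hypothetical helper that encodes an assumption: that casting a non-numeric string such as `''` to an integer yields null rather than raising an error.

```python
def nullif(value, other):
    """Return None when value equals other, else value (models SQL nullif)."""
    return None if value == other else value


def nvl(value, default):
    """Return default when value is None, else value (models SQL nvl)."""
    return default if value is None else value


def cast_int_or_none(text):
    """Assumed lenient cast: text that is not an integer becomes None."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return None


# (employee_id, salary, bonus) taken from the employees list above;
# bonus is a string because the schema declares it as STRING.
employees = [(1, 1000.0, "10"), (2, 1250.0, None), (3, 750.0, ""), (4, 1500.0, "10")]

# Models expr("nvl(nullif(bonus, ''), 0)")
cleaned_bonus = [nvl(nullif(bonus, ""), "0") for _, _, bonus in employees]
print(cleaned_bonus)

# Models salary + salary * coalesce(cast(bonus as int), 0) / 100.
# With two arguments, coalesce behaves like nvl.
payments = [
    salary + salary * nvl(cast_int_or_none(bonus), 0) / 100
    for _, salary, bonus in employees
]
print(payments)
```

Under these assumptions, employees without a usable bonus are simply paid their salary.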


In [0]:
# Using CASE and WHEN
# * `CASE` and `WHEN` are typically used to apply transformations based on conditions. We can use `CASE` and `WHEN` as in SQL through `expr` or `selectExpr`.
# * If we want to use the APIs, Spark provides `when` and `otherwise`. `when` is available as part of `pyspark.sql.functions`;
#   `otherwise` is invoked on the Column that `when` returns.

from pyspark.sql.functions import when, coalesce, lit, col

# Signature: when(condition, value), i.e. when(<condition is true>, <value to assign>)
# Example: df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
employees = [(1, "Scott", "Tiger", 1000.0, 10,
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, None,
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, '',
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 10,
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]


employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, bonus STRING, nationality STRING,
                    phone_number STRING, ssn STRING"""
                   )

employeesDF.show()





+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0|   10| united states| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0| null|         India|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|     |united KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|   10|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
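
The `CASE` and `WHEN` route mentioned above takes a SQL expression string. As a sketch of what such an expression looks like, the snippet below runs one through Python's built-in `sqlite3` module as a stand-in SQL engine, not through Spark. The assumption is that this simple `CASE` form is written the same way in Spark SQL, where it could be passed as a string to `expr` or `selectExpr`. The table name `employees` and the variable `case_expr` are made up for the illustration.

```python
import sqlite3

# The kind of expression string one might hand to expr(...) in Spark
# (assumption: this simple CASE form is portable between SQLite and Spark SQL).
case_expr = "CASE WHEN bonus IS NULL OR bonus = '' THEN '0' ELSE bonus END"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INTEGER, bonus TEXT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?)",
    [(1, "10"), (2, None), (3, ""), (4, "10")],
)

# Each row gets the value of the first WHEN branch that is true, else the ELSE value.
result = conn.execute(
    f"SELECT employee_id, {case_expr} AS bonus FROM employees ORDER BY employee_id"
).fetchall()
conn.close()

for row in result:
    print(row)
```

The null bonus and the empty-string bonus both come back as `'0'`, while the other rows keep their original value.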



In [0]:
# Using when and otherwise in PySpark

# Handle both null and '' values in a single when.
# Conditions can be combined with & (and) and | (or); wrap each condition in parentheses.
employeesDF. \
    withColumn(
        'bonus',
        when((col('bonus').isNull()) | (col('bonus') == lit('')), 0).otherwise(col('bonus'))
    ). \
    show()



+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0|   10| united states| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0|    0|         India|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|    0|united KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|   10|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+



In [0]:
persons = [
    (1, 1),
    (2, 13),
    (3, 18),
    (4, 60),
    (5, 120),
    (6, 0),
    (7, 12),
    (8, 160)
]

personsDF = spark.createDataFrame(persons, schema='id INT, age INT')


personsDF.withColumn(
        'category',
        when(col('age').between(0, 2), 'New Born').
        when((col('age') > 2) & (col('age') <= 12), 'Infant').
        when((col('age') > 12) & (col('age') <= 48), 'Toddler').
        when((col('age') > 48) & (col('age') <= 144), 'Kid').
        otherwise('Teenager or Adult')
    ).show()


+---+---+-----------------+
| id|age|         category|
+---+---+-----------------+
|  1|  1|         New Born|
|  2| 13|          Toddler|
|  3| 18|          Toddler|
|  4| 60|              Kid|
|  5|120|              Kid|
|  6|  0|         New Born|
|  7| 12|           Infant|
|  8|160|Teenager or Adult|
+---+---+-----------------+
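
A chained `when` checks its conditions in order and uses the value of the first one that is true, falling back to `otherwise` when none match. The plain-Python sketch below (standard library only, not PySpark) models that evaluation order and reproduces the table above. The names `rules` and `categorize` are hypothetical, introduced only for this illustration.

```python
# Ordered (condition, label) pairs, mirroring the chained when(...) calls above.
rules = [
    (lambda age: 0 <= age <= 2, "New Born"),  # between(0, 2) includes both bounds
    (lambda age: 2 < age <= 12, "Infant"),
    (lambda age: 12 < age <= 48, "Toddler"),
    (lambda age: 48 < age <= 144, "Kid"),
]


def categorize(age, default="Teenager or Adult"):
    """Return the label of the first matching rule, else the default (otherwise)."""
    for condition, label in rules:
        if condition(age):
            return label
    return default


persons = [(1, 1), (2, 13), (3, 18), (4, 60), (5, 120), (6, 0), (7, 12), (8, 160)]
categories = [(person_id, age, categorize(age)) for person_id, age in persons]
for row in categories:
    print(row)
```

Because the first match wins, the lower bounds in the later rules could be dropped without changing the result for non-negative ages; keeping them makes each rule readable on its own.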



In [0]:
# between(lower, upper) includes both bounds, so between(0, 2) matches ages 0, 1 and 2.
personsDF. \
    withColumn(
        'category',
        when(col('age').between(0, 2), 'New Born').
        when((col('age') > 2) & (col('age') <= 12), 'Infant').
        when((col('age') > 12) & (col('age') <= 48), 'Toddler').
        when((col('age') > 48) & (col('age') <= 144), 'Kid').
        otherwise('Teenager or Adult')
    ). \
    show()

+---+---+-----------------+
| id|age|         category|
+---+---+-----------------+
|  1|  1|         New Born|
|  2| 13|          Toddler|
|  3| 18|          Toddler|
|  4| 60|              Kid|
|  5|120|              Kid|
|  6|  0|         New Born|
|  7| 12|           Infant|
|  8|160|Teenager or Adult|
+---+---+-----------------+

