In [1]:
import os
import pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType)
from pyspark.sql.functions import (date_add,
                               date_sub,
                               add_months,
                               dayofmonth,
                               last_day,
                               datediff,
                               current_date,
                               date_format,
                               months_between,
                               year,
                               month,
                               round)

In [3]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"
spark_loca = "local[*]"
spark = SparkSession.builder.master(spark_loca).getOrCreate()

In [4]:
# Sample emp data
# this cell will be longer, as add the dummy data but almost similar to real data
emp = [("MR-001", "Alice", "Market Research", 7500, "F", "2017-02-10 9:30:00"),
       ("MR-002", "Bob", "Market Research", 7500, "M", "2018-04-10 9:30:00"),
       ("MR-003", "John", "Market Research", 9500, "M", "2019-01-01 10:30:00"),
       ("M-001", "Jonathan", "Marketing", 4500, "O", "2017-04-10 9:30:00"),
       ("M-002", "Ethan", "Marketing", 6000, "M", "2017-04-10 9:30:00"),
       ("M-003", "Elyra", "Marketing", 7500, "O", "2017-04-10 9:30:00"),
       ("M-004", "Sophia", "Marketing", 8000, "F", "2017-04-10 9:30:00"),
       ("M-005", "Mia", "Marketing", 5000, "F", "2017-04-10 9:30:00"),
       ("M-006", "Ava", "Marketing", 4500, "F", "2018-01-01 9:15:00"),
       ("M-007", "Olivia", "Marketing", 6500, "F", "2019-02-10 10:30:00"),
       ("M-008", "Luca", "Marketing", 3000, "M", "2019-05-01 9:15:00"),
       ("HR-001", "James", "Human Resource", 4500, "M", "2018-10-01 9:30:00"),
       ("HR-002", "Robert", "Human Resource", 5500, "M", "2018-10-01 9:30:00"),
       ("HR-003", "Joseph", "Human Resuorce", 6000, "M", "2019-01-01 10:30:00"),
       ("HR-004", "Mark", "Human Resource", 3500, "O", "2019-02-01 9:30:00"),
       ("HR-005", "Chloe", "Human Resource", 9000, "F", "2019-02-02 9:30:00"),
       ("HR-006", "Hazel", "Human Resource", 5500,  "F", "2019-05-01 9:30:00"),
       ("HR-007", "Zoey", "Human Resource", 4500, "F", "2019-06-01 9:30:00"),
       ("S-001", "Grace", "Sale", 8000, "F", "2017-04-10 9:30:00"),
       ("S-002", "Avery", "Sale", 6000, "F", "2017-04-10 9:30:00"),
       ("S-003", "Charles", "Sale", 5000, "M", "2017-04-10 9:30:00"),
       ("S-004", "Andrew", "Sale", 4500, "M", "2017-04-10 9:30:00"),
       ("S-005", "Paul", "Sale", 9500, "O", "2017-04-10 9:30:00"),
       ("S-006", "Joshua", "Sale", 4500, "M", "2017-04-10 9:30:00"),
       ("S-007", "Steven", "Sale", 5000, "M", "2017-04-10 9:30:00"),
       ("S-008", "Bella", "Sale", 4500, "F", "2017-04-10 9:30:00"),
       ("S-009", "Elena", "Sale", 5000, "X", "2018-02-10 9:30:00"),
       ("S-010", "Skylar", "Sale", 3500, "F", "2018-02-04 9:30:00"),
       ("S-011", "Maya", "Sale", 3500, "F", "2018-05-05 9:30:00"),
       ("S-012", "Sophie", "Sale", 3500, "X", "2018-05-05 9:30:00"),
       ("S-013", "Sara", "Sale", 8000, "F", "2018-06-01 9:30:00"),
       ("S-014", "Ada", "Sale", 5000, "X", "2018-06-01 9:30:00"),
       ("S-015", "Peter", "Sale", 3500, "M", "2018-10-01 9:30:00"),
       ("S-016", "Carl", "Sale", 3500, "M", "2018-10-01 9:30:00"),
       ("S-017", "Terry", "Sale", 3500, "X", "2018-10-01 9:30:00"),
       ("S-018", "Ethan", "Sale", 3500, "X", "2018-11-01 9:30:00"),
       ("S-019", "Ethan", "Sale", 3500, "M", "2018-11-01 9:30:00"),
       ("S-020", "Lola", "Sale", 6500, "F", "2018-11-01 9:30:00"),
       ("CS-001", "Olive", "Customer Service", 6000, "X", "2018-01-01 9:30:00"),
       ("CS-002", "Larry", "Customer Service", 6000, "X", "2018-01-01 9:30:00"),
       ("CS-003", "Jeffrey", "Customer Service", 9000, "X", "2018-01-01 10:30:00"),
       ("CS-004", "Alani", "Customer Service", 4500, "X", "2018-02-01 9:30:00"),
       ("F-001", "Frank", "Finance and Accounting", 9000, "M", "2017-04-10 10:30:00"),
       ("F-002", "Peter", "Finance and Accounting", 11500, "M", "2017-04-10 10:30:00"),
       ("F-003", "Nathan", "Finance and Accounting", 8500, "M", "2017-08-10 10:30:00"),
       ("F-004", "Walter", "Finace and Accounting", 9000, "M", "2018-10-01 10:30:00"),
       ("IT-001", "Roger", "IT", 4500, "M", "2018-12-01 9:30:00"),
       ("IT-002", "Philip", "IT", 4500, "M", "2018-12-01 9:20:00"),
       ("SEC-001", "Gabriel", "Security", 8000, "M", "2017-04-10 10:30:00"),
       ("SEC-002", "Bruce", "Security", 6500, "M", "2017-04-10 10:30:00"),
       ("SEC-003", "Arthur", "Security", 4500, "M", "2018-01-01 9:30:00"),
       ("SEC-004", "Noah", "Security", 4500, "M", "2018-01-01 9:30:00"),
       ("SEC-005", "Jerry", "Security", 3500, "M", "2018-01-01 9:30:00"),
       ("SEC-006", "Adam", "Security", 3500, "M", "2018-05-01 9:30:00")
      ]

In [5]:
# Define the Spark datatye here
labels = [
     ('id', StringType()),
     ('name', StringType()),
     ('dept', StringType()),
     ('salary', IntegerType()),
     ('Sex', StringType()),
     ('hired_date', StringType())
]
schema = StructType([StructField(x[0], x[1], True) for x in labels])

In [6]:
# create dataframe here
emp_df = spark.createDataFrame(data=emp, schema=schema)

In [7]:
emp_df.show(truncate=False)

[Stage 0:>                                                          (0 + 1) / 1]

+------+--------+---------------+------+---+-------------------+
|id    |name    |dept           |salary|Sex|hired_date         |
+------+--------+---------------+------+---+-------------------+
|MR-001|Alice   |Market Research|7500  |F  |2017-02-10 9:30:00 |
|MR-002|Bob     |Market Research|7500  |M  |2018-04-10 9:30:00 |
|MR-003|John    |Market Research|9500  |M  |2019-01-01 10:30:00|
|M-001 |Jonathan|Marketing      |4500  |O  |2017-04-10 9:30:00 |
|M-002 |Ethan   |Marketing      |6000  |M  |2017-04-10 9:30:00 |
|M-003 |Elyra   |Marketing      |7500  |O  |2017-04-10 9:30:00 |
|M-004 |Sophia  |Marketing      |8000  |F  |2017-04-10 9:30:00 |
|M-005 |Mia     |Marketing      |5000  |F  |2017-04-10 9:30:00 |
|M-006 |Ava     |Marketing      |4500  |F  |2018-01-01 9:15:00 |
|M-007 |Olivia  |Marketing      |6500  |F  |2019-02-10 10:30:00|
|M-008 |Luca    |Marketing      |3000  |M  |2019-05-01 9:15:00 |
|HR-001|James   |Human Resource |4500  |M  |2018-10-01 9:30:00 |
|HR-002|Robert  |Human Re

                                                                                

In [8]:
# last day of the months
lday_df = emp_df.select("hired_date").withColumn("lastdayofmonth", last_day("hired_date"))

In [9]:
lday_df.show(truncate=False)

+-------------------+--------------+
|hired_date         |lastdayofmonth|
+-------------------+--------------+
|2017-02-10 9:30:00 |2017-02-28    |
|2018-04-10 9:30:00 |2018-04-30    |
|2019-01-01 10:30:00|2019-01-31    |
|2017-04-10 9:30:00 |2017-04-30    |
|2017-04-10 9:30:00 |2017-04-30    |
|2017-04-10 9:30:00 |2017-04-30    |
|2017-04-10 9:30:00 |2017-04-30    |
|2017-04-10 9:30:00 |2017-04-30    |
|2018-01-01 9:15:00 |2018-01-31    |
|2019-02-10 10:30:00|2019-02-28    |
|2019-05-01 9:15:00 |2019-05-31    |
|2018-10-01 9:30:00 |2018-10-31    |
|2018-10-01 9:30:00 |2018-10-31    |
|2019-01-01 10:30:00|2019-01-31    |
|2019-02-01 9:30:00 |2019-02-28    |
|2019-02-02 9:30:00 |2019-02-28    |
|2019-05-01 9:30:00 |2019-05-31    |
|2019-06-01 9:30:00 |2019-06-30    |
|2017-04-10 9:30:00 |2017-04-30    |
|2017-04-10 9:30:00 |2017-04-30    |
+-------------------+--------------+
only showing top 20 rows



In [10]:
# remove the timestamp in dataframe
remove_timestamp = emp_df.select("hired_date").withColumn("date", last_day("hired_date"))

In [11]:
remove_timestamp.show(truncate=False)

+-------------------+----------+
|hired_date         |date      |
+-------------------+----------+
|2017-02-10 9:30:00 |2017-02-28|
|2018-04-10 9:30:00 |2018-04-30|
|2019-01-01 10:30:00|2019-01-31|
|2017-04-10 9:30:00 |2017-04-30|
|2017-04-10 9:30:00 |2017-04-30|
|2017-04-10 9:30:00 |2017-04-30|
|2017-04-10 9:30:00 |2017-04-30|
|2017-04-10 9:30:00 |2017-04-30|
|2018-01-01 9:15:00 |2018-01-31|
|2019-02-10 10:30:00|2019-02-28|
|2019-05-01 9:15:00 |2019-05-31|
|2018-10-01 9:30:00 |2018-10-31|
|2018-10-01 9:30:00 |2018-10-31|
|2019-01-01 10:30:00|2019-01-31|
|2019-02-01 9:30:00 |2019-02-28|
|2019-02-02 9:30:00 |2019-02-28|
|2019-05-01 9:30:00 |2019-05-31|
|2019-06-01 9:30:00 |2019-06-30|
|2017-04-10 9:30:00 |2017-04-30|
|2017-04-10 9:30:00 |2017-04-30|
+-------------------+----------+
only showing top 20 rows



In [12]:
# calcualate the working year in current work
# Equation -> today - hired_date
new_df = (emp_df
    .select("id", "name", "dept","hired_date")
    .withColumn("current_date", current_date())
    .withColumn("work_with_us(day)", datediff("current_date", "hired_date"))
    .withColumn("work_with_us(month)", round(months_between("current_date", "hired_date"),2)))
new_df.show()

+------+--------+---------------+-------------------+------------+-----------------+-------------------+
|    id|    name|           dept|         hired_date|current_date|work_with_us(day)|work_with_us(month)|
+------+--------+---------------+-------------------+------------+-----------------+-------------------+
|MR-001|   Alice|Market Research| 2017-02-10 9:30:00|  2021-10-19|             1712|              56.28|
|MR-002|     Bob|Market Research| 2018-04-10 9:30:00|  2021-10-19|             1288|              42.28|
|MR-003|    John|Market Research|2019-01-01 10:30:00|  2021-10-19|             1022|              33.57|
| M-001|Jonathan|      Marketing| 2017-04-10 9:30:00|  2021-10-19|             1653|              54.28|
| M-002|   Ethan|      Marketing| 2017-04-10 9:30:00|  2021-10-19|             1653|              54.28|
| M-003|   Elyra|      Marketing| 2017-04-10 9:30:00|  2021-10-19|             1653|              54.28|
| M-004|  Sophia|      Marketing| 2017-04-10 9:30:00|  

In [13]:
# substract 10 day from the hired_date from Sale group
sale_df = (emp_df.filter(emp_df['dept']=='Sale')
           .select("id", "name", "dept", "hired_date")
           .withColumn("cheat_ten_days", date_sub("hired_date", 10)))

In [14]:
sale_df.show(truncate=True)

+-----+-------+----+------------------+--------------+
|   id|   name|dept|        hired_date|cheat_ten_days|
+-----+-------+----+------------------+--------------+
|S-001|  Grace|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-002|  Avery|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-003|Charles|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-004| Andrew|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-005|   Paul|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-006| Joshua|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-007| Steven|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-008|  Bella|Sale|2017-04-10 9:30:00|    2017-03-31|
|S-009|  Elena|Sale|2018-02-10 9:30:00|    2018-01-31|
|S-010| Skylar|Sale|2018-02-04 9:30:00|    2018-01-25|
|S-011|   Maya|Sale|2018-05-05 9:30:00|    2018-04-25|
|S-012| Sophie|Sale|2018-05-05 9:30:00|    2018-04-25|
|S-013|   Sara|Sale|2018-06-01 9:30:00|    2018-05-22|
|S-014|    Ada|Sale|2018-06-01 9:30:00|    2018-05-22|
|S-015|  Peter|Sale|2018-10-01 9:30:00|    2018-09-21|
|S-016|   

In [15]:
# add the 5 day to the hired date from HR
hr_df = (emp_df.filter(emp_df['dept']=='Human Resource')
           .select("id", "name", "dept", "hired_date")
           .withColumn("cheat_five_days", date_add("hired_date", 5)))

In [16]:
hr_df.show()

+------+------+--------------+------------------+---------------+
|    id|  name|          dept|        hired_date|cheat_five_days|
+------+------+--------------+------------------+---------------+
|HR-001| James|Human Resource|2018-10-01 9:30:00|     2018-10-06|
|HR-002|Robert|Human Resource|2018-10-01 9:30:00|     2018-10-06|
|HR-004|  Mark|Human Resource|2019-02-01 9:30:00|     2019-02-06|
|HR-005| Chloe|Human Resource|2019-02-02 9:30:00|     2019-02-07|
|HR-006| Hazel|Human Resource|2019-05-01 9:30:00|     2019-05-06|
|HR-007|  Zoey|Human Resource|2019-06-01 9:30:00|     2019-06-06|
+------+------+--------------+------------------+---------------+



In [17]:
# less than 3 years of service
new_df.filter(new_df['work_with_us(month)'] < 36.0).show(truncate=False)

+------+------+---------------+-------------------+------------+-----------------+-------------------+
|id    |name  |dept           |hired_date         |current_date|work_with_us(day)|work_with_us(month)|
+------+------+---------------+-------------------+------------+-----------------+-------------------+
|MR-003|John  |Market Research|2019-01-01 10:30:00|2021-10-19  |1022             |33.57              |
|M-007 |Olivia|Marketing      |2019-02-10 10:30:00|2021-10-19  |982              |32.28              |
|M-008 |Luca  |Marketing      |2019-05-01 9:15:00 |2021-10-19  |902              |29.57              |
|HR-003|Joseph|Human Resuorce |2019-01-01 10:30:00|2021-10-19  |1022             |33.57              |
|HR-004|Mark  |Human Resource |2019-02-01 9:30:00 |2021-10-19  |991              |32.57              |
|HR-005|Chloe |Human Resource |2019-02-02 9:30:00 |2021-10-19  |990              |32.54              |
|HR-006|Hazel |Human Resource |2019-05-01 9:30:00 |2021-10-19  |902      

In [18]:
# more than 4 year service
new_df.filter(new_df['work_with_us(month)'] > 48.0).show(truncate=False)

+-------+--------+----------------------+-------------------+------------+-----------------+-------------------+
|id     |name    |dept                  |hired_date         |current_date|work_with_us(day)|work_with_us(month)|
+-------+--------+----------------------+-------------------+------------+-----------------+-------------------+
|MR-001 |Alice   |Market Research       |2017-02-10 9:30:00 |2021-10-19  |1712             |56.28              |
|M-001  |Jonathan|Marketing             |2017-04-10 9:30:00 |2021-10-19  |1653             |54.28              |
|M-002  |Ethan   |Marketing             |2017-04-10 9:30:00 |2021-10-19  |1653             |54.28              |
|M-003  |Elyra   |Marketing             |2017-04-10 9:30:00 |2021-10-19  |1653             |54.28              |
|M-004  |Sophia  |Marketing             |2017-04-10 9:30:00 |2021-10-19  |1653             |54.28              |
|M-005  |Mia     |Marketing             |2017-04-10 9:30:00 |2021-10-19  |1653             |54.2

In [23]:
# over 4 years and salary less than 6000
salary_df = (emp_df
    .select("id", "name", "dept", 'salary',"hired_date")
    .withColumn("current_date", current_date())
    .withColumn("work_with_us(day)", datediff("current_date", "hired_date"))
    .withColumn("work_with_us(month)", round(months_between("current_date", "hired_date"),2)))
salary_df.filter((salary_df['work_with_us(month)'] > 48.0)
              & (salary_df['salary'] < 6000 )).show(truncate=False)

+-----+--------+---------+------+------------------+------------+-----------------+-------------------+
|id   |name    |dept     |salary|hired_date        |current_date|work_with_us(day)|work_with_us(month)|
+-----+--------+---------+------+------------------+------------+-----------------+-------------------+
|M-001|Jonathan|Marketing|4500  |2017-04-10 9:30:00|2021-10-19  |1653             |54.28              |
|M-005|Mia     |Marketing|5000  |2017-04-10 9:30:00|2021-10-19  |1653             |54.28              |
|S-003|Charles |Sale     |5000  |2017-04-10 9:30:00|2021-10-19  |1653             |54.28              |
|S-004|Andrew  |Sale     |4500  |2017-04-10 9:30:00|2021-10-19  |1653             |54.28              |
|S-006|Joshua  |Sale     |4500  |2017-04-10 9:30:00|2021-10-19  |1653             |54.28              |
|S-007|Steven  |Sale     |5000  |2017-04-10 9:30:00|2021-10-19  |1653             |54.28              |
|S-008|Bella   |Sale     |4500  |2017-04-10 9:30:00|2021-10-19  

In [31]:
# Special bonus for over 4.5 year employee
special_salary = (salary_df
                  .filter((salary_df['work_with_us(month)'] > 52.0))
                  .withColumn("special_bonus", salary_df['salary']*0.2)
                  .withColumn("total salary", salary_df['salary']+salary_df['salary']*0.2))

In [32]:
special_salary.select("id", "name", "salary", "special_bonus", "total salary").show(truncate=False)

+-------+--------+------+-------------+------------+
|id     |name    |salary|special_bonus|total salary|
+-------+--------+------+-------------+------------+
|MR-001 |Alice   |7500  |1500.0       |9000.0      |
|M-001  |Jonathan|4500  |900.0        |5400.0      |
|M-002  |Ethan   |6000  |1200.0       |7200.0      |
|M-003  |Elyra   |7500  |1500.0       |9000.0      |
|M-004  |Sophia  |8000  |1600.0       |9600.0      |
|M-005  |Mia     |5000  |1000.0       |6000.0      |
|S-001  |Grace   |8000  |1600.0       |9600.0      |
|S-002  |Avery   |6000  |1200.0       |7200.0      |
|S-003  |Charles |5000  |1000.0       |6000.0      |
|S-004  |Andrew  |4500  |900.0        |5400.0      |
|S-005  |Paul    |9500  |1900.0       |11400.0     |
|S-006  |Joshua  |4500  |900.0        |5400.0      |
|S-007  |Steven  |5000  |1000.0       |6000.0      |
|S-008  |Bella   |4500  |900.0        |5400.0      |
|F-001  |Frank   |9000  |1800.0       |10800.0     |
|F-002  |Peter   |11500 |2300.0       |13800.0