In [13]:
# from pyspark.sql.functions import to_date,datediff,lit,udf,sum,avg,col,count,lag
# from pyspark.sql.types import StringType,LongType,StructType,StructField,DateType,IntegerType,DoubleType
# from datetime import datetime
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Toy employee records; both date columns are 'dd-MM-yyyy' strings at this stage.
df = spark.createDataFrame(
    data=[
        ('oracle', 'Sales', 1, '12-05-2020', '12-06-2025', 1000),
        ('oracle', 'Sales', 2, '03-07-2020', '20-08-2023', 1200),
        ('oracle', 'Finance', 3, '02-08-2020', '12-10-2024', 3000),
        ('oracle', 'Finance', 4, '02-09-2020', '17-10-2022', 1800),
        ('microsoft', 'Finance', 1, '02-05-2020', '02-07-2023', 1400),
        ('microsoft', 'Sales', 2, '16-05-2020', '16-06-2025', 5000),
        ('microsoft', 'Finance', 3, '02-07-2020', '02-08-2026', 1600),
        ('microsoft', 'Sales', 4, '05-09-2020', '23-09-2027', 2600),
    ],
    schema=[
        'company',
        'department',
        'employee_id',
        'employee_joining_date',
        'employee_leave_date',
        'employee_salary',
    ],
)
df.show()

+---------+----------+-----------+---------------------+-------------------+---------------+
|  company|department|employee_id|employee_joining_date|employee_leave_date|employee_salary|
+---------+----------+-----------+---------------------+-------------------+---------------+
|   oracle|     Sales|          1|           12-05-2020|         12-06-2025|           1000|
|   oracle|     Sales|          2|           03-07-2020|         20-08-2023|           1200|
|   oracle|   Finance|          3|           02-08-2020|         12-10-2024|           3000|
|   oracle|   Finance|          4|           02-09-2020|         17-10-2022|           1800|
|microsoft|   Finance|          1|           02-05-2020|         02-07-2023|           1400|
|microsoft|     Sales|          2|           16-05-2020|         16-06-2025|           5000|
|microsoft|   Finance|          3|           02-07-2020|         02-08-2026|           1600|
|microsoft|     Sales|          4|           05-09-2020|         23-09

In [14]:
# Inspect the inferred column types; both date columns are still plain strings here.
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- department: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- employee_joining_date: string (nullable = true)
 |-- employee_leave_date: string (nullable = true)
 |-- employee_salary: long (nullable = true)



In [15]:
from pyspark.sql.functions import to_date

# Parse the 'dd-MM-yyyy' strings into real date columns, overwriting the string versions.
df = (
    df
    .withColumn('employee_joining_date', to_date('employee_joining_date', 'dd-MM-yyyy'))
    .withColumn('employee_leave_date', to_date('employee_leave_date', 'dd-MM-yyyy'))
)

In [16]:
# Re-check the schema after the conversion of the two date columns.
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- department: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- employee_joining_date: date (nullable = true)
 |-- employee_leave_date: date (nullable = true)
 |-- employee_salary: long (nullable = true)



## How much each company has spent on employee salaries over time

In [17]:
from pyspark.sql.window import Window
# Running frame per company: all rows from the earliest joiner up to and
# including the current row, in joining-date order.
window_spec = (
    Window
    .partitionBy('company')
    .orderBy('employee_joining_date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [18]:
from pyspark.sql import functions as func
# Cumulative salary total per company, accumulated over the frame in window_spec.
df = df.withColumn(
    'spending_over_emp_salary',
    func.sum('employee_salary').over(window_spec),
)

In [19]:
# Display the frame, now including the spending_over_emp_salary column.
df.show()

+---------+----------+-----------+---------------------+-------------------+---------------+------------------------+
|  company|department|employee_id|employee_joining_date|employee_leave_date|employee_salary|spending_over_emp_salary|
+---------+----------+-----------+---------------------+-------------------+---------------+------------------------+
|microsoft|   Finance|          1|           2020-05-02|         2023-07-02|           1400|                    1400|
|microsoft|     Sales|          2|           2020-05-16|         2025-06-16|           5000|                    6400|
|microsoft|   Finance|          3|           2020-07-02|         2026-08-02|           1600|                    8000|
|microsoft|     Sales|          4|           2020-09-05|         2027-09-23|           2600|                   10600|
|   oracle|     Sales|          1|           2020-05-12|         2025-06-12|           1000|                    1000|
|   oracle|     Sales|          2|           2020-07-03|

## How much the companies spend in total on each department

In [28]:
from pyspark.sql.window import Window

# Whole-partition frame: every row of a department sees the same total.
# No orderBy: with a frame that is unbounded on both sides the row order cannot
# change the sum, so an ordering would only add an unnecessary sort.
# The partition is by department only, so rows from all companies are pooled.
window_spec = (
    Window
    .partitionBy('department')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

In [29]:
from pyspark.sql import functions as func

# Total salary per department, repeated on every row of that department.
df = df.withColumn(
    'dept_level_expenditure',
    func.sum('employee_salary').over(window_spec),
)

In [30]:
# Show only the columns relevant to the department-level total.
(
    df
    .select(
        'company',
        'department',
        'employee_joining_date',
        'dept_level_expenditure',
    )
    .show()
)

+---------+----------+---------------------+----------------------+
|  company|department|employee_joining_date|dept_level_expenditure|
+---------+----------+---------------------+----------------------+
|   oracle|     Sales|           2020-05-12|                  9800|
|microsoft|     Sales|           2020-05-16|                  9800|
|   oracle|     Sales|           2020-07-03|                  9800|
|microsoft|     Sales|           2020-09-05|                  9800|
|microsoft|   Finance|           2020-05-02|                  7800|
|microsoft|   Finance|           2020-07-02|                  7800|
|   oracle|   Finance|           2020-08-02|                  7800|
|   oracle|   Finance|           2020-09-02|                  7800|
+---------+----------+---------------------+----------------------+



## How the companies' spending on each department increases over time

In [32]:
from pyspark.sql.window import Window

# Running frame per department: all rows from the earliest joiner up to and
# including the current row, in joining-date order.
window_spec = (
    Window
    .partitionBy('department')
    .orderBy('employee_joining_date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [34]:
from pyspark.sql import functions as func

# Cumulative salary total per department, accumulated over the frame in window_spec.
df = df.withColumn(
    'increase_in_dept_level_expenditure',
    func.sum('employee_salary').over(window_spec),
)

In [36]:
# Show only the columns relevant to the running department total.
(
    df
    .select(
        'company',
        'department',
        'employee_joining_date',
        'increase_in_dept_level_expenditure',
    )
    .show()
)

+---------+----------+---------------------+----------------------------------+
|  company|department|employee_joining_date|increase_in_dept_level_expenditure|
+---------+----------+---------------------+----------------------------------+
|   oracle|     Sales|           2020-05-12|                              1000|
|microsoft|     Sales|           2020-05-16|                              6000|
|   oracle|     Sales|           2020-07-03|                              7200|
|microsoft|     Sales|           2020-09-05|                              9800|
|microsoft|   Finance|           2020-05-02|                              1400|
|microsoft|   Finance|           2020-07-02|                              3000|
|   oracle|   Finance|           2020-08-02|                              6000|
|   oracle|   Finance|           2020-09-02|                              7800|
+---------+----------+---------------------+----------------------------------+



## How many employees were working at the company between each employee's joining date and leave date

In [49]:
from pyspark.sql.functions import datediff, lit, sum

# Tenure in days (leave date minus joining date). A later cell selects
# 'n_days_working', so it has to be created here for the notebook to run
# top to bottom on a fresh kernel.
df = df.withColumn('n_days_working', datediff('employee_leave_date', 'employee_joining_date'))

# Constant 1 on every row, so that summing it over a window counts the rows in that window.
df = df.withColumn('window_size_helper', lit(1))

In [60]:
from pyspark.sql import functions as func

# Per company, in leave-date order: the current row plus every row before it.
left_before_window = (
    Window
    .partitionBy('company')
    .orderBy('employee_leave_date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Per company, unordered and unbounded: every row of the company.
n_employee_in_cmpny_window = (
    Window
    .partitionBy('company')
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

In [63]:
# Company row count, minus the rows up to and including the current one in
# leave-date order, plus 1 to count the current employee again.
df = df.withColumn(
    'n_employees',
    func.sum('window_size_helper').over(n_employee_in_cmpny_window)
    - func.sum('window_size_helper').over(left_before_window)
    + 1,
)

In [64]:
# Show tenure and headcount per employee, earliest leave date first.
(
    df
    .orderBy('employee_leave_date')
    .select(
        'company',
        'employee_joining_date',
        'employee_leave_date',
        'n_days_working',
        'n_employees',
    )
    .show()
)

+---------+---------------------+-------------------+--------------+-----------+
|  company|employee_joining_date|employee_leave_date|n_days_working|n_employees|
+---------+---------------------+-------------------+--------------+-----------+
|   oracle|           2020-09-02|         2022-10-17|           775|          4|
|microsoft|           2020-05-02|         2023-07-02|          1156|          4|
|   oracle|           2020-07-03|         2023-08-20|          1143|          3|
|   oracle|           2020-08-02|         2024-10-12|          1532|          2|
|   oracle|           2020-05-12|         2025-06-12|          1857|          1|
|microsoft|           2020-05-16|         2025-06-16|          1857|          3|
|microsoft|           2020-07-02|         2026-08-02|          2222|          2|
|microsoft|           2020-09-05|         2027-09-23|          2574|          1|
+---------+---------------------+-------------------+--------------+-----------+



In [65]:
## How many employees at the same company have a salary within +/-1000 of the current employee's salary

In [70]:
from pyspark.sql.window import Window

# Value-based frame: rows of the same company whose salary lies within 1000
# (inclusive) of the current row's salary.
window_spec = (
    Window
    .partitionBy('company')
    .orderBy('employee_salary')
    .rangeBetween(-1000, 1000)
)

# Subtract 1 so the current employee is not counted among their own peers.
df = df.withColumn(
    'n_employees_with_salary+-1000',
    func.sum('window_size_helper').over(window_spec) - 1,
)

In [71]:
# Show each salary next to the number of same-company peers within the salary band.
(
    df
    .select(
        'company',
        'employee_salary',
        'n_employees_with_salary+-1000',
    )
    .show()
)

+---------+---------------+-----------------------------+
|  company|employee_salary|n_employees_with_salary+-1000|
+---------+---------------+-----------------------------+
|microsoft|           1400|                            1|
|microsoft|           1600|                            2|
|microsoft|           2600|                            1|
|microsoft|           5000|                            0|
|   oracle|           1000|                            2|
|   oracle|           1200|                            2|
|   oracle|           1800|                            2|
|   oracle|           3000|                            0|
+---------+---------------+-----------------------------+

