In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
# Employee sample rows, one tuple per employee: (id, name, salary, department, gender).
emp_data = [
    (1,  'manish',   50000,  'IT',        'm'),
    (2,  'vikash',   60000,  'sales',     'm'),
    (3,  'raushan',  70000,  'marketing', 'm'),
    (4,  'mukesh',   80000,  'IT',        'm'),
    (5,  'priti',    90000,  'sales',     'f'),
    (6,  'nikita',   45000,  'marketing', 'f'),
    (7,  'ragini',   55000,  'marketing', 'f'),
    (8,  'rashi',    100000, 'IT',        'f'),
    (9,  'aditya',   65000,  'IT',        'm'),
    (10, 'rahul',    50000,  'marketing', 'm'),
    (11, 'rakhi',    50000,  'IT',        'f'),
    (12, 'akhilesh', 90000,  'sales',     'm'),
]

# Column names only; Spark infers each column's type from the tuple values.
mySchema = ["id", "name", "salary", "department", "gender"]

emp_df = spark.createDataFrame(emp_data, mySchema)
emp_df.show()

#### rank() , dense_rank() , row_number()

Applying Window Functions

In [0]:
from pyspark.sql.window import Window

# One window per department, rows sorted from the highest salary down.
window_spec = (
    Window
    .partitionBy("department")
    .orderBy(desc("salary"))
)

row_number(): number the employees by salary (highest first) within each department

In [0]:
# row_number() numbers rows 1, 2, 3, ... inside each department; tied salaries
# still receive distinct numbers.
df1 = emp_df.withColumn(
    'Row_number',
    row_number().over(window_spec),
)
df1.show()

Let's use rank() and dense_rank() to rank salaries within each department

In [0]:
from pyspark.sql.window import Window

# Use the same column casing ("department") as the rest of the notebook: "Department"
# only resolves while spark.sql.caseSensitive is false (Spark's default).
window_spec = Window.partitionBy("department").orderBy(desc("salary"))

# rank():       tied salaries share a rank and the next rank is skipped (1, 1, 3).
# dense_rank(): tied salaries share a rank with no gap afterwards      (1, 1, 2).
df2 = (
    emp_df
    .withColumn("Rank", rank().over(window_spec))
    .withColumn("Dense_Rank", dense_rank().over(window_spec))
)

df2.show()

##### Partitioning Data On Multiple Columns

In [0]:
from pyspark.sql.window import Window

# A separate numbering sequence for every (department, gender) pair.
# partitionBy also accepts the columns as one list: Window.partitionBy(["department", "gender"]).
window_spec = (
    Window
    .partitionBy("department", "gender")
    .orderBy(desc("salary"))
)

df3 = emp_df.withColumn("Row_number", row_number().over(window_spec))
df3.show()

##### Ordering Data On Multiple-Columns

In [0]:
from pyspark.sql.window import Window

# Highest salary first; when two salaries tie, the larger id comes first.
# orderBy also accepts the sort keys as one list: .orderBy([desc("salary"), desc("id")]).
window_spec = (
    Window
    .partitionBy("department")
    .orderBy(desc("salary"), desc("id"))
)

df4 = emp_df.withColumn("Row_number", row_number().over(window_spec))
df4.show()


### LEAD - and - LAG

In [0]:
# Monthly sales per product: (product_id, product_name, sales_date, sales).
# sales_date is written as dd-MM-yyyy (the first day of each month of 2023).
product_data = [
    (1,"iphone","01-01-2023",1500000),
    (2,"samsung","01-01-2023",1100000),
    (3,"oneplus","01-01-2023",1100000),
    (1,"iphone","01-02-2023",1300000),
    (2,"samsung","01-02-2023",1120000),
    (3,"oneplus","01-02-2023",1120000),
    (1,"iphone","01-03-2023",1600000),
    (2,"samsung","01-03-2023",1080000),
    (3,"oneplus","01-03-2023",1160000),
    (1,"iphone","01-04-2023",1700000),
    (2,"samsung","01-04-2023",1800000),
    (3,"oneplus","01-04-2023",1170000),
    (1,"iphone","01-05-2023",1200000),
    (2,"samsung","01-05-2023",980000),
    (3,"oneplus","01-05-2023",1175000),
    (1,"iphone","01-06-2023",1100000),
    (2,"samsung","01-06-2023",1100000),
    (3,"oneplus","01-06-2023",1200000)
]

cols = ["product_id","product_name","sales_date","sales"]

# Parse sales_date into a DateType column. Every window below orders by it, and ordering
# the raw dd-MM-yyyy *string* compares the day first, then month, then year. That is
# chronological here only because every row is day 01 of 2023.
sales_df = (
    spark.createDataFrame(product_data, cols)
    .withColumn("sales_date", to_date(col("sales_date"), "dd-MM-yyyy"))
)

sales_df.show()

lag(): Q> What is the loss or gain compared to the previous month's sales, for each product?

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

# Each product's months, oldest first.
window_spec = Window.partitionBy("product_id").orderBy("sales_date")

# lag(1) reads the previous month's sales. A product's first month has no
# predecessor, so its Loss_Gain is null.
lag_df = sales_df.withColumn(
    "Loss_Gain",
    col("sales") - lag("sales", 1).over(window_spec),
)

lag_df.show()

lead(): find the difference between the current month's and the NEXT month's sales for each product

In [0]:
from pyspark.sql.window import Window

# Oldest month first, so lead(1) returns the NEXT month's sales. With a descending
# order, lead(1) would instead fetch the *previous* month and merely repeat the
# lag() result from the cell above.
window_spec = Window.partitionBy("product_id").orderBy("sales_date")

# Current month minus next month. It is null for each product's latest month, which
# has no successor. The result gets its own name, parallel to lag_df, instead of
# overwriting df3 from the partitioning example.
lead_df = sales_df.withColumn("Sales_diff", col("sales") - lead("sales", 1).over(window_spec))

lead_df.show()

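### Window frames: rowsBetween(start, end)
- `Window.unboundedPreceding`: the frame starts at the first row of the partition. With `end = Window.currentRow`, the frame covers every row from the start of the partition up to and including the current row.
- `Window.unboundedFollowing`: the frame ends at the last row of the partition. With `start = Window.currentRow`, the frame covers the current row and every row after it.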

Q: For each product, find the difference in sales between its first month's sale and its latest sale. Each product has exactly one latest (last) sales date.

In [0]:
from pyspark.sql.window import Window

# Ordered window with NO explicit frame. Spark's default frame then ends at the current
# row, which is what makes latest_sales below come out wrong.
window_spec = Window.partitionBy("product_id").orderBy("sales_date")

# Attach first-month and latest-month sales for each product.
df4 = (
    sales_df
    .withColumn('first_month_sale', first('sales').over(window_spec))
    .withColumn('latest_sales', last('sales').over(window_spec))
)
df4.show()



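Above, first_month_sale is correct but latest_sales is wrong: it should be the same value on every row of a product. When a window has an orderBy but no explicit frame, Spark's default frame runs from the start of the partition to the current row (`rangeBetween(Window.unboundedPreceding, Window.currentRow)`). As a result, last() just returns the current row's value. We can fix this by extending the frame to `Window.unboundedFollowing`.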

In [0]:
from pyspark.sql.window import Window

# The frame now spans the whole partition, from its first row to its last. So
# first() and last() see every month of the product, whatever the current row is.
window_spec = (
    Window.partitionBy("product_id")
    .orderBy("sales_date")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Attach first-month and latest-month sales for each product.
df4 = (
    sales_df
    .withColumn('first_month_sale', first('sales').over(window_spec))
    .withColumn('latest_sales', last('sales').over(window_spec))
)
df4.show()

# Now we can calculate Diff

In [0]:
# sales_diff = first-month sales minus latest-month sales; the two helper columns are
# then dropped.
df5 = (
    df4
    .withColumn('sales_diff', col("first_month_sale") - col("latest_sales"))
    .drop("first_month_sale", "latest_sales")
)
df5.show()

----

#### Q> Send an email to every employee who came to the office but did not complete 8 hours there

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.window import Window

In [0]:
# Attendance sessions: (emp_id, emp_name, check_in, check_out, work_date).
# An employee can have several sessions on the same day.
data = [
    (101, "Rajat", "09:00:00", "11:30:00", "2025-01-02"),
    (101, "Rajat", "12:00:00", "15:00:00", "2025-01-02"),
    (101, "Rajat", "15:30:00", "17:00:00", "2025-01-02"),
    (102, "Amit",  "09:15:00", "13:00:00", "2025-01-02"),
    (102, "Amit",  "14:00:00", "18:30:00", "2025-01-02"),
    (103, "Neha",  "10:00:00", "13:00:00", "2025-01-02"),
    (103, "Neha",  "14:00:00", "16:00:00", "2025-01-02"),
    (104, "Pooja", "09:30:00", "12:30:00", "2025-01-02"),
    (104, "Pooja", "13:00:00", "18:00:00", "2025-01-02"),
    (105, "Arjun", "11:00:00", "14:00:00", "2025-01-02"),
    (105, "Arjun", "15:00:00", "17:00:00", "2025-01-02"),
]

mySchema = ["emp_id", "emp_name", "check_in_time", "check_out_time", "work_date"]

df = spark.createDataFrame(data, mySchema)
df.show()

In [0]:
# One frame per employee *per day*, covering all of that day's sessions ordered by check-in.
# Partitioning by work_date as well keeps sessions from different days from being merged
# into a single first-check-in-to-last-check-out span. Zero-padded HH:mm:ss strings sort
# chronologically, so ordering by the raw check_in_time string is safe.
window_spec = (
    Window.partitionBy("emp_id", "work_date")
    .orderBy("check_in_time")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# The day's first check-in and final check-out, repeated on every session row.
temp_df = df.withColumn("in_time", first("check_in_time").over(window_spec))\
    .withColumn("out_time", last("check_out_time").over(window_spec))

# Parse the HH:mm:ss strings into timestamps so they can be subtracted.
temp_df2 = temp_df.withColumn("in_time", to_timestamp("in_time", "HH:mm:ss"))\
    .withColumn("out_time", to_timestamp("out_time", "HH:mm:ss"))

# Hours between first check-in and last check-out (gaps between sessions are included).
temp_df2 = temp_df2.withColumn("work_hours", (unix_timestamp("out_time") - unix_timestamp("in_time")) / 3600)

temp_df2.show()
# Inspect the schema of the frame just built (timestamp in/out columns, numeric work_hours).
temp_df2.printSchema()

In [0]:

# Keep employees whose in-office span is under 8 hours. After the per-session time
# columns are dropped, an employee's session rows are identical, and distinct()
# collapses them to one row.
defaulter_df = (
    temp_df2
    .filter(col('work_hours') < 8)
    .drop("check_in_time", "check_out_time", "in_time", "out_time")
    .distinct()
)

defaulter_df.show(truncate=False)
defaulter_df.printSchema()

-----

#### Q> Evaluate sales performance using a rolling average of the last 3 months

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.window import Window

In [0]:
# Monthly sales per product: (product_id, product_name, sales_date, sales).
# sales_date is written as dd-MM-yyyy (the first day of each month of 2023).
product_data = [
    (1,"iphone","01-01-2023",1500000),
    (2,"samsung","01-01-2023",1100000),
    (3,"oneplus","01-01-2023",1100000),
    (1,"iphone","01-02-2023",1300000),
    (2,"samsung","01-02-2023",1120000),
    (3,"oneplus","01-02-2023",1120000),
    (1,"iphone","01-03-2023",1600000),
    (2,"samsung","01-03-2023",1080000),
    (3,"oneplus","01-03-2023",1160000),
    (1,"iphone","01-04-2023",1700000),
    (2,"samsung","01-04-2023",1800000),
    (3,"oneplus","01-04-2023",1170000),
    (1,"iphone","01-05-2023",1200000),
    (2,"samsung","01-05-2023",980000),
    (3,"oneplus","01-05-2023",1175000),
    (1,"iphone","01-06-2023",1100000),
    (2,"samsung","01-06-2023",1100000),
    (3,"oneplus","01-06-2023",1200000)
]

cols = ["product_id","product_name","sales_date","sales"]

# Parse sales_date into a DateType column. The rolling windows below order by it, and
# ordering the raw dd-MM-yyyy *string* compares the day first, then month, then year.
# That is chronological here only because every row is day 01 of 2023.
sales_df = (
    spark.createDataFrame(product_data, cols)
    .withColumn("sales_date", to_date(col("sales_date"), "dd-MM-yyyy"))
)

sales_df.show()

- Let's compute the average of the last 3 months for each row, then remove the first 2 months of each product. For those rows the window holds fewer than 3 values (1 or 2), yet the sum is still divided by 3, so their average is wrong.

In [0]:
# Frame = the current month plus up to two preceding months of the same product.
window_spec = (
    Window.partitionBy("product_id")
    .orderBy("sales_date")
    .rowsBetween(-2, 0)
)

# Rolling 3-month total, then that total divided by a fixed 3 and rounded to 2 decimals.
# The first two months of each product have fewer than 3 rows in the frame, so their
# "average" is understated; the next cell removes those rows.
temp_df = (
    sales_df
    .withColumn("3_month_sum", sum("sales").over(window_spec))
    .withColumn("3_month_avg", round(col("3_month_sum") / 3, 2))
)

temp_df.show()

In [0]:
# Remove the first 2 months of each product: their 3-month frame held fewer than
# 3 values, so the fixed "/ 3" average computed above is wrong for them.
window_spec2 = Window.partitionBy("product_id").orderBy("sales_date")

row_number_df = (
    temp_df
    .withColumn("Row_Number", row_number().over(window_spec2))
    .filter(col("Row_Number") > 2)
    # Drop with the exact casing used above. drop() of a name that does not resolve is
    # a silent no-op, so "Row_number" would leave the helper column behind whenever
    # spark.sql.caseSensitive is enabled.
    .drop("Row_Number")
)
row_number_df.show()