<a href="https://colab.research.google.com/github/vamshap/PySpark-Challenges/blob/main/EmployeeHours.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date,lead,col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.window import Window

# Goal: for every employee and day, find the hours spent at the desk and the
# total hours spent in the office (e.g. employee 1 on 2024-07-23: 9 hours in
# the office, 8 of them at the desk).

# Start (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("SwipeTable").getOrCreate()

# Schema of the swipe log: who swiped, what kind of swipe, and when.
# Every column is nullable.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("employee_id", IntegerType()),
            ("activity_type", StringType()),
            ("activity_time", TimestampType()),
        )
    ]
)

# Sample swipe events, written as plain strings and parsed in one place below.
TS_FORMAT = '%Y-%m-%d %H:%M:%S'
raw_swipes = [
    (1, 'login', '2024-07-23 08:00:00'),
    (1, 'logout', '2024-07-23 12:00:00'),
    (1, 'login', '2024-07-23 13:00:00'),
    (1, 'logout', '2024-07-23 17:00:00'),
    (2, 'login', '2024-07-23 09:00:00'),
    (2, 'logout', '2024-07-23 11:00:00'),
    (2, 'login', '2024-07-23 12:00:00'),
    (2, 'logout', '2024-07-23 15:00:00'),
    (1, 'login', '2024-07-24 08:30:00'),
    (1, 'logout', '2024-07-24 12:30:00'),
    (2, 'login', '2024-07-24 09:30:00'),
    (2, 'logout', '2024-07-24 10:30:00'),
]
data = [
    (employee, swipe_kind, datetime.strptime(stamp, TS_FORMAT))
    for employee, swipe_kind, stamp in raw_swipes
]

# Build the swipe DataFrame and print it for inspection.
swipe_df = spark.createDataFrame(data, schema=schema)
swipe_df.show()

# One window per employee per calendar day, ordered chronologically, so that
# lead() pairs every swipe with the next swipe made on the same day.
# NOTE(review): this assumes swipes alternate login/logout within a day and
# that a session never spans midnight -- confirm against the real data.
WindowSpec = Window.partitionBy("employee_id", to_date("activity_time")).orderBy("activity_time")

# Attach the time of the following swipe as LeadTime, then keep only the
# non-logout rows: each remaining row is one interval from a login to the
# swipe that follows it.
lead_df = swipe_df.withColumn(
    "LeadTime", lead("activity_time").over(WindowSpec)
).filter(col("activity_type") != "logout")

lead_df.show()

# Interval length in hours. Timestamps are cast to long (epoch seconds)
# rather than a 32-bit integer, which cannot hold dates after 2038-01-19.
lead_df = lead_df.withColumn(
    "TimeDiff",
    (col("LeadTime").cast("long") - col("activity_time").cast("long")) / 3600,
)

# Per employee and day:
#   sum(TimeDiff) -- hours at the desk (sum of the login -> next-swipe intervals)
#   office_hours  -- hours in the office (first login to the last paired swipe)
Login_df = lead_df.groupBy("employee_id", to_date("activity_time")).agg(
    F.sum("TimeDiff"),
    (
        (F.max("LeadTime").cast("long") - F.min("activity_time").cast("long")) / 3600
    ).alias("office_hours"),
)
Login_df.show()


+-----------+-------------+-------------------+
|employee_id|activity_type|      activity_time|
+-----------+-------------+-------------------+
|          1|        login|2024-07-23 08:00:00|
|          1|       logout|2024-07-23 12:00:00|
|          1|        login|2024-07-23 13:00:00|
|          1|       logout|2024-07-23 17:00:00|
|          2|        login|2024-07-23 09:00:00|
|          2|       logout|2024-07-23 11:00:00|
|          2|        login|2024-07-23 12:00:00|
|          2|       logout|2024-07-23 15:00:00|
|          1|        login|2024-07-24 08:30:00|
|          1|       logout|2024-07-24 12:30:00|
|          2|        login|2024-07-24 09:30:00|
|          2|       logout|2024-07-24 10:30:00|
+-----------+-------------+-------------------+

+-----------+-------------+-------------------+-------------------+
|employee_id|activity_type|      activity_time|           LeadTime|
+-----------+-------------+-------------------+-------------------+
|          1|        login|