## Importing Necessary Libraries

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum as _sum, avg as _avg, count, 
    from_unixtime, hour, minute, dayofweek,
    when, round, desc
)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 23, Finished, Available, Finished)

## Load and Inspect Data

In [22]:
# Stage 1: read the raw task logs from Parquet and print the schema Spark inferred.
print("\n=== Stage 1: Loading Data ===")
input_path = "Files/data_log/"
parquet_reader = spark.read
task_logs_df = parquet_reader.parquet(input_path)

print("Initial Schema:")
task_logs_df.printSchema()

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 24, Finished, Available, Finished)


=== Stage 1: Loading Data ===
Initial Schema:
root
 |-- task_id: string (nullable = true)
 |-- project_name: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- task_type: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- status: string (nullable = true)
 |-- hours_logged: long (nullable = true)
 |-- start_time: double (nullable = true)
 |-- end_time: double (nullable = true)



In [23]:
# Render the raw DataFrame through the notebook's display() widget for a quick look.
raw_sample_title = "\nSample Raw Data:"
print(raw_sample_title)
display(task_logs_df)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 25, Finished, Available, Finished)


Sample Raw Data:


SynapseWidget(Synapse.DataFrame, 5d4990ca-19bd-45bd-b071-1d318aad3f48)

In [13]:
# Summary statistics per column of the raw data (count, mean, stddev, min, max).
# NOTE(review): describe() scans the whole dataset (~16.4M rows per the output) — expensive.
raw_summary_df = task_logs_df.describe()
raw_summary_df.show()

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 15, Finished, Available, Finished)

+-------+--------------------+------------+-----------------+---------+--------+-----------+------------------+--------------------+--------------------+
|summary|             task_id|project_name|      employee_id|task_type|priority|     status|      hours_logged|          start_time|            end_time|
+-------+--------------------+------------+-----------------+---------+--------+-----------+------------------+--------------------+--------------------+
|  count|            16400000|    16400000|         16400000| 16400000|16400000|   16400000|          16400000|            16400000|            16400000|
|   mean|                NULL|        NULL|5499.547062439025|     NULL|    NULL|       NULL| 4.500124756097561|1.737103230721814...|1.737118533136339...|
| stddev|                NULL|        NULL|2598.316682543834|     NULL|    NULL|       NULL|2.2912455739522457| 8.161888136768842E8|  8.16228265201312E8|
|    min|000000c7-1147-4f6...|    American|             1000|   Design|    H

## Convert Epoch Time Columns to Timestamps

In [24]:
# Stage 2: Convert the epoch time columns to timestamps and add time-based features.
print("\n=== Stage 2: Time Transformations ===")

# start_time / end_time are treated as epoch MILLISECONDS (hence the / 1000 to get
# seconds, and / (1000 * 3600) to get hours).
MS_PER_SECOND = 1000
MS_PER_HOUR = 1000 * 3600

# Why timestamp_seconds instead of from_unixtime(...).cast("timestamp"):
# from_unixtime formats the value as a "yyyy-MM-dd HH:mm:ss" string in the session
# time zone, and the cast then re-parses that string. That round trip has three costs:
#   * it truncates sub-second precision;
#   * it can resolve to the wrong instant during a DST fall-back hour, when the
#     session time zone observes DST;
#   * it adds a string format/parse per row.
# timestamp_seconds builds the timestamp directly from the numeric seconds value.
time_transformed_df = (
    task_logs_df
    .withColumn("start_datetime",
                F.timestamp_seconds(col("start_time") / MS_PER_SECOND))
    .withColumn("end_datetime",
                F.timestamp_seconds(col("end_time") / MS_PER_SECOND))
    # dayofweek: 1 = Sunday ... 7 = Saturday. day_of_week and start_hour are both
    # evaluated in the Spark session time zone (spark.sql.session.timeZone).
    .withColumn("day_of_week", dayofweek("start_datetime"))
    .withColumn("start_hour", hour("start_datetime"))
    # Duration comes straight from the raw millisecond values, so time zones don't affect it.
    .withColumn("duration_hours",
                round((col("end_time") - col("start_time")) / MS_PER_HOUR, 2))
)

print("\nData with Time Transformations:")
display(time_transformed_df)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 26, Finished, Available, Finished)


=== Stage 2: Time Transformations ===

Data with Time Transformations:


SynapseWidget(Synapse.DataFrame, 5be46128-8d62-4c24-b7f1-7409b3f63ff1)

## Data Quality Check

In [25]:
# Stage 3: Basic data quality checks.
# Reports per-column null counts and the percentage share of each status value.
print("\n=== Stage 3: Data Quality Metrics ===")
total_records = time_transformed_df.count()

# count() skips NULLs, and when(...) without otherwise() is NULL when the condition
# is false. Counting when(isNull, ...) therefore tallies exactly the null cells.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in time_transformed_df.columns
]
null_counts = time_transformed_df.select(null_count_exprs)
print("\nNull Counts per Column:")
null_counts.show()

print("\nStatus Distribution:")
status_distribution = (
    time_transformed_df
    .groupBy("status")
    .count()
    .withColumn("percentage", round(col("count") * 100 / total_records, 2))
)
status_distribution.show()

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 27, Finished, Available, Finished)


=== Stage 3: Data Quality Metrics ===

Null Counts per Column:
+-------+------------+-----------+---------+--------+------+------------+----------+--------+--------------+------------+-----------+----------+--------------+
|task_id|project_name|employee_id|task_type|priority|status|hours_logged|start_time|end_time|start_datetime|end_datetime|day_of_week|start_hour|duration_hours|
+-------+------------+-----------+---------+--------+------+------------+----------+--------+--------------+------------+-----------+----------+--------------+
|      0|           0|          0|        0|       0|     0|           0|         0|       0|             0|           0|          0|         0|             0|
+-------+------------+-----------+---------+--------+------+------------+----------+--------+--------------+------------+-----------+----------+--------------+


Status Distribution:
+-----------+-------+----------+
|     status|  count|percentage|
+-----------+-------+----------+
|  Completed|6

## Create Project-Level Analytics Table

In [26]:
# Stage 4: Project-level analytics — task volume, completions and hours per project.
print("\n=== Stage 4: Project Analytics ===")
# 1 for a completed task, 0 otherwise; summing it counts completed tasks.
completed_flag = when(col("status") == "Completed", 1).otherwise(0)

project_metrics = (
    time_transformed_df
    .groupBy("project_name")
    .agg(
        count("task_id").alias("total_tasks"),
        _sum(completed_flag).alias("completed_tasks"),
        round(_avg("duration_hours"), 2).alias("avg_duration"),
        round(_sum("hours_logged"), 2).alias("total_hours_logged"),
        round(_avg("hours_logged"), 2).alias("avg_hours_per_task"),
    )
    # Percentage of the project's tasks whose status is "Completed".
    .withColumn(
        "completion_rate",
        round(col("completed_tasks") * 100 / col("total_tasks"), 2),
    )
)

print("\nProject-level Metrics:")
project_metrics.orderBy(desc("total_tasks")).show(10)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 28, Finished, Available, Finished)


=== Stage 4: Project Analytics ===

Project-level Metrics:
+------------+-----------+---------------+------------+------------------+------------------+---------------+
|project_name|total_tasks|completed_tasks|avg_duration|total_hours_logged|avg_hours_per_task|completion_rate|
+------------+-----------+---------------+------------+------------------+------------------+---------------+
|          in|      21188|           7005|        4.26|             94619|              4.47|          33.06|
|     benefit|      21022|           7086|        4.26|             94929|              4.52|          33.71|
|     company|      21005|           6907|        4.26|             94630|              4.51|          32.88|
|       visit|      20991|           6941|        4.25|             94273|              4.49|          33.07|
|       along|      20981|           6961|        4.25|             94375|               4.5|          33.18|
|  difference|      20980|           7078|        4.25|     

## Employee Performance Analytics Table

In [27]:
# Stage 5: Employee performance analytics — workload, completion and productivity per employee.
print("\n=== Stage 5: Employee Analytics ===")
employee_metrics = (
    time_transformed_df
    .groupBy("employee_id")
    .agg(
        count("task_id").alias("tasks_assigned"),
        _sum(when(col("status") == "Completed", 1).otherwise(0)).alias("tasks_completed"),
        round(_sum("hours_logged"), 2).alias("total_hours_logged"),
        round(_avg("duration_hours"), 2).alias("avg_task_duration"),
    )
    # Both ratios guard their denominators. With ANSI mode enabled, Spark raises on
    # division by zero instead of returning NULL, so one employee with no counted
    # tasks or no logged hours would abort the whole job. Such rows get NULL
    # explicitly (when(...) without otherwise()).
    .withColumn(
        "completion_rate",
        when(
            col("tasks_assigned") > 0,
            round(col("tasks_completed") * 100 / col("tasks_assigned"), 2),
        ),
    )
    # Completed tasks per logged hour.
    # NOTE(review): rounding to 2 decimals collapses this metric. The sample output shows
    # 0.08 for every top performer; consider more precision or tasks-per-100-hours.
    .withColumn(
        "productivity_score",
        when(
            col("total_hours_logged") > 0,
            round(col("tasks_completed") / col("total_hours_logged"), 2),
        ),
    )
)

print("\nTop Performers by Completion Rate:")
# Tiebreakers make the top-10 listing deterministic when completion rates are equal.
employee_metrics.orderBy(
    desc("completion_rate"), desc("tasks_completed"), "employee_id"
).show(10)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 29, Finished, Available, Finished)


=== Stage 5: Employee Analytics ===

Top Performers by Completion Rate:
+-----------+--------------+---------------+------------------+-----------------+---------------+------------------+
|employee_id|tasks_assigned|tasks_completed|total_hours_logged|avg_task_duration|completion_rate|productivity_score|
+-----------+--------------+---------------+------------------+-----------------+---------------+------------------+
|       6836|          2209|            824|             10170|             4.28|           37.3|              0.08|
|       3980|          2235|            828|             10064|             4.26|          37.05|              0.08|
|       3838|          2251|            831|             10180|             4.17|          36.92|              0.08|
|       1363|          2269|            837|             10132|             4.15|          36.89|              0.08|
|       9724|          2242|            826|             10184|             4.31|          36.84|           

## Time-based Analysis Table

In [28]:
# Stage 6: Time-based analysis — workload per (day of week, start hour) bucket.
print("\n=== Stage 6: Time-based Analysis ===")
time_bucket_cols = ["day_of_week", "start_hour"]

time_analysis = time_transformed_df.groupBy(*time_bucket_cols).agg(
    count("task_id").alias("task_count"),
    round(_avg("duration_hours"), 2).alias("avg_duration"),
    round(_sum("hours_logged"), 2).alias("total_hours"),
)

print("\nTask Distribution by Day and Hour:")
time_analysis.orderBy(*time_bucket_cols).show(10)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 30, Finished, Available, Finished)


=== Stage 6: Time-based Analysis ===

Task Distribution by Day and Hour:
+-----------+----------+----------+------------+-----------+
|day_of_week|start_hour|task_count|avg_duration|total_hours|
+-----------+----------+----------+------------+-----------+
|          1|         0|    126947|        4.25|     572029|
|          1|         1|    127315|        4.25|     573516|
|          1|         2|    127547|        4.25|     574348|
|          1|         3|    127260|        4.26|     573257|
|          1|         4|    127273|        4.26|     572315|
|          1|         5|    127002|        4.25|     571768|
|          1|         6|    127243|        4.27|     570870|
|          1|         7|    127266|        4.25|     573265|
|          1|         8|    127583|        4.25|     573500|
|          1|         9|    127476|        4.26|     574784|
+-----------+----------+----------+------------+-----------+
only showing top 10 rows



## Priority-based Analysis

In [29]:
# Stage 7: Priority-based analysis — volume, duration, hours and completion rate per priority.
print("\n=== Stage 7: Priority Analysis ===")
# The mean of a 0/1 "completed" flag is the fraction of tasks that are completed.
completion_fraction = _avg(when(col("status") == "Completed", 1).otherwise(0))

priority_metrics = (
    time_transformed_df
    .groupBy("priority")
    .agg(
        count("task_id").alias("task_count"),
        round(_avg("duration_hours"), 2).alias("avg_duration"),
        round(_sum("hours_logged"), 2).alias("total_hours"),
        round(completion_fraction * 100, 2).alias("completion_rate"),
    )
)

print("\nMetrics by Priority Level:")
priority_metrics.show()

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 31, Finished, Available, Finished)


=== Stage 7: Priority Analysis ===

Metrics by Priority Level:
+--------+----------+------------+-----------+---------------+
|priority|task_count|avg_duration|total_hours|completion_rate|
+--------+----------+------------+-----------+---------------+
|    High|   6666786|        4.25|   30004192|          33.35|
|     Low|   6668672|        4.25|   30009070|          33.34|
|  Medium|   6664542|        4.25|   29991806|          33.32|
+--------+----------+------------+-----------+---------------+



## Save Analytics Tables as CSV

In [31]:
# Stage 8: Write each analytics table under output_path as a headered CSV.
print("\n=== Stage 8: Saving Results ===")
output_path = "Files/clean_data_log/"

# Project metrics: coalesce(1) collapses to one partition, so Spark writes a single
# part file. mode("overwrite") replaces output from earlier runs.
project_metrics_writer = (
    project_metrics
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
)
project_metrics_writer.csv(f"{output_path}project_metrics")

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 33, Finished, Available, Finished)


=== Stage 8: Saving Results ===


In [32]:
# Employee metrics: a single headered CSV part file, replacing any previous run's output.
employee_metrics_path = f"{output_path}employee_metrics"
(
    employee_metrics.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv(employee_metrics_path)
)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 34, Finished, Available, Finished)

In [33]:
# Time analysis (day-of-week × start-hour buckets): one headered CSV part file, overwritten each run.
time_analysis_path = f"{output_path}time_analysis"
time_analysis_writer = time_analysis.coalesce(1).write.mode("overwrite").option("header", "true")
time_analysis_writer.csv(time_analysis_path)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 35, Finished, Available, Finished)

In [34]:
# Priority metrics: last table of Stage 8, written the same way as the others
# (single part file, header row, overwrite).
priority_metrics_path = f"{output_path}priority_metrics"
(
    priority_metrics
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(priority_metrics_path)
)

print("\nTransformation and analysis complete. Results saved to:", output_path)

StatementMeta(, 92834a41-b259-4719-b4b0-37758e9094a8, 36, Finished, Available, Finished)


Transformation and analysis complete. Results saved to: Files/clean_data_log/
