### Import Relevant Libraries and start Spark Session

In [12]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg as _avg, count, when
from pyspark.sql.functions import col, from_unixtime


# Obtain the Spark session, creating it only if one does not already exist.
# (Not necessary in Fabric, where the session is already available.)
spark = (
    SparkSession.builder
    .appName("TaskLogAnalysis")
    .getOrCreate()
)

StatementMeta(, 46a8c2d5-f07c-4de6-8ebe-4b99321bd0cf, 14, Finished, Available, Finished)

### Read Task Logs from Lakehouse Filepath

In [13]:
# Lakehouse-relative location of the raw task-log Parquet files
lakehouse_path = "Files/task_logs/task_logs_2024"

# Load the Parquet data set into a DataFrame
task_logs_df = spark.read.format("parquet").load(lakehouse_path)

# Preview the first rows
task_logs_df.show()

StatementMeta(, 46a8c2d5-f07c-4de6-8ebe-4b99321bd0cf, 15, Finished, Available, Finished)

+--------------------+--------------------+--------------------+-------------+-------------+-----------+--------+------------+
|             task_id|             user_id|    task_description|   start_time|     end_time|     status|priority|hours_logged|
+--------------------+--------------------+--------------------+-------------+-------------+-----------+--------+------------+
|3052fe18-182b-4c4...|6b15895f-e655-467...|Explain girl succ...|1733218118000|1733342882000|     Failed|     Low|        7.17|
|ed18a383-fd1f-490...|431602ef-42b0-4ed...|Adult it still fo...|1733195099000|1733336246000|  Completed|    High|        6.96|
|33b19f47-76ea-40f...|070b2904-a255-42c...|Capital drug elec...|1733208970000|1733348187000|  Completed|  Medium|        9.41|
|ce46acd6-1a6d-4fb...|03bf91ea-2bed-499...|Soon address fly ...|1733188024000|1733355327000|  Completed|     Low|        0.59|
|905d1207-f632-4bb...|8b8952cb-f96f-449...|Future return thr...|1733266107000|1733328368000|In Progress|    Hig

### Perform Transformations

#### Transformations:
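1. Convert the epoch-millisecond `start_time` and `end_time` columns to datetime (timestamp) columns
2. Keep only completed tasks (this drops failed and in-progress tasks)
3. Aggregate total hours, average hours, and task count per priority and day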

In [15]:
# 1. Convert the epoch-millisecond columns (start_time and end_time) into timestamp columns.
#
# The values are divided by 1000 and truncated to whole seconds, then the integral
# seconds are cast straight to a timestamp (Spark interprets an integral value cast
# to timestamp as seconds since the Unix epoch). This avoids formatting the instant
# as a zone-less string via from_unixtime() and parsing it back, which is redundant
# and ambiguous for wall-clock times repeated during a DST fall-back.
transformed_task_logs_df = task_logs_df
for time_col in ("start_time", "end_time"):
    transformed_task_logs_df = transformed_task_logs_df.withColumn(
        time_col,
        (col(time_col) / 1000).cast("long").cast("timestamp"),
    )

# View Result:
transformed_task_logs_df.show()

StatementMeta(, 46a8c2d5-f07c-4de6-8ebe-4b99321bd0cf, 17, Finished, Available, Finished)

+--------------------+--------------------+--------------------+-------------------+-------------------+-----------+--------+------------+
|             task_id|             user_id|    task_description|         start_time|           end_time|     status|priority|hours_logged|
+--------------------+--------------------+--------------------+-------------------+-------------------+-----------+--------+------------+
|3052fe18-182b-4c4...|6b15895f-e655-467...|Explain girl succ...|2024-12-03 09:28:38|2024-12-04 20:08:02|     Failed|     Low|        7.17|
|ed18a383-fd1f-490...|431602ef-42b0-4ed...|Adult it still fo...|2024-12-03 03:04:59|2024-12-04 18:17:26|  Completed|    High|        6.96|
|33b19f47-76ea-40f...|070b2904-a255-42c...|Capital drug elec...|2024-12-03 06:56:10|2024-12-04 21:36:27|  Completed|  Medium|        9.41|
|ce46acd6-1a6d-4fb...|03bf91ea-2bed-499...|Soon address fly ...|2024-12-03 01:07:04|2024-12-04 23:35:27|  Completed|     Low|        0.59|
|905d1207-f632-4bb...|8b895

In [16]:
# 2. Keep only completed tasks.
# The predicate retains rows whose status is exactly "Completed", so every other
# status (e.g. "Failed", "In Progress") is dropped.
# The column is referenced by name via col() so the condition is resolved against
# the DataFrame being filtered rather than against the earlier task_logs_df.
completed_tasks_df = transformed_task_logs_df.filter(col("status") == "Completed")

# View Result
completed_tasks_df.show()


StatementMeta(, 46a8c2d5-f07c-4de6-8ebe-4b99321bd0cf, 18, Finished, Available, Finished)

+--------------------+--------------------+--------------------+-------------------+-------------------+---------+--------+------------+
|             task_id|             user_id|    task_description|         start_time|           end_time|   status|priority|hours_logged|
+--------------------+--------------------+--------------------+-------------------+-------------------+---------+--------+------------+
|ed18a383-fd1f-490...|431602ef-42b0-4ed...|Adult it still fo...|2024-12-03 03:04:59|2024-12-04 18:17:26|Completed|    High|        6.96|
|33b19f47-76ea-40f...|070b2904-a255-42c...|Capital drug elec...|2024-12-03 06:56:10|2024-12-04 21:36:27|Completed|  Medium|        9.41|
|ce46acd6-1a6d-4fb...|03bf91ea-2bed-499...|Soon address fly ...|2024-12-03 01:07:04|2024-12-04 23:35:27|Completed|     Low|        0.59|
|884ae8b3-ff2b-43b...|b56e1c5a-5ec9-455...|He sell staff mis...|2024-12-03 11:24:33|2024-12-04 17:37:12|Completed|  Medium|         9.3|
|3812b285-3414-429...|728f5765-eac7-44f..

In [17]:
# 3. Summarise completed tasks per (priority, start date):
#    total hours logged, average hours logged, and number of tasks.

# Calendar date derived from the task's start timestamp
task_date = col("start_time").cast("date").alias("task_date")

# One group per priority level and start date
tasks_by_priority_and_day = completed_tasks_df.groupBy(col("priority"), task_date)

aggregated_df = tasks_by_priority_and_day.agg(
    _sum("hours_logged").alias("total_hours_logged"),
    _avg("hours_logged").alias("avg_hours_logged"),
    count("task_id").alias("task_count"),
)

# Preview the first 10 aggregated rows
aggregated_df.show(10)

StatementMeta(, 46a8c2d5-f07c-4de6-8ebe-4b99321bd0cf, 19, Finished, Available, Finished)

+--------+----------+------------------+------------------+----------+
|priority| task_date|total_hours_logged|  avg_hours_logged|task_count|
+--------+----------+------------------+------------------+----------+
|     Low|2024-12-03| 692289.9699999993| 6.248950399422298|    110785|
|    High|2024-12-03| 695464.5499999982|  6.25704729687175|    111149|
|  Medium|2024-12-03| 697177.3500000004| 6.249965037786089|    111549|
|     Low|2024-12-15|  692116.900000003| 6.235736809860198|    110992|
|  Medium|2024-12-15| 698196.0900000087| 6.260893765076255|    111517|
|    High|2024-12-15| 693512.0899999993| 6.253434053795721|    110901|
|  Medium|2024-12-10| 696134.6200000098|6.2555342684867385|    111283|
|     Low|2024-12-10| 692429.4900000069| 6.259136467589348|    110627|
|    High|2024-12-10|  694436.470000008| 6.241957250591067|    111253|
|  Medium|2024-12-13| 695082.5199999958| 6.229677708467733|    111576|
+--------+----------+------------------+------------------+----------+
only s

### Save file as `CSV`

In [23]:
# Destination folder for the CSV output
output_csv_path = "Files/processed_logs/processed_task_logs_2024/"

# Collapse to a single partition so one CSV part file is produced, include a
# header row, and replace any existing output at the destination.
(
    aggregated_df
    .coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv(output_csv_path)
)

StatementMeta(, 46a8c2d5-f07c-4de6-8ebe-4b99321bd0cf, 25, Finished, Available, Finished)