In [1]:
import pandas as pd
from pyspark.sql.dataframe import DataFrame as spark_DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import numpy as np

In [2]:
# Build the notebook's SparkSession under the app name 'Leetcode'
# (getOrCreate hands back an already-running session if there is one).
spark = (
    SparkSession.builder
    .appName('Leetcode')
    .getOrCreate()
)

25/01/01 19:46:39 WARN Utils: Your hostname, madiv resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlo1)
25/01/01 19:46:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/01 19:46:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Spark copy of the Employee table: first row is the header, fields are comma-separated.
spark_Employee = (
    spark.read
    .option('header', True)
    .option('delimiter', ',')
    .csv('Employee.csv')
)
# pandas copy of the same file, parsed with read_csv defaults.
pandas_Employee = pd.read_csv('Employee.csv')

                                                                                

In [4]:
# Print at most 100 rows of the Spark DataFrame as a text table.
spark_Employee.show(n = 100)

+---+-----+----------+---------+
| id| name|department|managerId|
+---+-----+----------+---------+
|101| John|         A|     null|
|102|  Dan|         A|      101|
|103|James|         A|      101|
|104|  Amy|         A|      101|
|105| Anne|         A|      101|
|106|  Ron|         B|      101|
+---+-----+----------+---------+



In [5]:
# Display at most the first 100 rows of the pandas DataFrame.
pandas_Employee.head(n = 100)

Unnamed: 0,id,name,department,managerId
0,101,John,A,
1,102,Dan,A,101.0
2,103,James,A,101.0
3,104,Amy,A,101.0
4,105,Anne,A,101.0
5,106,Ron,B,101.0


### SQL

In [6]:
def find_managers(employee: spark_DataFrame) -> spark_DataFrame:
    """Return the names of managers with at least five direct reports (Spark SQL).

    The input DataFrame is registered as the temporary view ``Employee``
    (replacing any existing view with that name) and queried through the
    notebook-level ``spark`` session.

    Args:
        employee: Spark DataFrame providing the ``id``, ``name`` and
            ``managerId`` columns referenced by the query.

    Returns:
        A Spark DataFrame with a single ``name`` column, one row per
        (manager name, manager id) group that has five or more reports.
    """
    # Expose the DataFrame to SQL under the table name used in the query below.
    employee.createOrReplaceTempView('Employee')

    # Self-join managers (e1) to their direct reports (e2), then keep the
    # groups that contain five or more joined rows.
    manager_query = '''
    -- Select the names of employees who manage at least 5 others
    SELECT e1.name
    FROM Employee e1
    -- Join the Employee table with itself to link managers (e1) to their direct reports (e2)
    INNER JOIN Employee e2
    ON e1.id = e2.managerId
    -- Group the results by manager (e1.name) and manager's ID (e2.managerId) to count their direct reports
    GROUP BY e1.name, e2.managerId
    -- Filter the managers who have 5 or more direct reports
    HAVING COUNT(*) >= 5
    '''

    return spark.sql(manager_query)

# Run the Spark SQL solution on the Spark copy of the data and print the result.
output = find_managers(employee=spark_Employee)
output.show()

+----+
|name|
+----+
|John|
+----+



### PySpark

In [7]:
def find_managers(employee: spark_DataFrame) -> spark_DataFrame:
    """Return the names of managers with at least five direct reports (DataFrame API).

    Args:
        employee: Spark DataFrame providing the ``id``, ``name`` and
            ``managerId`` columns.

    Returns:
        A Spark DataFrame with a single ``manager_name`` column, one row per
        (manager name, manager id) group that has five or more reports.
    """
    # Two aliases of the same table: e1 plays the manager, e2 the direct report.
    managers = employee.alias('e1')
    reports = employee.alias('e2')

    # Inner self-join: every output row pairs a manager with one of their reports.
    manager_report_pairs = managers.join(
        reports,
        on=col('e1.id') == col('e2.managerId'),
        how='inner',
    ).select(
        col('e1.name').alias('manager_name'),
        col('e2.managerId'),
    )

    # One row per manager, carrying the number of joined (non-null) managerId values.
    report_counts = manager_report_pairs.groupby('manager_name', 'managerId').agg(
        count('managerId').alias('direct_reports_count')
    )

    # Keep managers with five or more reports and expose only their names.
    return (
        report_counts
        .filter(col('direct_reports_count') >= 5)
        .select('manager_name')
    )

# Run the PySpark DataFrame-API solution on the Spark copy of the data and print the result.
output = find_managers(employee=spark_Employee)
output.show()

+------------+
|manager_name|
+------------+
|        John|
+------------+



### Pandas

In [8]:
def find_managers(employee: pd.DataFrame, min_reports: int = 5) -> pd.DataFrame:
    """Return the names of managers with at least ``min_reports`` direct reports.

    A direct report is any row whose ``managerId`` equals another row's ``id``.
    Rows with a missing ``managerId`` are not counted towards anyone, and a
    ``managerId`` that matches no ``id`` in the frame yields no result row.

    Args:
        employee: DataFrame with at least the columns ``id``, ``name`` and
            ``managerId``. ``id`` is expected to be unique per employee.
        min_reports: Minimum number of direct reports a manager needs in order
            to be returned. Defaults to 5.

    Returns:
        A DataFrame with a single ``name`` column and a fresh 0..n-1 index,
        ordered by manager ``id``. Empty (but still with the ``name`` column)
        when nobody qualifies.
    """
    # Count how many rows point at each manager id. value_counts() drops
    # missing values by default, so employees without a manager are ignored.
    report_counts = employee['managerId'].value_counts()
    qualifying_ids = report_counts[report_counts >= min_reports].index

    # Look the qualifying ids up directly instead of self-merging the frame and
    # rescanning it once per manager; isin() also sidesteps the merge-key dtype
    # check when managerId is float (because of NaN) or object and id is int.
    is_qualifying_manager = employee['id'].isin(qualifying_ids)

    managers = (
        employee.loc[is_qualifying_manager, ['id', 'name']]
        .drop_duplicates()
        # Keep results ordered by manager id, then by name.
        .sort_values(['id', 'name'])
    )

    return managers[['name']].reset_index(drop=True)

# Run the pandas solution on the pandas copy of the data and display the first rows.
output = find_managers(employee=pandas_Employee)
output.head()

Unnamed: 0,name
0,John
