In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session used by every Spark cell below, or reuse one that
# is already running.
spark = (
    SparkSession.builder
    .appName('LeetCode')
    .getOrCreate()
)

24/12/27 20:26:36 WARN Utils: Your hostname, madiv resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlo1)
24/12/27 20:26:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/27 20:26:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/27 20:26:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Read both CSV files with a header row and let Spark infer the column types.
# Without `inferSchema` every column (empId, supervisor, salary, bonus) is
# loaded as a string, and the numeric comparisons further down only work
# through implicit string-to-number casts.
spark_Employee = spark.read.csv('Employee.csv', header=True, inferSchema=True, sep=',')
spark_Bonus = spark.read.csv('Bonus.csv', header=True, inferSchema=True, sep=',')

# Display both tables to confirm they were parsed as expected.
spark_Employee.show()
spark_Bonus.show()

+-----+------+----------+------+
|empId|  name|supervisor|salary|
+-----+------+----------+------+
|    3|  Brad|      null|  4000|
|    1|  John|         3|  1000|
|    2|   Dan|         3|  2000|
|    4|Thomas|         3|  4000|
+-----+------+----------+------+

+-----+-----+
|empId|bonus|
+-----+-----+
|    2|  500|
|    4| 2000|
+-----+-----+



In [4]:
# Load the same two CSV files into pandas for the pandas-based solution.
pandas_Employee, pandas_Bonus = (
    pd.read_csv(csv_path) for csv_path in ('Employee.csv', 'Bonus.csv')
)

In [5]:
# Preview the first rows of the employee table (5 is the `head` default).
pandas_Employee.head(n=5)

Unnamed: 0,empId,name,supervisor,salary
0,3,Brad,,4000
1,1,John,3.0,1000
2,2,Dan,3.0,2000
3,4,Thomas,3.0,4000


In [6]:
# Preview the first rows of the bonus table (5 is the `head` default).
pandas_Bonus.head(n=5)

Unnamed: 0,empId,bonus
0,2,500
1,4,2000


### SQL

In [11]:
def employee_bonus(employee: pyspark.sql.dataframe.DataFrame,
                   bonus: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """Return the name and bonus of employees whose bonus is below 1000 or missing.

    Spark SQL variant. The two inputs are registered as the temporary views
    'Employee' and 'Bonus' (replacing any existing views with those names),
    and the query is executed through the notebook-level ``spark`` session.

    Args:
        employee: DataFrame providing at least the ``empId`` and ``name`` columns.
        bonus: DataFrame providing at least the ``empId`` and ``bonus`` columns.

    Returns:
        A DataFrame with the columns ``name`` and ``bonus``; ``bonus`` is NULL
        for employees that have no matching row in ``bonus``.
    """
    # Expose both DataFrames to the SQL engine under the names the query uses.
    for view_name, frame in (('Employee', employee), ('Bonus', bonus)):
        frame.createOrReplaceTempView(view_name)

    # The LEFT JOIN keeps employees without a bonus row; the WHERE clause then
    # keeps those rows (NULL bonus) together with bonuses strictly below 1000.
    bonus_query = '''
    SELECT 
        e.name, b.bonus  -- Select employee's name and the corresponding bonus
    FROM Employee e  -- From the 'Employee' table (aliased as e)
    LEFT JOIN        -- Perform a LEFT JOIN to include all employees, even those without a bonus
    Bonus b          -- Join with the 'Bonus' table (aliased as b)
    ON e.empId = b.empId  -- The join condition: match employees with their bonus using empId
    WHERE b.bonus < 1000 or b.bonus IS NULL  -- Only select employees with bonus less than 1000 or no bonus
    '''

    return spark.sql(bonus_query)

# Run the Spark SQL solution on the loaded tables and display the result.
names_With_Less_Or_No_Bonus = employee_bonus(
    employee=spark_Employee,
    bonus=spark_Bonus,
)
names_With_Less_Or_No_Bonus.show()

+----+-----+
|name|bonus|
+----+-----+
|Brad| null|
|John| null|
| Dan|  500|
+----+-----+



### PySpark

In [13]:
def employee_bonus(employee: pyspark.sql.dataframe.DataFrame,
                   bonus: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """Return the name and bonus of employees whose bonus is below 1000 or missing.

    DataFrame-API variant: left-joins ``employee`` with ``bonus`` on ``empId``,
    keeps rows whose bonus is NULL or strictly less than 1000, and projects
    the ``name`` and ``bonus`` columns.

    Args:
        employee: DataFrame providing at least the ``empId`` and ``name`` columns.
        bonus: DataFrame providing at least the ``empId`` and ``bonus`` columns.

    Returns:
        A DataFrame with the columns ``name`` and ``bonus``.
    """
    # A left join keeps every employee, including those with no bonus row.
    employees_with_bonus = employee.join(bonus, on='empId', how='left')

    # Employees without a bonus row end up with a NULL bonus after the join,
    # so they have to be admitted explicitly alongside the "< 1000" test.
    bonus_column = bonus['bonus']
    low_or_missing_bonus = (bonus_column < 1000) | bonus_column.isNull()

    return employees_with_bonus.where(low_or_missing_bonus).select('name', 'bonus')

# Run the DataFrame-API solution on the loaded tables and display the result.
names_With_Less_Or_No_Bonus = employee_bonus(
    employee=spark_Employee,
    bonus=spark_Bonus,
)
names_With_Less_Or_No_Bonus.show()

+----+-----+
|name|bonus|
+----+-----+
|Brad| null|
|John| null|
| Dan|  500|
+----+-----+



### Pandas

In [14]:
def employee_bonus(employee: pd.DataFrame, bonus: pd.DataFrame) -> pd.DataFrame:
    """Return the name and bonus of employees whose bonus is below 1000 or missing.

    pandas variant: left-merges ``employee`` with ``bonus`` on ``empId``, keeps
    rows whose bonus is NaN or strictly less than 1000, and returns only the
    ``name`` and ``bonus`` columns. The row index of the merged frame is kept.

    Args:
        employee: Frame providing at least the ``empId`` and ``name`` columns.
        bonus: Frame providing at least the ``empId`` and ``bonus`` columns.

    Returns:
        A DataFrame with the columns ``name`` and ``bonus``.
    """
    # A left merge keeps every employee; those without a bonus row get NaN.
    merged = pd.merge(employee, bonus, how='left', on=['empId'])

    # NaN never satisfies "< 1000", so missing bonuses are admitted explicitly.
    bonus_values = merged['bonus']
    low_or_missing = (bonus_values < 1000) | bonus_values.isna()

    # Row filter and column projection in one label-based selection.
    return merged.loc[low_or_missing, ['name', 'bonus']]

# Run the pandas solution on the loaded tables and preview the result.
names_With_Less_Or_No_Bonus = employee_bonus(
    employee=pandas_Employee,
    bonus=pandas_Bonus,
)
names_With_Less_Or_No_Bonus.head()

Unnamed: 0,name,bonus
0,Brad,
1,John,
2,Dan,500.0
