In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructField, 
    StructType,
    StringType,
    IntegerType
)

In [2]:
# Build (or reuse) a local SparkSession that runs with two worker threads.
spark = (
    SparkSession.builder
    .appName("DataLemur SQL")
    .master("local[2]")
    .getOrCreate()
)
# Leave the session as the cell's last expression so the notebook renders it.
spark

# Compensation Outliers
#### [DataLemur URL](https://datalemur.com/questions/compensation-outliers)
#### [SQL Solution URL](https://github.com/temmyzeus/SQL-Projects/tree/master/DataLemur#compensation-outliers--httpsdatalemurcomquestionscompensation-outliers)

In [3]:
# Schema of the compensation table; every column is declared non-nullable.
schema = (
    StructType()
    .add("employee_id", IntegerType(), nullable=False)
    .add("salary", IntegerType(), nullable=False)
    .add("title", StringType(), nullable=False)
)

# Sample rows as (employee_id, salary, title): four employees per job title.
compensation_df = spark.createDataFrame(
    [
        (101, 80000, "Data Analyst"),
        (102, 90000, "Data Analyst"),
        (103, 100000, "Data Analyst"),
        (104, 30000, "Data Analyst"),
        (105, 120000, "Data Scientist"),
        (106, 100000, "Data Scientist"),
        (107, 80000, "Data Scientist"),
        (108, 310000, "Data Scientist"),
        (109, 60000, "Python Developer"),
        (110, 70000, "Python Developer"),
        (111, 200000, "Python Developer"),
        (112, 25000, "Python Developer"),
    ],
    schema=schema,
)

In [4]:
# Preview the input table (up to 20 rows, long cell values truncated).
compensation_df.show(n=20, truncate=True)

+-----------+------+----------------+
|employee_id|salary|           title|
+-----------+------+----------------+
|        101| 80000|    Data Analyst|
|        102| 90000|    Data Analyst|
|        103|100000|    Data Analyst|
|        104| 30000|    Data Analyst|
|        105|120000|  Data Scientist|
|        106|100000|  Data Scientist|
|        107| 80000|  Data Scientist|
|        108|310000|  Data Scientist|
|        109| 60000|Python Developer|
|        110| 70000|Python Developer|
|        111|200000|Python Developer|
|        112| 25000|Python Developer|
+-----------+------+----------------+



## Method 1: `agg` with an aliased average and `F.col` references (preferred)

In [5]:
# Average salary for each job title (one row per title).
avg_salaries_per_title = (
    compensation_df
    .groupBy("title")
    .agg(F.avg("salary").alias("avg_salary_per_title"))
)

# Attach the title-level average to every employee row.
compensation_w_avg_salaries_per_title = compensation_df.join(
    avg_salaries_per_title,
    on="title",
    how="left",
)

# Label each employee by comparing their salary with the average for their
# title: more than twice the average -> "Overpaid", less than half of it ->
# "Underpaid", anything else -> "Normal".
title_avg = F.col("avg_salary_per_title")
status_expr = (
    F.when(F.col("salary") > (2 * title_avg), "Overpaid")
    .when(F.col("salary") < (0.5 * title_avg), "Underpaid")
    .otherwise("Normal")
)
compensation_w_status = compensation_w_avg_salaries_per_title.select(
    "employee_id",
    "salary",
    status_expr.alias("compensation_status"),
)

# Keep only the rows flagged as outliers.
compensation_outlier = compensation_w_status.filter(
    F.col("compensation_status") != "Normal"
)

compensation_outlier.show()

+-----------+------+-------------------+
|employee_id|salary|compensation_status|
+-----------+------+-------------------+
|        104| 30000|          Underpaid|
|        111|200000|           Overpaid|
|        112| 25000|          Underpaid|
|        108|310000|           Overpaid|
+-----------+------+-------------------+



## Method 2: `groupBy().avg()` with a column rename and attribute-style column access

In [6]:
# Same analysis as Method 1, written with `groupBy().avg()` and attribute-style
# column access. Every result is bound to a `_2`-suffixed name so that running
# this cell does not overwrite the DataFrames produced by Method 1.

# Average salary per title.
avg_salaries_per_title_2 = (
    compensation_df
    .groupBy("title")
    .avg("salary")
    # `avg("salary")` yields a column named "avg(salary)"; give it a usable name.
    .withColumnRenamed("avg(salary)", "avg_salary_per_title")
)

# Attach the title-level average to every employee row.
compensation_w_avg_salaries_per_title_2 = compensation_df.join(
    avg_salaries_per_title_2, on=["title"], how="left"
)

# Classify each employee: more than twice the title average -> "Overpaid",
# less than half of it -> "Underpaid", anything else -> "Normal".
compensation_w_status_2 = compensation_w_avg_salaries_per_title_2.select(
    compensation_w_avg_salaries_per_title_2.employee_id,
    compensation_w_avg_salaries_per_title_2.salary,
    F.when(
        compensation_w_avg_salaries_per_title_2.salary
        > (2 * compensation_w_avg_salaries_per_title_2.avg_salary_per_title),
        "Overpaid",
    ).when(
        compensation_w_avg_salaries_per_title_2.salary
        < (0.5 * compensation_w_avg_salaries_per_title_2.avg_salary_per_title),
        "Underpaid",
    ).otherwise("Normal").alias("status"),
)

# Keep only the rows flagged as outliers.
compensation_outlier_2 = compensation_w_status_2.where(
    compensation_w_status_2.status != "Normal"
)
compensation_outlier_2.show()

+-----------+------+---------+
|employee_id|salary|   status|
+-----------+------+---------+
|        104| 30000|Underpaid|
|        111|200000| Overpaid|
|        112| 25000|Underpaid|
|        108|310000| Overpaid|
+-----------+------+---------+



In [7]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()