Условие: дана таблица с колонками (id, name, salary, managerId), студентам необходимо написать код на spark, который создаст эту таблицу (данные указаны ниже) и в результате выдаст таблицу в которой будут имена сотрудников, которые зарабатывают больше своих менеджеров.

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

In [2]:
spark = SparkSession.builder.appName("employee_salary_analysis").getOrCreate()

In [10]:
schema = StructType([
    StructField("Id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("Salary", StringType(), nullable=True),
    StructField("managerId", StringType(), nullable=True)
])

users_data = [
    (1, "Joe", 70, 3),
    (2, "Henry", 80, 4),
    (3, "Sam", 60, "null"),
    (4, "Max", 90, "Null")
]

users_df = spark.createDataFrame(users_data, schema)
users_df.show()

+---+-----+------+---------+
| Id| name|Salary|managerId|
+---+-----+------+---------+
|  1|  Joe|    70|        3|
|  2|Henry|    80|        4|
|  3|  Sam|    60|     null|
|  4|  Max|    90|     Null|
+---+-----+------+---------+



In [11]:
users_df = users_df.na.replace(["null", "Null"], [None, None], "managerId")
users_df = users_df.toDF(*[c.lower() for c in users_df.columns])
users_df.show()

+---+-----+------+---------+
| id| name|salary|managerid|
+---+-----+------+---------+
|  1|  Joe|    70|        3|
|  2|Henry|    80|        4|
|  3|  Sam|    60|     NULL|
|  4|  Max|    90|     NULL|
+---+-----+------+---------+



In [12]:
numerical_columns = ["id", "salary", "managerid"]

for col_name in numerical_columns:
    users_df = users_df.withColumn(col_name, col(col_name).cast(IntegerType()))

users_df.show()


+---+-----+------+---------+
| id| name|salary|managerid|
+---+-----+------+---------+
|  1|  Joe|    70|        3|
|  2|Henry|    80|        4|
|  3|  Sam|    60|     NULL|
|  4|  Max|    90|     NULL|
+---+-----+------+---------+



In [14]:
users_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = false)
 |-- salary: integer (nullable = true)
 |-- managerid: integer (nullable = true)



In [18]:
joined_df = users_df.alias("emp").join(
    users_df.alias("mgr"),
    col("emp.managerid") == col("mgr.id"),
    "inner"
)
joined_df.show()

+---+-----+------+---------+---+----+------+---------+
| id| name|salary|managerid| id|name|salary|managerid|
+---+-----+------+---------+---+----+------+---------+
|  1|  Joe|    70|        3|  3| Sam|    60|     NULL|
|  2|Henry|    80|        4|  4| Max|    90|     NULL|
+---+-----+------+---------+---+----+------+---------+



In [19]:
result_df = joined_df.select(
    col("emp.name").alias("employee_name"),
    col("emp.Salary").alias("emp_salary"),
    col("mgr.Salary").alias("mgr_salary")
).where(col("emp_salary") > col("mgr_salary"))

result_df.select("employee_name").show()

spark.stop()

+-------------+
|employee_name|
+-------------+
|          Joe|
+-------------+

