# Задание 3


**Создаем сессию с заданным конфигом**


In [29]:
from pyspark.sql import DataFrame, SparkSession, Row


spark = SparkSession.builder.config("spark.driver.memory", "2g").getOrCreate()

---

**Читаем паркеты с данными**


In [30]:
employees = spark.read.parquet("data/employees.parquet")
job_history = spark.read.parquet("data/job_history.parquet")
departments = spark.read.parquet("data/department.parquet")
jobs = spark.read.parquet("data/jobs.parquet")
locations = spark.read.parquet("data/locations.parquet")
countries = spark.read.parquet("data/countries.parquet")
regions = spark.read.parquet("data/regions.parquet")

In [31]:
employees.show(5)

+-----------+----------+---------+--------+------------+----------+-------+-------+--------------+----------+-------------+
|employee_id|first_name|last_name|   email|phone_number| hire_date| job_id| salary|commission_pct|manager_id|department_id|
+-----------+----------+---------+--------+------------+----------+-------+-------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|17.06.2003|AD_PRES|24000.0|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|21.09.2005|  AD_VP|17000.0|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|13.01.2001|  AD_VP|17000.0|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|03.01.2006|IT_PROG| 9000.0|          null|       102|           60|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568|21.05.2007|IT_PROG| 6000.0|          null|       103|           60|
+-------

---

**Импортируем Window, создаем строковую переменную с датой** <br>
(с типом Date адекватно сделать не получилось, поэтому далее костыли через unix timestamp)<br>

**В итоге из employees получаем employees_extended - DF, где собраны только нужные нам столбцы** <br>

**Далее уже на нем используем оконную функцию. Используем сортировку по убыванию на столбце total. Нумеруем строки, чтобы потом фильтром взять только первую из каждого города (ту, где total больше всего среди всех строк, относящихся к этому городу)**<br>

**Выводим результат**


In [32]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

date = "2015-01-01"

employees_extended = (
    employees.join(departments, ["department_id"])
    .join(locations, ["location_id"])
    .withColumn("set_date_string", lit(date))
    .withColumn(
        "set_date",
        to_date(
            from_unixtime(
                unix_timestamp(col("set_date_string"), "yyyy-mm-dd"), "yyyy-mm-dd"
            )
        ),
    )
    .withColumn(
        "hire_date_casted",
        to_date(
            from_unixtime(unix_timestamp(col("hire_date"), "dd.mm.yyyy"), "yyyy-mm-dd")
        ),
    )
    .withColumn(
        "months_diff", floor(months_between(col("set_date"), col("hire_date_casted")))
    )
    .withColumn("total", col("salary") * col("months_diff"))
    .select(
        col("city"),
        col("employee_id"),
        col("hire_date_casted"),
        col("months_diff"),
        col("salary"),
        col("total"),
    )
)

window = Window.partitionBy("city").orderBy(col("total").desc())

(
    employees_extended.withColumn("row", row_number().over(window))
    .filter(col("row") == 1)
    .select(
        col("city"),
        col("employee_id"),
        col("hire_date_casted"),
        col("months_diff"),
        col("salary"),
        col("total"),
    )
).show()

+-------------------+-----------+----------------+-----------+-------+---------+
|               city|employee_id|hire_date_casted|months_diff| salary|    total|
+-------------------+-----------+----------------+-----------+-------+---------+
|             London|        203|      2002-06-07|        150| 6500.0| 975000.0|
|             Munich|        204|      2002-06-07|        150|10000.0|1500000.0|
|             Oxford|        145|      2004-10-01|        123|14000.0|1722000.0|
|            Seattle|        100|      2003-06-17|        138|24000.0|3312000.0|
|South San Francisco|        122|      2003-05-01|        140| 7900.0|1106000.0|
|          Southlake|        103|      2006-01-03|        107| 9000.0| 963000.0|
|            Toronto|        201|      2004-02-17|        130|13000.0|1690000.0|
+-------------------+-----------+----------------+-----------+-------+---------+

