In [4]:
# 1.Импорт библиотек
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import max, avg, min
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import when
import time

In [5]:
# 2. Инициализация Spark-сессии - create local SparkSession

spark = (SparkSession.builder
         .appName("Data Salaries")
         .getOrCreate())

In [6]:
# 3. read csv with inferschema
file_path = "/home/jovyan/work/ds_salaries.csv"

t0 = time.time()
df_infer = (spark.read
         .csv(
             file_path,
             header=True,         
             inferSchema=True,    
             sep=","              
         ))
t1 = time.time()
print(f"Read with inferSchema duration: {t1 - t0:.3f} sec")

Read with inferSchema duration: 3.559 sec


In [7]:
# 4. показать повторное чтение - read csv one more time with the same code and you will see that it almostly don't take time,
# because info already in SparkSession and it will not read nothing from this file

t0 = time.time()
df_infer2 = (spark.read
             .option("header", True)
             .option("sep", ",")
             .option("inferSchema", True)
             .csv(file_path))
t1 = time.time()
print(f"Second read with inferSchema duration: {t1 - t0:.3f} sec")


Second read with inferSchema duration: 0.263 sec


In [8]:
# 5. write schema of csv on screen
df_infer.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



In [9]:
# 6. create schema of this csv - схему задали 
schema = StructType([
    StructField("work_year", IntegerType(), True),
    StructField("experience_level", StringType(), True),
    StructField("employment_type", StringType(), True),
    StructField("job_title", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("salary_currency", StringType(), True),
    StructField("salary_in_usd", IntegerType(), True),
    StructField("employee_residence", StringType(), True),
    StructField("remote_ratio", IntegerType(), True),
    StructField("company_location", StringType(), True),
    StructField("company_size", StringType(), True),
])

In [10]:
# 7.  restart kernel without cleaning output and after restarting you need to initialize SparkSession, after initialize start execute only cells from cell with schema= =StructType....
# To restart kernel click Kernel, Restart.
df_schema = (spark.read
             .option("header", True)
             .option("sep", ",")
             .schema(schema)
             .csv(file_path))

print("Schema provided manually (df_schema):")
df_schema.printSchema()

# Сравниваем результаты - read ds_salaries with predefined schema and compare results from this cell and cell with inferSchema
print("Counts: infer vs schema:", df_infer.count(), df_schema.count())
print("First 5 rows from infer:")
df_infer.show(5, truncate=False)
print("First 5 rows from schema:")
df_schema.show(5, truncate=False)

Schema provided manually (df_schema):
root
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

Counts: infer vs schema: 607 607
First 5 rows from infer:
+---+---------+----------------+---------------+--------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|job_title                 |salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+---

In [11]:
# 8.  lazy/eager 
print("Schema (again) from df_infer:")
df_infer.printSchema()
print("Schema (again) from df_schema:")
df_schema.printSchema()


Schema (again) from df_infer:
root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

Schema (again) from df_schema:
root
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullabl

In [12]:
# 9. continue with one dataframe, print data in dataframe using df.show
df = df_schema
df.show(3, truncate=False)

+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+
|0        |2020            |MI             |FT       |NULL  |70000          |NULL         |79833             |NULL        |0               |DE          |
|1        |2020            |SE             |FT       |NULL  |260000         |NULL         |260000            |NULL        |0               |JP          |
|2        |2020            |SE             |FT       |NULL  |85000          |NULL         |109024            |NULL        |50              |GB          |
+---------+----------------+---------------+---------+------+---------------

In [13]:
# 10. print data using display(df.toPandas())
# В Python просто df.show(); в Jupyter можно display(df.toPandas()) - красиво отображение 
display(df.toPandas().head(10))
print(df.limit(10).toPandas())

Unnamed: 0,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,,70000,,79833,,0,DE
1,1,2020,SE,FT,,260000,,260000,,0,JP
2,2,2020,SE,FT,,85000,,109024,,50,GB
3,3,2020,MI,FT,,20000,,20000,,0,HN
4,4,2020,SE,FT,,150000,,150000,,50,US
5,5,2020,EN,FT,,72000,,72000,,100,US
6,6,2020,SE,FT,,190000,,190000,,100,US
7,7,2020,MI,FT,,11000000,,35735,,50,HU
8,8,2020,MI,FT,,135000,,135000,,100,US
9,9,2020,SE,FT,,125000,,125000,,50,NZ


   work_year experience_level employment_type job_title  salary  \
0          0             2020              MI        FT     NaN   
1          1             2020              SE        FT     NaN   
2          2             2020              SE        FT     NaN   
3          3             2020              MI        FT     NaN   
4          4             2020              SE        FT     NaN   
5          5             2020              EN        FT     NaN   
6          6             2020              SE        FT     NaN   
7          7             2020              MI        FT     NaN   
8          8             2020              MI        FT     NaN   
9          9             2020              SE        FT     NaN   

  salary_currency  salary_in_usd employee_residence  remote_ratio  \
0           70000            NaN              79833           NaN   
1          260000            NaN             260000           NaN   
2           85000            NaN             109024    

In [14]:
# 11. create df_job_title with unique job_titles - создаём фрейм с уник знач
df_job_title = df.select("job_title").distinct()

In [15]:
# 12. print all rows from df_job_title without truncating
job_count = df_job_title.count()
print(f"Unique job titles count: {job_count}")
df_job_title.orderBy("job_title").show(job_count, truncate=False)

Unique job titles count: 4
+---------+
|job_title|
+---------+
|CT       |
|FL       |
|FT       |
|PT       |
+---------+



In [16]:
# 13. create df_analytic with max, avg, min USD salaries per job_title
df_analytic = df.groupBy("job_title").agg(
    avg(col("salary_in_usd")).alias("avg_salary"),
    min(col("salary_in_usd")).alias("min_salary"),
    max(col("salary_in_usd")).alias("max_salary")
)

In [17]:
# 14. print all rows from df_analytic without truncating jobs
count_analytic = df_analytic.count()
print(f"Analytic rows: {count_analytic}")
df_analytic.orderBy(col("avg_salary").desc()).show(count_analytic, truncate=False)

Analytic rows: 4
+---------+----------+----------+----------+
|job_title|avg_salary|min_salary|max_salary|
+---------+----------+----------+----------+
|FT       |NULL      |NULL      |NULL      |
|PT       |NULL      |NULL      |NULL      |
|CT       |NULL      |NULL      |NULL      |
|FL       |NULL      |NULL      |NULL      |
+---------+----------+----------+----------+



In [18]:
# 15. add row_id ordered by avg_salary desc
w = Window.orderBy(col("avg_salary").desc())
df_analytic = df_analytic.withColumn("row_id", row_number().over(w))

In [19]:
# 16. print all data from df_analytic
df_analytic.orderBy(col("row_id")).show(count_analytic, truncate=False)

+---------+----------+----------+----------+------+
|job_title|avg_salary|min_salary|max_salary|row_id|
+---------+----------+----------+----------+------+
|FT       |NULL      |NULL      |NULL      |1     |
|PT       |NULL      |NULL      |NULL      |2     |
|CT       |NULL      |NULL      |NULL      |3     |
|FL       |NULL      |NULL      |NULL      |4     |
+---------+----------+----------+----------+------+



In [20]:
# 17. put row_id on first place
cols = df_analytic.columns
# желаемый порядок row_id, остальное так
new_order = ["row_id"] + [c for c in cols if c != "row_id"]
df_analytic = df_analytic.select(*new_order)

In [21]:
# 18. print df_analytic now
df_analytic.show(count_analytic, truncate=False)

+------+---------+----------+----------+----------+
|row_id|job_title|avg_salary|min_salary|max_salary|
+------+---------+----------+----------+----------+
|1     |FT       |NULL      |NULL      |NULL      |
|2     |PT       |NULL      |NULL      |NULL      |
|3     |CT       |NULL      |NULL      |NULL      |
|4     |FL       |NULL      |NULL      |NULL      |
+------+---------+----------+----------+----------+



In [22]:
# 19. create df_exp_lvl with the biggest usd_salary (biggest_salary) for each experience_level
#    (save all fields like in entire dataframe) — т.е. для каждой exp_level взять строку с max salary_in_usd
w_exp = Window.partitionBy("experience_level").orderBy(col("salary_in_usd").desc())
df_with_row = df.withColumn("rn", row_number().over(w_exp))
df_exp_lvl = df_with_row.filter(col("rn") == 1).drop("rn")
print("df_exp_lvl (one row per experience_level with biggest salary):")
df_exp_lvl.show(truncate=False)

df_exp_lvl (one row per experience_level with biggest salary):
+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+
|0        |2020            |MI             |FT       |NULL  |70000          |NULL         |79833             |NULL        |0               |DE          |
|72       |2021            |EN             |FT       |NULL  |60000          |NULL         |82528             |NULL        |50              |GB          |
|289      |2022            |SE             |FT       |NULL  |135000         |NULL         |135000            |NULL        |100             |US          |
+---------+--

In [23]:
# 20. print df_exp_lvl

In [24]:
# 21. create df_best: rows where salary == biggest salary for people in his exp_lvl
#     choose only columns: id, experience_level, biggest_salary, employee_residence
if "id" in df.columns:
    id_col = "id"
else:
    # если нет id, создадим уникальный id
    from pyspark.sql.functions import monotonically_increasing_id
    df = df.withColumn("id", monotonically_increasing_id())
    id_col = "id"

max_by_exp = df.groupBy("experience_level").agg(max("salary_in_usd").alias("biggest_salary"))

# Соед с исходной таблицей, чтобы взять строки, где salary_in_usd == biggest_salary и совпадает experience_level
df_best = (df.join(max_by_exp, on="experience_level", how="inner")
           .filter(col("salary_in_usd") == col("biggest_salary"))
           .select(id_col, "experience_level", "biggest_salary", "employee_residence"))

print("df_best (potentially multiple rows per exp level if tie):")
df_best.show(truncate=False)


df_best (potentially multiple rows per exp level if tie):
+---+----------------+--------------+------------------+
|id |experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
+---+----------------+--------------+------------------+



In [25]:
# 23. drop duplicates by experience_level 
df_best = df_best.dropDuplicates(["experience_level"])

In [26]:
# 24. print df_best
print("df_best after dropping duplicates by experience_level:")
df_best.show(truncate=False)

df_best after dropping duplicates by experience_level:
+---+----------------+--------------+------------------+
|id |experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
+---+----------------+--------------+------------------+



In [27]:
# 25. create df_new_best from df_best without id, map MI->middle, SE->senior, else Null
df_new_best = df_best.drop(id_col).withColumn(
    "experience_level",
    when(col("experience_level") == "MI", "middle")
    .when(col("experience_level") == "SE", "senior")
    .otherwise(None)
)

In [28]:
# 26. print df_new_best
print("df_new_best:")
df_new_best.show(truncate=False)

df_new_best:
+----------------+--------------+------------------+
|experience_level|biggest_salary|employee_residence|
+----------------+--------------+------------------+
+----------------+--------------+------------------+



In [29]:
# 27. write df_new_best like 1.csv and load then it to df_final
out_path = "/tmp/df_new_best_1.csv"  # путь можно поменять
(df_new_best.coalesce(1)
 .write
 .option("header", True)
 .mode("overwrite")
 .csv(out_path))

df_final = (spark.read
            .option("header", True)
            .csv(out_path))

In [30]:
# 28. print df_final
print("df_final (read from CSV):")
df_final.show(truncate=False)

df_final (read from CSV):
+----------------+--------------+------------------+
|experience_level|biggest_salary|employee_residence|
+----------------+--------------+------------------+
+----------------+--------------+------------------+



In [31]:
# 29. filter df_final to delete experience_level where it Null, then join by biggest_salary and employee_residence with entire df
df_final_filtered = df_final.filter(col("experience_level").isNotNull())

df_final_filtered = df_final_filtered.withColumn("biggest_salary", col("biggest_salary").cast("double"))
df_joined = (df_final_filtered.join(df,
                                   (df_final_filtered.biggest_salary == df.salary_in_usd) &
                                   (df_final_filtered.employee_residence == df.employee_residence),
                                   how="inner"))


In [32]:
# 30. print df_final
print("df_joined (result of join):")
df_joined.show(truncate=False)

df_joined (result of join):
+----------------+--------------+------------------+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+---+
|experience_level|biggest_salary|employee_residence|work_year|experience_level|employment_type|job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|id |
+----------------+--------------+------------------+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+---+
+----------------+--------------+------------------+---------+----------------+---------------+---------+------+---------------+-------------+------------------+------------+----------------+------------+---+



In [33]:
# 31. save biggest salary_in_usd from df_final to variable and print it
from pyspark.sql.functions import max as spark_max_fn
max_salary_row = df_joined.agg(spark_max_fn("salary_in_usd").alias("max_salary")).collect()[0]
max_salary_value = max_salary_row["max_salary"]
print("Max salary_in_usd from df_final (joined):", max_salary_value)

Max salary_in_usd from df_final (joined): None
