import libraries(I provide all libs that I need when make this tasks, if you need some external import them here)

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import max, avg, min
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc
from pyspark.sql.functions import when

create local SparkSession

In [2]:
spark = SparkSession.builder.master("local[1]") \
                    .appName('Session.com') \
                    .getOrCreate()

read csv with inferschema

In [3]:
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("ds_salaries.csv")

read csv one more time with the same code and you will see that it almostly don't take time, because info already in SparkSession and it will not read nothing
from this file

In [4]:
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("ds_salaries.csv")

write schema of scv on screen

In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



create schema of this scv

In [6]:
data_science_job_salaries_schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("work_year", IntegerType(), True),
    StructField("experience_level", StringType(), True),
    StructField("employment_type", StringType(), True),
    StructField("job_title", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("salary_currency", StringType(), True),
    StructField("salary_in_usd", IntegerType(), True),
    StructField("employee_residence", StringType(), True),
    StructField("remote_ratio", StringType(), True),
    StructField("company_location", StringType(), True),
    StructField("company_size", StringType(), True)
])

restart kernel without cleaning output and after restarting you need to initialize SparkSession, after initialize start execute only cells from cell with schema=
=StructType.... 
To restart kernel click Kernel, Restart.

read ds_salaries with predefined schema and compare results from this cell and cell with inferSchema

In [7]:
df_with_schema = spark.read.csv("ds_salaries.csv", header=True, schema=data_science_job_salaries_schema)

this happens because read operation is lazy(transformation), but if you use inferschema it start to be action that will create Spark Job, because Spark need to loop throw all file to check datatypes for all columns and this can harm to your code(if we compare to parquet, it will also go to check data types, but parquet provide meta information, so Spark will not go throw all file, he will just read meta information, but csv don't provide such meta information). Also header make Spark to create one more Spark Job to check first line
to define name of columns and remember to skeep it when reading. Actual reading start when you will use first action. More about Spark Jobs you will see in next topic

write schema of scv on screen one more time and compare with previous

In [8]:
df_with_schema.printSchema()

root
 |-- id: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: string (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



now continue to work with one of the dataframes that you create

print data in dataframe using df.show

In [9]:
df = df_with_schema
df.show(1, vertical=True)

-RECORD 0----------------------------
 id                 | 0              
 work_year          | 2020           
 experience_level   | MI             
 employment_type    | FT             
 job_title          | Data Scientist 
 salary             | 70000          
 salary_currency    | EUR            
 salary_in_usd      | 79833          
 employee_residence | DE             
 remote_ratio       | 0              
 company_location   | DE             
 company_size       | L              
only showing top 1 row



print data in dataframe using display(df.toPandas())

In [10]:
display(df.toPandas())

Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L
...,...,...,...,...,...,...,...,...,...,...,...,...
602,602,2022,SE,FT,Data Engineer,154000,USD,154000,US,100,US,M
603,603,2022,SE,FT,Data Engineer,126000,USD,126000,US,100,US,M
604,604,2022,SE,FT,Data Analyst,129000,USD,129000,US,0,US,M
605,605,2022,SE,FT,Data Analyst,150000,USD,150000,US,100,US,M


create df_job_title that consists from all job_titles without duplicates

In [24]:
df_job_title = df.select('job_title').distinct()

print all rows from df_job_titles without truncating jobs

In [26]:
df_job_title.show(df_job_title.count(), vertical=True, truncate=False)

-RECORD 0---------------------------------------------
 job_title | 3D Computer Vision Researcher            
-RECORD 1---------------------------------------------
 job_title | Lead Data Engineer                       
-RECORD 2---------------------------------------------
 job_title | Head of Machine Learning                 
-RECORD 3---------------------------------------------
 job_title | Data Specialist                          
-RECORD 4---------------------------------------------
 job_title | Data Analytics Lead                      
-RECORD 5---------------------------------------------
 job_title | Machine Learning Scientist               
-RECORD 6---------------------------------------------
 job_title | Lead Data Analyst                        
-RECORD 7---------------------------------------------
 job_title | Data Engineering Manager                 
-RECORD 8---------------------------------------------
 job_title | Staff Data Scientist                     
-RECORD 9-

create  df_analytic that will consists from max, avg, min USD salaries for all job_titles using groupBy. name of fields is avg_salary, min_salary, max_salary

In [27]:
df_analytic = (df.groupBy("job_title") .agg(
        avg("salary_in_usd").alias("avg_salary"), 
        min("salary_in_usd").alias("min_salary"), 
        max("salary_in_usd").alias("max_salary"))
) 

print all rows from df_analytic without trancating jobs

In [28]:
df_analytic.show(df_analytic.count(), vertical=True, truncate=False)

-RECORD 0----------------------------------------------
 job_title  | 3D Computer Vision Researcher            
 avg_salary | 5409.0                                   
 min_salary | 5409                                     
 max_salary | 5409                                     
-RECORD 1----------------------------------------------
 job_title  | Lead Data Engineer                       
 avg_salary | 139724.5                                 
 min_salary | 56000                                    
 max_salary | 276000                                   
-RECORD 2----------------------------------------------
 job_title  | Head of Machine Learning                 
 avg_salary | 79039.0                                  
 min_salary | 79039                                    
 max_salary | 79039                                    
-RECORD 3----------------------------------------------
 job_title  | Data Specialist                          
 avg_salary | 165000.0                          

now you need to add in df_analytic column row_id, that will show order of all job_titles depending on avg salary. they should be descending

In [30]:
df_analytic = df_analytic.withColumn("row_id", row_number().over(Window.orderBy(desc("avg_salary"))))

print all data from df_analytic

In [31]:
df_analytic.show(df_analytic.count(), vertical=True, truncate=False)

-RECORD 0----------------------------------------------
 job_title  | Data Analytics Lead                      
 avg_salary | 405000.0                                 
 min_salary | 405000                                   
 max_salary | 405000                                   
 row_id     | 1                                        
-RECORD 1----------------------------------------------
 job_title  | Principal Data Engineer                  
 avg_salary | 328333.3333333333                        
 min_salary | 185000                                   
 max_salary | 600000                                   
 row_id     | 2                                        
-RECORD 2----------------------------------------------
 job_title  | Financial Data Analyst                   
 avg_salary | 275000.0                                 
 min_salary | 100000                                   
 max_salary | 450000                                   
 row_id     | 3                                 

it isn't beautifull, so we need to put now row_id on first place in df_analytic

In [32]:
reordered_columns = ["row_id", "job_title", "avg_salary", "min_salary", "max_salary"]
df_analytic = df_analytic.select(reordered_columns)

print df_analytic now

In [33]:
df_analytic.show(df_analytic.count(), vertical=True, truncate=False)

-RECORD 0----------------------------------------------
 row_id     | 1                                        
 job_title  | Data Analytics Lead                      
 avg_salary | 405000.0                                 
 min_salary | 405000                                   
 max_salary | 405000                                   
-RECORD 1----------------------------------------------
 row_id     | 2                                        
 job_title  | Principal Data Engineer                  
 avg_salary | 328333.3333333333                        
 min_salary | 185000                                   
 max_salary | 600000                                   
-RECORD 2----------------------------------------------
 row_id     | 3                                        
 job_title  | Financial Data Analyst                   
 avg_salary | 275000.0                                 
 min_salary | 100000                                   
 max_salary | 450000                            

here you need to create df_exp_lvl with the biggest usd_salary(biggest_salary) for each experience_level(you need to save all fields like in entire dataframe)

In [23]:
df_exp_lvl = (df.withColumn("salary_rank",row_number().over(Window.partitionBy("experience_level").orderBy(col("salary_in_usd").desc())),)
    .filter(col("salary_rank") == 1)
    .drop("salary_rank")
    .withColumnRenamed("salary", "biggest_salary")
)

print here df_exp_lvl

In [24]:
df_exp_lvl.show(df_exp_lvl.count(), vertical=True, truncate=False)

-RECORD 0---------------------------------------
 id                 | 37                        
 work_year          | 2020                      
 experience_level   | EN                        
 employment_type    | FT                        
 job_title          | Machine Learning Engineer 
 biggest_salary     | 250000                    
 salary_currency    | USD                       
 salary_in_usd      | 250000                    
 employee_residence | US                        
 remote_ratio       | 50                        
 company_location   | US                        
 company_size       | L                         
-RECORD 1---------------------------------------
 id                 | 252                       
 work_year          | 2021                      
 experience_level   | EX                        
 employment_type    | FT                        
 job_title          | Principal Data Engineer   
 biggest_salary     | 600000                    
 salary_currency    

create df_best that consists from rows where salary of guy same as biggest salary for other people in his exp_lvl and choose only columns: id, experience_level, biggest_salary, employee_residence

In [11]:
df_max_salary_grouped_by_exp_lvl = df.groupBy("experience_level").agg(max("salary").alias("max_salary"))
df_best = (df
            .join(
                df_max_salary_grouped_by_exp_lvl, 
                (df["salary"] == df_max_salary_grouped_by_exp_lvl["max_salary"])
                & (df["experience_level"] == df_max_salary_grouped_by_exp_lvl["experience_level"]),
                how="left_semi")
            .select("id", "experience_level", col("salary").alias("biggest_salary"), "employee_residence")
)

print df_best

In [27]:
df_best.show(df_best.count(), vertical=True, truncate=False)

-RECORD 0----------------------
 id                 | 16       
 experience_level   | EN       
 biggest_salary     | 4450000  
 employee_residence | JP       
-RECORD 1----------------------
 id                 | 177      
 experience_level   | MI       
 biggest_salary     | 30400000 
 employee_residence | CL       
-RECORD 2----------------------
 id                 | 285      
 experience_level   | SE       
 biggest_salary     | 7000000  
 employee_residence | IN       
-RECORD 3----------------------
 id                 | 384      
 experience_level   | EX       
 biggest_salary     | 6000000  
 employee_residence | IN       



drop duplicates if exist by experience_level

In [28]:
df_best = df_best.drop_duplicates(["experience_level"])

print df_best

In [29]:
df_best.show(df_best.count(), vertical=True, truncate=False)

-RECORD 0----------------------
 id                 | 16       
 experience_level   | EN       
 biggest_salary     | 4450000  
 employee_residence | JP       
-RECORD 1----------------------
 id                 | 384      
 experience_level   | EX       
 biggest_salary     | 6000000  
 employee_residence | IN       
-RECORD 2----------------------
 id                 | 177      
 experience_level   | MI       
 biggest_salary     | 30400000 
 employee_residence | CL       
-RECORD 3----------------------
 id                 | 285      
 experience_level   | SE       
 biggest_salary     | 7000000  
 employee_residence | IN       



create df_new_best from df_best without id, and make the next: when exp_level = MI we want middle, when SE we want senior, else Null

In [12]:
df_new_best = df_best.drop("id").withColumn("experience_level",
                                  when(df_best["experience_level"] == "MI","middle")
                                 .when(df_best["experience_level"] == "SE","senior")
                                 )

print df_new_best

In [37]:
df_new_best.show(df_new_best.count(), vertical=True, truncate=False)

-RECORD 0----------------------
 experience_level   | null     
 biggest_salary     | 4450000  
 employee_residence | JP       
-RECORD 1----------------------
 experience_level   | null     
 biggest_salary     | 6000000  
 employee_residence | IN       
-RECORD 2----------------------
 experience_level   | middle   
 biggest_salary     | 30400000 
 employee_residence | CL       
-RECORD 3----------------------
 experience_level   | senior   
 biggest_salary     | 7000000  
 employee_residence | IN       



write df_new_best like 1.csv and load then it to df_final

In [13]:
df_new_best.toPandas().to_csv("1.csv", index=False)
df_final = spark.read.option("header", True).csv("1.csv")

print df_final

In [36]:
df_final.show(df_final.count(), vertical=True, truncate=False)

-RECORD 0----------------------
 experience_level   | null     
 biggest_salary     | 4450000  
 employee_residence | JP       
-RECORD 1----------------------
 experience_level   | null     
 biggest_salary     | 6000000  
 employee_residence | IN       
-RECORD 2----------------------
 experience_level   | middle   
 biggest_salary     | 30400000 
 employee_residence | CL       
-RECORD 3----------------------
 experience_level   | senior   
 biggest_salary     | 7000000  
 employee_residence | IN       



filter df_final to delete experience_level where it Null, then join this table by biggest_salary(salary_in_usd) and employee_residence with entire df

In [14]:
df_final = ( df_final.filter(df_final["experience_level"].isNotNull()).join(
        df,
        (df_final["employee_residence"] == df["employee_residence"])
        & (df_final["biggest_salary"] == df["salary"]),
        how="inner",
    )
)

print df_final

In [15]:
df_final.show(df_final.count(), vertical=True, truncate=False)

-RECORD 0----------------------------------
 experience_level   | middle               
 biggest_salary     | 30400000             
 employee_residence | CL                   
 id                 | 177                  
 work_year          | 2021                 
 experience_level   | MI                   
 employment_type    | FT                   
 job_title          | Data Scientist       
 salary             | 30400000             
 salary_currency    | CLP                  
 salary_in_usd      | 40038                
 employee_residence | CL                   
 remote_ratio       | 100                  
 company_location   | CL                   
 company_size       | L                    
-RECORD 1----------------------------------
 experience_level   | senior               
 biggest_salary     | 7000000              
 employee_residence | IN                   
 id                 | 285                  
 work_year          | 2021                 
 experience_level   | SE        

last task is to save in variable and then print this variable of the biggest salary_in_usd from df_final

In [19]:
value = df_final.select(max(df_final["salary_in_usd"])).collect()[0]["max(salary_in_usd)"]
print(value)

94665


It is the end of PySpark basics. In other lessons you will learn optimizations technics and how to make distributed system