In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

custome_schema = StructType([StructField("id", IntegerType()),
                             StructField("name", StringType()),
                             StructField("experience", IntegerType()),
                             StructField("gender", StringType()),
                             StructField("dob", TimestampType()),
                             StructField("company", StringType()),
                             StructField("designation", StringType()),
                             StructField("doj", StringType()),
                             StructField("skills", StringType()),
                             StructField("actual_expected_salary", StringType())])

employee_df = spark.read.csv("/FileStore/tables/country_count.txt/employee.csv",
                             sep="|",
                             header=True,
                             quote="'",
                             schema=custome_schema)
employee_df.display()

id,name,experience,gender,dob,company,designation,doj,skills,actual_expected_salary
101.0,Agastya,1.0,M,1987-01-22T00:00:00Z,Infosys,Developer,21-01-2015,"Hadoop,PySpark,Kafka",10500001100000.0
102.0,Acyuta,2.0,F,1987-03-29T00:00:00Z,TCS,Team Lead,21-01-2016,"C,C++,Java",10500001100000.0
103.0,Anuvrata,1.0,M,1987-01-22T00:00:00Z,Infosys,Developer,21-01-2017,"Java,Python,Hadoop",10500001100000.0
,Bhavika,6.0,F,1987-01-22T00:00:00Z,Cisco,Team lead,21-01-2015,"Hadoop,PySpark,Kafka",
105.0,Chitragandha,,M,1987-01-22T00:00:00Z,CTS,Developer,,"C,C++,Java",10500001100000.0
106.0,Hritika,9.0,F,1987-01-22T00:00:00Z,Cisco,Developer,21-01-2016,"C,C++,Java",10500001100000.0
107.0,Jigyasa,12.0,M,1987-01-22T00:00:00Z,Infosys,Team Lead,21-01-2005,"Hadoop,PySpark,Kafka",10500001100000.0
108.0,Kaveri,4.0,F,1987-01-22T00:00:00Z,cisco,Research and Development Engineer,21-01-2015,"C,C++,Java",10500001100000.0
109.0,,8.0,M,1987-01-22T00:00:00Z,,Developer,21-01-2008,"C,C++,Java",10500001100000.0
110.0,Vin,,T,1987-01-22T00:00:00Z,Sagar Info Tech,""" Team Lead """,21-01-2020,"C,C++,C++,Assembly",1000000600000.0


## Q-1 Filter out the records which are having accepted salary more than expected sal

In [0]:
# Approach 1
salary_array_df= employee_df.withColumn("actual_expected_salary", split(col("actual_expected_salary"), ","))

accepted_salary_greater_df = salary_array_df.filter(col("actual_expected_salary").getItem(0).cast("int") > col("actual_expected_salary").getItem(1).cast("int"))

accepted_salary_greater_df.display()

# Approach 2
sal_array_int_df.filter(col("sal_int")[0] > col("sal_int")[1]).display()

# Approach 3
to_int = udf(lambda x: [int(i) for i in x] if x is not None else None, ArrayType(IntegerType()))
 
emp_df12 = emp_df1.withColumn("actual_expected_salary", to_int("actual_expected_salary"))
 
emp_df12.display()


id,name,experience,gender,dob,company,designation,doj,skills,actual_expected_salary
110,Vin,,T,1987-01-22T00:00:00Z,Sagar Info Tech,""" Team Lead """,21-01-2020,"C,C++,C++,Assembly","List(1000000, 600000)"


## Q-2 max salry- 30% hike

In [0]:
# Approach 1
hike_df = salary_array_df.withColumn("30% Salary Hike", col("actual_expected_salary").getItem(0) * 1.30)
hike_df.display()

# Approach 2
result_df_salary_hike=result_df_01.withColumn("30%_Hike_Salary",array_max(col("actual_expected_salary").cast("array<double>"))*1.3)
 
result_df_salary_hike.display()

# Approach 3
actual_sal = col("actual_expected_salary")[0].cast("double")
expected_sal = col("actual_expected_salary")[1].cast("double")
 
greatest_element = greatest(actual_sal, expected_sal)
hiked_value = greatest_element * 1.30
 
empdf1_with_hike = empdf1.withColumn("hiked_salary_30%", hiked_value)
empdf1_with_hike.display()


id,name,experience,gender,dob,company,designation,doj,skills,actual_expected_salary,30% Salary Hike
101.0,Agastya,1.0,M,1987-01-22T00:00:00Z,Infosys,Developer,21-01-2015,"Hadoop,PySpark,Kafka","List(1050000, 1100000)",1365000.0
102.0,Acyuta,2.0,F,1987-03-29T00:00:00Z,TCS,Team Lead,21-01-2016,"C,C++,Java","List(1050000, 1100000)",1365000.0
103.0,Anuvrata,1.0,M,1987-01-22T00:00:00Z,Infosys,Developer,21-01-2017,"Java,Python,Hadoop","List(1050000, 1100000)",1365000.0
,Bhavika,6.0,F,1987-01-22T00:00:00Z,Cisco,Team lead,21-01-2015,"Hadoop,PySpark,Kafka",,
105.0,Chitragandha,,M,1987-01-22T00:00:00Z,CTS,Developer,,"C,C++,Java","List(1050000, 1100000)",1365000.0
106.0,Hritika,9.0,F,1987-01-22T00:00:00Z,Cisco,Developer,21-01-2016,"C,C++,Java","List(1050000, 1100000)",1365000.0
107.0,Jigyasa,12.0,M,1987-01-22T00:00:00Z,Infosys,Team Lead,21-01-2005,"Hadoop,PySpark,Kafka","List(1050000, 1100000)",1365000.0
108.0,Kaveri,4.0,F,1987-01-22T00:00:00Z,cisco,Research and Development Engineer,21-01-2015,"C,C++,Java","List(1050000, 1100000)",1365000.0
109.0,,8.0,M,1987-01-22T00:00:00Z,,Developer,21-01-2008,"C,C++,Java","List(1050000, 1100000)",1365000.0
110.0,Vin,,T,1987-01-22T00:00:00Z,Sagar Info Tech,""" Team Lead """,21-01-2020,"C,C++,C++,Assembly","List(1000000, 600000)",1300000.0


# Q3 python-30% extra and pyspark&kafka -40% extra

In [0]:
# Approach 1
increased_sal_pyspark_df = increased_sal_df.withColumn("salary_extra_hike",
                                                       when((array_contains(col("skills"),"Hadoop") & array_contains(col("skills"),"PySpark")),round(col("salary")*1.4,0 ))
                                                       .when(array_contains(col("skills"),"Hadoop") , col("salary")*1.3 )
                                                       .otherwise(col("salary"))).display()


# Approach 2 
result_df_salary_hike_skill = result_df_salary_hike.withColumn("Extra_Salary_Hike_Pyspark_Skill",when(array_contains(col("skills"), "PySpark"),col("30%_Hike_Salary") * 1.3).otherwise(col("30%_Hike_Salary")))
 
result_df_salary_hike_skill.display(truncate=False)


# Approach 3
empdf1_with_hike.withColumn(
    "hike_based_on_skill",
    when(array_contains(col("skills"), "Python"), col("hiked_salary_30%") * 1.3)
    .when(
        array_contains(("skills"), "PySpark") & array_contains(col("skills"), "Kafka"),
        round(col("hiked_salary_30%") * 1.4,2),
    )
    .otherwise(col("hiked_salary_30%")),
).display()