In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import when,col,regexp_replace,regexp_extract,cast,row_number
from pyspark.sql.functions import concat_ws,count,isnan,round,avg,coalesce,lit,udf,StringType
spark=SparkSession.builder.master("local").appName("emp_clean").getOrCreate()

In [34]:
import os
inp_file1='all_emp.csv'
inp_file2='left_emp.csv'
inp_file_path=os.getcwd()+'/../datafiles/input/'
op_file_path=os.getcwd()+'/../datafiles/output/'

In [35]:
df_raw_allemp=spark.read.format('csv').option('header',True).option('inferSchema',True).load(inp_file_path+inp_file1)
df_raw_allemp.printSchema()
df_raw_left=spark.read.format('csv').option('header',True).option('inferSchema',True).load(inp_file_path+inp_file2)

root
 |-- emp_id: integer (nullable = true)
 |-- f_name: string (nullable = true)
 |-- l_name: string (nullable = true)
 |-- emp_dept: string (nullable = true)
 |-- doj: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- mngr_id: integer (nullable = true)
 |-- total_exp: string (nullable = true)
 |-- location: integer (nullable = true)



In [36]:
#Transformations:

#1 Add a country field 'India'/'USA' and account 'Reliance'
#2 Clean total_exp field and convert it to Integer type and also remove whitespaces from department table
#3 Convert first letters of f_name and l_name in capital
#4 Combine f_name and l_name
#5 drop f_name and l_name
#6 Check and clean all the null/NAN values
#7 Impute salary where it is null, as average of the salaries of respective department

#8  Create bonus column for the currently working employees >1Lac: 17%;70k<15%,50k<12%,else 10%
#9 Based on the final salary, rank the employees by their experience, final salary dept-wise
#10 Save the final data in parquet format and exclude the intern role partition by country

In [37]:
#1 Add a country field 'India'/'USA' and account 'Reliance'
df_res1=df_raw_allemp.withColumn("location",when(col('location')==1,'India').when(col('location')==0,'USA'))
df_res1=df_res1.withColumn('account',lit("Reliance"))
#df_res2.show()

#2 Clean total_exp field and convert it to Integer type and also remove whitespaces from department table
df_res2=df_res1.withColumn('total_exp',regexp_extract('total_exp','\d+',0))
df_res2=df_res2.withColumn('total_exp',col('total_exp').cast('Integer'))
df_res2=df_res2.withColumn('emp_dept',regexp_replace(col('emp_dept'),'\s',''))
#df_res2.show()
#df_res2.printSchema()

#3 Convert first letters of f_name and l_name in capital
def convert_to_upper(string):
    string=string.replace(" ","")
    string=string[0].upper()+string[1:]
    return string

convert_df_upper=udf(lambda string:convert_to_upper(string),StringType())
df_res3=df_res2.withColumn('f_name',convert_df_upper(col('f_name')))
df_res3=df_res3.withColumn('l_name',convert_df_upper(col('l_name')))
#df_res3.show()

#4 Combine f_name and l_name
df_res4=df_res3.withColumn('f_name',concat_ws(' ',col('f_name'),col('l_name')))

#5 drop f_name and l_name
df_res5=df_res4.withColumnRenamed('f_name','name').drop('l_name')

#df_res5.show()

#df_res5=df_res5.withColumn('manager_name',df_res5.alias('emp1') \
#        .join(df_res5.alias('emp2'),col('emp1.mngr_id')==col('emp2.emp_id'),'left') \
#        .select(col('emp2.name').alias('manager_name')))

#6 Check and clean all the null/NAN values
df_res5.select([count(when(isnan(s) | col(s).isNull(),s)).alias(s) for s in df_res5.columns]).show()
#|emp_id|name|emp_dept|doj|city|salary|mngr_id|total_exp|location|account|
#|     1|   0|       2|  1|   3|     5|      0|        2|       0|      0|

df_res6=df_res5.dropna(subset=['emp_id','emp_dept','total_exp'])#.dropna(subset='')

#df_res6.select([count(when(isnan(s) | col(s).isNull(),s)).alias(s) for s in df_res5.columns]).show()
#|emp_id|name|emp_dept|doj|city|salary|mngr_id|total_exp|location|account|
#|     0|   0|       0|  0|   0|     4|      0|        0|       0|      0|

#filling the salary null fields with the average salary of respective departmens
spec1=Window.partitionBy('emp_dept')

#7 Impute salary where it is null, as average of the salaries of respective department
df_res7=df_res6.withColumn('salary',coalesce('salary',round(avg('salary').over(spec1))))
#df_res7.show()
#df_res7.select([count(when(isnan(s) | col(s).isNull(),s)).alias(s) for s in df_res5.columns]).show()
#|emp_id|name|emp_dept|doj|city|salary|mngr_id|total_exp|location|account|
#|     0|   0|       0|  0|   0|     0|      0|        0|       0|      0|

#8 Create manager column and
#Create bonus column for the currently working employees only >1Lac: 17%;70k<15%,50k<12%,else 10%

#fetching the current employees only
df_res8=df_res7.join(df_raw_left,df_res7.emp_id==df_raw_left.emp_id,'leftanti')

#adding managers column
df_res8=df_res8.alias('emp1') \
        .join(df_res5.alias('emp2'),col('emp1.mngr_id')==col('emp2.emp_id'),'left') \
        .select(col('emp1.emp_id'),col('emp1.name'),col('emp1.emp_dept'),col('emp1.doj'), \
                col('emp1.city'),col('emp1.salary'),col('emp1.mngr_id'),col('emp1.total_exp'), \
                col('emp1.location'),col('emp2.name').alias('manager_name'))

#adding bonus column
condition=when(col('salary')>=100000,col('salary')*.17) \
    .when(col('salary')>=70000,col('salary')*.15) \
    .when(col('salary')>=50000,col('salary')*.12) \
    .otherwise(col('salary')*.10)

df_res8=df_res8.withColumn('bonus',condition)
df_res8.show()
df_res8=df_res8.withColumn('salary',col('salary')+col('bonus')).drop('bonus')
#df_res8.show()

#9 Based on the final salary, rank the employees by their experience, final salary dept-wise

spec2=Window.partitionBy(col('emp_dept')).orderBy(col('total_exp').desc(),(col('salary')).desc())
df_res9=df_res8.withColumn('dept_rank',row_number().over(spec2))
#df_res9.select(col('name'),col('emp_dept'),col('salary'),col('bonus'),col('total_exp'),col('dept_rank')).show()
#df_res9.show()

#10 Save the final data in parquet format and exclude the intern role partition by country

df_res9.write.mode('overwrite').partitionBy('location').parquet(op_file_path)

+------+----+--------+---+----+------+-------+---------+--------+-------+
|emp_id|name|emp_dept|doj|city|salary|mngr_id|total_exp|location|account|
+------+----+--------+---+----+------+-------+---------+--------+-------+
|     1|   0|       2|  1|   3|     5|      0|        2|       0|      0|
+------+----+--------+---+----+------+-------+---------+--------+-------+

+------+----+--------+---+----+------+-------+---------+--------+-------+
|emp_id|name|emp_dept|doj|city|salary|mngr_id|total_exp|location|account|
+------+----+--------+---+----+------+-------+---------+--------+-------+
|     0|   0|       0|  0|   0|     0|      0|        0|       0|      0|
+------+----+--------+---+----+------+-------+---------+--------+-------+

+------+-------------------+--------+-----------+----------+--------+-------+---------+--------+-----------------+------------------+
|emp_id|               name|emp_dept|        doj|      city|  salary|mngr_id|total_exp|location|     manager_name|          