Write a program in Spark.
 
| emp_id | employer  | startyear | endyear |
|--------|-----------|-----------|---------|
| 1001   | Microsoft | 2015      | 2020    |
| 1001   | Google    | 2020      | 2022    |
| 1002   | Google    | 2015      | 2020    |
| 1002   | Amazon    | 2020      | 2022    |
| 1002   | Microsoft | 2022      | 2023    |
| 1003   | Amazon    | 2020      | 2023    |
 
Retrieve the employees, along with their employer details, whose first employer is Microsoft and whose next employer is Google.

In [2]:
# Sample input from the problem statement, kept for reference.
# NOTE(review): no cell below references this string — both parsing cells read
# ./company.txt instead, and no cell here creates that file. The columns in this
# literal appear padded with runs of tabs/spaces, while the parsing cells split on
# a single "\t"; confirm company.txt is a single-tab-delimited copy of this data.
employee_data = """
emp_id		employer		startyear		endyear
1001		Microsoft		  2015			 2020			 
1001		Google			  2020			 2022	
1002		Google			  2015			 2020	
1002		Amazon		  	  2020			 2022	
1002		Microsoft		  2022			 2023
1003		Amazon		  	  2020			 2023
"""

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [4]:
# Start (or reuse, via getOrCreate) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("First Second Employers")
    .getOrCreate()
)

In [37]:
# Manual parse of ./company.txt: read it as plain text (one row per line, in a
# single string column named "value"), split each line on tab characters and
# project the pieces into the four named fields.
# `spark.read` is the DataFrameReader; SparkSession has no `readtext` method.
raw_lines = spark.read.text("./company.txt")
# Skip blank lines so only real records reach the field extraction.
raw_lines = raw_lines.where(trim(col("value")) != "")
fields = split(raw_lines["value"], "\t")
df = raw_lines.select(
    # trim() strips stray spaces that may surround a tab-separated field
    trim(fields[0]).alias("emp_id"),
    trim(fields[1]).alias("employer"),
    trim(fields[2]).alias("startyear"),
    trim(fields[3]).alias("endyear"),
)
# The header line is parsed like a data row; drop it by its own text.
df = df.where("emp_id != 'emp_id'")
df.show()

In [43]:
# Inspect the column types produced by the manual text parse (the split fields are strings).
df.printSchema()

root
 |-- emp_id: string (nullable = true)
 |-- employer: string (nullable = true)
 |-- startyear: string (nullable = true)
 |-- endyear: string (nullable = true)



In [46]:
# The hand-parsed year fields are strings; replace each in place with an int cast.
df = (
    df
    .withColumn("startyear", col("startyear").cast("int"))
    .withColumn("endyear", col("endyear").cast("int"))
)

In [47]:
# Confirm startyear/endyear are now integer columns after the cast.
df.printSchema()

root
 |-- emp_id: string (nullable = true)
 |-- employer: string (nullable = true)
 |-- startyear: integer (nullable = true)
 |-- endyear: integer (nullable = true)



In [48]:
# Display the parsed and typed rows.
df.show()

+------+---------+---------+-------+
|emp_id| employer|startyear|endyear|
+------+---------+---------+-------+
|  1001|Microsoft|     2015|   2020|
|  1001|   Google|     2020|   2022|
|  1002|   Google|     2015|   2020|
|  1002|   Amazon|     2020|   2022|
|  1002|Microsoft|     2022|   2023|
|  1003|   Amazon|     2020|   2023|
+------+---------+---------+-------+



In [65]:
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [61]:
# Explicit schema for the CSV reader: emp_id and both year columns as integers,
# employer as a string, each field declared non-nullable.
# NOTE(review): file-based readers may still report these columns as nullable — confirm if it matters.
df2_schema = StructType(
    [
        StructField(field_name, field_type, False)
        for field_name, field_type in [
            ("emp_id", IntegerType()),
            ("employer", StringType()),
            ("startyear", IntegerType()),
            ("endyear", IntegerType()),
        ]
    ]
)

In [62]:
# Let Spark's CSV reader do the parsing: tab-delimited, the first line is the
# header, and column types come from df2_schema.
df2 = (
    spark.read
    .options(header=True, delimiter="\t")
    .schema(df2_schema)
    .csv("./company.txt")
)
df2.show()

+------+---------+---------+-------+
|emp_id| employer|startyear|endyear|
+------+---------+---------+-------+
|  1001|Microsoft|     2015|   2020|
|  1001|   Google|     2020|   2022|
|  1002|   Google|     2015|   2020|
|  1002|   Amazon|     2020|   2022|
|  1002|Microsoft|     2022|   2023|
|  1003|   Amazon|     2020|   2023|
+------+---------+---------+-------+



In [66]:
# Per-employee window over job history in chronological order.
# partitionBy/orderBy are static builders on Window, so no instance is needed.
# endyear is a secondary sort key that breaks ties between jobs starting in the
# same year, making row_number()/lead()/first() over this window deterministic
# for such rows.
df_window = Window.partitionBy("emp_id").orderBy("startyear", "endyear")

In [74]:
# Annotate every job row with columns computed over df_window:
#   firstEmployer - employer of the first job in the window's order
#   nextEmployer  - employer of the following job (NULL for the last one)
#   isFirst       - 1-based position of the job within its employee partition
df = (
    df2
    .withColumn("firstEmployer", first("employer").over(df_window))
    .withColumn("nextEmployer", lead("employer").over(df_window))
    .withColumn("isFirst", row_number().over(df_window))
)
df.show()

+------+---------+---------+-------+-------------+------------+-------+
|emp_id| employer|startyear|endyear|firstEmployer|nextEmployer|isFirst|
+------+---------+---------+-------+-------------+------------+-------+
|  1001|Microsoft|     2015|   2020|    Microsoft|      Google|      1|
|  1001|   Google|     2020|   2022|    Microsoft|        NULL|      2|
|  1002|   Google|     2015|   2020|       Google|      Amazon|      1|
|  1002|   Amazon|     2020|   2022|       Google|   Microsoft|      2|
|  1002|Microsoft|     2022|   2023|       Google|        NULL|      3|
|  1003|   Amazon|     2020|   2023|       Amazon|        NULL|      1|
+------+---------+---------+-------+-------------+------------+-------+



In [83]:
# emp_ids whose first job (row 1) was at Microsoft and whose next job was at Google.
filterd_df = (
    df
    .filter("isFirst = 1 and firstEmployer = 'Microsoft' and nextEmployer='Google' ")
    .select("emp_id")
)


In [90]:
# Show every job row of the qualifying employees.
# filterd_df is derived from df, so a column-object condition such as
# df.emp_id == filterd_df.emp_id is a self-join on the same column lineage and
# relies on Spark's internal disambiguation. Joining on the key *name* avoids
# that. A left-semi join keeps only df's rows that have a match in filterd_df,
# without adding any right-side (duplicate emp_id) columns.
(
    df.join(filterd_df, on="emp_id", how="left_semi")
    .select("emp_id", "employer", "startyear", "endyear")
    .orderBy("emp_id", "startyear")  # deterministic display order
    .show()
)

+------+---------+---------+-------+
|emp_id| employer|startyear|endyear|
+------+---------+---------+-------+
|  1001|Microsoft|     2015|   2020|
|  1001|   Google|     2020|   2022|
+------+---------+---------+-------+

