# Employee Departement Assignment

#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [41]:
import pyspark.sql.functions




#### Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='employee-department', table_name='employee_department_csv')
dyf.printSchema()

root
|-- employee_id: long
|-- employee_name: string
|-- department: string
|-- state: string
|-- salary: long
|-- age: long
|-- bonus: long


#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
df = dyf.toDF()
df.show(5)

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|      Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|      Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
+-----------+-----------------+----------+-----+------+---+-----+
only showing top 5 rows



#### Create a temporary view for SQL queries


In [4]:
df.createOrReplaceTempView('employee')




## Query 1:  Find the total number of employees in the company

In [7]:
# Dataframe
total_employees = df.count()
print(f"The total number of employees is: {total_employees}")

The total number of employees is: 1000


In [15]:
#SQL
total_employees_sql = spark.sql('select count(*) from employee').show()

+--------+
|count(1)|
+--------+
|    1000|
+--------+


## Query 2: Find the total number of departments in the company

In [9]:
#DataFrame
total_department = df.select('department').distinct().count()
print(f'The total number of department is:{total_department}')

The total number of department is:6


In [14]:
total_department_sql = spark.sql('select count(distinct department) from employee').show()

+--------------------------+
|count(DISTINCT department)|
+--------------------------+
|                         6|
+--------------------------+


## Query 3: Find the department names of the company

In [11]:
dep_name = df.select('department').distinct().show()

+----------+
|department|
+----------+
|     Sales|
| Marketing|
|   Finance|
|        HR|
|  Accounts|
|Purchasing|
+----------+


In [13]:
dep_name_sql = spark.sql('select distinct department from employee').show()

+----------+
|department|
+----------+
|     Sales|
| Marketing|
|   Finance|
|        HR|
|  Accounts|
|Purchasing|
+----------+


## Query 4: Find the total number of employees in each department

In [16]:
df.groupby('department').count().show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|  169|
| Marketing|  170|
|   Finance|  162|
|        HR|  171|
|  Accounts|  162|
|Purchasing|  166|
+----------+-----+


In [17]:
spark.sql('select department, count(*) from employee group by 1').show()

+----------+--------+
|department|count(1)|
+----------+--------+
|     Sales|     169|
| Marketing|     170|
|   Finance|     162|
|        HR|     171|
|  Accounts|     162|
|Purchasing|     166|
+----------+--------+


## Query 5: Find the total number of employees in each state

In [18]:
df.groupby('state').count().show()

+-----+-----+
|state|count|
+-----+-----+
|   CA|  205|
|   LA|  205|
|   NY|  173|
|   AK|  209|
|   WA|  208|
+-----+-----+


In [19]:
spark.sql('select state, count(*) from employee group by 1').show()

+-----+--------+
|state|count(1)|
+-----+--------+
|   CA|     205|
|   LA|     205|
|   NY|     173|
|   AK|     209|
|   WA|     208|
+-----+--------+


## Query 6: Find the total number of employees in each state of each department

In [21]:
df.groupby('state', 'department').count().show(10)

+-----+----------+-----+
|state|department|count|
+-----+----------+-----+
|   CA|  Accounts|   35|
|   CA| Marketing|   33|
|   LA| Marketing|   26|
|   WA|  Accounts|   27|
|   CA|        HR|   28|
|   CA|Purchasing|   32|
|   WA|     Sales|   27|
|   AK|  Accounts|   37|
|   CA|     Sales|   42|
|   AK|        HR|   25|
+-----+----------+-----+
only showing top 10 rows


In [22]:
spark.sql('select state, department, count(*) from employee group by 1, 2').show(5)

+-----+----------+--------+
|state|department|count(1)|
+-----+----------+--------+
|   CA|  Accounts|      35|
|   CA| Marketing|      33|
|   LA| Marketing|      26|
|   WA|  Accounts|      27|
|   CA|        HR|      28|
+-----+----------+--------+
only showing top 5 rows


## Query 7: Find the min and max salaries in each department and sort salaries in ascending order

In [52]:
df.groupBy("Department").agg(
    F.min("salary"),
    F.max("salary")
).orderBy("min(salary)").show()

+----------+-----------+-----------+
|Department|min(salary)|max(salary)|
+----------+-----------+-----------+
|   Finance|       1006|       9899|
|  Accounts|       1007|       9890|
|        HR|       1013|       9982|
| Marketing|       1031|       9974|
|     Sales|       1103|       9982|
|Purchasing|       1105|       9985|
+----------+-----------+-----------+


In [35]:
spark.sql('select department, min(salary), max(salary) from employee group by 1').show(5)

+----------+-----------+-----------+
|department|min(salary)|max(salary)|
+----------+-----------+-----------+
|     Sales|       1103|       9982|
| Marketing|       1031|       9974|
|   Finance|       1006|       9899|
|        HR|       1013|       9982|
|  Accounts|       1007|       9890|
+----------+-----------+-----------+
only showing top 5 rows


## Query 8: Find the names of employees working in NY state under Finance department whose bonuses are greater than the average bonuses of employees in NY state

In [80]:
average_bonus_ny = df.filter(df.state == "NY").agg({'bonus':'avg'}).collect()[0]['avg(bonus)']




In [81]:
average_bonus_ny

1251.3468208092486


In [82]:
df.filter( (df['state'] == 'NY') & 
           (df['department'] == 'Finance' ) & 
            (df['bonus'] > average_bonus_ny)). \
            select('employee_name').show()

+--------------------+
|       employee_name|
+--------------------+
|       Vivan Sifford|
|      Herder Gallman|
|          Nena Rocha|
|       Leif Lemaster|
|Ellingsworth Meli...|
|        Escoto Gilma|
|     Georgeanna Laub|
|     Durio Tenenbaum|
|       Juliana Grigg|
|        Tiffani Benz|
|          Nitz Ilana|
|   Phylicia Antonina|
|         Durio Janey|
|       Melissia Jere|
|      Yukiko Kreamer|
|      Nena Kensinger|
|      Antonina Ilana|
+--------------------+


In [56]:
spark.sql(
  """select employee_name 
    from employee 
    where state = 'NY' 
    and department = 'Finance' 
    and bonus > ( 
    select avg(bonus) from employee where state = 'NY'
    )
    """
).show()

+--------------------+
|       employee_name|
+--------------------+
|       Vivan Sifford|
|      Herder Gallman|
|          Nena Rocha|
|       Leif Lemaster|
|Ellingsworth Meli...|
|        Escoto Gilma|
|     Georgeanna Laub|
|     Durio Tenenbaum|
|       Juliana Grigg|
|        Tiffani Benz|
|          Nitz Ilana|
|   Phylicia Antonina|
|         Durio Janey|
|       Melissia Jere|
|      Yukiko Kreamer|
|      Nena Kensinger|
|      Antonina Ilana|
+--------------------+


## Query 9: Create DF of all those employees whose age is greater than 45 and save them in a file

### When you use the write.csv method in PySpark, it saves the output in multiple part files within a directory. To save the output as a single CSV file with a meaningful name, you need to first coalesce the DataFrame into a single partition and then save it with the desired filename.

In [96]:
df.filter(df['age'] > 45).write.csv( 's3://beaconfire-zhuoya/emp_45_df',  header=True)




In [97]:
spark.sql("SELECT * FROM employee WHERE Age > 45").write.csv("s3://beaconfire-zhuoya/emp_45_sql", header=True)




In [None]:
# Employees whose age is greater than 45
employees_age_above_45 = df.filter(df.age > 45)

# Coalesce into a single partition and save as a single CSV file
output_path = "s3://beaconfire-zhuoya/employees_age_above_45_df.csv"
employees_age_above_45.coalesce(1).write.mode('overwrite').csv(output_path, header=True)

# Rename the part file to the desired file name
import boto3

s3 = boto3.client('s3')

bucket = 'beaconfire-zhuoya'
prefix = 'employees_age_above_45_df.csv'
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response['Contents']:
    if obj['Key'].endswith('.csv'):
        copy_source = {'Bucket': bucket, 'Key': obj['Key']}
        s3.copy_object(CopySource=copy_source, Bucket=bucket, Key='employees_age_above_45_df.csv')
        s3.delete_object(Bucket=bucket, Key=obj['Key'])


In [None]:
# Employees whose age is greater than 45 using SQL
employees_age_above_45_sql = spark.sql("SELECT * FROM employee WHERE age > 45")

# Coalesce into a single partition and save as a single CSV file
output_path_sql = "s3://beaconfire-zhuoya/employees_age_above_45_sql.csv"
employees_age_above_45_sql.coalesce(1).write.mode('overwrite').csv(output_path_sql, header=True)

# Rename the part file to the desired file name
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response['Contents']:
    if obj['Key'].endswith('.csv'):
        copy_source = {'Bucket': bucket, 'Key': obj['Key']}
        s3.copy_object(CopySource=copy_source, Bucket=bucket, Key='employees_age_above_45_sql.csv')
        s3.delete_object(Bucket=bucket, Key=obj['Key'])
