In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, min, max, avg, col, lit, udf

In [3]:
spark = SparkSession.builder.appName('DataframeProject').getOrCreate()

In [4]:
df = spark.read.options(inferSchema='True', header='True').csv('data/OfficeDataProject.csv')
df.printSchema()
df.show()

root
 |-- employee_id: integer (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust|

#### Total Employees in the company

In [5]:
df.count()

1000

#### Total Departments in the company

In [6]:
df.select('department').distinct().count()

6

#### Departments in the company

In [7]:
df.select('department').distinct().show()

+----------+
|department|
+----------+
|     Sales|
|        HR|
|   Finance|
|Purchasing|
| Marketing|
|  Accounts|
+----------+



#### Employees in each department

In [8]:
df.groupBy('department').agg(count('employee_id').alias('total_Employee')).distinct().show()

+----------+--------------+
|department|total_Employee|
+----------+--------------+
|     Sales|           169|
|        HR|           171|
|   Finance|           162|
|Purchasing|           166|
| Marketing|           170|
|  Accounts|           162|
+----------+--------------+



#### Employee in each states

In [9]:
df.groupBy('state').agg(count('employee_id').alias('total_emp')).show()

+-----+---------+
|state|total_emp|
+-----+---------+
|   LA|      205|
|   CA|      205|
|   WA|      208|
|   NY|      173|
|   AK|      209|
+-----+---------+



#### Employee in each state and department

In [10]:
df.groupBy('state','department').agg(count('employee_id').alias('total_emp')).orderBy('state').show()

+-----+----------+---------+
|state|department|total_emp|
+-----+----------+---------+
|   AK|Purchasing|       30|
|   AK|  Accounts|       37|
|   AK|        HR|       25|
|   AK|     Sales|       38|
|   AK|   Finance|       37|
|   AK| Marketing|       42|
|   CA|   Finance|       35|
|   CA|        HR|       28|
|   CA|     Sales|       42|
|   CA| Marketing|       33|
|   CA|Purchasing|       32|
|   CA|  Accounts|       35|
|   LA|  Accounts|       29|
|   LA| Marketing|       26|
|   LA|        HR|       41|
|   LA|Purchasing|       45|
|   LA|   Finance|       29|
|   LA|     Sales|       35|
|   NY|  Accounts|       34|
|   NY|     Sales|       27|
+-----+----------+---------+
only showing top 20 rows



#### Min-Max Salaries in Each department

In [11]:
df.groupBy('department').agg(min('salary').alias('min_sal'), max('salary').alias('max_sal')).sort(col('min_sal'), col('max_sal')).show()

+----------+-------+-------+
|department|min_sal|max_sal|
+----------+-------+-------+
|   Finance|   1006|   9899|
|  Accounts|   1007|   9890|
|        HR|   1013|   9982|
| Marketing|   1031|   9974|
|     Sales|   1103|   9982|
|Purchasing|   1105|   9985|
+----------+-------+-------+



#### NY and Finance Employees having Bonus > avg(bonus)

In [22]:
avgBonus = df.filter((df.state == 'NY')).groupBy('state').agg(avg('bonus')).collect()[0][1]
df.filter((df.state == 'NY') & (df.department == 'Finance') & (df.bonus > avgBonus)).select('employee_name').show()

+--------------------+
|       employee_name|
+--------------------+
|       Vivan Sifford|
|      Herder Gallman|
|          Nena Rocha|
|       Leif Lemaster|
|Ellingsworth Meli...|
|        Escoto Gilma|
|     Georgeanna Laub|
|     Durio Tenenbaum|
|       Juliana Grigg|
|        Tiffani Benz|
|          Nitz Ilana|
|   Phylicia Antonina|
|         Durio Janey|
|       Melissia Jere|
|      Yukiko Kreamer|
|      Nena Kensinger|
|      Antonina Ilana|
+--------------------+



#### Salary raise for employee age > 45

In [24]:
from pyspark.sql.types import IntegerType

def getRaisedSalary(age, salary):
    if age > 45:
        return salary + 500
    else:
        return salary
    
salaryUDF = udf(lambda x,y: getRaisedSalary(x,y), IntegerType())

raisedSalDF = df.withColumn('salary', salaryUDF(df.age, df.salary))

In [25]:
raisedSalDF.show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  6217| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  4204| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5061| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9308| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

#### Writing data for Employee age > 45

In [40]:
#raisedSalDF.write.mode('overwrite').csv('data/output/OfficeDataProject/')

In [26]:
spark.stop()