In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, min, max, avg, lit, udf
from pyspark.sql.types import StringType
# Obtain (or reuse) the Spark session for this notebook under the app name "Mini_Project".
spark = (
    SparkSession.builder
    .appName("Mini_Project")
    .getOrCreate()
)

In [0]:
# Load the office CSV: the first row is the header and column types are inferred.
data = spark.read.csv(
    '/FileStore/tables/OfficeDataProject.csv',
    header=True,
    inferSchema=True,
)
# Preview the first rows, then print the inferred schema.
data.show(5)
data.printSchema()

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|      Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|      Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
+-----------+-----------------+----------+-----+------+---+-----+
only showing top 5 rows

root
 |-- employee_id: integer (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)



In [0]:
# Re-type employee_id from the inferred integer to a string column, then confirm via the schema.
data = data.withColumn('employee_id', data['employee_id'].cast('string'))
data.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)



In [0]:
# Company headcount: number of distinct employee ids
data.select('employee_id').dropDuplicates().count()

Out[98]: 1000

In [0]:
# Number of distinct departments in the company
data.select('department').dropDuplicates().count()

Out[99]: 6

In [0]:
# Show each department name exactly once
data.select('department').dropDuplicates().show()

+----------+
|department|
+----------+
|     Sales|
|        HR|
|   Finance|
|Purchasing|
| Marketing|
|  Accounts|
+----------+



In [0]:
# Headcount per department
(
    data
    .groupBy('department')
    .count()
    .withColumnRenamed('count', "No. of Employees")
    .show()
)

+----------+----------------+
|department|No. of Employees|
+----------+----------------+
|     Sales|             169|
|        HR|             171|
|   Finance|             162|
|Purchasing|             166|
| Marketing|             170|
|  Accounts|             162|
+----------+----------------+



In [0]:
# Headcount per state (result column keeps the default name "count")
data.groupBy('state').agg(count('*').alias('count')).show()

+-----+-----+
|state|count|
+-----+-----+
|   LA|  205|
|   CA|  205|
|   WA|  208|
|   NY|  173|
|   AK|  209|
+-----+-----+



In [0]:
# Headcount for every (state, department) combination
(
    data
    .groupBy(['state', 'department'])
    .count()
    .withColumnRenamed('count', "No. of Employees")
    .show()
)

+-----+----------+----------------+
|state|department|No. of Employees|
+-----+----------+----------------+
|   CA|     Sales|              42|
|   CA| Marketing|              33|
|   NY|  Accounts|              34|
|   NY|     Sales|              27|
|   CA|   Finance|              35|
|   CA|  Accounts|              35|
|   CA|Purchasing|              32|
|   WA|        HR|              47|
|   AK|Purchasing|              30|
|   WA|  Accounts|              27|
|   WA|Purchasing|              38|
|   AK|     Sales|              38|
|   AK|  Accounts|              37|
|   WA| Marketing|              39|
|   LA|        HR|              41|
|   LA|     Sales|              35|
|   AK|        HR|              25|
|   LA|   Finance|              29|
|   AK|   Finance|              37|
|   LA|Purchasing|              45|
+-----+----------+----------------+
only showing top 20 rows



In [0]:
# Lowest and highest salary per department, ordered ascending by the minimum
# salary, with ties broken by the maximum salary.
# NOTE: `min` / `max` here are the Spark column functions imported at the top of
# the notebook, not the Python builtins.
(
    data
    .groupBy('department')
    .agg(
        min('salary').alias("Min_Salary"),
        max('salary').alias("Max_Salary"),
    )
    .orderBy('Min_Salary', 'Max_Salary')
    .show()
)

+----------+----------+----------+
|department|Min_Salary|Max_Salary|
+----------+----------+----------+
|   Finance|      1006|      9899|
|  Accounts|      1007|      9890|
|        HR|      1013|      9982|
| Marketing|      1031|      9974|
|     Sales|      1103|      9982|
|Purchasing|      1105|      9985|
+----------+----------+----------+



In [0]:
# Print the names of employees working in NY state under the Finance department
# whose bonus is greater than the average bonus of all employees in NY state.

# Restrict to NY first so the average is computed only over the rows that matter.
ny_employees = data.filter(col('state') == 'NY')

# A global aggregate always yields exactly one row, so first() is never None here;
# the value is read by name rather than by position. If there are no NY rows the
# average is NULL (None) and the comparison below simply matches nothing instead
# of raising a TypeError.
avg_bonus = ny_employees.agg(avg('bonus').alias('avg_bonus')).first()['avg_bonus']

(
    ny_employees
    .filter((col('department') == 'Finance') & (col('bonus') > avg_bonus))
    .select('employee_name')
    .show()
)

+--------------------+
|       employee_name|
+--------------------+
|       Vivan Sifford|
|      Herder Gallman|
|          Nena Rocha|
|       Leif Lemaster|
|Ellingsworth Meli...|
|        Escoto Gilma|
|     Georgeanna Laub|
|     Durio Tenenbaum|
|       Juliana Grigg|
|        Tiffani Benz|
|          Nitz Ilana|
|   Phylicia Antonina|
|         Durio Janey|
|       Melissia Jere|
|      Yukiko Kreamer|
|      Nena Kensinger|
|      Antonina Ilana|
+--------------------+



In [0]:
def raisesalary(age, sal, incre):
    """Return the salary after an age-based raise.

    Args:
        age: Employee age, or None when unknown.
        sal: Current salary, or None when unknown.
        incre: Amount added to the salary of employees older than 45.

    Returns:
        ``sal + incre`` when ``age`` is greater than 45, otherwise ``sal``
        unchanged. ``None`` is returned when ``sal`` is None; an unknown age
        (None) is treated as not eligible for the raise.
    """
    # Null-safe: comparing or adding None would raise TypeError, and both
    # source columns are nullable.
    if sal is None:
        return None
    if age is not None and age > 45:
        return sal + incre
    return sal
# Wrap raisesalary as a Spark UDF. The return type is taken from the existing
# salary column: without an explicit returnType, udf() defaults to StringType and
# the updated salary would silently become a string column.
raisesalaryUDF = udf(raisesalary, data.schema['salary'].dataType)

# Build a new frame with the raise of 500 passed as the increment; `data` itself
# is left untouched.
data_updated = data.withColumn(
    'salary',
    raisesalaryUDF(col('age'), col('salary'), lit(500)),
)


In [0]:
# Preview the first rows of the original frame, then of the frame with updated salaries.
for frame in (data, data_updated):
    frame.show(5)

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|      Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|      Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
+-----------+-----------------+----------+-----+------+---+-----+
only showing top 5 rows

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3

In [0]:
# Keep only the employees older than 45 from the updated frame and preview them.
datatosave = data_updated.where(col('age') > 45)
datatosave.show(5)

+-----------+------------------+----------+-----+------+---+-----+
|employee_id|     employee_name|department|state|salary|age|bonus|
+-----------+------------------+----------+-----+------+---+-----+
|       1003|       Tamra Amber|  Accounts|   AK|  6217| 47| 1291|
|       1008| Recalde Kensinger|  Accounts|   LA|  4204| 48| 1330|
|       1011|  Barringer Escoto|Purchasing|   WA|  2185| 49| 1706|
|       1018|Vankirk Jacquelyne|Purchasing|   NY|  9136| 47| 1192|
|       1025|   Dionne Lemaster|     Sales|   AK|  5634| 48| 1356|
+-----------+------------------+----------+-----+------+---+-----+
only showing top 5 rows



In [0]:
# Write the filtered employees as CSV, overwriting any existing output at this path.
datatosave.write.csv('/FileStore/tables/miniproject', mode='overwrite')