In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.functions import sum, avg, max, min, count, mean

spark = SparkSession.builder.appName("MiniProject").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/23 20:31:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.options(header="True", inferSchema="True").csv("OfficeDataProject.csv")
df.show()

+-----------+-------------------+----------+-----+------+---+-----+
|employee_id|      employee_name|department|state|salary|age|bonus|
+-----------+-------------------+----------+-----+------+---+-----+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
|       1003|        Tamra Amber|  Accounts|   AK|  5717| 47| 1291|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|
|       1008|  Recalde Kensinger|  Accounts|   LA|  3704| 48| 1330|
|       1009|        Imai Hallie|  Accounts|   AK|  5061| 38| 1557|
|       1010|    Debroah Gallman|  Accounts|   NY|  9308| 35|  817|
|       1011|   Barringer Escoto|Purchasing|   W

In [4]:
df.count()

1000

In [5]:
df.select("department").dropDuplicates(["department"]).count()

6

In [6]:
df.select("department").dropDuplicates(["department"]).show()

+----------+
|department|
+----------+
|     Sales|
|        HR|
|   Finance|
|Purchasing|
| Marketing|
|  Accounts|
+----------+



In [7]:
df.groupBy("department").count().show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|  169|
|        HR|  171|
|   Finance|  162|
|Purchasing|  166|
| Marketing|  170|
|  Accounts|  162|
+----------+-----+



In [8]:
df.groupBy("state").count().show()

+-----+-----+
|state|count|
+-----+-----+
|   LA|  205|
|   CA|  205|
|   WA|  208|
|   NY|  173|
|   AK|  209|
+-----+-----+



In [9]:
df.groupBy("department", "state").count().show()

+----------+-----+-----+
|department|state|count|
+----------+-----+-----+
|        HR|   WA|   47|
|        HR|   LA|   41|
| Marketing|   AK|   42|
| Marketing|   WA|   39|
|   Finance|   LA|   29|
|   Finance|   NY|   31|
|        HR|   NY|   30|
|  Accounts|   CA|   35|
| Marketing|   NY|   30|
|Purchasing|   CA|   32|
|     Sales|   CA|   42|
|     Sales|   LA|   35|
|        HR|   CA|   28|
| Marketing|   CA|   33|
|  Accounts|   AK|   37|
|Purchasing|   LA|   45|
|  Accounts|   NY|   34|
|     Sales|   WA|   27|
|        HR|   AK|   25|
|  Accounts|   WA|   27|
+----------+-----+-----+
only showing top 20 rows



In [13]:
df.groupBy("department").agg(min("salary").alias("min"), 
                             max("salary").alias("max"), avg("salary").alias("avg")).orderBy(col("min").desc()).show()

+----------+----+----+-----------------+
|department| min| max|              avg|
+----------+----+----+-----------------+
|Purchasing|1105|9985|5135.849397590361|
|     Sales|1103|9982|5499.585798816568|
| Marketing|1031|9974|5188.135294117647|
|        HR|1013|9982|5780.918128654971|
|  Accounts|1007|9890|5196.746913580247|
|   Finance|1006|9899| 5157.83950617284|
+----------+----+----+-----------------+



In [20]:
avgBonus = df.filter(df.state == "NY").groupBy("state").agg(avg("bonus").alias("avgBonus")).select("avgBonus").collect()[0]["avgBonus"]
df.filter((df.state == "NY") & (df.department == "Finance") & (df.bonus > avgBonus)).show()

+-----------+--------------------+----------+-----+------+---+-----+
|employee_id|       employee_name|department|state|salary|age|bonus|
+-----------+--------------------+----------+-----+------+---+-----+
|       1035|       Vivan Sifford|   Finance|   NY|  1129| 35| 1261|
|       1073|      Herder Gallman|   Finance|   NY|  1988| 31| 1402|
|       1082|          Nena Rocha|   Finance|   NY|  3417| 25| 1647|
|       1087|       Leif Lemaster|   Finance|   NY|  8642| 45| 1782|
|       1100|Ellingsworth Meli...|   Finance|   NY|  7845| 32| 1358|
|       1127|        Escoto Gilma|   Finance|   NY|  3426| 41| 1285|
|       1161|     Georgeanna Laub|   Finance|   NY|  2469| 26| 1679|
|       1175|     Durio Tenenbaum|   Finance|   NY|  2253| 42| 1684|
|       1180|       Juliana Grigg|   Finance|   NY|  8178| 42| 1617|
|       1215|        Tiffani Benz|   Finance|   NY|  1665| 41| 1969|
|       1220|          Nitz Ilana|   Finance|   NY|  2443| 50| 1342|
|       1342|   Phylicia Antonina|

In [21]:
from pyspark.sql.types import IntegerType

In [22]:
def incr_salary(age, currentSalary):
    if age > 45:
        return currentSalary + 500
    return currentSalary

In [23]:
incrSalary = udf(lambda x, y: incr_salary(x, y), IntegerType())

In [24]:
df.withColumn("new_salary", incrSalary(col("age"), col("salary"))).show()

[Stage 52:>                                                         (0 + 1) / 1]

+-----------+-------------------+----------+-----+------+---+-----+----------+
|employee_id|      employee_name|department|state|salary|age|bonus|new_salary|
+-----------+-------------------+----------+-----+------+---+-----+----------+
|       1000|          Nitz Leif| Marketing|   CA|  6131| 26|  543|      6131|
|       1001|    Melissia Dedman|   Finance|   AK|  4027| 43| 1290|      4027|
|       1002|  Rudolph Barringer|        HR|   LA|  3122| 43| 1445|      3122|
|       1003|        Tamra Amber|  Accounts|   AK|  5717| 47| 1291|      6217|
|       1004|        Mullan Nitz|Purchasing|   CA|  5685| 34| 1394|      5685|
|       1005|      Zollner Karie|  Accounts|   CA|  2843| 27| 1078|      2843|
|       1006|Kaczorowski Zollner|     Sales|   CA|  7201| 21| 1834|      7201|
|       1007|      Nakano Locust| Marketing|   LA|  3444| 23| 1823|      3444|
|       1008|  Recalde Kensinger|  Accounts|   LA|  3704| 48| 1330|      4204|
|       1009|        Imai Hallie|  Accounts|   AK|  

                                                                                

In [26]:
df.filter(df.age > 45).write.csv("output")

23/11/23 21:35:10 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1029129 ms exceeds timeout 120000 ms
23/11/23 21:35:10 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/23 21:35:15 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$