In [0]:
# Reading the CSV with no options: there is no header handling and no type inference,
# so every column is a string named _c0, _c1, ... and the header line becomes a data row.
df1 = spark.read.csv("dbfs:/FileStore/Temp/departments.csv")


In [0]:
from pyspark.sql.types import *

In [0]:
# Example of an explicit schema: a StructType made of nullable StructFields.
# NOTE(review): this schema is not passed to any reader in this notebook, and `schema`
# is re-bound to the location schema further down. Confirm whether it is still needed.
schema = StructType([
    StructField("first_name", StringType(), nullable=True),
    StructField("middle_name", StringType(), nullable=True),
    StructField("last_name", StringType(), nullable=True),
    StructField("id", IntegerType(), nullable=True),
    StructField("gender", StringType(), nullable=True),
])

In [0]:
# Show the column names/types Spark assigned to the option-less read.
df1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [0]:
# header=True takes column names from the first line; inferSchema=True scans the data to pick types.
# Columns holding "-" placeholders (e.g. MANAGER_ID) are inferred as string rather than integer.
df2 = spark.read.csv("dbfs:/FileStore/Temp/departments.csv", header=True, inferSchema=True)

In [0]:
# Compare with df1: real column names and inferred types.
df2.printSchema()

root
 |-- DEPARTMENT_ID: integer (nullable = true)
 |-- DEPARTMENT_NAME: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- LOCATION_ID: integer (nullable = true)



In [0]:
# Display the parsed departments (show() prints up to 20 rows by default).
df2.show()

+-------------+--------------------+----------+-----------+
|DEPARTMENT_ID|     DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+--------------------+----------+-----------+
|           10|      Administration|       200|       1700|
|           20|           Marketing|       201|       1800|
|           30|          Purchasing|       114|       1700|
|           40|     Human Resources|       203|       2400|
|           50|            Shipping|       121|       1500|
|           60|                  IT|       103|       1400|
|           70|    Public Relations|       204|       2700|
|           80|               Sales|       145|       2500|
|           90|           Executive|       100|       1700|
|          100|             Finance|       108|       1700|
|          110|          Accounting|       205|       1700|
|          120|            Treasury|        - |       1700|
|          130|       Corporate Tax|        - |       1700|
|          140|  Control And Credit|    

In [0]:
# Employees table: header row supplies column names, column types are inferred from the data.
empDF = spark.read.csv("dbfs:/FileStore/Temp/employees.csv", header=True, inferSchema=True)

In [0]:
# COMMISSION_PCT and MANAGER_ID come out as string, apparently because of "-" placeholder values.
empDF.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
# Peek at the first five employee rows.
empDF.show(n=5)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|  MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|  MK_REP|  6000|            - |       201|           20|
+-----------+---

In [0]:
# Project columns by name; select() also accepts a single list of names.
empDF.select(["EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "MANAGER_ID"]).show(5)

+-----------+----------+---------+----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|MANAGER_ID|
+-----------+----------+---------+----------+
|        198|    Donald| OConnell|       124|
|        199|   Douglas|    Grant|       124|
|        200|  Jennifer|   Whalen|       101|
|        201|   Michael|Hartstein|       100|
|        202|       Pat|      Fay|       201|
+-----------+----------+---------+----------+
only showing top 5 rows



In [0]:
# Same projection, but through Column objects obtained with DataFrame indexing (empDF["..."]).
empDF.select(*[empDF[name] for name in ("EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "MANAGER_ID")]).show(3)

+-----------+----------+---------+----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|MANAGER_ID|
+-----------+----------+---------+----------+
|        198|    Donald| OConnell|       124|
|        199|   Douglas|    Grant|       124|
|        200|  Jennifer|   Whalen|       101|
+-----------+----------+---------+----------+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import *

In [0]:
# Rename columns while selecting them, via Column.alias().
empDF.select(
    col("EMPLOYEE_ID").alias("EMP_ID"),
    col("FIRST_NAME").alias("F_NAME"),
    col("LAST_NAME").alias("L_NAME"),
).show(3)

+------+--------+--------+
|EMP_ID|  F_NAME|  L_NAME|
+------+--------+--------+
|   198|  Donald|OConnell|
|   199| Douglas|   Grant|
|   200|Jennifer|  Whalen|
+------+--------+--------+
only showing top 3 rows



In [0]:
# Row count as a one-row DataFrame (global aggregation); the column is named count(1).
empDF.agg(count("*")).show()

+--------+
|count(1)|
+--------+
|      50|
+--------+



In [0]:
# Add a derived column; the original SALARY column is kept alongside it.
empDF.withColumn("NEW_SALARY", empDF["SALARY"] + 1000).show(2)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|NEW_SALARY|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+----------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|      3600|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|            - |       124|           50|      3600|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+----------+
only showing top 2 rows



In [0]:
# Reusing an existing column name replaces that column in place (same position).
empDF.withColumn("SALARY", empDF["SALARY"] + 1000).show(3)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  3600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  3600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  5400|            - |       101|           10|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
only showing top 3 rows



In [0]:
# Rename a column without changing its values.
empDF.withColumnRenamed(existing="SALARY", new="NEW_SALARY").show(3)

+-----------+----------+---------+--------+------------+---------+--------+----------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|NEW_SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+----------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|      2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|      2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|      4400|            - |       101|           10|
+-----------+----------+---------+--------+------------+---------+--------+----------+--------------+----------+-------------+
only showing top 3 rows



In [0]:
# Remove a column from the result (empDF itself is not modified).
empDF.drop("COMMISSION_PCT").show(n=3)

+-----------+----------+---------+--------+------------+---------+--------+------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|       101|           10|
+-----------+----------+---------+--------+------------+---------+--------+------+----------+-------------+
only showing top 3 rows



In [0]:
# where() is an alias of filter(): keep rows with SALARY above 20000.
empDF.where(col("SALARY") > 20000).show()

+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|SKING|515.123.4567|17-JUN-03|AD_PRES| 24000|            - |        - |           90|
+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+



In [0]:
# Both bounds are exclusive: salaries of exactly 10000 or 20000 are not returned.
# Each comparison needs its own parentheses because & binds tighter than > / <.
empDF.where((col("SALARY") > 10000) & (col("SALARY") < 20000)).show()


+-----------+----------+---------+--------+------------+---------+------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+------+------+--------------+----------+-------------+
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|MK_MAN| 13000|            - |       100|           20|
|        205|   Shelley|  Higgins|SHIGGINS|515.123.8080|07-JUN-02|AC_MGR| 12008|            - |       101|          110|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|21-SEP-05| AD_VP| 17000|            - |       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|13-JAN-01| AD_VP| 17000|            - |       100|           90|
|        108|     Nancy|Greenberg|NGREENBE|515.124.4569|17-AUG-02|FI_MGR| 12008|            - |       101|          100|
|        114|       Den| Raphael

In [0]:
# Employees of department 90 earning more than 10000, projected to a few columns.
# Two chained filters are equivalent to one filter with the conditions AND-ed together.
(
    empDF
    .filter(col("SALARY") > 10000)
    .filter(col("DEPARTMENT_ID") == 90)
    .select("EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "SALARY", "DEPARTMENT_ID")
    .show()
)

+-----------+----------+---------+------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|SALARY|DEPARTMENT_ID|
+-----------+----------+---------+------+-------------+
|        100|    Steven|     King| 24000|           90|
|        101|     Neena|  Kochhar| 17000|           90|
|        102|       Lex|  De Haan| 17000|           90|
+-----------+----------+---------+------+-------------+



In [0]:
# Remove fully identical rows; the row order of the result is not guaranteed.
empDF.distinct().show(n=3)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|  MK_MAN| 13000|            - |       100|           20|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|            - |       101|           10|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
only showing top 3 rows



In [0]:
# With no column subset, dropDuplicates() behaves like distinct() (compares every column).
empDF.dropDuplicates().show(n=3)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|  MK_MAN| 13000|            - |       100|           20|
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|            - |       101|           10|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
only showing top 3 rows



In [0]:
# dropDuplicates with a column subset keeps one row for each distinct
# (HIRE_DATE, DEPARTMENT_ID) combination. The listed columns are compared together
# as a single key; the order in which they are listed does not matter.
# Which row (e.g. which EMPLOYEE_ID) survives from a duplicate group is not guaranteed.
empDF.dropDuplicates(["HIRE_DATE", "DEPARTMENT_ID"]).select("EMPLOYEE_ID", "HIRE_DATE", "DEPARTMENT_ID").show(5)

+-----------+---------+-------------+
|EMPLOYEE_ID|HIRE_DATE|DEPARTMENT_ID|
+-----------+---------+-------------+
|        106|05-FEB-06|           60|
|        127|14-JAN-07|           50|
|        119|10-AUG-07|           30|
|        137|14-JUL-03|           50|
|        114|07-DEC-02|           30|
+-----------+---------+-------------+
only showing top 5 rows



In [0]:
# Whole-table aggregation: total payroll and number of non-null salaries.
empDF.agg(
    sum("SALARY").alias("Total_Salary"),
    count("SALARY").alias("Salary_Count"),
).show()

+------------+------------+
|Total_Salary|Salary_Count|
+------------+------------+
|      309116|          50|
+------------+------------+



In [0]:
# Summary statistics over the whole table. mean() is an alias of avg(), so both columns match.
# min/max here are the pyspark.sql.functions versions pulled in by `import *`, not Python builtins.
empDF.agg(
    avg("SALARY").alias("Average_Salary"),
    min("SALARY").alias("Min_Salary"),
    max("SALARY").alias("Max_Salary"),
    mean("SALARY").alias("Mean_Salary"),
).show()

+--------------+----------+----------+-----------+
|Average_Salary|Min_Salary|Max_Salary|Mean_Salary|
+--------------+----------+----------+-----------+
|       6182.32|      2100|     24000|    6182.32|
+--------------+----------+----------+-----------+



In [0]:
# Sort by department ascending, then by salary descending within each department.
(
    empDF
    .select("EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "SALARY", "DEPARTMENT_ID")
    .sort(col("DEPARTMENT_ID").asc(), col("SALARY").desc())
    .show(10)
)

+-----------+----------+----------+------+-------------+
|EMPLOYEE_ID|FIRST_NAME| LAST_NAME|SALARY|DEPARTMENT_ID|
+-----------+----------+----------+------+-------------+
|        200|  Jennifer|    Whalen|  4400|           10|
|        201|   Michael| Hartstein| 13000|           20|
|        202|       Pat|       Fay|  6000|           20|
|        114|       Den|  Raphaely| 11000|           30|
|        115| Alexander|      Khoo|  3100|           30|
|        116|    Shelli|     Baida|  2900|           30|
|        117|     Sigal|    Tobias|  2800|           30|
|        118|       Guy|    Himuro|  2600|           30|
|        119|     Karen|Colmenares|  2500|           30|
|        203|     Susan|    Mavris|  6500|           40|
+-----------+----------+----------+------+-------------+
only showing top 10 rows



In [0]:
# groupBy().sum() accepts several column names separated by commas (min/max/avg work the same way).
# Summing EMPLOYEE_ID is only illustrative: it shows that multiple columns can be aggregated.
(
    empDF
    .groupBy("DEPARTMENT_ID")
    .sum("SALARY", "EMPLOYEE_ID")
    .orderBy("DEPARTMENT_ID")
    .show()
)

+-------------+-----------+----------------+
|DEPARTMENT_ID|sum(SALARY)|sum(EMPLOYEE_ID)|
+-------------+-----------+----------------+
|           10|       4400|             200|
|           20|      19000|             403|
|           30|      24900|             699|
|           40|       6500|             203|
|           50|      85600|            3127|
|           60|      28800|             525|
|           70|      10000|             204|
|           90|      58000|             303|
|          100|      51608|             663|
|          110|      20308|             411|
+-------------+-----------+----------------+



In [0]:
# A richer variant of the same aggregation (sum, rounded avg, max, min, count), kept for reference:
# empDF.groupBy("DEPARTMENT_ID").agg(sum("SALARY"), round(avg("SALARY"),2),max("SALARY"),min("SALARY"), count("SALARY").alias("Salary_Count")).orderBy(col("DEPARTMENT_ID").asc()).show()

# Several aggregates in one pass with agg(); the output columns get default names like sum(SALARY).
(
    empDF
    .groupBy("DEPARTMENT_ID")
    .agg(sum("SALARY"), max("SALARY"))
    .orderBy("DEPARTMENT_ID")
    .show()
)

+-------------+-----------+-----------+
|DEPARTMENT_ID|sum(SALARY)|max(SALARY)|
+-------------+-----------+-----------+
|           10|       4400|       4400|
|           20|      19000|      13000|
|           30|      24900|      11000|
|           40|       6500|       6500|
|           50|      85600|       8200|
|           60|      28800|       9000|
|           70|      10000|      10000|
|           90|      58000|      24000|
|          100|      51608|      12008|
|          110|      20308|      12008|
+-------------+-----------+-----------+



In [0]:
# Aggregate per department, then filter on the aggregate aliases, similar to SQL's HAVING clause.
(
    empDF
    .groupBy("DEPARTMENT_ID")
    .agg(
        sum("SALARY").alias("Total_Salary"),
        max("SALARY").alias("Max_Salary"),
        min("SALARY").alias("Min_Salary"),
        count("SALARY").alias("Salary_Count"),
    )
    .filter((col("Min_Salary") > 5000) | (col("Max_Salary") < 20000))
    .show()
)

+-------------+------------+----------+----------+------------+
|DEPARTMENT_ID|Total_Salary|Max_Salary|Min_Salary|Salary_Count|
+-------------+------------+----------+----------+------------+
|           20|       19000|     13000|      6000|           2|
|           40|        6500|      6500|      6500|           1|
|          100|       51608|     12008|      6900|           6|
|           10|        4400|      4400|      4400|           1|
|           50|       85600|      8200|      2100|          23|
|           70|       10000|     10000|     10000|           1|
|           90|       58000|     24000|     17000|           3|
|           60|       28800|      9000|      4200|           5|
|          110|       20308|     12008|      8300|           2|
|           30|       24900|     11000|      2500|           6|
+-------------+------------+----------+----------+------------+



In [0]:
# Same aggregation and filter as above, sorted by total payroll (largest first).
(
    empDF
    .groupBy("DEPARTMENT_ID")
    .agg(
        sum("SALARY").alias("Total_Salary"),
        max("SALARY").alias("Max_Salary"),
        min("SALARY").alias("Min_Salary"),
        count("SALARY").alias("Salary_Count"),
    )
    .filter((col("Min_Salary") > 5000) | (col("Max_Salary") < 20000))
    .orderBy(col("Total_Salary").desc())
    .show()
)

+-------------+------------+----------+----------+------------+
|DEPARTMENT_ID|Total_Salary|Max_Salary|Min_Salary|Salary_Count|
+-------------+------------+----------+----------+------------+
|           50|       85600|      8200|      2100|          23|
|           90|       58000|     24000|     17000|           3|
|          100|       51608|     12008|      6900|           6|
|           60|       28800|      9000|      4200|           5|
|           30|       24900|     11000|      2500|           6|
|          110|       20308|     12008|      8300|           2|
|           20|       19000|     13000|      6000|           2|
|           70|       10000|     10000|     10000|           1|
|           40|        6500|      6500|      6500|           1|
|           10|        4400|      4400|      4400|           1|
+-------------+------------+----------+----------+------------+



In [0]:
# Grade each employee by salary band, like a SQL CASE WHEN:
#   A: SALARY > 20000
#   B: 10000 < SALARY <= 20000
#   C: everything else (including a null SALARY, since null comparisons are never true)
# when() branches are tested in order, so the B branch only needs its lower bound.
# An extra "< 20000" upper bound would push a salary of exactly 20000 down to grade C.
gradeDF = empDF.withColumn(
    "EMP_GRADE",
    when(col("SALARY") > 20000, "A")
    .when(col("SALARY") > 10000, "B")
    .otherwise("C"),
)

In [0]:
# Salary total and maximum per grade.
(
    gradeDF
    .groupBy("EMP_GRADE")
    .agg(sum("SALARY"), max("SALARY"))
    .sort("EMP_GRADE")
    .show()
)

+---------+-----------+-----------+
|EMP_GRADE|sum(SALARY)|max(SALARY)|
+---------+-----------+-----------+
|        A|      24000|      24000|
|        B|      82016|      17000|
|        C|     203100|      10000|
+---------+-----------+-----------+



In [0]:
# Register empDF as a temporary view named "employee" so it can be queried with SQL
# (spark.sql or a %sql cell) instead of the DataFrame API.
empDF.createOrReplaceTempView("employee")

In [0]:
# Query the temp view through spark.sql; the result is an ordinary DataFrame.
spark.sql("SELECT * FROM employee").show(n=5)

+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|  MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|  MK_REP|  6000|            - |       201|           20|
+-----------+---

In [0]:
%sql
-- Department 90 employees earning more than 2000 (at most 5 rows).
SELECT *
FROM employee
WHERE DEPARTMENT_ID = 90
  AND SALARY > 2000
LIMIT 5

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90
101,Neena,Kochhar,NKOCHHAR,515.123.4568,21-SEP-05,AD_VP,17000,-,100,90
102,Lex,De Haan,LDEHAAN,515.123.4569,13-JAN-01,AD_VP,17000,-,100,90


In [0]:

# Same query as the %sql cell, but captured as a DataFrame instead of being displayed.
sqlDF = spark.sql(
    "SELECT * FROM employee "
    "WHERE DEPARTMENT_ID = 90 AND SALARY > 2000 "
    "LIMIT 5"
)

In [0]:
# Total salary per department, highest first.
# DataFrame.show() only prints and returns None. Chaining it onto the assignment left
# sqlDF bound to None, so keep the DataFrame in sqlDF and display it separately.
sqlDF = spark.sql(
    "SELECT DEPARTMENT_ID, SUM(SALARY) AS TOTAL_SALARY "
    "FROM employee "
    "GROUP BY DEPARTMENT_ID "
    "ORDER BY TOTAL_SALARY DESC"
)
sqlDF.show()

+-------------+------------+
|DEPARTMENT_ID|TOTAL_SALARY|
+-------------+------------+
|           50|       85600|
|           90|       58000|
|          100|       51608|
|           60|       28800|
|           30|       24900|
|          110|       20308|
|           20|       19000|
|           70|       10000|
|           40|        6500|
|           10|        4400|
+-------------+------------+



In [0]:
# Window function in SQL: rank salaries within each department (highest salary = rank 1).
# NOTE(review): LIMIT without an outer ORDER BY does not guarantee which 5 rows are returned.
spark.sql(
    "SELECT EMPLOYEE_ID, DEPARTMENT_ID, SALARY, "
    "RANK() OVER(PARTITION BY DEPARTMENT_ID ORDER BY SALARY DESC) AS RANK_SALARY "
    "FROM employee LIMIT 5"
).show()

+-----------+-------------+------+-----------+
|EMPLOYEE_ID|DEPARTMENT_ID|SALARY|RANK_SALARY|
+-----------+-------------+------+-----------+
|        200|           10|  4400|          1|
|        201|           20| 13000|          1|
|        202|           20|  6000|          2|
|        114|           30| 11000|          1|
|        115|           30|  3100|          2|
+-----------+-------------+------+-----------+



In [0]:
# NOTE(review): repeats the earlier dropDuplicates(["HIRE_DATE", "DEPARTMENT_ID"]) cell;
# candidate for removal.
(
    empDF
    .dropDuplicates(subset=["HIRE_DATE", "DEPARTMENT_ID"])
    .select("EMPLOYEE_ID", "HIRE_DATE", "DEPARTMENT_ID")
    .show(5)
)

+-----------+---------+-------------+
|EMPLOYEE_ID|HIRE_DATE|DEPARTMENT_ID|
+-----------+---------+-------------+
|        106|05-FEB-06|           60|
|        127|14-JAN-07|           50|
|        119|10-AUG-07|           30|
|        137|14-JUL-03|           50|
|        114|07-DEC-02|           30|
+-----------+---------+-------------+
only showing top 5 rows



In [0]:
# Departments table for the join examples (same file and options as df2).
deptDF = spark.read.csv("dbfs:/FileStore/Temp/departments.csv", header=True, inferSchema=True)

In [0]:
# Quick look at the first two departments.
deptDF.show(n=2)

+-------------+---------------+----------+-----------+
|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+---------------+----------+-----------+
|           10| Administration|       200|       1700|
|           20|      Marketing|       201|       1800|
+-------------+---------------+----------+-----------+
only showing top 2 rows



In [0]:
# Join: an inner join of employees to their departments on DEPARTMENT_ID.
# Both inputs contain DEPARTMENT_ID and MANAGER_ID, so columns are referenced via
# their parent DataFrame to avoid ambiguity.
(
    empDF
    .join(deptDF, on=empDF.DEPARTMENT_ID == deptDF.DEPARTMENT_ID, how="inner")
    .select(empDF.EMPLOYEE_ID, deptDF.DEPARTMENT_ID, deptDF.DEPARTMENT_NAME, empDF.MANAGER_ID)
    .show(5)
)

+-----------+-------------+---------------+----------+
|EMPLOYEE_ID|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|
+-----------+-------------+---------------+----------+
|        198|           50|       Shipping|       124|
|        199|           50|       Shipping|       124|
|        200|           10| Administration|       101|
|        201|           20|      Marketing|       100|
|        202|           20|      Marketing|       201|
+-----------+-------------+---------------+----------+
only showing top 5 rows



In [0]:
# Self-join in PySpark: find the managers listed in the employee table.
# Managers are also employees, so joining emp1.MANAGER_ID to emp2.EMPLOYEE_ID (two aliases
# of empDF) gives each manager's name. dropDuplicates() removes the repeated rows that
# appear once per report.
(
    empDF.alias("emp1")
    .join(empDF.alias("emp2"), col("emp1.MANAGER_ID") == col("emp2.EMPLOYEE_ID"), "inner")
    .select(col("emp1.MANAGER_ID"), col("emp2.FIRST_NAME"), col("emp2.LAST_NAME"))
    .dropDuplicates()
    .show(20)
)

+----------+----------+---------+
|MANAGER_ID|FIRST_NAME|LAST_NAME|
+----------+----------+---------+
|       122|     Payam| Kaufling|
|       101|     Neena|  Kochhar|
|       100|    Steven|     King|
|       205|   Shelley|  Higgins|
|       114|       Den| Raphaely|
|       103| Alexander|   Hunold|
|       124|     Kevin|  Mourgos|
|       120|   Matthew|    Weiss|
|       123|    Shanta|  Vollman|
|       121|      Adam|    Fripp|
|       108|     Nancy|Greenberg|
|       102|       Lex|  De Haan|
|       201|   Michael|Hartstein|
+----------+----------+---------+



In [0]:
# Full departments listing (up to the default 20 rows), for reference before the location join.
deptDF.show()

+-------------+--------------------+----------+-----------+
|DEPARTMENT_ID|     DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+--------------------+----------+-----------+
|           10|      Administration|       200|       1700|
|           20|           Marketing|       201|       1800|
|           30|          Purchasing|       114|       1700|
|           40|     Human Resources|       203|       2400|
|           50|            Shipping|       121|       1500|
|           60|                  IT|       103|       1400|
|           70|    Public Relations|       204|       2700|
|           80|               Sales|       145|       2500|
|           90|           Executive|       100|       1700|
|          100|             Finance|       108|       1700|
|          110|          Accounting|       205|       1700|
|          120|            Treasury|        - |       1700|
|          130|       Corporate Tax|        - |       1700|
|          140|  Control And Credit|    

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
# Small in-memory lookup: (LOCATION_ID, LOCATION_NAME) pairs.
location_data = [
    (1700, "INDIA"),
    (1800, "USA"),
]

In [0]:
# Explicit schema for location_data (re-binds the name `schema`).
# Line continuations are unnecessary inside the brackets.
schema = StructType([
    StructField("LOCATION_ID", IntegerType(), nullable=True),
    StructField("LOCATION_NAME", StringType(), nullable=True),
])

In [0]:
# Build a DataFrame from the Python list using the explicit schema above.
locDF = spark.createDataFrame(location_data, schema)

In [0]:
# Confirm the declared schema was applied.
locDF.printSchema()

root
 |-- LOCATION_ID: integer (nullable = true)
 |-- LOCATION_NAME: string (nullable = true)



In [0]:
# Display the two location rows.
locDF.show()

+-----------+-------------+
|LOCATION_ID|LOCATION_NAME|
+-----------+-------------+
|       1700|        INDIA|
|       1800|          USA|
+-----------+-------------+



In [0]:
# Employees in departments at location 1700, enriched with the location name.
# The LOCATION_ID == 1700 predicate sits inside the (default inner) join condition,
# so it behaves like a filter on deptDF.
(
    empDF
    .join(deptDF, (empDF.DEPARTMENT_ID == deptDF.DEPARTMENT_ID) & (deptDF.LOCATION_ID == 1700))
    .join(locDF, deptDF.LOCATION_ID == locDF.LOCATION_ID)
    .select(empDF.EMPLOYEE_ID, empDF.DEPARTMENT_ID, deptDF.LOCATION_ID, locDF.LOCATION_NAME)
    .show(5)
)

+-----------+-------------+-----------+-------------+
|EMPLOYEE_ID|DEPARTMENT_ID|LOCATION_ID|LOCATION_NAME|
+-----------+-------------+-----------+-------------+
|        119|           30|       1700|        INDIA|
|        118|           30|       1700|        INDIA|
|        117|           30|       1700|        INDIA|
|        116|           30|       1700|        INDIA|
|        115|           30|       1700|        INDIA|
+-----------+-------------+-----------+-------------+
only showing top 5 rows



In [0]:
# Creating a Python UDF (user-defined function) that returns a string column.
# NOTE: for plain upper-casing, the built-in upper() column function avoids the cost of
# sending every value to a Python worker; this cell demonstrates the UDF mechanics.

@udf(returnType=StringType())
def upperCaseNew(in_str):
    """Return ``in_str`` upper-cased, or None when the input value is null.

    Spark passes SQL NULL into a Python UDF as None. Calling .upper() on None would
    raise AttributeError and fail the task, so nulls are propagated as null instead.
    """
    if in_str is None:
        return None
    return in_str.upper()


In [0]:
from pyspark.sql.functions import udf
# udf function is imported when import * is done

In [0]:
# Applying the UDF to both name columns alongside the originals.
# The output columns are auto-named upperCaseNew(FIRST_NAME) / upperCaseNew(LAST_NAME).
empDF.select(
    col("EMPLOYEE_ID"),
    col("FIRST_NAME"),
    col("LAST_NAME"),
    upperCaseNew(col("FIRST_NAME")),
    upperCaseNew(col("LAST_NAME")),
).show(5)

+-----------+----------+---------+------------------------+-----------------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|upperCaseNew(FIRST_NAME)|upperCaseNew(LAST_NAME)|
+-----------+----------+---------+------------------------+-----------------------+
|        198|    Donald| OConnell|                  DONALD|               OCONNELL|
|        199|   Douglas|    Grant|                 DOUGLAS|                  GRANT|
|        200|  Jennifer|   Whalen|                JENNIFER|                 WHALEN|
|        201|   Michael|Hartstein|                 MICHAEL|              HARTSTEIN|
|        202|       Pat|      Fay|                     PAT|                    FAY|
+-----------+----------+---------+------------------------+-----------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.window import Window
# importing Window

In [0]:
# Window spec used by the next window-function cells: one window per department,
# rows ordered by salary from highest to lowest.
windowSpec = Window.partitionBy("DEPARTMENT_ID").orderBy(desc("SALARY"))

In [0]:
# Apply a window function with withColumn: rank salaries within each department
# using windowSpec (highest salary = 1; ties share a rank).
(
    empDF
    .withColumn("SALARY_RANK", rank().over(windowSpec))
    .select("DEPARTMENT_ID", "SALARY", "SALARY_RANK")
    .show(5)
)

+-------------+------+-----------+
|DEPARTMENT_ID|SALARY|SALARY_RANK|
+-------------+------+-----------+
|           10|  4400|          1|
|           20| 13000|          1|
|           20|  6000|          2|
|           30| 11000|          1|
|           30|  3100|          2|
+-------------+------+-----------+
only showing top 5 rows



In [0]:
# Running salary total within each department, over the ordered windowSpec
# (partitionBy DEPARTMENT_ID, orderBy SALARY desc).
# With an orderBy, the default frame runs from the start of the partition to the current row,
# using a RANGE frame, so rows tied on SALARY share the same running total.
# The original repeated the identical .select(...) twice; the duplicate was a no-op and is removed.
(
    empDF
    .withColumn("SUM", sum("SALARY").over(windowSpec))
    .select("DEPARTMENT_ID", "SALARY", "SUM")
    .show(5)
)

+-------------+------+-----+
|DEPARTMENT_ID|SALARY|  SUM|
+-------------+------+-----+
|           10|  4400| 4400|
|           20| 13000|13000|
|           20|  6000|19000|
|           30| 11000|11000|
|           30|  3100|14100|
+-------------+------+-----+
only showing top 5 rows



In [0]:
# Re-bind windowSpec to a partition-only spec (no ordering) for the next cell.
# - Without orderBy, the frame is the whole partition, so sum() returns the department
#   total on every row.
# - With orderBy, the default frame grows from the partition start to the current row,
#   giving a running sum per department.
windowSpec = Window.partitionBy("DEPARTMENT_ID")

In [0]:
# Department total repeated on every row: windowSpec now has no ordering, so the frame is
# the entire partition. The original repeated the identical .select(...) twice; the duplicate
# was a no-op and is removed.
(
    empDF
    .withColumn("SUM", sum("SALARY").over(windowSpec))
    .select("DEPARTMENT_ID", "SALARY", "SUM")
    .show(5)
)

+-------------+------+-----+
|DEPARTMENT_ID|SALARY|  SUM|
+-------------+------+-----+
|           10|  4400| 4400|
|           20| 13000|19000|
|           20|  6000|19000|
|           30| 11000|24900|
|           30|  3100|24900|
+-------------+------+-----+
only showing top 5 rows



In [0]:
# Create a DBFS directory for the follow-up project (the call returns True on success).
dbutils.fs.mkdirs("/Project_1_Attrition")

Out[128]: True