In [5]:
data1 = [(7369, "SMITH", "CLERK", 7902, "17-DEC-2005", 800, None, 20),
        (7499, "ALLEN", "SALESMAN",7698,"20-FEB-2006",1600,300,  30),
        (7521, "WARD", "SALESMAN", 7698,"22-FEB-2006",1250,500,  30),
        (7566, "JONES", "MANAGER", 7839, "02-APR-2006",2975,None,20),
        (7654, "MARTIN", "SALESMAN", 7698, "28-SEP-2006",1250,1400,30),
        (7698, "BLAKE", "MANAGER", 7839, "01-MAY-2006", 2850, None,30),
        (7782, "CLARK", "MANAGER", 7839, "09-JUN-2006", 2450, None, 10),
        (7788, "SCOTT", "ANALYST", 7566, "09-DEC-2007", 3000, None, 20),
        (7839, "KING", "PRESIDENT",None, "17-NOV-2006", 5000, None, 10),
        (7844, "Turner", "SALESMAN",7698, "08-SEP-2006", 1500, 0,  30),
        (7876, "ADAMS", "CLERK",   7788,  "12-JAN-2008", 1100, None, 20),
        (7900, "JAMES",  "CLERK",  7698,  "03-DEC-2006", 950, None, 30),
        (7902, "FORD",  "ANALYST", 7566,  "03-DEC-2006", 3000, None, 20),
        (7934, "MILLER", "CLERK",  7782,  "23-JAN-2007", 1300, None, 10)
       ]
schema1 = ["empno","ename","job","mgr","hiredate","sal","comm","deptno"]
        

In [6]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()
emp_df = spark.createDataFrame(data1,schema1)

In [3]:
import sparkmon 
mon = sparkmon.SparkMon(spark, period=5)
mon.start()

app = mon.application

In [4]:
app.debug = True
app.parse_db() 

In [None]:
mon.application.plot()

In [8]:
#df.show()
emp_df.createOrReplaceTempView('emp')

In [10]:
data2 = [(10, "ACCOUNTING", "NEW YORK"),
         (20, "RESEARCH", "DALLAS"),
         (30, "SALES",    "CHICAGO"),
         (40, "OPERATIONS", "BOSTON") ]
schema2 = ["deptno", "dname", "loc"]

In [11]:
dept_df = spark.createDataFrame(data = data2, schema = schema2) 

In [12]:
dept_df.createOrReplaceTempView('dept')

# Combining Related Rows

In [10]:
spark.sql('''select * 
           from emp e inner join dept d
           on (e.deptno=d.deptno) 
           where e.deptno=10 ''').show()
           

                                                                                

+-----+------+---------+----+-----------+----+----+------+------+----------+--------+
|empno| ename|      job| mgr|   hiredate| sal|comm|deptno|deptno|     dname|     loc|
+-----+------+---------+----+-----------+----+----+------+------+----------+--------+
| 7782| CLARK|  MANAGER|7839|09-JUN-2006|2450|NULL|    10|    10|ACCOUNTING|NEW YORK|
| 7839|  KING|PRESIDENT|NULL|17-NOV-2006|5000|NULL|    10|    10|ACCOUNTING|NEW YORK|
| 7934|MILLER|    CLERK|7782|23-JAN-2007|1300|NULL|    10|    10|ACCOUNTING|NEW YORK|
+-----+------+---------+----+-----------+----+----+------+------+----------+--------+



In [None]:
from pyspark.sql.functions import expr
emp_df.join(dept_df, emp_df.deptno==dept_df.deptno, "inner").where(expr("deptno=10")).show()

In [11]:
emp_df.join(dept_df, emp_df.deptno==dept_df.deptno, "inner").select(emp_df.ename, dept_df.loc).where(emp_df.deptno==10).show()



+------+--------+
| ename|     loc|
+------+--------+
| CLARK|NEW YORK|
|  KING|NEW YORK|
|MILLER|NEW YORK|
+------+--------+



                                                                                

# Retrieving Values in one table that do not exist in another.

In [18]:
lst = emp_df.select("deptno").collect()
lst = [row[0] for row in lst]
dept_df.select("deptno").where(~dept_df.deptno.isin(lst)).show()

+------+
|deptno|
+------+
|    40|
+------+



In [9]:
spark.sql('''select deptno from dept 
             where deptno not in (select deptno from emp)''').show()

+------+
|deptno|
+------+
|    40|
+------+



In [20]:
lst = emp_df.select("deptno").collect()
lst = [row[0] for row in lst]
print(lst)
dept_df.where(~dept_df.deptno.isin(lst)).show()

[20, 30, 30, 20, 30, 30, 10, 20, 10, 30, 20, 30, 20, 10]
+------+----------+------+
|deptno|     dname|   loc|
+------+----------+------+
|    40|OPERATIONS|BOSTON|
+------+----------+------+



In [10]:
li_emp_df = emp_df.select("deptno").collect()
#li_emp_df = emp_df.select(emp_df.deptno)
print(li_emp_df[0])
#dept_df.select(dept_df.deptno).where(~dept_df.deptno.isin(li_emp_df)).show()

Row(deptno=20)


In [12]:
li_emp_df = [row[0] for row in emp_df.select("deptno").collect()]
print(li_emp_df)

[20, 30, 30, 20, 30, 30, 10, 20, 10, 30, 20, 30, 20, 10]


In [13]:
dept_df.select(dept_df.deptno).where(~dept_df.deptno.isin(li_emp_df)).show()

+------+
|deptno|
+------+
|    40|
+------+



# Retrieving Rows from One Table That Do Not Corresponds to Rows in another.

In [14]:
dept_df.select('*').where(~dept_df.deptno.isin(li_emp_df)).show()

+------+----------+------+
|deptno|     dname|   loc|
+------+----------+------+
|    40|OPERATIONS|BOSTON|
+------+----------+------+



In [21]:
df = dept_df.join(emp_df, dept_df.deptno==emp_df.deptno, "leftouter").select(dept_df.dname,dept_df.loc,emp_df.deptno)

In [23]:
df.filter(expr("deptno is not NULL")).show()

+----------+--------+------+
|     dname|     loc|deptno|
+----------+--------+------+
|ACCOUNTING|NEW YORK|    10|
|ACCOUNTING|NEW YORK|    10|
|ACCOUNTING|NEW YORK|    10|
|  RESEARCH|  DALLAS|    20|
|  RESEARCH|  DALLAS|    20|
|  RESEARCH|  DALLAS|    20|
|  RESEARCH|  DALLAS|    20|
|  RESEARCH|  DALLAS|    20|
|     SALES| CHICAGO|    30|
|     SALES| CHICAGO|    30|
|     SALES| CHICAGO|    30|
|     SALES| CHICAGO|    30|
|     SALES| CHICAGO|    30|
|     SALES| CHICAGO|    30|
+----------+--------+------+



In [9]:
df.filter("deptno is NULL").show()

                                                                                

+----------+------+------+
|     dname|   loc|deptno|
+----------+------+------+
|OPERATIONS|BOSTON|  NULL|
+----------+------+------+



# Adding Joins to a Query Without Interfering with Other Joins

# Determing whether two tables have the same data.

In [14]:
spark.sql('''DROP VIEW IF EXISTS view ''')
spark.sql('''CREATE TEMPORARY VIEW view
             as
             (select * from emp where deptno != 10)
             union all 
             select * from emp where ename = 'ward'
          ''') 
spark.sql('''select * from view''').show()

+-----+------+--------+----+-----------+----+----+------+
|empno| ename|     job| mgr|   hiredate| sal|comm|deptno|
+-----+------+--------+----+-----------+----+----+------+
| 7369| SMITH|   CLERK|7902|17-DEC-2005| 800|NULL|    20|
| 7499| ALLEN|SALESMAN|7698|20-FEB-2006|1600| 300|    30|
| 7521|  WARD|SALESMAN|7698|22-FEB-2006|1250| 500|    30|
| 7566| JONES| MANAGER|7839|02-APR-2006|2975|NULL|    20|
| 7654|MARTIN|SALESMAN|7698|28-SEP-2006|1250|1400|    30|
| 7698| BLAKE| MANAGER|7839|01-MAY-2006|2850|NULL|    30|
| 7788| SCOTT| ANALYST|7566|09-DEC-2007|3000|NULL|    20|
| 7844|Turner|SALESMAN|7698|08-SEP-2006|1500|   0|    30|
| 7876| ADAMS|   CLERK|7788|12-JAN-2008|1100|NULL|    20|
| 7900| JAMES|   CLERK|7698|03-DEC-2006| 950|NULL|    30|
| 7902|  FORD| ANALYST|7566|03-DEC-2006|3000|NULL|    20|
+-----+------+--------+----+-----------+----+----+------+



##### Find rows in table emp that do not exists in view.

In [None]:
emp_df.select('*').where

# Performing Joins When Using Aggregates

In [21]:
data3 = [(7934, "17-MAR-2005", 1),
         (7934, "15-FEB-2005", 2),
         (7839, "15-FEB-2005", 3),
         (7782, "15-FEB-2005", 1) ]
schema3 = ["empno", "received", "type"]

In [22]:
emp_bonus_df=spark.createDataFrame(data=data3, schema=schema3)

In [23]:
emp_bonus_df.createOrReplaceTempView("emp_bonus")

In [24]:
spark.sql('''select *
             from emp_bonus
          ''').show()

+-----+-----------+----+
|empno|   received|type|
+-----+-----------+----+
| 7934|17-MAR-2005|   1|
| 7934|15-FEB-2005|   2|
| 7839|15-FEB-2005|   3|
| 7782|15-FEB-2005|   1|
+-----+-----------+----+



In [25]:
spark.sql('''select e.empno, e.ename, e.sal, e.deptno, e.sal*case when eb.type = 1 then .1
                                                         when eb.type = 2 then .2
                                                         else .3 
                                                    end AS bonus from emp e, emp_bonus eb 
            where e.empno = eb.empno
            and e.deptno = 10 ''').show()

+-----+------+----+------+------+
|empno| ename| sal|deptno| bonus|
+-----+------+----+------+------+
| 7782| CLARK|2450|    10| 245.0|
| 7839|  KING|5000|    10|1500.0|
| 7934|MILLER|1300|    10| 130.0|
| 7934|MILLER|1300|    10| 260.0|
+-----+------+----+------+------+



In [None]:
df=emp_bonus_df.join(emp_df, emp_bonus_df.empno == emp_df.empno,"inner")
df.withColumn("bonus", when(df.type==1, df.sal*0.1)
                       .when(df.type==2, df.sal*0.2)
                       .otherwise(df.sal*0.3))

In [26]:
df=emp_bonus_df.join(emp_df, emp_bonus_df.empno == emp_df.empno,"inner")
df=df.select(emp_df.empno,emp_df.ename,emp_df.deptno,emp_df.sal,emp_bonus_df.type)
df.withColumn("bonus", when (df.type == 1, df.sal*0.1)
                           .when (df.type == 2, df.sal*0.2)                            
                           .otherwise (df.sal*0.3 )).show()

+-----+------+------+----+----+------+
|empno| ename|deptno| sal|type| bonus|
+-----+------+------+----+----+------+
| 7782| CLARK|    10|2450|   1| 245.0|
| 7839|  KING|    10|5000|   3|1500.0|
| 7934|MILLER|    10|1300|   1| 130.0|
| 7934|MILLER|    10|1300|   2| 260.0|
+-----+------+------+----+----+------+



# Performing Outer Joins When Using Aggregates

In [20]:
df=df.withColumn("bonus", when (df.type == 1, df.sal*0.1)
                           .when (df.type == 2, df.sal*0.2)                            
                           .otherwise (df.sal*0.3 ))
df.withColumn("salary_bonus", df.sal + df.bonus).show()

+-----+------+------+----+----+------+------------+
|empno| ename|deptno| sal|type| bonus|salary_bonus|
+-----+------+------+----+----+------+------------+
| 7782| CLARK|    10|2450|   1| 245.0|      2695.0|
| 7839|  KING|    10|5000|   3|1500.0|      6500.0|
| 7934|MILLER|    10|1300|   1| 130.0|      1430.0|
| 7934|MILLER|    10|1300|   2| 260.0|      1560.0|
+-----+------+------+----+----+------+------------+



# Returning Missing Data from Multiple Tables 

In [None]:
spark.sql('''select d.deptno, d.dname, e.ename
             from dept d 
             left outer join
             emp e 
             on (d.deptno=e.deptno) ''').show()

In [None]:
#select d.deptno, d.dname, e.ename
#             from dept d 
#             left outer join
#             emp e 
#             on (d.deptno=e.deptno) 
dept_df.join(emp_df,dept_df.deptno==emp_df.deptno, "leftouter").select(dept_df.deptno,dept_df.dname,emp_df.ename).show()

In [27]:
spark.sql('''select d.deptno, d.dname, e.ename
             from dept d 
             right outer join
             emp e 
             on (d.deptno=e.deptno) ''').show()

+------+----------+------+
|deptno|     dname| ename|
+------+----------+------+
|    30|     SALES| ALLEN|
|    30|     SALES|  WARD|
|    20|  RESEARCH| SMITH|
|    30|     SALES|MARTIN|
|    30|     SALES| BLAKE|
|    20|  RESEARCH| JONES|
|    10|ACCOUNTING| CLARK|
|    10|ACCOUNTING|  KING|
|    20|  RESEARCH| SCOTT|
|    10|ACCOUNTING|MILLER|
|    30|     SALES|Turner|
|    30|     SALES| JAMES|
|    20|  RESEARCH| ADAMS|
|    20|  RESEARCH|  FORD|
+------+----------+------+



In [None]:
MySQL has no outer join. Hence, we need a union of leftouter and rightouter joins. 

In [28]:
spark.sql('''
           select ename,comm,coalesce(comm, 0)
           from emp
           where coalesce(comm, 0) < (select comm from emp where ename = 'WARD')
          ''').show()

+------+----+-----------------+
| ename|comm|coalesce(comm, 0)|
+------+----+-----------------+
| SMITH|NULL|                0|
| ALLEN| 300|              300|
| JONES|NULL|                0|
| BLAKE|NULL|                0|
| CLARK|NULL|                0|
| SCOTT|NULL|                0|
|  KING|NULL|                0|
|Turner|   0|                0|
| ADAMS|NULL|                0|
| JAMES|NULL|                0|
|  FORD|NULL|                0|
|MILLER|NULL|                0|
+------+----+-----------------+

