In [1]:
#importing pyspark
import pyspark

In [2]:
#importing sparksession
from pyspark.sql import SparkSession

In [3]:
# Build the notebook's SparkSession (or reuse one that is already running),
# registered under the application name "business_case_sql".
spark = (
    SparkSession.builder
    .appName("business_case_sql")
    .getOrCreate()
)

# Create DataFrame

In [4]:
# Sample rows as (name, age, salary) tuples, one person per line.
data = [
    ('Janice', 27, 25000),
    ('Ross', 25, 30000),
    ('Chandler', 22, 19000),
    ('Joey', 30, 35000),
]

In [5]:
# Wrap the raw tuples in a DataFrame, naming the three columns explicitly.
sqlDF = spark.createDataFrame(
    data,
    schema=['name', 'age', 'salary'],
)

In [6]:
# Print the DataFrame's rows as a text table to check what was loaded.
sqlDF.show()

+--------+---+------+
|    name|age|salary|
+--------+---+------+
|  Janice| 27| 25000|
|    Ross| 25| 30000|
|Chandler| 22| 19000|
|    Joey| 30| 35000|
+--------+---+------+



# Create Table from DataFrame

In [7]:
# Expose sqlDF to Spark SQL under the temporary view name "person"
# (an existing view with that name is replaced, so the cell can be re-run).
sqlDF.createOrReplaceTempView("person")

In [8]:
# Read every row back through SQL to confirm the "person" view is queryable.
all_people = spark.sql("SELECT * from person")
all_people.show()

+--------+---+------+
|    name|age|salary|
+--------+---+------+
|  Janice| 27| 25000|
|    Ross| 25| 30000|
|Chandler| 22| 19000|
|    Joey| 30| 35000|
+--------+---+------+



# Create DataFrame from table 

In [9]:
# Go the other way: obtain a DataFrame from the "person" temp view by name.
newDF = spark.table("person")

In [10]:
# Print the rows of the DataFrame that was loaded from the "person" view.
newDF.show()

+--------+---+------+
|    name|age|salary|
+--------+---+------+
|  Janice| 27| 25000|
|    Ross| 25| 30000|
|Chandler| 22| 19000|
|    Joey| 30| 35000|
+--------+---+------+



# Other ways to create a table

In [11]:
# registerTempTable is a deprecated alias (since Spark 2.0) of
# createOrReplaceTempView, so use the supported API to expose the same
# DataFrame under a second temporary view name, "person1".
sqlDF.createOrReplaceTempView("person1")



In [12]:
# Confirm the "person1" view returns the same rows as the source DataFrame.
person1_rows = spark.sql("select * from person1")
person1_rows.show()

+--------+---+------+
|    name|age|salary|
+--------+---+------+
|  Janice| 27| 25000|
|    Ross| 25| 30000|
|Chandler| 22| 19000|
|    Joey| 30| 35000|
+--------+---+------+



# Drop table 

In [13]:
# Remove the temporary view "person" from the session catalog.
# The queries that follow use the separate "person1" view.
spark.catalog.dropTempView("person")

# Implementing some basic SQL functions

In [14]:
# Projection: keep only the name and age columns.
name_age_query = "SELECT name,age from person1"
spark.sql(name_age_query).show()

+--------+---+
|    name|age|
+--------+---+
|  Janice| 27|
|    Ross| 25|
|Chandler| 22|
|    Joey| 30|
+--------+---+



In [15]:
# Aggregate: highest salary in the table.
max_salary = spark.sql("SELECT max(salary) from person1")
max_salary.show()

+-----------+
|max(salary)|
+-----------+
|      35000|
+-----------+



In [16]:
# Aggregate: mean salary across all rows.
avg_salary = spark.sql("SELECT avg(salary) from person1")
avg_salary.show()

+-----------+
|avg(salary)|
+-----------+
|    27250.0|
+-----------+



In [17]:
# Aggregate: sum of all salaries, with the result column aliased to Total_Salary.
total_salary_query = "SELECT sum(salary) as Total_Salary from person1"
spark.sql(total_salary_query).show()

+------------+
|Total_Salary|
+------------+
|      109000|
+------------+



In [18]:
# Row filter: only people whose salary is strictly greater than 20000.
salary_over_20k = spark.sql("SELECT * from person1 where salary > 20000")
salary_over_20k.show()

+------+---+------+
|  name|age|salary|
+------+---+------+
|Janice| 27| 25000|
|  Ross| 25| 30000|
|  Joey| 30| 35000|
+------+---+------+



In [19]:
# Combined filter: salary above 20000 AND age above 25 (both strict).
salary_and_age_query = "SELECT * from person1 where salary > 20000 and age > 25"
spark.sql(salary_and_age_query).show()

+------+---+------+
|  name|age|salary|
+------+---+------+
|Janice| 27| 25000|
|  Joey| 30| 35000|
+------+---+------+



# Joining on Tables

In [20]:
# Input rows for the join examples, as (name, id) tuples.
# The two lists share only ids 1 and 4, so every join type gives a different result.
x = [
    ('Pasta', 1),
    ('Noodles', 2),
    ('Bobby', 7),
    ('Monica', 4),
]
y = [
    ('Times', 1),
    ('Hindu', 5),
    ('', 3),
    ('', 4),
]

# Both DataFrames use the same column names: name, id.
df1 = spark.createDataFrame(x, schema=['name', 'id'])
df2 = spark.createDataFrame(y, schema=['name', 'id'])
 

In [21]:
# Register both DataFrames as temporary views so the join queries can refer
# to them by name. createOrReplaceTempView is the supported replacement for
# registerTempTable, which has been deprecated since Spark 2.0.
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")

# Inner Join

In [22]:
# Inner join: only ids present in both table1 and table2 are returned.
inner_joined = spark.sql("SELECT table1.*,table2.* FROM table2 inner JOIN table1 ON table2.id = table1.id")
inner_joined.show()

+------+---+-----+---+
|  name| id| name| id|
+------+---+-----+---+
| Pasta|  1|Times|  1|
|Monica|  4|     |  4|
+------+---+-----+---+



# Left Join

In [23]:
# Left join: every table2 row (the left side) is kept; table1 columns are
# null where no matching id exists.
left_joined = spark.sql("SELECT table1.*,table2.* FROM table2 left JOIN table1 ON table2.id = table1.id")
left_joined.show()

+------+----+-----+---+
|  name|  id| name| id|
+------+----+-----+---+
| Pasta|   1|Times|  1|
|  null|null|     |  3|
|Monica|   4|     |  4|
|  null|null|Hindu|  5|
+------+----+-----+---+



# Right Join

In [24]:
# Right join: every table1 row (the right side) is kept; table2 columns are
# null where no matching id exists.
right_joined = spark.sql("SELECT table1.*,table2.* FROM table2 right JOIN table1 ON table2.id = table1.id")
right_joined.show()

+-------+---+-----+----+
|   name| id| name|  id|
+-------+---+-----+----+
|  Pasta|  1|Times|   1|
|Noodles|  2| null|null|
| Monica|  4|     |   4|
|  Bobby|  7| null|null|
+-------+---+-----+----+



# Full Join

In [None]:
# Full join: rows from both tables are kept, with nulls on whichever side
# has no matching id.
full_joined = spark.sql("SELECT table1.*,table2.* FROM table2 full JOIN table1 ON table2.id = table1.id")
full_joined.show()

# Left Anti Join

In [None]:
# Left anti join: table2 rows whose id has NO match in table1.
left_anti_joined = spark.sql("SELECT table2.* FROM table2 left anti JOIN table1 ON table2.id = table1.id")
left_anti_joined.show()

# Left Semi Join

In [None]:
# Left semi join: table2 rows whose id DOES have a match in table1
# (only table2's columns are selected).
left_semi_joined = spark.sql("SELECT table2.* FROM table2 left semi JOIN table1 ON table2.id = table1.id")
left_semi_joined.show()