In [1]:
""" Notebook Information """
# References: https://github.com/XD-DENG/SQL-exercise ; https://en.wikibooks.org/wiki/SQL_Exercises/The_computer_store
# Workbook Title: PySpark SQL Exercises - Set 1

' Notebook Information '

### PySpark Setup
--------------------------------------------
#### Installing relevant libraries; Instantiating a PySpark session; Creating a SparkSession

In [None]:
""" Importing libraries """
import pandas as pd 
import numpy as np 
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext 
from pyspark.sql import SparkSession

In [3]:
""" Instantiate a SparkContext """
sc = SparkContext.getOrCreate()

## Print the Spark version
print(sc.version)

3.0.3


In [4]:
""" Creating a SparkSession """ 
spark = SparkSession.builder.appName('JoinsTutorial').getOrCreate()

### PySpark Dataframes
--------------------------------------------
#### Creating tables as PySpark dataframes

In [5]:
""" Building the schema """
# Table 1 - Manufacturers Table 
data1 = [[1, "Sony"],
        [2, "Creative Labs"], 
        [3, "Hewlett-Packard"],
        [4, "Iomega"], 
        [5, "Fujitsu"], 
        [6, "Winchester"]]
  
# specify column names
columns = ['Code', 'Name']
  
# creating a df1 from the lists of data
df1 = spark.createDataFrame(data1, columns)

# Table 2 - Products Table 
data2 = [[1, "Hard drive", 240, 5],
         [2,"Memory", 120, 6],
         [3,"ZIP drive",150,4],
         [4,"Floppy disk",5,6],
         [5,'Monitor',240,1],
         [6,'DVD drive',180,2],
         [7,'CD drive',90,2],
         [8,'Printer',270,3],
         [9,'Toner cartridge',66,3],
         [10,'DVD burner',180,2]]
  
# specify column names
columns = ['Code', 'Name', 'Price', 'Manufacturer']
  
# creating a df1 from the lists of data
df2 = spark.createDataFrame(data2, columns)

In [6]:
""" Creating Temporary Tables to be used in spark.sql """
## setting temporary views by creating dataframes
# creating a view for df1 named Manufacturers
df1.createOrReplaceTempView("Manufacturers")
  
# creating a view for df2 named Products
df2.createOrReplaceTempView("Products")

In [7]:
""" Checking the column types for Table 1 - Manufacturers """
df1.printSchema()

root
 |-- Code: long (nullable = true)
 |-- Name: string (nullable = true)



In [8]:
""" Checking the column types for Table 2 - PRoducts """
df2.printSchema()

root
 |-- Code: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Manufacturer: long (nullable = true)



In [9]:
""" Example - SQL Query """

query = """
        SELECT *
        FROM Products
        LIMIT 5
       """

spark.sql(query).show(truncate=False)

+----+-----------+-----+------------+
|Code|Name       |Price|Manufacturer|
+----+-----------+-----+------------+
|1   |Hard drive |240  |5           |
|2   |Memory     |120  |6           |
|3   |ZIP drive  |150  |4           |
|4   |Floppy disk|5    |6           |
|5   |Monitor    |240  |1           |
+----+-----------+-----+------------+



In [10]:
""" Example - SQL Query """

query = """
        SELECT *
        FROM Manufacturers
        LIMIT 5
       """

spark.sql(query).show(truncate=False)

+----+---------------+
|Code|Name           |
+----+---------------+
|1   |Sony           |
|2   |Creative Labs  |
|3   |Hewlett-Packard|
|4   |Iomega         |
|5   |Fujitsu        |
+----+---------------+



### Queries - Set 1
--------------------------------------------

In [11]:
""" Query 1.1 """

query = """
        SELECT Name
        FROM Products
       """

spark.sql(query).show(truncate=False)

+---------------+
|Name           |
+---------------+
|Hard drive     |
|Memory         |
|ZIP drive      |
|Floppy disk    |
|Monitor        |
|DVD drive      |
|CD drive       |
|Printer        |
|Toner cartridge|
|DVD burner     |
+---------------+



In [12]:
""" Query 1.2 """

query = """
        SELECT Name, Price
        FROM Products
       """

spark.sql(query).show(truncate=False)

+---------------+-----+
|Name           |Price|
+---------------+-----+
|Hard drive     |240  |
|Memory         |120  |
|ZIP drive      |150  |
|Floppy disk    |5    |
|Monitor        |240  |
|DVD drive      |180  |
|CD drive       |90   |
|Printer        |270  |
|Toner cartridge|66   |
|DVD burner     |180  |
+---------------+-----+



In [13]:
""" Query 1.3 """

query = """
        SELECT Name, Price
        FROM Products
        WHERE Price <= 200
       """

spark.sql(query).show(truncate=False)

+---------------+-----+
|Name           |Price|
+---------------+-----+
|Memory         |120  |
|ZIP drive      |150  |
|Floppy disk    |5    |
|DVD drive      |180  |
|CD drive       |90   |
|Toner cartridge|66   |
|DVD burner     |180  |
+---------------+-----+



In [14]:
""" Query 1.4 """

query = """
        SELECT Name, Price
        FROM Products
        WHERE Price BETWEEN 60 AND 120
       """

spark.sql(query).show(truncate=False)

+---------------+-----+
|Name           |Price|
+---------------+-----+
|Memory         |120  |
|CD drive       |90   |
|Toner cartridge|66   |
+---------------+-----+



In [15]:
""" Query 1.5 """

query = """
        SELECT Name, Price * 100 AS Price_Cents
        FROM Products
       """

spark.sql(query).show(truncate=False)

+---------------+-----------+
|Name           |Price_Cents|
+---------------+-----------+
|Hard drive     |24000      |
|Memory         |12000      |
|ZIP drive      |15000      |
|Floppy disk    |500        |
|Monitor        |24000      |
|DVD drive      |18000      |
|CD drive       |9000       |
|Printer        |27000      |
|Toner cartridge|6600       |
|DVD burner     |18000      |
+---------------+-----------+



In [16]:
""" Query 1.6 """

query = """
        SELECT AVG(Price) AS Average_Price
        FROM Products
       """

spark.sql(query).show(truncate=False)

+-------------+
|Average_Price|
+-------------+
|154.1        |
+-------------+



In [17]:
""" Query 1.7 """

query = """
        SELECT AVG(Price) AS Average_Price
        FROM Products
        WHERE Manufacturer = 2
       """

spark.sql(query).show(truncate=False)

+-------------+
|Average_Price|
+-------------+
|150.0        |
+-------------+



In [18]:
""" Query 1.8 """

query = """
        SELECT COUNT(*) AS Number_Of_Products
        FROM Products
        WHERE Price >= 180
       """

spark.sql(query).show(truncate=False)

+------------------+
|Number_Of_Products|
+------------------+
|5                 |
+------------------+



In [19]:
""" Query 1.9 """

query = """
        SELECT Name, Price
        FROM Products
        WHERE Price >= 180
        ORDER BY PRICE DESC, Name ASC
       """

spark.sql(query).show(truncate=False)

+----------+-----+
|Name      |Price|
+----------+-----+
|Printer   |270  |
|Hard drive|240  |
|Monitor   |240  |
|DVD burner|180  |
|DVD drive |180  |
+----------+-----+



In [20]:
""" Query 1.10 """

query = """
        SELECT *
        FROM Products AS A
        LEFT JOIN Manufacturers AS B ON A.Code = B.Code
       """

spark.sql(query).show(truncate=False)

+----+---------------+-----+------------+----+---------------+
|Code|Name           |Price|Manufacturer|Code|Name           |
+----+---------------+-----+------------+----+---------------+
|7   |CD drive       |90   |2           |null|null           |
|6   |DVD drive      |180  |2           |6   |Winchester     |
|9   |Toner cartridge|66   |3           |null|null           |
|5   |Monitor        |240  |1           |5   |Fujitsu        |
|1   |Hard drive     |240  |5           |1   |Sony           |
|10  |DVD burner     |180  |2           |null|null           |
|3   |ZIP drive      |150  |4           |3   |Hewlett-Packard|
|8   |Printer        |270  |3           |null|null           |
|2   |Memory         |120  |6           |2   |Creative Labs  |
|4   |Floppy disk    |5    |6           |4   |Iomega         |
+----+---------------+-----+------------+----+---------------+



In [21]:
""" Query 1.11 """

query = """
        SELECT A.Name, A.Price, B.Name AS Manufacturer_Name
        FROM Products AS A
        LEFT JOIN Manufacturers AS B ON A.Code = B.Code
       """

spark.sql(query).show(truncate=False)

+---------------+-----+-----------------+
|Name           |Price|Manufacturer_Name|
+---------------+-----+-----------------+
|CD drive       |90   |null             |
|DVD drive      |180  |Winchester       |
|Toner cartridge|66   |null             |
|Monitor        |240  |Fujitsu          |
|Hard drive     |240  |Sony             |
|DVD burner     |180  |null             |
|ZIP drive      |150  |Hewlett-Packard  |
|Printer        |270  |null             |
|Memory         |120  |Creative Labs    |
|Floppy disk    |5    |Iomega           |
+---------------+-----+-----------------+



In [22]:
""" Query 1.12 """

query = """
        SELECT AVG(Price) AS Average_Price
        FROM Products
        GROUP BY Manufacturer
       """

spark.sql(query).show(truncate=False)

+-------------+
|Average_Price|
+-------------+
|62.5         |
|240.0        |
|240.0        |
|168.0        |
|150.0        |
|150.0        |
+-------------+



In [23]:
""" Query 1.13 """

query = """
        SELECT AVG(Price) AS Average_Price
        FROM Products
        GROUP BY Manufacturer
       """

spark.sql(query).show(truncate=False)

+-------------+
|Average_Price|
+-------------+
|62.5         |
|240.0        |
|240.0        |
|168.0        |
|150.0        |
|150.0        |
+-------------+



In [24]:
""" Query 1.14 """

query = """
        SELECT B.Name AS Manufacturer_Name, AVG(A.Price) AS Average_Price
        FROM Products AS A
        LEFT JOIN Manufacturers AS B ON A.Code = B.Code
        GROUP BY B.Name 
        HAVING AVG(A.Price) >= 150
       """

spark.sql(query).show(truncate=False) 

+-----------------+-------------+
|Manufacturer_Name|Average_Price|
+-----------------+-------------+
|Sony             |240.0        |
|Fujitsu          |240.0        |
|null             |151.5        |
|Winchester       |180.0        |
|Hewlett-Packard  |150.0        |
+-----------------+-------------+



In [25]:
""" Query 1.15 """ 

query = """
        SELECT Name, Price
        FROM Products 
        ORDER BY Price ASC
        LIMIT 1
       """

spark.sql(query).show(truncate=False) 

+-----------+-----+
|Name       |Price|
+-----------+-----+
|Floppy disk|5    |
+-----------+-----+



In [27]:
""" Query 1.16 """ ## Check this approach

query = """
        SELECT B.Name AS Manufacturer_Name, A.Name AS Product_Name, MAX(A.Price) AS Max_Price 
        FROM Products AS A
        LEFT JOIN Manufacturers AS B ON A.Code = B.Code
        GROUP BY B.Name, A.Name
        """

spark.sql(query).show(truncate=False) 

### Reference: You can use partitions to divide the dataframe into multiple dataframes after your sort by price of each manufacturer and then you can select the first item of each table to get the maximum price.

+-----------------+---------------+---------+
|Manufacturer_Name|Product_Name   |Max_Price|
+-----------------+---------------+---------+
|Iomega           |Floppy disk    |5        |
|Creative Labs    |Memory         |120      |
|null             |Toner cartridge|66       |
|null             |DVD burner     |180      |
|Winchester       |DVD drive      |180      |
|Hewlett-Packard  |ZIP drive      |150      |
|Fujitsu          |Monitor        |240      |
|Sony             |Hard drive     |240      |
|null             |CD drive       |90       |
|null             |Printer        |270      |
+-----------------+---------------+---------+

