# Advanced SQL puzzles with PySpark

In [7]:
!python --version

Python 3.10.8


In [1]:
import pyspark

In [8]:
pyspark.__version__

'3.3.1'

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Puzzles').getOrCreate()

In [6]:
import pyspark.sql.functions as f

### Puzzle #1

In [48]:
rows1 = [("Sugar", ), 
        ("Bread", ), 
        ("Juice", ), 
        ("Soda", ),
        ("Flour", )
      ]
nameColumns1 = ["Item"]
df1 = spark.createDataFrame(data=rows, schema = nameColumns1)
df1.show(truncate=False)

+-----+
|Item |
+-----+
|Sugar|
|Bread|
|Juice|
|Soda |
|Flour|
+-----+



In [49]:
rows1_ = [("Sugar", ), 
          ("Bread", ), 
          ("Butter", ), 
          ("Cheese", ),
          ("Fruit", )
         ]
nameColumns1_ = ["Item"]
df1_ = spark.createDataFrame(data=rows1_, schema = nameColumns1_)
df1_.show(truncate=False)

+------+
|Item  |
+------+
|Sugar |
|Bread |
|Butter|
|Cheese|
|Fruit |
+------+



In [50]:
df1.join(df1_, df1.Item ==  df1_.Item, "outer").show(truncate=False)

+-----+------+
|Item |Item  |
+-----+------+
|Bread|Bread |
|null |Butter|
|null |Cheese|
|Flour|null  |
|null |Fruit |
|Juice|null  |
|Soda |null  |
|Sugar|Sugar |
+-----+------+



### Puzzle #5

In [4]:
rows5 = [("1001", "Cellular", "555-897-5421"), 
         ("1001", "Work", "555-897-6542"), 
         ("1001", "Home", "555-698-9874"), 
         ("2002", "Cellular", "555-963-6544"),
         ("2002", "Work", "555-812-9856"),
         ("3003", "Cellular", "555-987-6541")]
nameColumns5 = ["Customer ID", "Type", "Phone Number"]
df5 = spark.createDataFrame(data=rows5, schema = nameColumns5)
df5.show(truncate=False)

+-----------+--------+------------+
|Customer ID|Type    |Phone Number|
+-----------+--------+------------+
|1001       |Cellular|555-897-5421|
|1001       |Work    |555-897-6542|
|1001       |Home    |555-698-9874|
|2002       |Cellular|555-963-6544|
|2002       |Work    |555-812-9856|
|3003       |Cellular|555-987-6541|
+-----------+--------+------------+



In [12]:
df5.groupBy("Customer ID").pivot("Type").agg(f.concat_ws(", ", f.collect_list(f.col("Type")))).show()

+-----------+--------+----+----+
|Customer ID|Cellular|Home|Work|
+-----------+--------+----+----+
|       1001|Cellular|Home|Work|
|       2002|Cellular|    |Work|
|       3003|Cellular|    |    |
+-----------+--------+----+----+



### Puzzle #8

In [13]:
rows8 = [('Alpha',0,0,0), 
         ('Bravo',0,1,1), 
         ('Charlie',1,0,0), 
         ('Delta',0,0,0)]
nameColumns8 = ["Workflow", "Case1", "Case2", "Case3"]
df8 = spark.createDataFrame(data=rows8, schema = nameColumns8)
df8.show(truncate=False)

+--------+-----+-----+-----+
|Workflow|Case1|Case2|Case3|
+--------+-----+-----+-----+
|Alpha   |0    |0    |0    |
|Bravo   |0    |1    |1    |
|Charlie |1    |0    |0    |
|Delta   |0    |0    |0    |
+--------+-----+-----+-----+



In [42]:
df8.withColumn("Passed", f.col("Case1")+f.col("Case2")+f.col("Case3")).select("Workflow", "Passed").show()

+--------+------+
|Workflow|Passed|
+--------+------+
|   Alpha|     0|
|   Bravo|     2|
| Charlie|     1|
|   Delta|     0|
+--------+------+



### Puzzle #13

In [45]:
from pyspark.sql.window import Window

In [43]:
rows13 = [('7/1/2018',100),
          ('7/2/2018',75), 
          ('7/3/2018',-150), 
          ('7/4/2018',50),
          ('7/5/2018',-75)]
nameColumns13 = ["InventoryDate", "QuantityAdjustment"]
df13 = spark.createDataFrame(data=rows13, schema = nameColumns13)
df13.show(truncate=False)

+-------------+------------------+
|InventoryDate|QuantityAdjustment|
+-------------+------------------+
|7/1/2018     |100               |
|7/2/2018     |75                |
|7/3/2018     |-150              |
|7/4/2018     |50                |
|7/5/2018     |-75               |
+-------------+------------------+



In [46]:
windowSpec = Window.orderBy("InventoryDate")

In [51]:
df13.withColumn("Inventory", f.lag("QuantityAdjustment",0).over(windowSpec)).show()

+-------------+------------------+---------+
|InventoryDate|QuantityAdjustment|Inventory|
+-------------+------------------+---------+
|     7/1/2018|               100|      100|
|     7/2/2018|                75|       75|
|     7/3/2018|              -150|     -150|
|     7/4/2018|                50|       50|
|     7/5/2018|               -75|      -75|
+-------------+------------------+---------+



### Puzzle #15

In [52]:
rows15 = [(1,'SELECT'),
          (2,'Product,'),
          (3,'UnitPrice,'),
          (4,'EffectiveDate'),
          (5,'FROM'),
          (6,'Products'),
          (7,'WHERE'),
          (8,'UnitPrice'),
          (9,'> 100')]
nameColumns15 = ["InventoryDate", "QuantityAdjustment"]
df15 = spark.createDataFrame(data=rows15, schema = nameColumns15)
df15.show(truncate=False)

+-------------+------------------+
|InventoryDate|QuantityAdjustment|
+-------------+------------------+
|1            |SELECT            |
|2            |Product,          |
|3            |UnitPrice,        |
|4            |EffectiveDate     |
|5            |FROM              |
|6            |Products          |
|7            |WHERE             |
|8            |UnitPrice         |
|9            |> 100             |
+-------------+------------------+



In [56]:
df15.withColumn('All', f.concat_ws(", ", f.collect_list(f.col("QuantityAdjustment")).alias("QuantityAdjustment"))).show()

AnalysisException: grouping expressions sequence is empty, and 'InventoryDate' is not an aggregate function. Wrap '(concat_ws(', ', collect_list(QuantityAdjustment)) AS All)' in windowing function(s) or wrap 'InventoryDate' in first() (or first_value) if you don't care which value you get.;
Aggregate [InventoryDate#545L, QuantityAdjustment#546, concat_ws(, , collect_list(QuantityAdjustment#546, 0, 0)) AS All#566]
+- LogicalRDD [InventoryDate#545L, QuantityAdjustment#546], false
