<a href="https://colab.research.google.com/github/roitraining/SparkforDataEngineers/blob/Development/Ch03_SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Open a terminal window and run the following commands:
sudo bash
start-hadoop
cd /home/student/ROI/SparkProgram/Day3
./fixhive.sh


### Let's make a simple hive table for regions.

In [1]:
! cat /home/student/ROI/Spark/regions.hql

CREATE TABLE Regions(
RegionID int,
RegionName string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/student/ROI/Spark/datasets/northwind/CSV/regions' OVERWRITE INTO TABLE Regions;

select * from regions



In [2]:
import sys

rootpath = '/home/student/ROI/Spark/'
datapath = f'{rootpath}datasets/'
sys.path.append(rootpath)
from pyspark_helpers import *
sc, spark, conf = initspark()



initializing pyspark
pyspark initialized


### You can query an existing Hive table and bring it into a Spark DataFrame.

In [33]:
regions = spark.sql('select * from regions')
#regions = spark.read.table('regions').where('o' in regions.regionname)
display(regions)
# regions.show()
# print(regions)

# r = spark.read.csv('hdfs://localhost:9000/user/hive/warehouse/regions', schema='regionid:int, regionname:string').where('regionid<=2')
# r.show()

Unnamed: 0,regionid,regionname
0,1,Eastern
1,2,Western
2,3,Northern
3,4,Southern


### Read in a file to a Spark DataFrame.

In [34]:
territories = spark.read.csv(f'{datapath}/northwind/CSVHeaders/territories', header=True)
territories.show()

regions.join(territories, regions.regionid == territories.RegionID).drop(regions.regionid).show()

+-----------+-------------+--------+
|TerritoryID|TerritoryName|RegionID|
+-----------+-------------+--------+
|      01581|     Westboro|       1|
|      01730|      Bedford|       1|
|      01833|    Georgetow|       1|
|      02116|       Boston|       1|
|      02139|    Cambridge|       1|
|      02184|    Braintree|       1|
|      02903|   Providence|       1|
|      03049|       Hollis|       3|
|      03801|   Portsmouth|       3|
|      06897|       Wilton|       1|
|      07960|   Morristown|       1|
|      08837|       Edison|       1|
|      10019|     New York|       1|
|      10038|     New York|       1|
|      11747|     Mellvile|       1|
|      14450|     Fairport|       1|
|      19428| Philadelphia|       3|
|      19713|       Neward|       1|
|      20852|    Rockville|       1|
|      27403|   Greensboro|       1|
+-----------+-------------+--------+
only showing top 20 rows

+----------+-----------+-------------+--------+
|regionname|TerritoryID|TerritoryName|

### Use createOrReplaceTempView to create a virtual table in the Hive catalog and then it can be queried using SQL as if it were a hive table.

In [35]:
territories.createOrReplaceTempView('territories')
ordercol = 'TerritoryName'
t1 = spark.sql('select * from territories where regionid = 1 order by territoryname')#.orderBy(ordercol)
t2 = territories.where('regionid = 1').orderBy('territoryname')
t1.show()
print(t1.count())

+-----------+-------------+--------+
|TerritoryID|TerritoryName|RegionID|
+-----------+-------------+--------+
|      01730|      Bedford|       1|
|      02116|       Boston|       1|
|      02184|    Braintree|       1|
|      02139|    Cambridge|       1|
|      27511|         Cary|       1|
|      08837|       Edison|       1|
|      14450|     Fairport|       1|
|      01833|    Georgetow|       1|
|      27403|   Greensboro|       1|
|      40222|   Louisville|       1|
|      11747|     Mellvile|       1|
|      07960|   Morristown|       1|
|      10019|     New York|       1|
|      10038|     New York|       1|
|      19713|       Neward|       1|
|      02903|   Providence|       1|
|      20852|    Rockville|       1|
|      01581|     Westboro|       1|
|      06897|       Wilton|       1|
+-----------+-------------+--------+

19


### Spark DataFrames can be saved to a Hive table using either the saveAsTable method or writing a SQL query that uses CREATE TABLE AS.

In [18]:
# ! hadoop fs -rm -r /user/hive/warehouse/territories2
# ! hadoop fs -rm -r /user/hive/warehouse/territories3
# ! hadoop fs -rm -r /user/hive/warehouse/territoryregion
# spark.sql('drop table if exists territories2')
# spark.sql('drop table if exists territories3')

# territories.write.saveAsTable('Territories2', mode='overwrite')
spark.sql('CREATE TABLE Territories3 AS SELECT * FROM territories')

DataFrame[]

### Queries use standard HQL to mix Hive tables and virtual tables. Both are read into a Spark DataFrame and the processing happens at the Spark level not at the Hive level. HQL is just used to parse the logic into the corresponding Spark methods.

In [25]:
sql = """
select r.regionid, r.regionname, t.territoryid, t.territoryname 
from regions as r 
join territories as t on r.regionid = t.regionid 
order by r.regionid, t.territoryid
"""
rt = spark.sql(sql)
rt.show(10)

# tr = regions.join(territories, regions.regionid == territories.RegionID). \
#      select('regions.regionid', 'regionname', 'TerritoryID', 'TerritoryName'). \
#      orderBy('regionid', 'territoryid')

tr = regions.join(territories, regions.regionid == territories.RegionID). \
     drop(territories.RegionID). \
     orderBy('regionid', 'territoryid')

tr = spark.sql('select * from territories as t join regions as r on r.regionid = t.regionid') \
          .drop('RegionID')

tr.show(10)

+--------+----------+-----------+-------------+
|regionid|regionname|territoryid|territoryname|
+--------+----------+-----------+-------------+
|       1|   Eastern|      01581|     Westboro|
|       1|   Eastern|      01730|      Bedford|
|       1|   Eastern|      01833|    Georgetow|
|       1|   Eastern|      02116|       Boston|
|       1|   Eastern|      02139|    Cambridge|
|       1|   Eastern|      02184|    Braintree|
|       1|   Eastern|      02903|   Providence|
|       1|   Eastern|      06897|       Wilton|
|       1|   Eastern|      07960|   Morristown|
|       1|   Eastern|      08837|       Edison|
+--------+----------+-----------+-------------+
only showing top 10 rows

+-----------+-------------+----------+
|TerritoryID|TerritoryName|regionname|
+-----------+-------------+----------+
|      01581|     Westboro|   Eastern|
|      01730|      Bedford|   Eastern|
|      01833|    Georgetow|   Eastern|
|      02116|       Boston|   Eastern|
|      02139|    Cambridge|  

### Lab: Read the northwind JSON products and make it into a TempView and do the same with the CSVHeaders version of categories. Then join the two.

In [26]:
categories = spark.read.csv(f'{datapath}/northwind/CSVHeaders/categories', header=True, inferSchema = True)
print(categories)
display(categories)
categories.createOrReplaceTempView('categories') 

products = spark.read.json(f'{datapath}/northwind/JSON/products')
print(products)
display(products)
products.createOrReplaceTempView('products') 

sql = '''
select c.categoryid, c.categoryname, p.productid, p.productname, p.unitprice
from products as p
join categories as c on p.categoryid = c.categoryid
order by c.categoryid, p.productid
'''
display(spark.sql(sql))



DataFrame[CategoryID: int, CategoryName: string, Description: string]


Unnamed: 0,CategoryID,CategoryName,Description
0,1,Beverages,Soft drinks coffees teas beers and ales
1,2,Condiments,Sweet and savory sauces relishes spreads and s...
2,3,Confections,Desserts candies and sweet breads
3,4,Dairy Products,Cheeses
4,5,Grains/Cereals,Breads crackers pasta and cereal
5,6,Meat/Poultry,Prepared meats
6,7,Produce,Dried fruit and bean curd
7,8,Seafood,Seaweed and fish


DataFrame[categoryid: bigint, discontinued: bigint, productid: bigint, productname: string, quantityperunit: string, reorderlevel: bigint, supplierid: bigint, unitprice: double, unitsinstock: bigint, unitsonorder: bigint]


Unnamed: 0,categoryid,discontinued,productid,productname,quantityperunit,reorderlevel,supplierid,unitprice,unitsinstock,unitsonorder
0,1,1,1,Chai,10 boxes x 30 bags,10,8,18.0,39,0
1,1,1,2,Chang,24 - 12 oz bottles,25,1,19.0,17,40
2,2,0,3,Aniseed Syrup,12 - 550 ml bottles,25,1,10.0,13,70
3,2,0,4,Chef Anton's Cajun Seasoning,48 - 6 oz jars,0,2,22.0,53,0
4,2,1,5,Chef Anton's Gumbo Mix,36 boxes,0,2,21.35,0,0
5,2,0,6,Grandma's Boysenberry Spread,12 - 8 oz jars,25,3,25.0,120,0
6,7,0,7,Uncle Bob's Organic Dried Pears,12 - 1 lb pkgs.,10,3,30.0,15,0
7,2,0,8,Northwoods Cranberry Sauce,12 - 12 oz jars,0,3,40.0,6,0
8,6,1,9,Mishi Kobe Niku,18 - 500 g pkgs.,0,4,97.0,29,0
9,8,0,10,Ikura,12 - 200 ml jars,0,4,31.0,31,0


Unnamed: 0,categoryid,categoryname,productid,productname,unitprice
0,1,Beverages,1,Chai,18.0
1,1,Beverages,2,Chang,19.0
2,1,Beverages,24,Guarana Fantastica,4.5
3,1,Beverages,34,Sasquatch Ale,14.0
4,1,Beverages,35,Steeleye Stout,18.0
5,1,Beverages,38,Cote de Blaye,263.5
6,1,Beverages,39,Chartreuse verte,18.0
7,1,Beverages,43,Ipoh Coffee,46.0
8,1,Beverages,67,Laughing Lumberjack Lager,14.0
9,1,Beverages,70,Outback Lager,15.0


### Install the MySQL Python connector. This has nothing to do with Spark but if you want to run SQL queries directly, it is helpful.

In [None]:
#! pip install mysql-connector-python

### Let's make sure we have a database for northwind and no regions table.

In [27]:
import mysql.connector
try:
    cn = mysql.connector.connect(host='localhost', user='test', password='password')
    cursor = cn.cursor()
    cursor.execute('create database if not exists northwind')
    cn.close()

    cn = mysql.connector.connect(host='localhost', user='test', password='password', database='northwind')
    cursor = cn.cursor()    
    cursor.execute('drop table if exists regions')
    cn.close()
except:
    print('something went wrong')
else:
    print('success')



success


### Write a DataFrame to a SQL database.

In [28]:
regions.write.format("jdbc").mode('overwrite').options(url="jdbc:mysql://localhost/northwind", driver='com.mysql.jdbc.Driver', dbtable='regions', user='test', password = "password", mode = "append", useSSL = "false").save()
print('Done')

Done


### Read a SQL table into a Spark DataFrame.

In [29]:
regions2 = spark.read.format("jdbc").options(url="jdbc:mysql://localhost/northwind", driver="com.mysql.jdbc.Driver", dbtable= "regions", user="test", password="password").load()
regions2.show()


+--------+----------+
|regionid|regionname|
+--------+----------+
|       3|  Northern|
|       4|  Southern|
+--------+----------+



### Creating the regions2 DataFrame does not execute anything yet, but by making the DataFrame into a Temp View then running a Spark SQL query, it tells Spark to read the SQL data into a DataFrame and then use the cluster to do the processing, not the SQL source.

In [None]:
regions2.createOrReplaceTempView('regions2')
spark.sql('select * from regions2 where regionid < 3').show()

### Alternate ways to code a query using SQL and methods.

In [None]:
# x = spark.sql('select count(*) from regions').collect()
# print(x[0][0])
# spark.sql('select * from regions').count()

p2 = products.withColumn('value', products.unitprice * products.unitsinstock).where('value > 500')
display(p2)

sql = """
select *
from (select *, unitprice * quantity as value) as t
where value > 500
"""

### Using SQL you can use familiar syntax instead of withColumn or withCoumnRenamed methods.

In [None]:
t1 = spark.sql('select TerritoryID as TerrID, UPPER(TerritoryName) as TerritoryName, RegionID from territories')
t1.show(5)

from pyspark.sql.functions import expr
territories.withColumn('TerritoryName', expr('UPPER(TerritoryName)')).withColumnRenamed('TerritoryID', 'TerrID').show(5)

### Sometimes there is a function in Python that doesn't exist in SQL and it would be helpful to use, so you could make a udf and use withColumn.

In [None]:
from pyspark.sql.functions import expr, udf
from pyspark.sql.types import *

t2 = spark.sql('select * from territories')
t2.printSchema()
#t2.show()
t2 = t2.withColumn('upperName', expr('UPPER(TerritoryName)'))
t2.show(5)

t2 = t2.withColumn('titleName', udf(lambda x : x.title(), StringType())(t2.upperName))
t2.show(5)


### To make it easier though, you could make the Python function into a udf that SQL can understand similar to how you can make a DataFrame seem like a virtual table with createOrReplaceTempView.

In [32]:
from pyspark.sql.functions import expr
def reverseString(x):
    return x[::-1]

spark.udf.register('reverse', reverseString, StringType())

#spark.sql('select *, reverse(TerritoryName) as Reversed from Territories').orderBy('Reversed').show()
territories.withColumn('reversed', expr('reverse(territoryname)')).show()

+-----------+-------------+--------+------------+
|TerritoryID|TerritoryName|RegionID|    reversed|
+-----------+-------------+--------+------------+
|      01581|     Westboro|       1|    orobtseW|
|      01730|      Bedford|       1|     drofdeB|
|      01833|    Georgetow|       1|   wotegroeG|
|      02116|       Boston|       1|      notsoB|
|      02139|    Cambridge|       1|   egdirbmaC|
|      02184|    Braintree|       1|   eertniarB|
|      02903|   Providence|       1|  ecnedivorP|
|      03049|       Hollis|       3|      silloH|
|      03801|   Portsmouth|       3|  htuomstroP|
|      06897|       Wilton|       1|      notliW|
|      07960|   Morristown|       1|  nwotsirroM|
|      08837|       Edison|       1|      nosidE|
|      10019|     New York|       1|    kroY weN|
|      10038|     New York|       1|    kroY weN|
|      11747|     Mellvile|       1|    elivlleM|
|      14450|     Fairport|       1|    tropriaF|
|      19428| Philadelphia|       3|aihpledalihP|


### HQL has collect_set and collect_list functions to aggregate items into a list instead of summing them up. 

In [37]:
from pyspark.sql.functions import collect_list
display(territories.groupBy(territories.RegionID). \
        agg(collect_list(territories.TerritoryName)). \
        withColumnRenamed('collect_list(TerritoryName)', 'TerritoryList'))


tr1 = spark.sql("""
                SELECT RegionID, collect_list(TerritoryName) AS TerritoryList 
                FROM Territories 
                GROUP BY RegionID
                """)
display(tr1)
tr1.printSchema()
print(tr1.take(1))


Unnamed: 0,RegionID,TerritoryList
0,3,"[Hollis, Portsmouth, Philadelphia, Beachwood, ..."
1,1,"[Westboro, Bedford, Georgetow, Boston, Cambrid..."
2,4,"[Columbia, Atlanta, Savannah, Orlando, Tampa, ..."
3,2,"[Hoffman Estates, Chicago, Denver, Colorado Sp..."


Unnamed: 0,RegionID,TerritoryList
0,3,"[Hollis, Portsmouth, Philadelphia, Beachwood, ..."
1,1,"[Westboro, Bedford, Georgetow, Boston, Cambrid..."
2,4,"[Columbia, Atlanta, Savannah, Orlando, Tampa, ..."
3,2,"[Hoffman Estates, Chicago, Denver, Colorado Sp..."


root
 |-- RegionID: string (nullable = true)
 |-- TerritoryList: array (nullable = true)
 |    |-- element: string (containsNull = true)

[Row(RegionID='3', TerritoryList=['Hollis', 'Portsmouth', 'Philadelphia', 'Beachwood', 'Findlay', 'Southfield', 'Troy', 'Bloomfield Hills', 'Racine', 'Roseville', 'Minneapolis'])]


### Instead of a simple datatype you could also collect complex structured objects using the HQL NAMED_STRUCT.

In [38]:
tl = StructType([
    StructField('TerritoryID', IntegerType()),
    StructField('TerritoryName', StringType())
])

tr1 = territories.groupBy(territories.RegionID). \
        agg(collect_list(territories.TerritoryName)))



sql = """SELECT r.RegionID, r.RegionName
, COLLECT_SET(NAMED_STRUCT("TerritoryID", t.TerritoryID, "TerritoryName", t.TerritoryName)) AS TerritoryList
FROM Regions AS r
JOIN Territories AS t ON r.RegionID = t.RegionID
GROUP BY r.RegionID, r.RegionName
ORDER BY r.RegionID"""

tr2 = spark.sql(sql)
tr2.printSchema()
print(tr2)
tr2.show()
print(tr2.take(2))
tr2.write.json('TerritoryRegion.json')
spark.sql('create table TerritoryRegion as ' + sql)

root
 |-- RegionID: integer (nullable = true)
 |-- RegionName: string (nullable = true)
 |-- TerritoryList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- TerritoryID: string (nullable = true)
 |    |    |-- TerritoryName: string (nullable = true)

DataFrame[RegionID: int, RegionName: string, TerritoryList: array<struct<TerritoryID:string,TerritoryName:string>>]
+--------+----------+--------------------+
|RegionID|RegionName|       TerritoryList|
+--------+----------+--------------------+
|       1|   Eastern|[[27511, Cary], [...|
|       2|   Western|[[60179, Hoffman ...|
|       3|  Northern|[[45839, Findlay]...|
|       4|  Southern|[[75234, Dallas],...|
+--------+----------+--------------------+

[Row(RegionID=1, RegionName='Eastern', TerritoryList=[Row(TerritoryID='27511', TerritoryName='Cary'), Row(TerritoryID='07960', TerritoryName='Morristown'), Row(TerritoryID='02184', TerritoryName='Braintree'), Row(TerritoryID='11747', TerritoryName='M

AnalysisException: '`default`.`TerritoryRegion` already exists.;'

In [43]:
# spark.sql('drop table TerritoryRegion')
# spark.sql('create table TerritoryRegion as ' + sql)
spark.read.table('TerritoryRegion').show()

+--------+----------+--------------------+
|RegionID|RegionName|       TerritoryList|
+--------+----------+--------------------+
|       1|   Eastern|[[27511, Cary], [...|
|       2|   Western|[[60179, Hoffman ...|
|       3|  Northern|[[45839, Findlay]...|
|       4|  Southern|[[75234, Dallas],...|
+--------+----------+--------------------+



In [39]:
tr2 = spark.read.json('TerritoryRegion.json')
display(tr2)
tr2.printSchema()

Unnamed: 0,RegionID,RegionName,TerritoryList
0,1,Eastern,"[(27511, Cary), (07960, Morristown), (02184, B..."
1,2,Western,"[(60179, Hoffman Estates), (94025, Menlo Park)..."
2,3,Northern,"[(45839, Findlay), (48304, Bloomfield Hills), ..."
3,4,Southern,"[(75234, Dallas), (32859, Orlando), (30346, At..."


root
 |-- RegionID: long (nullable = true)
 |-- RegionName: string (nullable = true)
 |-- TerritoryList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- TerritoryID: string (nullable = true)
 |    |    |-- TerritoryName: string (nullable = true)



### If you have data that is already collected into a complex datatype and want to flatten it, you could use HQL EXPLODE function.

### You could use the Spark explode method.

In [45]:
from pyspark.sql.functions import explode
display(tr1.select('RegionID', explode('TerritoryList')).withColumnRenamed('col','TerritoryName'))

Unnamed: 0,RegionID,TerritoryName
0,3,Hollis
1,3,Portsmouth
2,3,Philadelphia
3,3,Beachwood
4,3,Findlay
5,3,Southfield
6,3,Troy
7,3,Bloomfield Hills
8,3,Racine
9,3,Roseville


### Or if the DataFrame is turned into a Temp View, you could use the HQL query to do it.

In [46]:
tr1.createOrReplaceTempView('RegionTerritories')
sql = """
SELECT RegionID, TerritoryName
FROM RegionTerritories
LATERAL VIEW EXPLODE(TerritoryList) EXPLODED_TABLE AS TerritoryName
ORDER BY RegionID, TerritoryName
"""
spark.sql(sql).show()

+--------+-------------+
|RegionID|TerritoryName|
+--------+-------------+
|       1|      Bedford|
|       1|       Boston|
|       1|    Braintree|
|       1|    Cambridge|
|       1|         Cary|
|       1|       Edison|
|       1|     Fairport|
|       1|    Georgetow|
|       1|   Greensboro|
|       1|   Louisville|
|       1|     Mellvile|
|       1|   Morristown|
|       1|     New York|
|       1|     New York|
|       1|       Neward|
|       1|   Providence|
|       1|    Rockville|
|       1|     Westboro|
|       1|       Wilton|
|       2|     Bellevue|
+--------+-------------+
only showing top 20 rows



### Or you could select specific elements from a collection.

In [47]:
tr2.createOrReplaceTempView('RegionTerritories')
spark.sql("""
select RegionId, RegionName, TerritoryList[0] as First, 
TerritoryList[size(TerritoryList) - 1] as Last, 
size(TerritoryList) as TerritoryCount 
from RegionTerritories
""").show()

+--------+----------+--------------------+-------------------+--------------+
|RegionId|RegionName|               First|               Last|TerritoryCount|
+--------+----------+--------------------+-------------------+--------------+
|       1|   Eastern|       [27511, Cary]| [02139, Cambridge]|            19|
|       2|   Western|[60179, Hoffman E...|[85251, Scottsdale]|            15|
|       3|  Northern|    [45839, Findlay]| [44122, Beachwood]|            11|
|       4|  Southern|     [75234, Dallas]|     [33607, Tampa]|             8|
+--------+----------+--------------------+-------------------+--------------+



### If the array is of structs note the syntax of fetching the elements from the struct uses the . like an object property.

In [50]:
sql = """
SELECT RegionID, RegionName, Territory.TerritoryID AS TerritoryID, 
Territory.TerritoryName AS TerritoryName
FROM RegionTerritories
LATERAL VIEW EXPLODE(TerritoryList) EXPLODED_TABLE AS Territory
"""
spark.sql(sql).show()

+--------+----------+-----------+---------------+
|RegionID|RegionName|TerritoryID|  TerritoryName|
+--------+----------+-----------+---------------+
|       1|   Eastern|      27511|           Cary|
|       1|   Eastern|      07960|     Morristown|
|       1|   Eastern|      02184|      Braintree|
|       1|   Eastern|      11747|       Mellvile|
|       1|   Eastern|      01581|       Westboro|
|       1|   Eastern|      14450|       Fairport|
|       1|   Eastern|      20852|      Rockville|
|       1|   Eastern|      10038|       New York|
|       1|   Eastern|      02903|     Providence|
|       1|   Eastern|      02116|         Boston|
|       1|   Eastern|      06897|         Wilton|
|       1|   Eastern|      19713|         Neward|
|       1|   Eastern|      40222|     Louisville|
|       1|   Eastern|      27403|     Greensboro|
|       1|   Eastern|      08837|         Edison|
|       1|   Eastern|      01833|      Georgetow|
|       1|   Eastern|      01730|        Bedford|


In [3]:
people = spark.read.format("org.apache.spark.sql.cassandra").options(table="student", keyspace="classroom").load()
display(people)   

Unnamed: 0,id,emails,firstname,lastname
0,3,"[Mary1@gmail.com, Mary2@yahoo.com]",Mary,Johnson
1,1,"[joe.smith@abc.net, joes@xyz.com]",Joseph,Smith
2,2,"[mike.jones@def.net, mike1234@gmail.com, mikej...",Mike,Jones
3,4,[],XXXX,YYYY


### Homework ##

**First Challenge**

Create a Python function to determine if a number is odd or even and use that to select only the even numbered shippers from the TSV folder of northwind. Note the TSV file does not have headers so you will need to do something to make the DataFrame have a meaningful structure. I would suggest using SparkSql as much as possible to rename and cast the columns which are ShipperID, CompanyName and Phone.

**Second Challenge**

Take the Order_LineItems.json folder, read it into a DataFrame, and flatten it and then calculate the average price paid for a product.



In [3]:
# Read the following code and see how it will shape order line items into the order header record
# You will use the result of this saved file for the second challenge
o = spark.read.csv('/home/student/ROI/Spark/datasets/northwind/CSVHeaders/orders', header = True, inferSchema = True)
od = spark.read.csv('/home/student/ROI/Spark/datasets/northwind/CSVHeaders/orderdetails', header = True, inferSchema = True)

o.createOrReplaceTempView('Orders')
od.createOrReplaceTempView('OrderDetails')
sql = """
select o.OrderID, o.CustomerID, o.OrderDate
           , COUNT(*) as cnt
           , SUM(od.unitprice * od.quantity) as OrderTotal
           , COLLECT_SET(NAMED_STRUCT("ProductID", od.ProductID, 
                                      "UnitPrice", od.UnitPrice,
                                      "Quantity", od.Quantity,
                                      "Discount", od.discount)) as LineItems
from Orders as o join OrderDetails as od on o.OrderID = od.OrderID
GROUP BY o.OrderID, o.CustomerID, o.OrderDate
ORDER BY o.OrderID"""
od2 = spark.sql(sql)
od2.write.mode('overwrite').json('Orders_LineItems.json')

print('Done')

Done


In [4]:
display(od2)

Unnamed: 0,OrderID,CustomerID,OrderDate,cnt,OrderTotal,LineItems
0,10248,VINET,1996-07-04,3,440.0,"[(72, 34.8, 5, 0.0), (42, 9.8, 10, 0.0), (11, ..."
1,10249,TOMSP,1996-07-05,2,1863.4,"[(51, 42.4, 40, 0.0), (14, 18.6, 9, 0.0)]"
2,10250,HANAR,1996-07-08,3,1813.0,"[(65, 16.8, 15, 0.15), (51, 42.4, 35, 0.15), (..."
3,10251,VICTE,1996-07-08,3,670.8,"[(22, 16.8, 6, 0.05), (57, 15.6, 15, 0.05), (6..."
4,10252,SUPRD,1996-07-09,3,3730.0,"[(33, 2.0, 25, 0.05), (20, 64.8, 40, 0.05), (6..."
5,10253,HANAR,1996-07-10,3,1444.8,"[(31, 10.0, 20, 0.0), (49, 16.0, 40, 0.0), (39..."
6,10254,CHOPS,1996-07-11,3,625.2,"[(55, 19.2, 21, 0.15), (24, 3.6, 15, 0.15), (7..."
7,10255,RICSU,1996-07-12,4,2490.5,"[(16, 13.9, 35, 0.0), (59, 44.0, 30, 0.0), (2,..."
8,10256,WELLI,1996-07-15,2,517.8,"[(77, 10.4, 12, 0.0), (53, 26.2, 15, 0.0)]"
9,10257,HILAA,1996-07-16,3,1119.9,"[(27, 35.1, 25, 0.0), (77, 10.4, 15, 0.0), (39..."
