#  Big Data and AI for Business
## Spark and Spark SQL
### Sreerag Mahadevan Cheeroth

In [None]:
# install Java Virtual Machine (JVM) from OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# download and decompress Apache Spark with Hadoop from https://spark.apache.org/downloads.html
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [None]:
# tell findspark / PySpark where the JDK and the unpacked Spark distribution live
import os
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk-amd64',
    'SPARK_HOME': '/content/spark-3.3.2-bin-hadoop3',
})

In [None]:
# install and import findspark to locate Spark on the system
#!pip install -q findspark
import findspark
findspark.init()
findspark.find()

'/content/spark-3.3.2-bin-hadoop3'

In [None]:
# create the SparkContext, or reuse it when this cell is re-run: calling
# pyspark.SparkContext(...) a second time raises
# "Cannot run multiple SparkContexts at once"
import pyspark
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName('Act7'))
# distribute 0..9 over 4 partitions, then bring the data back to the driver
rdd = sc.parallelize(range(10), 4)
rdd.collect(), rdd.getNumPartitions()

([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 4)

In [None]:
from pyspark.sql import SparkSession
# NOTE(review): `sc` already exists at this point, so getOrCreate() attaches to
# that running context; master('local') and spark.ui.port may not take effect -
# confirm in the Spark UI / driver logs
spark = (
    SparkSession.builder
    .master('local')
    .appName('Act7')
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
spark

## Python Pandas vs SQL vs Spark SQL

In [None]:
import pandas as pd
# load the product catalogue (act7_product.json) into a pandas DataFrame,
# then print its columns, dtypes and non-null counts
pd_df = pd.read_json(path_or_buf='act7_product.json')
pd_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19 entries, 0 to 18
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   productID             19 non-null     int64 
 1   productLineID         19 non-null     int64 
 2   productDescription    19 non-null     object
 3   productFinish         19 non-null     object
 4   productStandardPrice  19 non-null     int64 
dtypes: int64(3), object(2)
memory usage: 888.0+ bytes


In [None]:
# expose the JSON data to Spark SQL under the name `Product`.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView
# is its replacement and also makes re-running this cell safe.
spark.read.json('act7_product.json').createOrReplaceTempView('Product')



In [None]:
# every row and column of the Product view, long strings shown in full
all_products = spark.sql('SELECT * FROM Product p')
all_products.show(truncate=False)

+-----------------------+-------------+---------+-------------+--------------------+
|productDescription     |productFinish|productID|productLineID|productStandardPrice|
+-----------------------+-------------+---------+-------------+--------------------+
|Cherry End Table       |Cherry       |1        |1            |175                 |
|Birch Coffee Tables    |Birch        |2        |1            |200                 |
|Oak Computer Desk      |Oak          |3        |1            |750                 |
|Entertainment Center   |Cherry       |4        |1            |1650                |
|Writer's Desk          |Oak          |5        |2            |325                 |
|8-Drawer Dresser       |Birch        |6        |1            |750                 |
|48 Bookcase            |Walnut       |7        |3            |150                 |
|48 Bookcase            |Oak          |8        |3            |175                 |
|96 Bookcase            |Walnut       |9        |3            |22

In [None]:
# load the same JSON file as a Spark DataFrame for the DataFrame-API examples
sp_df = spark.read.format('json').load('act7_product.json')

In [None]:
# print the schema Spark inferred from the JSON: column names, types, nullability
sp_df.printSchema()

root
 |-- productDescription: string (nullable = true)
 |-- productFinish: string (nullable = true)
 |-- productID: long (nullable = true)
 |-- productLineID: long (nullable = true)
 |-- productStandardPrice: long (nullable = true)



In [None]:
# number of rows (products) in the Spark DataFrame
product_count = sp_df.count()
product_count

19

In [None]:
# first 20 rows with long strings truncated (Spark's defaults, spelled out)
sp_df.show(n=20, truncate=True)

+--------------------+-------------+---------+-------------+--------------------+
|  productDescription|productFinish|productID|productLineID|productStandardPrice|
+--------------------+-------------+---------+-------------+--------------------+
|    Cherry End Table|       Cherry|        1|            1|                 175|
| Birch Coffee Tables|        Birch|        2|            1|                 200|
|   Oak Computer Desk|          Oak|        3|            1|                 750|
|Entertainment Center|       Cherry|        4|            1|                1650|
|       Writer's Desk|          Oak|        5|            2|                 325|
|    8-Drawer Dresser|        Birch|        6|            1|                 750|
|         48 Bookcase|       Walnut|        7|            3|                 150|
|         48 Bookcase|          Oak|        8|            3|                 175|
|         96 Bookcase|       Walnut|        9|            3|                 225|
|         96 Boo

**What are the description, finish and standard price of the first three products?**

In [None]:
# BUDT 704
# take the first three rows, then keep only the columns of interest
pd_df.head(3)[['productDescription', 'productFinish', 'productStandardPrice']]

Unnamed: 0,productDescription,productFinish,productStandardPrice
0,Cherry End Table,Cherry,175
1,Birch Coffee Tables,Birch,200
2,Oak Computer Desk,Oak,750


In [None]:
# BUDT 703
# a triple-quoted string lets the SQL span several lines
first_three_sql = '''SELECT productDescription, productFinish, productStandardPrice
            FROM Product
            LIMIT 3'''
spark.sql(first_three_sql).show(truncate=False)

+-------------------+-------------+--------------------+
|productDescription |productFinish|productStandardPrice|
+-------------------+-------------+--------------------+
|Cherry End Table   |Cherry       |175                 |
|Birch Coffee Tables|Birch        |200                 |
|Oak Computer Desk  |Oak          |750                 |
+-------------------+-------------+--------------------+



In [None]:
# the same query built from adjacent string literals, which Python concatenates;
# note the trailing space in each piece so the keywords stay separated
spark.sql('SELECT productDescription, productFinish, productStandardPrice '
          'FROM Product '
          'LIMIT 3').show(truncate=False)

+-------------------+-------------+--------------------+
|productDescription |productFinish|productStandardPrice|
+-------------------+-------------+--------------------+
|Cherry End Table   |Cherry       |175                 |
|Birch Coffee Tables|Birch        |200                 |
|Oak Computer Desk  |Oak          |750                 |
+-------------------+-------------+--------------------+



In [None]:
# pandas-style column selection also works on a Spark DataFrame
wanted_cols = ['productDescription', 'productFinish', 'productStandardPrice']
sp_df[wanted_cols].show(n=3, truncate=False)

+-------------------+-------------+--------------------+
|productDescription |productFinish|productStandardPrice|
+-------------------+-------------+--------------------+
|Cherry End Table   |Cherry       |175                 |
|Birch Coffee Tables|Birch        |200                 |
|Oak Computer Desk  |Oak          |750                 |
+-------------------+-------------+--------------------+
only showing top 3 rows



In [None]:
# idiomatic DataFrame API: pick the columns with select(), show the first 3 rows
(sp_df
 .select(['productDescription', 'productFinish', 'productStandardPrice'])
 .show(n=3, truncate=False))

+-------------------+-------------+--------------------+
|productDescription |productFinish|productStandardPrice|
+-------------------+-------------+--------------------+
|Cherry End Table   |Cherry       |175                 |
|Birch Coffee Tables|Birch        |200                 |
|Oak Computer Desk  |Oak          |750                 |
+-------------------+-------------+--------------------+
only showing top 3 rows



**What are the EDA stats?**

In [None]:
# Pandas
# summary statistics of the numeric columns (quartiles are describe()'s default)
pd_df.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,productID,productLineID,productStandardPrice
count,19.0,19.0,19.0
mean,10.526316,2.0,534.631579
std,6.345326,1.105542,433.198211
min,1.0,1.0,150.0
25%,5.5,1.0,200.0
50%,10.0,2.0,325.0
75%,15.5,3.0,775.0
max,21.0,4.0,1650.0


In [None]:
# Spark
# count/mean/stddev/min/max per column; mean and stddev come back null for strings
spark_stats = sp_df.describe()
spark_stats.show(truncate=False)

+-------+------------------+-------------+------------------+------------------+--------------------+
|summary|productDescription|productFinish|productID         |productLineID     |productStandardPrice|
+-------+------------------+-------------+------------------+------------------+--------------------+
|count  |19                |19           |19                |19                |19                  |
|mean   |null              |null         |10.526315789473685|2.0               |534.6315789473684   |
|stddev |null              |null         |6.345325672866354 |1.1055415967851332|433.1982110517996   |
|min    |4-Drawer Dresser  |Birch        |1                 |1                 |150                 |
|max    |Writer's Desk     |Walnut       |21                |4                 |1650                |
+-------+------------------+-------------+------------------+------------------+--------------------+



**What are the products in Cherry finish with price higher than $300?**

In [None]:
# 704
# columns first, then the boolean row mask; the parentheses are required
# because & binds tighter than == and > in Python
is_pricey_cherry = (pd_df.productFinish == "Cherry") & (pd_df.productStandardPrice > 300)
pd_df[['productDescription', 'productFinish', 'productStandardPrice']][is_pricey_cherry]



Unnamed: 0,productDescription,productFinish,productStandardPrice
3,Entertainment Center,Cherry,1650


In [None]:
# 704
# rows first (boolean mask), then the column subset
pd_df[(pd_df['productFinish'] == 'Cherry') & (pd_df['productStandardPrice'] > 300)]\
  [['productDescription', 'productFinish', 'productStandardPrice']]


Unnamed: 0,productDescription,productFinish,productStandardPrice
3,Entertainment Center,Cherry,1650


In [None]:
# .loc with a mask built from the comparison methods eq()/gt(); all columns kept
pd_df.loc[pd_df.productFinish.eq('Cherry') & pd_df.productStandardPrice.gt(300)]

Unnamed: 0,productID,productLineID,productDescription,productFinish,productStandardPrice
3,4,1,Entertainment Center,Cherry,1650


In [None]:
# query() takes the filter as an expression string; double quotes outside
# avoid escaping the single-quoted literal inside
pd_df.query("productFinish ==  'Cherry' & productStandardPrice > 300")\
  [['productDescription', 'productFinish', 'productStandardPrice']]

Unnamed: 0,productDescription,productFinish,productStandardPrice
3,Entertainment Center,Cherry,1650


In [None]:
# 703
cherry_over_300_sql = ("SELECT p.productDescription, p.productFinish, p.productStandardPrice "
                       "FROM Product p "
                       "WHERE p.productFinish = 'Cherry' AND p.productStandardPrice > 300")
spark.sql(cherry_over_300_sql).show(truncate=False)

+--------------------+-------------+--------------------+
|productDescription  |productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|Cherry       |1650                |
+--------------------+-------------+--------------------+



In [None]:
# 737
# a Column expression used as an index filters rows, like a pandas boolean mask
cherry_mask = (sp_df.productFinish == "Cherry") & (sp_df.productStandardPrice > 300)
(sp_df[cherry_mask]
     [['productDescription', 'productFinish', 'productStandardPrice']]
     .show(truncate=False))

+--------------------+-------------+--------------------+
|productDescription  |productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|Cherry       |1650                |
+--------------------+-------------+--------------------+



In [None]:
# project first, then filter with a SQL-expression string
(sp_df
 .select('productDescription', 'productFinish', 'productStandardPrice')
 .filter("productFinish = 'Cherry' AND productStandardPrice > 300")
 .show(truncate=False))

+--------------------+-------------+--------------------+
|productDescription  |productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|Cherry       |1650                |
+--------------------+-------------+--------------------+



In [None]:
# filter first (where() is an alias of filter()), then project
(sp_df
 .where("productFinish = 'Cherry' AND productStandardPrice > 300")
 .select('productDescription', 'productFinish', 'productStandardPrice')
 .show(truncate=False))

+--------------------+-------------+--------------------+
|productDescription  |productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|Cherry       |1650                |
+--------------------+-------------+--------------------+



**What are the product finishes?**

In [None]:
#704
# distinct finishes as a sorted Python list
sorted(set(pd_df.productFinish))

['Birch', 'Cherry', 'Leather', 'Oak', 'Pine', 'Walnut']

In [None]:
# distinct finishes as a Series (original row labels kept), sorted A-Z
pd_df['productFinish'].drop_duplicates(keep='first').sort_values(ascending=True)

1       Birch
0      Cherry
14    Leather
2         Oak
18       Pine
6      Walnut
Name: productFinish, dtype: object

In [None]:
#703
distinct_finishes_sql = 'SELECT DISTINCT p.productFinish FROM Product p ORDER BY p.productFinish'
spark.sql(distinct_finishes_sql).show(truncate=False)

+-------------+
|productFinish|
+-------------+
|Birch        |
|Cherry       |
|Leather      |
|Oak          |
|Pine         |
|Walnut       |
+-------------+



In [None]:
# 737
# dropDuplicates() is the camelCase name of drop_duplicates()
(sp_df
 .select('productFinish')
 .dropDuplicates()
 .sort('productFinish')
 .show(truncate=False))

+-------------+
|productFinish|
+-------------+
|Birch        |
|Cherry       |
|Leather      |
|Oak          |
|Pine         |
|Walnut       |
+-------------+



In [None]:
# distinct() then sort by a Column object instead of a column name
sp_df.select('productFinish').distinct().sort(sp_df['productFinish']).show(truncate=False)

+-------------+
|productFinish|
+-------------+
|Birch        |
|Cherry       |
|Leather      |
|Oak          |
|Pine         |
|Walnut       |
+-------------+



In [None]:
# orderBy() is an alias of sort(); asc() is the default direction made explicit
sp_df.select('productFinish').distinct().orderBy(sp_df.productFinish.asc()).show(truncate=False)

+-------------+
|productFinish|
+-------------+
|Birch        |
|Cherry       |
|Leather      |
|Oak          |
|Pine         |
|Walnut       |
+-------------+



**What are the distinct prices among all products, in descending order?**

In [None]:
# 704
# distinct prices, highest first
pd_df['productStandardPrice'].drop_duplicates().sort_values(ascending=False)

3     1650
17    1200
16    1100
15     890
11     800
2      750
10     500
14     362
4      325
13     300
18     256
8      225
1      200
0      175
6      150
Name: productStandardPrice, dtype: int64

In [None]:
# 703
prices_desc_sql = 'SELECT DISTINCT p.productStandardPrice FROM Product p ORDER BY p.productStandardPrice DESC'
spark.sql(prices_desc_sql).show(truncate=False)

+--------------------+
|productStandardPrice|
+--------------------+
|1650                |
|1200                |
|1100                |
|890                 |
|800                 |
|750                 |
|500                 |
|362                 |
|325                 |
|300                 |
|256                 |
|225                 |
|200                 |
|175                 |
|150                 |
+--------------------+



In [None]:
# 737
# distinct prices, highest first (ascending=False instead of Column.desc())
(sp_df
 .select('productStandardPrice')
 .distinct()
 .orderBy('productStandardPrice', ascending=False)
 .show(truncate=False))

+--------------------+
|productStandardPrice|
+--------------------+
|1650                |
|1200                |
|1100                |
|890                 |
|800                 |
|750                 |
|500                 |
|362                 |
|325                 |
|300                 |
|256                 |
|225                 |
|200                 |
|175                 |
|150                 |
+--------------------+



**What are the description, finish and standard price of the most expensive product?**

In [None]:
# 704
# compute the maximum once, then keep every row at that price (ties included);
# splitting it into two steps is easier to debug and is the pattern to reuse for HW 4
max_price = pd_df.productStandardPrice.max()
pd_df.loc[pd_df.productStandardPrice == max_price,
          ['productDescription', 'productFinish', 'productStandardPrice']]

Unnamed: 0,productDescription,productFinish,productStandardPrice
3,Entertainment Center,Cherry,1650


In [None]:
# 703
# join each product to a one-row derived table holding the maximum price,
# keeping only the rows whose price equals it
most_expensive_sql = '''
SELECT p.productDescription, p.productFinish, p.productStandardPrice
FROM Product p,(
  SELECT MAX(q.productStandardPrice) AS MaxPrice
  FROM Product q) m
WHERE p.productStandardPrice = m.MaxPrice
'''
spark.sql(most_expensive_sql).show(truncate=False)

+--------------------+-------------+--------------------+
|productDescription  |productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|Cherry       |1650                |
+--------------------+-------------+--------------------+



In [None]:
# 737
# import the functions module under an alias; `from pyspark.sql.functions import max`
# would shadow Python's built-in max() for every later cell in this kernel
from pyspark.sql import functions as F

# step 1 -- SELECT MAX(q.productStandardPrice) AS MaxPrice FROM Product q
# agg() yields a single Row; first()[0] extracts its value to the driver
MaxPrice = sp_df.agg(F.max('productStandardPrice')).first()[0]

# step 2 -- FROM Product p
#           WHERE p.productStandardPrice = MaxPrice
#           SELECT p.productDescription, p.productFinish, p.productStandardPrice
(sp_df
 .filter(sp_df.productStandardPrice == MaxPrice)
 .select('productDescription', 'productFinish', 'productStandardPrice')
 .show(truncate=False))

+--------------------+-------------+--------------------+
|productDescription  |productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|Cherry       |1650                |
+--------------------+-------------+--------------------+



**What are the average prices for each product finish?**

In [None]:
# average standard price per finish as a two-column DataFrame
# (productFinish, productStandardPrice). Selecting the price column *before*
# .mean() keeps the text columns out of the aggregation. Averaging the whole
# group warns in pandas 1.5 (see the output) and raises TypeError in pandas 2.x.
pd_df.groupby('productFinish')['productStandardPrice'].mean().reset_index()
# use this for HW 4 use index reset

  pd.DataFrame(pd_df.groupby('productFinish').mean().productStandardPrice\


Unnamed: 0,productFinish,productStandardPrice
0,Birch,416.666667
1,Cherry,658.333333
2,Leather,362.0
3,Oak,592.5
4,Pine,256.0
5,Walnut,525.0


In [None]:
# the same averages as a DataFrame indexed by finish. Double brackets select a
# one-column DataFrame, so only the numeric price is averaged; this avoids the
# nuisance-column warning (an error in pandas 2.x).
pd_df.groupby('productFinish')[['productStandardPrice']].mean()
# groupby makes index as the group i.e. productFinish

  pd.DataFrame(pd_df.groupby('productFinish').mean().productStandardPrice)


Unnamed: 0_level_0,productStandardPrice
productFinish,Unnamed: 1_level_1
Birch,416.666667
Cherry,658.333333
Leather,362.0
Oak,592.5
Pine,256.0
Walnut,525.0


In [None]:
# per-finish averages of every numeric column; numeric_only=True explicitly skips
# the text columns (pandas 1.5 warns about them, pandas 2.x raises TypeError)
pd_df.groupby('productFinish').mean(numeric_only=True)
# groups and gives averages for all numerical columns

  pd_df.groupby('productFinish').mean()


Unnamed: 0_level_0,productID,productLineID,productStandardPrice
productFinish,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Birch,7.333333,1.333333,416.666667
Cherry,6.0,1.0,658.333333
Leather,17.0,3.0,362.0
Oak,10.75,2.375,592.5
Pine,21.0,1.0,256.0
Walnut,12.0,2.666667,525.0


In [None]:
#pivot table
# naming `values` limits the aggregation to the price column; without it
# pivot_table also tries to average the text columns, which warns in pandas 1.5
# (see the output) and fails in pandas 2.x. aggfunc='mean' is the default, shown explicitly.
pd_df.pivot_table(index='productFinish', values='productStandardPrice', aggfunc='mean')\
  .reset_index()
# do this for HW 4 ***

  pd_df.pivot_table(index = 'productFinish').productStandardPrice.reset_index()


Unnamed: 0,productFinish,productStandardPrice
0,Birch,416.666667
1,Cherry,658.333333
2,Leather,362.0
3,Oak,592.5
4,Pine,256.0
5,Walnut,525.0


In [None]:
# per-finish average price as a Series, rounded to cents
pd_df.groupby('productFinish')['productStandardPrice'].agg('mean').round(decimals=2)

productFinish
Birch      416.67
Cherry     658.33
Leather    362.00
Oak        592.50
Pine       256.00
Walnut     525.00
Name: productStandardPrice, dtype: float64

In [None]:
# overall average price (a numpy scalar, rounded to cents)
pd_df['productStandardPrice'].mean().round(decimals=2)

534.63

In [None]:
# same overall average, via agg() with the aggregation named as a string
pd_df['productStandardPrice'].agg(func='mean').round(decimals=2)

534.63

In [None]:
# column -> aggregation mapping; the result is a Series keyed by column name
pd_df.agg(func={'productStandardPrice': 'mean'}).round(decimals=2)

productStandardPrice    534.63
dtype: float64

In [None]:
# average price per finish in Spark SQL, ordered alphabetically by finish
avg_by_finish_sql = '''
SELECT p.productFinish, AVG(p.productStandardPrice) AS AveragePrice
FROM Product p
GROUP BY p.productFinish
ORDER BY p.productFinish
'''
spark.sql(avg_by_finish_sql).show(truncate=False)

+-------------+-----------------+
|productFinish|AveragePrice     |
+-------------+-----------------+
|Birch        |416.6666666666667|
|Cherry       |658.3333333333334|
|Leather      |362.0            |
|Oak          |592.5            |
|Pine         |256.0            |
|Walnut       |525.0            |
+-------------+-----------------+



In [None]:
from pyspark.sql.functions import avg
# DataFrame-API version of the Spark SQL query:
# FROM Product p
# GROUP BY p.productFinish
# SELECT p.productFinish, AVG(p.productStandardPrice) AS AveragePrice
# ORDER BY p.productFinish
# .alias() mirrors `AS AveragePrice`; without it Spark names the column
# avg(productStandardPrice)
sp_df.groupBy('productFinish')\
  .agg(avg('productStandardPrice').alias('AveragePrice'))\
  .orderBy('productFinish')\
  .show(truncate = False)

+-------------+-------------------------+
|productFinish|avg(productStandardPrice)|
+-------------+-------------------------+
|Birch        |416.6666666666667        |
|Cherry       |658.3333333333334        |
|Leather      |362.0                    |
|Oak          |592.5                    |
|Pine         |256.0                    |
|Walnut       |525.0                    |
+-------------+-------------------------+



In [None]:
# dict form of agg(): {column: aggregate-function name}; the result column is
# auto-named avg(productStandardPrice)
price_avg_spec = {'productStandardPrice': 'avg'}
(sp_df
 .groupBy('productFinish')
 .agg(price_avg_spec)
 .sort('productFinish')
 .show(truncate=False))

+-------------+-------------------------+
|productFinish|avg(productStandardPrice)|
+-------------+-------------------------+
|Birch        |416.6666666666667        |
|Cherry       |658.3333333333334        |
|Leather      |362.0                    |
|Oak          |592.5                    |
|Pine         |256.0                    |
|Walnut       |525.0                    |
+-------------+-------------------------+



In [None]:
from pyspark.sql.functions import count, sum, avg, min, max
# NOTE(review): this import shadows Python's built-in sum/min/max in the notebook
# namespace. It is kept because cells beyond this point may rely on these Spark
# versions; confirm before switching to `from pyspark.sql import functions as F`.
# Apply each aggregate to the price column; the result columns are auto-named, e.g. sum(productStandardPrice).
price_aggs = [agg_fn('productStandardPrice') for agg_fn in (count, sum, avg, min, max)]
(sp_df
 .groupBy('productFinish')
 .agg(*price_aggs)
 .sort('productFinish')
 .show(truncate=False))

+-------------+---------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|productFinish|count(productStandardPrice)|sum(productStandardPrice)|avg(productStandardPrice)|min(productStandardPrice)|max(productStandardPrice)|
+-------------+---------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|Birch        |3                          |1250                     |416.6666666666667        |200                      |750                      |
|Cherry       |3                          |1975                     |658.3333333333334        |150                      |1650                     |
|Leather      |1                          |362                      |362.0                    |362                      |362                      |
|Oak          |8                          |4740                     |592.5                    |175              