<a href="https://colab.research.google.com/github/sherrillz/Bigdata_AI_Notes/blob/main/Spark%20and%20Spark%20SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark and Spark SQL
## SherrillZ

In [None]:
# install Java Virtual Machine (JVM) from OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# download and decompress Apache Spark with Hadoop from https://spark.apache.org/downloads.html
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
# point Spark at the JDK installed above and at the unpacked Spark distribution
import os

JAVA_HOME_DIR = '/usr/lib/jvm/java-8-openjdk-amd64'
SPARK_HOME_DIR = '/content/spark-3.2.1-bin-hadoop3.2'
os.environ.update(JAVA_HOME=JAVA_HOME_DIR, SPARK_HOME=SPARK_HOME_DIR)

In [None]:
# install and import findspark to locate Spark on the system
!pip install -q findspark
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

In [None]:
# Spark resilient distributed dataset (RDD)
import pyspark

# getOrCreate() returns the already-running SparkContext when this cell is
# re-executed, instead of raising "Cannot run multiple SparkContexts at once".
# On a fresh kernel it creates a new context named 'Act7', as before.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName('Act7'))
data = list(range(7))
rdd = sc.parallelize(data)  # no numSlices given, so Spark chooses the partition count
rdd.getNumPartitions(), rdd.collect()

(2, [0, 1, 2, 3, 4, 5, 6])

In [None]:
# SparkSession is the entry point to Spark's DataFrame / SQL API.
# NOTE(review): getOrCreate() reuses a SparkContext that already exists in this
# kernel, in which case the master / UI-port settings below may not take effect.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local')
    .appName('Colab')
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
spark

In [None]:
# authorize Colab and mount Google Drive so its files appear under /content/drive
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# load the product records (JSON on the mounted Drive) into a Spark DataFrame
PRODUCT_JSON_PATH = '/content/drive/My Drive/Terp Drive/BUDT737.202201/act7_product.json'
df = spark.read.json(PRODUCT_JSON_PATH)

In [None]:
# show column details: the schema Spark inferred when reading the JSON file,
# printed as a tree of column name, data type, and nullability
df.printSchema()

root
 |-- productDescription: string (nullable = true)
 |-- productFinish: string (nullable = true)
 |-- productID: long (nullable = true)
 |-- productLineID: long (nullable = true)
 |-- productStandardPrice: double (nullable = true)



In [None]:
# preview the first seven rows as a text table
df.show(n=7)

+--------------------+-------------+---------+-------------+--------------------+
|  productDescription|productFinish|productID|productLineID|productStandardPrice|
+--------------------+-------------+---------+-------------+--------------------+
|    Cherry End Table|       Cherry|        1|            1|               175.0|
| Birch Coffee Tables|        Birch|        2|            1|               200.0|
|   Oak Computer Desk|          Oak|        3|            1|               750.0|
|Entertainment Center|       Cherry|        4|            1|              1650.0|
|       Writer's Desk|          Oak|        5|            2|               325.0|
|    8-Drawer Dresser|        Birch|        6|            1|               750.0|
|         48 Bookcase|       Walnut|        7|            3|               150.0|
+--------------------+-------------+---------+-------------+--------------------+
only showing top 7 rows



In [None]:
# total number of rows; count() is an action, so it runs a Spark job over the data
df.count()

19

In [None]:
# project the description, finish and price columns, then preview seven rows
product_price_cols = ['productDescription', 'productFinish', 'productStandardPrice']
df.select(*product_price_cols).show(7)

+--------------------+-------------+--------------------+
|  productDescription|productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|    Cherry End Table|       Cherry|               175.0|
| Birch Coffee Tables|        Birch|               200.0|
|   Oak Computer Desk|          Oak|               750.0|
|Entertainment Center|       Cherry|              1650.0|
|       Writer's Desk|          Oak|               325.0|
|    8-Drawer Dresser|        Birch|               750.0|
|         48 Bookcase|       Walnut|               150.0|
+--------------------+-------------+--------------------+
only showing top 7 rows



In [None]:
# filter rows first (cherry products priced above 300), then keep three columns
cherry_over_300 = "productFinish = 'Cherry' AND productStandardPrice > 300"
(
    df.filter(cherry_over_300)
    .select('productDescription', 'productFinish', 'productStandardPrice')
    .show()
)

+--------------------+-------------+--------------------+
|  productDescription|productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|       Cherry|              1650.0|
+--------------------+-------------+--------------------+



In [None]:
# same query with the steps swapped: keep three columns first, then filter rows
cherry_over_300 = "productFinish = 'Cherry' AND productStandardPrice > 300"
(
    df.select('productDescription', 'productFinish', 'productStandardPrice')
    .filter(cherry_over_300)
    .show()
)

+--------------------+-------------+--------------------+
|  productDescription|productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|       Cherry|              1650.0|
+--------------------+-------------+--------------------+



In [None]:
# list each product finish once (row order is not guaranteed)
distinct_finishes = df.select(df.productFinish).distinct()
distinct_finishes.show()

+-------------+
|productFinish|
+-------------+
|          Oak|
|         Pine|
|       Walnut|
|        Birch|
|      Leather|
|       Cherry|
+-------------+



In [None]:
# distinct finishes, sorted alphabetically (ascending)
(
    df.select('productFinish')
    .distinct()
    .orderBy('productFinish')
    .show()
)

+-------------+
|productFinish|
+-------------+
|        Birch|
|       Cherry|
|      Leather|
|          Oak|
|         Pine|
|       Walnut|
+-------------+



In [None]:
# distinct standard prices, highest first
(
    df.select('productStandardPrice')
    .distinct()
    .sort('productStandardPrice', ascending=False)
    .show()
)

+--------------------+
|productStandardPrice|
+--------------------+
|              1650.0|
|              1200.0|
|              1100.0|
|               890.0|
|               800.0|
|               750.0|
|               500.0|
|               362.0|
|               325.0|
|               300.0|
|               256.0|
|               225.0|
|               200.0|
|               175.0|
|               150.0|
+--------------------+



In [None]:
# describe columns: count, mean, stddev, min and max for every column.
# mean/stddev come out null for string columns, and min/max on strings are
# lexicographic (e.g. '4-Drawer Dresser' .. "Writer's Desk"), not numeric.
df.describe().show()

+-------+------------------+-------------+------------------+------------------+--------------------+
|summary|productDescription|productFinish|         productID|     productLineID|productStandardPrice|
+-------+------------------+-------------+------------------+------------------+--------------------+
|  count|                19|           19|                19|                19|                  19|
|   mean|              null|         null|10.526315789473685|               2.0|   534.6315789473684|
| stddev|              null|         null| 6.345325672866354|1.1055415967851332|   433.1982110517996|
|    min|  4-Drawer Dresser|        Birch|                 1|                 1|               150.0|
|    max|     Writer's Desk|       Walnut|                21|                 4|              1650.0|
+-------+------------------+-------------+------------------+------------------+--------------------+



In [None]:
# find the most expensive product(s)
# Import Spark's functions under the F alias: `from pyspark.sql.functions import max`
# would shadow Python's built-in max() for every later cell in this kernel.
from pyspark.sql import functions as F

df_slt = df.select('productDescription', 'productFinish', 'productStandardPrice')
# agg() yields a single row; first()[0] pulls out the scalar maximum
maxValue = df_slt.agg(F.max('productStandardPrice')).first()[0]
print(f'max price = {maxValue}')
# every product tied at the maximum price is shown, not just one
df_slt.filter(F.col('productStandardPrice') == maxValue).show()

max price = 1650.0
+--------------------+-------------+--------------------+
|  productDescription|productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|Entertainment Center|       Cherry|              1650.0|
+--------------------+-------------+--------------------+



In [None]:
# average standard price per finish, listed alphabetically by finish
from pyspark.sql.functions import avg

avg_price_by_finish = (
    df.groupBy('productFinish')
    .agg(avg('productStandardPrice'))
    .orderBy('productFinish')
)
avg_price_by_finish.show()

+-------------+-------------------------+
|productFinish|avg(productStandardPrice)|
+-------------+-------------------------+
|        Birch|        416.6666666666667|
|       Cherry|        658.3333333333334|
|      Leather|                    362.0|
|          Oak|                    592.5|
|         Pine|                    256.0|
|       Walnut|                    525.0|
+-------------+-------------------------+

