# SPARK SQL
If you wish to run HIVE natively under Hadoop please see this notebook [Hadoop and Hive](https://github.com/prithwis/KKolab/blob/main/KK_B2_Hadoop_and_Hive.ipynb)

# Install Spark

## Manual Install is required because we need to use an Spark 3.0.3 to avoid errors

In [1]:
!apt-get update > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#!wget -q http://apache.osuosl.org/spark/spark-2.2.2/spark-2.2.2-bin-hadoop2.7.tgz
#!wget -q http://apache.osuosl.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
#!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
#!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
#!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
#!wget -q http://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz     --- this gives errors
!wget -q https://apache.osuosl.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz     # Using Older Version 

#
# if the current version of Spark is not used, there may be errors
# check here for current versions http://apache.osuosl.org/spark
#
#!tar xf spark-2.4.0-bin-hadoop2.7.tgz
#!tar xf spark-2.4.4-bin-hadoop2.7.tgz
#!tar xf spark-2.4.5-bin-hadoop2.7.tgz
#!tar xf spark-3.0.1-bin-hadoop3.2.tgz
#!tar xf spark-3.1.2-bin-hadoop3.2.tgz -- this gives errors
!tar xf spark-3.0.3-bin-hadoop3.2.tgz



# -----------------------------------------------
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
#os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
#os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
#os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
# os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

!pip install -q findspark  # findspark is no more required
#!pip install -q pyspark
import findspark
findspark.init()
# -----------------------------------------------
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

## 2022 Approach Does not work since it installs the current version of Spark

In [None]:
#!pip3 -q install pyspark
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[*]").getOrCreate()
#sc = spark.sparkContext
#sc

# Load Data

In [26]:
#we remove the CRLF character from the end of the row if it exists
!sed 's/\r//' //content/Employee.csv > employee.csv
!sed 's/\r//' //content/Department.csv > department.csv
#!sed 's/\r//' /content/eCommerce_02PC_2021.csv > datafile.csv
#sed -i -e "1d" datafile.csv               # remove the first line containing headers from the file

In [4]:
data_file = 'employee.csv'
raw_data = sc.textFile(data_file)
print ("Data Size", raw_data.count())

Data Size 13


In [28]:
data_file1 = 'department.csv'
raw_data1 = sc.textFile(data_file1)
print ("Data Size", raw_data1.count())

Data Size 5


#Spark Data Frame

In [5]:
employee_df = spark.read.csv(data_file,inferSchema=True, header=True)

In [29]:
department_df = spark.read.csv(data_file1,inferSchema=True, header=True)

In [6]:
employee_df.printSchema()

root
 |-- EmpID: integer (nullable = true)
 |-- LastName: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- JobDesc: string (nullable = true)
 |-- JoinDate: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Comm: double (nullable = true)
 |-- DeptID: integer (nullable = true)



In [30]:
department_df.printSchema()

root
 |-- DeptID: integer (nullable = true)
 |-- DeptName: string (nullable = true)
 |-- ManagerID: integer (nullable = true)
 |-- Location: string (nullable = true)



In [7]:
employee_df.select('LastName','FirstName','EmpID','DeptID').groupby(employee_df.DeptID).count().sort('count', ascending = False).show()

+------+-----+
|DeptID|count|
+------+-----+
|    10|    3|
|    40|    3|
|    20|    3|
|    30|    3|
+------+-----+



In [32]:
department_df.select('DeptID','DeptName','Location').groupby(department_df.Location).count().sort('count', ascending = False).show()

+--------+-----+
|Location|count|
+--------+-----+
|Calcutta|    3|
|  Bombay|    1|
+--------+-----+



In [8]:
employee_df.select('LastName','FirstName','EmpID','DeptID').show()

+-----------+---------+------+------+
|   LastName|FirstName| EmpID|DeptID|
+-----------+---------+------+------+
|    Bacchan|  Amitabh|742866|    10|
|  Mukherjee|     Rani|349870|    40|
|    Dikshit|  Madhuri|865477|    20|
|       Khan| Shahrukh|239456|    20|
|     Sehwag| Virender|897889|    20|
|      Dhoni| Mahender|123980|    40|
|     Dravid|    Rahul|822134|    30|
|     Dalmia| Jagmohan|997445|    30|
|    Ganguly|   Sourav|989007|    40|
|    Ganesan|    Rekha|299034|    10|
|Karthikeyan|  Narayan|546223|    10|
|      Mirza|    Sania|223112|    30|
+-----------+---------+------+------+



In [34]:
department_df.select('DeptID','DeptName','Location').show()

+------+----------+--------+
|DeptID|  DeptName|Location|
+------+----------+--------+
|    10| Corporate|Calcutta|
|    20|     Sales|Calcutta|
|    30|  Accounts|Calcutta|
|    40|Production|  Bombay|
+------+----------+--------+



#Spark SQL
What is the difference between SQLContext and HiveContext? See [here](https://intellipaat.com/community/7599/what-is-the-difference-between-apache-spark-sqlcontext-vs-hivecontext#:~:text=HiveContext%20is%20a%20super%20set,read%20data%20from%20Hive%20tables.&text=The%20more%20basic%20SQLContext%20provides,does%20not%20depend%20on%20Hive.), or [here](https://stackoverflow.com/questions/33666545/what-is-the-difference-between-apache-spark-sqlcontext-vs-hivecontext)

commas getting removed here

In [35]:
#eCommerce2_df = sqlContext.createDataFrame(row_data)
#eCommerce2_df.registerTempTable("eCommerce")
# spark 2 onwards ...

employee_df.createOrReplaceTempView("employee")
department_df.createOrReplaceTempView("department")

In [19]:
#Countries = sqlContext.sql("SELECT * FROM eCommerce limit 20")
employees = spark.sql("SELECT * FROM employee limit 20")
employees.show()

+------+-----------+---------+----------+----------+------+----+------+
| EmpID|   LastName|FirstName|   JobDesc|  JoinDate|Salary|Comm|DeptID|
+------+-----------+---------+----------+----------+------+----+------+
|742866|    Bacchan|  Amitabh| Executive|2003-03-10| 50000| 0.1|    10|
|349870|  Mukherjee|     Rani|   Manager|2005-05-04| 25000|0.06|    40|
|865477|    Dikshit|  Madhuri|     Clerk|2002-04-04| 10000|0.02|    20|
|239456|       Khan| Shahrukh|   Manager|2004-01-03| 30000|0.07|    20|
|897889|     Sehwag| Virender|   Cus_Rep|2005-01-02| 15000|0.05|    20|
|123980|      Dhoni| Mahender|     Clerk|2004-10-09|  9000|0.02|    40|
|822134|     Dravid|    Rahul|Sr Manager|2000-06-04| 40000|0.08|    30|
|997445|     Dalmia| Jagmohan|     Clerk|2001-07-01| 12000|0.02|    30|
|989007|    Ganguly|   Sourav|   Cus_Rep|2002-01-01| 20000|0.03|    40|
|299034|    Ganesan|    Rekha|  Director|2002-10-10| 60000|0.11|    10|
|546223|Karthikeyan|  Narayan| Secretary|2005-12-04| 40000|0.09|

In [36]:
department = spark.sql("SELECT * FROM department limit 20")
department.show()

+------+----------+---------+--------+
|DeptID|  DeptName|ManagerID|Location|
+------+----------+---------+--------+
|    10| Corporate|   299034|Calcutta|
|    20|     Sales|   239456|Calcutta|
|    30|  Accounts|   822134|Calcutta|
|    40|Production|   349870|  Bombay|
+------+----------+---------+--------+



In [25]:
#Countries = sqlContext.sql("SELECT sum(Quantity), sum(UnitPrice), Country from eCommerce group by Country order by sum(Quantity) desc")
employees = spark.sql("SELECT DeptID,sum(Salary), avg(Comm)  from employee group by DeptID  order by sum(Salary) desc")
employees.show()

+------+-----------+-------------------+
|DeptID|sum(Salary)|          avg(Comm)|
+------+-----------+-------------------+
|    10|     150000|0.10000000000000002|
|    30|      77000|0.04666666666666667|
|    20|      55000|0.04666666666666667|
|    40|      54000|0.03666666666666667|
+------+-----------+-------------------+



In [38]:
employees = spark.sql("select * from employee join department where employee.DeptID = department.DeptID")
employees.show()

+------+-----------+---------+----------+----------+------+----+------+------+----------+---------+--------+
| EmpID|   LastName|FirstName|   JobDesc|  JoinDate|Salary|Comm|DeptID|DeptID|  DeptName|ManagerID|Location|
+------+-----------+---------+----------+----------+------+----+------+------+----------+---------+--------+
|742866|    Bacchan|  Amitabh| Executive|2003-03-10| 50000| 0.1|    10|    10| Corporate|   299034|Calcutta|
|349870|  Mukherjee|     Rani|   Manager|2005-05-04| 25000|0.06|    40|    40|Production|   349870|  Bombay|
|865477|    Dikshit|  Madhuri|     Clerk|2002-04-04| 10000|0.02|    20|    20|     Sales|   239456|Calcutta|
|239456|       Khan| Shahrukh|   Manager|2004-01-03| 30000|0.07|    20|    20|     Sales|   239456|Calcutta|
|897889|     Sehwag| Virender|   Cus_Rep|2005-01-02| 15000|0.05|    20|    20|     Sales|   239456|Calcutta|
|123980|      Dhoni| Mahender|     Clerk|2004-10-09|  9000|0.02|    40|    40|Production|   349870|  Bombay|
|822134|     Dravid