# **Introduction to SparkSQL**


## Objectives


Spark SQL is a Spark module for structured data processing. It is sed to query structured data inside Spark programs, using either SQL or a familiar DataFrame API.

After completing this lab you will be able to:


* Load a data file into a dataframe
* Create a Table View for the dataframe
* Run basic SQL queries and aggregate data on the table view
* Create a Pandas UDF to perform columnar operations


In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pyarrow==0.14.1
!pip install pandas
!pip install numpy==1.19.5

In [None]:
import findspark
findspark.init()

In [None]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Exercise 1 -  Spark session


Create and initialize the Spark session needed to load the data frames and operate on it


#### Task 1: Creating the spark session and context


In [None]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#### Task 2: Initialize Spark session
To work with dataframes we just need to verify that the spark session instance has been created.


In [None]:
spark

## Exercise 2 - Loading the Data and creating a table view


In this section, you will first read the CSV file into a Pandas Dataframe and then read it into a Spark Dataframe
Pandas is a library used for data manipulation and analysis. The Pandas library offers data structures and operations for creating and manipulating Data Series and DataFrame objects. Data can be imported from various data sources, e.g., Numpy arrays, Python dictionaries, and CSV files. Pandas allows you to manipulate, organize and display the data.

To create a Spark DataFrame we load an external DataFrame, called `mtcars`. This DataFrame includes 32 observations on 11 variables:

| colIndex | colName | units/description |
| :---: | :--- | :--- |
|[, 1] | mpg |Miles per gallon  |
|[, 2] | cyl | Number of cylinders  |
|[, 3] | disp | Displacement (cu.in.) |  
|[, 4] | hp  | Gross horsepower  |
|[, 5] | drat | Rear axle ratio  |
|[, 6] | wt | Weight (lb/1000)  |
|[, 7] | qsec | 1/4 mile time  |
|[, 8] | vs  | V/S  |
|[, 9] | am | Transmission (0 = automatic, 1 = manual)  |
|[,10] | gear | Number of forward gears  |
|[,11] | carb | Number of carburetors |


#### Task 1: Load data into a Pandas DataFrame.

Pandas has a convenient function to load CSV data from a URL directly into a pandas dataframe.


In [None]:
# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

In [None]:
# Preview a few records
mtcars.head()

In [None]:
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

#### Task 2: Loading data into a Spark DataFrame


We use the `createDataFrame` function to load the data into a spark dataframe


In [None]:
sdf = spark.createDataFrame(mtcars) 

Let us look at the schema of the loaded spark dataframe


In [None]:
sdf.printSchema()

#### Task 3: Create a Table View
Creating a table view in Spark SQL is required to run SQL queries programmatically on a DataFrame. A view is a temporary table to run SQL queries. A Temporary view provides local scope within the current Spark session. In this example we create a temporary view using the `createTempView()` function


In [None]:
sdf.createTempView("cars")

## Exercise 3 - Running SQL queries and aggregating data


Once we have a table view, we can run queries similar to querying a SQL table. We perform similar operations to the ones in the DataFrames notebook. Note the difference here however is that we use the SQL queries directly. 


In [None]:
# Showing the whole table
spark.sql("SELECT * FROM cars").show()

In [None]:
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)

In [None]:
# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cyl < 6").show(5)

In [None]:
# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()

## Exercise 4 - Create a Pandas UDF to apply a columnar operation
Apache Spark has become the de-facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDF). These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.

Pandas UDFs built on top of Apache Arrow bring you the _best of both worlds_—the ability to define low-overhead, high-performance UDFs entirely in Python. In this simple example, we will build a Scalar Pandas UDF to convert the wT column from imperial units (1000-lbs) to metric units (metric tons).

In addition, UDFs can be registered and invoked in SQL out of the box by registering a regular python function using the `@pandas_udf()` decorator. We can then apply this UDF to our `wt` column. 


#### Task 1: Importing libraries and registering a UDF


In [None]:
# import the Pandas UDF function 
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # The formula for converting from imperial to metric tons
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

#### Task 2: Applying the UDF to the tableview

We can now apply the `convert_weight` user-defined-function to our `wt` column from the `cars` table view. This is done very simply using the SQL query shown below. In this example below we show both the original weight (in ton-lbs) and converted weight (in metric tons). 


In [None]:
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()