# Spark SQL 

- **Spark SQL** - Spark’s interface for working with structured and semistructured data. 

- **Structured data** is any data that has a schema—that is, a known set of fields for each record.

- Spark SQL lets you query structured data inside Spark programs, using either **SQL** or a familiar **DataFrame API**. Usable in Java, Scala, Python and R.

- Spark SQL is used to execute SQL queries.

# Part 1: Dataframes

- In Apache Spark, **a DataFrame is a distributed collection of rows under named columns.**

- In simple terms, it is the same as a table in a relational database or an Excel sheet with column headers. It also shares some common characteristics with RDDs:

    - **Immutable in nature:** Once created, a DataFrame / RDD cannot be changed. Applying a transformation produces a new DataFrame / RDD instead.
    - **Lazy evaluation:** A task is not executed until an action is performed.

    - **Distributed:** RDD and DataFrame both are distributed in nature.

- When running SQL from within another programming language the results will be returned as a DataFrame. 

## Why DataFrames are Useful ?

After learning about pandas DataFrames, you are already aware of the many advantages that DataFrames provide. But what additional advantages do DataFrames in Spark offer?

- DataFrames are designed for processing large collections of structured or semi-structured data.

- Observations in a Spark DataFrame are organised under named columns, which helps Apache Spark understand the schema of the DataFrame. This in turn helps Spark optimize the execution plan for queries on it.

- A DataFrame in Apache Spark has the ability to handle petabytes of data.

- DataFrames support a wide range of data formats and sources.

- They have API support for different languages like Python, R, Scala and Java.

- Just as RDDs are distributed across machines in a cluster, DataFrames provide us with distributed computation capability.

**As a general rule of thumb, one should consider an alternative to pandas whenever the data set has more than 10,000,000 rows, which, depending on the number of columns and data types, translates to about 5-10 GB of memory usage. At that point, PySpark might be an option that does the job.**

**Check if SparkContext is running!**

**Recall - Why do we need a SparkContext running?**

- The first step in any Spark program is to create a SparkContext. A SparkContext is required when we want to execute operations in a cluster. It tells Spark how and where to access the cluster.



In [1]:
sc

Next, start a **SQLContext**. Why a **SQLContext**?

- In Spark versions before 2.0, the entry point into all relational functionality is the SQLContext class. From Spark 2.0 onward, `SparkSession` takes over this role.
- A SQLContext (or SparkSession) is required in order to perform SQL-related operations.

In [2]:
# For Spark versions < 2.0
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [1]:
# For Spark versions >= 2.0

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark dataframe basic example") \
    .getOrCreate()

**Note:** `getOrCreate()` gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.

## How to create a DataFrame ?
A DataFrame in Apache Spark can be created in multiple ways, from different data sources. For example:
1. Loading data from an existing RDD.
2. Loading data from files such as JSON or CSV.

### 1.  Creating DataFrame from RDD

One can easily create a DataFrame out of a list of tuples. The steps are as follows:

1. Create a list of tuples. Each tuple contains a person's name, age and gender.
2. Create a RDD from the list above.
3. Convert each tuple to a row.
4. Create a DataFrame by applying createDataFrame on RDD with the help of sqlContext.

In [2]:
from pyspark.sql import Row
l = [('Sam',25, 'M'),('Jalfaizy',22, 'F'),('Tom',20, 'M'),('Nicky',26, 'F'),('Wrick', 30, 'M')]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), Gender=x[2]))
schemaPeople = sqlContext.createDataFrame(people)

Let's check the type!

In [3]:
type(schemaPeople)

pyspark.sql.dataframe.DataFrame

### 2. Creating a DataFrame from an external file

## Introduction to Dataset

### Context

H-1B visas are a category of employment-based, non-immigrant visas for temporary foreign workers in the United States. For a foreign national to apply for an H-1B visa, a US employer must offer them a job and submit a petition for an H-1B visa to the US immigration department. This is also the most common visa status applied for and held by international students once they complete college or higher education and begin working in a full-time position.

This dataset contains H-1B petition data. The columns in the dataset include case status, employer name, worksite coordinates, job title, prevailing wage, occupation code, and year filed.

For more information on individual columns, refer to the column metadata. A detailed description of the underlying raw dataset is available in an [official data dictionary](https://www.foreignlaborcert.doleta.gov/docs/Performance_Data/Disclosure/FY15-FY16/H-1B_FY16_Record_Layout.pdf).

For my tutorial session, I'll use the data file `h1b_sample.csv`, but I would strongly recommend that you use the data file `h1b_learners.csv` for hands-on practice.

In [2]:
df = spark.read.csv('../data/h1b_sample.csv', header=True)

When we read a file through the SparkSession, Spark:

- Instantiates a Spark DataFrame object
- Takes the column names from the header row and associates a schema with the DataFrame. Because `inferSchema` is not set here, every column is read as a string
- Reads in the data and distributes it across the nodes of the cluster (if more than one node is available)
- Returns the DataFrame object

Let's have a look at the **schema of the DataFrame** we created out of our dataset. For this, we can call the `printSchema()` method on our dataframe. It shows the data type of each column.

In [3]:
df.printSchema()

root
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: string (nullable = true)
 |-- YEAR: string (nullable = true)
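Every column in the schema above is a `string`, because `header=True` only supplies the column names. If numeric columns are wanted, one option is to pass `inferSchema=True` to `spark.read.csv`; another is to give an explicit schema. The sketch below builds a DDL-style schema string, which recent Spark releases accept for the `schema` argument as far as I know. The helper names `ddl_schema` and `read_h1b_typed`, and the choice of `DOUBLE` and `INT` for the last two columns, are my own assumptions and not part of the dataset's documentation.

```python
# Column names come from the printSchema() output above; the type picked for
# each column is an assumption about what the data holds.
H1B_COLUMN_TYPES = [
    ("CASE_STATUS", "STRING"),
    ("EMPLOYER_NAME", "STRING"),
    ("SOC_NAME", "STRING"),
    ("JOB_TITLE", "STRING"),
    ("FULL_TIME_POSITION", "STRING"),
    ("PREVAILING_WAGE", "DOUBLE"),
    ("YEAR", "INT"),
]


def ddl_schema(column_types):
    """Join (name, type) pairs into a DDL string such as 'A STRING, B INT'."""
    return ", ".join("{} {}".format(name, dtype) for name, dtype in column_types)


def read_h1b_typed(spark, path):
    """Read the CSV with explicit column types instead of all-string columns.

    `spark` is an existing SparkSession, e.g. the one created earlier.
    """
    return spark.read.csv(path, header=True, schema=ddl_schema(H1B_COLUMN_TYPES))


print(ddl_schema(H1B_COLUMN_TYPES))
```

In the notebook, `df_typed = read_h1b_typed(spark, '../data/h1b_sample.csv')` followed by `df_typed.printSchema()` should then report `PREVAILING_WAGE` as a double. The rest of this tutorial keeps working with the all-string `df`.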



The **`show()`** method gives us a quick look at the rows of a DataFrame. Use `show(5)` to display the first 5 rows of the dataframe.

In [7]:
df.show(5)

+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+
|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|
+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+
|CERTIFIED-WITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|        36067.0|2016|
|CERTIFIED-WITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|       242674.0|2016|
|CERTIFIED-WITHDRAWN|PORTS AMERICA GRO...|    CHIEF EXECUTIVES|CHIEF PROCESS OFF...|                 Y|       193066.0|2016|
|CERTIFIED-WITHDRAWN|GATES CORPORATION...|    CHIEF EXECUTIVES|REGIONAL PRESIDEN...|                 Y|       220314.0|2016|
|          WITHDRAWN|PEABODY INVESTMEN...|    CHIEF EXECUTIVES|PRESIDENT MONGOLI...|                 Y|       157518.4|2016|


### Statistics

Let's take a statistical view of our dataframe.

We can use the `describe(*cols)` method on a dataframe to compute statistics for numeric and string columns.

This includes count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns. Because every column here is a string, `min` and `max` compare the values as text.

In [8]:
df.describe().show()

+-------+-----------+--------------------+--------------------+--------------------+------------------+------------------+------+
|summary|CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|   PREVAILING_WAGE|  YEAR|
+-------+-----------+--------------------+--------------------+--------------------+------------------+------------------+------+
|  count|       5000|                5000|                5000|                5000|              5000|              5000|  5000|
|   mean|       null|                null|                null|                null|              null|127988.90342799998|2016.0|
| stddev|       null|                null|                null|                null|              null|1029719.7954029571|   0.0|
|    min|  CERTIFIED|+421 FOUNDATION INC.|ADVERTISING AND P...|             11-1021|                 N|          100000.0|  2016|
|    max|  WITHDRAWN|ZYME SOLUTIONS, INC.|PUBLIC RELATIONS ...|WW MARKETING MANA...|      

In pandas, we used the `head()` method to return the first n rows. This is one of the differences between the two DataFrame implementations: instead of returning a nicely formatted table of values, the `head()` method in Spark returns a list of Row objects. Spark returns Row objects for certain methods, such as `head()`, `collect()` and `take()`.
 

In [9]:
df.head(5)

[Row(CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'UNIVERSITY OF MICHIGAN', SOC_NAME=u'BIOCHEMISTS AND BIOPHYSICISTS', JOB_TITLE=u'POSTDOCTORAL RESEARCH FELLOW', FULL_TIME_POSITION=u'N', PREVAILING_WAGE=u'36067.0', YEAR=u'2016'),
 Row(CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'GOODMAN NETWORKS, INC.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'CHIEF OPERATING OFFICER', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'242674.0', YEAR=u'2016'),
 Row(CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'PORTS AMERICA GROUP, INC.', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'CHIEF PROCESS OFFICER', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'193066.0', YEAR=u'2016'),
 Row(CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'GATES CORPORATION, A WHOLLY-OWNED SUBSIDIARY OF TOMKINS PLC', SOC_NAME=u'CHIEF EXECUTIVES', JOB_TITLE=u'REGIONAL PRESIDEN, AMERICAS', FULL_TIME_POSITION=u'Y', PREVAILING_WAGE=u'220314.0', YEAR=u'2016'),
 Row(CASE_STATUS=u'WITHDRAWN', EMPLOYER_NAME=u'PEABODY INVESTMENT

**Next, print the first row out of the five fetched rows. Then print the `EMPLOYER_NAME` for the first row entry.**

In [10]:
df.head(5)[0]

Row(CASE_STATUS=u'CERTIFIED-WITHDRAWN', EMPLOYER_NAME=u'UNIVERSITY OF MICHIGAN', SOC_NAME=u'BIOCHEMISTS AND BIOPHYSICISTS', JOB_TITLE=u'POSTDOCTORAL RESEARCH FELLOW', FULL_TIME_POSITION=u'N', PREVAILING_WAGE=u'36067.0', YEAR=u'2016')

In [11]:
df.head(5)[0].EMPLOYER_NAME

u'UNIVERSITY OF MICHIGAN'

In [12]:
first_five = df.head(5)
for each_element in first_five:
    print(each_element.JOB_TITLE)

POSTDOCTORAL RESEARCH FELLOW
CHIEF OPERATING OFFICER
CHIEF PROCESS OFFICER
REGIONAL PRESIDEN, AMERICAS
PRESIDENT MONGOLIA AND INDIA


### Selecting columns
In pandas, we pass a string into a single pair of brackets ([]) to select an individual column, and pass in a list to select multiple columns. For example:

#### Pandas DataFrame
`df['age']`

`df[['age', 'males']]`

Spark also allows us to use bracket notation. Alternatively, pass one or more column names as strings to `select()`.

**Select the `YEAR` column of the dataframe**

In [13]:
df.select('YEAR')

DataFrame[YEAR: string]

Like RDDs, DataFrames are lazily evaluated, so the results of an operation are only displayed when we call an action on it. Here we can call the `show()` method.

In [14]:
df.select('YEAR').show()

+----+
|YEAR|
+----+
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
|2016|
+----+
only showing top 20 rows



**Display Employer Name with their case status.**


In [15]:
# Hint: Use select() to display required columns
# df.select()
# df[[]]

Let's look at the total number of rows in our dataframe. We can use count() to give us total number of rows in our dataframe.

In [16]:
df.count()

5000

**Drop Rows containing NULL values**

We can use the `na.drop(how='any', thresh=None, subset=None)` method (also available as `dropna()`) on our dataframe to drop rows with null values and return a new dataframe.

**Parameters:**	

**how** – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.

**thresh** – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.

**subset** – optional list of column names to consider.
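How the three parameters interact is easier to see on a single row. The function below is a plain-Python model of the rule described above, written only for illustration; it is not Spark's implementation, and the name `keeps_row` and the sample row are made up.

```python
def keeps_row(row, how="any", thresh=None, subset=None):
    """Return True if `row` (a dict of column -> value) would be kept.

    Plain-Python model of the drop rule: `subset` limits which columns are
    inspected, `thresh` (when given) takes precedence over `how`.
    """
    columns = subset if subset is not None else list(row)
    non_null = sum(1 for c in columns if row[c] is not None)
    if thresh is not None:
        # Keep the row only if it has at least `thresh` non-null values.
        return non_null >= thresh
    if how == "any":
        # Dropped as soon as one inspected value is null.
        return non_null == len(columns)
    if how == "all":
        # Dropped only when every inspected value is null.
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")


# Hypothetical row with one missing value.
row = {"CASE_STATUS": "CERTIFIED", "EMPLOYER_NAME": None, "YEAR": "2016"}

print(keeps_row(row))                                    # False
print(keeps_row(row, how="all"))                         # True
print(keeps_row(row, thresh=2))                          # True
print(keeps_row(row, subset=["CASE_STATUS", "YEAR"]))    # True
```

The corresponding Spark calls would be `df.na.drop()`, `df.na.drop(how='all')`, `df.na.drop(thresh=2)` and `df.na.drop(subset=['CASE_STATUS', 'YEAR'])`.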

In [17]:
df1 = df.na.drop()
df1.show()

+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+
|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|
+-------------------+--------------------+--------------------+--------------------+------------------+---------------+----+
|CERTIFIED-WITHDRAWN|UNIVERSITY OF MIC...|BIOCHEMISTS AND B...|POSTDOCTORAL RESE...|                 N|        36067.0|2016|
|CERTIFIED-WITHDRAWN|GOODMAN NETWORKS,...|    CHIEF EXECUTIVES|CHIEF OPERATING O...|                 Y|       242674.0|2016|
|CERTIFIED-WITHDRAWN|PORTS AMERICA GRO...|    CHIEF EXECUTIVES|CHIEF PROCESS OFF...|                 Y|       193066.0|2016|
|CERTIFIED-WITHDRAWN|GATES CORPORATION...|    CHIEF EXECUTIVES|REGIONAL PRESIDEN...|                 Y|       220314.0|2016|
|          WITHDRAWN|PEABODY INVESTMEN...|    CHIEF EXECUTIVES|PRESIDENT MONGOLI...|                 Y|       157518.4|2016|


### Replace Null values
**What if we don't want to drop the entire row but just replace the null values?**

`fillna(value, subset=None)` enables us to replace null values in our dataframe. We can optionally specify the set of columns in which we want to replace null values. The replacement is only applied to columns whose type matches `value`, so a numeric value such as `0` leaves string columns untouched.

In [18]:
# Replace null values in all columns whose type matches the value
df = df.fillna(0)
# df.fillna(0).show()

Replace null values only in the columns `CASE_STATUS` and `EMPLOYER_NAME`.

Hint: Use `fillna(value, subset=None)` and specify the required column names in the subset parameter. For example, `df.fillna('UNKNOWN', subset=['a', 'b'])` for string columns.

In [19]:
# df_filtered = 

What are the possible categories in the case status?

`distinct()`: Returns a new DataFrame containing the distinct rows in this DataFrame.

So next, select the `CASE_STATUS` column and apply the `distinct()` method on it.

In [20]:
df.select('CASE_STATUS').distinct().show()

+-------------------+
|        CASE_STATUS|
+-------------------+
|          CERTIFIED|
|CERTIFIED-WITHDRAWN|
|          WITHDRAWN|
|             DENIED|
+-------------------+



Determine Distinct `CASE_STATUS` count for each `EMPLOYER_NAME`

For example, determine how many visa applications are certified under the employer name `SAMSUNG ELECTRONICS`

We can use `crosstab()` method to get this done. **crosstab(col1, col2)** computes a pair-wise frequency table of the given columns.

**Parameters:**	

**col1** – The name of the first column. Distinct items will make the first item of each row.

**col2** – The name of the second column. Distinct items will make the column names of the DataFrame.
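To make the shape of the result concrete, here is a plain-Python sketch of what a pair-wise frequency table contains. It uses `collections.Counter` rather than Spark, and the employer names and records are invented for illustration.

```python
from collections import Counter

# Hypothetical (employer, status) pairs, made up for this example.
records = [
    ("ACME INC", "CERTIFIED"),
    ("ACME INC", "CERTIFIED"),
    ("ACME INC", "DENIED"),
    ("GLOBEX LLC", "CERTIFIED"),
    ("GLOBEX LLC", "WITHDRAWN"),
]

# Count how often each (col1, col2) combination occurs.
pair_counts = Counter(records)

# Distinct col1 values become rows, distinct col2 values become columns.
employers = sorted({employer for employer, _ in records})
statuses = sorted({status for _, status in records})

table = {
    employer: {status: pair_counts[(employer, status)] for status in statuses}
    for employer in employers
}

for employer in employers:
    print("{}: {}".format(employer, table[employer]))
```

Combinations that never occur get a count of 0, which matches the zeros seen in the `crosstab()` output.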

In [21]:
df2 = df.crosstab('EMPLOYER_NAME', 'CASE_STATUS')

**Determine the top Employers getting more visa applications into a Certified Status**

Find out the top 10 companies having highest number of certified visa applications.

In [22]:
df2.show()

+-------------------------+---------+-------------------+------+---------+
|EMPLOYER_NAME_CASE_STATUS|CERTIFIED|CERTIFIED-WITHDRAWN|DENIED|WITHDRAWN|
+-------------------------+---------+-------------------+------+---------+
|        PROFORM GROUP INC|        2|                  0|     0|        0|
|          MIROCULUS, INC.|        1|                  0|     0|        0|
|       TTI LEXINGTON, LLC|        2|                  0|     0|        0|
|     SAMSUNG ELECTRONI...|        4|                  1|     0|        1|
|         SWAGELOK COMPANY|        1|                  0|     0|        0|
|     MOSS & ASSOCIATES...|        1|                  0|     0|        1|
|            QULINARY INC.|        1|                  0|     0|        0|
|       IDM PRODUCTION LLC|        0|                  1|     0|        0|
|              ACCUEN INC.|        2|                  1|     0|        0|
|          COVER-MORE INC.|        1|                  0|     1|        0|
|     AMENITY SERVICES,..

In [23]:
df2 = df2.orderBy(df2['CERTIFIED'].desc())
df2.show()

+-------------------------+---------+-------------------+------+---------+
|EMPLOYER_NAME_CASE_STATUS|CERTIFIED|CERTIFIED-WITHDRAWN|DENIED|WITHDRAWN|
+-------------------------+---------+-------------------+------+---------+
|     AMAZON CORPORATE LLC|       50|                  1|     0|        1|
|     ADOBE SYSTEMS INC...|       32|                  0|     1|        3|
|               APPLE INC.|       20|                  0|     0|        0|
|      CISCO SYSTEMS, INC.|       16|                  0|     1|        0|
|              GOOGLE INC.|       15|                  6|     0|        0|
|     WAL-MART ASSOCIAT...|       14|                  2|     0|        3|
|             VMWARE, INC.|       12|                  0|     0|        0|
|          EMC CORPORATION|       11|                  0|     0|        0|
|     SEARS HOLDINGS MA...|        9|                  9|     0|        0|
|     BECTON, DICKINSON...|        9|                  0|     0|        0|
|     BURGER KING CORPO..

**Which `JOB_TITLE` got the highest number of certified visa applications?**

In [24]:
df3 = df.crosstab('JOB_TITLE', 'CASE_STATUS')

In [25]:
df4 = df3.orderBy(df3['CERTIFIED'].desc())
df4.show()

+---------------------+---------+-------------------+------+---------+
|JOB_TITLE_CASE_STATUS|CERTIFIED|CERTIFIED-WITHDRAWN|DENIED|WITHDRAWN|
+---------------------+---------+-------------------+------+---------+
|   OPERATIONS MANAGER|      206|                  9|    21|        6|
|    MARKETING MANAGER|      191|                 11|    25|        9|
|      GENERAL MANAGER|      128|                  2|    17|        3|
| CHIEF EXECUTIVE O...|      127|                  3|    17|        5|
|      PRODUCT MANAGER|       94|                 13|     0|        7|
| BUSINESS DEVELOPM...|       91|                  2|     1|        2|
| CHIEF OPERATING O...|       70|                  9|     6|        4|
| PRODUCT MARKETING...|       47|                  8|     0|        4|
| DIRECTOR OF OPERA...|       46|                  0|     3|        2|
| SENIOR PRODUCT MA...|       45|                  5|     2|        0|
|    MANAGING DIRECTOR|       34|                  2|     3|        0|
|     

## Operations on columns

**Classify Application status for each job title as either `Certified` or `NON-CERTIFIED`.**

Hint: For each row we can sum up the values of columns `CERTIFIED-WITHDRAWN` + `WITHDRAWN` + `DENIED` into one single column as `NON-CERTIFIED`

In [26]:
df5 = df4.select(df4['JOB_TITLE_CASE_STATUS'], df4['CERTIFIED'], df4['CERTIFIED-WITHDRAWN']+df4['WITHDRAWN']+df4['DENIED'])

df5.show()

+---------------------+---------+--------------------------------------------+
|JOB_TITLE_CASE_STATUS|CERTIFIED|((CERTIFIED-WITHDRAWN + WITHDRAWN) + DENIED)|
+---------------------+---------+--------------------------------------------+
|   OPERATIONS MANAGER|      206|                                          36|
|    MARKETING MANAGER|      191|                                          45|
|      GENERAL MANAGER|      128|                                          22|
| CHIEF EXECUTIVE O...|      127|                                          25|
|      PRODUCT MANAGER|       94|                                          20|
| BUSINESS DEVELOPM...|       91|                                           5|
| CHIEF OPERATING O...|       70|                                          19|
| PRODUCT MARKETING...|       47|                                          12|
| DIRECTOR OF OPERA...|       46|                                           5|
| SENIOR PRODUCT MA...|       45|                   

In [27]:
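# Import only what is needed; a wildcard import would shadow Python built-ins such as sum() and max()
from pyspark.sql.functions import col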

In [28]:
df6 = df5.select('JOB_TITLE_CASE_STATUS', 'CERTIFIED', col("((CERTIFIED-WITHDRAWN + WITHDRAWN) + DENIED)").alias("NON-CERTIFIED"))

In [29]:
df6.show()

+---------------------+---------+-------------+
|JOB_TITLE_CASE_STATUS|CERTIFIED|NON-CERTIFIED|
+---------------------+---------+-------------+
|   OPERATIONS MANAGER|      206|           36|
|    MARKETING MANAGER|      191|           45|
|      GENERAL MANAGER|      128|           22|
| CHIEF EXECUTIVE O...|      127|           25|
|      PRODUCT MANAGER|       94|           20|
| BUSINESS DEVELOPM...|       91|            5|
| CHIEF OPERATING O...|       70|           19|
| PRODUCT MARKETING...|       47|           12|
| DIRECTOR OF OPERA...|       46|            5|
| SENIOR PRODUCT MA...|       45|            7|
|    MANAGING DIRECTOR|       34|            5|
|            PRESIDENT|       33|            5|
|                  CEO|       31|            6|
| GENERAL AND OPERA...|       29|           12|
|     ACCOUNT DIRECTOR|       26|            5|
|   EXECUTIVE DIRECTOR|       24|            1|
| ADVERTISING AND P...|       23|            5|
| BUSINESS DEVELOPM...|       23|       

**Find the total number of `CERTIFIED` and `NON-CERTIFIED` applications in your dataframe.**

Hint: Use aggregation function like sum() to compute total number in each category.

In [30]:
total_certified = float(df6.groupBy().sum('CERTIFIED').collect()[0][0])
total_certified

4149.0

In [31]:
# Similarly, calculate total number of NON-CERTIFIED applications.

# total_noncertified = 

Here’s the problem: I have a Python function that iterates over my data, but going through each row in the dataframe takes several days. If I have a computing cluster with many nodes, how can I distribute this Python function in PySpark to speed up this process — maybe cut the total time down to less than a few hours — with the least amount of work?

In other words, how do I turn a Python function into a Spark user defined function, or UDF?

<img src = "../images/dataframe.png">

### How can we make use of User Defined Functions on our Dataframes?

Recall the use of the `map()` and `flatMap()` methods when we were operating on our RDDs: they apply a user-defined function to each element of the RDD.

Similarly, Spark allows us to operate on a dataframe using our custom functions.

**Steps:**

1. Define your custom function
2. Register UDF


In [32]:
def share(s):
  return (s / total_certified)

### Registering a UDF

- PySpark UDFs work in a similar way to the pandas `.map()` and `.apply()` methods for pandas Series and DataFrames. If I have a function that can use values from a row in the dataframe as input, then I can map it to the entire dataframe. The only difference is that with PySpark UDFs we have to specify the output data type.

- As long as the Python function's output has a corresponding data type in Spark, it can be turned into a UDF. When registering UDFs, we have to specify the data type using the types from `pyspark.sql.types`. All the types supported by PySpark [can be found here](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=types#module-pyspark.sql.types).

- `sqlContext.udf` (or `spark.udf`): Returns a **UDFRegistration** for UDF registration.

- `register(name, f, returnType=StringType)`: Registers a Python function (including a lambda function) as a **UDF** so it can be used in SQL statements.

- `pyspark.sql.functions.udf(f, returnType)`: Wraps a Python function as a UDF for use in DataFrame expressions such as `select()`.



In [33]:
from pyspark.sql.functions import udf

sqlContext.udf.register("Employershare", share)

print(total_certified)

4149.0


In [34]:
df7 = df2.select(df2.EMPLOYER_NAME_CASE_STATUS, df2.CERTIFIED.cast("float"))
df7.show()

+-------------------------+---------+
|EMPLOYER_NAME_CASE_STATUS|CERTIFIED|
+-------------------------+---------+
|     AMAZON CORPORATE LLC|     50.0|
|     ADOBE SYSTEMS INC...|     32.0|
|               APPLE INC.|     20.0|
|      CISCO SYSTEMS, INC.|     16.0|
|              GOOGLE INC.|     15.0|
|     WAL-MART ASSOCIAT...|     14.0|
|             VMWARE, INC.|     12.0|
|          EMC CORPORATION|     11.0|
|     SEARS HOLDINGS MA...|      9.0|
|     BECTON, DICKINSON...|      9.0|
|          ECOLAB USA INC.|      8.0|
|      GENENTECH USA, INC.|      8.0|
|        EPAM SYSTEMS, INC|      8.0|
|     MICROSOFT CORPORA...|      8.0|
|               ABBVIE INC|      8.0|
|              DROGA5, LLC|      8.0|
|            NETFLIX, INC.|      8.0|
|       CVS PHARMACY, INC.|      8.0|
|     BURGER KING CORPO...|      8.0|
|              INTUIT INC.|      7.0|
+-------------------------+---------+
only showing top 20 rows



In [35]:
from pyspark.sql.types import FloatType
share_udf = udf(share, FloatType())
df8 = df7.select("EMPLOYER_NAME_CASE_STATUS", share_udf(df7.CERTIFIED).alias("%share"))


In [36]:
df8.show()

+-------------------------+------------+
|EMPLOYER_NAME_CASE_STATUS|      %share|
+-------------------------+------------+
|     AMAZON CORPORATE LLC| 0.012051096|
|     ADOBE SYSTEMS INC...| 0.007712702|
|               APPLE INC.|0.0048204386|
|      CISCO SYSTEMS, INC.| 0.003856351|
|              GOOGLE INC.| 0.003615329|
|     WAL-MART ASSOCIAT...| 0.003374307|
|             VMWARE, INC.|0.0028922632|
|          EMC CORPORATION|0.0026512414|
|     BECTON, DICKINSON...|0.0021691974|
|     SEARS HOLDINGS MA...|0.0021691974|
|              DROGA5, LLC|0.0019281755|
|               ABBVIE INC|0.0019281755|
|     BURGER KING CORPO...|0.0019281755|
|     MICROSOFT CORPORA...|0.0019281755|
|          ECOLAB USA INC.|0.0019281755|
|        EPAM SYSTEMS, INC|0.0019281755|
|            NETFLIX, INC.|0.0019281755|
|      GENENTECH USA, INC.|0.0019281755|
|       CVS PHARMACY, INC.|0.0019281755|
|      GENERAL MILLS, INC.|0.0016871535|
+-------------------------+------------+
only showing top

**Here's a small gotcha:** a Spark UDF does not convert the Python return value to the declared return type. A plain Python function happily returns either an integer or a float, but if the value a UDF returns does not match the declared type (here a Python float declared as `IntegerType`), Spark yields a column of NULLs instead of raising an error, as in the following example.

In [37]:
from pyspark.sql.types import IntegerType
share_integer_udf = udf(share, IntegerType())
df9 = df7.select("EMPLOYER_NAME_CASE_STATUS", share_integer_udf(df7.CERTIFIED).alias("%share"))

In [38]:
df9.show()

+-------------------------+------+
|EMPLOYER_NAME_CASE_STATUS|%share|
+-------------------------+------+
|     AMAZON CORPORATE LLC|  null|
|     ADOBE SYSTEMS INC...|  null|
|               APPLE INC.|  null|
|      CISCO SYSTEMS, INC.|  null|
|              GOOGLE INC.|  null|
|     WAL-MART ASSOCIAT...|  null|
|             VMWARE, INC.|  null|
|          EMC CORPORATION|  null|
|     BECTON, DICKINSON...|  null|
|     SEARS HOLDINGS MA...|  null|
|     MICROSOFT CORPORA...|  null|
|      GENENTECH USA, INC.|  null|
|       CVS PHARMACY, INC.|  null|
|              DROGA5, LLC|  null|
|          ECOLAB USA INC.|  null|
|     BURGER KING CORPO...|  null|
|        EPAM SYSTEMS, INC|  null|
|            NETFLIX, INC.|  null|
|               ABBVIE INC|  null|
|      GENERAL MILLS, INC.|  null|
+-------------------------+------+
only showing top 20 rows
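One way to stay clear of the NULL column is to make sure the Python function always returns the type that the UDF declares. The sketch below is a hedged variant of the `share` function: it takes the total as an argument, always returns a Python float, and declares `DoubleType`. The names `with_share` and the two-argument `share` are my own, introduced for illustration.

```python
def share(s, total):
    """Fraction of `total` represented by `s`; always a Python float."""
    return float(s) / float(total)


def with_share(df, total, column="CERTIFIED"):
    """Return `df` with an extra '%share' column computed by a UDF.

    `df` is a Spark DataFrame such as df7 above.
    """
    # Imported here so that share() stays usable without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    # The declared return type (DoubleType) matches the Python float
    # that share() returns, so no value is silently turned into NULL.
    share_udf = udf(lambda s: None if s is None else share(s, total), DoubleType())
    return df.withColumn("%share", share_udf(df[column]))


# The pure-Python part can be checked without a cluster.
print(type(share(50, 4149)).__name__)   # float
```

In the notebook this would be called as `with_share(df7, total_certified).show()`. For simple arithmetic like this, built-in column expressions such as `df7['CERTIFIED'] / total_certified` avoid the UDF altogether.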



## Convert Spark Dataframe to Pandas

In [45]:
pandas_df = df9.toPandas()
pandas_df.columns

Index([u'EMPLOYER_NAME_CASE_STATUS', u'%share'], dtype='object')

# Part 2: SQL Queries

We can create a temporary table view from a DataFrame, which can then be used to perform SQL queries on the data. In Part 1, we saw operations using DataFrames. We will pick the same dataset and perform some basic SQL queries.

**Create a table out of `df`**

Hint: We can use the `registerTempTable(name)` method on any dataframe to create a table out of it.

**`registerTempTable(name)`**: Registers this DataFrame as a temporary table using the given name. Since Spark 2.0 this method is deprecated in favour of `createOrReplaceTempView(name)`, which does the same job.

- The lifetime of this temporary table is tied to the SQLContext (or SparkSession) that was used to create this DataFrame.

In [39]:
df.registerTempTable("visa_table")

## How to perform SQL queries through Spark?

To perform SQL queries we can use `SparkSession.sql(sqlQuery)`, where `sqlQuery` can be any valid SQL query.

- **`SparkSession.sql(sqlQuery)`**: **Returns a DataFrame** representing the result of the given query.

In [49]:
df_visa = spark.sql("select * from visa_table")

**Let's check if the above query gave us the identical dataframe in the result!**

In [50]:
sorted(df_visa.collect()) == sorted(df.collect())

True

In [51]:
#Top10 companies getting visa approval (for all the years)
spark.sql("SELECT EMPLOYER_NAME, count(EMPLOYER_NAME) as CERTIFIED_COUNT FROM visa_table where CASE_STATUS = 'CERTIFIED' GROUP BY EMPLOYER_NAME order by CERTIFIED_COUNT desc").show(10)

+--------------------+---------------+
|       EMPLOYER_NAME|CERTIFIED_COUNT|
+--------------------+---------------+
|AMAZON CORPORATE LLC|             50|
|ADOBE SYSTEMS INC...|             32|
|          APPLE INC.|             20|
| CISCO SYSTEMS, INC.|             16|
|         GOOGLE INC.|             15|
|WAL-MART ASSOCIAT...|             14|
|        VMWARE, INC.|             12|
|     EMC CORPORATION|             11|
|BECTON, DICKINSON...|              9|
|SEARS HOLDINGS MA...|              9|
+--------------------+---------------+
only showing top 10 rows



In [55]:
spark.sql("SELECT JOB_TITLE, count(*) as Approved FROM visa_table where CASE_STATUS = 'CERTIFIED' GROUP BY JOB_TITLE order by Approved desc").show(5)

+--------------------+--------+
|           JOB_TITLE|Approved|
+--------------------+--------+
|  OPERATIONS MANAGER|     206|
|   MARKETING MANAGER|     191|
|     GENERAL MANAGER|     128|
|CHIEF EXECUTIVE O...|     127|
|     PRODUCT MANAGER|      94|
+--------------------+--------+
only showing top 5 rows



We can see that in our dataset the job title `OPERATIONS MANAGER` has the highest number of approvals.
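Since Spark SQL offers both SQL and the DataFrame API, the job-title query can also be written without SQL. The function below is a sketch of one possible translation; the name `top_certified_job_titles` is hypothetical, and `df` is assumed to be the H-1B DataFrame loaded in Part 1.

```python
def top_certified_job_titles(df, n=5):
    """DataFrame-API counterpart of the job-title SQL query (a sketch)."""
    # WHERE CASE_STATUS = 'CERTIFIED'
    certified = df.filter(df["CASE_STATUS"] == "CERTIFIED")
    # GROUP BY JOB_TITLE with count(*); Spark names the new column "count"
    counts = certified.groupBy("JOB_TITLE").count()
    # count(*) AS Approved
    renamed = counts.withColumnRenamed("count", "Approved")
    # ORDER BY Approved DESC, keeping the first n rows
    return renamed.orderBy("Approved", ascending=False).limit(n)
```

In the notebook, `top_certified_job_titles(df).show()` should list the same job titles as the SQL version.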

**Let's find out the `EMPLOYER_NAME` having the highest number of operations manager getting visa approved.**

In [59]:
spark.sql("SELECT EMPLOYER_NAME,count(*) as Approved FROM visa_table where CASE_STATUS = 'CERTIFIED' AND JOB_TITLE ='OPERATIONS MANAGER' GROUP BY EMPLOYER_NAME order by Approved desc").show(5)

+--------------------+--------+
|       EMPLOYER_NAME|Approved|
+--------------------+--------+
|     TAKETOURS, INC.|       3|
|       SOLE COOL INC|       3|
|PRINTRONIC CORPOR...|       2|
|MARUTI MANAGEMENT...|       2|
|JAMSAN HOTEL MANA...|       2|
+--------------------+--------+
only showing top 5 rows



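**Next, find out the approved applications having the highest paid salaries.**

Note that `PREVAILING_WAGE` was read as a string, so `order by PREVAILING_WAGE desc` compares the values as text rather than as numbers. That is why `99986.0` appears at the top of the output below even though larger wages exist in the data; ordering by `CAST(PREVAILING_WAGE AS DOUBLE)` would sort numerically.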

In [62]:
spark.sql("SELECT EMPLOYER_NAME as businesses, PREVAILING_WAGE as wage, SOC_NAME, JOB_TITLE, YEAR, FULL_TIME_POSITION, CASE_STATUS  FROM visa_table where CASE_STATUS ='CERTIFIED' order by PREVAILING_WAGE desc").show(10)

+--------------------+-------+------------------+--------------------+----+------------------+-----------+
|          businesses|   wage|          SOC_NAME|           JOB_TITLE|YEAR|FULL_TIME_POSITION|CASE_STATUS|
+--------------------+-------+------------------+--------------------+----+------------------+-----------+
|BANDY CANYON RANC...|99986.0|  CHIEF EXECUTIVES|CHIEF EXECUTIVE O...|2016|                 Y|  CERTIFIED|
|CALIFORNIA GALVAN...|99986.0|  CHIEF EXECUTIVES|CHIEF EXECUTIVE O...|2016|                 Y|  CERTIFIED|
|         LOMICS, LLC|99986.0|  CHIEF EXECUTIVES|                 CEO|2016|                 Y|  CERTIFIED|
|UC UNIVERSITY HIG...|99986.0|  CHIEF EXECUTIVES|CHIEF FINANCIAL O...|2016|                 Y|  CERTIFIED|
|         SUN BUM LLC|99986.0|  CHIEF EXECUTIVES|CHIEF EXECUTIVE O...|2016|                 Y|  CERTIFIED|
|THE STUDENT LOAN ...|99972.0|MARKETING MANAGERS|     PRODUCT MANAGER|2016|                 Y|  CERTIFIED|
|SEARS HOLDINGS MA...|99972.0|MARKETI

**Next, determine maximum salaries by `JOB_TITLE` for FULL TIME POSITIONS**

In [64]:
spark.sql("SELECT JOB_TITLE ,MAX(PREVAILING_WAGE) as Max_Salary FROM visa_table where CASE_STATUS ='CERTIFIED' AND  FULL_TIME_POSITION ='Y' GROUP BY JOB_TITLE ORDER BY Max_Salary DESC").show(10)

+--------------------+----------+
|           JOB_TITLE|Max_Salary|
+--------------------+----------+
|CHIEF EXECUTIVE O...|   99986.0|
|CHIEF EXECUTIVE O...|   99986.0|
|CHIEF FINANCIAL O...|   99986.0|
|                 CEO|   99986.0|
|     PRODUCT MANAGER|   99972.0|
|DIRECTOR, BUSINES...|   99972.0|
|   DIRECTOR, PRICING|   99972.0|
|DIRECTOR, PROGRAM...|   99972.0|
|MANAGER, BUSINESS...|   99972.0|
|PLASTICS MOVEMENT...|   99819.0|
+--------------------+----------+
only showing top 10 rows
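The wage results above top out at `99986.0` because `PREVAILING_WAGE` is a string column, and both `ORDER BY` and `MAX` then compare text character by character. The plain-Python snippet below reproduces the effect with a few wage values taken from the outputs earlier in this notebook; it illustrates the comparison rule and is not Spark code.

```python
# Wage values as Spark read them from the CSV: plain strings.
wages_as_text = ["99986.0", "242674.0", "193066.0", "36067.0"]

# Text comparison looks at the first character first, so "9..." beats "2...".
print(sorted(wages_as_text, reverse=True))
# ['99986.0', '36067.0', '242674.0', '193066.0']

# Converting to numbers first gives the ordering we actually want.
wages_as_numbers = [float(w) for w in wages_as_text]
print(sorted(wages_as_numbers, reverse=True))
# [242674.0, 193066.0, 99986.0, 36067.0]

print(max(wages_as_text))      # 99986.0
print(max(wages_as_numbers))   # 242674.0
```

In the SQL queries, the equivalent fix would be to use `CAST(PREVAILING_WAGE AS DOUBLE)` inside `ORDER BY` and `MAX`, or to read the column as a numeric type in the first place.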

