# Spark APIs
## Introduction
One of the advantages of Spark is that, although it is written in Scala, it provides several language APIs: you can work with it using Scala, Java, Python or R. Within each programming environment, Spark provides a common way to interact with its functionality through its APIs. At a lower level, you can use the RDD API, which is the foundational API upon which the higher-level APIs are built. These higher-level APIs are structured APIs suited to working with tabular data. All the structured APIs are built on top of the Spark SQL engine. Spark provides the following structured APIs:
- Spark SQL
- DataFrame
- pandas API on Spark (available in Python)
- Datasets (only available in Scala and Java)
In addition to these core data APIs, Spark provides other APIs such as MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

## Learning outcomes
In this tutorial, you will explore Spark RDDs, DataFrames and SQL to appreciate how they differ. At the end of the tutorial, you should be able to:
- Write a Spark program using RDDs
- Write a MapReduce-style program in Spark using RDDs
- Use the Spark DataFrame API
- Use the pandas API on Spark
- Write Spark SQL queries
- Appreciate the difference between the RDD as a low-level API and the Spark structured APIs

## Python setup

In [2]:
## Python setup
from IPython.display import Image
import pandas as pd
import numpy as np
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

## Inputs setup
Let's provide paths to the input files we will use.
It's good practice to create these as global variables. Also, use the ```Path``` class from the pathlib module to manage file paths.

In [3]:
# Alternatively, you can put a full path to where your data is located like below
# DATA_DIR = Path(full-path-folder-where-you-are-keeping-data)
#DATA_DIR = Path().cwd().parents[0].joinpath("DATA")


# Path to any large CSV file (e.g., activity_log_raw.csv)
#LARGE_CSV = Path().cwd().parents[2].joinpath("WBG-LOCAL/MADAGASCAR-POV-MAPPING/data/input/census/data/ResidentIBEIPM.csv")

# path to hh_data.txt
HH_DATA = "/content/drive/MyDrive/DATASETS/hh_data.txt"

# word_count_files folder
# in your data folder, create a word_count_files folder
WORD_COUNT = "/content/drive/MyDrive/DATASETS/word_count_file.txt"

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## The Data Science Task
### Description of the data
In order to explore these Spark APIs, we use the [following dataset](). The data is from a population and housing census of some country ```X```, which is not identified for privacy reasons, and it is a very small subset of the actual data.
Each row in the data represents a single individual in the population.
A summary of the column descriptions is provided below:
- **Geographic identifiers:** province, region, district and administrative level 4 (the level that comes after district), represented by ```prov_id, reg_id, dist_id and adm4``` respectively. These same variables are also embedded in the ```hh_id``` column, which is the unique ID of each household.
- **urban_rural:** a classification of whether this person lived in an urban or rural area
- **Sex:** ```P05``` ==> [1- Masc, 2- Fém]
- **P03:** relationship to the head of the household (head, spouse, child, etc.) ==> [0- Chef de Ménage (CM), 1- Conjoint(e) (CJ), 2- Fils/Fille, 3- Père/Mère, 4- Beau-Père/Belle-Mère, 5- Beau-Fils/Belle-Fille, 6- Petit fils/Petite-fille, 7- Autre Proche du CM, 8- Autre proche du CJ, 9- Sans lien de parenté]
- **Age:** a person's date of birth is given by the columns ```P07M``` (month of birth) and ```P07A``` (year of birth), and age by ```P08```.
- **Marital status:** ```P28``` (whether the person is married or not) ==> [1- Célibataire, 2- Marié(e), 3- Divorcé(e)/Séparé(e), 4- Veuf(ve)], while ```P29``` gives the age at marriage.
- **School attendance:** ```P21``` ==> [0- N'a jamais fréquenté, 1- A fréquenté, 2- Fréquente actuellement]
- **Highest school level attended:** ```P22N``` (see screenshot below for interpretation of values)
- **Whether the person worked or not:** ```P23``` ==> [1- Occupé, 2- Chômeur, 3- En quête du 1er emploi, 4- Ménagère, 5- Elève/Etudiant, 6- Retraité, 7- Incapacité à travailler, 8- Autre]

### Description of the data science task
We would like to find out the following from the data:
1. What is the mean age in the country?
2. Which province has the largest population?
3. What's the mean household size in the country?

In [None]:
Image('../DOCS/images/P22N.png')

## RDDs
````Most of these notes are shamelessly copied from here; please visit the site for more in-depth discussion````

We learned during the lectures that every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster.
Although Spark provides high-level APIs such as DataFrames and SQL, all of these are built on top of the low-level abstraction called the resilient distributed dataset (RDD). Although you will not often work with RDDs,
it's still important to understand the basics of how they work, because at times the functionality in the high-level APIs is too limited and you will need to work with RDDs.

### What are RDDs and how do you create them
An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are considered as the main/first abstraction provided by Spark. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

### Spark shared variables
A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: **broadcast variables**, which can be used to cache a value in memory on all nodes, and **accumulators**, which are variables that are only “added” to, such as counters and sums.

### Initializing Spark
When working with RDDs, unlike with DataFrames, we use a **SparkContext** object, which tells Spark how to access a cluster. To create a SparkContext directly, you first need to build a SparkConf object that contains information about your application. However, we can also get a **SparkContext** object from a SparkSession.

When we initialize Spark, we need to provide several parameters as follows:
- ```appName```. Any name you would like to give to the Spark application.
- ```master()```. In a cluster setup, there is always one computer node which acts as a controller or master node, and that's where the driver program runs. Spark needs to know the IP address of this node.
Also, in a cluster, as you saw, every node has an IP address, as that's how the nodes communicate with each other.
When running in local mode, the driver program and the Spark executors all run on the same node (the local host). We tell Spark this by using the string ```local[num_cores]```,
where we can specify how many cores to use on the machine. For example, ```local[4]``` uses four cores and ```local[*]``` uses all cores on your machine.
The IP address for the local host is often ```127.0.0.1```; in some cases you may want to give Spark the exact address.
- ```config()```. When we initialize Spark, we can also pass many configurations through the ```config()``` method.
This provides run-time settings such as how much memory to give to the driver, how many cores to use, etc. Please see [Spark configuration properties](https://spark.apache.org/docs/latest/configuration.html) for the full list of configurations. For example, you can tell Spark to give the driver program 8GB of RAM like this: ```config("spark.driver.memory", "8g")```

#### Viewing current Spark configurations and settings
It's important to view the current settings, as in some cases you may be getting an error because Spark doesn't have access to enough resources.
Given a SparkSession object called ```spark```,
you can get the configurations from the SparkConf object through the SparkContext object like this: ```spark.sparkContext.getConf().getAll()```

#### Initialize Spark with default configurations

In [4]:
# create a SparkSession which is enough to access DataFrame, Datasets and SQL API's
spark = SparkSession.builder.appName("intro").master("local[*]").getOrCreate()

# We create SparkContext object which we need to access the RDD API
sc = spark.sparkContext

In [5]:
rdd_hh = sc.textFile(str(HH_DATA))
rdd_hh.first()

'\turban_rural\thh_id\tP03\tP05\tP07M\tP07A\tP08\tP21\tP22N\tP23\tP28\tP29'

#### Initialize Spark with custom configurations
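When we set Spark configurations this way, we are doing it at runtime, and these values take precedence over the settings in Spark's default configuration file.
It's worth mentioning that in some cases (at least it happens on my MacBook),
a Spark application fails to pick up these new settings and still runs with the default configs. A likely cause in notebooks is that ```getOrCreate()``` returns a session that is already running, so settings passed to the builder afterwards may not be applied. So, where are these default configurations?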

##### Spark default configurations
If the configurations you set in a Spark application seem not to work, you can change the Spark default configurations instead.
In the Spark installation folder, you will find a ```conf``` folder where you can edit the ```spark-defaults.conf``` file
to add the configurations we are providing below.

In [None]:
Image("../DOCS/images/spark-confs.png", width=600)

In [None]:
# spark = SparkSession.builder.appName("intro").master("local[*]")\
#                     .config("spark.executor.cores", '3') \
#                     .config("spark.executor.memory", '8g')\
#                     .config('spark.driver.maxResultSize', '6g')\
#                     .getOrCreate()


<font color='blue'>**EXERCISE-1: EXPLORE SPARK CONFIGS** </font>
1. Print out Spark's current configs using ```spark.sparkContext.getConf().getAll()```
2. Explore the configuration ```spark.driver.maxResultSize```. What does it control?
3. What's the difference between ```spark.executor.instances``` and ```spark.executor.cores```?

### Creating RDDs

#### RDDs from Python objects on the driver program
Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program in Python.
The elements of the collection are copied to form an RDD that can be operated on in parallel.

In [6]:
# we can use the parallelize function to create an RDD from Python objects
# however, we need a SparkContext object to create RDD and we can create  it from SparkSession
data = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
colnames = ["name", "age"]
rdd_data = sc.parallelize(data)
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average

In [7]:
rdd_data.first()

('Brooke', 20)

#### RDDs from external data sources
In practice, we often ingest data from external sources. PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI (full path to the file) for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and
reads it as a collection of lines.

<font color='blue'>**EXERCISE-2: LOADING TEXT FILES WITH SPARKCONTEXT** </font>

1. Read through the documentation for the method ```sc.textFile()```.
2. Compared to how we read data with pandas DataFrame and/or in R, what are some limitations of this method? Mention at least 3.

In [8]:
# Load the data
rdd_from_file = sc.textFile(str(HH_DATA))

In [9]:
rdd_from_file.take(5)

['\turban_rural\thh_id\tP03\tP05\tP07M\tP07A\tP08\tP21\tP22N\tP23\tP28\tP29',
 '0\t1\t11101101010011066020020002\t0\t1\t10\t1954\t63\t1\t3\t6\t2\t24',
 '1\t1\t11101101010011066020020002\t1\t2\t8\t1950\t67\t1\t3\t6\t2\t28',
 '2\t1\t11101101010011066020020002\t2\t1\t3\t1980\t38\t1\t5\t1\t1\t ',
 '3\t1\t11101101010011066020020002\t2\t1\t9\t1984\t33\t1\t5\t2\t1\t ']

### Working with RDDs
Once we have loaded our external dataset into an RDD, what can we do with it?
Unlike a DataFrame, an RDD doesn't provide many ready-made data functions such as aggregations. Instead, you have to use low-level methods such as map to build such functions yourself.
To explore the available methods on a Spark RDD object, refer to the [PySpark RDD API documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis).

<font color='blue'>**EXERCISE-3: USEFUL METHODS FOR DATA PROCESSING/EXPLORATION/MANIPULATION ON THE SPARK RDD OBJECT** </font>

Clearly, the Spark RDD was not designed to provide the type of functionality we are used to in Pandas DataFrames and/or DataFrames in R.
However, there is still some reasonable functionality to enable quick data exploration.
1. Which methods does the RDD object have that are useful for data wrangling? Provide at least 3 methods.
2. For each method mentioned above, use it on the RDD defined from ```hh_data.txt``` and report the results.

### Compute national mean age from scratch from RDD
1. Split each line in the RDD into separate columns
2. Make sure we skip the header row
3. We need to convert the strings into numbers
4. Use the RDD [map](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html#pyspark.RDD.map) and [reduce](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduce.html#pyspark.RDD.reduce) functions to compute the mean.
Another function which is also useful is ```filter```.
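
The logic of these steps can be tried without a cluster. The sketch below is a plain-Python analogue rather than Spark code: it applies the built-in `map` and `functools.reduce` to the first few lines printed by `take(5)` earlier in this notebook. The helper name `parse_age` is hypothetical, and the result is the mean of these four sample rows only, not the national mean.

```python
from functools import reduce

# First lines of hh_data.txt as printed earlier (tab-separated, header first)
lines = [
    "\turban_rural\thh_id\tP03\tP05\tP07M\tP07A\tP08\tP21\tP22N\tP23\tP28\tP29",
    "0\t1\t11101101010011066020020002\t0\t1\t10\t1954\t63\t1\t3\t6\t2\t24",
    "1\t1\t11101101010011066020020002\t1\t2\t8\t1950\t67\t1\t3\t6\t2\t28",
    "2\t1\t11101101010011066020020002\t2\t1\t3\t1980\t38\t1\t5\t1\t1\t ",
    "3\t1\t11101101010011066020020002\t2\t1\t9\t1984\t33\t1\t5\t2\t1\t ",
]


def parse_age(line):
    """Hypothetical helper: return the age (column index 7) as a float, or None."""
    try:
        return float(line.split("\t")[7])
    except (ValueError, IndexError):
        # The header row ends up here because 'P08' is not a number
        return None


# Steps 1-3: split each line, skip the header, convert strings to numbers
ages = [age for age in map(parse_age, lines) if age is not None]

# Step 4: reduce to a sum, then divide by the number of valid rows
total = reduce(lambda x, y: x + y, ages)
mean_age = total / len(ages)
print(mean_age)
```

Note that the divisor is the count of rows that survived the filtering, so the header row does not dilute the mean.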

In [10]:
def split_line(line):
    """
    Split the line which has a single string
    into a list where each element represents a
    column
    """

    # Split the line
    col_items = line.split("\t")

    # Do some extra processing
    # Because we know age is in col 7, we can return
    # only age and also convert it to numeric
    try:
        return float(col_items[7])
    except (ValueError, IndexError):
        # Header row, short row or non-numeric age
        return 'NaN'

# The lines which are chained can be split and run separately

In [11]:
rdd_from_file_split = rdd_from_file.map(split_line)

# To check that it's running okay, use collect()
# collected = rdd_from_file_split.collect()

# Drop the header and any rows where age could not be parsed
rdd_from_file_split_nums = rdd_from_file_split.filter(lambda x: x != 'NaN')
#collected_nums = rdd_from_file_split_nums.collect()
#print(collected_nums[:5])

# Now run reduce to get the sum of all ages
rdd_sum = rdd_from_file_split_nums.reduce(lambda x, y: x + y)

# Count only the numeric rows so that the header is not included in the mean
obs_cnt = rdd_from_file_split_nums.count()
avg_age = rdd_sum/obs_cnt

In [12]:
# Skip the header
# Split lines and keep only age and convert it to numeric
rdd_from_file_split = (rdd_from_file
                .map(split_line)
                .filter(lambda x: x != 'NaN'))

# Apply reduce to get the sum of all numbers
rdd_sum = rdd_from_file_split.reduce(lambda x, y: x + y)

# Get the number of elements in the RDD using count() and then get the mean
obs_cnt = rdd_from_file_split.count()
avg_age = rdd_sum/obs_cnt


# Check the output
print('National level average age: {}'.format(int(avg_age)))

National level average age: 22


<font color='blue'>**EXERCISE-4: WHAT IF WE WANTED TO COMPUTE AVERAGE AGE FOR EACH DISTRICT?** </font>
1. Use toy example below to learn how to compute average when we have keys.
2. Next, use the same strategy to compute and report mean age by district.
    - Recall that the district code is embedded in the hh_id column. Refer to the data description above.
3. Save the output into a CSV file

In [13]:
# we can use the parallelize function to create an RDD from Python objects
# however, we need a SparkContext object to create RDD and we can create  it from SparkSession
data = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
colnames = ["name", "age"]
rdd_data = sc.parallelize(data)
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average

ages_rdd = (rdd_data
.map(lambda x: (x[0], (x[1], 1)))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.map(lambda x: (x[0], x[1][0]/x[1][1])))

# You can use the collect() function on an RDD to bring all the data back
# to the driver program and look at it
ages_rdd_list = ages_rdd.collect()

### MapReduce-style computations with Spark RDDs
The Spark RDD has several **map** and **reduce** methods which are similar in style to the functions
we write with Hadoop MapReduce, but they are not exactly the same. We explore some of these functions below:
- map
- flatMap
- mapValues
- mapPartitions
- reduce
- reduceByKey
- reduceByKeyLocally

#### Word count using Spark
Let's do the classic word count with Spark to explore how the different map functions work.

In [14]:
# sc = spark.sparkContext
words  = ['hadoop is fast',  'hive is sql on hdfs', 'spark is superfast',
          'spark is awesome', 'spark is faster than hadoop', 'spark is very fast']
rdd_data = sc.parallelize(words)

##### First, let's use ```flatMap```
Note that with ```reduceByKey``` we don't need the separate ```groupByKey``` step required in MapReduce.
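
To see why `flatMap` rather than `map` is the right first step for a word count, here is a plain-Python analogue (not Spark code) using list comprehensions and `itertools.chain`. The two-sentence input is a shortened version of the toy data above.

```python
from itertools import chain

lines = ["hadoop is fast", "spark is superfast"]

# map analogue: exactly one output element per input element,
# so splitting each line yields a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap analogue: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```

With the nested result from the `map` analogue, each element is a whole list of words, so pairing every element with a count of 1 would count lines rather than words.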

In [15]:
# Split lines into separate words
fm = rdd_data.flatMap(lambda line:line.split(" "))

# View flatmap results
print('Results of flatmap \n')
print(fm.collect())

# create key-value pairs
fm_kv = fm.map(lambda x: (x,1))
print()
print('Results of map on flatmap results \n')
print(fm_kv.collect())

# Now reduce to get totals by key
wc = fm_kv.reduceByKey(lambda x, y: x + y)

print()
print('Results of reduceByKey \n')
print(wc.collect())

Results of flatmap 

['hadoop', 'is', 'fast', 'hive', 'is', 'sql', 'on', 'hdfs', 'spark', 'is', 'superfast', 'spark', 'is', 'awesome', 'spark', 'is', 'faster', 'than', 'hadoop', 'spark', 'is', 'very', 'fast']

Results of map on flatmap results 

[('hadoop', 1), ('is', 1), ('fast', 1), ('hive', 1), ('is', 1), ('sql', 1), ('on', 1), ('hdfs', 1), ('spark', 1), ('is', 1), ('superfast', 1), ('spark', 1), ('is', 1), ('awesome', 1), ('spark', 1), ('is', 1), ('faster', 1), ('than', 1), ('hadoop', 1), ('spark', 1), ('is', 1), ('very', 1), ('fast', 1)]

Results of reduceByKey 

[('hadoop', 2), ('fast', 2), ('hive', 1), ('superfast', 1), ('awesome', 1), ('faster', 1), ('than', 1), ('is', 6), ('sql', 1), ('on', 1), ('hdfs', 1), ('spark', 4), ('very', 1)]


<font color='blue'>**EXERCISE-5: WORD-COUNT WITH EXACT MAPREDUCE STEPS** </font>

If you really wanted to replicate the MapReduce computations in Hadoop, you can do the following:
1. Grab the key-value pairs RDD from above
2. Group them by key using ```groupByKey()```
3. Import the functions we will use from Python built-in modules.
    - From the ```operator``` module, import the function ```add```. What does this little function do?
    - From the ```functools``` module, import ```reduce```. Recall this function from our functional programming lesson.
4. Use the ```mapValues()``` function together with ```reduce``` and ```add``` to perform the reduction on the contents of the list from each key.
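
As a hint for step 3, the sketch below shows what `add` and `reduce` do on an ordinary Python list, with no Spark involved. The list `ones` is a hypothetical stand-in for the values that end up grouped under a single key.

```python
from functools import reduce
from operator import add

# Hypothetical values grouped under one key, e.g. the 1s collected for one word
ones = [1, 1, 1, 1]

# add(a, b) is simply the function form of a + b
pair_sum = add(2, 3)

# reduce folds the list from left to right: ((1 + 1) + 1) + 1
key_total = reduce(add, ones)

print(pair_sum, key_total)
```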

> You can just complete the code in cell below

In [16]:
# import the above mentioned functions here
from functools import reduce
from operator import add


## Take the key_value pairs and group them by key
# create key-value pairs
fm_kv_grp_bykey = None

print()
print('Results of groupByKey \n')
print(fm_kv_grp_bykey.collect())

# We need to use mapValues on each key-value pair to get list of values
# and then use Python reduce and add on that list
fm_kv_grp_bykey_list = None

print()
print('Results of mapValues \n')
print(fm_kv_grp_bykey_list.collect())


Results of groupByKey 



AttributeError: 'NoneType' object has no attribute 'collect'

<div class="alert alert-info">
    We did all of the above tedious word counting just for learning; otherwise, it is much easier to get this type of statistic from an RDD using built-in Spark RDD functions as shown below.
</div>

In [None]:
# Let's load the text file for word_count
word_count_rdd = sc.textFile(str(WORD_COUNT))

In [None]:
word_count = word_count_rdd.flatMap(lambda line:line.split(" ")).collect()

In [None]:
chunks = [word_count[x:x+3] for x in range(0, len(word_count), 3)]

In [None]:
from nltk import ngrams

In [None]:
sentence = 'this is a foo bar sentences and i want to ngramize it'

# With n = 3 these are trigrams
n = 3
trigrams = ngrams(sentence.split(), n)

for grams in trigrams:
    print(grams)

In [None]:
# We can countByKey after using flatMap() and map()
word_count = word_count_rdd.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).countByKey()

In [None]:
# Use MapReduce style
word_count = word_count_rdd.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x, y: x+y)

### RDD from DataFrame

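#### Loading external data into a DataFrame and converting it to an RDD

In [None]:
sdf = spark.read.csv(str(HH_DATA), header=True, sep="\t")
# Every DataFrame exposes its underlying RDD of Row objects through the .rdd attribute
rdd_from_sdf = sdf.rdd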

## Spark DataFrames
In this course we will mostly use the Spark DataFrame API because it's the most convenient for Data Analysts, ML Engineers,
Data Scientists and, in some cases, Data Engineers. We will delve deeper into the DataFrame API in other notebooks. In this tutorial, the idea is just to show how it differs from the RDD and SQL APIs.

Just like RDDs, there are several ways to create a Spark DataFrame, but here we will load a DataFrame from an external file. With a DataFrame, answering the analysis questions below is straightforward:
1. What is the mean age in the country?
2. Which province has the largest population?
3. What's the mean household size in the country?

When working with Spark DataFrames, you can seamlessly combine them with other Python objects and Python packages such as pandas. Whenever a large DataFrame has been reduced to a small size, you can ```collect``` it as a pandas DataFrame (for example with ```toPandas()```) and work with it on the driver program as a non-distributed dataset.

In [None]:
sdf_hh = spark.read.csv(str(HH_DATA), header=True, sep='\t')

In [None]:
sdf_hh.show()

### Compute national mean age from scratch from Spark DataFrame
We simply call the function ```avg``` on the column of interest.

In [None]:
from pyspark.sql.functions import avg, col, udf
sdf_hh.select(avg(col('P08'))).show()

### Which province has the largest population?

In [None]:
# Add prov_id (the first character of hh_id) to the DataFrame
sdf_hh2 = sdf_hh.withColumn('prov_id', udf(lambda x: x[0])('hh_id'))
# Count persons per province and bring the small result to the driver as pandas
pdf_prov_pop = sdf_hh2.groupby('prov_id').count().toPandas()
# Sort once and keep the row with the largest count
top_pop_row = pdf_prov_pop.sort_values(by='count', ascending=False).iloc[0]
largest_pop_prov = top_pop_row['prov_id']
largest_pop = top_pop_row['count']

print('Province with ID: {} has the largest population of {:,} people'.format(largest_pop_prov, largest_pop))

### What's the mean household size in the country? Which province has the highest mean hh_size?
A household is identified by the column ```hh_id```. With this column, we can group the persons into households and calculate the size of each household.
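
The grouping idea can be sketched in plain Python before running it on the full data. The example below is not Spark code: it uses `collections.Counter` on the `hh_id` values of six sample rows that appear in this notebook's printed output, so the resulting numbers describe only this tiny sample.

```python
from collections import Counter

# hh_id of six sample persons (one entry per person)
hh_ids = [
    "11101101010011066020020002",
    "11101101010011066020020002",
    "11101101010011066020020002",
    "11101101010011066020020002",
    "11101101010011066020240024",
    "11101101010011066020240024",
]

# Household size = number of persons sharing the same hh_id
hh_sizes = Counter(hh_ids)

# Mean household size = total persons / number of distinct households
mean_hh_size = sum(hh_sizes.values()) / len(hh_sizes)

print(dict(hh_sizes))
print(mean_hh_size)
```

The mean is taken over households, not over persons, which is why the data is grouped by `hh_id` first.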

In [None]:
# Count persons per household and bring the result to the driver as a pandas DataFrame
pdf_hhs = sdf_hh2.groupby('hh_id').count().toPandas()
# The province ID is the first character of hh_id
pdf_hhs['prov_id'] = pdf_hhs.hh_id.apply(lambda x: x[0])
pdf_hhs.rename(columns={'count': 'hh_size'}, inplace=True)
pdf_hhs_prov_stats = pdf_hhs.groupby('prov_id').agg({'hh_size':'mean'}).reset_index()

national_avg_hh_size = pdf_hhs.hh_size.mean()
# Sort once and keep the province with the largest mean household size
top_hh_row = pdf_hhs_prov_stats.sort_values(by='hh_size', ascending=False).iloc[0]
prov_with_largest_hh_size = top_hh_row['prov_id']
largest_hh_size = top_hh_row['hh_size']
print('Province with ID: {} has the largest average HH-SIZE of {:.2f} compared to national average of {:.2f}'.format(
    prov_with_largest_hh_size, largest_hh_size, national_avg_hh_size))

<div class="alert alert-info">
    Clearly, working with Spark DataFrames is much easier than working with RDDs for common data science tasks.
However, the keyword here is common: in almost all of the large-scale *Big Data* projects where I used Spark,
I encountered scenarios where I had no choice but to use RDDs because the functionality I was looking for wasn't working well with DataFrames. So, that's the main reason we still bother with RDDs.
    </div>

<font color='blue'>**EXERCISE-6: COMPUTE THE FOLLOWING USING THE SPARK RDD** </font>

Instead of using the DataFrame API, which is straightforward, please use RDDs to compute the following from the ```hh_data.txt``` data.
1. **Largest population**. Which province has the largest population, and what is that population?
2. **Household size**. What's the average household size? Which province has the largest average household size, and what is that average?

## Pandas API on Spark
Spark 3.2 introduced the pandas API on Spark, which lets users run code much as they would in pandas.
Please take some time to explore this API using the documentation [here](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html). Although this makes life very easy, since one doesn't have to learn the Spark API functions and commands, such simplicity often comes at a cost, such as less control over how the DataFrames are partitioned and some loss of speed from the extra layer that emulates pandas behaviour on top of Spark.

In [None]:
import pyspark.pandas as ps

In [None]:
spark_pandas_df = ps.read_csv(str(HH_DATA), sep='\t')

## Spark SQL
Spark SQL is a foundational component of Apache Spark that integrates relational processing with Spark’s functional programming API. Spark SQL lets Spark programmers leverage the benefits of faster performance and relational programming (e.g., declarative queries and optimized storage), as well as call complex analytics libraries (e.g., machine learning).

### Run SQL queries programmatically on DataFrames
The sql function on a SparkSession object enables applications to run SQL queries programmatically and returns the result as a DataFrame.

In SQL, ```views``` are kind of virtual tables. A view also has rows and columns as they are in a real table in the database. We can create a view by selecting fields from one or more tables present in the database. In Spark, we need to create a view from the DataFrame before we can run SQL commands.

#### The SQL API: interact with a CSV file read as a DataFrame using SQL commands
As mentioned, Spark allows you to read data (e.g., a CSV file) as a DataFrame and then interact with it using good old SQL commands. The following steps are required:
1. **Create a DataFrame as required.** In our case, we will read from an external source.
2. **Create a table view.** Views are a special version of tables in SQL. They provide a virtual table environment for various complex operations. You can select data from multiple tables, or you can select specific data based on certain criteria in views. A view does not hold the actual data; it holds only the definition of the view in the data dictionary (you will learn more about this in the Database course).

Once you have a temporary view, you can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database.
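
To make the idea that a view holds only a definition more concrete, here is a minimal sketch that uses Python's built-in `sqlite3` module instead of Spark. The table `persons`, the view `adults` and the rows are all made up for illustration, and SQLite's `CREATE VIEW` stands in for Spark's temporary views, so treat it as an analogy rather than as Spark behaviour.

```python
import sqlite3

# In-memory database with a small, made-up table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE persons (hh_id TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO persons VALUES (?, ?)",
    [("A", 63), ("A", 67), ("B", 33)],
)

# The view stores only the query definition, not a copy of the rows
conn.execute(
    "CREATE VIEW adults AS SELECT hh_id, age FROM persons WHERE age >= 18"
)
before = conn.execute("SELECT COUNT(*) FROM adults").fetchone()[0]

# A row added to the base table afterwards is visible through the view
conn.execute("INSERT INTO persons VALUES ('B', 29)")
after = conn.execute("SELECT COUNT(*) FROM adults").fetchone()[0]

print(before, after)
conn.close()
```

Because the view is re-evaluated each time it is queried, the second count includes the newly inserted row.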

In [17]:
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [18]:
sdf = spark.read.csv(str(HH_DATA), header=True, sep="\t")

In [20]:
sdf.head(10)

[Row(_c0='0', urban_rural='1', hh_id='11101101010011066020020002', P03='0', P05='1', P07M='10', P07A='1954', P08='63', P21='1', P22N='3', P23='6', P28='2', P29='24'),
 Row(_c0='1', urban_rural='1', hh_id='11101101010011066020020002', P03='1', P05='2', P07M='8', P07A='1950', P08='67', P21='1', P22N='3', P23='6', P28='2', P29='28'),
 Row(_c0='2', urban_rural='1', hh_id='11101101010011066020020002', P03='2', P05='1', P07M='3', P07A='1980', P08='38', P21='1', P22N='5', P23='1', P28='1', P29=' '),
 Row(_c0='3', urban_rural='1', hh_id='11101101010011066020020002', P03='2', P05='1', P07M='9', P07A='1984', P08='33', P21='1', P22N='5', P23='2', P28='1', P29=' '),
 Row(_c0='4', urban_rural='1', hh_id='11101101010011066020240024', P03='0', P05='1', P07M='12', P07A='1984', P08='33', P21='1', P22N='5', P23='1', P28='2', P29='25'),
 Row(_c0='5', urban_rural='1', hh_id='11101101010011066020240024', P03='1', P05='2', P07M='10', P07A='1988', P08='29', P21='1', P22N='5', P23='1', P28='2', P29='21'),
 Ro

In [21]:
# Register the DataFrame as a SQL temporary view
sdf.createOrReplaceTempView("pop")

sql_df = spark.sql("SELECT urban_rural, hh_id FROM pop")
sql_df.show()

+-----------+--------------------+
|urban_rural|               hh_id|
+-----------+--------------------+
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
|          1|11101101010011066...|
+-----------+--------------------+
only showing top 20 rows



In [22]:
type(sql_df)

### Largest population
Which province has the largest population, and what is that population?

<div class="alert alert-warning">
  Since I'm not an SQL expert, I will add the <strong>prov_id</strong> column using DataFrame API. Otherwise, you can do all this with SQL.
</div>
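
For readers who want to try the pure-SQL route, the sketch below shows one possible shape of such a query. It runs on Python's built-in `sqlite3` rather than on Spark, and the table name `persons`, the alias `n_people` and the short `hh_id` values are made up. Spark SQL also provides a `substr` function, but the query has not been run against the census data here, so treat it as a starting point.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE persons (hh_id TEXT)")
# Made-up hh_id values; only the first character (the province) matters here
conn.executemany(
    "INSERT INTO persons VALUES (?)",
    [("1110",), ("1120",), ("1130",), ("2110",), ("2120",), ("3110",)],
)

# Derive prov_id from the first character of hh_id, count persons per province,
# then keep the province with the highest count
query = """
SELECT substr(hh_id, 1, 1) AS prov_id, COUNT(*) AS n_people
FROM persons
GROUP BY substr(hh_id, 1, 1)
ORDER BY n_people DESC
LIMIT 1
"""
largest = conn.execute(query).fetchone()
print(largest)
conn.close()
```

Sorting and limiting in the same statement avoids the separate `MAX` lookup followed by a second filtering query.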

In [23]:
# Add prov_id column using DataFrame API
sql_df2 = sql_df.withColumn('prov_id', udf(lambda x: x[0])('hh_id'))

#### DataFrame methods which look like SQL commands

In [24]:
# SQL-like functions on DataFrames
# Count persons per province, sort by count and keep the province with the largest population
sql_df3 = (sql_df2
     .groupBy("prov_id")
     .agg(count("prov_id").alias("pop"))
     .orderBy("pop", ascending=False)
     .first())

#### Run actual SQL commands

In [25]:
# Create and register another view based on sql_df2
sql_df2.createOrReplaceTempView("pop")

# Define the SQL statement to aggregate and get total population by province
sql_statement = """
SELECT COUNT(prov_id) As pop, prov_id
FROM pop
GROUP BY prov_id
"""

# Run the SQL command
sql_df3 = spark.sql(sql_statement)
sql_df3.show()

# We can also use SQL to select largest
sql_df3.createOrReplaceTempView("pop_by_prov")

largest = spark.sql("SELECT MAX(pop) from pop_by_prov")
largest_pop = largest.collect()[0]['max(pop)']

+-------+-------+
|    pop|prov_id|
+-------+-------+
|7273126|      1|
|5170076|      2|
|3878492|      3|
|3139125|      4|
|4199643|      5|
|2013734|      6|
+-------+-------+



In [26]:
sql_statement2 = """
SELECT prov_id
FROM pop_by_prov
WHERE pop_by_prov.pop = {}
""".format(largest_pop)

spark.sql(sql_statement2).show()

+-------+
|prov_id|
+-------+
|      1|
+-------+



<font color='blue'>**EXERCISE-7: ANSWER THE REST OF THE ANALYSIS QUESTIONS USING SQL** </font>

As a challenge, you can use SQL to run aggregations to get answers to the rest of the analysis questions. Otherwise, SQL is not the focus of this course.

## Conclusion
Congratulations, you have learned the basics and challenges of working with Spark RDDs in this notebook. By now, you can write simple programs utilizing the different map and reduce functions of RDDs. Although you will not often work with RDDs, this understanding is crucial because RDDs and DataFrames work together, and it's important to know how to switch between these data structures depending on the use case and need. When using Spark on data science projects, you will utilize pandas, NumPy, Spark DataFrames, Spark RDDs and other Python data structures in a seamless fashion.