# Spark Operations using Spark DataFrames and Spark SQL

### In this activity we will understand
-  What are DataFrames in Spark ?
-  Different ways to create a DataFrames
-  What are Spark Transformations & Actions
-  Verify Summary Statistics
-  Spark SQL
-  Column References
-  Converting to Spark Types - Literals
-  Add/Rename/Remove Columns
-  TypeCasting
-  Column differences
-  Pair-wise frequencies
-  Remove duplicates
-  Working with Nulls
-  Filtering the rows
-  Aggregations
-  Joins
-  Random Samples
-  Random Splits
-  Map Transformations
-  Sorting
-  Union
-  String Manipulations
-  Regular Expressions
-  Working with Dates and Time Stamp
-  User Defined Functions 
-  Broadcase variables and Accumulators
-  Handling Different Data Sources


## Data Representation
- **Pandas** - DataFrames represented on a single machine as Python data structures
- **RDDs** - Spark’s foundational structure Resilient Distributed Dataset is represented as a reference to partitioned data without types
- **DataFrames** - Spark’s optimized distributed collection of rows

##  Spark DataFrame 

#### A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. 
<br> The list that defines the columns and the types within those columns is called the schema. 
<br> One can think of a DataFrame as a spreadsheet with named columns.
<br> A spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers.
<br> The reason for putting the data on more than one computer should be intuitive: 
<br>     either the data is too large to fit on one machine or 
<br>     it would simply take too long to perform that computation on one machine.

#### NOTE
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). 
<br> These different abstractions all represent distributed collections of data. 
<br> The easiest and most efficient are DataFrames, which are available in all languages.


In [None]:
cd /content/drive/MyDrive/Big_data/

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf /content/spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/MyDrive/Big_data/spark-2.4.8-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

#### Create a dataframe with one column containing 100 rows with values from 0 to 99.
This range of numbers represents a distributed collection. 
<br> When run on a cluster, each part of this range of numbers exists on a different executor. 
<br> This is a Spark DataFrame.

## DataFrame Transformations & Actions

### Transformations
In Spark, the core data structures are immutable, meaning they cannot be changed after they’re created.
<br> To “change” a DataFrame, you need to instruct Spark how you would like to modify it to do what you want.
<br> These instructions are called transformations.
<br> Transformations are the core of how you express your business logic using Spark.
<br> Transformations are simply ways of specifying different series of data manipulation.


Notice that these return no output. <br>This is because we specified only an abstract transformation, and Spark will not act on transformations until we call an action.

### Actions
Transformations allow us to build up our logical transformation plan. 
<br> To trigger the computation, we run an action.
<br> An action instructs Spark to compute a result from a series of transformations. 
<br> The simplest action is count, which gives us the total number of records in the DataFrame:

#### There are 3 types of actions
Actions to view data in the console
<br>Actions to collect data to native objects in the respective language
<br>Actions to write to output data sources

### Interoperating with RDDs

<br> Spark SQL supports two different methods for converting existing RDDs into DataFrames. 
<br> The first method uses reflection to infer the schema of an RDD that contains specific types of objects. 
<br> This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

<br> The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. 
<br> While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.

In [None]:
sc = spark.sparkContext
sc

### Inferring the Schema Using Reflection

#### Programmatically Specifying the Schema
- Create an RDD of tuples or lists from the original RDD;
- Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in the step 1.
- Apply the schema to the RDD via createDataFrame method provided by SparkSession.

In [None]:
testRDD = sc.textFile("/content/drive/MyDrive/Big_data/data/test.csv")
print("Total Records with header: ", testRDD.count())
print("\nFirst Two Records Before Removing Header\n")
print(testRDD.take(2))

In [None]:
header = testRDD.first()
testRDD = testRDD.filter(lambda line: line != header)
print("Total Records without header: ", testRDD.count())
print("\nFirst Two Records After Removing Header\n")
print(testRDD.take(2))

In [None]:
# Split the data into individual columns
splitRDD = testRDD.map(lambda line: line.split(","))
print("\nFirst Two Records After Split/Parsing\n")
print(splitRDD.take(2))

#### Create a dataframe for the above Data
1. Define Schema
2. Create dataframe using the above schema

#### Create Schema

In [None]:
from pyspark.sql.types import *

testSchema = StructType([
    StructField("User_ID", StringType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Occupation", StringType(), True),
    StructField("City_Category", StringType(), True),
    StructField("Stay_In_Current_City_Years", StringType(), True),
    StructField("Marital_Status", StringType(), True),
    StructField("Product_Category_1", StringType(), True),
    StructField("Product_Category_2", StringType(), True),
    StructField("Product_Category_3", StringType(), True)
])

Reading a CSV file into a DataFrame and converting it to a local array or list of rows.

In [None]:
trainDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/Big_data/data/train.csv")

#### Verify Schema

In [None]:
print('Total records count in train dataset is {}'.format(trainDF.count()))

#### Summary statistics

### Spark SQL
With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. 
<br>There is no performance difference between writing SQL queries or writing DataFrame code, <br>they both “compile” to the same underlying plan that we specify in DataFrame code.

In [None]:
## Create view/table
trainDF.createOrReplaceTempView("trainDFTable")