# Spark Operations using Spark DataFrames and Spark SQL

### In this activity we will cover
-  What DataFrames are in Spark
-  Different ways to create a DataFrame
-  What Spark Transformations & Actions are
-  Verify Summary Statistics
-  Spark SQL
-  Column References
-  Converting to Spark Types - Literals
-  Add/Rename/Remove Columns
-  TypeCasting
-  Column differences
-  Pair-wise frequencies
-  Remove duplicates
-  Working with Nulls
-  Filtering the rows
-  Aggregations
-  Joins
-  Random Samples
-  Random Splits
-  Map Transformations
-  Sorting
-  Union
-  String Manipulations
-  Regular Expressions
-  Working with Dates and Time Stamp
-  User Defined Functions 
-  Broadcast variables and Accumulators
-  Handling Different Data Sources


## Data Representation
- **Pandas** - DataFrames represented on a single machine as Python data structures
- **RDDs** - Resilient Distributed Datasets, Spark’s foundational structure, represented as a reference to partitioned data without column types
- **DataFrames** - Spark’s optimized distributed collection of rows

##  Spark DataFrame 

#### A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. 
<br> The list that defines the columns and the types within those columns is called the schema. 
<br> One can think of a DataFrame as a spreadsheet with named columns.
<br> A spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers.
<br> The reason for putting the data on more than one computer should be intuitive: 
<br>     either the data is too large to fit on one machine or 
<br>     it would simply take too long to perform that computation on one machine.

#### NOTE
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). 
<br> These different abstractions all represent distributed collections of data. 
<br> The easiest and most efficient are DataFrames, which are available in all languages.

![Spark DataFrame](../Images/SparkDataFrame.png)

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf /content/spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

In [4]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

#### Create a DataFrame with one column containing 100 rows with values from 0 to 99.
This range of numbers represents a distributed collection. 
<br> When run on a cluster, each part of this range of numbers exists on a different executor. 
<br> This is a Spark DataFrame.

In [None]:
myRange = spark.range(100)  # .toDF('number') would rename the default "id" column

In [None]:
myRange.rdd.getNumPartitions()

2

In [None]:
myRange.show(3)

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+
only showing top 3 rows



In [None]:
type(myRange)

pyspark.sql.dataframe.DataFrame

In [5]:
myDF = spark.createDataFrame([[1, 'Alice', 30],
                              [2, 'Bob', 28],
                              [3, 'Cathy', 31], 
                              [4, 'Dave', 56]], ['Id', 'Name', 'Age'])

myDF.show()

+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|  1|Alice| 30|
|  2|  Bob| 28|
|  3|Cathy| 31|
|  4| Dave| 56|
+---+-----+---+



In [None]:
myDF.dtypes

[('Id', 'bigint'), ('Name', 'string'), ('Age', 'bigint')]

## DataFrame Transformations & Actions

### Transformations
In Spark, the core data structures are immutable, meaning they cannot be changed after they’re created.
<br> To “change” a DataFrame, you need to instruct Spark how you would like to modify it to do what you want.
<br> These instructions are called transformations.
<br> Transformations are the core of how you express your business logic using Spark.
<br> Transformations are simply ways of specifying different series of data manipulation.

![Spark Transformations](../Images/Spark_Transformations.png)

In [None]:
myRange.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



In [None]:
divisBy2 = myRange.where("id % 2 = 0")
divisBy2

DataFrame[id: bigint]

Notice that this returns no data, only the DataFrame's schema. <br>This is because we specified only an abstract transformation, and Spark will not act on transformations until we call an action.
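The deferred behaviour described above can be sketched with a plain-Python analogy. This is not Spark code: a generator expression stands in for a transformation, and consuming it stands in for an action. The helper `is_even` and the `evaluated` list are illustrative names only.

```python
# Plain-Python analogy (not Spark): a generator describes work without doing it.
evaluated = []  # records every id that was actually inspected


def is_even(n):
    evaluated.append(n)
    return n % 2 == 0


# "Transformation": only describes the filter; is_even has not been called yet.
lazy_evens = (n for n in range(100) if is_even(n))
touched_before_action = len(evaluated)

# "Action": consuming the generator forces the computation.
even_count = sum(1 for _ in lazy_evens)

print(touched_before_action)  # 0
print(even_count)             # 50
```

Spark goes further than this analogy, because it can inspect and optimise the whole chain of transformations before running anything.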

### Actions
Transformations allow us to build up our logical transformation plan. 
<br> To trigger the computation, we run an action.
<br> An action instructs Spark to compute a result from a series of transformations. 
<br> The simplest action is count, which gives us the total number of records in the DataFrame:

#### There are 3 types of actions
Actions to view data in the console
<br>Actions to collect data to native objects in the respective language
<br>Actions to write to output data sources

In [None]:
divisBy2.count()

50

In [None]:
divisBy2.show(3)

+---+
| id|
+---+
|  0|
|  2|
|  4|
+---+
only showing top 3 rows



### Interoperating with RDDs

<br> Spark SQL supports two different methods for converting existing RDDs into DataFrames. 
<br> The first method uses reflection to infer the schema of an RDD that contains specific types of objects. 
<br> This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

<br> The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. 
<br> While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.

In [None]:
sc = spark.sparkContext

#### Inferring the Schema Using Reflection

In [None]:
# Create an RDD from a source
tempRDD = sc.textFile("/temp_data.txt") 

In [None]:
tempRDD.getNumPartitions()

2

In [None]:
splitRDD = tempRDD.map(lambda line: line.split("\t"))
splitRDD.take(3)

[['1901', '-78', '1'], ['1901', '-72', '1'], ['1901', '-94', '1']]

In [None]:
from pyspark.sql import Row

In [None]:
schemafiedRDD = splitRDD.map(lambda line: Row(year=line[0], temp=line[1], 
                                              status=line[2]))

In [None]:
schemafiedRDD.toDF().show()

+------+----+----+
|status|temp|year|
+------+----+----+
|     1| -78|1901|
|     1| -72|1901|
|     1| -94|1901|
|     1| -61|1901|
|     1| -56|1901|
|     1| -28|1901|
|     1| -67|1901|
|     1| -33|1901|
|     1| -28|1901|
|     1| -33|1901|
|     1| -44|1901|
|     1| -39|1901|
|     1|   0|1901|
|     1|   6|1901|
|     1|   0|1901|
|     1|   6|1901|
|     1|   6|1901|
|     1| -11|1901|
|     1| -33|1901|
|     1| -50|1901|
+------+----+----+
only showing top 20 rows



In [None]:
# Create a DataFrame from the RDD of Rows; Spark infers the schema from the Row fields.
tempDF = spark.createDataFrame(schemafiedRDD)
tempDF.show(3)

+------+----+----+
|status|temp|year|
+------+----+----+
|     1| -78|1901|
|     1| -72|1901|
|     1| -94|1901|
+------+----+----+
only showing top 3 rows



In [None]:
tempDF.printSchema()

root
 |-- status: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- year: string (nullable = true)



#### Programmatically Specifying the Schema
1. Create an RDD of tuples or lists from the original RDD.
2. Create the schema, represented by a StructType, matching the structure of the tuples or lists in the RDD created in step 1.
3. Apply the schema to the RDD via the createDataFrame method provided by SparkSession.

In [None]:
testRDD = sc.textFile("/test.csv")
print("Total Records with header: ", testRDD.count())
print("\nFirst Two Records Before Removing Header\n")
print(testRDD.take(2))

Total Records with header:  20

First Two Records Before Removing Header

['User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3', '1000004,P00128942,M,46-50,7,B,2,1,1,11,']


In [None]:
header = testRDD.first()
testRDD = testRDD.filter(lambda line: line != header)
print("Total Records without header: ", testRDD.count())
print("\nFirst Two Records After Removing Header\n")
print(testRDD.take(2))

Total Records without header:  19

First Two Records After Removing Header

['1000004,P00128942,M,46-50,7,B,2,1,1,11,', '1000009,P00113442,M,26-35,17,C,0,0,3,5,']


In [None]:
# Split the data into individual columns
splitRDD = testRDD.map(lambda line: line.split(","))
print("\nFirst Two Records After Split/Parsing\n")
print(splitRDD.take(2))


First Two Records After Split/Parsing

[['1000004', 'P00128942', 'M', '46-50', '7', 'B', '2', '1', '1', '11', ''], ['1000009', 'P00113442', 'M', '26-35', '17', 'C', '0', '0', '3', '5', '']]


#### Create a dataframe for the above Data
1. Define Schema
2. Create dataframe using the above schema

#### Create Schema

In [None]:
from pyspark.sql.types import *

testSchema = StructType([
    StructField("User_ID", StringType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Occupation", StringType(), True),
    StructField("City_Category", StringType(), True),
    StructField("Stay_In_Current_City_Years", StringType(), True),
    StructField("Marital_Status", StringType(), True),
    StructField("Product_Category_1", StringType(), True),
    StructField("Product_Category_2", StringType(), True),
    StructField("Product_Category_3", StringType(), True)
])

#### Create DataFrame using the above schema

In [None]:
testDF = spark.createDataFrame(data = splitRDD, schema=testSchema)

In [None]:
testDF.take(2)

[Row(User_ID='1000004', Product_ID='P00128942', Gender='M', Age='46-50', Occupation='7', City_Category='B', Stay_In_Current_City_Years='2', Marital_Status='1', Product_Category_1='1', Product_Category_2='11', Product_Category_3=''),
 Row(User_ID='1000009', Product_ID='P00113442', Gender='M', Age='26-35', Occupation='17', City_Category='C', Stay_In_Current_City_Years='0', Marital_Status='0', Product_Category_1='3', Product_Category_2='5', Product_Category_3='')]

In [None]:
testDF.show(4)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000004| P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|                  |
|1000009| P00113442|     M|26-35|        17|            C|                         0|             0|                 3|                 5|                  |
|1000010| P00288442|     F|36-45|         1|            B|                        4+|             1|                 5|                14|                  |
|1000010| P00145342|     F|36-45|         1|        

In [None]:
testRDD.count()

19

In [None]:
testRDD.take(2)

['1000004,P00128942,M,46-50,7,B,2,1,1,11,',
 '1000009,P00113442,M,26-35,17,C,0,0,3,5,']

In [None]:
# Take 10% Sample
testSample = testDF.sample(False, 0.1, 1234)
testSample.count()

0
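A count of 0 is plausible here: the fraction passed to `sample` is a per-row probability, not an exact share of rows. The plain-Python sketch below works through the arithmetic for 19 rows and a fraction of 0.1, assuming each row is kept independently (Bernoulli sampling). The simulation uses Python's own random generator, so it does not reproduce Spark's draw for seed 1234.

```python
import random

n_rows = 19      # rows in testDF, as counted above
fraction = 0.1   # sampling fraction passed to sample()

# Each row is kept independently with probability `fraction`.
expected_rows = n_rows * fraction      # about 1.9 rows on average
p_empty = (1 - fraction) ** n_rows     # chance that no row at all is kept


def bernoulli_sample_size(n, p, rng):
    """Count how many of n rows survive an independent keep/drop draw."""
    return sum(1 for _ in range(n) if rng.random() < p)


rng = random.Random(1234)  # arbitrary seed for this simulation only
sizes = [bernoulli_sample_size(n_rows, fraction, rng) for _ in range(10_000)]
empty_share = sizes.count(0) / len(sizes)

print(round(expected_rows, 2))  # 1.9
print(round(p_empty, 4))        # 0.1351
print(empty_share)              # simulated share of empty samples
```

Under these assumptions, roughly one sample in seven from a 19-row DataFrame comes back empty.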

In [None]:
testSamplePD = testSample.toPandas()

In [None]:
type(testSamplePD)

pandas.core.frame.DataFrame

Reading a CSV file into a DataFrame and converting it to a local array or list of rows.


![Reading CSV](../Images/csvDataFrame.png)

In [None]:
trainDF = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("/train.csv")

#### Verify Schema

In [None]:
## Print Schema
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [None]:
## take() returns results as a list of Row objects.
## To see the rows laid out under their column names, use the show operation.
## Show the first 2 rows of the train DataFrame.
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

In [None]:
## To Count the number of rows in DataFrame
print('Total records count in train dataset is {}'.format(trainDF.count()))
print('Total records count in test dataset is {}'.format(testDF.count()))

Total records count in train dataset is 12
Total records count in test dataset is 19


In [None]:
## Columns count and column names
print("Total Columns count in train dataset is {}".format(len(trainDF.columns)))
print("\n\nColumns in train dataset are: {} \n".format(trainDF.columns))

print("Total Columns count in test dataset is {}".format(len(testDF.columns)))
print("\n\nColumns in test dataset are: {} \n".format(testDF.columns))

Total Columns count in train dataset is 12


Columns in train dataset are: ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Purchase'] 

Total Columns count in test dataset is 11


Columns in test dataset are: ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3'] 



#### Summary statistics

In [None]:
## To get the summary statistics (count, mean, standard deviation, min, max) of the columns in a DataFrame
trainDF.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [None]:
## describe also accepts specific column names; here we summarise the numeric Purchase column.
## For a categorical / String column, describe still works, but mean and stddev are null and
## min & max are calculated by comparing the values as strings (character order).
trainDF.describe('Purchase').show()

+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+
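For string columns, min and max follow string ordering rather than numeric ordering. The plain-Python sketch below is not Spark code. It shows the same character-by-character comparison on the Age labels that appear in this dataset, plus a made-up list of numeric strings to show the caveat. For ASCII text, Python's ordering matches the ordering described above.

```python
# Age buckets as they appear in the Age column (strings, not numbers).
age_buckets = ["26-35", "0-17", "55+", "18-25", "46-50", "36-45", "51-55"]

# Strings are ordered character by character.
smallest_label = min(age_buckets)
largest_label = max(age_buckets)

# Caveat: digits stored as strings do not sort numerically.
numeric_strings = ["9", "10", "100"]  # hypothetical values
string_max = max(numeric_strings)
numeric_max = max(numeric_strings, key=int)

print(smallest_label, largest_label)  # 0-17 55+
print(string_max, numeric_max)        # 9 100
```

This is one reason to cast numeric-looking string columns to a numeric type before summarising them.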



### Spark SQL
With Spark SQL, you can register any DataFrame as a table or a temporary view and query it using pure SQL. 
<br>There is no performance difference between writing SQL queries and writing DataFrame code: <br>both “compile” to the same underlying plan.

In [None]:
## Create view/table
trainDF.createOrReplaceTempView("trainDFTable")

In [None]:
## Verify Dataframe
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

In [None]:
## Verify Table
spark.sql("SELECT * FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [None]:
from pyspark.sql.functions import expr, col, column
dfWay = trainDF.filter(col('Age') != '0-17').groupBy('Age').count()
dfWay.show()

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+



In [None]:
spark.sql("Select Age,count(*) from trainDFTable where Age!='0-17' group by Age ").show()

+-----+--------+
|  Age|count(1)|
+-----+--------+
|18-25|   99660|
|26-35|  219587|
|46-50|   45701|
|51-55|   38501|
|36-45|  110013|
|  55+|   21504|
+-----+--------+



#### Column References

#### Select & SelectExpr

In [None]:
## Multiple ways of referring to a column in a DataFrame
from pyspark.sql.functions import expr, col, column

trainDF.select(expr("User_ID AS userID") , 
               col("User_ID"), 
               column("User_ID"), "User_ID").show(2)

+-------+-------+-------+-------+
| userID|User_ID|User_ID|User_ID|
+-------+-------+-------+-------+
|1000001|1000001|1000001|1000001|
|1000001|1000001|1000001|1000001|
+-------+-------+-------+-------+
only showing top 2 rows



In [None]:
trainDF.select(col("User_ID"), "User_ID")

DataFrame[User_ID: int, User_ID: int]

#### Pandas-style dot notation returns a Column reference, not data

In [None]:
result = trainDF.User_ID

In [None]:
result.show(3)

TypeError: ignored

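`trainDF.User_ID` does not fetch any data: it assigns a Column object (a reference to the column) to the new variable. A Column has no `show` method, so the call above fails with a TypeError; pass the Column to `select` to see its values.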

In [None]:
# select content from the above column
trainDF.select(result).show(2)

+-------+
|User_ID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [None]:
trainDF.select(expr("User_ID AS userID")).show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [None]:
spark.sql("SELECT User_ID AS userID FROM trainDFTable").show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [None]:
trainDF.selectExpr("User_ID AS userID", "Product_ID AS productID").show(2)

+-------+---------+
| userID|productID|
+-------+---------+
|1000001|P00069042|
|1000001|P00248942|
+-------+---------+
only showing top 2 rows



In [None]:
trainDF.select("User_ID", "Product_ID", "Age").show(2)

+-------+----------+----+
|User_ID|Product_ID| Age|
+-------+----------+----+
|1000001| P00069042|0-17|
|1000001| P00248942|0-17|
+-------+----------+----+
only showing top 2 rows



#### Converting to Spark Types (Literals)
Sometimes we need to pass an explicit value into Spark that is not an existing column, such as a constant repeated in every row or a value to compare against later. We do this with literals. 
A literal is a translation from a given programming language’s value to one that Spark understands. 
Literals are expressions, so they can be used anywhere a column expression is accepted.

In [None]:
trainDF.show(1)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 1 row



In [None]:
from pyspark.sql.functions import lit
trainDF=trainDF.select("*", lit(1).alias('One'))

In [None]:
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- One: integer (nullable = false)



In [None]:
## In SQL, literals are just the specific value.
spark.sql("SELECT *, 10 as Two FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Two|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370| 10|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200| 10|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

In [None]:
trainDF.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('One', 'int')]

#### Adding Columns

In [None]:
## More Formal way
from pyspark.sql.functions import lit
trainDF.withColumn("One", lit(1)).show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

In [None]:
trainDF.withColumn("Str", lit("hi"))

DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int, One: int, Str: string]

In [None]:
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- One: integer (nullable = false)



In [None]:
trainDF  = trainDF.withColumn("One", lit(1))

In [None]:
trainDF.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('One', 'int')]

In [None]:
spark.sql("SELECT *, 1 AS One FROM trainDFTable LIMIT 2").show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

In [None]:
tempDF = trainDF.withColumn(
    "SameCategoryCode",
    trainDF["Product_Category_1"] == trainDF["Product_Category_2"])
tempDF.show(20)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+----------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|SameCategoryCode|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+----------------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|            null|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|           false|
|1000001| P00087842|     F| 0-17|        10|            A|  

#### Renaming Columns

In [None]:
tempDF.withColumnRenamed("SameCategoryCode", "SimilarCategory").show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+---------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|SimilarCategory|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+---------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|           null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|          false|
|1000001| P00087842|     F|0-17|        10|            A|             

In [None]:
tempDF.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('One', 'int'),
 ('SameCategoryCode', 'boolean')]

In [None]:
uq_val=tempDF.select('SameCategoryCode').distinct()

In [None]:
uq_val.show()

+----------------+
|SameCategoryCode|
+----------------+
|            null|
|           false|
+----------------+



#### Removing Columns

In [None]:
tempDF.drop("SameCategoryCode").show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

#### Changing a Column’s Type (cast)

In [None]:
tempDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- One: integer (nullable = false)
 |-- SameCategoryCode: boolean (nullable = true)



In [None]:
tempDF=tempDF.drop("SameCategoryCode")

In [None]:
from pyspark.sql.functions import expr, col, column
tempDF.withColumn("Purchase", col("Purchase").cast("string")).printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: string (nullable = true)
 |-- One: integer (nullable = false)



In [None]:
tempDF = tempDF.withColumn("Purchase", col("Purchase").cast("string"))

In [None]:
tempDF.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'string'),
 ('One', 'int')]

#### Distinct Values

In [None]:
## Count the distinct Product_ID values in the train and test datasets
## by applying distinct() followed by count().
print("Distinct values in Product_ID's in train dataset are {}".format(trainDF.select('Product_ID').distinct().count()))
print("Distinct values in Product_ID's in test dataset are {}".format(testDF.select('Product_ID').distinct().count()))

Distinct values in Product_ID's in train dataset are 3631
Distinct values in Product_ID's in test dataset are 3491


#### Differences in two columns

In [None]:
## The counts above show that the train file has more distinct Product_IDs than the test file.
## Use the subtract operation to find the Product_IDs that are in the test file but not in the train file,
## and then the reverse.
## The same check can be repeated for every categorical feature.
diff_cat_in_test_train=testDF.select('Product_ID').subtract(trainDF.select('Product_ID'))
print("Count of Product_ID's there in test dataset but not train dataset are {}".
      format(diff_cat_in_test_train.count()))

diff_cat_in_train_test=trainDF.select('Product_ID').subtract(testDF.select('Product_ID'))
print("Count of Product_ID's there in train dataset but not test dataset are {}".format(diff_cat_in_train_test.count()))

Count of Product_ID's there in test dataset but not train dataset are 46
Count of Product_ID's there in train dataset but not test dataset are 186


In [None]:
diff_cat_in_test_train.show(3)

+----------+
|Product_ID|
+----------+
| P00322642|
| P00300142|
| P00077642|
+----------+
only showing top 3 rows
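
As a rough mental model, `subtract` behaves like a set difference over distinct rows, so the result depends on which DataFrame is on the left. The plain-Python sketch below is not Spark code: the product IDs are made up for illustration, and only the four counts at the end are taken from the outputs above.

```python
# Hypothetical product IDs (not from the real dataset); duplicates are deliberate.
train_ids = ["P1", "P2", "P2", "P3"]
test_ids = ["P2", "P3", "P4", "P4"]

# A set difference ignores duplicates and is not symmetric.
in_test_not_train = set(test_ids) - set(train_ids)
in_train_not_test = set(train_ids) - set(test_ids)

print(sorted(in_test_not_train))  # ['P4']
print(sorted(in_train_not_test))  # ['P1']

# Consistency check using the counts printed by the notebook cells above:
# distinct IDs minus the IDs unique to one side gives the shared IDs,
# and both sides should agree on that number.
shared_from_train = 3631 - 186
shared_from_test = 3491 - 46
print(shared_from_train, shared_from_test)  # 3445 3445
```

The two shared counts agree, which suggests the distinct and subtract results above are consistent with each other.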



#### Pair wise Frequencies - Crosstab

In [None]:
## Use the crosstab operation on a DataFrame to calculate the pairwise frequency of two categorical columns.
## Here crosstab is applied to the 'Age' and 'Gender' columns of the train DataFrame.
trainDF.crosstab('Age', 'Gender').show()

+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+



In [None]:
trainDF.groupBy('Age', 'Gender').count().show()

+-----+------+------+
|  Age|Gender| count|
+-----+------+------+
|51-55|     F|  9894|
|18-25|     M| 75032|
| 0-17|     F|  5083|
|46-50|     M| 32502|
|18-25|     F| 24628|
|  55+|     M| 16421|
|  55+|     F|  5083|
|36-45|     M| 82843|
|26-35|     F| 50752|
| 0-17|     M| 10019|
|36-45|     F| 27170|
|51-55|     M| 28607|
|26-35|     M|168835|
|46-50|     F| 13199|
+-----+------+------+



In [None]:
spark.sql("""select Age,
    sum(case when Gender = 'F' then 1 else 0 end) F,
    sum(case when Gender = 'M' then 1 else 0 end) M
from trainDFTable
group by Age""").show()

# spark.sql("""select Age,
#     count(*) total,
#     sum(case when Gender = 'F' then 1 else 0 end) F,
#     sum(case when Gender = 'M' then 1 else 0 end) M
# from trainDFTable
# group by Age""").show()

+-----+-----+------+
|  Age|    F|     M|
+-----+-----+------+
|18-25|24628| 75032|
|26-35|50752|168835|
| 0-17| 5083| 10019|
|46-50|13199| 32502|
|51-55| 9894| 28607|
|36-45|27170| 82843|
|  55+| 5083| 16421|
+-----+-----+------+
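
The three cells above produce the same counts in two shapes: `groupBy(...).count()` returns one row per (Age, Gender) pair, while `crosstab` and the SQL `case when` query pivot the genders into columns. The plain-Python sketch below illustrates that relationship on a few hypothetical rows; it is a model of the idea, not of Spark's implementation.

```python
from collections import Counter

# Hypothetical (Age, Gender) pairs standing in for DataFrame rows.
rows = [("0-17", "F"), ("0-17", "M"), ("0-17", "M"), ("55+", "F")]

# Long form, like groupBy('Age', 'Gender').count(): one count per observed pair.
pair_counts = Counter(rows)

# Wide form, like crosstab('Age', 'Gender'): one row per age, one column per
# gender, with 0 for pairs that never occur.
ages = sorted({age for age, _ in rows})
genders = sorted({gender for _, gender in rows})
table = {age: {g: pair_counts[(age, g)] for g in genders} for age in ages}

print(table)  # {'0-17': {'F': 1, 'M': 2}, '55+': {'F': 1, 'M': 0}}
```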



#### Removing Duplicates

In [None]:
## Use the dropDuplicates operation to remove duplicate rows from a DataFrame.
## Here it is applied to the Age and Gender columns of the train dataset
## to get every unique combination of these two columns.
trainDF.select('Age','Gender').dropDuplicates().show()

+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|18-25|     M|
| 0-17|     F|
|46-50|     M|
|18-25|     F|
|  55+|     M|
|  55+|     F|
|36-45|     M|
|26-35|     F|
| 0-17|     M|
|36-45|     F|
|51-55|     M|
|26-35|     M|
|46-50|     F|
+-----+------+



#### Working with Nulls in Data

In [None]:
## Use dropna (or the equivalent na.drop) to drop rows that contain null values.
## It takes three optional parameters:
## how - 'any' or 'all'. With 'any', a row is dropped if it
## contains any null. With 'all', a row is dropped only if
## all of its values are null.

## thresh - int, default None. If specified, rows with fewer
## than thresh non-null values are dropped.
## This overrides the how parameter.

## subset - optional list of column names to consider.

## Drop rows containing nulls from train using the default parameters and count the remaining rows.
## The defaults are how='any', thresh=None, subset=None.
print(trainDF.dropna().count())
print(trainDF.na.drop().count())
print(trainDF.na.drop("any").count())

166821
166821
166821
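
To make the interaction of `how`, `thresh`, and `subset` concrete, here is a minimal plain-Python model of the rule described in the comments above. The function name `keep_row` and the sample rows are hypothetical; this is a sketch of the documented behaviour, not Spark's implementation.

```python
def keep_row(row, how="any", thresh=None, subset=None):
    """Return True if a dropna-style filter would keep this row (a dict)."""
    cols = subset if subset is not None else list(row)
    non_null = sum(row[c] is not None for c in cols)
    if thresh is not None:      # thresh takes precedence over how
        return non_null >= thresh
    if how == "any":            # drop if any considered value is null
        return non_null == len(cols)
    if how == "all":            # drop only if every considered value is null
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")


# Hypothetical rows with different numbers of nulls.
rows = [
    {"a": 1, "b": None, "c": None},
    {"a": 2, "b": 3, "c": None},
    {"a": None, "b": None, "c": None},
    {"a": 4, "b": 5, "c": 6},
]

print(sum(keep_row(r) for r in rows))                # 1 row has no nulls
print(sum(keep_row(r, how="all") for r in rows))     # 3 rows have at least one value
print(sum(keep_row(r, thresh=2) for r in rows))      # 2 rows have >= 2 non-null values
print(sum(keep_row(r, subset=["a"]) for r in rows))  # 3 rows have a non-null "a"
```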


In [None]:
## Use fillna (or the equivalent na.fill) to replace null values with a constant.

## fillna takes two parameters:
## value:
##     Either a single value (int, float, string) applied to all columns,
##     or a dictionary mapping column names to replacement values.
## subset: optional list of columns to restrict the fill to.

## Fill -1 in place of null values in the train DataFrame.
trainDF.fillna(-1).show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                -1|             

In [None]:
## Filling with different values for different columns
fill_cols_vals = {
"Gender": 'M',
"Purchase" : 999999
}
trainDF.na.fill(fill_cols_vals).count()

550068
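
Filling never removes rows, which is why the count above still equals the full size of the train DataFrame. One detail worth keeping in mind is that Spark's documentation says columns whose type does not match the fill value are ignored, so `fillna(-1)` leaves string columns untouched. The plain-Python sketch below models both call styles under that assumption; `fill_nulls`, the simplified `schema` dict, and the sample rows are hypothetical.

```python
def fill_nulls(rows, value, schema):
    """Model of fillna: value is a scalar or a {column: replacement} dict.

    schema maps column name -> Python type. A scalar is applied only to
    columns of a matching kind (text vs. non-text); this is a simplification.
    """
    if isinstance(value, dict):
        fills = dict(value)
    else:
        is_text = isinstance(value, str)
        fills = {c: value for c, t in schema.items() if (t is str) == is_text}
    return [
        {c: (fills.get(c) if v is None else v) for c, v in row.items()}
        for row in rows
    ]


schema = {"Gender": str, "Product_Category_2": int, "Purchase": int}
rows = [
    {"Gender": None, "Product_Category_2": None, "Purchase": 8370},
    {"Gender": "F", "Product_Category_2": 6, "Purchase": None},
]

scalar_filled = fill_nulls(rows, -1, schema)
dict_filled = fill_nulls(rows, {"Gender": "M", "Purchase": 999999}, schema)

print(scalar_filled[0])  # Gender stays None, Product_Category_2 becomes -1
print(dict_filled[0])    # Gender becomes 'M', Product_Category_2 stays None
print(len(dict_filled) == len(rows))  # True: filling keeps every row
```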

#### Filtering the rows

In [None]:
## Count the rows in the train dataset whose Purchase exceeds 15000.
## The filter operation accepts a column expression; the five calls below
## reference the Purchase column in different but equivalent ways.
print("Count of rows where Purchase Amount more than 15000 are {}"
      .format(trainDF.filter(trainDF.Purchase > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}"
      .format(trainDF.filter(col("Purchase") > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(column("Purchase") > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(expr("Purchase") > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(trainDF["Purchase"] > 15000).count()))

Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523


In [None]:
spark.sql("""
SELECT 
COUNT(*) AS Count
FROM trainDFTable
WHERE Purchase > 15000""").show()

+------+
| Count|
+------+
|110523|
+------+



In [None]:
trainDF.where("Purchase > 15000").where("Gender = 'F'").count()

21429

In [None]:
trainDF.filter("Purchase > 15000").where("Gender = 'F'").count()

21429

In [None]:
trainDF.where((col("Purchase") > 15000) & (col("Gender") == 'M')).count()

89094

In [None]:
trainDF.filter((col("Purchase") > 15000) & (col("Gender") == 'M')).count()

89094

In [None]:
spark.sql("SELECT * FROM trainDFTable WHERE Purchase > 15000 AND Gender = 'F'").count()

21429

## Aggregations

#### Count Distinct

In [None]:
from pyspark.sql.functions import countDistinct
trainDF.select(countDistinct("Product_ID")).show()

+--------------------------+
|count(DISTINCT Product_ID)|
+--------------------------+
|                      3631|
+--------------------------+



#### Approximate Count Distinct

In [None]:
from pyspark.sql.functions import approx_count_distinct
trainDF.select(approx_count_distinct("Product_ID", 0.1)).show()

+---------------------------------+
|approx_count_distinct(Product_ID)|
+---------------------------------+
|                             3277|
+---------------------------------+
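
The second argument to `approx_count_distinct` is the maximum relative standard deviation allowed for the estimate (`rsd`, which defaults to 0.05 in PySpark). The short calculation below compares the approximate result with the exact count from the previous cell. Note that `rsd` describes the expected spread of the estimator rather than a hard bound, so this is a sanity check and not a guarantee.

```python
exact = 3631    # countDistinct("Product_ID") from the cell above
approx = 3277   # approx_count_distinct("Product_ID", 0.1) from the cell above

relative_error = abs(exact - approx) / exact
print(f"{relative_error:.4f}")  # 0.0975, just under the requested 0.1
```

A smaller `rsd` gives a tighter estimate at the cost of more memory; for very small values an exact `countDistinct` is usually the better choice.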



#### First and Last

In [None]:
from pyspark.sql.functions import first, last
trainDF.select(first("Product_ID"), last("Product_ID")).show()

+------------------------+-----------------------+
|first(Product_ID, false)|last(Product_ID, false)|
+------------------------+-----------------------+
|               P00069042|              P00371644|
+------------------------+-----------------------+



#### Min and Max

In [None]:
# Note: this import shadows Python's built-in min and max for the rest of the notebook.
from pyspark.sql.functions import min, max
trainDF.select(min("Purchase"), max("Purchase")).show()

+-------------+-------------+
|min(Purchase)|max(Purchase)|
+-------------+-------------+
|           12|        23961|
+-------------+-------------+



#### Sum

In [None]:
# Note: this import shadows Python's built-in sum for the rest of the notebook.
from pyspark.sql.functions import sum
trainDF.select(sum("Purchase")).show()

+-------------+
|sum(Purchase)|
+-------------+
|   5095812742|
+-------------+



#### Avg

In [None]:
from pyspark.sql.functions import sum, count, avg, expr

trainDF.select(
    count("Purchase").alias("total_transactions"),
    sum("Purchase").alias("total_purchases"),
    avg("Purchase").alias("avg_purchases"),
    expr("mean(Purchase)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+-----------------+-----------------+
|(total_purchases / total_transactions)|    avg_purchases|   mean_purchases|
+--------------------------------------+-----------------+-----------------+
|                     9263.968712959126|9263.968712959126|9263.968712959126|
+--------------------------------------+-----------------+-----------------+



#### Grouping

In [None]:
trainDF.groupBy("Age", "Gender").count().show()

+-----+------+------+
|  Age|Gender| count|
+-----+------+------+
|51-55|     F|  9894|
|18-25|     M| 75032|
| 0-17|     F|  5083|
|46-50|     M| 32502|
|18-25|     F| 24628|
|  55+|     M| 16421|
|  55+|     F|  5083|
|36-45|     M| 82843|
|26-35|     F| 50752|
| 0-17|     M| 10019|
|36-45|     F| 27170|
|51-55|     M| 28607|
|26-35|     M|168835|
|46-50|     F| 13199|
+-----+------+------+



#### Grouping with Expressions

In [None]:
trainDF.groupBy("Age").agg(
  count("Purchase").alias("quan"),
  expr("count(Purchase)")).show()

+-----+------+---------------+
|  Age|  quan|count(Purchase)|
+-----+------+---------------+
|18-25| 99660|          99660|
|26-35|219587|         219587|
| 0-17| 15102|          15102|
|46-50| 45701|          45701|
|51-55| 38501|          38501|
|36-45|110013|         110013|
|  55+| 21504|          21504|
+-----+------+---------------+



In [None]:
trainDF.groupBy("Age").agg(expr("avg(Purchase)"),expr("stddev_pop(Purchase)")).show()

+-----+-----------------+--------------------+
|  Age|    avg(Purchase)|stddev_pop(Purchase)|
+-----+-----------------+--------------------+
|18-25|9169.663606261289|  5034.2967396277945|
|26-35|9252.690632869888|   5010.515894010154|
| 0-17|8933.464640444974|   5110.944823427661|
|46-50|9208.625697468327|   4967.162022122706|
|51-55|9534.808030960236|   5087.302011173869|
|36-45|9331.350694917874|   5022.901050378538|
|  55+|9336.280459449405|    5011.37746955577|
+-----+-----------------+--------------------+



In [None]:
## Average purchase amount for each age group in the train dataset
trainDF.groupby('Age').agg({'Purchase': 'mean'}).show()

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
| 0-17|8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|  55+|9336.280459449405|
+-----+-----------------+



In [None]:
trainDF.groupby('Age').agg({'Purchase': 'sum'}).show()

+-----+-------------+
|  Age|sum(Purchase)|
+-----+-------------+
|18-25|    913848675|
|26-35|   2031770578|
| 0-17|    134913183|
|46-50|    420843403|
|51-55|    367099644|
|36-45|   1026569884|
|  55+|    200767375|
+-----+-------------+



In [None]:
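## Pass a {column: aggregate} dictionary to agg to apply an aggregate (here count) to every column per group.
## Other aggregates such as sum, min, or max can be used the same way to get different summaries for each group.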
exprs = {x: "count" for x in trainDF.columns}
trainDF.groupBy("Age").agg(exprs).show()

+-----+--------------------+----------+-------------------------+---------------------+---------------+--------------+-----------------+---------------------------------+-------------------------+----------+-------------+-------------------------+-----------------+
|  Age|count(City_Category)|count(One)|count(Product_Category_3)|count(Marital_Status)|count(Purchase)|count(User_ID)|count(Occupation)|count(Stay_In_Current_City_Years)|count(Product_Category_1)|count(Age)|count(Gender)|count(Product_Category_2)|count(Product_ID)|
+-----+--------------------+----------+-------------------------+---------------------+---------------+--------------+-----------------+---------------------------------+-------------------------+----------+-------------+-------------------------+-----------------+
|18-25|               99660|     99660|                    31316|                99660|          99660|         99660|            99660|                            99660|                    99660|     9

## Joins

In [None]:
# Build three small DataFrames to demonstrate joins
person = spark.createDataFrame([
    (0, "Dr. Murthy", 0, [250, 100]),
    (1, "Dr. Sridhar Pappu", 1, [500, 250, 100]),
    (2, "Dr. Manoj", 2, [100])])\
  .toDF("id", "name", "graduate_program", "role_status")
graduateProgram = spark.createDataFrame([
    (0, "Ph.D", "School of Information", "Carnegie Mellon University"),
    (1, "Ph.D", "The University of Texas", "El Paso"),
    (2, "Ph.D.", "School of Information", "Oklahoma State University")])\
  .toDF("id", "degree", "department", "school")
roleStatus = spark.createDataFrame([
    (500, "President"),
    (250, "Founder"),
    (100, "Mentor")])\
  .toDF("id", "status")

In [None]:
person.show()

+---+-----------------+----------------+---------------+
| id|             name|graduate_program|    role_status|
+---+-----------------+----------------+---------------+
|  0|       Dr. Murthy|               0|     [250, 100]|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  2|        Dr. Manoj|               2|          [100]|
+---+-----------------+----------------+---------------+



In [None]:
graduateProgram.show(truncate=False)

+---+------+-----------------------+--------------------------+
|id |degree|department             |school                    |
+---+------+-----------------------+--------------------------+
|0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Ph.D. |School of Information  |Oklahoma State University |
+---+------+-----------------------+--------------------------+



In [None]:
roleStatus.show()

+---+---------+
| id|   status|
+---+---------+
|500|President|
|250|  Founder|
|100|   Mentor|
+---+---------+



In [None]:
person.createOrReplaceTempView("personTbl")
graduateProgram.createOrReplaceTempView("graduateProgramTbl")
roleStatus.createOrReplaceTempView("roleStatusTbl")

#### Inner Joins

In [None]:
joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression).show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



In [None]:
spark.sql("""SELECT * FROM personTbl JOIN graduateProgramTbl
  ON personTbl.graduate_program = graduateProgramTbl.id""").show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



In [None]:
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



In [None]:
spark.sql("""SELECT * FROM personTbl INNER JOIN graduateProgramTbl
  ON personTbl.graduate_program = graduateProgramTbl.id""").show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



In [None]:
test = person.join(graduateProgram, on=(person.graduate_program == graduateProgram.id))

In [None]:
test.show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



#### Outer Joins

In [None]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show(truncate=False)

+---+-----------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name             |graduate_program|role_status    |id |degree|department             |school                    |
+---+-----------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr. Murthy       |0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr. Sridhar Pappu|1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr. Manoj        |2               |[100]          |2  |Ph.D. |School of Information  |Oklahoma State University |
+---+-----------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [None]:
spark.sql("""SELECT * FROM personTbl FULL OUTER JOIN graduateProgramTbl
  ON personTbl.graduate_program = graduateProgramTbl.id""").show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



#### Left Outer Joins

In [None]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()

+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
| id|degree|          department|              school| id|             name|graduate_program|    role_status|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|  0|       Dr. Murthy|               0|     [250, 100]|
|  1|  Ph.D|The University of...|             El Paso|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  2| Ph.D.|School of Informa...|Oklahoma State Un...|  2|        Dr. Manoj|               2|          [100]|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+



In [None]:
spark.sql("""SELECT * FROM personTbl LEFT OUTER JOIN graduateProgramTbl
  ON personTbl.graduate_program = graduateProgramTbl.id""").show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



#### Right Outer Joins

In [None]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



In [None]:
spark.sql("""SELECT * FROM personTbl RIGHT OUTER JOIN graduateProgramTbl
  ON personTbl.graduate_program = graduateProgramTbl.id""").show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  2|        Dr. Manoj|               2|          [100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+



#### Natural Joins
Natural joins implicitly choose the columns to join on. 
Spark matches on every column name the two tables share and returns the results. 
Left, right, and outer natural joins are all supported.

WARNING:
Implicit matching is risky. 
The following query can give incorrect results because 
the two DataFrames/tables share a column name (id), but it means different things in the two datasets. 
Always use this join with caution.

In [1]:
spark.sql("""SELECT * FROM graduateProgramTbl NATURAL JOIN personTbl""").show()

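
In the sample tables above, `id` and `graduate_program` happen to be equal for every person, so the pitfall would not be visible with that data. The plain-Python sketch below uses hypothetical rows where they differ. The `natural_join` helper is an illustrative model (an inner join on every shared column name), not Spark's implementation.

```python
def natural_join(left, right):
    """Inner-join two non-empty lists of dicts on every column name they share."""
    shared = [c for c in left[0] if c in right[0]]
    return [
        {**l, **r}
        for l in left
        for r in right
        if all(l[c] == r[c] for c in shared)
    ]


# Hypothetical rows: people.id identifies a person, programs.id identifies a program.
people = [
    {"id": 0, "name": "A", "graduate_program": 1},
    {"id": 1, "name": "B", "graduate_program": 0},
]
programs = [{"id": 0, "school": "X"}, {"id": 1, "school": "Y"}]

# The natural join silently matches people.id to programs.id.
implicit = natural_join(people, programs)

# The intended join matches people.graduate_program to programs.id.
intended = [
    {**p, "school": g["school"]}
    for p in people
    for g in programs
    if p["graduate_program"] == g["id"]
]

print([(row["name"], row["school"]) for row in implicit])  # [('A', 'X'), ('B', 'Y')]
print([(row["name"], row["school"]) for row in intended])  # [('A', 'Y'), ('B', 'X')]
```

Both joins return two rows and raise no error, which is what makes the mistake easy to miss.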

#### Cross (Cartesian) Joins
In the simplest terms, cross joins are inner joins that do not specify a predicate. 
A cross join pairs every single row in the left DataFrame with every single row in the right DataFrame. 
This makes the number of rows in the resulting DataFrame grow multiplicatively. 
If you have 1,000 rows in each DataFrame, the cross join of these will result in 1,000,000 (1,000 x 1,000) rows. 
For this reason, you must state very explicitly that you want a cross join by using the cross join keyword:

In [None]:
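joinType = "cross"
# A join expression is supplied here, so only the matching rows are returned (as in an inner join).
graduateProgram.join(person, joinExpression, joinType).show()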

+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
| id|degree|          department|              school| id|             name|graduate_program|    role_status|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|  0|       Dr. Murthy|               0|     [250, 100]|
|  1|  Ph.D|The University of...|             El Paso|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  2| Ph.D.|School of Informa...|Oklahoma State Un...|  2|        Dr. Manoj|               2|          [100]|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+



In [None]:
spark.sql("""SELECT * FROM graduateProgramTbl CROSS JOIN personTbl
  ON graduateProgramTbl.id = personTbl.graduate_program""").show()

+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
| id|degree|          department|              school| id|             name|graduate_program|    role_status|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|  0|       Dr. Murthy|               0|     [250, 100]|
|  1|  Ph.D|The University of...|             El Paso|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  2| Ph.D.|School of Informa...|Oklahoma State Un...|  2|        Dr. Manoj|               2|          [100]|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+



In [None]:
person.crossJoin(graduateProgram).show()

+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
| id|             name|graduate_program|    role_status| id|degree|          department|              school|
+---+-----------------+----------------+---------------+---+------+--------------------+--------------------+
|  0|       Dr. Murthy|               0|     [250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  0|       Dr. Murthy|               0|     [250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  0|       Dr. Murthy|               0|     [250, 100]|  2| Ph.D.|School of Informa...|Oklahoma State Un...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  2|        Dr. Manoj|               2|          [100]|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|  1|  Ph.D|The University of...|             El Paso|
|  1|Dr. S

In [None]:
spark.sql("""SELECT * FROM graduateProgramTbl CROSS JOIN personTbl""").show()

+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
| id|degree|          department|              school| id|             name|graduate_program|    role_status|
+---+------+--------------------+--------------------+---+-----------------+----------------+---------------+
|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|  0|       Dr. Murthy|               0|     [250, 100]|
|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  0|  Ph.D|School of Informa...|Carnegie Mellon U...|  2|        Dr. Manoj|               2|          [100]|
|  1|  Ph.D|The University of...|             El Paso|  0|       Dr. Murthy|               0|     [250, 100]|
|  2| Ph.D.|School of Informa...|Oklahoma State Un...|  0|       Dr. Murthy|               0|     [250, 100]|
|  1|  Ph.D|The University of...|             El Paso|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  1|  Ph.

#### Joins on Complex Types

In [None]:
person.show()

+---+-----------------+----------------+---------------+
| id|             name|graduate_program|    role_status|
+---+-----------------+----------------+---------------+
|  0|       Dr. Murthy|               0|     [250, 100]|
|  1|Dr. Sridhar Pappu|               1|[500, 250, 100]|
|  2|        Dr. Manoj|               2|          [100]|
+---+-----------------+----------------+---------------+



In [None]:
roleStatus.show()

+---+---------+
| id|   status|
+---+---------+
|500|President|
|250|  Founder|
|100|   Mentor|
+---+---------+



In [None]:
from pyspark.sql.functions import expr

person.withColumnRenamed("id", "personId")\
  .join(roleStatus, expr("array_contains(role_status, id)")).show()

+--------+-----------------+----------------+---------------+---+---------+
|personId|             name|graduate_program|    role_status| id|   status|
+--------+-----------------+----------------+---------------+---+---------+
|       0|       Dr. Murthy|               0|     [250, 100]|250|  Founder|
|       0|       Dr. Murthy|               0|     [250, 100]|100|   Mentor|
|       1|Dr. Sridhar Pappu|               1|[500, 250, 100]|500|President|
|       1|Dr. Sridhar Pappu|               1|[500, 250, 100]|250|  Founder|
|       1|Dr. Sridhar Pappu|               1|[500, 250, 100]|100|   Mentor|
|       2|        Dr. Manoj|               2|          [100]|100|   Mentor|
+--------+-----------------+----------------+---------------+---+---------+



In [None]:
spark.sql("""SELECT * FROM
  (select id as personId, name, graduate_program, role_status FROM personTbl)
  INNER JOIN roleStatusTbl ON array_contains(role_status, id)
""").show()

+--------+-----------------+----------------+---------------+---+---------+
|personId|             name|graduate_program|    role_status| id|   status|
+--------+-----------------+----------------+---------------+---+---------+
|       0|       Dr. Murthy|               0|     [250, 100]|250|  Founder|
|       0|       Dr. Murthy|               0|     [250, 100]|100|   Mentor|
|       1|Dr. Sridhar Pappu|               1|[500, 250, 100]|500|President|
|       1|Dr. Sridhar Pappu|               1|[500, 250, 100]|250|  Founder|
|       1|Dr. Sridhar Pappu|               1|[500, 250, 100]|100|   Mentor|
|       2|        Dr. Manoj|               2|          [100]|100|   Mentor|
+--------+-----------------+----------------+---------------+---+---------+
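
The join condition above is not an equality between two keys but a membership test: a person row is paired with every role whose `id` occurs in that person's `role_status` array. As a rough mental model only (plain Python lists, not Spark, and with no claim about how Spark executes the join), the same logic can be sketched as a nested loop. The variable names below are illustrative.

```python
# Plain-Python model of a join whose condition is array_contains(role_status, id).
person = [
    (0, "Dr. Murthy", 0, [250, 100]),
    (1, "Dr. Sridhar Pappu", 1, [500, 250, 100]),
    (2, "Dr. Manoj", 2, [100]),
]
role_status = [(500, "President"), (250, "Founder"), (100, "Mentor")]

# Keep every (person, role) pair where the role id occurs in the person's array.
joined = [
    (person_id, name, status)
    for person_id, name, _program, roles in person
    for role_id, status in role_status
    if role_id in roles
]

for row in joined:
    print(row)
```

One person can therefore produce several output rows, one per matching role, which is why the three-row `person` DataFrame yields six rows after the join.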



In [None]:
person = spark.createDataFrame([
    (0, "Dr. Murthy", 0, [250, 100]),
    (1, "Dr. Sridhar Pappu", 1, [500, 250, 100]),
    (2, "Dr. Manoj", 2, [100])])\
  .toDF("id", "name", "graduate_program_id", "role_status_code")
graduateProgram = spark.createDataFrame([
    (0, "Ph.D", "School of Information", "Carnegie Mellon University"),
    (1, "Ph.D", "The University of Texas", "El Paso"),
    (2, "Ph.D.", "School of Information", "Oklahoma State University")])\
  .toDF("graduate_program_id", "degree", "department", "school")
roleStatus = spark.createDataFrame([
    (500, "President"),
    (250, "Founder"),
    (100, "Mentor")])\
  .toDF("role_status_code", "status")

In [None]:
person.show()

+---+-----------------+-------------------+----------------+
| id|             name|graduate_program_id|role_status_code|
+---+-----------------+-------------------+----------------+
|  0|       Dr. Murthy|                  0|      [250, 100]|
|  1|Dr. Sridhar Pappu|                  1| [500, 250, 100]|
|  2|        Dr. Manoj|                  2|           [100]|
+---+-----------------+-------------------+----------------+



In [None]:
graduateProgram.show()

+-------------------+------+--------------------+--------------------+
|graduate_program_id|degree|          department|              school|
+-------------------+------+--------------------+--------------------+
|                  0|  Ph.D|School of Informa...|Carnegie Mellon U...|
|                  1|  Ph.D|The University of...|             El Paso|
|                  2| Ph.D.|School of Informa...|Oklahoma State Un...|
+-------------------+------+--------------------+--------------------+



In [None]:
roleStatus.show()

+----------------+---------+
|role_status_code|   status|
+----------------+---------+
|             500|President|
|             250|  Founder|
|             100|   Mentor|
+----------------+---------+



In [None]:
df = person.join(graduateProgram, "graduate_program_id")
df.show()

+-------------------+---+-----------------+----------------+------+--------------------+--------------------+
|graduate_program_id| id|             name|role_status_code|degree|          department|              school|
+-------------------+---+-----------------+----------------+------+--------------------+--------------------+
|                  0|  0|       Dr. Murthy|      [250, 100]|  Ph.D|School of Informa...|Carnegie Mellon U...|
|                  1|  1|Dr. Sridhar Pappu| [500, 250, 100]|  Ph.D|The University of...|             El Paso|
|                  2|  2|        Dr. Manoj|           [100]| Ph.D.|School of Informa...|Oklahoma State Un...|
+-------------------+---+-----------------+----------------+------+--------------------+--------------------+



In [None]:
df = person.join(graduateProgram, ["graduate_program_id"])
df.show()

+-------------------+---+-----------------+----------------+------+--------------------+--------------------+
|graduate_program_id| id|             name|role_status_code|degree|          department|              school|
+-------------------+---+-----------------+----------------+------+--------------------+--------------------+
|                  0|  0|       Dr. Murthy|      [250, 100]|  Ph.D|School of Informa...|Carnegie Mellon U...|
|                  1|  1|Dr. Sridhar Pappu| [500, 250, 100]|  Ph.D|The University of...|             El Paso|
|                  2|  2|        Dr. Manoj|           [100]| Ph.D.|School of Informa...|Oklahoma State Un...|
+-------------------+---+-----------------+----------------+------+--------------------+--------------------+



#### Random Samples

In [None]:
trainDF.count()

550068

In [None]:
## Create a sample DataFrame from the base DataFrame.
## The sample method returns a new DataFrame containing a random sample of the base DataFrame.
## It takes 3 parameters:
## withReplacement = True or False, to sample an observation with or without replacement.
## fraction = x, the expected fraction of rows to keep; x = 0.5 asks for roughly 50% of the data.
##            The sample size is not guaranteed to be exactly x * count().
## seed = a number that makes the sample reproducible.
sampleDF1 = trainDF.sample(False, 0.2, 1234)
sampleDF2 = trainDF.sample(False, 0.2, 4321)
print(sampleDF1.count(), sampleDF2.count())

110354 109752
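
Neither count above is exactly 20% of 550068, and the two seeds give different sizes. That is what one would expect if each row is kept independently with probability `fraction`. The sketch below models that idea in plain Python; it is an illustration under that assumption, not Spark's implementation, and `bernoulli_sample` is a hypothetical helper name.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100_000))
sample_a = bernoulli_sample(rows, 0.2, seed=1234)
sample_b = bernoulli_sample(rows, 0.2, seed=4321)
sample_a_again = bernoulli_sample(rows, 0.2, seed=1234)

print(len(sample_a), len(sample_b))   # both close to 20,000, but usually not equal
print(sample_a == sample_a_again)     # True: same seed, same sample
```

In Spark, the seed fixes the sample only as long as the input data and its partitioning stay the same.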


#### Random Splits

In [None]:
splitDF = trainDF.randomSplit([0.7, 0.3], seed=1234)
print(splitDF[0].count())
print(splitDF[1].count())

385465
164603
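
The two counts add up to the full 550068 rows, so the splits are disjoint and together cover the data, but the 70/30 ratio is only approximate. A simple way to picture this, sketched in plain Python with a hypothetical `random_split` helper, is that every row draws one random number and is routed to a split according to the cumulative, normalized weights.

```python
import random
from bisect import bisect_right
from itertools import accumulate

def random_split(rows, weights, seed):
    """Assign every row to exactly one split, with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of each split on [0, 1), e.g. [0.7, 1.0].
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        # Clamp the index in case floating-point rounding leaves the last bound just below 1.0.
        index = min(bisect_right(bounds, rng.random()), len(weights) - 1)
        splits[index].append(row)
    return splits

train, test = random_split(range(100_000), [7, 3], seed=1234)
print(len(train), len(test))
```

Because the weights are normalized first, `[7, 3]` and `[0.7, 0.3]` describe the same split in this model.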


In [None]:
splitDF[0].show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00051842|     F|0-17|        10|            A|                         2|             0|                 4|                 8|              null|    2849|
|1000001| P00059442|     F|0-17|        10|            A|                         2|             0|                 6|                 8|                16|   16622|
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
+---

#### Map Transformation

In [None]:
## To apply a map operation on DataFrame columns
## map applies a function to each row of the DataFrame's underlying RDD and returns a new RDD.
## Here a lambda maps each row x of the User_ID column of train to the pair (x, 1),
## and take(5) prints the first 5 elements of the mapped RDD.

trainDF.select('User_ID').rdd.map(lambda x:(x,1)).take(5)

[(Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000002), 1)]

*__Prior to Spark 2.0, spark_df.map would alias to spark_df.rdd.map(). 
With Spark 2.0, you must explicitly call .rdd first.__*

#### Sorting Rows

In [None]:
## To sort the DataFrame based on column(s)
## Use the orderBy operation on a DataFrame to get output sorted by one or more columns.
## orderBy takes two kinds of arguments:
## the column(s) to sort by (names, Column expressions, or a list of them), and
## ascending = True or False for ascending or descending order (a list of booleans when sorting by more than one column).
## Sort the train DataFrame by ‘Purchase’ in descending order.
trainDF.orderBy(trainDF.Purchase.desc()).show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1003160| P00052842|     M|26-35|        17|            C|                         3|             0|                10|                15|              null|   23961|
|1002272| P00052842|     M|26-35|         0|            C|                         1|             0|                10|                15|              null|   23961|
|1001474| P00052842|     M|26-35|         4|            A|                         2|             1|                10|                15|              null|   23961

#### Repartition and Coalesce
Another important optimization opportunity is to partition the data according to some frequently filtered columns, which controls the physical layout of data across the cluster, including the partitioning scheme and the number of partitions.

Repartition incurs a full shuffle of the data, regardless of whether one is necessary. You should therefore typically repartition only when the target number of partitions is greater than the current number, or when you want to partition by a set of columns.
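
As a toy model of the difference between repartitioning by a column and coalescing, the sketch below uses plain Python lists as "partitions". It is only an illustration: the helper names are hypothetical, Spark uses its own hash function rather than Python's `hash`, and it picks which partitions to merge differently. The point is that repartitioning re-assigns every row, whereas coalescing only merges whole partitions.

```python
def repartition_by_key(rows, num_partitions, key):
    """Full shuffle: every row is re-assigned according to the hash of its key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions

def coalesce(partitions, num_partitions):
    """No full shuffle: whole partitions are merged, individual rows are not re-hashed."""
    merged = [[] for _ in range(num_partitions)]
    for index, partition in enumerate(partitions):
        merged[index % num_partitions].extend(partition)
    return merged

# Hypothetical rows for illustration.
rows = [{"User_ID": u, "Purchase": p}
        for u, p in [(1, 101), (2, 252), (3, 101), (4, 252), (5, 73)]]

by_purchase = repartition_by_key(rows, 5, key=lambda r: r["Purchase"])
fewer = coalesce(by_purchase, 2)

print([len(p) for p in by_purchase])
print([len(p) for p in fewer])
```

In this model, rows with the same `Purchase` value end up in the same partition after the repartition and stay together after the coalesce.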

In [None]:
## Find the existing partition count
trainDF.rdd.getNumPartitions()
## Do the repartition
## trainDF.repartition(5)

## Repartition based on a column (needs: from pyspark.sql.functions import col)
## If we know we are going to be filtering by a certain column often, 
## it can be worth repartitioning based on that column.
## trainDF.repartition(col("Purchase"))

## We can optionally specify the number of partitions as well.
## trainDF.repartition(5, col("Purchase"))

## Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. 
## The line below shuffles the data into 5 partitions based on Purchase, 
## then coalesces them into 2 (without a full shuffle).
## trainDF.repartition(5, col("Purchase")).coalesce(2)

2

## Miscellaneous

#### Unions

In [None]:
df1 = spark.createDataFrame([[1, 'Alex', 25],[3, 'Carol', 53],[5, 'Emily', 25],[7, 'Gabriel', 32],[9, 'Ilma', 35],[11, 'Kim', 45]], ['id', 'name', 'age'])
df2 = spark.createDataFrame([[2, 'Ben', 66],[4, 'Daniel', 28],[6, 'Frank', 64],[8, 'Harley', 29],[10, 'Jack', 35],[12, 'Litmya', 45]], ['id', 'name', 'age'])
print("Before")
print("DataFrame-1")
df1.show()  # show() prints the table itself and returns None, so it is not wrapped in print()
print("DataFrame-2")
df2.show()
print("After")
df1 = df1.union(df2)
print("DataFrame-1")
df1.show()

Before
DataFrame-1
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
+---+-------+---+

DataFrame-2
+---+------+---+
| id|  name|age|
+---+------+---+
|  2|   Ben| 66|
|  4|Daniel| 28|
|  6| Frank| 64|
|  8|Harley| 29|
| 10|  Jack| 35|
| 12|Litmya| 45|
+---+------+---+

After
DataFrame-1
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
|  2|    Ben| 66|
|  4| Daniel| 28|
|  6|  Frank| 64|
|  8| Harley| 29|
| 10|   Jack| 35|
| 12| Litmya| 45|
+---+-------+---+


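#### Unions and conditional append

In [None]:
## df1 was reassigned to df1.union(df2) in the previous cell, so this union appends df2 a second time
## and its rows with age < 60 appear twice in the output (union keeps duplicates).
df1.union(df2).where("age < 60").show()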

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
|  4| Daniel| 28|
|  8| Harley| 29|
| 10|   Jack| 35|
| 12| Litmya| 45|
|  4| Daniel| 28|
|  8| Harley| 29|
| 10|   Jack| 35|
| 12| Litmya| 45|
+---+-------+---+
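
Daniel, Harley, Jack and Litmya appear twice in this output because `union` appends rows without removing duplicates, in the manner of SQL's `UNION ALL`, and `df1` already contained the rows of `df2`. The plain-Python sketch below reproduces the effect on a small hypothetical subset of the rows, with list concatenation standing in for `union`.

```python
columns = ["id", "name", "age"]
df1_rows = [(1, "Alex", 25), (3, "Carol", 53)]
df2_rows = [(2, "Ben", 66), (4, "Daniel", 28)]

# Appending keeps every row, so nothing is de-duplicated.
first_union = df1_rows + df2_rows
second_union = first_union + df2_rows      # df2 appended a second time

age = columns.index("age")
under_60 = [row for row in second_union if row[age] < 60]

print(under_60)
print(sorted(set(under_60)))               # de-duplicated, as .distinct() would do in Spark
```

Calling `.distinct()` on the unioned DataFrame is the usual way to drop such repeats when they are not wanted.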



In [None]:
trainDF.withColumn('Purchase_new1', trainDF.Purchase /2.0)

DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int, Purchase_new1: double]

In [None]:
trainDF.show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
+---

In [None]:
## To add a new column to a DataFrame
## Use the withColumn operation to add a new column (or replace an existing one); it returns a new DataFrame
## and leaves the base DataFrame unchanged. 
## withColumn takes 2 parameters:
## the name of the column to add or replace, and
## an expression on columns.

## Derive a new column, ‘Purchase_new’, in train, calculated by dividing the Purchase column by 2.

trainDF.withColumn('Purchase_new', trainDF.Purchase /2.0).select('Purchase','Purchase_new').show(5)

+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
|    7969|      3984.5|
+--------+------------+
only showing top 5 rows



In [None]:
## To drop a column from a DataFrame, use the drop operation. 
## Drop the column called ‘Comb’ from test and list the remaining columns of the test DataFrame.
testDF.drop('Comb').columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

In [None]:
## To remove the categories of the Product_ID column in test that are not present in the Product_ID column in train,
## we will use a user-defined function (udf).
## First, calculate the categories in the Product_ID column which are in test but not in train.
diff_cat_in_train_test=testDF.select('Product_ID').subtract(trainDF.select('Product_ID'))
diff_cat_in_train_test.count() # subtract returns distinct rows, so this is a distinct count

46

In [None]:
diff_cat_in_train_test.show(2)

+----------+
|Product_ID|
+----------+
| P00322642|
| P00300142|
+----------+
only showing top 2 rows



In [None]:
## There are 46 categories in test that do not appear in train. 
## To remove these categories from the test ‘Product_ID’ column:

## Create the list of distinct categories, called ‘not_found_cat’, from diff_cat_in_train_test using a map operation.
## Register a udf (user-defined function).
## The udf takes each element of the test column and looks it up in the not_found_cat list; 
## it returns '-1' if the element is in the list and returns the element unchanged otherwise.
not_found_cat = diff_cat_in_train_test.rdd.map(lambda x: x[0]).collect()
print(len(not_found_cat))
print(type(not_found_cat))
print(not_found_cat)

46
<class 'list'>
['P00322642', 'P00300142', 'P00077642', 'P00249942', 'P00294942', 'P00106242', 'P00239542', 'P00074942', 'P00092742', 'P00082142', 'P00030342', 'P00062542', 'P00063942', 'P00013042', 'P00279042', 'P00227242', 'P00359842', 'P00061642', 'P00042642', 'P0099542', 'P00306842', 'P00140842', 'P00165542', 'P00322842', 'P00268942', 'P00236842', 'P00038942', 'P00172942', 'P00012642', 'P00270342', 'P00312642', 'P00336842', 'P00105742', 'P00309842', 'P00166542', 'P00082642', 'P00253842', 'P00062242', 'P00100242', 'P00315342', 'P00058842', 'P00168242', 'P00156942', 'P00039042', 'P00056942', 'P00204642']


#### User Defined Functions - UDF

In [None]:
trainDF.show(1)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 1 row



In [None]:
## To register the udf, import StringType from pyspark.sql.types and udf from pyspark.sql.functions. 
## The udf function takes 2 parameters as arguments:
## the function to apply (here a lambda), and
## the return type (in my case StringType()).
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

Function1 = udf(lambda x: '-1' if x in not_found_cat else x, StringType())
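
The lambda above tests membership in a Python list, which is a linear scan for every row. A possible refinement, sketched here in plain Python, is to keep the lookup values in a set so that each membership test is a hash lookup. `mask_unseen` and `unseen_ids` are hypothetical names, and the two IDs are only a small subset of `not_found_cat` chosen for illustration. Wrapping such a function with `udf(..., StringType())` should work the same way as for the lambda.

```python
def mask_unseen(product_id, unseen):
    """Return '-1' for IDs contained in `unseen`, otherwise the ID unchanged."""
    return "-1" if product_id in unseen else product_id

# Two of the IDs printed in not_found_cat above; the full set would be frozenset(not_found_cat).
unseen_ids = frozenset(["P00322642", "P00300142"])

print(mask_unseen("P00322642", unseen_ids))   # an ID that is only in test
print(mask_unseen("P00069042", unseen_ids))   # an ID that also occurs in train
```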

In [None]:
## In the above code the function is named ‘Function1’, and it puts ‘-1’ for categories of the test ‘Product_ID’ that are not found in train. 
## Finally, apply ‘Function1’ to the test ‘Product_ID’ column and store the result in k as a new column called “NEW_Product_ID”.

k = testDF.withColumn("NEW_Product_ID",Function1(testDF["Product_ID"])).select('NEW_Product_ID')
k.where(k['NEW_Product_ID'] == '-1').show(2)  # compare with the string '-1'; the column is a StringType

+--------------+
|NEW_Product_ID|
+--------------+
|            -1|
|            -1|
+--------------+
only showing top 2 rows



In [None]:
k.select('New_Product_ID').distinct().show()

+--------------+
|New_Product_ID|
+--------------+
|     P00281742|
|      P0096342|
|     P00026042|
|      P0098242|
|     P00313242|
|     P00048442|
|     P00323242|
|     P00159842|
|     P00015342|
|     P00146342|
|     P00180642|
|     P00078842|
|     P00162642|
|     P00318342|
|     P00256142|
|     P00162742|
|     P00014542|
|     P00165442|
|     P00119442|
|     P00212242|
+--------------+
only showing top 20 rows



In [None]:
## See the results by recomputing the categories that are in k but not in train, using the subtract operation.
diff_cat_in_train_test=k.select('NEW_Product_ID').subtract(trainDF.select('Product_ID'))
print(diff_cat_in_train_test.count())# Row count; subtract already returns distinct rows
print(diff_cat_in_train_test.distinct().count())# Explicit distinct count, same result

1
1


In [None]:
## The output 1 means there is now only 1 category that is in k but not in train: the placeholder '-1'.
diff_cat_in_train_test.distinct().collect()

[Row(NEW_Product_ID='-1')]

In [None]:
from pyspark.sql.functions import lit, round, bround  # note: this round shadows Python's built-in round
trainDF.select(round(lit(2.5)), bround(lit(2.5))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows



In [None]:
spark.sql("SELECT bround(2.5), round(2.9), round(2.4), bround(2.9)").show(2)

+--------------+-------------+-------------+--------------+
|bround(2.5, 0)|round(2.9, 0)|round(2.4, 0)|bround(2.9, 0)|
+--------------+-------------+-------------+--------------+
|             2|            3|            2|             3|
+--------------+-------------+-------------+--------------+



In [None]:
spark.sql("SELECT round(2.5), bround(2.5)").show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|            3|             2|
+-------------+--------------+



In [None]:
from pyspark.sql.functions import corr
print(trainDF.stat.corr("Purchase", "Product_Category_1"))
trainDF.select(corr("Purchase", "Product_Category_1")).show()

-0.3437033459199084
+----------------------------------+
|corr(Purchase, Product_Category_1)|
+----------------------------------+
|               -0.3437033459199084|
+----------------------------------+



In [None]:
trainDF.stat.freqItems(["Age"],.6).show(truncate = False)

+-------------+
|Age_freqItems|
+-------------+
|[26-35]      |
+-------------+



#### String Manipulations

In [None]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

trainDF.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 7, "?").alias("lp"),
rpad(lit("HELLO"), 10, "?").alias("rp"))\
.show(2)

+------+------+-----+-------+----------+
| ltrim| rtrim| trim|     lp|        rp|
+------+------+-----+-------+----------+
|HELLO | HELLO|HELLO|??HELLO|HELLO?????|
|HELLO | HELLO|HELLO|??HELLO|HELLO?????|
+------+------+-----+-------+----------+
only showing top 2 rows



In [None]:
trainDF.createOrReplaceTempView("trainDFTable")  # registerTempTable is deprecated since Spark 2.0

In [None]:
spark.sql("""SELECT
ltrim(' HELLLOOOO '),
rtrim(' HELLLOOOO '),
trim(' HELLLOOOO '),
lpad('HELLOOOO ', 3, ' '),
rpad('HELLOOOO ', 10, ' ')
FROM
trainDFTable""").show(2)

+------------------+------------------+-----------------+---------------------+----------------------+
|ltrim( HELLLOOOO )|rtrim( HELLLOOOO )|trim( HELLLOOOO )|lpad(HELLOOOO , 3,  )|rpad(HELLOOOO , 10,  )|
+------------------+------------------+-----------------+---------------------+----------------------+
|        HELLLOOOO |         HELLLOOOO|        HELLLOOOO|                  HEL|            HELLOOOO  |
|        HELLLOOOO |         HELLLOOOO|        HELLLOOOO|                  HEL|            HELLOOOO  |
+------------------+------------------+-----------------+---------------------+----------------------+
only showing top 2 rows
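
Notice that `lpad('HELLOOOO ', 3, ' ')` returned `HEL`: when the target length is shorter than the input, the value is truncated rather than padded. The plain-Python sketch below models that behaviour for non-empty pad strings. The functions are illustrative stand-ins written for this note, not Spark's implementation.

```python
def lpad(text, length, pad):
    """Left-pad `text` to `length` characters; shorten it if it is already longer."""
    if len(text) >= length:
        return text[:length]
    fill = (pad * length)[: length - len(text)]
    return fill + text

def rpad(text, length, pad):
    """Right-pad `text` to `length` characters; shorten it if it is already longer."""
    if len(text) >= length:
        return text[:length]
    fill = (pad * length)[: length - len(text)]
    return text + fill

print(repr(lpad("HELLO", 7, "?")))        # '??HELLO'
print(repr(rpad("HELLO", 10, "?")))       # 'HELLO?????'
print(repr(lpad("HELLOOOO ", 3, " ")))    # 'HEL'
print(repr(rpad("HELLOOOO ", 10, " ")))   # 'HELLOOOO  '
```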



#### Regular Expressions

In [None]:
from pyspark.sql.functions import expr, col, column
from pyspark.sql.functions import regexp_replace
regex_string = "F"

trainDF.select(
regexp_replace(col("Gender"), regex_string, "MALE_OR_FEMALE")
.alias("Gender_DECODE"),
col("Gender"))\
.show(10)

+--------------+------+
| Gender_DECODE|Gender|
+--------------+------+
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|             M|     M|
|             M|     M|
|             M|     M|
|             M|     M|
|             M|     M|
|             M|     M|
+--------------+------+
only showing top 10 rows



In [None]:
spark.sql("""
SELECT
regexp_replace(Gender, 'F|M', 'MALE_OR_FEMALE') as
Gender_DECODE,
Gender
FROM
trainDFTable
""").show(2)

+--------------+------+
| Gender_DECODE|Gender|
+--------------+------+
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
+--------------+------+
only showing top 2 rows



In [None]:
from pyspark.sql.functions import translate
trainDF.select(
translate(col("Gender"), "FM", "01"),
col("Gender"))\
.show(10)

+-------------------------+------+
|translate(Gender, FM, 01)|Gender|
+-------------------------+------+
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
+-------------------------+------+
only showing top 10 rows



In [None]:
spark.sql("""
SELECT
translate(Gender, 'FM', '01'),
Gender
FROM
trainDFTable
""").show(10)

+-------------------------+------+
|translate(Gender, FM, 01)|Gender|
+-------------------------+------+
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
+-------------------------+------+
only showing top 10 rows
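
`regexp_replace` substitutes whatever a pattern matches, whereas `translate` maps characters one by one (the first character of `FM` to the first of `01`, and so on). Python's standard library has close analogues of both, which makes the difference easy to try out. This is a plain-Python illustration on a hypothetical list of values; Spark uses Java regular expressions, so more complex patterns may not behave identically.

```python
import re

genders = ["F", "M", "F"]

# Character-by-character mapping, like translate(Gender, 'FM', '01').
table = str.maketrans("FM", "01")
translated = [g.translate(table) for g in genders]

# Pattern-based replacement, like regexp_replace(Gender, 'F|M', 'MALE_OR_FEMALE').
replaced = [re.sub("F|M", "MALE_OR_FEMALE", g) for g in genders]

# With the pattern 'F' alone, values equal to 'M' are left untouched.
only_f = [re.sub("F", "MALE_OR_FEMALE", g) for g in genders]

print(translated)
print(replaced)
print(only_f)
```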



## Working with Date and Time

In [None]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.show(truncate = False)

+---+----------+-----------------------+
|id |today     |now                    |
+---+----------+-----------------------+
|0  |2019-12-06|2019-12-06 04:26:10.668|
|1  |2019-12-06|2019-12-06 04:26:10.668|
|2  |2019-12-06|2019-12-06 04:26:10.668|
|3  |2019-12-06|2019-12-06 04:26:10.668|
|4  |2019-12-06|2019-12-06 04:26:10.668|
|5  |2019-12-06|2019-12-06 04:26:10.668|
|6  |2019-12-06|2019-12-06 04:26:10.668|
|7  |2019-12-06|2019-12-06 04:26:10.668|
|8  |2019-12-06|2019-12-06 04:26:10.668|
|9  |2019-12-06|2019-12-06 04:26:10.668|
+---+----------+-----------------------+



In [None]:
dateDF.createOrReplaceTempView("dateDFTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [None]:
from pyspark.sql.functions import expr, col, column
from pyspark.sql.functions import regexp_replace

In [None]:
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"), 5),date_add(col("today"), 5)).show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2019-12-01|        2019-12-11|
+------------------+------------------+
only showing top 1 row



In [None]:
spark.sql("""
SELECT
date_sub(today, 5),
date_add(today, 5)
FROM
dateDFTable
""").show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2019-12-01|        2019-12-11|
+------------------+------------------+
only showing top 1 row



In [None]:
from pyspark.sql.functions import datediff, months_between, to_date
dateDF\
.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today")),"week_ago")\
.show()

+-------------------------+----------+
|datediff(week_ago, today)|  week_ago|
+-------------------------+----------+
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
|                       -7|2019-11-29|
+-------------------------+----------+



In [None]:
from pyspark.sql.functions import lit
dateDF\
.select(
to_date(lit("2017-01-01")).alias("start"),
to_date(lit("2018-02-18")).alias("end"))\
.select(months_between(col("start"), col("end")))\
.show(1)

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                     -13.5483871|
+--------------------------------+
only showing top 1 row



In [None]:
spark.sql("""
SELECT
to_date('2016-01-01') as To_Date,
months_between('2016-01-01', '2017-01-01') as Months_Between,
datediff('2016-01-01', '2017-01-01') as Diff,
now
FROM
dateDFTable
""").show(2)

+----------+--------------+----+--------------------+
|   To_Date|Months_Between|Diff|                 now|
+----------+--------------+----+--------------------+
|2016-01-01|         -12.0|-366|2019-12-06 04:29:...|
|2016-01-01|         -12.0|-366|2019-12-06 04:29:...|
+----------+--------------+----+--------------------+
only showing top 2 rows
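
The fractional results of `months_between` follow from how Spark documents the function: if both dates fall on the same day of the month, or both are the last day of their month, the result is a whole number; otherwise the remainder is computed on the basis of 31 days per month and rounded to 8 digits. The sketch below re-implements that rule in plain Python for date-only inputs. It is a simplified model (it ignores time of day) with illustrative function names.

```python
import calendar
from datetime import date

def months_between(date1, date2):
    """Date-only model of months_between(date1, date2), based on 31-day months."""
    def is_last_day(d):
        return d.day == calendar.monthrange(d.year, d.month)[1]

    months = (date1.year - date2.year) * 12 + (date1.month - date2.month)
    if date1.day == date2.day or (is_last_day(date1) and is_last_day(date2)):
        return float(months)
    return round(months + (date1.day - date2.day) / 31.0, 8)

def datediff(end, start):
    """Number of days from start to end, negative when end is earlier."""
    return (end - start).days

print(months_between(date(2017, 1, 1), date(2018, 2, 18)))   # -13.5483871
print(months_between(date(2016, 1, 1), date(2017, 1, 1)))    # -12.0
print(datediff(date(2016, 1, 1), date(2017, 1, 1)))          # -366 (2016 is a leap year)
```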



In [None]:
from pyspark.sql.functions import to_date, lit
spark.range(5).withColumn("date", lit("2017-01-01"))\
.select(to_date(col("date")))\
.show()

+---------------+
|to_date(`date`)|
+---------------+
|     2017-01-01|
|     2017-01-01|
|     2017-01-01|
|     2017-01-01|
|     2017-01-01|
+---------------+



__WARNING__
<br>Spark will not throw an error if it cannot parse a date; it just returns null. This can be tricky in larger pipelines, because you may be expecting your data in one format and getting it in another. To illustrate, consider a date whose format has switched from year-month-day to year-day-month: Spark fails to parse it and silently returns null instead.

In [None]:
### 2016-20-12 - year-day-month
### 2017-12-11 - year-month-day
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)

+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row



In [None]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql.functions import to_date, lit
dateFormat = "yyyy-dd-MM"

cleanDateDF = spark.range(1)\
.select(to_date(unix_timestamp(lit("2017-12-11"), dateFormat)
.cast("timestamp"))\
.alias("date"),
to_date(unix_timestamp(lit("2017-20-12"), dateFormat)
.cast("timestamp"))\
.alias("date2"))

cleanDateDF.show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+
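
The two behaviours shown above (a silent null, and a date that parses but means something different) can be reproduced outside Spark. This is a plain-Python sketch using `datetime.strptime`; the helper `parse_or_none` is a hypothetical name, and the `%Y-%d-%m` pattern is assumed to be the Python counterpart of the `yyyy-dd-MM` format used in the cell above.

```python
from datetime import datetime

def parse_or_none(text, fmt):
    """Return a date, or None when the text does not match the format."""
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        return None

YEAR_MONTH_DAY = "%Y-%m-%d"
YEAR_DAY_MONTH = "%Y-%d-%m"

# Month "20" does not exist, so parsing fails quietly, like the null above.
print(parse_or_none("2016-20-12", YEAR_MONTH_DAY))  # None

# This string parses under either format, but the two readings disagree.
print(parse_or_none("2017-12-11", YEAR_MONTH_DAY))  # 2017-12-11
print(parse_or_none("2017-12-11", YEAR_DAY_MONTH))  # 2017-11-12
```

The last line is the more dangerous case: nothing fails, yet the value is a month off. It is worth validating a few known dates whenever a format string changes.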



In [None]:
textDF = spark.range(10).withColumn("Description", lit("This is long string"))
textDF.show()

+---+-------------------+
| id|        Description|
+---+-------------------+
|  0|This is long string|
|  1|This is long string|
|  2|This is long string|
|  3|This is long string|
|  4|This is long string|
|  5|This is long string|
|  6|This is long string|
|  7|This is long string|
|  8|This is long string|
|  9|This is long string|
+---+-------------------+



In [None]:
from pyspark.sql.functions import split
textDF.select(split(col("Description"), " ")).show(2)

+---------------------+
|split(Description,  )|
+---------------------+
| [This, is, long, ...|
| [This, is, long, ...|
+---------------------+
only showing top 2 rows



In [None]:
textDF.createOrReplaceTempView('textDFTable')

spark.sql("""
SELECT
split(Description, ' ')
FROM
textDFTable
""").show(2)

+---------------------+
|split(Description,  )|
+---------------------+
| [This, is, long, ...|
| [This, is, long, ...|
+---------------------+
only showing top 2 rows



## User-Defined Functions

In [None]:
udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
    return double_value ** 3

power3(3.0)

27.0

Once the function is created, we need to register it with Spark so that we can use it on all of our worker machines. Spark will serialize the function on the driver and transfer it over the network to all executor processes. This happens regardless of language.

<br>When we use the function, one of two things happens. If the function is written in Scala or Java, we can use it within the JVM. This means there will be little performance penalty, aside from the fact that we can’t take advantage of the code generation capabilities that Spark has for built-in functions.

<br>If the function is written in Python, something quite different happens. 
Spark starts a Python process on the worker, serializes all of the data to a format that Python can understand (remember, it was in the JVM before), executes the function row by row on that data in the Python process, and finally returns the results of the row operations to the JVM and Spark.

![UDF_Spark_Python](../Images/UDF_Spark_Python.png)
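
The round trip described above can be imitated in plain Python to show where the cost comes from. This is only a sketch: `pickle` is assumed here as a stand-in for whatever wire format Spark actually uses, and `python_worker` is a hypothetical name, not a Spark API.

```python
import pickle

def power3(value):
    return value ** 3

def python_worker(serialized_rows, func):
    """Stand-in for the Python process started on a worker."""
    rows = pickle.loads(serialized_rows)    # deserialize the incoming data
    results = [func(row) for row in rows]   # apply the function row by row
    return pickle.dumps(results)            # serialize results for the trip back

jvm_side_rows = [0, 1, 2, 3, 4]             # data that started out in the JVM
payload = pickle.dumps(jvm_side_rows)       # serialized once on the way in
returned = pickle.loads(python_worker(payload, power3))

print(returned)  # [0, 1, 8, 27, 64]
```

Every value is serialized twice and the function is called once per row. This is why built-in functions are usually preferable to a Python UDF when an equivalent exists.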

In [None]:
udfExampleDF.show()

+---+
|num|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [None]:
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [None]:
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show()

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+



#### UDF Written in Scala
#### Try in spark-shell
val udfExampleDF = spark.range(5).toDF("num")

def power3(number:Double):Double = {
<br>          number * number * number
<br>}

power3(2.0)

![UDF_Scala_PySpark](../Images/Scala_UDF.png)

### Distributed Shared Variables

#### Broadcast Variables

In [None]:
my_collection = "Postgraduate Program in Big Data Analytics and Optimization"\
  .split(" ")
    
words = sc.parallelize(my_collection, 2)

In [None]:
words.take(10)

['Postgraduate',
 'Program',
 'in',
 'Big',
 'Data',
 'Analytics',
 'and',
 'Optimization']

In [None]:
supplementalData = {"Postgraduate":1000, "Analytics":200, "Optimization": 400,
                    "Big":-300, "Data": 100, "Program":100}

In [None]:
suppBroadcast = sc.broadcast(supplementalData)

In [None]:
suppBroadcast.value

{'Analytics': 200,
 'Big': -300,
 'Data': 100,
 'Optimization': 400,
 'Postgraduate': 1000,
 'Program': 100}

In [None]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
  .sortBy(lambda wordPair: wordPair[1])\
  .collect()

[('Big', -300),
 ('in', 0),
 ('and', 0),
 ('Program', 100),
 ('Data', 100),
 ('Analytics', 200),
 ('Optimization', 400),
 ('Postgraduate', 1000)]

#### Accumulators

In [None]:
cwgDF = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("XXI_Commonwealth_Games.csv")

In [None]:
cwgDF.show(5)

+---+----------+------------+----+------+------+-----+
|Seq|NationCode|  NationName|Gold|Silver|Bronze|Total|
+---+----------+------------+----+------+------+-----+
|  1|       AUS|   Australia|  60|    45|    46|  151|
|  2|       ENG|     England|  28|    31|    24|   83|
|  3|       IND|       India|  14|     6|     9|   29|
|  4|       CAN|      Canada|  11|    26|    19|   56|
|  5|       RSA|South Africa|  11|     9|    12|   32|
+---+----------+------------+----+------+------+-----+
only showing top 5 rows



In [None]:
cwgDF.schema

StructType(List(StructField(Seq,IntegerType,true),StructField(NationCode,StringType,true),StructField(NationName,StringType,true),StructField(Gold,IntegerType,true),StructField(Silver,IntegerType,true),StructField(Bronze,IntegerType,true),StructField(Total,IntegerType,true)))

#### Define Accumulator

In [None]:
accIND = sc.accumulator(0)

In [None]:
def accINDFunc(each_row):
  countryCD = each_row["NationCode"]
  list_ctrys = ["IND", "SRI", "PAK", "BAN"]
  if countryCD in list_ctrys:
    accIND.add(each_row["Total"])

In [None]:
cwgDF.foreach(lambda each_row: accINDFunc(each_row))

In [None]:
accIND.value

38

### Handling Different Data Sources

There is a variety of data sources that one can use out of the box, as well as countless other sources built by the greater community.

<br> **Spark** has six “core” data sources and hundreds of external data sources written by the community.

-  CSV
-  JSON
-  Parquet
-  ORC
-  JDBC/ODBC Connections
-  Plain-text files

<br> As mentioned, Spark has numerous community-created data sources. Here’s just a small sample:
-  Cassandra
-  HBase
-  MongoDB
-  AWS Redshift
-  XML
-  And many many others.

**Read API Structure**
<br>DataFrameReader.format(...).option("key", "value").schema(...).load(...)
<br>After we have a DataFrame reader, we specify several values:
-  The format
-  The schema
-  The read mode
-  A series of options

*Ex. spark.read.format("csv")
<br>  .option("mode", "FAILFAST")
<br>  .option("inferSchema", "true")
<br>  .option("path", "path/to/file(s)")
<br>  .schema(someSchema)
<br>  .load()
*

**READ MODES**
-  permissive - Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record.
-  dropMalformed - Drops the row that contains malformed records
-  failFast - Fails immediately upon encountering malformed records
<br><br>The default is permissive.
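
To make the three read modes concrete, here is a plain-Python sketch of what each one does with a malformed line. It models the descriptions above and is not Spark's implementation. The function `read_rows`, the two-field layout and the deliberately broken second line are illustrative assumptions.

```python
def read_rows(lines, mode="permissive"):
    """Parse 'name,count' lines, handling malformed ones according to mode."""
    rows = []
    for line in lines:
        parts = line.split(",")
        try:
            if len(parts) != 2:
                raise ValueError("wrong number of fields")
            row = {"name": parts[0], "count": int(parts[1]), "_corrupt_record": None}
        except ValueError:
            if mode == "failFast":
                raise                      # stop at the first malformed record
            if mode == "dropMalformed":
                continue                   # skip the malformed row entirely
            # permissive: null out the fields and keep the raw text
            row = {"name": None, "count": None, "_corrupt_record": line}
        rows.append(row)
    return rows

lines = ["AUS,151", "ENG,eighty-three", "IND,29"]
permissive = read_rows(lines)
dropped = read_rows(lines, mode="dropMalformed")

print(len(permissive), len(dropped))  # 3 2
print(permissive[1]["_corrupt_record"])  # ENG,eighty-three
```

Calling `read_rows(lines, mode="failFast")` raises a `ValueError` on the second line instead of returning any rows.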

**Write API Structure**
<br>We will use this format to write to all of our data sources. 
<br>format is optional because by default, Spark will use the **Parquet** format. 
<br>option, again, allows us to configure how to write out our given data. 
<br>partitionBy, bucketBy, and sortBy work only for file-based data sources; 
<br>you can use them to control the specific layout of files at the destination.

<br> DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
<br> The foundation for writing data is quite similar to that of reading data. 
<br>Instead of the DataFrameReader, we have the DataFrameWriter. 
<br>Because we always need to write out some given data source, 
<br>we access the DataFrameWriter on a per-DataFrame basis via the write attribute, for example dataframe.write.

<br>After we have a DataFrameWriter, we specify three values: the format, a series of options, and the save mode. 

<br>Example: 
<br>dataframe.write.format("csv")
<br>  .mode("overwrite")
<br>  .option("dateFormat", "yyyy-MM-dd")
<br>  .option("path", "path/to/file(s)")
<br>  .save()

**SAVE MODES**
-  append - Appends the output files to the list of files that already exist at that location
-  overwrite - Will completely overwrite any data that already exists there
-  errorIfExists - Throws an error and fails the write if data or files already exist at the specified location
-  ignore - If data or files exist at the location, do nothing with the current DataFrame
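
The four save modes can be sketched against an ordinary directory. This is a plain-Python model of the behaviour listed above, not Spark code. The `save` function and the `part-NNNNN.txt` file naming are hypothetical choices made for the illustration.

```python
import tempfile
from pathlib import Path

def save(text, target, mode="errorIfExists"):
    """Write text under target, treating existing output according to mode."""
    target = Path(target)
    has_data = target.exists() and any(target.iterdir())
    if has_data:
        if mode == "errorIfExists":
            raise FileExistsError(f"{target} already contains data")
        if mode == "ignore":
            return                          # leave existing output untouched
        if mode == "overwrite":
            for old_file in target.iterdir():
                old_file.unlink()           # clear out what was there
    target.mkdir(parents=True, exist_ok=True)
    part_number = len(list(target.iterdir()))
    (target / f"part-{part_number:05d}.txt").write_text(text)

with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "TCS_OUT"
    save("first", out)
    save("second", out, mode="append")
    appended = sorted(p.name for p in out.iterdir())
    save("third", out, mode="ignore")
    ignored = sorted(p.name for p in out.iterdir())
    save("fourth", out, mode="overwrite")
    overwritten = [p.read_text() for p in out.iterdir()]

print(appended)     # ['part-00000.txt', 'part-00001.txt']
print(ignored)      # ['part-00000.txt', 'part-00001.txt']
print(overwritten)  # ['fourth']
```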

#### CSV

In [None]:
tcs_CSV_DF = spark.read.format("csv")\
.option("inferSchema", "true")\
.option("header", "true")\
.load("TCS_BO.csv")

In [None]:
tcs_CSV_DF.show(4)

+-------------------+---------+---------+---------+---------+---------+------+
|               Date|     Open|     High|      Low|    Close|Adj Close|Volume|
+-------------------+---------+---------+---------+---------+---------+------+
|2002-01-14 00:00:00|38.500000|39.500000|38.062500|38.400002|20.859608| 83688|
|2002-01-15 00:00:00|38.112499|38.724998|37.150002|37.412498|20.323185| 47496|
|2002-01-16 00:00:00|38.049999|38.500000|37.125000|37.700001|20.479355| 51624|
|2002-01-17 00:00:00|36.250000|38.750000|36.250000|38.337502|20.825661| 85840|
+-------------------+---------+---------+---------+---------+---------+------+
only showing top 4 rows



In [None]:
tcs_CSV_DF.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import expr, col, column

tcs_CSV_DF = tcs_CSV_DF.select(col("Date").cast("date"), 
                     col("Open").cast("double"),
                     col("High").cast("double"),
                     col("Low").cast("double"),
                     col("Close").cast("double"),
                     col("Adj Close").cast("double"), 
                     col("Volume").cast("int"))

In [None]:
tcs_CSV_DF.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [None]:
tcs_CSV_DF = tcs_CSV_DF.withColumnRenamed("Adj Close", "Adj_Close")
tcs_CSV_DF = tcs_CSV_DF.withColumnRenamed("Date", "Stock_Date")

In [None]:
tcs_CSV_DF.write.format("json").mode("overwrite").save("TCS_JSON/")

In [None]:
tcs_CSV_DF.write.format("parquet").mode("overwrite").save("TCS_PARQUET/")

In [None]:
tcs_CSV_DF.write.format("orc").mode("overwrite").save("TCS_ORC/")

#### JSON Files
Those coming from the world of JavaScript are likely familiar with JavaScript Object Notation, or JSON, as it’s commonly called.

In [None]:
tcs_JSON_DF = spark.read.format("json")\
.option("inferSchema", "True")\
.load("TCS_JSON/")

In [None]:
tcs_JSON_DF.show(4)

+---------+---------+---------+---------+---------+----------+------+
|Adj_Close|    Close|     High|      Low|     Open|Stock_Date|Volume|
+---------+---------+---------+---------+---------+----------+------+
|20.859608|38.400002|     39.5|  38.0625|     38.5|2002-01-14| 83688|
|20.323185|37.412498|38.724998|37.150002|38.112499|2002-01-15| 47496|
|20.479355|37.700001|     38.5|   37.125|38.049999|2002-01-16| 51624|
|20.825661|38.337502|    38.75|    36.25|    36.25|2002-01-17| 85840|
+---------+---------+---------+---------+---------+----------+------+
only showing top 4 rows



#### Parquet Files
Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads. 
<br>It provides columnar compression, which saves storage space and allows for reading individual columns instead of entire files. 
<br>It is a file format that works exceptionally well with Apache Spark and is in fact the default file format.
<br>Writing data out to Parquet is recommended for long-term storage because reading from a Parquet file is generally more efficient than reading JSON or CSV. 
<br>Another advantage of Parquet is that it supports complex types. 
<br>This means that if your column is an array (which would fail with a CSV file, for example), map, or struct, you’ll still be able to read and write that file without issue. 
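
The benefit of a column-oriented layout can be illustrated with ordinary Python containers. This sketch uses three columns of the four TCS rows shown earlier. It only models the idea of row versus column storage and says nothing about Parquet's actual file layout or compression.

```python
# Row-oriented storage: each record keeps all of its fields together,
# as in CSV or line-delimited JSON.
row_layout = [
    {"Stock_Date": "2002-01-14", "Close": 38.400002, "Volume": 83688},
    {"Stock_Date": "2002-01-15", "Close": 37.412498, "Volume": 47496},
    {"Stock_Date": "2002-01-16", "Close": 37.700001, "Volume": 51624},
    {"Stock_Date": "2002-01-17", "Close": 38.337502, "Volume": 85840},
]

# Column-oriented storage: each column's values are kept together.
column_layout = {
    name: [row[name] for row in row_layout] for name in row_layout[0]
}

# Row layout: every record must be visited to pull out a single field.
closes_from_rows = [row["Close"] for row in row_layout]

# Column layout: the requested column is already one contiguous list,
# and the other columns are never touched.
closes_from_columns = column_layout["Close"]

print(closes_from_rows == closes_from_columns)  # True
print(column_layout["Volume"])  # [83688, 47496, 51624, 85840]
```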

**Reading Parquet Files**
<br>Parquet has very few options because it enforces its own schema when storing data. 
<br>Thus, all we need to set is the format. 
<br>We can set the schema if we have strict requirements for what our DataFrame should look like. 
<br>Oftentimes this is not necessary because we can use schema on read, which is similar to the inferSchema with CSV files. 
<br>However, with Parquet files, this method is more powerful because the schema is built into the file itself (so no inference needed).

In [None]:
tcs_PARQUET_DF = spark.read.format("parquet").load("TCS_PARQUET/")
tcs_PARQUET_DF.show(4)

+----------+---------+---------+---------+---------+---------+------+
|Stock_Date|     Open|     High|      Low|    Close|Adj_Close|Volume|
+----------+---------+---------+---------+---------+---------+------+
|2002-01-14|     38.5|     39.5|  38.0625|38.400002|20.859608| 83688|
|2002-01-15|38.112499|38.724998|37.150002|37.412498|20.323185| 47496|
|2002-01-16|38.049999|     38.5|   37.125|37.700001|20.479355| 51624|
|2002-01-17|    36.25|    38.75|    36.25|38.337502|20.825661| 85840|
+----------+---------+---------+---------+---------+---------+------+
only showing top 4 rows



In [None]:
tcs_PARQUET_DF.dtypes

[('Stock_Date', 'date'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Adj_Close', 'double'),
 ('Volume', 'int')]

#### ORC Files
ORC (Optimized Row Columnar file format) is a self-describing, type-aware columnar file format designed for Hadoop workloads. 
<br>It is optimized for large streaming reads, but with integrated support for finding required rows quickly. 
<br>ORC actually has no options for reading in data because Spark understands the file format quite well. 
<br>What is the difference between ORC and Parquet? 
<br>For the most part, they’re quite similar. 
<br>The fundamental difference is that Parquet is further optimized for use with Spark, 
<br>whereas ORC is further optimized for Hive.

In [None]:
tcs_ORC_DF = spark.read.format("orc").load("TCS_ORC/")
tcs_ORC_DF.show(4)

+----------+---------+---------+---------+---------+---------+------+
|Stock_Date|     Open|     High|      Low|    Close|Adj_Close|Volume|
+----------+---------+---------+---------+---------+---------+------+
|2002-01-14|     38.5|     39.5|  38.0625|38.400002|20.859608| 83688|
|2002-01-15|38.112499|38.724998|37.150002|37.412498|20.323185| 47496|
|2002-01-16|38.049999|     38.5|   37.125|37.700001|20.479355| 51624|
|2002-01-17|    36.25|    38.75|    36.25|38.337502|20.825661| 85840|
+----------+---------+---------+---------+---------+---------+------+
only showing top 4 rows



In [None]:
tcs_ORC_DF.dtypes

[('Stock_Date', 'date'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Adj_Close', 'double'),
 ('Volume', 'int')]

#### Text Files
Spark also allows you to read in plain-text files. 
<br>Each line in the file becomes a record in the DataFrame. 
<br>It is then up to you to transform it accordingly. 
<br>As an example of how you would do this, 
<br>suppose that you need to parse some Apache log files into a more structured format, 
<br>or perhaps you want to parse some plain text for natural-language processing. 

In [None]:
tcs_TEXT_DF = spark.read.text("TCS_BO.csv")\
  .selectExpr("split(value, ',') as rows")

In [None]:
tcs_TEXT_DF.show(2,truncate=False)

+--------------------------------------------------------------------------+
|rows                                                                      |
+--------------------------------------------------------------------------+
|[Date, Open, High, Low, Close, Adj Close, Volume]                         |
|[2002-01-14, 38.500000, 39.500000, 38.062500, 38.400002, 20.859608, 83688]|
+--------------------------------------------------------------------------+
only showing top 2 rows
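
For the Apache log case mentioned above, the per-line transformation might look like the following. This is a plain-Python sketch. The sample line and the regular expression assume the Common Log Format, and `parse_log_line` is a hypothetical helper rather than part of Spark. In Spark, similar logic would be applied to the `value` column that `spark.read.text` produces.

```python
import re

# One named group per field of a Common Log Format line (assumed layout).
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\d+|-)'
)

def parse_log_line(line):
    """Return a dict of fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    record = match.groupdict()
    record["status"] = int(record["status"])
    return record

# Made-up sample line for illustration only.
sample = '127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326'
record = parse_log_line(sample)

print(record["host"], record["method"], record["path"], record["status"])
# 127.0.0.1 GET /index.html 200
print(parse_log_line("not a log line"))  # None
```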

