### Hadoop vs. Spark

| Parameter       | Hadoop                                                                                        | Spark                                                                                                         |
|-----------------|-----------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------|
| Performance     | Hadoop is slower than Spark. It writes intermediate data back to disk and reads it again.     | Spark is faster than Hadoop because it does the computation in memory.                                        |
| Batch/Streaming | Built for batch data processing.                                                              | Built for batch as well as streaming data processing.                                                         |
| Ease Of Use     | Difficult to write code in Hadoop. Hive was built to make it easier.                          | Easy to write and debug code. Interactive shell to develop and test. Spark provides high- and low-level APIs. |
| Security        | Uses Kerberos authentication and ACL authorization. (YARN)                                    | Has no solid security of its own; it relies on HDFS (ACL) and YARN (Kerberos).                                |
| Fault Tolerance | Stores data in blocks (128 MB) with a replication factor to handle failures.                  | Uses the DAG to provide fault tolerance.                                                                      |

### Read CSV Data In Spark
1. Format (Optional) -> CSV, JSON, JDBC/ODBC, Table, Parquet. If no format is given, Spark reads the data as Parquet.
2. Option (Optional) -> inferSchema, mode, header.
3. Schema (Optional) -> A custom schema can be supplied.
4. Load -> File path.

Read Modes:
1. FAILFAST: Fails the execution as soon as a malformed record is found in the dataset.
2. DROPMALFORMED: Drops the corrupted records.
3. PERMISSIVE: Default mode. Sets the corrupted fields to null.
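
The effect of the three modes can be sketched in plain Python. This is only a simulation of the behaviour described above, not Spark's own parser; the sample data and the helper names `to_int` and `read_csv` are made up for illustration.

```python
import csv
import io

# Hypothetical sample: the second data row has a non-numeric age.
RAW = "id,name,age\n1,asha,31\n2,ravi,abc\n3,meena,27\n"


def to_int(value):
    """Return the value as int, or None when it cannot be parsed."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def read_csv(text, mode="PERMISSIVE"):
    """Mimic what each read mode does with a malformed record."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        parsed = (to_int(row["id"]), row["name"], to_int(row["age"]))
        malformed = parsed[0] is None or parsed[2] is None
        if malformed and mode == "FAILFAST":
            raise ValueError(f"malformed record: {row}")  # stop immediately
        if malformed and mode == "DROPMALFORMED":
            continue                                      # skip the record
        rows.append(parsed)  # PERMISSIVE keeps the row, bad fields are None
    return rows


permissive = read_csv(RAW, "PERMISSIVE")
dropped = read_csv(RAW, "DROPMALFORMED")
print(permissive)
print(dropped)
```

In PERMISSIVE mode the bad row survives with a null age, in DROPMALFORMED mode it disappears, and FAILFAST raises on it.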

In [331]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as Fun


In [332]:
spark = SparkSession.builder.master("local[*]").appName("SparkSumbittest").getOrCreate()

23/08/08 00:14:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [333]:
df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferschema", "true")\
                .option("mode", "PERMISSIVE")\
                .load("/home/manish/Documents/VSCodeProjects/SparkTutorial/flight_data.csv")
df.show(5)

+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
| airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
| AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
| Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
| Vistara| UK-963|      Delhi|       Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5955|
+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+

### Spark Job Submit 

(Assumptions: DriverMemory = 20 GB, TotalExecutors = 5, ExecutorCores = 4, ExecutorMemory = 25 GB)

1. The master node first creates the Driver on one of the worker nodes.
2. The Driver is also known as the Application Driver. Spark is written in Scala, which runs on the JVM. Inside the Driver container there are 2 main methods, one for PySpark and one for the JVM. Spark Core -> Java Wrapper -> Python Wrapper. The JVM process is called the Application Driver and the Python process is called the PySpark Driver.
3. The Driver then checks the executor details and sends a request to the resource manager.
4. The resource manager sends the request to the node managers (workers), which create 5 executors on the idle workers.
5. The Application Driver sends the data and other details to the executors for processing.
6. All the executors send their computed results to the Driver.
7. In the end, all the containers (driver and executors) are deleted.

Note: Avoid writing/using UDFs in PySpark. A UDF requires a Python worker inside the executor container, which hurts performance. Prefer built-in functions.
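
As a rough sketch, the assumptions above translate into a `spark-submit` command like the one built below. The script name `app.py` is hypothetical, and `--master yarn` with `--deploy-mode cluster` is an assumption based on the resource manager and the driver-on-a-worker description. The totals ignore the overhead memory the cluster manager adds per container.

```python
# Values taken from the assumptions in this section.
driver_memory_gb = 20
num_executors = 5
executor_cores = 4
executor_memory_gb = 25

# "app.py" is a hypothetical application script; YARN cluster mode is assumed.
submit_cmd = [
    "spark-submit",
    "--master", "yarn",
    "--deploy-mode", "cluster",
    "--driver-memory", f"{driver_memory_gb}g",
    "--num-executors", str(num_executors),
    "--executor-cores", str(executor_cores),
    "--executor-memory", f"{executor_memory_gb}g",
    "app.py",
]

# Resources the application asks the cluster for (heap memory only).
total_memory_gb = driver_memory_gb + num_executors * executor_memory_gb
total_executor_cores = num_executors * executor_cores

print(" ".join(submit_cmd))
print(f"memory requested: {total_memory_gb} GB, executor cores: {total_executor_cores}")
```

With these numbers the application requests 145 GB of memory and 20 executor cores, which is a quick way to check whether the cluster can host it at all.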
  


### Spark Schema

There are 2 ways to define a schema:
1. Using StructType & StructField
    i. StructType: Defines the structure of the DataFrame. It is a list of StructField.
    ii. StructField: Defines the name, data type, and nullability of one column.
    Example: StructType([StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
2. Using DDL: A quoted string of comma-separated columns, each written as the column name, a space, and the data type.
    Example: "id integer, name string, age integer"

In [334]:
# NOTE: duration holds decimals such as 2.17, so IntegerType reads it as null in PERMISSIVE mode; FloatType would keep the values.
flight_schema = StructType([
    StructField("airline", StringType(), True), StructField("flight", StringType(), True),
    StructField("source_city", StringType(), True), StructField("departure_time", StringType(), True),
    StructField("stops", StringType(), True), StructField("arrival_time", StringType(), True),
    StructField("destination_city", StringType(), True), StructField("class", StringType(), True),
    StructField("duration", IntegerType(), True), StructField("days_left", IntegerType(), True),
    StructField("price", IntegerType(), True),
])

In [335]:
df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferschema", "false")\
                .schema(flight_schema)\
                .option("mode", "PERMISSIVE")\
                .load("/home/manish/Documents/VSCodeProjects/SparkTutorial/flight_data.csv")
df.printSchema()
df.count()

root
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



300153

### Transformation & Action

Transformations are triggered only when an action is called.

Transformation: Filter, Select, Union, Join, GroupBy, Distinct.

Action: count, collect, show, read. (Strictly speaking, read is not an action, but reading a CSV with header or inferSchema enabled triggers a job, so these notes count it as one.)

Types Of Transformations
1. Narrow -> Transformation that doesn't require data movement between partitions. Eg. Filter, Select, Union, Map
2. Wide -> Transformation that requires data movement between partitions. Eg. Join, GroupBy, Distinct. A wide transformation has to shuffle data between the partitions.

Data Shuffling: Data is transferred over the network between different partitions, which makes it a very expensive operation.

Note: When an action such as collect is executed, the output data is collected by the Driver. Driver memory should always be greater than the action's output data, otherwise there will be an out-of-memory error.
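
The difference between narrow and wide transformations can be illustrated with plain Python lists standing in for partitions. This is a toy model under stated assumptions: the sample rows are invented and the `shuffle` helper uses a simple character-code partitioner, not Spark's.

```python
from collections import defaultdict

# Three hypothetical partitions of (stops, price) rows.
partitions = [
    [("zero", 100), ("one", 250)],
    [("one", 300), ("zero", 120)],
    [("two_or_more", 500), ("one", 200)],
]

# Narrow (filter): every output partition depends on exactly one input partition.
filtered = [[row for row in part if row[1] >= 200] for part in partitions]


def shuffle(parts, num_out):
    """Wide step: route each row to a partition chosen by its key."""
    out = [[] for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            # Deterministic toy partitioner: sum of character codes modulo num_out.
            out[sum(map(ord, key)) % num_out].append((key, value))
    return out


shuffled = shuffle(filtered, 2)

# After the shuffle each key lives in a single partition, so sums are local.
totals = {}
for part in shuffled:
    local = defaultdict(int)
    for key, value in part:
        local[key] += value
    totals.update(local)

print(filtered)
print(totals)
```

The filter never moves a row out of its partition, while the groupBy-style sum first has to bring all rows with the same key together, which is the data movement that makes wide transformations expensive.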

In [336]:
df1 = df.filter(df["departure_time"]=="Evening")    # Transformation
df1.count()     # Action

65102

### DAG & Lazy Evaluation

DAG -> Directed Acyclic Graph. It contains no cycles, so execution never loops back; it flows in one direction through the graph.

On every action, a job is created and each job has its own DAG.

df = (spark.read.format("csv")          # Action
                .option("header", "true")
                .option("inferschema", "false")        # Action
                .schema(flight_schema)
                .option("mode", "PERMISSIVE")
                .load("/home/manish/Documents/VSCodeProjects/SparkTutorial/flight_data.csv"))
data_repartition = df.repartition(3)        # Wide Transformation
df1 = df.filter(df["departure_time"]=="Evening")        # Narrow Transformation
df = df.filter((Fun.col("destination_city")=="Mumbai") | (Fun.col("destination_city")=="Delhi"))        # Narrow Transformation
df = df.groupby("stops").sum("price")           # Wide Transformation
df.show()       # Action

After the above code is executed, it creates 4 jobs: 3 for actions and 1 for _______. The DAG can be viewed in the Spark UI.
1. Read -> Reading and generating Java byte code.
2. Inferschema -> Mapping
3. Show

Wide and narrow transformations run only when an action is called. This is called Lazy Evaluation. Lazy evaluation also lets Spark optimize the code; in the above example, it can merge the departure_time and destination_city filters into a single filter.
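
Lazy evaluation can be sketched with a toy class that only records transformations and runs them when an action is called. `LazyFrame` is a made-up stand-in, not a Spark class, and the sample rows are invented.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self.rows = rows
        self.plan = plan  # tuple of pending predicate functions

    def filter(self, predicate):
        # Transformation: return a new object with a longer plan; no data is read.
        return LazyFrame(self.rows, self.plan + (predicate,))

    def count(self):
        # Action: the plan runs now, with all filters merged into one pass.
        return sum(1 for row in self.rows if all(p(row) for p in self.plan))


calls = []  # records every row the first filter actually inspects


def is_evening(row):
    calls.append(row)
    return row["departure_time"] == "Evening"


rows = [
    {"departure_time": "Evening", "destination_city": "Mumbai"},
    {"departure_time": "Morning", "destination_city": "Mumbai"},
    {"departure_time": "Evening", "destination_city": "Delhi"},
    {"departure_time": "Evening", "destination_city": "Mumbai"},
]

lazy = LazyFrame(rows).filter(is_evening).filter(
    lambda r: r["destination_city"] == "Mumbai"
)
calls_before_action = len(calls)  # still 0: nothing has executed yet
result = lazy.count()             # the action triggers the work
print(calls_before_action, result, len(calls))
```

Both filters are evaluated in a single pass over the rows, which mirrors the filter merging mentioned above.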

In [337]:
df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferschema", "false")\
                .schema(flight_schema)\
                .option("mode", "PERMISSIVE")\
                .load("/home/manish/Documents/VSCodeProjects/SparkTutorial/flight_data.csv")

data_repartition = df.repartition(3)

df1 = df.filter(df["departure_time"]=="Evening") 

df = df.filter((Fun.col("destination_city")=="Mumbai") | (Fun.col("destination_city")=="Delhi"))

df = df.groupby("stops").sum("price")

df.show()

+-----------+----------+
|      stops|sum(price)|
+-----------+----------+
|two_or_more|  26808235|
|       zero| 195028713|
|        one|2098748431|
+-----------+----------+



### Spark SQL Engine

Spark SQL Engine / Catalyst Optimizer:

SQL / DataFrame / Dataset -> Spark SQL Engine / Catalyst Optimizer -> RDD Java Byte Code

4 phases of the Spark SQL Engine:
1. Analysis: Linked with the catalog. It checks whether the table, columns, or path exist. If not, it throws an AnalysisException.
2. Logical Optimization: Thanks to lazy evaluation, the code is optimized automatically. Example: multiple filters are merged into a single filter, or, if the computation needs only 2 columns, only those 2 columns are read.
3. Physical Planning: Spark creates multiple physical plans and uses a cost model to choose the best one automatically. Example: when a big table is joined with a small one, the small table is broadcast to avoid shuffling.
4. Code Generation: The chosen physical plan is turned into Java byte code that runs on RDDs.

<u>Code</u> --------> <u>Unresolved Logical Plan</u> ----Analysis----> <u>Resolved Logical Plan</u> ----Logical-Optimization----> <u>Optimized Logical Plan</u> --------> <u>Physical Plans</u> ----Cost-Model----> <u>Best Physical Plan</u> --------> <u>Final Code</u>

Catalog -> The metadata about the data. <br>
Analysis Exception Error:



In [338]:
"""
df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferschema", "false")\
                .schema(flight_schema)\
                .option("mode", "PERMISSIVE")\
                .load("/home/manish/Documents/VSCodeProjects/SparkTutorial/flight_data.csv")
df.select("name1").count()      # The column does not exist, so this raises an AnalysisException.

"""


### RDD (Resilient Distributed Dataset)

RDD is used when "Full Control On Data" is required. RDD is a data structure in Spark. Example: an RDD distributes a list across different nodes for processing.

Resilient -> Recovers in case of failure. <BR>
Distributed -> Data is spread over the cluster. <BR>
Dataset -> Actual data, held in different partitions. <BR>

RDD is immutable: RDD2 is created from RDD1. A filter creates a new RDD (RDD2) without losing the unfiltered RDD1 data. <br>
Example: <u>RDD1 (100 Rows Data)</u> --Filter-On-Age-Column--> <u>RDD2 (70 Rows Data)</u> --Filter-On-City-Column--> <u>RDD3 (30 Rows Data)</u> <br>
Suppose RDD3 fails. Through the DAG, Spark knows how to create RDD3 from RDD2, so it automatically recreates RDD3. This feature is called Fault Tolerance.  <br>
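
The recovery idea can be sketched with a toy class that remembers its parent and the function that derived it. `ToyRDD` and its `lose` method are invented for this illustration and are not part of Spark's API.

```python
class ToyRDD:
    """Toy RDD that remembers its parent and how it was derived (its lineage)."""

    def __init__(self, parent=None, fn=None, data=None):
        self.parent = parent
        self.fn = fn
        self._data = data

    def filter(self, predicate):
        # Immutable: returns a new ToyRDD and leaves this one unchanged.
        return ToyRDD(parent=self, fn=lambda rows: [r for r in rows if predicate(r)])

    def compute(self):
        if self._data is None:  # lost or never materialized
            self._data = self.fn(self.parent.compute())
        return self._data

    def lose(self):
        self._data = None  # simulate losing the computed data


rdd1 = ToyRDD(data=[
    {"age": 25, "city": "Delhi"},
    {"age": 15, "city": "Delhi"},
    {"age": 40, "city": "Mumbai"},
    {"age": 33, "city": "Delhi"},
])
rdd2 = rdd1.filter(lambda r: r["age"] >= 18)        # filter on the age column
rdd3 = rdd2.filter(lambda r: r["city"] == "Delhi")  # filter on the city column

first = rdd3.compute()
rdd3.lose()               # RDD3 "fails"
rebuilt = rdd3.compute()  # recreated from RDD2 through the recorded lineage
print(first == rebuilt, len(rdd1.compute()))
```

RDD1 still holds all four rows after both filters, and the lost RDD3 is rebuilt from its parent without any manual intervention.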

Advantage:
1. Best for unstructured data.
2. It is type safe. A wrong column is reported at compile time, whereas a DataFrame throws the error at run time (possibly after 2 hrs of code execution).
3. Flexibility & Control.


Disadvantage:
1. No optimization is done by Spark. The developer needs to write the optimizations.
2. In RDD it is "How To", and in DataFrame it is "What To": you spell out how to compute the result instead of only declaring what you want.
3. Code is much harder to write with RDDs.

| Method          | Code                                                                                          | 
|-----------------|-----------------------------------------------------------------------------------------------|
| Dataframe       | data.groupBy("dept").avg("age")                                                               |
| SQL             | SELECT dept, avg(age) from data group by dept                                                 |
| RDD             | data.map { case (dept, age) => dept -> (age, 1) } <br /> .reduceByKey { case ((a1, c1), (a2, c2)) => (a1 + a2, c1 + c2) } <br /> .map { case (dept, (age, c)) => dept -> age / c } |
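
The RDD row of the table spells out the "how": carry a (sum, count) pair per key and divide at the end. The same steps in plain Python look like this; the sample data and the `reduce_by_key` helper are made up and only imitate what `reduceByKey` does.

```python
# Hypothetical (dept, age) records.
data = [("IT", 26), ("sales", 12), ("IT", 29), ("sales", 35), ("IT", 37)]

# map: (dept, age) -> (dept, (age, 1))
pairs = [(dept, (age, 1)) for dept, age in data]


def reduce_by_key(items, fn):
    """Combine the values of equal keys pairwise, like reduceByKey."""
    result = {}
    for key, value in items:
        result[key] = fn(result[key], value) if key in result else value
    return result


# reduceByKey: add up the ages and the counts per department.
sums = reduce_by_key(pairs, lambda a, b: (a[0] + b[0], a[1] + b[1]))

# map: (dept, (total_age, count)) -> (dept, average age)
averages = {dept: total / count for dept, (total, count) in sums.items()}
print(sums)
print(averages)
```

The DataFrame and SQL versions only state what is wanted (the average per department) and leave these steps to the engine.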


### Parquet

| Title | Data | Chart|
|-------|------|------|
| 1     | 2    | 3    |
| 4     | 5    | 6    |
| 7     | 8    | 9    |

Ways a file format can store the table above on disk:
1. Columnar file format (used by Parquet). | 1 | 4 | 7 | 2 | 5 | 8 | 3 | 6 | 9 | <br>
    Example: OLAP (Online Analytical Processing). Suited to analysis on a few columns, such as groupBy and join.
2. Row-based file format. | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | => To read only specific columns, say index 0 and 2, the reader has to skip index 1 in every row, which makes the read slow. <br>
    Example: OLTP (Online Transaction Processing). Suited to frequent updates, inserts, and deletes on the data.

Big Data -> Write Once & Read Many

Parquet is a binary file format (it cannot be read with the naked eye). 3 main advantages:
1. Reduced cost
2. Reduced time
3. Increased performance
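
The two layouts can be reproduced from the 3x3 table with a few lines of Python. This is only an illustration of the ordering of values on disk; it says nothing about Parquet's actual file structure.

```python
# The table above, one inner list per row: (Title, Data, Chart).
table = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
n_rows, n_cols = len(table), len(table[0])

# Row-based layout: values of one row sit next to each other.
row_layout = [value for row in table for value in row]

# Columnar layout: values of one column sit next to each other.
column_layout = [value for column in zip(*table) for value in column]

# Reading only the "Data" column (index 1):
data_from_rows = row_layout[1::n_cols]                   # jumps over 2 values per row
data_from_columns = column_layout[n_rows:2 * n_rows]     # one contiguous slice

print(row_layout)
print(column_layout)
print(data_from_rows, data_from_columns)
```

Both reads return the same values, but the columnar layout gets them from one contiguous range, while the row layout has to skip the unwanted columns in every row.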


Data Encoding
![Data Compression](/home/manish/Documents/VSCodeProjects/SparkTutorial/CompressionInParquet.png)

Parquet compression codecs:
1. GZIP
2. LZO
3. Snappy

Time taken by a sample query on each format:
1. CSV: 2892 sec.
2. LZO: 50 sec.
3. GZIP: 40 sec.
4. Snappy: 28 sec.

### Write in Spark

df.write.format("csv")<br>
        .option("header", "true")<br>
        .mode("overwrite")<br>
        .option("path", "file_path")<br>
        .save() # The path can also be passed here, as save("file_path")

Types of write modes:
1. append
2. overwrite
3. errorIfExists (default)
4. ignore

### Partitioning & Bucketing

Both methods are applied while writing the data. Spark code performs better when such data is queried.

Partitioning: Creates one directory per distinct value of a categorical column, like City or Gender.

Bucketing: Splits the data into a number of buckets (files) given by the user. Used when the column holds continuous values, like Age or ID.

Suppose 200 tasks are running in the backend and you write with 5 buckets. Each task can write its own 5 bucket files, giving 200*5=1000 files. To avoid this, repartition to 5 before writing, like df.repartition(5).

Bucket Shuffle Elimination: To join 2 bucketed tables without a shuffle, they should have:
1. The same number of buckets.
2. The same bucketing column name.

Bucket Pruning: It makes searching and joining faster. <br>
Example: 1234 5678 9102 is an Aadhaar card number. Taking the remainder of 123456789102 divided by 10000 (123456789102 % 10000) gives 9102, which means only bucket number 9102 has to be searched to get the complete details of this Aadhaar card. This bucket could hold 2%-20% of the total data.
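
The Aadhaar example can be written out as a small sketch. It uses the plain remainder from the example above; Spark itself applies a hash function to the bucketing column before taking the modulo, so treat the numbers as illustrative. The sample records and the `bucket_of` helper are made up.

```python
NUM_BUCKETS = 10_000


def bucket_of(aadhaar_number: int) -> int:
    """Bucket id as in the example above: remainder after dividing by 10000."""
    return aadhaar_number % NUM_BUCKETS


# Hypothetical records, written into buckets once.
records = [123456789102, 555500009102, 111122223333, 999988887777]
buckets = {}
for number in records:
    buckets.setdefault(bucket_of(number), []).append(number)

# Lookup: only the one matching bucket is scanned, the rest are pruned.
target = 123456789102
candidates = buckets[bucket_of(target)]
print(bucket_of(target), candidates)
```

The lookup touches a single bucket out of the ones written, which is why bucket pruning speeds up searches on the bucketing column.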

In [339]:
'''
df.write.format("csv")\
        .option("header", "true")\
        .mode("overwrite")\
        .option("path", "/home/manish/Documents/VSCodeProjects/SparkTutorial/partition_destination_city")\
        .partitionBy("destination_city")\
        .save() 

df.write.format("csv")\
        .option("header", "true")\
        .mode("overwrite")\
        .option("path", "/home/manish/Documents/VSCodeProjects/SparkTutorial/bucket_duration")\
        .bucketBy(3, "duration")\
        .saveAsTable("bucket_duration_flight") 
'''


### Repartition & Coalesce 

In [340]:
print("Add ### Repartition & Coalesce Code")

Add ### Repartition & Coalesce Code


### Application Jobs, Stages & Tasks

Application: Code that is executed through spark-submit. Usually a single application is submitted at a time, but deploying multiple applications is also feasible. <br>
Job: In an application, the total number of jobs is equal to the total number of actions in your code. <br>
Stages: A job is divided into stages. Each wide transformation starts a new stage, so a stage groups the transformations that run between shuffles.<br>
Tasks: Execute the code on the actual data. They run on the executors.

When a job is created, it has at least one stage, and each stage has at least one task.

Sample Code: <br>
from pyspark.sql import SparkSession<br>
from pyspark.sql.functions import col<br>
spark = SparkSession.builder.master("local[5]").appName("Testing").getOrCreate()<br>
employ_df = (spark.read.format("csv")        # Action 1<br>
                 .option("header", "true")<br>
                 .load("path_to_file"))<br>
print(employ_df.rdd.getNumPartitions())<br>
employ_df = employ_df.repartition(2)         # Wide Transformation 1<br>
print(employ_df.rdd.getNumPartitions())<br>
employ_df = (employ_df.filter(col("salary")>90000)       # Narrow Transformation 1<br>
                     .select("id", "name", "age", "salary")         # Narrow Transformation 2<br>
                     .groupby("age").count())    # Wide Transformation 2<br>
employ_df.collect()         # Action 2

Code Flow: <br>
Read => Repartition => Filter => Select => GroupBy => Collect <br>
By default, a wide transformation produces 200 partitions (spark.sql.shuffle.partitions).

Total Jobs: 2 Actions => 2 Jobs will be created. <br>
Total Stages: 2 Wide transformations => 3 Stages will be created. <br>
Total Tasks: Stage 1 (Restructure) 1 + Stage 2 (Read Exchange) 2 + Stage 3 (GroupBy) 200 => 203
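
The counting rules used above can be written down as a short sketch. It simply applies the rules from these notes to the sample code; the numbers shown in the Spark UI can differ with the Spark version and with settings such as adaptive query execution.

```python
# The sample code as an ordered list of (operation, kind).
plan = [
    ("read", "action"),
    ("repartition", "wide"),
    ("filter", "narrow"),
    ("select", "narrow"),
    ("groupBy", "wide"),
    ("collect", "action"),
]

# Rule 1: one job per action.
jobs = sum(1 for _, kind in plan if kind == "action")

# Rule 2: every wide transformation closes a stage and opens the next one.
wide = sum(1 for _, kind in plan if kind == "wide")
stages = wide + 1

# Rule 3: one task per partition in each stage.
# 1 partition when reading, 2 after repartition(2), 200 after the groupBy shuffle.
tasks_per_stage = [1, 2, 200]
total_tasks = sum(tasks_per_stage)

print(jobs, stages, total_tasks)
```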



### SparkSession & SparkContext

Both provide an entry point to the Spark cluster.

SparkSession encapsulates SparkContext, SQLContext, HiveContext, etc. With SparkContext alone, you need to create each context yourself before you can start using it.

RDD vs. DataFrame vs. Dataset

| RDD                                     | DataFrame                              | Dataset                                                    |
|-----------------------------------------|----------------------------------------|------------------------------------------------------------|
| Fault Tolerant                          | Fault Tolerant                         | Fault Tolerant                                             |
| Distributed                             | Distributed                            | Distributed                                                |
| Immutable                               | Immutable                              | Immutable                                                  |
| No Schema                               | Schema                                 | Schema                                                     |
| Slow on Non-JVM Language                | Faster                                 | Faster                                                     |
| No Execution optimization               | Optimization - Catalyst Optimizer      | Optimization                                               |
| Low Level API                           | High Level API                         | High Level API                                             |
| No SQL Support                          | SQL Support                            | SQL Support                                                |
| Type Safe                               | Not Type Safe                          | Type Safe                                                  |
| Syntax error detected at compile time   | Syntax error detected at compile time  | Syntax error detected at compile time                      |
| Analysis error detected at compile time | Analysis error detected at run time    | Analysis error detected at compile time                    |
| Java, Scala, Python, R                  | Java, Scala, Python, R                 | Java, Scala                                                |
| High memory is used                     | High memory is used                    | Low memory is used. Tungsten encoders provide a great benefit |

A high-level API provides ready-made functions for operations such as sum, avg, and select; with a low-level API that logic is left to the developer.<br>
Time taken By Code:<br>
![Time Taken By Code](/home/manish/Documents/VSCodeProjects/SparkTutorial/CodeRunTimeDistribution.png)

### Dataframe Functions

1. Aliasing - Giving a column a new name inside a select.
2. Filter/Where - Filtering the rows of a DataFrame.
3. Literal - Adding a new column that holds the same value in every row.
4. Adding New Columns - Adding a new column with withColumn.
5. Renaming Columns - Renaming a column of the DataFrame.
6. Casting Data Types - Converting the data type of a column.
7. Removing Columns - Dropping columns from the DataFrame.

Different Methods For Selecting Columns: <br>
df.select("airline", col("flight"), df["source_city"], df.departure_time)

Expression Method (expr): Lets you use SQL expressions. <br>
df.select(expr("price + 100"))  => Without expr, "price + 100" is treated as a plain string, i.e. a column name.

In [341]:
# | airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
df = spark.read.format("csv")\
                .option("header", "true")\
                .option("inferschema", "false")\
                .schema(flight_schema)\
                .option("mode", "PERMISSIVE")\
                .load("/home/manish/Documents/VSCodeProjects/SparkTutorial/flight_data.csv")

# Aliasing
df.select(Fun.col("airline").alias("airline_id"), "departure_time", "days_left").show(2)

+----------+--------------+---------+
|airline_id|departure_time|days_left|
+----------+--------------+---------+
|  SpiceJet|       Evening|        1|
|  SpiceJet| Early_Morning|        1|
+----------+--------------+---------+
only showing top 2 rows



In [342]:
# Filter/Where
df.filter(Fun.col("days_left")>1).show(2)
df.where(Fun.col("days_left")>1).show(2)
df.where((Fun.col("days_left")>1) & (Fun.col("days_left")<3)).show(2)

+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
| airline| flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|
+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
|SpiceJet|SG-8803|      Delhi| Early_Morning| zero|     Morning|          Mumbai|Economy|    null|        2| 5953|
|SpiceJet|SG-8169|      Delhi|       Evening| zero|       Night|          Mumbai|Economy|    null|        2| 5953|
+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
only showing top 2 rows

+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
| airline| flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|
+--------+-------+-----------+--------------+-----+----

In [343]:
# Literal
df.select("*", Fun.lit("flight_data").alias("data_type")).show(2)

+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+-----------+
| airline| flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|  data_type|
+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+-----------+
|SpiceJet|SG-8709|      Delhi|       Evening| zero|       Night|          Mumbai|Economy|    null|        1| 5953|flight_data|
|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|     Morning|          Mumbai|Economy|    null|        1| 5953|flight_data|
+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+-----------+
only showing top 2 rows



In [344]:
# Adding New Columns 
df.withColumn("data_domain", Fun.lit("airplanes_data")).show(2)

+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+--------------+
| airline| flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|   data_domain|
+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+--------------+
|SpiceJet|SG-8709|      Delhi|       Evening| zero|       Night|          Mumbai|Economy|    null|        1| 5953|airplanes_data|
|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|     Morning|          Mumbai|Economy|    null|        1| 5953|airplanes_data|
+--------+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+--------------+
only showing top 2 rows



In [345]:
# Renaming Columns
df.withColumnRenamed("flight", "flight_id").show(2)

+--------+---------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
| airline|flight_id|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|
+--------+---------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
|SpiceJet|  SG-8709|      Delhi|       Evening| zero|       Night|          Mumbai|Economy|    null|        1| 5953|
|SpiceJet|  SG-8157|      Delhi| Early_Morning| zero|     Morning|          Mumbai|Economy|    null|        1| 5953|
+--------+---------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+
only showing top 2 rows



In [346]:
# Casting Data Types
df.withColumn("price", Fun.col("price").cast("integer"))\
     .withColumn("days_left", Fun.col("days_left").cast("integer")).printSchema()

root
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



In [347]:
# Removing Columns
df.drop("price", Fun.col("airline")).show(2)

+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+
| flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|
+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+
|SG-8709|      Delhi|       Evening| zero|       Night|          Mumbai|Economy|    null|        1|
|SG-8157|      Delhi| Early_Morning| zero|     Morning|          Mumbai|Economy|    null|        1|
+-------+-----------+--------------+-----+------------+----------------+-------+--------+---------+
only showing top 2 rows



In [348]:
# SQL Query
df.createOrReplaceTempView("flight")
spark.sql("""
     SELECT *, "flight_data" AS data_type, CONCAT(source_city, ' ',destination_city) AS flight_cities, flight AS flight_id, CAST(price as long) FROM flight WHERE days_left >= 2 AND price > 6000
""").show(2)

+-------+------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+-----------+-------------+---------+-----+
|airline|flight|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|price|  data_type|flight_cities|flight_id|price|
+-------+------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+-----------+-------------+---------+-----+
|Vistara|UK-953|      Delhi|         Night| zero|       Night|          Mumbai|Economy|    null|        2| 6060|flight_data| Delhi Mumbai|   UK-953| 6060|
|Vistara|UK-995|      Delhi|       Morning| zero|   Afternoon|          Mumbai|Economy|    null|        2| 6375|flight_data| Delhi Mumbai|   UK-995| 6375|
+-------+------+-----------+--------------+-----+------------+----------------+-------+--------+---------+-----+-----------+-------------+---------+-----+
only showing top 2 rows



### If Else 

Used when a new column has to be added based on logical conditions on existing DataFrame columns.

Most if-else scenarios are covered below.
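
Before the cells, here is a plain-Python sketch of how a `when(...).when(...).otherwise(...)` chain picks its value, including what happens to nulls. The `when_chain` helper is hypothetical and only imitates the semantics; the labels match the first example below.

```python
def when_chain(value, branches, otherwise):
    """Return the label of the first branch whose condition is true.

    A None input makes every comparison unknown (SQL null), so no branch
    matches and the otherwise value is returned.
    """
    for condition, label in branches:
        if value is not None and condition(value):
            return label
    return otherwise


# Same conditions as the first example below: age < 18 -> "No", age > 18 -> "Yes".
branches = [
    (lambda age: age < 18, "No"),
    (lambda age: age > 18, "Yes"),
]

ages = [26, None, 12, 18]
labels = [when_chain(age, branches, "Nonvalue") for age in ages]
print(labels)
```

Two things are worth noticing: a null age falls through to the otherwise branch, and so does an age of exactly 18, because neither `< 18` nor `> 18` covers it.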

In [349]:
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]

emp_schema = ["id", "name", "age", "salary", "country", "dept"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

emp_df.withColumn("adult", Fun.when(Fun.col("age")<18, "No")
                                .when(Fun.col("age")>18, "Yes")
                                .otherwise("Nonvalue")).show()

+----+-------+----+------+-------+-----------+--------+
|  id|   name| age|salary|country|       dept|   adult|
+----+-------+----+------+-------+-----------+--------+
|   1| manish|  26| 20000|  india|         IT|     Yes|
|   2|  rahul|null| 40000|germany|engineering|Nonvalue|
|   3|  pawan|  12| 60000|  india|      sales|      No|
|   4|roshini|  44|  null|     uk|engineering|     Yes|
|   5|raushan|  35| 70000|  india|      sales|     Yes|
|   6|   null|  29|200000|     uk|         IT|     Yes|
|   7|   adam|  37| 65000|     us|         IT|     Yes|
|   8|  chris|  16| 40000|     us|      sales|      No|
|null|   null|null|  null|   null|       null|Nonvalue|
|   7|   adam|  37| 65000|     us|         IT|     Yes|
+----+-------+----+------+-------+-----------+--------+



In [350]:
emp_df.withColumn("age", Fun.when(Fun.col("age").isNull(), Fun.lit(19))
                  .otherwise(Fun.col("age"))) \
        .withColumn("adult", Fun.when(Fun.col("age")>17, "Yes")
                    .otherwise("No")).show()

+----+-------+---+------+-------+-----------+-----+
|  id|   name|age|salary|country|       dept|adult|
+----+-------+---+------+-------+-----------+-----+
|   1| manish| 26| 20000|  india|         IT|  Yes|
|   2|  rahul| 19| 40000|germany|engineering|  Yes|
|   3|  pawan| 12| 60000|  india|      sales|   No|
|   4|roshini| 44|  null|     uk|engineering|  Yes|
|   5|raushan| 35| 70000|  india|      sales|  Yes|
|   6|   null| 29|200000|     uk|         IT|  Yes|
|   7|   adam| 37| 65000|     us|         IT|  Yes|
|   8|  chris| 16| 40000|     us|      sales|   No|
|null|   null| 19|  null|   null|       null|  Yes|
|   7|   adam| 37| 65000|     us|         IT|  Yes|
+----+-------+---+------+-------+-----------+-----+



In [351]:
emp_df.withColumn("age_wise", Fun.when((Fun.col("age")>0) & (Fun.col("age")<17), "Yes")
                              .when((Fun.col("age")>17) & (Fun.col("age")<50), "No")
                              .otherwise("High")).show()

+----+-------+----+------+-------+-----------+--------+
|  id|   name| age|salary|country|       dept|age_wise|
+----+-------+----+------+-------+-----------+--------+
|   1| manish|  26| 20000|  india|         IT|      No|
|   2|  rahul|null| 40000|germany|engineering|    High|
|   3|  pawan|  12| 60000|  india|      sales|     Yes|
|   4|roshini|  44|  null|     uk|engineering|      No|
|   5|raushan|  35| 70000|  india|      sales|      No|
|   6|   null|  29|200000|     uk|         IT|      No|
|   7|   adam|  37| 65000|     us|         IT|      No|
|   8|  chris|  16| 40000|     us|      sales|     Yes|
|null|   null|null|  null|   null|       null|    High|
|   7|   adam|  37| 65000|     us|         IT|      No|
+----+-------+----+------+-------+-----------+--------+



In [352]:
emp_df.createOrReplaceTempView("emp")
spark.sql("""
    SELECT *,
          case when age<18 then 'minor'
          when age>=18 then 'major'
          else 'novalue'
          end as adult
    FROM emp
""").show()

+----+-------+----+------+-------+-----------+-------+
|  id|   name| age|salary|country|       dept|  adult|
+----+-------+----+------+-------+-----------+-------+
|   1| manish|  26| 20000|  india|         IT|  major|
|   2|  rahul|null| 40000|germany|engineering|novalue|
|   3|  pawan|  12| 60000|  india|      sales|  minor|
|   4|roshini|  44|  null|     uk|engineering|  major|
|   5|raushan|  35| 70000|  india|      sales|  major|
|   6|   null|  29|200000|     uk|         IT|  major|
|   7|   adam|  37| 65000|     us|         IT|  major|
|   8|  chris|  16| 40000|     us|      sales|  minor|
|null|   null|null|  null|   null|       null|novalue|
|   7|   adam|  37| 65000|     us|         IT|  major|
+----+-------+----+------+-------+-----------+-------+



### Unique, Drop-Duplicates & Sorting

Unique: Returns the distinct values of a column or the distinct rows of a dataframe. The distinct count can vary depending on which columns are selected.

Sorting: Arranges the rows of a dataframe in order. There are two directions, ascending and descending, and the sort can use a single column or multiple columns.

Drop-Duplicates: Removes rows that are duplicates of another row, either across all columns or with respect to a chosen subset of columns.
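
As a rough illustration of how these three operations differ, here is a minimal plain-Python sketch. It is not Spark code and does not reflect how Spark executes them; the `rows` list and the variable names are made up for illustration. One assumption is labeled in the comments: this sketch keeps the first row per name, whereas Spark does not promise which duplicate survives.

```python
# Plain-Python model of distinct(), drop_duplicates(subset) and a multi-column sort.
# Each tuple is a hypothetical (id, name, salary) row.
rows = [
    (14, "Priya", 80000),
    (14, "Priya", 90000),
    (15, "Mohit", 45000),
    (15, "Mohit", 45000),
]

# distinct(): two rows are duplicates only if every column matches.
distinct_rows = list(dict.fromkeys(rows))

# drop_duplicates(["name"]): only the chosen column is compared.
# Assumption: this sketch keeps the first row seen for each name.
first_by_name = {}
for row in rows:
    first_by_name.setdefault(row[1], row)
dedup_by_name = list(first_by_name.values())

# sort(salary desc, name asc): negate the salary to reverse its direction.
sorted_rows = sorted(distinct_rows, key=lambda r: (-r[2], r[1]))

print(len(rows), len(distinct_rows), len(dedup_by_name))
print(sorted_rows)
```

The two Priya rows differ in salary, so they survive `distinct()` but collapse into one row when only `name` is compared.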

All three operations are demonstrated in the cell below:

In [353]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17),
(15 ,'Mohit',45000,  18),
(13 ,'Nidhi',60000,  17),      
(14 ,'Priya',90000,  18),  
(18 ,'Sam',65000,   17)]

emp_schema = ["id", "name", "salary", "manager_id"]
emp_df = spark.createDataFrame(data=data, schema=emp_schema)

print("Dataframe Row Count ::", emp_df.count())

print("Dataframe Distinct Row Count ::", emp_df.distinct().count())

print("Dataframe Distinct Row Count Using Column::", emp_df.select("name").distinct().count())

print("Dataframe Remove Duplicates ::", emp_df.drop_duplicates().count())

print("Dataframe Remove Duplicates Using Column ::", emp_df.drop_duplicates(["name"]).count())

print("Sorting in dataframe")
emp_df.sort(Fun.col("salary").desc(), Fun.col("name")).show()

Dataframe Row Count :: 13
Dataframe Distinct Row Count :: 10
Dataframe Distinct Row Count Using Column:: 9
Dataframe Remove Duplicates :: 10
Dataframe Remove Duplicates Using Column :: 9
Sorting in dataframe
+---+------+------+----------+
| id|  name|salary|manager_id|
+---+------+------+----------+
| 14| Priya| 90000|        18|
| 16|Rajesh| 90000|        10|
| 14| Priya| 80000|        18|
| 11| Vikas| 75000|        16|
| 18|   Sam| 65000|        17|
| 18|   Sam| 65000|        17|
| 13| Nidhi| 60000|        17|
| 13| Nidhi| 60000|        17|
| 17| Raman| 55000|        16|
| 10|  Anil| 50000|        18|
| 15| Mohit| 45000|        18|
| 15| Mohit| 45000|        18|
| 12| Nisha| 40000|        18|
+---+------+------+----------+



### Aggregation

1. Count
2. Sum
3. Max
4. Min
5. Avg
6. GroupBy

Count: It can be either an action or a transformation. When applied to a single column, count skips the null values in that column.
Example:
df.count() => Action
df.select(count("name"))    => Transformation
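
The null-skipping behaviour can be pictured with a small plain-Python sketch. This only models the counting rule described above, not Spark itself; the `names` list is a made-up column.

```python
# Hypothetical "name" column with one null (None) value.
names = ["manish", "rahul", None, "adam"]

# Counting rows, like df.count(): nulls are still rows.
row_count = len(names)

# Counting one column, like count("name"): nulls are skipped.
non_null_count = sum(1 for name in names if name is not None)

print(row_count, non_null_count)
```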



In [354]:
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,50000,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(7,'adam',37,65000,'us','IT')
]

emp_schema = ["id", "name", "age", "salary", "country", "dept"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

print("Dataframe Row Count ::", emp_df.count())

print("Dataframe Row Count Using Column With Null ::")
emp_df.select(Fun.count("id")).show()

emp_df.select(
    Fun.count("salary").alias("row_count"),
    Fun.sum("salary").alias("total_salary"),
    Fun.max("salary").alias("max_salary"),
    Fun.min("salary").alias("min_salary"),
    Fun.avg("salary").alias("avg_salary"),
).show()

emp_df.groupBy("dept", "country").agg(Fun.sum("salary")).show()

Dataframe Row Count :: 9
Dataframe Row Count Using Column With Null ::
+---------+
|count(id)|
+---------+
|        9|
+---------+

+---------+------------+----------+----------+-----------------+
|row_count|total_salary|max_salary|min_salary|       avg_salary|
+---------+------------+----------+----------+-----------------+
|        9|      610000|    200000|     20000|67777.77777777778|
+---------+------------+----------+----------+-----------------+

+-----------+-------+-----------+
|       dept|country|sum(salary)|
+-----------+-------+-----------+
|         IT|  india|      20000|
|engineering|germany|      40000|
|      sales|  india|     130000|
|engineering|     uk|      50000|
|         IT|     uk|     200000|
|         IT|     us|     130000|
|      sales|     us|      40000|
+-----------+-------+-----------+



### Dataframe Joins

df1.join(df2, join_expression, join_type)

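1. Inner Join => Returns only the rows that match in both tables.
2. Outer Join => Returns all the matched rows + the unmatched rows from both tables (the missing side is filled with null).
3. Left Join => Returns all the matched rows + the unmatched rows from the left table.
4. Right Join => Returns all the matched rows + the unmatched rows from the right table.
5. Left Semi Join => Returns the left table rows that have at least one match in the right table. Each left row appears only once and only the left table's columns are kept.
6. Left Anti Join => Returns the left table rows that have no match in the right table. Only the left table's columns are kept.
7. Cross Join => Returns every combination of left and right rows (10 x 10 = 100 rows in the example below).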
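
To make the row counts of each join type easier to reason about, here is a minimal plain-Python sketch of the same semantics. It is an illustration only, not how Spark evaluates joins, and the tiny `customers` and `sales` lists are made up for the example.

```python
# Hypothetical (customer_id, name) and (customer_id, product_id) rows.
customers = [(1, "manish"), (2, "vikash"), (3, "nikita")]
sales = [(1, 22), (1, 27), (2, 5), (11, 12)]

customer_ids = {c[0] for c in customers}
sale_ids = {s[0] for s in sales}

# Inner: one output row per matching (customer, sale) pair.
inner = [(c, s) for c in customers for s in sales if c[0] == s[0]]

# Left / right / outer: inner rows plus the unmatched side(s), padded with None.
unmatched_left = [(c, None) for c in customers if c[0] not in sale_ids]
unmatched_right = [(None, s) for s in sales if s[0] not in customer_ids]
left = inner + unmatched_left
right = inner + unmatched_right
outer = inner + unmatched_left + unmatched_right

# Semi / anti: only left rows are returned, each at most once.
left_semi = [c for c in customers if c[0] in sale_ids]
left_anti = [c for c in customers if c[0] not in sale_ids]

# Cross: every combination, no join condition.
cross = [(c, s) for c in customers for s in sales]

for name, result in [("inner", inner), ("left", left), ("right", right),
                     ("outer", outer), ("left_semi", left_semi),
                     ("left_anti", left_anti), ("cross", cross)]:
    print(name, len(result))
```

Customer 1 has two sales, so it appears twice in the inner join but only once in the left semi join.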


In [355]:
customer_data = [(1,'manish','patna',"30-05-2022"),
(2,'vikash','kolkata',"12-03-2023"),
(3,'nikita','delhi',"25-06-2023"),
(4,'rahul','ranchi',"24-03-2023"),
(5,'mahesh','jaipur',"22-03-2023"),
(6,'prantosh','kolkata',"18-10-2022"),
(7,'raman','patna',"30-12-2022"),
(8,'prakash','ranchi',"24-02-2023"),
(9,'ragini','kolkata',"03-03-2023"),
(10,'raushan','jaipur',"05-02-2023")]
customer_schema=['customer_id','customer_name','address','date_of_joining']
customer_df = spark.createDataFrame(data=customer_data, schema=customer_schema)
customer_df.createOrReplaceTempView("customer")

sales_data = [(1,22,10,"01-06-2022"),
(1,27,5,"03-02-2023"),
(2,5,3,"01-06-2023"),
(5,22,1,"22-03-2023"),
(7,22,4,"03-02-2023"),
(9,5,6,"03-03-2023"),
(2,1,12,"15-06-2023"),
(1,56,2,"25-06-2023"),
(5,12,5,"15-04-2023"),
(11,12,76,"12-03-2023")]
sales_schema=['customer_id','product_id','quantity','date_of_purchase']
sales_df = spark.createDataFrame(data=sales_data, schema=sales_schema)
sales_df.createOrReplaceTempView("sales")

product_data = [(1, 'fanta',20),
(2, 'dew',22),
(5, 'sprite',40),
(7, 'redbull',100),
(12,'mazza',45),
(22,'coke',27),
(25,'limca',21),
(27,'pepsi',14),
(56,'sting',10)]
product_schema=['product_id','name','price']
product_df = spark.createDataFrame(data=product_data, schema=product_schema)
product_df.createOrReplaceTempView("product")

In [356]:
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "inner").show()
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "inner").count()
#customer_df.join(sales_df, (customer_df["customer_id"]==sales_df["customer_id"]) & (customer_df["customer_id"]==sales_df["customer_id"]), "inner").show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          7|        raman|  patna|     30-12-2022|          7|        22|       4|      03-02-2023|
|          9|       ragini|kolkata|     03-03-2023|          9|         5|       6|      03-03-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15

9

In [357]:
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "outer").show()
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "outer").count()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          3|       nikita|  delhi|     25-06-2023|       null|      null|    null|            null|
|          4|        rahul| ranchi|     24-03-2023|       null|      null|    null|        

15

In [358]:
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "left").show()
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "left").count()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          3|       nikita|  delhi|     25-06-2023|       null|      null|    null|            null|
|          4|        rahul| ranchi|     24-03-2023|       null|      null|    null|        

14

In [359]:
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "right").show()
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "right").count()


+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          7|        raman|  patna|     30-12-2022|          7|        22|       4|      03-02-2023|
|          9|       ragini|kolkata|     03-03-2023|          9|         5|       6|      03-03-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15

10

In [360]:
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "left_semi").show()
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "left_semi").count()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          1|       manish|  patna|     30-05-2022|
|          2|       vikash|kolkata|     12-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|
|          7|        raman|  patna|     30-12-2022|
|          9|       ragini|kolkata|     03-03-2023|
+-----------+-------------+-------+---------------+



5

In [361]:
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "left_anti").show()
customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "left_anti").count()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          3|       nikita|  delhi|     25-06-2023|
|          4|        rahul| ranchi|     24-03-2023|
|          6|     prantosh|kolkata|     18-10-2022|
|          8|      prakash| ranchi|     24-02-2023|
|         10|      raushan| jaipur|     05-02-2023|
+-----------+-------------+-------+---------------+



5

In [362]:
customer_df.crossJoin(sales_df).show()
customer_df.crossJoin(sales_df).count()

                                                                                

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          2|         5|       3|      01-06-2023|
|          1|       manish|  patna|     30-05-2022|          5|        22|       1|      22-03-2023|
|          1|       manish|  patna|     30-05-2022|          7|        22|       4|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          9|         5|       6|      03-03-2023|
|          1|       manish|  patna|     30-05-2022|          2|         1|      12|      15

                                                                                

100

### Join Strategies In Spark

1. Shuffle sort-merge join => Both tables are shuffled on the join key and sorted before they are merged. The sorting makes this strategy CPU-heavy.
2. Shuffle hash join => The smaller table is converted into a hash table, which is held in memory. During the join, the join key of each row in the bigger table is hashed and looked up in that hash table.
3. Broadcast hash join => The smaller table is broadcast to all the executors, and this is done by the driver. By default a table is broadcast automatically when it is at most 10 MB; the threshold (in bytes) can be changed using "spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 20455780)".
4. Cartesian join => Pairs every row of one table with every row of the other, as in a cross join.
5. Broadcast nested loop join => Broadcasts one table and compares it with every row of the other table in a nested loop. It is typically used when there is no equality join condition.
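
The core idea behind the hash join and the sort-merge join can be sketched in plain Python. This is a single-machine illustration only: it leaves out the shuffle and broadcast steps entirely, and the function names and sample rows are hypothetical, not Spark APIs.

```python
def hash_join(small, big):
    """Build a hash table on the smaller side, then probe it with the bigger side."""
    table = {}
    for key, value in small:
        table.setdefault(key, []).append(value)
    return [(key, s_val, b_val) for key, b_val in big for s_val in table.get(key, [])]


def sort_merge_join(left, right):
    """Sort both sides by key, then walk them together with two pointers."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Emit every right row sharing this key, then advance the left side.
            k = j
            while k < len(right) and right[k][0] == left_key:
                out.append((left_key, left[i][1], right[k][1]))
                k += 1
            i += 1
    return out


# Hypothetical (customer_id, name) and (customer_id, product_id) rows.
customers = [(1, "manish"), (2, "vikash"), (3, "nikita")]
sales = [(2, 5), (1, 22), (1, 27), (11, 12)]

hashed = hash_join(customers, sales)
merged = sort_merge_join(customers, sales)
print(sorted(hashed) == sorted(merged))
```

Both functions return the same matched rows; they differ only in where the work goes (building and probing a hash table versus sorting both inputs).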

In [363]:
spark = SparkSession.builder.master("local[5]").appName("SparkSumbittest").getOrCreate()

customer_data = [(1,'manish','patna',"30-05-2022"),
(2,'vikash','kolkata',"12-03-2023"),
(3,'nikita','delhi',"25-06-2023"),
(4,'rahul','ranchi',"24-03-2023"),
(5,'mahesh','jaipur',"22-03-2023"),
(6,'prantosh','kolkata',"18-10-2022"),
(7,'raman','patna',"30-12-2022"),
(8,'prakash','ranchi',"24-02-2023"),
(9,'ragini','kolkata',"03-03-2023"),
(10,'raushan','jaipur',"05-02-2023")]
customer_schema=['customer_id','customer_name','address','date_of_joining']
customer_df = spark.createDataFrame(data=customer_data, schema=customer_schema)
customer_df.createOrReplaceTempView("customer")

sales_data = [(1,22,10,"01-06-2022"),
(1,27,5,"03-02-2023"),
(2,5,3,"01-06-2023"),
(5,22,1,"22-03-2023"),
(7,22,4,"03-02-2023"),
(9,5,6,"03-03-2023"),
(2,1,12,"15-06-2023"),
(1,56,2,"25-06-2023"),
(5,12,5,"15-04-2023"),
(11,12,76,"12-03-2023")]
sales_schema=['customer_id','product_id','quantity','date_of_purchase']
sales_df = spark.createDataFrame(data=sales_data, schema=sales_schema)
sales_df.createOrReplaceTempView("sales")

product_data = [(1, 'fanta',20),
(2, 'dew',22),
(5, 'sprite',40),
(7, 'redbull',100),
(12,'mazza',45),
(22,'coke',27),
(25,'limca',21),
(27,'pepsi',14),
(56,'sting',10)]
product_schema=['product_id','name','price']
product_df = spark.createDataFrame(data=product_data, schema=product_schema)
product_df.createOrReplaceTempView("product")

sort_merge_df = customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "inner")
sort_merge_df.show()
sort_merge_df.explain()

23/08/08 00:15:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          7|        raman|  patna|     30-12-2022|          7|        22|       4|      03-02-2023|
|          9|       ragini|kolkata|     03-03-2023|          9|         5|       6|      03-03-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15

In [364]:
spark.conf.set("spark.sql.shuffle.partitions", 5)
sort_merge_df = customer_df.join(sales_df, customer_df["customer_id"]==sales_df["customer_id"], "inner")
sort_merge_df.show()
sort_merge_df.explain()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          7|        raman|  patna|     30-12-2022|          7|        22|       4|      03-02-2023|
|          9|       ragini|kolkata|     03-03-2023|          9|         5|       6|      03-03-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15

In [365]:
# Fun.broadcast() hints Spark to use a broadcast hash join for sales_df.
broadcast_df = customer_df.join(Fun.broadcast(sales_df), customer_df["customer_id"]==sales_df["customer_id"], "inner")
broadcast_df.show()
broadcast_df.explain()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        12|       5|      15-04-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22

### Window Function

Window functions operate on a group of rows, referred to as a window, and calculate a return value for each row based on the group of rows.

1. Row Number => Assigns a unique, consecutive number to each row in the window, even when values are tied.
2. Rank => Rows with the same value get the same rank, and the ranks that the tie would have occupied are skipped (1, 2, 2, 4).
3. Dense Rank => Rows with the same value get the same rank, and no ranks are skipped (1, 2, 2, 3).

Example:

| Salary | Row Number | Rank | Dense Rank |
|--------|------------|------|------------|
| 50000  | 1          | 1    | 1          |
| 60000  | 2          | 2    | 2          |
| 70000  | 3          | 3    | 3          |
| 80000  | 4          | 4    | 4          |
| 80000  | 5          | 4    | 4          |
| 80000  | 6          | 4    | 4          |
| 90000  | 7          | 7    | 5          |
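
The table above can be reproduced with a short plain-Python loop. This is only a sketch of the numbering rules for one window that is already sorted; it is not how Spark computes window functions, and the variable names are made up.

```python
# Salaries from the example table, already ordered within a single window.
salaries = [50000, 60000, 70000, 80000, 80000, 80000, 90000]

rows = []
rank = 0
dense_rank = 0
previous = None
for row_number, salary in enumerate(salaries, start=1):
    if salary != previous:
        rank = row_number   # rank jumps to the current position after a tie
        dense_rank += 1     # dense rank moves to the next integer only
        previous = salary
    rows.append((salary, row_number, rank, dense_rank))

for row in rows:
    print(row)
```

The last row comes out as `(90000, 7, 7, 5)`: the three tied rows used up positions 4 to 6, so rank continues at 7 while dense rank continues at 5.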

In [366]:
from pyspark.sql.window import Window
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,50000,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(7,'adam',37,65000,'us','IT')
]

emp_schema = ["id", "name", "age", "salary", "country", "dept"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

win_main = Window.partitionBy("dept").orderBy(Fun.desc("salary"))
emp_df.withColumn("row_number", Fun.row_number().over(win_main))\
        .withColumn("rank", Fun.rank().over(win_main))\
        .withColumn("dense_rank", Fun.dense_rank().over(win_main))\
        .show(truncate=False)

+---+-------+----+------+-------+-----------+----------+----+----------+
|id |name   |age |salary|country|dept       |row_number|rank|dense_rank|
+---+-------+----+------+-------+-----------+----------+----+----------+
|6  |null   |29  |200000|uk     |IT         |1         |1   |1         |
|7  |adam   |37  |65000 |us     |IT         |2         |2   |2         |
|7  |adam   |37  |65000 |us     |IT         |3         |2   |2         |
|1  |manish |26  |20000 |india  |IT         |4         |4   |3         |
|4  |roshini|44  |50000 |uk     |engineering|1         |1   |1         |
|2  |rahul  |null|40000 |germany|engineering|2         |2   |2         |
|5  |raushan|35  |70000 |india  |sales      |1         |1   |1         |
|3  |pawan  |12  |60000 |india  |sales      |2         |2   |2         |
|8  |chris  |16  |40000 |us     |sales      |3         |3   |3         |
+---+-------+----+------+-------+-----------+----------+----+----------+



In [367]:
emp_df.withColumn("row_number", Fun.row_number().over(win_main))\
        .withColumn("rank", Fun.rank().over(win_main))\
        .withColumn("dense_rank", Fun.dense_rank().over(win_main))\
        .filter(Fun.col("dense_rank")<=2)\
        .show(truncate=False)

+---+-------+----+------+-------+-----------+----------+----+----------+
|id |name   |age |salary|country|dept       |row_number|rank|dense_rank|
+---+-------+----+------+-------+-----------+----------+----+----------+
|6  |null   |29  |200000|uk     |IT         |1         |1   |1         |
|7  |adam   |37  |65000 |us     |IT         |2         |2   |2         |
|7  |adam   |37  |65000 |us     |IT         |3         |2   |2         |
|4  |roshini|44  |50000 |uk     |engineering|1         |1   |1         |
|2  |rahul  |null|40000 |germany|engineering|2         |2   |2         |
|5  |raushan|35  |70000 |india  |sales      |1         |1   |1         |
|3  |pawan  |12  |60000 |india  |sales      |2         |2   |2         |
+---+-------+----+------+-------+-----------+----------+----+----------+



### Lead & Lag