## Import Libraries

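```python
from pyspark.sql import SparkSession
# The star import brings in col, row_number, first, last, when, max, etc.
# Note: it shadows Python built-ins such as max() and sum().
from pyspark.sql.functions import *
from pyspark.sql.window import Window
```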

## Start Spark Session

```python
spark = SparkSession.builder.appName('questions').getOrCreate()
```

### Create DataFrame

```python
# Read each CSV with a header row and let Spark infer the column types
all_players_df = spark.read.format('csv').option('header', True).option('inferSchema', True).load('./csv/IPL_PLAYERS.csv')
unsold_players_df = spark.read.format('csv').option('header', True).option('inferSchema', True).load('./csv/UNSOLD_PLAYERS.csv')
top_buys_df = spark.read.format('csv').option('header', True).option('inferSchema', True).load('./csv/TOP_BUYS.csv')

all_players_df.show(5)
unsold_players_df.show(5)
top_buys_df.show(5)
```

```
+--------------------+-----------+-------------+----------+----+
|             PLAYERS|NATIONALITY|         TYPE|PRICE PAID|TEAM|
+--------------------+-----------+-------------+----------+----+
|Avanish Rao Aravelly|     Indian|Wicket-Keeper|   2000000| CSK|
|   Mustafizur Rahman|   Overseas|       Bowler|  20000000| CSK|
|      Daryl Mitchell|   Overseas|  All-Rounder| 140000000| CSK|
|        Sameer Rizvi|     Indian|       Batter|  84000000| CSK|
|     Rachin Ravindra|   Overseas|  All-Rounder|  18000000| CSK|
+--------------------+-----------+-------------+----------+----+
only showing top 5 rows

+---------------+-----------+-----------+----------+
|         PLAYER|NATIONALITY|       TYPE|BASE PRICE|
+---------------+-----------+-----------+----------+
|  Priyansh Arya|     Indian|     Batter|   2000000|
|Rohan Kunnummal|     Indian|     Batter|   2000000|
|    Manan Vohra|     Indian|     Batter|   2000000|
| Raj Angad Bawa|     Indian|All-Rounder|   2000000|
|  Sarfaraz Khan|  
```

#### Q1. From the flight data, create a dataset that keeps only each customer's origin and final destination

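```python
flight_df = spark.read.format('csv').option('header', True).option('inferSchema', True).load('./csv/flight_data.csv')

# Method 1: take each customer's first origin and last destination.
# Assumption: each customer's legs appear in travel order in the CSV.
# Ordering a window by cust_id (its own partition key) gives no real order,
# so record the file order explicitly before any shuffle.
flight_df_1 = flight_df.withColumn('file_order', monotonically_increasing_id())
w_first = Window.partitionBy('cust_id').orderBy(col('file_order'))
w_last = Window.partitionBy('cust_id').orderBy(col('file_order').desc())
flight_df_1 = (flight_df_1
               .withColumn('leg_asc', row_number().over(w_first))
               .withColumn('leg_desc', row_number().over(w_last)))
# max() here is pyspark.sql.functions.max (from the star import); it skips the nulls from when()
flight_df_1 = flight_df_1.groupBy('cust_id').agg(
    max(when(col('leg_asc') == 1, col('origin'))).alias('Source'),
    max(when(col('leg_desc') == 1, col('destination'))).alias('Destination'))
flight_df_1.show()

# Method 2
```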

```
+-------+---------+-----------+
|cust_id|   Source|Destination|
+-------+---------+-----------+
|      1|    Delhi|  Mangalore|
|      2|   Mumbai|  Gorakhpur|
|      3|  Chennai|        Goa|
|      4|  Kolkata|  Ahmedabad|
|      5|Hyderabad|Bhubaneswar|
|      6|Bengaluru|     Jaipur|
|      7|  Lucknow|    Kolkata|
|      8|   Indore|     Bhopal|
+-------+---------+-----------+
```



## Interview Questions

1. Difference between client and cluster mode.

2. What is partition skew, and what causes it? How do you solve partition skew issues?

3. What is a broadcast join in Apache Spark?

4. What is the difference between partitioning and bucketing?

5. What are the different types of joins in Spark?

6. Why is count() a transformation when used with groupBy(), but an action otherwise?

7. If your Spark job is running slowly, how would you debug it?

8. What is the difference between a managed table and an external table? When do you create external tables?

9. Why don't we use MapReduce these days? What are the similarities between Spark and MapReduce?

10. How do you handle your PySpark code deployment? Explain the CI/CD process.

11. Have you used caching in your project? When and where do you consider using it?

12. How do you estimate the resources needed for your Spark job?

13. What is the difference between narrow and wide transformations?

14. What is the difference between a DataFrame and a Dataset?

15. If a job fails with an out-of-memory error in production, how would you debug it?

16. What is a DAG, and how does it help?

17. Which version control system do you use?

18. How do you test your Spark code?

19. What is shuffling, and why should we try to minimize it?

20. If 199 of 200 partitions execute but you get an error after 1 hour, what would you do?

21. How does Spark achieve fault tolerance?

22. What is lazy evaluation in Spark?

23. What is the difference between lineage and a DAG?

24. What is the difference between persist() and cache()?

25. How is Spark SQL different from HQL?

26. How do you join two different tables using DataFrames?

27. How do you tune the number of Spark executors and executor memory?

28. Explain dynamic allocation in Spark.

29. Why is a Dataset preferred over a DataFrame?

30. Why Spark over MapReduce?

31. What are the different modes in Spark?

32. What are the map and flatMap operations in Spark?

33. What challenges have you faced in Spark?

34. What is RDD lineage?

35. What major issues have you faced in Spark development?

36. What optimization techniques are used in Spark?

37. What is the difference between reduceByKey and groupByKey?

38. How would you join two large tables in Spark?

39. What is the difference between repartition and coalesce?

40. What is checkpointing in Spark?

41. What's the difference between an RDD, a DataFrame, and a Dataset?

42. How can you create a DataFrame a) from an existing RDD, and b) from a CSV file?

43. Explain the use of the StructType and StructField classes in PySpark with examples.

44. What are the different ways to handle row duplication in a PySpark DataFrame?

45. Explain PySpark UDFs with the help of an example.

46. Discuss the map() transformation in PySpark DataFrame with the help of an example.

47. What do you mean by ‘joins’ in PySpark DataFrame? What are the different types of joins?

48. What is PySpark ArrayType? Explain with an example.

49. What do you understand by PySpark partitions?

50. What is meant by PySpark MapType? How can you create a MapType using StructType?

# Solutions

## Question 1. Difference between client and cluster mode?

### Solution

In client mode:

Suppose a user logs in to the edge node (the gateway to the cluster) and runs `spark-submit` in client mode. The driver program launches on the edge node itself and spawns executors on different nodes of the cluster, so the driver occupies the edge node's resources (memory, CPU).
If another user submits another job in client mode, a second driver program launches on the same edge node and occupies more of its resources, so many concurrent client-mode jobs can overload the edge node.

Client Mode:

* In client mode, the driver program runs on the machine where you submit the Spark application. 
* It communicates with the cluster manager to request resources and execute tasks on the worker nodes.
* The SparkContext is initialized on the client machine, and the driver program coordinates the execution of tasks on the cluster.
* This mode is often used for development and debugging, allowing you to interactively examine results and debug your application more easily.

Cluster Mode:

* In cluster mode, the driver program runs on one of the nodes in the Spark cluster, not on the machine you submitted from.
* The SparkContext is initialized on that node, and the driver program is responsible for managing the execution of the application.
* The cluster manager decides where the driver runs; on YARN, for example, it runs inside the ApplicationMaster container on one of the worker nodes, while executors run on worker nodes.
* Because the driver no longer depends on the client, you can disconnect after submitting, and drivers do not load the edge node. This mode suits large-scale production deployments where the application is submitted to a cluster manager (such as Apache YARN, Kubernetes, Apache Mesos, or Spark's standalone cluster manager).

Flow of a submitted job:

`spark-submit` submits the Spark job, with an option to specify the deploy mode (`--deploy-mode client` or `--deploy-mode cluster`) -> the driver program is spawned and drives the complete Spark job -> executors are spawned on different nodes; the data processing happens there, controlled by the driver program.

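## Question 2. What is partition skew, and what causes it? How do you solve partition skew issues?

### Solution
Partition skew occurs when data is distributed unevenly across partitions, so some partitions hold much more data than others. Example: if a single partition receives most of the data, the task processing it runs far longer than the others (and may run out of memory), and the whole stage waits for it.

Reason: it mainly happens after wide transformations such as joins and groupBy, because all rows with the same key go to the same partition, so a few very frequent keys (including null keys) overload their partitions.

Solutions:
1. Salting: if a key occurs very frequently, append a random number to it so that one key is split into several keys spread over different partitions. This adds complexity (for a join, the other table must be replicated once per salt value).
2. AQE (Adaptive Query Execution): with `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled` set, Spark detects skewed partitions in sort-merge joins at runtime and splits them into smaller ones.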


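## Question 3. What is a broadcast join in Apache Spark?

### Solution
A broadcast join (also called a map-side join) is used when one table is small and the other is large. Spark sends a full copy of the small table to every executor, so each executor joins its partitions of the large table locally.
Advantage: the large table is not shuffled.
Spark broadcasts a table automatically when its estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), and you can request it explicitly with the `broadcast()` hint.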

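## Question 4. What is the difference between partitioning and bucketing?

### Solution
Both are strategies for structuring data on disk so that a query can read only the data it needs and skip the rest.

Partitioning: use it on a column with a small number of distinct values (for example country or date). Spark creates one folder per value, and filters on that column skip the other folders.

Bucketing: rows are distributed into a fixed number of files (buckets) using a hash of the bucketing column, which suits columns with many distinct values. It also helps when joining two large tables: if both are bucketed on the join key with the same number of buckets, Spark can avoid shuffling them.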

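## Question 5. What are the different types of joins in Spark?

### Solution
1. Broadcast hash join: one small table and one large table. The small table is broadcast to every executor, so no shuffling is involved.
2. Shuffle hash join: one medium and one large table. Both tables are shuffled on the join key, then a hash table is built from the medium table's partition on each executor, so no sorting is involved.
3. Shuffle sort-merge join: two large tables. Both are shuffled and sorted on the join key, then merged; this is Spark's default strategy for joining large tables.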

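## Question 6. Why is count() a transformation when used with groupBy(), but an action otherwise?

### Solution
`df.count()` is an action: it triggers a job and returns a number to the driver.

`df.groupBy("col").count()` is a transformation: it returns a new DataFrame with one count per group, nothing is computed yet, and further transformations are possible on it.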

## Question 7. If your Spark job is running slowly, how would you debug it?

### Solution 
1. Check the Spark UI for slow tasks and stages; enable AQE to handle partition skew.
2. Optimize join strategies; consider a broadcast join when one dataset is small.
3. Ensure sufficient resources (executors, cores, memory) are allocated to the job.
4. Verify the number of DataFrame partitions and change the number of shuffle partitions (`spark.sql.shuffle.partitions`) if required.
5. Reduce garbage-collection pauses, for example by giving Spark some off-heap memory (`spark.memory.offHeap.enabled`, `spark.memory.offHeap.size`).
6. Monitor disk spills and allocate more memory per CPU core if needed.
7. Prefer hash aggregation over sort aggregation when aggregating.
8. Cache DataFrames that are reused several times.
9. Choose the right file format (for example columnar Parquet or ORC) and a fast compression codec (for example Snappy).

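## Question 8. What is the difference between managed and external tables? When do you create external tables?

### Solution
A table consists of data plus metadata (schema, location, and so on).

* Managed table: Spark manages both the data (stored in its warehouse directory) and the metadata.
* External table: the data lives at a location you specify, and Spark manages only the metadata.

When you drop a managed table, both the data and the metadata are deleted.
When you drop an external table, only the metadata is deleted; the data files remain.

Create an external table when the data must outlive the table definition or is shared with other jobs and tools, for example raw files in a data lake that other systems also read.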

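## Question 9. Why don't we use MapReduce these days? What are the similarities between Spark and MapReduce?

### Solution
Limitations of MapReduce:
1. Slow.
2. Heavy use of disk: intermediate results are written to disk between the map and reduce phases and between chained jobs.
3. Programs are mostly written in Java and need a lot of boilerplate code.

Similarities:
1. Both are distributed processing engines that split data into partitions processed in parallel across a cluster.
2. Both express computation as map-style steps followed by a shuffle and reduce-style steps.

Spark is faster (it keeps intermediate data in memory where possible) and more general-purpose (batch, SQL, streaming, machine learning) than MapReduce.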

## Question 13. What are the differences between narrow and wide transformations?

### Solution 

Narrow transformation: each output partition depends on data from a single input partition, so no data moves between partitions. Functions such as `map()`, `mapPartitions()`, `flatMap()`, `filter()`, and `union()` are examples of narrow transformations.

Wide transformation: each output partition needs data from many input partitions, so data must move between partitions. `groupByKey()` and `reduceByKey()` are typical examples. Because they shuffle the data, they are also called shuffle transformations.





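SparkContext is the entry point to Spark: it connects to the cluster manager to obtain resources and holds the application's Spark configuration. Since Spark 2.0, SparkSession wraps it and exposes it as `spark.sparkContext`.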



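## Spark Architecture

Spark uses a master/worker (driver/executor) architecture:
* Driver: runs the main program, creates the SparkContext, and coordinates the job.
* Executors: processes on the worker nodes that run tasks and hold data.
* Cluster manager (for example YARN, Kubernetes, or Spark standalone): allocates resources for the executors.

When a Spark program is submitted, the driver program starts and initializes the SparkContext (on the submitting machine in client mode, or on a cluster node in cluster mode). The SparkContext then asks the cluster manager for executors and sends them tasks.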

## You need to join two large datasets, but the join causes an out-of-memory error. What strategies would you use to optimize it?

1. Broadcast join: if one of the datasets is small enough to fit in memory (for example after filtering), use a broadcast join. This avoids shuffling the large dataset and improves performance.
2. Increase shuffle partitions: more shuffle partitions (`spark.sql.shuffle.partitions`) make each partition smaller, reducing the memory load on individual executors.
3. Repartition before the join: repartitioning both datasets on the join key can give a better data distribution and reduce skew.
4. Use bucketing: if you perform the same join repeatedly, bucket both tables on the join key to reduce the shuffle overhead.
5. Filter early: apply filters (and select only the needed columns) to reduce the amount of data before performing the join.
6. Adjust Spark configuration: tune memory settings such as `spark.executor.memory`, `spark.executor.memoryOverhead`, and `spark.memory.fraction`, and enable AQE skew-join handling.