<a href="https://colab.research.google.com/github/rganesh203/Pyspark/blob/main/PySpark_Scenario_Based.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#1. How to remove double quotes from a pyspark dataframe column using regex

In [None]:
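from pyspark.sql.functions import col, concat, concat_ws, lit, split

cols = ['col1','col2']
# The ZipCodeType value contains a stray double quote (STANDA"RD), which makes the JSON invalid
jsonString='{"Zipcode":"74","ZipCodeType":"STANDA"RD","City":"PARC PARQUE","State":"PR"}'

rows = [(1,'abcd'),(2,jsonString)]

df = spark.createDataFrame(rows,cols)

# col3: everything before the '"ZipCodeType":"' key
df = df.withColumn("col3",split(col("col2"),'"ZipCodeType":"')[0])
# col4: the key itself, which split() consumed
df = df.withColumn("col4",lit('"ZipCodeType":"'))
# col5: everything after the key, split once at the first quote (the stray one) and rejoined without it
# (the three-argument form of split() needs Spark 3.0 or later)
df = df.withColumn("col5",concat_ws("",split(split(col("col2"),'"ZipCodeType":"')[1],'"',2)))

# col6: the reassembled string with the stray quote removed
df = df.withColumn("col6", concat(col("col3"),col("col4"),col("col5"))).select(col("col2"),col("col6"))
df.show(truncate=False)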
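The heading mentions regex, while the cell above relies on split(). As a minimal sketch of a regex-based alternative, the plain-Python snippet below removes any double quote that is not next to a JSON structural character. The pattern name STRAY_QUOTE is an assumption made for this illustration, and the approach assumes that values never contain a quote directly beside `{`, `}`, `:` or `,`.

```python
import json
import re

json_string = '{"Zipcode":"74","ZipCodeType":"STANDA"RD","City":"PARC PARQUE","State":"PR"}'

# A quote is treated as structural when it follows '{', ':' or ','
# or when it precedes ':', ',' or '}'. Any other quote is removed.
STRAY_QUOTE = r'(?<![{:,])"(?![:,}])'

cleaned = re.sub(STRAY_QUOTE, '', json_string)

# The cleaned string now parses as JSON
record = json.loads(cleaned)
print(record["ZipCodeType"])
```

Java regular expressions also support lookbehind and lookahead, so the same pattern should be usable with pyspark.sql.functions.regexp_replace on a column, although that has not been verified here.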

#2. PySpark Check if Column Exists in DataFrame

In [None]:
#Method1
listColumns=df.schema.fieldNames()
if listColumns.count('id')>0:
    print('id column is present')
else:
    print('id column is not present')
"column_name" in listColumns

In [None]:
#Method2

listColumns=df.columns
"column_name" in listColumns

#3. Convert PySpark DataFrame to Pandas

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name","middle_name","last_name","dob","gender","salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)


In [None]:

pandasDF = pysparkDF.toPandas()
print(pandasDF)


#4. Different ways to apply function on Column in Dataframe using PySpark

In [None]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

# Apply function using withColumn
from pyspark.sql.functions import upper
df.withColumn("Upper_Name", upper(df.Name)) \
  .show()

# Apply function using select
df.select("Seqno","Name", upper(df.Name)) \
  .show()

# Apply function using sql()
df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB") \
     .show()

# Create custom function
def upperCase(value):
    return value.upper()

# Convert function to udf
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
upperCaseUDF = udf(lambda x:upperCase(x),StringType())

# Custom UDF with withColumn()
df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

# Custom UDF with select()
df.select(col("Seqno"), \
    upperCaseUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

# Custom UDF with sql()
spark.udf.register("upperCaseUDF", upperCaseUDF)
df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB") \
     .show()


You have learned how to apply a built-in function to a PySpark column using withColumn(), select() and spark.sql(), and how to create a custom UDF and apply it to a column.
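One detail worth illustrating: Spark passes a null column value to a Python UDF as None, so a function that calls .upper() directly raises an AttributeError on null rows. The sketch below shows a null-safe variant in plain Python. The name upper_case_safe is hypothetical and not part of the original notebook.

```python
def upper_case_safe(value):
    # Return None for a null input instead of raising AttributeError
    return None if value is None else value.upper()


# Plain-Python check of the behaviour on regular and null inputs
print(upper_case_safe("john jones"))
print(upper_case_safe(None))
```

Under the same assumptions, it could be wrapped with udf(upper_case_safe, StringType()) and used exactly like upperCaseUDF above.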

#5. printSchema() to string or json in PySpark

In [None]:
# Import
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

# Create DataFrame
columns = ["language","fee"]
data = [("Java", 20000), ("Python", 10000), ("Scala", 10000)]

df = spark.createDataFrame(data).toDF(*columns)
df.printSchema()

In [None]:
a=df.schema.simpleString()
print(a)
b=df.schema.json()
print(b)
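Because schema.json() returns an ordinary JSON string, it can be inspected with the standard json module. The sketch below uses a hard-coded string that is assumed to match the output for the language/fee DataFrame above, so it runs without a Spark session.

```python
import json

# Assumed output of df.schema.json() for the DataFrame above
schema_json = (
    '{"fields":['
    '{"metadata":{},"name":"language","nullable":true,"type":"string"},'
    '{"metadata":{},"name":"fee","nullable":true,"type":"long"}'
    '],"type":"struct"}'
)

schema = json.loads(schema_json)

# Map each column name to its type
column_types = {field["name"]: field["type"] for field in schema["fields"]}
print(column_types)
```

The parsed dictionary can also be turned back into a schema object with StructType.fromJson(), which is handy when a schema is stored in a file and reused across jobs.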

#6. How to Write Dataframe as single file with specific name in PySpark

Write a single file using Spark coalesce() or repartition()
Before writing a DataFrame, use repartition(1) or coalesce(1) to merge the data from all partitions into a single partition, then save it. Spark still writes a directory that contains one part file, so the part file has to be renamed afterwards to get a specific file name.

In [None]:
from pyspark.sql import SparkSession


def write_csv_with_specific_file_name(sc, df, path, filename):
    # Write a single part file into the directory `path`, then rename it to `filename`
    file_format = "csv"
    df.repartition(1).write.option("header", "true").format(file_format).save(path)
    try:
        # Reach the Hadoop FileSystem API through the Py4J gateway
        sc_uri = sc._gateway.jvm.java.net.URI
        sc_path = sc._gateway.jvm.org.apache.hadoop.fs.Path
        file_system = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
        configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
        fs = file_system.get(sc_uri("hdfs://{HDFS_IP/NAME}"), configuration())
        src_path = None
        status = fs.listStatus(sc_path(path))
        for fileStatus in status:
            # Check the file name only, so "part" elsewhere in the path is ignored
            if fileStatus.getPath().getName().startswith("part-"):
                src_path = fileStatus.getPath()
        dest_path = sc_path(path + filename)
        if src_path is not None and fs.isFile(src_path):
            # rename() moves the file, so nothing is left to delete afterwards
            fs.rename(src_path, dest_path)
    except Exception as e:
        raise Exception("Error renaming the part file to {}: {}".format(filename, e))


def create_dataframe(spark):
    simple_data = [("James", "Sales", "NY", 90000),
                   ("Michael", "Sales", "NY", 86000),
                   ("Robert", "Sales", "CA", 81000),
                   ("Maria", "Finance", "CA", 90000),
                   ("Raman", "Finance", "CA", 99000),
                   ("Scott", "Finance", "NY", 83000),
                   ("Jen", "Finance", "NY", 79000),
                   ("Jeff", "Marketing", "CA", 80000),
                   ("Kumar", "Marketing", "NY", 91000)
                   ]

    schema = ["employee_name", "department", "state", "salary"]
    return spark.createDataFrame(data=simple_data, schema=schema)


if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    df = create_dataframe(spark)
    write_csv_with_specific_file_name(spark.sparkContext, df, "hdfs://cluster_name/path/to/destination", "/keep_this_file_name.csv")

#7. Multiple delimiters in PySpark

In [None]:
from pyspark.sql import SparkSession,types
spark= SparkSession.builder.master("local").appName('multiple_delimiter').getOrCreate()
df=spark.read.text(r'D:\python_coding\pyspark_tutorial\multiple_delimiter.csv')
df.show(truncate=0)

In [None]:
header=df.first()[0]
header

In [None]:
schema=header.split('~|')
schema

In [None]:
df_1=df.filter(df['value']!= header).rdd.map(lambda x: x[0].split('~|')).toDF(schema)

In [None]:
df_1.show(truncate=0)

#8. Merge DataFrame in Spark

In [None]:
df1=spark.read.option("delimiter","|").csv('input.csv',header=True)
df1.show()

In [None]:
df2=spark.read.option("delimiter","|").csv('input2.csv',header=True)
df2.show()

In [None]:
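#add new_column in df1
from pyspark.sql.functions import lit
# lit(None) yields a real null; lit('null') would store the four-character string "null"
df_add=df1.withColumn("bonus_percent", lit(None).cast("string"))
df_add.show()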

In [None]:
#Method1
df_add.union(df2).show()

In [None]:
#method2
from pyspark.sql.types import *
schema = StructType(
    [
    StructField("name",StringType(),True),
    StructField("age",StringType(),True),
    StructField("gender", StringType(), True),
  ]
  )

In [None]:
df3=spark.read.option("delimiter","|").csv('input.csv',header=True,schema=schema)
df4=spark.read.option("delimiter","|").csv('input2.csv',header=True,schema=schema)
df3.union(df4).show()

In [None]:
#outer join
f_df=spark.read.option("delimiter","|").csv('input.csv',header=True)
s_df=spark.read.option("delimiter","|").csv('input2.csv',header=True)
df_ot=f_df.join(s_df,on=['name','age'], how="outer")
df_ot.show()

In [None]:
#method4

In [None]:
df1=spark.read.option("delimiter","|").csv('input.csv',header=True)
df2=spark.read.option("delimiter","|").csv('input2.csv',header=True)

In [None]:
lista=list(set(df1.columns)-set(df2.columns))
listb=list(set(df2.columns)-set(df1.columns))

In [None]:
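# Columns present only in df1 are added to df2 as nulls, and vice versa
for i in lista:
    df2=df2.withColumn(i,lit(None).cast("string"))
for i in listb:
    df1=df1.withColumn(i,lit(None).cast("string"))

In [None]:
# union() matches columns by position, so use unionByName() to match them by name.
# In Spark 3.1+, df1.unionByName(df2, allowMissingColumns=True) does both steps at once.
df1.unionByName(df2).show()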

#9. PySpark Out of Memory Issue

An OutOfMemory (OOM) error often results from incorrect usage of Spark. The driver in the Spark architecture is only meant to be an orchestrator and is therefore given less memory than the executors. You should always be aware of which operations or tasks are loaded onto your driver.

1. Driver OOM
2. Executor OOM

In [None]:
Driver Memory Issues
    collect
    Broadcast Huge Data
Executor Memory Issues
    Big Partition
    Yarn Memory Overhead
    High Concurrency

#10. Spark Executor Tuning | Decide Number Of Executors and Memory

Resources we have
    6 machines
    16 cores per machine (total cores in cluster: 16*6=96)
    64 GB RAM per machine

Our options
    smallest size executors
    biggest size executors

Smallest executors
    1 core and 4 GB RAM per executor
    16 executors on each machine
    but with a single core per executor we lose the benefit of running several tasks in the same JVM

Largest executors
    6 executors (one per machine)
    64 GB RAM per executor
    16 cores per executor
    IO contention
    no resources left for the OS
    no memory overhead left for YARN

Correct way
executors and executor cores:
    - usable cores: 96 - 6 (one per machine left for the OS) = 90
    - usable cores per machine: 90/6 = 15
    - cores per executor: 5
    - so executors per machine: 15/5 = 3
memory:
    - available per machine: 63 GB (1 GB left for the OS)
    - available per executor: 63/3 = 21 GB
    - YARN overhead: 2 GB
    - memory per executor: 21 - 2 = 19 GB
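The arithmetic above can be written as a small helper, shown here as a sketch. The function name size_executors and its parameter names are assumptions made for this illustration. The defaults (5 cores per executor, 1 core and 1 GB reserved per machine, 2 GB of YARN overhead) are taken from the notes above.

```python
def size_executors(machines, cores_per_machine, ram_gb_per_machine,
                   cores_per_executor=5, reserved_cores=1,
                   reserved_ram_gb=1, overhead_gb=2):
    # Leave some cores and memory on every machine for the OS
    usable_cores = cores_per_machine - reserved_cores
    usable_ram_gb = ram_gb_per_machine - reserved_ram_gb

    executors_per_machine = usable_cores // cores_per_executor
    ram_per_executor_gb = usable_ram_gb // executors_per_machine

    return {
        "executors_per_machine": executors_per_machine,
        "total_executors": executors_per_machine * machines,
        "executor_cores": cores_per_executor,
        # Subtract the YARN overhead from each executor's share
        "executor_memory_gb": ram_per_executor_gb - overhead_gb,
    }


plan = size_executors(machines=6, cores_per_machine=16, ram_gb_per_machine=64)
print(plan)
```

The resulting values correspond to the spark-submit options --num-executors, --executor-cores and --executor-memory.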

#11. Avro vs Parquet

Avro differs from ORC and Parquet in that it uses a row-based, rather than column-based storage configuration.

Avro uses JSON for defining data types and protocols so it's easy to read and interpret.
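To make both points concrete, the sketch below builds a hypothetical Avro record schema as JSON and contrasts a row-based layout with a column-based one using plain Python structures. The record name, field names and sample values are illustrative assumptions, and the lists only mimic the layouts; they are not the actual file formats.

```python
import json

# Hypothetical Avro record schema; Avro schemas are written in JSON
employee_schema = {
    "type": "record",
    "name": "Employee",
    "fields": [
        {"name": "employee_name", "type": "string"},
        {"name": "department", "type": "string"},
        {"name": "salary", "type": "long"},
    ],
}
schema_json = json.dumps(employee_schema, indent=2)
print(schema_json)

field_names = [field["name"] for field in employee_schema["fields"]]

# Row-based layout (Avro-like): the values of one record sit together
rows = [("James", "Sales", 90000), ("Maria", "Finance", 90000)]

# Column-based layout (Parquet/ORC-like): the values of one column sit together
columns = {name: [row[i] for row in rows] for i, name in enumerate(field_names)}
print(columns)
```

Reading a single column touches only one list in the column-based layout, whereas the row-based layout has to walk every record.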

#12. Partitioning vs bucketing

Partitioning splits the data by the value of a column, so a filter on that column in the WHERE clause lets Spark skip whole partitions. Bucketing organizes the data in each partition into a fixed number of files by hashing a column, so rows with the same key are always written to the same bucket.
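A rough way to picture the difference is the plain-Python sketch below, which places rows into a partition by state and into a bucket by hashing the name. The sample rows, the bucket count and the use of zlib.crc32 are assumptions for illustration. Spark uses its own hash function, so real bucket numbers will differ.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4


def bucket_for(key, num_buckets=NUM_BUCKETS):
    # crc32 is a deterministic stand-in for Spark's own hash function
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


rows = [("James", "NY"), ("Maria", "CA"), ("Scott", "NY"), ("James", "NY")]

# Partition by state (one directory per value), bucket by name (fixed number of files)
layout = defaultdict(list)
for name, state in rows:
    layout[(state, bucket_for(name))].append(name)

for (state, bucket), names in sorted(layout.items()):
    print(f"state={state}/bucket_{bucket}: {names}")
```

In PySpark the corresponding writer methods are partitionBy() and bucketBy() on DataFrameWriter.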

#13. Dealing with Date in PySpark

In [None]:
import findspark
findspark.init()


In [None]:
from pyspark.sql import SparkSession,types
spark= SparkSession.builder.master("local").appName('date demo').getOrCreate()
df=spark.read.option("header","true").option("inferSchema","true").csv("dem13.csv")
df.printSchema()

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import date_add,to_date,col,expr
df.select(to_date(col("ReachargeDate").cast("string"),"yyyyMMdd")).show()

In [None]:
df_s=df.withColumn("date_s",to_date(col("ReachargeDate").cast("string"),"yyyyMMdd"))

In [None]:
df_s.printSchema()

In [None]:
df_s.select('*',expr("date_add(date_s,remaining_daye)")).show()
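For comparison, the same two steps (parse a yyyyMMdd value, then add a number of days) look like this in plain Python. The function name expiry_date and the sample values are assumptions for illustration, since the contents of dem13.csv are not shown.

```python
from datetime import datetime, timedelta


def expiry_date(recharge_date, remaining_days):
    # recharge_date is an integer such as 20230115, as inferSchema would read it;
    # it is cast to a string first, like .cast("string") in the cells above
    parsed = datetime.strptime(str(recharge_date), "%Y%m%d").date()
    # Equivalent of date_add(date_s, remaining_days)
    return parsed + timedelta(days=remaining_days)


result = expiry_date(20230115, 30)
print(result)
```

Note that the Spark pattern yyyyMMdd corresponds to %Y%m%d in Python's strptime.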

In [None]:
#sort by vs order by

#14. Difference between sort by and order by
ORDER BY and SORT BY are not the same in SQL. ORDER BY sorts the entire dataset, whereas SORT BY sorts the rows within each partition only.
In PySpark, orderBy() and sort() are the same. sortWithinPartitions() is the equivalent of SORT BY in SQL.
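The difference is easy to see on a toy example. In the sketch below, two Python lists stand in for the partitions of one dataset; the values are made up for illustration.

```python
# Two hypothetical partitions of one dataset
partitions = [[5, 1, 9], [4, 8, 2]]

# SORT BY / sortWithinPartitions(): each partition is sorted on its own
sorted_within = [sorted(p) for p in partitions]

# ORDER BY / orderBy(): one total order across all partitions
globally_sorted = sorted(value for p in partitions for value in p)

print(sorted_within)
print(globally_sorted)
```

Sorting within partitions avoids moving rows between partitions, which is why it is cheaper when a total order is not needed.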

#15. Bad Records Handling in databricks

In [None]:
#In Databricks, bad records handling refers to dealing with invalid or erroneous data encountered during ETL (Extract, Transform, Load) or other data processing pipelines. With large datasets and complex transformations it is common to encounter bad records: rows that do not conform to the expected schema or that fail validation rules.

#Spark and Databricks provide several approaches to handle bad records when reading data:

#Fail Fast:

#With .option("mode", "FAILFAST"), the job fails as soon as a bad record is encountered and an error message is generated. This approach is suitable when you want to be alerted immediately about data issues.
#Drop Malformed Rows:

#With .option("mode", "DROPMALFORMED"), rows that contain bad data are dropped and processing continues with the remaining valid records.
#Permissive Mode:

#PERMISSIVE is the default mode. Fields that cannot be parsed are set to null and the row is kept. The raw text of a bad record can be captured in a corrupt-record column (named by the columnNameOfCorruptRecord option, _corrupt_record by default) when that column is included in the schema. This is useful for semi-structured or messy data.
#Custom Schema and Error Handling:

#You can provide a custom schema when reading data to control the data types explicitly, so that values that do not match the declared types are treated as bad records under the chosen mode. On Databricks, the badRecordsPath option can additionally redirect bad records to files for later inspection.
#Here's an example of how to use the mode option and a custom schema:

# Using Drop Malformed Rows
df = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").load("path_to_data")

# Using Permissive Mode
df = spark.read.format("csv").option("header", "true").option("mode", "PERMISSIVE").load("path_to_data")

# Using Custom Schema and Error Handling
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

customSchema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    # Add more fields as per your data schema
])

df = spark.read.format("csv").option("header", "true").option("mode", "PERMISSIVE").schema(customSchema).load("path_to_data")
#By choosing the appropriate approach for bad records handling in Databricks, you can ensure that your data processing pipelines are robust, and the output is of high quality, even when dealing with imperfect data.
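The three modes can be mimicked in plain Python to show how they differ on the same input. This is only a simulation of the semantics described above, not Spark's implementation. The function read_people, the sample lines and the rule that a record is bad when age is not an integer are assumptions made for the illustration.

```python
def read_people(lines, mode="PERMISSIVE"):
    """Parse 'name,age' lines; a record is bad when age is not an integer."""
    rows = []
    for line in lines:
        name, _, raw_age = line.partition(",")
        try:
            age = int(raw_age)
        except ValueError:
            if mode == "FAILFAST":
                # Abort on the first bad record
                raise ValueError(f"Malformed record: {line!r}")
            if mode == "DROPMALFORMED":
                # Skip the bad record and carry on
                continue
            # PERMISSIVE: keep the row, null the bad field, keep the raw text
            rows.append({"name": name, "age": None, "_corrupt_record": line})
        else:
            rows.append({"name": name, "age": age, "_corrupt_record": None})
    return rows


lines = ["James,36", "Maria,thirty", "Jen,29"]
for mode in ("PERMISSIVE", "DROPMALFORMED"):
    print(mode, read_people(lines, mode))
```

With the sample lines, PERMISSIVE keeps all three rows, DROPMALFORMED keeps two, and FAILFAST raises on the second line.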

#16. cache and persist

In [None]:

'''In PySpark, both cache and persist are used to keep intermediate DataFrame or RDD (Resilient Distributed Dataset) results in memory or on
disk to avoid recomputation and improve performance during iterative or interactive data processing tasks.

cache:
cache is a shorthand for persist() with the default storage level, which is MEMORY_AND_DISK for DataFrames (MEMORY_ONLY for RDDs).
With MEMORY_AND_DISK, the data is stored in memory and spilled to disk only if the memory is not sufficient to hold the entire dataset,
so it is readily available for quick access in subsequent operations.
cache() takes no arguments; to choose a different storage level, use persist() with a StorageLevel argument.
Example:'''

# Caching a DataFrame with the default storage level (MEMORY_AND_DISK)
df.cache()
'''persist:
persist allows you to specify the storage level explicitly.
You can choose from various storage levels based on your specific use case and available resources, such as MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY,
OFF_HEAP, etc.
Using persist, you have more control over where and how the data is stored.
Example:'''

from pyspark.storagelevel import StorageLevel

# Persisting a DataFrame with MEMORY_ONLY storage level
df.persist(StorageLevel.MEMORY_ONLY)
#Which one to use?

'''If you want a simple and quick way to cache a DataFrame in memory with a default storage level, cache is a convenient choice.
If you need to customize the storage level or choose a specific storage strategy (e.g., storing on disk only or off-heap), you can use persist.
Considerations:

Be cautious while using cache or persist as they consume memory resources. If you cache or persist too many DataFrames or RDDs, it may lead to excessive
memory usage and eviction of other important data from memory.
Make sure to unpersist DataFrames or RDDs that are no longer needed to free up memory.
In summary, both cache and persist are used for data persistence in PySpark, with cache being a convenient shorthand for the most common use case. Use
persist when you need to customize the storage level or storage strategy based on your specific data processing requirements. Always be mindful of memory
usage and free up unnecessary persisted data to optimize memory utilization.'''

10 trending questions asked in Apache Spark interviews

1. How is the initial number of partitions calculated in a DataFrame?

2. What happens internally when you execute spark-submit?

3. What is partition skew and how do you tackle it?

4. What Spark optimization techniques have you used?

5. What is a broadcast join, and how does it work internally?

6. How do you optimize a join between two large tables?

7. Explain memory management in Apache Spark.

8. What is caching in Spark, and when do you consider caching a DataFrame?

9. How do you handle out of memory errors in Spark?

10. What is the difference between partitioning and bucketing? Explain with a use case.


In Apache Spark, the initial number of partitions in a DataFrame is determined during the data loading phase when the DataFrame is created from an external data source, such as reading data from a file or connecting to a database. The number of partitions is influenced by various factors and can vary based on the data source and configuration.

Here are some factors that can affect the initial number of partitions:

Default Parallelism: When a DataFrame or RDD is created from an in-memory collection (for example with parallelize), the number of partitions defaults to spark.default.parallelism, which is typically the total number of cores available to the application.

Input Data Size: The size of the data being loaded can also impact the number of partitions. If the data size is significant, Spark may create more partitions to parallelize data processing and leverage the cluster's resources effectively.

Block Size (for HDFS): If you are reading data from HDFS (Hadoop Distributed File System), the HDFS block size can influence the number of partitions. The default block size is typically 128 MB, and Spark tries to create partitions that align with these blocks to improve data locality.

Configuration Settings: For file-based sources, the partition size is controlled by configuration settings such as spark.sql.files.maxPartitionBytes (128 MB by default). The CSV reader has no numPartitions parameter, so to get a specific number of partitions, call repartition() after reading.

Example:

# Setting the number of partitions explicitly after reading a CSV file
df = spark.read.format("csv").option("header", "true").load("file.csv").repartition(10)

Data Source Specifics: Different data sources have their own strategies for determining the initial number of partitions. For instance, a JDBC read uses a single partition unless the partitionColumn, lowerBound, upperBound and numPartitions options are specified.

Spark Version: The behavior of partitioning can also change with different Spark versions, as the underlying code and optimizations evolve.

Keep in mind that the initial number of partitions is just a starting point, and Spark can dynamically repartition or coalesce data during transformations, depending on the operations you perform on the DataFrame. For example, when using operations like groupBy, join, or repartition, Spark can shuffle and repartition the data to optimize data processing.

You can always check the current number of partitions in a DataFrame using the getNumPartitions() method:

num_partitions = df.rdd.getNumPartitions()

Overall, the initial number of partitions in a DataFrame depends on several factors, and it is worth understanding when working with large datasets and aiming for optimal data processing performance.
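For file-based sources, the split size can be estimated with a short calculation. The sketch below is a simplified model of how Spark derives the maximum split size from spark.sql.files.maxPartitionBytes (default 128 MB), spark.sql.files.openCostInBytes (default 4 MB) and the default parallelism. The function name and parameters are assumptions for illustration, the files are assumed to be splittable, and the packing of small splits into shared partitions is ignored, so the count is best read as an upper bound.

```python
import math

MB = 1024 * 1024


def estimate_file_partitions(file_sizes, default_parallelism,
                             max_partition_bytes=128 * MB,
                             open_cost_bytes=4 * MB):
    # Every file is padded with the cost of opening it
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism

    # The split size is capped by maxPartitionBytes and floored by openCostInBytes
    max_split_bytes = min(max_partition_bytes,
                          max(open_cost_bytes, bytes_per_core))

    # Each file is cut into chunks of at most max_split_bytes
    splits = sum(math.ceil(size / max_split_bytes) for size in file_sizes)
    return max_split_bytes, splits


max_split, partitions = estimate_file_partitions([1024 * MB], default_parallelism=8)
print(max_split // MB, partitions)
```

The estimate can be compared with df.rdd.getNumPartitions() on a real read to see how closely the model matches a given Spark version.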