# **Functions in PySpark**

In [14]:
import findspark
findspark.init()


In [15]:
from pyspark.sql import SparkSession

In [16]:
spark=SparkSession.builder.config('spark.driver.host','localhost').getOrCreate()

In [17]:
df=spark.read.format('csv').options(header=True, inferschema=True,sep=',').load('data_for_files/data.csv')

In [6]:
df.show(6)

+------------+-------------------+-------------------+--------------------+-------------------------+------------------------+-----------------------------+-----------------------------+--------------------+--------------------+-------+
|collision_id|         crash_date|         crash_time|      on_street_name|number_of_persons_injured|number_of_persons_killed|contributing_factor_vehicle_1|contributing_factor_vehicle_2|  vehicle_type_code1|  vehicle_type_code2|borough|
+------------+-------------------+-------------------+--------------------+-------------------------+------------------------+-----------------------------+-----------------------------+--------------------+--------------------+-------+
|     4456867|2021-09-02 00:00:00|2024-02-21 19:56:00|MAJOR DEEGAN EXPR...|                        0|                       0|                  Unspecified|                         null|               Sedan|                null|   null|
|     4456988|2021-09-11 00:00:00|2024-02-21 15:45:0

### **Explode**

It is a PySpark function that returns a new row for each element in the given array or map. It uses the default column name `col` for array elements, and `key` and `value` for map entries. If the array or map is null (or empty), that row is eliminated. The row is dropped only when the entire value is null, not when a single element inside it is null.

array -\
If a column has an array datatype and contains more than one value, then each element is converted into its own row.

map -\
When a map is passed, it creates two new columns, one for the key and the other for the value, and each entry of the map is split into its own row.

*   **explode**
    *   If the array or map is null (or empty), the row is eliminated. This happens only when the entire value is null; if a null element is present in the array together with other elements, a row with a null value is included.
*   **explode_outer**
    *   Unlike explode, if the array or map is null (or empty), it returns a row with null in the generated column(s) instead of dropping it.
*   **posexplode**
    *   When an array or map is passed, it also creates a `pos` column containing the index of each element in the array or map. Like explode, it drops rows whose array or map is null, but it keeps null elements inside the array.
*   **posexplode_outer**
    *   Unlike posexplode, if the array or map is null, posexplode_outer returns a row with a null position and null value(s).


Note - In PySpark, explode() works on columns of array or map type, not on strings directly. If your column contains string values that represent arrays, first convert them into arrays, for example with split() for delimited text or from_json() for JSON arrays, and then apply explode() on the resulting array column.
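The null-handling rules above are easy to mix up, so here is a plain-Python model (not PySpark) of what the four functions do to each input row. The function name `explode_family` and its keyword flags are illustrative only; rows are `(id, collection)` pairs where the collection is a list (array column), a dict (map column) or `None` (SQL null).

```python
# Plain-Python model (not PySpark) of explode / explode_outer /
# posexplode / posexplode_outer. Rows are (id, collection) pairs.

def explode_family(rows, *, outer=False, with_pos=False, is_map=False):
    out = []
    for row_id, coll in rows:
        if not coll:
            # null or empty collection: explode/posexplode drop the row,
            # the *_outer variants keep it with nulls in the new columns
            if outer:
                pos = (None,) if with_pos else ()
                values = (None, None) if is_map else (None,)
                out.append((row_id,) + pos + values)
            continue
        # a map yields (key, value) pairs, an array yields single elements
        items = coll.items() if is_map else ((element,) for element in coll)
        for index, item in enumerate(items):
            pos = (index,) if with_pos else ()
            out.append((row_id,) + pos + tuple(item))
    return out


data = [(1, [10, 20, None, 30]), (2, [40, 50]), (3, None)]
print(explode_family(data))                             # explode
print(explode_family(data, outer=True))                 # explode_outer
print(explode_family(data, with_pos=True))              # posexplode
print(explode_family(data, outer=True, with_pos=True))  # posexplode_outer
```

The null element inside the first array always survives; only the whole-null row for id 3 depends on the `_outer` choice.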

**Example for array type**

In [7]:

from pyspark.sql.functions import posexplode


# Sample array data
data = [(1, [10, 20, None,30]), (2, [40, 50]),(3,None)]

# Define the schema
schema = ["id", "array_data"]

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

**Explode**

In [8]:
from pyspark.sql.functions import explode
df.show()
df.select("id",explode("array_data").alias("array_col")).show()

+---+------------------+
| id|        array_data|
+---+------------------+
|  1|[10, 20, null, 30]|
|  2|          [40, 50]|
|  3|              null|
+---+------------------+

+---+---------+
| id|array_col|
+---+---------+
|  1|       10|
|  1|       20|
|  1|     null|
|  1|       30|
|  2|       40|
|  2|       50|
+---+---------+



In the above example you can see that the (3, None) row is eliminated, because the column we apply explode on is null for that row. Any row whose exploded column is null is removed.

**explode_outer**

In [9]:
from pyspark.sql.functions import explode_outer
df.show()
df.select("id",explode_outer("array_data").alias("array_data")).show()

+---+------------------+
| id|        array_data|
+---+------------------+
|  1|[10, 20, null, 30]|
|  2|          [40, 50]|
|  3|              null|
+---+------------------+

+---+----------+
| id|array_data|
+---+----------+
|  1|        10|
|  1|        20|
|  1|      null|
|  1|        30|
|  2|        40|
|  2|        50|
|  3|      null|
+---+----------+



Here you can see the row (3,None) is included.

**posexplode**

In [10]:


df.show()

# Use posexplode to explode the array
exploded_df = df.select("id", posexplode("array_data").alias("position", "element"))

# Show the result
exploded_df.show()


+---+------------------+
| id|        array_data|
+---+------------------+
|  1|[10, 20, null, 30]|
|  2|          [40, 50]|
|  3|              null|
+---+------------------+

+---+--------+-------+
| id|position|element|
+---+--------+-------+
|  1|       0|     10|
|  1|       1|     20|
|  1|       2|   null|
|  1|       3|     30|
|  2|       0|     40|
|  2|       1|     50|
+---+--------+-------+



**posexplode_outer()**

In [11]:
from pyspark.sql.functions import posexplode_outer
df.select("id",posexplode_outer("array_data").alias("position","array_data")).show()

+---+--------+----------+
| id|position|array_data|
+---+--------+----------+
|  1|       0|        10|
|  1|       1|        20|
|  1|       2|      null|
|  1|       3|        30|
|  2|       0|        40|
|  2|       1|        50|
|  3|    null|      null|
+---+--------+----------+



**Example for map type**

For a map, explode creates one column for the keys and another column for the values.

In [12]:
# Sample data with a map column
data = [(1, {"A": 10, "B": 20}), (2, {"C": 30, "D": 40,"F":None}),(3,None)]

# Define the schema
schema = ["id", "map_data"]

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)


In [13]:
# Use explode to explode the map
df.show()
df.select("id", explode("map_data").alias("key", "value")).show()



In [None]:
# use of explode_outer()
from pyspark.sql.functions import explode_outer
df.show()
df.select("id",explode_outer("map_data").alias("key","value")).show()

+---+--------------------+
| id|            map_data|
+---+--------------------+
|  1|  {A -> 10, B -> 20}|
|  2|{C -> 30, D -> 40...|
|  3|                null|
+---+--------------------+

+---+----+-----+
| id| key|value|
+---+----+-----+
|  1|   A|   10|
|  1|   B|   20|
|  2|   C|   30|
|  2|   D|   40|
|  2|   F| null|
|  3|null| null|
+---+----+-----+



In [None]:
# use of posexplode()
from pyspark.sql.functions import posexplode
df.show()
df.select("id",posexplode("map_data").alias("position","key","value")).show()

+---+--------------------+
| id|            map_data|
+---+--------------------+
|  1|  {A -> 10, B -> 20}|
|  2|{C -> 30, D -> 40...|
|  3|                null|
+---+--------------------+

+---+--------+---+-----+
| id|position|key|value|
+---+--------+---+-----+
|  1|       0|  A|   10|
|  1|       1|  B|   20|
|  2|       0|  C|   30|
|  2|       1|  D|   40|
|  2|       2|  F| null|
+---+--------+---+-----+



In [None]:
# Flatten the map with explode_outer so the id=3 row (null map) is kept; this result is reused below.
df=df.select("id",explode_outer("map_data").alias("key","value"))



### **when otherwise case functions**

*  **case**
   *  It is similar to if/elif/else: when() ... otherwise().
   *  To evaluate a list of conditions and choose a result according to the first matching condition, use the when() and otherwise() functions.
   *  This is similar to a CASE or switch statement in other programming languages.
   *  When no condition matches, the otherwise() result is returned; if otherwise() is omitted, the result is null.


**syntax**

>df.withColumn(\
>    "new column",\
>     when(condition , result1)\
>    .when(condition, result2)\
>    .otherwise(result))

when(condition,result)
*   condition - the condition that is checked.
*   result - the value returned if the condition is true

<br>

>from pyspark.sql.functions import expr\
>df.withColumn("new column",\
>    expr("CASE WHEN condition1 THEN result1 " +\
>    "WHEN condition2 THEN result2 " +\
>    "WHEN condition3 THEN result3 " +\
>    "ELSE result END"))

To combine multiple conditions, use & (and) and | (or) within the when clause. Wrap each condition in parentheses, because & and | bind more tightly than comparison operators such as ==.

df.withColumn("new_col",when((cond1) & (cond2), result1).when((cond1) | (cond2), result2).otherwise(result3))
*   (cond1) & (cond2) - result1 is returned only if both conditions are true.
*   (cond1) | (cond2) - result2 is returned if either or both conditions are true.

In [None]:
df.show()

+---+----+-----+
| id| key|value|
+---+----+-----+
|  1|   A|   10|
|  1|   B|   20|
|  2|   C|   30|
|  2|   D|   40|
|  2|   F| null|
|  3|null| null|
+---+----+-----+



In [None]:
from pyspark.sql.functions import when
df.withColumn("new col", when(df.id==1,"one").when(df.id==3,"three").otherwise("other")).show()

+---+----+-----+-------+
| id| key|value|new col|
+---+----+-----+-------+
|  1|   A|   10|    one|
|  1|   B|   20|    one|
|  2|   C|   30|  other|
|  2|   D|   40|  other|
|  2|   F| null|  other|
|  3|null| null|  three|
+---+----+-----+-------+



In [None]:
from pyspark.sql.functions import isnull
df.withColumn("new col", when(df.value<20,"low").when(df.value<=30,"mid").when(isnull("value"),"No record").otherwise("high")).show()

+---+----+-----+---------+
| id| key|value|  new col|
+---+----+-----+---------+
|  1|   A|   10|      low|
|  1|   B|   20|      mid|
|  2|   C|   30|      mid|
|  2|   D|   40|     high|
|  2|   F| null|No record|
|  3|null| null|No record|
+---+----+-----+---------+
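Two rules explain this output: the first true condition wins (so 20 is "mid", not "high"), and comparing null with a number is never true, so null rows fall through to the isnull() branch even though it is listed last. The plain-Python model below (not PySpark) mimics that; the helpers `lt`, `le` and `bucket` are illustrative names, and `None` stands in for SQL's unknown result.

```python
# Plain-Python model (not PySpark) of the chained when() in the cell above.

def lt(a, b):
    """SQL-style a < b: unknown (None) if either side is null."""
    if a is None or b is None:
        return None
    return a < b


def le(a, b):
    """SQL-style a <= b: unknown (None) if either side is null."""
    if a is None or b is None:
        return None
    return a <= b


def bucket(value):
    branches = [
        (lt(value, 20), "low"),
        (le(value, 30), "mid"),
        (value is None, "No record"),  # isnull("value")
    ]
    for condition, result in branches:
        if condition is True:  # unknown (None) is treated as not matched
            return result
    return "high"  # otherwise()


print([bucket(v) for v in [10, 20, 30, 40, None]])
```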



### **union and union all**

* To combine two DataFrames with union, both must have the same number of columns. Columns are matched **by position**, not by name, so the respective columns should have compatible datatypes in the same order.

* union - appends the rows of one DataFrame to the other. Like SQL `UNION ALL`, it does **not** remove duplicate records.
>  * df1.union(df2)
  * This will merge the records of df1 and df2.
* unionAll - an older alias of union; it also keeps duplicate records.
>  * df1.unionAll(df2)

Note:
* Since Spark 2.0, union and unionAll give the same result. To remove duplicates, call distinct() or dropDuplicates() on the result.
* To match columns by name instead of by position, use df1.unionByName(df2).

If the number of columns does not match, use select() to pick the columns that are present in both DataFrames, and then do the union.
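The difference between appending and de-duplicating can be modelled in plain Python (not PySpark). The helper names `union_all` and `distinct` are illustrative only; the rows are the same (id, key, value) tuples shown below.

```python
# Plain-Python model (not PySpark): union() appends rows like SQL UNION ALL,
# and a separate distinct()/dropDuplicates() step removes repeated rows.

rows = [(1, "A", 10), (1, "B", 20), (3, None, None)]


def union_all(left, right):
    # rows are simply concatenated; columns line up by position
    return left + right


def distinct(all_rows):
    seen, out = set(), []
    for r in all_rows:
        if r not in seen:  # two nulls (None) compare equal here, as in distinct()
            seen.add(r)
            out.append(r)
    return out


unioned = union_all(rows, rows)
print(len(unioned))            # duplicates are kept
print(len(distinct(unioned)))  # duplicates removed
```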

In [None]:
df.union(df).show()
df.unionAll(df).show()

+---+----+-----+
| id| key|value|
+---+----+-----+
|  1|   A|   10|
|  1|   B|   20|
|  2|   C|   30|
|  2|   D|   40|
|  2|   F| null|
|  3|null| null|
|  1|   A|   10|
|  1|   B|   20|
|  2|   C|   30|
|  2|   D|   40|
|  2|   F| null|
|  3|null| null|
+---+----+-----+

+---+----+-----+
| id| key|value|
+---+----+-----+
|  1|   A|   10|
|  1|   B|   20|
|  2|   C|   30|
|  2|   D|   40|
|  2|   F| null|
|  3|null| null|
|  1|   A|   10|
|  1|   B|   20|
|  2|   C|   30|
|  2|   D|   40|
|  2|   F| null|
|  3|null| null|
+---+----+-----+



In [None]:
zdf=df.union(df)

### **dropDuplicates**

In [None]:
zdf.dropDuplicates().show()

+---+----+-----+
| id| key|value|
+---+----+-----+
|  1|   A|   10|
|  1|   B|   20|
|  2|   D|   40|
|  2|   C|   30|
|  2|   F| null|
|  3|null| null|
+---+----+-----+



**Drop duplicates based on a subset of columns**\
> distinct_subset_df = df.dropDuplicates(subset=["id"])

This keeps only one record for each distinct value in the id column of the zdf DataFrame; the other records with the same id are dropped.
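A plain-Python model (not PySpark) of dropDuplicates(subset=["id"]) is shown below; `drop_duplicates` is an illustrative helper. It keeps the first row it sees for each key. Spark does not promise which of the duplicate rows survives when the data is spread over several partitions, so treat the chosen row as arbitrary.

```python
# Plain-Python model (not PySpark) of dropDuplicates(subset=[...]).

def drop_duplicates(rows, key_indexes):
    seen, out = set(), []
    for r in rows:
        key = tuple(r[i] for i in key_indexes)  # only the subset columns
        if key not in seen:
            seen.add(key)
            out.append(r)
    return out


zdf_rows = [(1, "A", 10), (1, "B", 20), (2, "C", 30),
            (2, "D", 40), (2, "F", None), (3, None, None)] * 2
print(drop_duplicates(zdf_rows, [0]))  # one row per id
```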

In [None]:
zdf.dropDuplicates(subset=["id"]).show()

+---+----+-----+
| id| key|value|
+---+----+-----+
|  1|   A|   10|
|  2|   C|   30|
|  3|null| null|
+---+----+-----+



### **Pivot and unpivot**

**Pivot**
*   Used to turn the distinct values of one column into separate columns.

Pivoting in the context of data transformation refers to the process of rotating or transposing rows into columns, typically to summarize and aggregate data. It involves selecting one column whose unique values will become the new column headers in the output, while another column's values are aggregated across these new columns. Pivoting is useful for reshaping data to make it more readable or suitable for analysis.

For example, if you have a table with student names, subjects, and scores, you can pivot the table to have student names as rows, subjects as columns, and scores as the intersection of rows and columns.
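As a plain-Python model (not PySpark), groupBy("shop").pivot("Quater").sum("revenue") on the data used below amounts to building a nested dictionary: one entry per shop, one key per quarter, with revenues summed. `pivot_sum` is an illustrative helper; a missing quarter key corresponds to a null cell in Spark's output.

```python
from collections import defaultdict

pivo_data = [("a", "Q1", 6000), ("x", "Q1", 5000), ("a", "Q2", 8000), ("x", "Q2", 7000)]


def pivot_sum(rows):
    # rows become table[group][pivot_value] = sum of values
    table = defaultdict(dict)
    for shop, quarter, revenue in rows:
        table[shop][quarter] = table[shop].get(quarter, 0) + revenue
    return dict(table)


print(pivot_sum(pivo_data))
```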



**Unpivot**
*   Turns columns back into rows: the column names become the values of one column and their contents become the values of another column.

In [None]:
pivo_data= [("a","Q1",6000), ("x","Q1",5000), ("a","Q2",8000), ("x","Q2",7000)]
schema=["shop","Quater","revenue"]
df_pivo= spark.createDataFrame(data=pivo_data,schema=schema)

In [None]:
df_pivo.show()

+----+------+-------+
|shop|Quater|revenue|
+----+------+-------+
|   a|    Q1|   6000|
|   x|    Q1|   5000|
|   a|    Q2|   8000|
|   x|    Q2|   7000|
+----+------+-------+



In [None]:
df_new=df_pivo.groupBy("shop").pivot("Quater").sum("revenue")
df_new.show()

+----+----+----+
|shop|  Q1|  Q2|
+----+----+----+
|   x|5000|7000|
|   a|6000|8000|
+----+----+----+



In [None]:
df_pivo.groupBy("shop","Quater").pivot("Quater").sum("revenue").show()

+----+------+----+----+
|shop|Quater|  Q1|  Q2|
+----+------+----+----+
|   a|    Q1|6000|null|
|   x|    Q1|5000|null|
|   a|    Q2|null|8000|
|   x|    Q2|null|7000|
+----+------+----+----+



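In PySpark, the groupBy clause is commonly used in conjunction with the pivot function when performing pivoting operations. The groupBy clause is used to group the data before applying the pivot operation, allowing you to aggregate or summarize the values within each group. In the second example above, the pivot column Quater is also part of groupBy, so each (shop, Quater) group only has a value for its own quarter and the other pivoted column is null; usually the pivot column should be left out of groupBy.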

When you pivot data, you typically need to specify an aggregation function (e.g., sum, avg) to determine how values should be combined for each combination of pivot and grouping columns. The groupBy clause provides the context for these aggregations.

**unpivot**

In [None]:
df_new.selectExpr("shop","stack(2,'Q1',Q1,'Q2',Q2) as (Quater,revenue)").show()

+----+------+-------+
|shop|Quater|revenue|
+----+------+-------+
|   x|    Q1|   5000|
|   x|    Q2|   7000|
|   a|    Q1|   6000|
|   a|    Q2|   8000|
+----+------+-------+



### **stack**

The stack function (a Spark SQL function, used here through selectExpr) turns columns into rows: stack(n, expr1, ..., exprk) splits the k expressions into n rows of k/n values each. In the unpivot example above it is used as follows:\
stack(2, 'Q1', Q1, 'Q2', Q2) as (Quater, revenue)

*   2: the number of rows to produce from each input row. The four remaining arguments are split into 2 rows of 2 values each, and those 2 values become the two output columns.

*   'Q1', Q1, 'Q2', Q2: these are the values to be stacked, read as the pairs 'Q1', Q1 and 'Q2', Q2. The first element in each pair is a string literal (e.g. 'Q1', 'Q2'), and the second element is a reference to the actual DataFrame column with that name.
*   In 'Q1', Q1: 'Q1' is the label we want to see in the output column, and Q1 is the DataFrame column whose value we want to stack.



So, for each pair, the literal string is used as a label in the first output column (e.g. 'Q1' for the first pair, 'Q2' for the second pair), and the values from the referenced columns are combined into a single second column.

+----+---+---+\
|shop| Q1| Q2|\
+----+---+---+\
|   A| 10| 20|\
|   B| 30| 40|\
+----+---+---+


Applying the stack(2, 'Q1', Q1, 'Q2', Q2) operation would result in:
>df_new.selectExpr("shop","stack(2,'Q1',Q1,'Q2',Q2) as (Quater,revenue)").show()

Quater - the first output column, containing the literal labels 'Q1' or 'Q2'.\
revenue - the second output column, containing the values read from the columns Q1 and Q2.

'Q1',Q1 - 'Q1' goes to Quater, and the value of column Q1 goes to revenue.

e.g.\
For shop A, column Q1 holds 10, so the result row has Quater - Q1 and revenue - 10.


+----+------+-------+\
|shop|Quater|revenue|\
+----+------+-------+\
|   A|    Q1|     10|\
|   A|    Q2|     20|\
|   B|    Q1|     30|\
|   B|    Q2|     40|\
+----+------+-------+
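The row-splitting rule of stack can be sketched in plain Python (not PySpark). The `stack` function here is an illustrative model that only handles the case where the number of expressions divides evenly by n; string literals are passed as labels and the column values are looked up from a row dictionary.

```python
# Plain-Python model (not PySpark) of stack(n, e1, ..., ek):
# the k values are split into n rows of k // n values each.

def stack(n, *exprs):
    if n <= 0 or len(exprs) % n != 0:
        raise ValueError("this sketch only handles k divisible by n")
    width = len(exprs) // n
    return [tuple(exprs[i * width:(i + 1) * width]) for i in range(n)]


row = {"shop": "A", "Q1": 10, "Q2": 20}
# stack(2, 'Q1', Q1, 'Q2', Q2): literals become labels, Q1/Q2 are column values
for quarter, revenue in stack(2, "Q1", row["Q1"], "Q2", row["Q2"]):
    print(row["shop"], quarter, revenue)
```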


<hr>

## **UDF (user defined function)**

A user defined function is a piece of code that performs a certain task and can be reused to perform the same task across multiple scenarios. To call a Python function on DataFrame columns, it is wrapped with pyspark.sql.functions.udf(function, returnType).

syntax -
> def udf_name(parameters):\
> --code to perform the task\
> --return output

UDF - black box
* A UDF is a black box for the Spark engine: the optimizer cannot look inside it, so it cannot optimize the logic the UDF contains. UDFs are therefore discouraged in Databricks/Spark development when a built-in function can do the job.
* Try to minimize the usage of UDFs and use built-in functions.
* DataFrame data lives in the JVM. A Python UDF runs in a separate Python worker process, so every row has to be serialized from the JVM to Python and the result deserialized back, which adds overhead. Scala/Java UDFs run inside the JVM and avoid this cost, but they are still opaque to the optimizer.

In [None]:
data = [("Alice", 25, None),
        ("Bob", None, 30),
        ("Charlie", 35, 40),
        (None,52,76)]

columns = ["name", "age", "score"]
df_udf = spark.createDataFrame(data, columns)

In [None]:
df_udf.show()

+-------+----+-----+
|   name| age|score|
+-------+----+-----+
|  Alice|  25| null|
|    Bob|null|   30|
|Charlie|  35|   40|
|   null|  52|   76|
+-------+----+-----+



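### **Define UDF to rename column**

Strictly speaking, rename_column below is a plain Python helper function rather than a Spark UDF: it runs once on the driver and only chains withColumnRenamed() calls, so it does not pay the per-row serialization cost described above.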

In [None]:
def rename_column(rename_df):
    # Prefix every column name with "col_"
    for column in rename_df.columns:
        new_column = "col_" + column
        # withColumnRenamed(existing_name, new_name) returns a new DataFrame
        rename_df = rename_df.withColumnRenamed(column, new_column)

    return rename_df

**Execute UDF**

In [None]:
renamed_df=rename_column(df_udf)
renamed_df.show()

+--------+-------+---------+
|col_name|col_age|col_score|
+--------+-------+---------+
|   Alice|     25|     null|
|     Bob|   null|       30|
| Charlie|     35|       40|
|    null|     52|       76|
+--------+-------+---------+



### **Split()**

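PySpark function that splits a string column into an array column based on a delimiter pattern; individual elements can then be pulled out into separate columns with getItem() or [index].\
syntax -
> **pyspark.sql.functions.split(str,pattern,limit=-1)**
> * str - a string expression (column) to split
> * pattern - a string representing a regular expression
> * limit - optional; an integer that controls the number of times the pattern is applied. If limit > 0, the resulting array has at most limit elements and the last element contains the rest of the string; if limit <= 0, the pattern is applied as many times as possible.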

### **Method 1**

In [None]:
# Sample DataFrame with a column containing commas
data = [("Alice,25,3.5", "yt"),
        ("Bob,30,4.0", "rr"),
        ("Charlie,35,4.5", "we")]

In [None]:

columns = ["data", "name"]
split_df = spark.createDataFrame(data, columns)

In [None]:
split_df.show()

+--------------+----+
|          data|name|
+--------------+----+
|  Alice,25,3.5|  yt|
|    Bob,30,4.0|  rr|
|Charlie,35,4.5|  we|
+--------------+----+



In [None]:
from pyspark.sql.functions import split

In [None]:
after_split=split_df.withColumn("first_col",split(split_df.data,',',2).getItem(0)).withColumn("second_col",split(split_df.data,',',2).getItem(1))

In [None]:
after_split.show()

+--------------+----+---------+----------+
|          data|name|first_col|second_col|
+--------------+----+---------+----------+
|  Alice,25,3.5|  yt|    Alice|    25,3.5|
|    Bob,30,4.0|  rr|      Bob|    30,4.0|
|Charlie,35,4.5|  we|  Charlie|    35,4.5|
+--------------+----+---------+----------+



**split(split_df.data,',',2).getItem(0)**

*  This expression splits the string column data of split_df into an array of substrings, using ',' as the delimiter.
*  split_df is the DataFrame, and data is a column in that DataFrame.
*  The third argument, 2, is the limit: the resulting array has at most 2 elements, so the string is split only at the first comma and the second element keeps the rest (e.g. "25,3.5").
*  The result is an array column.
*  getItem(0) gets the element at index 0 of the array produced by split.
*  **split(split_df.data, ',', 2)[0]** - indexing the array column with [] works the same as getItem().
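Because Spark's split() takes a regular expression and a length limit, Python's `re.split` with `maxsplit = limit - 1` is a convenient plain-Python model (not PySpark) for checking what an expression will produce. `spark_like_split` is an illustrative name. Note that the pattern is a regex, so splitting on a literal dot needs `\.`.

```python
import re


def spark_like_split(s, pattern, limit=-1):
    """Model of Spark's split(): regex pattern, limit > 0 caps the array length."""
    if limit > 0:
        # at most `limit` elements means at most limit - 1 splits
        return re.split(pattern, s, maxsplit=limit - 1)
    return re.split(pattern, s)  # limit <= 0: split as often as possible


print(spark_like_split("Alice,25,3.5", ",", 2))  # two elements
print(spark_like_split("Alice,25,3.5", ","))     # three elements
print(spark_like_split("a.b", r"\."))            # escaped dot splits on "."
```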

### **Method 2**

In [None]:
from pyspark.sql.functions import split

* write the split expression once and store it in a variable
* use select() instead of withColumn() to get the new columns

In [None]:
split_col=split(split_df['data'],',',2)

df3=split_df.select("data","name",split_col[0].alias('first_col'),split_col[1].alias('second_col'))

df3.show()

+--------------+----+---------+----------+
|          data|name|first_col|second_col|
+--------------+----+---------+----------+
|  Alice,25,3.5|  yt|    Alice|    25,3.5|
|    Bob,30,4.0|  rr|      Bob|    30,4.0|
|Charlie,35,4.5|  we|  Charlie|    35,4.5|
+--------------+----+---------+----------+



## **other use cases**


*   **We can use different split patterns in one query.**
*   Here we apply split on different columns and use them in one query.
> df.withColumn("first",split(df.name," ")[0])\
>   .withColumn("second",split(df.name," ")[1])\
>   .withColumn("start_date",split(df.date,"-")[0])\
>   .withColumn("end_date",split(df.date,"-")[1])

* **split and drop the original column**

>df.withColumn("first",split(df.name," ")[0])\
>   .withColumn("second",split(df.name," ")[1])\
>   .drop("name")

In [None]:
new_df=split_df.withColumn("first",split(split_df.data,",",2)[0]).withColumn("second",split(split_df.data,",",2)[1]).drop("data")

In [None]:
new_df.show()

+----+-------+------+
|name|  first|second|
+----+-------+------+
|  yt|  Alice|25,3.5|
|  rr|    Bob|30,4.0|
|  we|Charlie|35,4.5|
+----+-------+------+



### **arrays_zip**

PySpark function that returns a merged array of structs in which the N-th struct contains all N-th values of the input arrays.
* If two columns have an array datatype and we want to merge them, we can use arrays_zip, which merges the two arrays element by element based on position.
* syntax
>   **Output = inputDF.withColumn("zipped_value",arrays_zip("array_score1","array_score2"))**
*  This pairs the elements at the same position into a single struct element in the new column.
*  We can supply any number of array columns and merge them.
*  The values at index 0 of all arrays are merged into the first struct, the values at index 1 into the second struct, and so on.
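The position-by-position pairing, including the null padding of shorter arrays, can be modelled in plain Python (not PySpark) with `itertools.zip_longest`; tuples stand in for Spark structs, and `arrays_zip` here is an illustrative function, not the PySpark one.

```python
from itertools import zip_longest


def arrays_zip(*arrays):
    """Pair elements by position; shorter arrays are padded with None (null)."""
    return list(zip_longest(*arrays, fillvalue=None))


print(arrays_zip([25, 30, 35, 7], [3.5, 4.0, 4.5]))  # (7, None) at the end
print(arrays_zip([1, 2]))                            # single-field tuples
```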

In [None]:
# Sample DataFrame with two arrays
data = [("Alice", [25, 30, 35,7], [3.5, 4.0, 4.5]),
        ("Bob", [28, 32, 37], [3.2, 4.2, 4.7]),
        ("Charlie", [22, 27, 33], [3.8, 4.1, 4.4])]



In [None]:
columns = ["name", "ages", "scores"]
df = spark.createDataFrame(data, columns)


In [None]:
from pyspark.sql.functions import arrays_zip, col

# Use arrays_zip to combine two arrays into an array of structs
df_result = df.withColumn("combined_data", arrays_zip(col("ages"), col("scores")))

# Show the result
df_result.show(truncate=False)

+-------+---------------+---------------+--------------------------------------------+
|name   |ages           |scores         |combined_data                               |
+-------+---------------+---------------+--------------------------------------------+
|Alice  |[25, 30, 35, 7]|[3.5, 4.0, 4.5]|[{25, 3.5}, {30, 4.0}, {35, 4.5}, {7, null}]|
|Bob    |[28, 32, 37]   |[3.2, 4.2, 4.7]|[{28, 3.2}, {32, 4.2}, {37, 4.7}]           |
|Charlie|[22, 27, 33]   |[3.8, 4.1, 4.4]|[{22, 3.8}, {27, 4.1}, {33, 4.4}]           |
+-------+---------------+---------------+--------------------------------------------+



In the Alice row, [25, 30, 35, 7] and [3.5, 4.0, 4.5] are merged position by position into [{25, 3.5}, {30, 4.0}, {35, 4.5}, {7, null}], and the result is stored in the new column combined_data.
* If one array contains fewer elements than the other, null is used in place of the missing values.

* If we pass only one array, arrays_zip wraps each element of that array in its own single-field struct.

In [None]:
from pyspark.sql.functions import explode

df_brand_exp=df_result.withColumn("explode",explode("combined_data"))

**explode()**\
The explode function in PySpark is used to transform a column with arrays or maps into multiple rows, with one row for each element in the array or map. This is particularly useful when you have a column with nested structures, and you want to flatten it for further analysis.

In [None]:
df_brand_exp.show()

+-------+---------------+---------------+--------------------+---------+
|   name|           ages|         scores|       combined_data|  explode|
+-------+---------------+---------------+--------------------+---------+
|  Alice|[25, 30, 35, 7]|[3.5, 4.0, 4.5]|[{25, 3.5}, {30, ...|{25, 3.5}|
|  Alice|[25, 30, 35, 7]|[3.5, 4.0, 4.5]|[{25, 3.5}, {30, ...|{30, 4.0}|
|  Alice|[25, 30, 35, 7]|[3.5, 4.0, 4.5]|[{25, 3.5}, {30, ...|{35, 4.5}|
|  Alice|[25, 30, 35, 7]|[3.5, 4.0, 4.5]|[{25, 3.5}, {30, ...|{7, null}|
|    Bob|   [28, 32, 37]|[3.2, 4.2, 4.7]|[{28, 3.2}, {32, ...|{28, 3.2}|
|    Bob|   [28, 32, 37]|[3.2, 4.2, 4.7]|[{28, 3.2}, {32, ...|{32, 4.2}|
|    Bob|   [28, 32, 37]|[3.2, 4.2, 4.7]|[{28, 3.2}, {32, ...|{37, 4.7}|
|Charlie|   [22, 27, 33]|[3.8, 4.1, 4.4]|[{22, 3.8}, {27, ...|{22, 3.8}|
|Charlie|   [22, 27, 33]|[3.8, 4.1, 4.4]|[{22, 3.8}, {27, ...|{27, 4.1}|
|Charlie|   [22, 27, 33]|[3.8, 4.1, 4.4]|[{22, 3.8}, {27, ...|{33, 4.4}|
+-------+---------------+---------------+--------------------+---------+

As you can see, each element of the array is flattened out into its own row.

### **array_intersect**

PySpark function that returns the elements common to two arrays, without any duplicates.

>**from pyspark.sql.functions import array_intersect**\
> **outputDF= inputDF.withColumn("intersect",array_intersect("col1","col2"))**

*  The intersection is computed between the arrays in col1 and col2 of each row.
In [None]:
# Sample DataFrame with two array columns
data = [("Alice", [25, 30, 35], [30, 35, 40]),
        ("Bob", [28, 32, 37], [32, 35, 37]),
        ("Charlie", [28, 21, 33], [25, 33, 35])]

columns = ["name", "ages1", "ages2"]
df = spark.createDataFrame(data, columns)

from pyspark.sql.functions import array_intersect

df.show()


+-------+------------+------------+
|   name|       ages1|       ages2|
+-------+------------+------------+
|  Alice|[25, 30, 35]|[30, 35, 40]|
|    Bob|[28, 32, 37]|[32, 35, 37]|
|Charlie|[28, 21, 33]|[25, 33, 35]|
+-------+------------+------------+



In [None]:

# Find the intersection of the two array columns
df_intersect = df.withColumn("common_ages",array_intersect("ages1", "ages2"))
df_intersect.show()

+-------+------------+------------+-----------+
|   name|       ages1|       ages2|common_ages|
+-------+------------+------------+-----------+
|  Alice|[25, 30, 35]|[30, 35, 40]|   [30, 35]|
|    Bob|[28, 32, 37]|[32, 35, 37]|   [32, 37]|
|Charlie|[28, 21, 33]|[25, 33, 35]|       [33]|
+-------+------------+------------+-----------+



### **array_except**

PySpark function that returns the elements present in the first array but not in the second array, without duplicates.
> **from pyspark.sql.functions import array_except**\
> **output_except= inputDF.withColumn("except",array_except("col1","col2"))**

*  It is like col1 - col2, meaning the elements of col1 except those in col2.
*  Elements common to col1 and col2 are removed from col1.
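The "col1 - col2" idea can be checked with a plain-Python model (not PySpark); `array_except` here is an illustrative function that keeps the first array's order and removes repeated values.

```python
def array_except(a, b):
    """Elements of a that are not in b, without duplicates (order of a)."""
    in_b = set(b)
    seen, out = set(), []
    for x in a:
        if x not in in_b and x not in seen:
            seen.add(x)
            out.append(x)
    return out


print(array_except([28, 21, 33], [25, 33, 35]))
```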


In [None]:
from pyspark.sql.functions import array_except

# Elements of ages1 that are not present in ages2
df_except = df.withColumn("ages1_only",array_except("ages1", "ages2"))
df_except.show()

+-------+------------+------------+----------+
|   name|       ages1|       ages2|ages1_only|
+-------+------------+------------+----------+
|  Alice|[25, 30, 35]|[30, 35, 40]|      [25]|
|    Bob|[28, 32, 37]|[32, 35, 37]|      [28]|
|Charlie|[28, 21, 33]|[25, 33, 35]|  [28, 21]|
+-------+------------+------------+----------+



### **array_sort**

PySpark function that sorts the elements within an array in ascending order; null elements are placed at the end of the array.
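The ordering rule, ascending with nulls last, can be expressed as a plain-Python sort key (not PySpark); `array_sort` here is an illustrative function. The key `(x is None, ...)` puts every non-null value ahead of every `None`.

```python
def array_sort(arr):
    """Ascending sort with null (None) elements moved to the end."""
    return sorted(arr, key=lambda x: (x is None, x if x is not None else 0))


print(array_sort([28, 21, 33]))
print(array_sort([3, None, 1]))
```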

In [None]:
import pyspark.sql.functions as fu

df_sort = df.withColumn("sort_ages",fu.array_sort("ages1"))
df_sort.show()

+-------+------------+------------+------------+
|   name|       ages1|       ages2|   sort_ages|
+-------+------------+------------+------------+
|  Alice|[25, 30, 35]|[30, 35, 40]|[25, 30, 35]|
|    Bob|[28, 32, 37]|[32, 35, 37]|[28, 32, 37]|
|Charlie|[28, 21, 33]|[25, 33, 35]|[21, 28, 33]|
+-------+------------+------------+------------+



## **Join the elements of an iterable object**

Python (not PySpark) string method that returns a string by joining all the elements of an iterable, such as a list or tuple, separated by a string separator.

syntax
> **string_separator.join(iterable_data)**
> *  string_separator - the separator placed between the joined elements; it can be ",", ":" or any other string.
> *  join - the str.join method
> *  iterable_data - data such as a list or tuple whose elements (which must be strings) are joined.

In [None]:
column_list= df.columns
print(column_list)

join_column= "-".join(column_list)
print(join_column)

['name', 'ages1', 'ages2']
name-ages1-ages2


**column_list= df.columns**
* column_list contains the list of column names.

**join_column= "-".join(column_list)**
* join concatenates the elements of the iterable, separated by "-" in the above example.
* join_column is a string.
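Since str.join only accepts strings, joining a list that contains numbers raises a TypeError; converting each element with str() first is the usual fix. The values below are made up for illustration.

```python
values = ["name", 25, 3.5]

try:
    "-".join(values)
except TypeError as err:
    print("join failed:", err)  # join only accepts string elements

joined = "-".join(str(v) for v in values)
print(joined)
```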

## **partitionBy function**

Function used when writing a DataFrame: it splits the output by the values of one or more key columns and writes each group into its own sub-directory on disk.
syntax
> **df.write.partitionBy(key).csv(path)**
> * csv - the file format in which the data is stored on disk.
> * For each distinct value of the key, a separate directory is created containing one or more files.

In [18]:
df= spark.read.format('csv').options(header=True,inferSchema=True,sep=',').load("data_for_files/2011-12-08.csv")

In [19]:
df.show(4)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   581214|    23494|VINTAGE DOILY DEL...|      12|2011-12-08 08:32:00|     5.95|   14251.0|United Kingdom|
|   581214|    22969|HOMEMADE JAM SCEN...|      60|2011-12-08 08:32:00|     1.45|   14251.0|United Kingdom|
|   581214|    22910|PAPER CHAIN KIT V...|      40|2011-12-08 08:32:00|     2.55|   14251.0|United Kingdom|
|   581214|    22734|SET OF 6 RIBBONS ...|      12|2011-12-08 08:32:00|     2.89|   14251.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 4 rows



In [None]:
# Write one sub-directory per distinct Description value under tmp/
df.write.option("header", True).partitionBy("Description").mode("overwrite").csv("tmp/")

**partitionBy("Description")**
* Partitioning is based on the DataFrame column Description.
* A separate directory is created on disk for each distinct value, named Description=value.
* value is the value of the key for that partition.

**partitionBy("Description","InvoiceDate")**
* Here the data is first partitioned by Description and then by InvoiceDate, so each Description=... directory contains nested InvoiceDate=... directories.
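The nested directory names follow a key=value pattern, which a plain-Python sketch (not PySpark) can reproduce. `partition_dir` is an illustrative helper, the row values are simplified, and the sketch ignores the escaping Spark applies to special characters in values (such as the ":" in a full timestamp). The partition columns themselves are not written inside the data files; their values are recovered from these directory names when the data is read back.

```python
def partition_dir(base, row, keys):
    """Hive-style output directory for one row: base/key1=value1/key2=value2."""
    parts = [f"{k}={row[k]}" for k in keys]
    return "/".join([base.rstrip("/")] + parts)


row = {"Description": "PAPER CHAIN KIT", "InvoiceDate": "2011-12-08"}
print(partition_dir("tmp/", row, ["Description"]))
print(partition_dir("tmp/", row, ["Description", "InvoiceDate"]))
```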


### **To limit the number of records in a partition file**

In [None]:
# Write to the same relative tmp/ folder as above; overwrite mode replaces what is already there
df.write.option("header", True).option("maxRecordsPerFile",4200).partitionBy("Description").mode("overwrite").csv("tmp/")

**option("maxRecordsPerFile",4200)**
* This option caps the number of records written to each output file.
* Partitioning still happens on the Description column, but if a partition holds more than 4200 records, its folder gets several smaller files instead of one.
* With maxRecordsPerFile set to 4200, every file contains at most 4200 records. If a partition has more records than that, PySpark writes additional files in the same partition folder so that each file stays within the limit.

In [None]:
# Count the number of records in the DataFrame df
df.count()

### **Get number of partitions**
- getNumPartitions() is an RDD method, so call it on the DataFrame's underlying RDD (df.rdd).

In [None]:
print(df.rdd.getNumPartitions())

### **Number of record per partition**

In [None]:
from pyspark.sql.functions import spark_partition_id

df.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show()

Output: partitionId 0, count 4940
* The DataFrame has only one partition, so every row is in partition 0 (partition IDs start from 0).

**from pyspark.sql.functions import spark_partition_id**
*  The spark_partition_id function in PySpark is used to get the ID of the partition that a row is in. It returns a new column containing the ID of the partition. This function can be useful when you want to inspect or analyze the distribution of data across partitions.


### **Repartition the dataframe to 5 partitions**
* This creates 5 partitions, and the records are distributed almost equally among them.

In [None]:
df_5=df.select(df.Description).repartition(5)

### **What if we use partitionBy and then repartition?**
If you first partition the data based on a particular column and then apply the repartition() function with a different number of partitions, the result will be a shuffle that disregards the previous partitioning scheme and redistributes the data across the new number of partitions.

In other words, the previous partitioning based on the specific column value will not be preserved, and the data will be shuffled into the new number of partitions specified by the repartition() function.

### **get number of partitions in a dataframe**

In [None]:
print(df_5.rdd.getNumPartitions())

### **Get number of record per partitions**

In [None]:
df_5.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show()

* df had 4940 records; after repartitioning into 5 parts, each part gets 988 records.
* **withColumn("partitionId",spark_partition_id())**
  * Adds a column named partitionId to every record, holding the ID of the partition the record is in.
  * Grouping by partitionId collects the records that share a partition ID, and the count() aggregation then gives the number of records in each partition.


### **Count null values in every column of a dataframe**

**Find the null occurrences of each column in a dataframe**
* **df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])**
  * when(col(c).isNull(), c) - returns a non-null value only for rows where column c is null, and null otherwise.
  * count() - counts the non-null results, i.e. the rows where column c is null.
  * alias(c) - names the counted column after the original column.
  * select([...]) - the list comprehension builds one such expression per column, and all of them are displayed.

### **Find top or Bottom N rows per group**

* To get the top N records from every group:
* partition the DataFrame with a window specification (Window.partitionBy) and apply a function over it with over().
* use the row_number function, which numbers the rows within each partition.
* then filter on the row number, e.g. col("row_number") <= 5 keeps the top 5 records of each group.

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import col,row_number,desc
from pyspark.sql.window import Window

windowDep=Window.partitionBy("Description").orderBy(col("InvoiceNo").desc())

df_row=df.withColumn('row_number',row_number().over(windowDep))


**Use desc() or asc() in orderBy to get the top or the bottom rows of each group**

In [None]:
df_row.show()

**Get Top N rows per group**

In [None]:
df3_top= df_row.filter(col("row_number")<=1)

col("row_number")<=1 - keeps only the rows whose row_number is less than or equal to 1, i.e. the first row of each Description group.

In [None]:
df3_top.show()

## **Greatest vs least and max vs min**

* Greatest
  * It returns the maximum value across the given columns in each row.
  * It scans horizontally, across the columns of a single row, to get the max value of that row.
> **from pyspark.sql.functions import greatest**\
> **greatestDF= df.withColumn("greatest",greatest("id","value"))**
>  * greatest("col1","col2") - col1, col2 are the columns among which we want to find the greatest value.
* least
  * It returns the minimum value across the given columns in each row.

* max
  * It returns the maximum value of a column across all rows.
  * It scans vertically, down all the rows of the specified column.
* min
  * It returns the minimum value of a column across all rows.

> **from pyspark.sql.functions import max,min**\
> **df.select(max("id"),max("value")).show()**\
> **df.select(min("id"),min("value")).show()**
> *  max("id"),max("value") - id and value are the columns in which we want to find the max values.
> *  Importing max and min this way shadows Python's built-in max() and min(); importing the module as F and calling F.max and F.min avoids that.

In [None]:
# Create a sample DataFrame
data = [(1, 10), (2, 5), (3, 8)]
columns = ["id", "value"]
df = spark.createDataFrame(data, columns)

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import greatest,least,max,min 

**To get the greatest value across all column in each row**

In [None]:
greatestDF= df.withColumn("greatest",greatest("id","value"))
greatestDF.show()

**To get the least value across all column in each row**

In [None]:
leastDF= df.withColumn("least",least("id","value"))
leastDF.show()

**To get max, min value in a columns**

In [None]:
# Find the maximum and minimum values in the 'id' and 'value' columns
df.show()
# .show() prints the result and returns None, so it is not assigned to a variable
df.select(max("id"),max("value")).show()
df.select(min("id"),min("value")).show()

In [None]:
df.show()
# Same result using agg() with a {column: aggregate function} dictionary
df.agg({'id':'max','value':'max'}).show()
df.agg({'id':'min','value':'min'}).show()

## **input_file_name**

* It identifies the input file each record of a DataFrame was read from.
* When a DataFrame is built from several files, every record can carry the name of its source file. If irregular data shows up, we can tell which file it came from and correct the data in that file.
* Otherwise we would have to search every file in every folder to find the corrupted data.
* Strictly speaking, input_file_name() returns the name of the file being read by the current task. Because a task reads its rows file by file, this effectively tags every row with its source file.

**Note -**
  *   input_file_name() works for file-based sources (local disk, HDFS, cloud object stores, DBFS). For rows that were not read from a file, such as a DataFrame built with spark.createDataFrame, it returns an empty string.

In Apache Spark, the input_file_name() function returns the name of the file being read by the currently executing task. It is used as a column expression, for example in select() or withColumn(). Here's a brief explanation of how input_file_name() works:

1. Task Execution in Spark:

   *  Spark jobs are divided into tasks, and each task processes one partition of the data.
   *  Each task processes its portion of the data independently.
2. Understanding Input Files:

   * When you read data from a directory that contains multiple files, Spark splits the data into partitions, and one task processes each partition.
   *  A partition may hold a slice of one large file or several small files, which the task reads one after another.
3. Task-Specific File Information:

   *  The input_file_name() function reports the file the current task is reading at that moment.
   *  Because the value changes as the task moves from one file to the next, each row gets the name of the file it came from.
4. Usage in Transformations:

   *  Use input_file_name() as a column expression in DataFrame transformations such as select() or withColumn().
   *  Including the file name as a column lets you filter, group, or debug the data based on the source file.

In [None]:
df =spark.read.format("csv").option("recursiveFileLookup","true").option("header","true").load("dbfs:/FileStore/tables")

* **option("recursiveFileLookup","true")**
  * If the folder contains sub-folders that hold the files, recursiveFileLookup makes Spark search the sub-folders as well.

In [None]:
from pyspark.sql.functions import input_file_name,current_timestamp
# display() is a Databricks notebook helper; outside Databricks use .show() instead
display(df.withColumn("filename",input_file_name()))

* **input_file_name**
  * Fetches the name of the file from which each row was read.
  * filename - the new column holding the file path returned by the input_file_name function.

In [None]:
display(df.withColumn("filename",input_file_name()).withColumn("date_ingested",current_timestamp()))

**current_timestamp** - returns the time at which the query runs, stored here as date_ingested. Together with filename, it records when and from which file each row was loaded into the DataFrame. All rows processed by the same query get the same timestamp.

## **Window functions**

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

data = [(1, "Alice", 100),
        (2, "Bob", 150),
        (3, "Charlie", 200),
        (4, "David", 120),
        (5, "Eva", 180)]

columns = ["id", "name", "salary"]

df = spark.createDataFrame(data, columns)

window_spec = Window.orderBy("id")

* To use window functions, import the Window class from the pyspark.sql.window module.
* window_spec = Window.orderBy("id")
  * Window.orderBy("id") creates a window specification that orders the rows by the "id" column. This instance (window_spec) is passed to .over() when a window function is applied.

* **partitionBy**
  * **window_spec=Window.partitionBy("name").orderBy("salary")**
    * window_spec is a window specification (a WindowSpec object) built with the **Window class**.
    * This window specification defines the partitioning and ordering behavior for window functions applied to a DataFrame.

    * In this specific case:
      * **partitionBy("name")**: It specifies that the rows will be partitioned by the values in the "name" column. This means that window functions will operate independently within each partition based on unique values in the "name" column.
      * **orderBy("salary")**: Within each partition, the rows will be ordered by the "salary" column. This defines the order in which the window functions will be applied within each partition.



### **Different functions associated with window functions**

**note -** F is the alias for the pyspark.sql.functions module.

**Row Number:**

In [None]:
df = df.withColumn("row_number", F.row_number().over(window_spec))
df.show()

+---+-------+------+----------+
| id|   name|salary|row_number|
+---+-------+------+----------+
|  1|  Alice|   100|         1|
|  2|    Bob|   150|         2|
|  3|Charlie|   200|         3|
|  4|  David|   120|         4|
|  5|    Eva|   180|         5|
+---+-------+------+----------+



**Rank:**

In [None]:
window_spec = Window.orderBy("salary")
df = df.withColumn("rank", F.rank().over(window_spec))

df.show()

+---+-------+------+----------+----+
| id|   name|salary|row_number|rank|
+---+-------+------+----------+----+
|  1|  Alice|   100|         1|   1|
|  4|  David|   120|         4|   2|
|  2|    Bob|   150|         2|   3|
|  5|    Eva|   180|         5|   4|
|  3|Charlie|   200|         3|   5|
+---+-------+------+----------+----+



**Dense Rank:**

In [None]:
window_spec = Window.orderBy("salary")
df = df.withColumn("dense_rank", F.dense_rank().over(window_spec))

**Lag:**

In [None]:
window_spec = Window.orderBy("id")
df = df.withColumn("lag_salary", F.lag("salary").over(window_spec))

df.show()


+---+-------+------+----------+----+----------+----------+
| id|   name|salary|row_number|rank|dense_rank|lag_salary|
+---+-------+------+----------+----+----------+----------+
|  1|  Alice|   100|         1|   1|         1|      null|
|  2|    Bob|   150|         2|   3|         3|       100|
|  3|Charlie|   200|         3|   5|         5|       150|
|  4|  David|   120|         4|   2|         2|       200|
|  5|    Eva|   180|         5|   4|         4|       120|
+---+-------+------+----------+----+----------+----------+



**Lead:**

In [None]:
window_spec = Window.orderBy("id")
df = df.withColumn("lead_salary", F.lead("salary").over(window_spec))

df.show()


+---+-------+------+----------+----+----------+----------+-----------+
| id|   name|salary|row_number|rank|dense_rank|lag_salary|lead_salary|
+---+-------+------+----------+----+----------+----------+-----------+
|  1|  Alice|   100|         1|   1|         1|      null|        150|
|  2|    Bob|   150|         2|   3|         3|       100|        200|
|  3|Charlie|   200|         3|   5|         5|       150|        120|
|  4|  David|   120|         4|   2|         2|       200|        180|
|  5|    Eva|   180|         5|   4|         4|       120|       null|
+---+-------+------+----------+----+----------+----------+-----------+



## **convert columns to dictionary (Map)**

*  **create_map**
>   **df.select(create_map(col1,col2))**
>  * The value of col1 becomes the key and the value of col2 becomes the value of that key.
* create_map takes 2n arguments that alternate between a key and its value.
* df.select(create_map(lit("shop"),col2))
  * lit() supplies a constant key ("shop"), and col2 supplies the value for that key.

In [None]:
from pyspark.sql import functions as F

data = [(1, "Alice", 100),
        (2, "Bob", 150),
        (3, "Charlie", 200),
        (4, "David", 120),
        (5, "Eva", 180)]

columns = ["id", "name", "sale"]

df = spark.createDataFrame(data, columns)



In [None]:
from pyspark.sql.functions import col,create_map,lit, explode
df2=df.select(col("id"),col("name"),create_map(col("name"),col("sale").alias("shop_sale")))
df2.show()


+---+-------+----------------------------+
| id|   name|map(name, sale AS shop_sale)|
+---+-------+----------------------------+
|  1|  Alice|              {Alice -> 100}|
|  2|    Bob|                {Bob -> 150}|
|  3|Charlie|            {Charlie -> 200}|
|  4|  David|              {David -> 120}|
|  5|    Eva|                {Eva -> 180}|
+---+-------+----------------------------+



In [None]:
df2=df.select(col("id"),col("name"),create_map(lit("shop_name"),col("name"),lit("shop_sale"),col("sale")).alias("map"))
df2.show(truncate=False)

+---+-------+----------------------------------------+
|id |name   |map                                     |
+---+-------+----------------------------------------+
|1  |Alice  |{shop_name -> Alice, shop_sale -> 100}  |
|2  |Bob    |{shop_name -> Bob, shop_sale -> 150}    |
|3  |Charlie|{shop_name -> Charlie, shop_sale -> 200}|
|4  |David  |{shop_name -> David, shop_sale -> 120}  |
|5  |Eva    |{shop_name -> Eva, shop_sale -> 180}    |
+---+-------+----------------------------------------+



In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark=SparkSession.builder.config('spark.driver.host','localhost').getOrCreate()


**Note -** to turn the entries of the map into rows, with one column for the key and one for the value, we can use explode.

In [None]:
df2.select(explode("map"))

DataFrame[key: string, value: string]

In [None]:
df2.show()

+---+-------+--------------------+
| id|   name|                 map|
+---+-------+--------------------+
|  1|  Alice|{shop_name -> Ali...|
|  2|    Bob|{shop_name -> Bob...|
|  3|Charlie|{shop_name -> Cha...|
|  4|  David|{shop_name -> Dav...|
|  5|    Eva|{shop_name -> Eva...|
+---+-------+--------------------+



## **Struct Type vs Map type**

* **Struct Type**
  * The structure must be followed strictly; if a value is not present, None can be supplied.
  * **StructType([StructField("Col1",datatype,True),StructField("Col2",StructType([StructField("Col11",datatype,True),StructField("Col12",datatype,True)]),True)])**
    * While inserting data, this structure must be followed.
    * True means null values are allowed.
    * e.g. - **"Ab"** , {**"col11":"val11"**,**"col12":"val12"**}
* **Map Type**
  * It holds key-value pairs, and any number of pairs can be added.
  * **StructType([StructField("Col1",datatype,True),StructField("Col2",MapType(datatype1,datatype2,True),True)])**
    * datatype1 - data type of the keys
    * datatype2 - data type of the values
    * True (inside MapType) - null values are allowed.
    * While adding data, we need to supply the key-value pairs.
    * e.g. - **"Ab"** , {**"col11":"val11"**,**"col12":"val12"**}

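Both outputs may look the same, but their internals differ: a struct has a fixed set of named fields, each with its own data type, while a map holds any number of key-value pairs whose keys share one type and whose values share one type.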

## **Data security: column level data encryption**

In [None]:
# %pip install cryptography



In [None]:
from cryptography.fernet import Fernet

### **Generate Encryption/Decryption key**

In [None]:
key = Fernet.generate_key()
f=Fernet(key)

* **Fernet.generate_key()**
  * Fernet.generate_key(): This function generates a new random key for use with the Fernet symmetric encryption algorithm. The key is a URL-safe base64-encoded 32-byte key, which can be used both for encryption and decryption.
*  **Fernet(key)**
   *  This line creates a Fernet object using the key generated in the first step. The Fernet object is then used to perform encryption and decryption operations.




*  key is the actual symmetric encryption key that is generated using Fernet.generate_key().
*  f is a Fernet object that uses the key for encryption and decryption operations

### **Encrypt sample data**

In [None]:
PIIData = b"testmail@gamil.com"
TestData= f.encrypt(PIIData)
print(TestData)

b'gAAAAABlcZsWAOREutEI5o7gEjfQcPD2XXPyZCOaZvgK8ZeQsqFlckZCC2zwCyY9znOBDh3R442BJaOibBK2c7wJiK6BWogoWqbMBRoKwMFC7AI3Zqrsj18='


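* The value stored in PIIData is encrypted into TestData. Fernet adds a random IV and a timestamp to every token, so encrypting the same value again gives a different token.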
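A short round-trip sketch using the cryptography package's Fernet API as above. The names demo_key, demo_f and the other demo variables are new, so the notebook's own key and f are not overwritten:

```python
from cryptography.fernet import Fernet, InvalidToken

demo_key = Fernet.generate_key()
demo_f = Fernet(demo_key)

PIIData = b"testmail@gamil.com"
token_1 = demo_f.encrypt(PIIData)
token_2 = demo_f.encrypt(PIIData)

# Each token embeds a random IV and a timestamp, so the same input encrypts differently
print(token_1 != token_2)       # True
# Decrypting with the same key returns the original bytes
print(demo_f.decrypt(token_1))  # b'testmail@gamil.com'

# A different key cannot decrypt the token
try:
    Fernet(Fernet.generate_key()).decrypt(token_1)
    wrong_key_failed = False
except InvalidToken:
    wrong_key_failed = True
```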

### **Define UDF to Encrypt Data**

In [None]:
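def encrypt_data(data,KEY):
    # Importing inside the function keeps the UDF self-contained when it is shipped to executors
    from cryptography.fernet import Fernet
    f=Fernet(KEY)
    dataB=bytes(data,'utf-8')                      # str -> bytes
    encrypted_data=f.encrypt(dataB)                # bytes token
    encrypted_data=encrypted_data.decode('ascii')  # the token is URL-safe base64, so ASCII decoding is enough

    return encrypted_data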


The line dataB = bytes(data, 'utf-8') converts the string data into bytes using UTF-8 encoding.

*  **data:** a string containing the text to encrypt.
*  **bytes(data, 'utf-8'):** converts the string to bytes using UTF-8 encoding. In Python, strings are Unicode, and to represent them as bytes you must specify an encoding (here, 'utf-8'). The result is stored in dataB.

This is needed because cryptographic functions (like the Fernet encryption above) and other binary-data APIs expect bytes-like objects.

### **Define UDF to Decrypt Data**

In [None]:
def decrypt_data(encrypted_data,KEY):
    from cryptography.fernet import Fernet
    f=Fernet(KEY)
    # str token -> bytes, decrypt, then bytes -> str
    decrypted_data=f.decrypt(encrypted_data.encode()).decode()
    return decrypted_data

### **Register UDF's**

In [None]:
from pyspark.sql.functions import udf,lit,md5
from pyspark.sql.types import StringType

encryption = udf(encrypt_data,StringType())
decryption = udf(decrypt_data,StringType())

### **create dataframe**

In [None]:

data = [(1, "Alice", 100),
        (2, "Bob", 150),
        (3, "Charlie", 200),
        (4, "David", 120),
        (5, "Eva", 180)]

columns = ["id", "name", "salary"]

encrypteddf = spark.createDataFrame(data, columns)
encrypteddf.show()

**Encrypt the data**

In [None]:
encryptDF=encrypteddf.withColumn("name_encrypted",encryption("name",lit(key)))
encryptDF.show()

+---+-------+------+--------------------+
| id|   name|salary|      name_encrypted|
+---+-------+------+--------------------+
|  1|  Alice|   100|gAAAAABlcaN_qv0Cr...|
|  2|    Bob|   150|gAAAAABlcaN_-J-9_...|
|  3|Charlie|   200|gAAAAABlcaON2dEwK...|
|  4|  David|   120|gAAAAABlcaONIEZYq...|
|  5|    Eva|   180|gAAAAABlcaOOd1YQe...|
+---+-------+------+--------------------+



### **Decrypt the data**

In [None]:
decryptDF=encryptDF.withColumn("name_decrypted",decryption("name_encrypted",lit(key)))
decryptDF.show()

+---+-------+------+--------------------+--------------+
| id|   name|salary|      name_encrypted|name_decrypted|
+---+-------+------+--------------------+--------------+
|  1|  Alice|   100|gAAAAABlcaQFb0GwO...|         Alice|
|  2|    Bob|   150|gAAAAABlcaQJmYYu1...|           Bob|
|  3|Charlie|   200|gAAAAABlcaQX_HHIs...|       Charlie|
|  4|  David|   120|gAAAAABlcaQYritOL...|         David|
|  5|    Eva|   180|gAAAAABlcaQaDfAjY...|           Eva|
+---+-------+------+--------------------+--------------+



## **Array_repeat** 

* **Array_repeat**
    * Can be used to generate a large amount of data from a small data set.
    * Creates an array containing a value (from a column) repeated count times.
  * syntax
>  * **df.withColumn("key_col",array_repeat(col("id"),5))**
>       * This creates a column named key_col that contains an array of the repeated values.
>       * id - the column whose value we want to repeat.
>       * 5 - the number of times to repeat the value.

**To create the rows we can apply explode function on the array column which will split the array among different rows.**

In [None]:

data = [(1, "Alice", 100),
        (2, "Bob", 150),
        (3, "Charlie", 200),
        (4, "David", 120),
        (5, "Eva", 180)]

columns = ["id", "name", "salary"]

arrayDF = spark.createDataFrame(data, columns)

In [None]:
from pyspark.sql.functions import explode,array_repeat,col
repeatDf=arrayDF.withColumn("key_col",array_repeat(col("id"),2))
repeatDf.show()

+---+-------+------+-------+
| id|   name|salary|key_col|
+---+-------+------+-------+
|  1|  Alice|   100| [1, 1]|
|  2|    Bob|   150| [2, 2]|
|  3|Charlie|   200| [3, 3]|
|  4|  David|   120| [4, 4]|
|  5|    Eva|   180| [5, 5]|
+---+-------+------+-------+



In [None]:
df1=arrayDF.withColumn("key_col",explode(array_repeat(col("id"),100)))
df1.show()

+---+-----+------+-------+
| id| name|salary|key_col|
+---+-----+------+-------+
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
|  1|Alice|   100|      1|
+---+-----+------+-------+
only showing top 20 rows



**Every copy still carries the original id, so a new id is derived by adding a row number to it**

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lit,row_number

win= Window.orderBy(lit('A'))
df2=df1.withColumn("rowN",row_number().over(win))
df3=df2.withColumn("newID",col("id")+col("rowN"))
df3.show()

+---+-----+------+-------+----+-----+
| id| name|salary|key_col|rowN|newID|
+---+-----+------+-------+----+-----+
|  1|Alice|   100|      1|   1|    2|
|  1|Alice|   100|      1|   2|    3|
|  1|Alice|   100|      1|   3|    4|
|  1|Alice|   100|      1|   4|    5|
|  1|Alice|   100|      1|   5|    6|
|  1|Alice|   100|      1|   6|    7|
|  1|Alice|   100|      1|   7|    8|
|  1|Alice|   100|      1|   8|    9|
|  1|Alice|   100|      1|   9|   10|
|  1|Alice|   100|      1|  10|   11|
|  1|Alice|   100|      1|  11|   12|
|  1|Alice|   100|      1|  12|   13|
|  1|Alice|   100|      1|  13|   14|
|  1|Alice|   100|      1|  14|   15|
|  1|Alice|   100|      1|  15|   16|
|  1|Alice|   100|      1|  16|   17|
|  1|Alice|   100|      1|  17|   18|
|  1|Alice|   100|      1|  18|   19|
|  1|Alice|   100|      1|  19|   20|
|  1|Alice|   100|      1|  20|   21|
+---+-----+------+-------+----+-----+
only showing top 20 rows



**Dropping the unwanted columns**

In [None]:
outputDF= df3.drop("Id","key_col","rowN").select(col("newID").alias("id"),col("name"),col("salary"))

outputDF.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  2|Alice|   100|
|  3|Alice|   100|
|  4|Alice|   100|
|  5|Alice|   100|
|  6|Alice|   100|
|  7|Alice|   100|
|  8|Alice|   100|
|  9|Alice|   100|
| 10|Alice|   100|
| 11|Alice|   100|
| 12|Alice|   100|
| 13|Alice|   100|
| 14|Alice|   100|
| 15|Alice|   100|
| 16|Alice|   100|
| 17|Alice|   100|
| 18|Alice|   100|
| 19|Alice|   100|
| 20|Alice|   100|
| 21|Alice|   100|
+---+-----+------+
only showing top 20 rows



### **subtract vs exceptAll**

* Both are PySpark transformations.
* Each creates a new DataFrame containing the rows of the first DataFrame that are not present in the second DataFrame.
* subtract does not preserve duplicates, while exceptAll preserves them.
* exceptAll works like cancelling records: each matching row in the second DataFrame removes only one occurrence from the first, so the remaining duplicates are kept. subtract removes every occurrence of a matching row and de-duplicates the result (like SQL EXCEPT DISTINCT).

syntax
  * df1.subtract(df2)
  * df1.exceptAll(df2)