In [0]:
'''SparkSession, introduced in Spark 2.0, is the unified entry point for Spark applications. It gives access to Spark's underlying functionality, including RDDs (through spark.sparkContext), DataFrames, and Datasets, behind a single interface for structured data processing. It is one of the first objects you create when developing a Spark SQL application. In PySpark it is built from the SparkSession.builder attribute (builder is a method only in Scala and Java), finishing with getOrCreate().

SparkSession consolidates the previously separate SQLContext and HiveContext into one entry point and wraps the SparkContext, which simplifies working with Spark's different APIs. Structured Streaming is also reached through it, although the older DStream API still uses its own StreamingContext.

It lets you read data from various sources, run SQL queries, create DataFrames, and run actions on distributed datasets. The typed Dataset API is available only in Scala and Java. '''

In [0]:
from pyspark.sql import SparkSession
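# The wildcard import keeps the cells short, but it shadows Python built-ins such as sum, max, min and round with Spark column functions
from pyspark.sql.functions import *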
from pyspark.sql.types import *
spark=SparkSession.builder.appName("Practice").master("local[*]").getOrCreate()

In [0]:
spark

In [0]:
emp_data=[[1,'John','IT'],
          [2,'Harry','HR'],
          [3,'Paul','DATA'],
          [4,'Tom','IT'],
          [5,'Smith','WEB']
        ]
schema=['id','name','dept']
df_emp=spark.createDataFrame(data=emp_data,schema=schema)
df_emp.show() #Action

+---+-----+----+
| id| name|dept|
+---+-----+----+
|  1| John|  IT|
|  2|Harry|  HR|
|  3| Paul|DATA|
|  4|  Tom|  IT|
|  5|Smith| WEB|
+---+-----+----+



In [0]:
display(df_emp.head(3))
display(df_emp.tail(3))
df_emp.printSchema() #printSchema() prints the schema itself and returns None, so it needs no display()

id,name,dept
1,John,IT
2,Harry,HR
3,Paul,DATA


id,name,dept
3,Paul,DATA
4,Tom,IT
5,Smith,WEB


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)



In [0]:
df_emp.rdd.getNumPartitions()

8

In [0]:
'''
Transformations:
Transformations are operations on RDDs, DataFrames, or Datasets that produce a new distributed dataset from an existing one. They are lazy: calling one does not run anything, it only extends the logical execution plan.

map: Applies a function to each element and produces a new dataset (an RDD operation in PySpark; DataFrames use select or withColumn instead).
filter: Selects elements that satisfy a given condition.
groupBy: Groups elements based on a key.
join: Combines two datasets based on a common key.

Actions:
Actions are operations that trigger the execution of the pending transformations and either return a value to the driver program or write data to an external storage system. They are the operations that actually start the computation.

collect: Retrieves all elements of the distributed dataset to the driver program.
count: Returns the number of elements in the distributed dataset.
first: Returns the first element of the distributed dataset.
saveAsTextFile: Writes the content of an RDD to text files (DataFrames use df.write instead).

Narrow transformations: Each output partition depends on a single input partition, so no data moves between partitions (for example map and filter).
Wide transformations: An output partition can depend on many input partitions, so Spark typically has to shuffle data across the cluster (for example groupBy and join).
To get good performance from PySpark, it is essential to distinguish between narrow and wide transformations, because the shuffle behind a wide transformation can add substantial overhead.
'''
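
The laziness described above can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark code: LazyDataset, double and the sample numbers are hypothetical names made up for the illustration. Transformations only record a step in a plan; the work happens when an action such as collect is called.

```python
class LazyDataset:
    """Toy stand-in for a distributed dataset: transformations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (kind, function) steps, applied in order

    # --- transformations: return a new dataset with a longer plan ---
    def map(self, fn):
        return LazyDataset(self._rows, self._plan + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._rows, self._plan + (("filter", predicate),))

    # --- actions: walk the plan and produce a result ---
    def _run(self):
        for row in self._rows:
            keep = True
            for kind, fn in self._plan:
                if kind == "map":
                    row = fn(row)
                elif not fn(row):
                    keep = False
                    break
            if keep:
                yield row

    def collect(self):
        return list(self._run())

    def count(self):
        return len(self.collect())


calls = []  # records every time the mapped function really runs


def double(x):
    calls.append(x)
    return x * 2


ds = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = len(calls)  # nothing has executed yet
result = ds.collect()             # the action triggers the whole plan
calls_after_action = len(calls)
print(calls_before_action, result, calls_after_action)
```

Building the plan costs nothing; only collect (or count) runs double. Spark additionally optimizes the recorded plan before running it, which this sketch does not attempt.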

In [0]:
df_emp_IT=df_emp.where(df_emp.dept=='IT') #Filtering
df_emp_IT.show()

+---+----+----+
| id|name|dept|
+---+----+----+
|  1|John|  IT|
|  4| Tom|  IT|
+---+----+----+



In [0]:
df_emp_IT.write.format('csv').mode('overwrite').save('/FileStore/tables/output.csv') #Saving output in csv format; Spark creates a directory named output.csv that holds part files, not a single file

In [0]:
employee=[
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","Female","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = ["employee_id","department_id","name","age","gender","salary","hire_date"]
df_employee=spark.createDataFrame(data=employee,schema=emp_schema)
df_employee.show()
df_employee.printSchema()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [0]:
#Change existing datatype of column
temp_col_datatype_changed_df=df_employee.withColumn("employee_id",df_employee["employee_id"].cast('int'))
temp_col_datatype_changed_df.show()
temp_col_datatype_changed_df.printSchema()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [0]:
#Change existing datatype of columns

updated_df_employee=(df_employee
                       .withColumn("employee_id", col("employee_id").cast('int'))
                       .withColumn("department_id", col("department_id").cast('int'))
                       .withColumn("age", col("age").cast('int'))
                       .withColumn("salary", col("salary").cast('double'))
                       .withColumn("hire_date", to_date(col("hire_date"),'yyyy-MM-dd'))
                      )
updated_df_employee.show()

+-----------+-------------+-------------+---+------+-------+----------+
|employee_id|department_id|         name|age|gender| salary| hire_date|
+-----------+-------------+-------------+---+------+-------+----------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|
|          7|          101|James Johnson| 42|  Male|70000.0|2012-03-15|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female|47000.0|2018-08-01|
|         11|          104|   David Park| 38|  Male|65000.0|2015

In [0]:
total_len=updated_df_employee.count()
print(total_len)

20


In [0]:
#Column Renaming

final_df_employee=(df_employee
                   .withColumnRenamed('employee_id','empid')
                   .withColumnRenamed('department_id','deptid')
                   )
final_df_employee.show(truncate=True) #Display the DataFrame with default truncation (20 characters per column)

final_df_employee.show(truncate=False) #Display the DataFrame with no truncation of column values

final_df_employee.show(truncate=30) #Display the DataFrame with column values truncated to 30 characters

final_df_employee.show(n=10,truncate=True) #Display the first 10 rows of the DataFrame with default truncation

+-----+------+-------------+---+------+------+----------+
|empid|deptid|         name|age|gender|salary| hire_date|
+-----+------+-------------+---+------+------+----------+
|  001|   101|     John Doe| 30|  Male| 50000|2015-01-01|
|  002|   101|   Jane Smith| 25|Female| 45000|2016-02-15|
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|  004|   102|    Alice Lee| 28|Female| 48000|2017-09-30|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|  006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|
|  008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|
|  009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|  010|   104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|  011|   104|   David Park| 38|  Male| 65000|2015-11-01|
|  012|   105|   Susan Chen| 31|Female| 54000|2017-02-15|
|  013|   106|    Brian Kim| 45|  Male| 75000|2011-07-01|
|  014|   107|    Emily Lee| 26|Female| 46000|2019-01-01|
|  015|   106|
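
The truncate argument of show() can be pictured with a small plain-Python sketch. It assumes the rule Spark applies is: a cell longer than n characters is cut to n-3 characters followed by "...", truncate=True means n=20, and truncate=False disables cutting. The helper truncate_cell is a hypothetical name, not part of PySpark.

```python
def truncate_cell(value, truncate=20):
    """Model of how show() shortens one cell; truncate=0 stands for truncate=False."""
    text = str(value)
    if truncate > 0 and len(text) > truncate:
        if truncate < 4:
            # Too short to fit an ellipsis, so the text is simply cut
            return text[:truncate]
        return text[: truncate - 3] + "..."
    return text


long_name = "A very long employee name indeed"
default_cut = truncate_cell(long_name)        # like show(truncate=True)
no_cut = truncate_cell(long_name, 0)          # like show(truncate=False)
cut_to_30 = truncate_cell(long_name, 30)      # like show(truncate=30)
short_value = truncate_cell("James Johnson")  # shorter than 20, left untouched
print(default_cut, cut_to_30, short_value, sep="\n")
```

None of the values in this DataFrame exceed 20 characters, which is why the four show() calls above print identical cells.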

In [0]:
final_df_employee.schema

StructType([StructField('empid', StringType(), True), StructField('deptid', StringType(), True), StructField('name', StringType(), True), StructField('age', StringType(), True), StructField('gender', StringType(), True), StructField('salary', StringType(), True), StructField('hire_date', StringType(), True)])

In [0]:
final_df_employee2=final_df_employee.withColumn("Default Count",lit(1))
final_df_employee2.show()

+-----+------+-------------+---+------+------+----------+-------------+
|empid|deptid|         name|age|gender|salary| hire_date|Default Count|
+-----+------+-------------+---+------+------+----------+-------------+
|  001|   101|     John Doe| 30|  Male| 50000|2015-01-01|            1|
|  002|   101|   Jane Smith| 25|Female| 45000|2016-02-15|            1|
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|            1|
|  004|   102|    Alice Lee| 28|Female| 48000|2017-09-30|            1|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|            1|
|  006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|            1|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|            1|
|  008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|            1|
|  009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|            1|
|  010|   104|     Lisa Lee| 27|Female| 47000|2018-08-01|            1|
|  011|   104|   David Park| 38|  Male| 65000|2015-11-01|       

In [0]:
final_df_employee3=final_df_employee2.withColumn('deptid',col('deptid').cast('int'))
final_df_employee3.show()
final_df_employee3.schema

+-----+------+-------------+---+------+------+----------+-------------+
|empid|deptid|         name|age|gender|salary| hire_date|Default Count|
+-----+------+-------------+---+------+------+----------+-------------+
|  001|   101|     John Doe| 30|  Male| 50000|2015-01-01|            1|
|  002|   101|   Jane Smith| 25|Female| 45000|2016-02-15|            1|
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|            1|
|  004|   102|    Alice Lee| 28|Female| 48000|2017-09-30|            1|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|            1|
|  006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|            1|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|            1|
|  008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|            1|
|  009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|            1|
|  010|   104|     Lisa Lee| 27|Female| 47000|2018-08-01|            1|
|  011|   104|   David Park| 38|  Male| 65000|2015-11-01|       

StructType([StructField('empid', StringType(), True), StructField('deptid', IntegerType(), True), StructField('name', StringType(), True), StructField('age', StringType(), True), StructField('gender', StringType(), True), StructField('salary', StringType(), True), StructField('hire_date', StringType(), True), StructField('Default Count', IntegerType(), False)])

In [0]:
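#Filtering based on 2 conditions

# AND: salary above 50000 and gender is Male (each condition needs its own parentheses)
df_sal_more_than_50k=final_df_employee3.filter((col('salary')>50000) & (col('gender')=='Male'))
df_sal_more_than_50k.show()

# OR: salary above 50000 or gender is Male; this reassigned DataFrame is the one used in the following cells
df_sal_more_than_50k=final_df_employee3.filter((col('salary')>50000) | (col('gender')=='Male'))
df_sal_more_than_50k.show()

# Membership tests: deptid was cast to int above, so compare it against integers
df_sal_more_than_50k.filter(df_sal_more_than_50k['deptid'].isin(101,105)).show()
df_sal_more_than_50k.filter(~(df_sal_more_than_50k['deptid'].isin(101,105))).show()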

+-----+------+-------------+---+------+------+----------+-------------+
|empid|deptid|         name|age|gender|salary| hire_date|Default Count|
+-----+------+-------------+---+------+------+----------+-------------+
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|            1|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|            1|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|            1|
|  009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|            1|
|  011|   104|   David Park| 38|  Male| 65000|2015-11-01|            1|
|  013|   106|    Brian Kim| 45|  Male| 75000|2011-07-01|            1|
|  015|   106|  Michael Lee| 37|  Male| 63000|2014-09-30|            1|
|  017|   105|  George Wang| 34|  Male| 57000|2016-03-15|            1|
|  019|   103|  Steven Chen| 36|  Male| 62000|2015-08-01|            1|
+-----+------+-------------+---+------+------+----------+-------------+

+-----+------+-------------+---+------+------+----------+------
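
The parentheses around each condition in the filters above are not optional, because in Python & and | bind more tightly than comparisons such as > and ==. The sketch below uses only the standard library and plain integers (a and b are hypothetical stand-ins for two columns) to show how the unparenthesised form is parsed.

```python
import ast

# How Python parses the condition with and without parentheses
without_parens = ast.parse("a > 5 & b == 1", mode="eval").body
with_parens = ast.parse("(a > 5) & (b == 1)", mode="eval").body

kind_without = type(without_parens).__name__  # a chained comparison: a > (5 & b) == 1
kind_with = type(with_parens).__name__        # a bitwise AND of two comparisons

# With plain integers the two spellings even give different answers
a, b = 7, 3
value_without = a > 5 & b == 1    # evaluated as (a > (5 & b)) and ((5 & b) == 1)
value_with = (a > 5) & (b == 1)   # evaluated as True & False
print(kind_without, kind_with, value_without, value_with)
```

With Spark columns the unparenthesised form does not silently give a wrong answer; as far as I know it raises an error, because a chained comparison asks Python to turn a Column into a bool.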

In [0]:
#Null Checking

df_sal_more_than_50k.filter(df_sal_more_than_50k['deptid'].isNull()).show()
df_sal_more_than_50k.filter(df_sal_more_than_50k['deptid'].isNotNull()).show()

+-----+------+----+---+------+------+---------+-------------+
|empid|deptid|name|age|gender|salary|hire_date|Default Count|
+-----+------+----+---+------+------+---------+-------------+
+-----+------+----+---+------+------+---------+-------------+

+-----+------+-------------+---+------+------+----------+-------------+
|empid|deptid|         name|age|gender|salary| hire_date|Default Count|
+-----+------+-------------+---+------+------+----------+-------------+
|  001|   101|     John Doe| 30|  Male| 50000|2015-01-01|            1|
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|            1|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|            1|
|  006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|            1|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|            1|
|  008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|            1|
|  009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|            1|
|  011|   104|   David Park| 38
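
isNull() and isNotNull() exist because SQL-style equality cannot find missing values: comparing anything with NULL yields NULL, and a filter keeps only rows whose condition is true. The following plain-Python model sketches that rule; sql_equals, sql_filter and the sample list are hypothetical, and the DataFrame above contains no nulls at all.

```python
def sql_equals(left, right):
    """SQL-style equality: a comparison involving NULL (None) gives NULL, not False."""
    if left is None or right is None:
        return None
    return left == right


def sql_filter(values, predicate):
    """Keep a value only when the predicate is exactly True, as a WHERE clause does."""
    return [v for v in values if predicate(v) is True]


dept_ids = [101, None, 102]  # hypothetical column with one missing value

equals_null = sql_filter(dept_ids, lambda v: sql_equals(v, None))   # finds nothing
equals_101 = sql_filter(dept_ids, lambda v: sql_equals(v, 101))
is_null = sql_filter(dept_ids, lambda v: v is None)                 # like isNull()
is_not_null = sql_filter(dept_ids, lambda v: v is not None)         # like isNotNull()
print(equals_null, equals_101, is_null, is_not_null)
```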

In [0]:
#Name Wildcards Like SQL

df_sal_more_than_50k.filter(df_sal_more_than_50k['name'].startswith('S')).show()
df_sal_more_than_50k.filter(df_sal_more_than_50k['name'].endswith('en')).show()
df_sal_more_than_50k.filter(df_sal_more_than_50k['name'].contains('el')).show()

df_sal_more_than_50k.filter(df_sal_more_than_50k['name'].like('%he%')).show()

+-----+------+-----------+---+------+------+----------+-------------+
|empid|deptid|       name|age|gender|salary| hire_date|Default Count|
+-----+------+-----------+---+------+------+----------+-------------+
|  012|   105| Susan Chen| 31|Female| 54000|2017-02-15|            1|
|  019|   103|Steven Chen| 36|  Male| 62000|2015-08-01|            1|
+-----+------+-----------+---+------+------+----------+-------------+

+-----+------+-----------+---+------+------+----------+-------------+
|empid|deptid|       name|age|gender|salary| hire_date|Default Count|
+-----+------+-----------+---+------+------+----------+-------------+
|  012|   105| Susan Chen| 31|Female| 54000|2017-02-15|            1|
|  019|   103|Steven Chen| 36|  Male| 62000|2015-08-01|            1|
+-----+------+-----------+---+------+------+----------+-------------+

+-----+------+-----------+---+------+------+----------+-------------+
|empid|deptid|       name|age|gender|salary| hire_date|Default Count|
+-----+------+----
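
The like() patterns follow SQL rules: % matches any run of characters (including none) and _ matches exactly one character. A rough plain-Python equivalent, ignoring escape characters, translates the pattern into a regular expression. The helper sql_like is a hypothetical name used only for this sketch.

```python
import re


def sql_like(value, pattern):
    """Return True when value matches a SQL LIKE pattern (no escape-character support)."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # LIKE must match the whole string, hence fullmatch
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


names = ["Susan Chen", "Steven Chen", "Michael Lee", "Tom Tan"]

contains_he = [n for n in names if sql_like(n, "%he%")]   # same idea as contains('he')
starts_with_s = [n for n in names if sql_like(n, "S%")]   # same idea as startswith('S')
ends_with_en = [n for n in names if sql_like(n, "%en")]   # same idea as endswith('en')
one_char = [n for n in names if sql_like(n, "T_m%")]      # _ stands for a single character
print(contains_he, starts_with_s, ends_with_en, one_char)
```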

In [0]:
final_df_employee2.select(col('deptid')).distinct().show() #Column wise distinct values
final_df_employee2.select(col('deptid')).distinct().orderBy(col('deptid')).show() #Column wise distinct values sorted
final_df_employee2.select(col('deptid')).distinct().orderBy(col('deptid').asc()).show() #Column wise distinct values sorted in ascending

+------+
|deptid|
+------+
|   101|
|   102|
|   103|
|   104|
|   105|
|   107|
|   106|
+------+

+------+
|deptid|
+------+
|   101|
|   102|
|   103|
|   104|
|   105|
|   106|
|   107|
+------+

+------+
|deptid|
+------+
|   101|
|   102|
|   103|
|   104|
|   105|
|   106|
|   107|
+------+



In [0]:
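#Group By on selected column/columns with sum() and avg()
#sum() and avg() called without arguments aggregate every numeric column; here only 'Default Count' is numeric, the rest are strings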

final_df_employee2.groupBy('deptid').count().show()
final_df_employee2.groupBy('deptid','gender').count().show()
final_df_employee2.groupBy('deptid').sum().show()
final_df_employee2.groupBy('deptid').avg().show()

+------+-----+
|deptid|count|
+------+-----+
|   101|    3|
|   102|    4|
|   103|    4|
|   104|    3|
|   105|    2|
|   107|    2|
|   106|    2|
+------+-----+

+------+------+-----+
|deptid|gender|count|
+------+------+-----+
|   101|  Male|    2|
|   101|Female|    1|
|   102|  Male|    1|
|   102|Female|    3|
|   103|Female|    1|
|   103|  Male|    3|
|   104|Female|    2|
|   104|  Male|    1|
|   105|Female|    1|
|   107|Female|    2|
|   106|  Male|    2|
|   105|  Male|    1|
+------+------+-----+

+------+------------------+
|deptid|sum(Default Count)|
+------+------------------+
|   101|                 3|
|   102|                 4|
|   103|                 4|
|   104|                 3|
|   105|                 2|
|   107|                 2|
|   106|                 2|
+------+------------------+

+------+------------------+
|deptid|avg(Default Count)|
+------+------------------+
|   101|               1.0|
|   102|               1.0|
|   103|               1.0|
|   

In [0]:
final_df_employee2.groupBy('deptid').count().show()
final_df_employee2.groupBy('deptid').mean().show()


final_df_employee2.groupBy('deptid').agg(
    count('deptid').alias('deptid_count'),
    sum('deptid').alias('total_deptid'),
    avg('deptid').alias('deptid_avg')
    ).show()


# groupBy().count()
# Usage: shorthand for counting the rows in each group created by groupBy.
#
# groupBy().agg(count(...))
# Usage: more flexible; it applies several aggregation functions at once, and you name each result column with alias.

+------+-----+
|deptid|count|
+------+-----+
|   101|    3|
|   102|    4|
|   103|    4|
|   104|    3|
|   105|    2|
|   107|    2|
|   106|    2|
+------+-----+

+------+------------------+
|deptid|avg(Default Count)|
+------+------------------+
|   101|               1.0|
|   102|               1.0|
|   103|               1.0|
|   104|               1.0|
|   105|               1.0|
|   107|               1.0|
|   106|               1.0|
+------+------------------+

+------+------------+------------+----------+
|deptid|deptid_count|total_deptid|deptid_avg|
+------+------------+------------+----------+
|   101|           3|       303.0|     101.0|
|   102|           4|       408.0|     102.0|
|   103|           4|       412.0|     103.0|
|   104|           3|       312.0|     104.0|
|   105|           2|       210.0|     105.0|
|   107|           2|       214.0|     107.0|
|   106|           2|       212.0|     106.0|
+------+------------+------------+----------+
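
groupBy is a wide transformation: rows with the same key may sit in different partitions and have to be brought together before they can be aggregated. The plain-Python model below sketches that idea under simplifying assumptions: three hypothetical input partitions, two output partitions, and a modulo rule in place of Spark's real partitioner.

```python
from collections import defaultdict

# Hypothetical input: (deptid, salary) rows spread over three partitions
partitions = [
    [("101", 50000), ("102", 55000)],
    [("101", 45000), ("103", 60000)],
    [("102", 48000), ("101", 70000)],
]

# Narrow step: each output partition is computed from exactly one input partition
raised = [[(dept, pay + 1000) for dept, pay in part] for part in partitions]

# Wide step, part 1 (shuffle): send every row to the partition that owns its key
num_out = 2
shuffled = [[] for _ in range(num_out)]
for part in partitions:
    for dept, pay in part:
        shuffled[int(dept) % num_out].append((dept, pay))


# Wide step, part 2: aggregate inside each shuffled partition
def aggregate(part):
    stats = defaultdict(lambda: [0, 0])  # dept -> [row count, salary total]
    for dept, pay in part:
        stats[dept][0] += 1
        stats[dept][1] += pay
    return {dept: (n, total, total / n) for dept, (n, total) in stats.items()}


result = {}
for part in shuffled:
    result.update(aggregate(part))
print(result)  # dept -> (count, total, average)
```

After the shuffle every key lives in exactly one partition, so the per-partition results can simply be merged. That data movement is the cost the narrow step never pays.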



In [0]:
dept=[(101,'CSE'),(102,'IT'),(103,'ECE'),(104,'ME'),(105,'EE'),(107,'EEE'),(106,'CE')]
schema=StructType([StructField(name='id',dataType=IntegerType()),
                   StructField(name='deptname',dataType=StringType())
                   ])
dept_data=spark.createDataFrame(data=dept,schema=schema)
dept_data.show()

+---+--------+
| id|deptname|
+---+--------+
|101|     CSE|
|102|      IT|
|103|     ECE|
|104|      ME|
|105|      EE|
|107|     EEE|
|106|      CE|
+---+--------+



In [0]:
dept_data.withColumn('Remarks',when((col('deptname')=='IT') | (col('deptname')=='CSE') ,'OK')\
                     .otherwise('Not OK')).show()

+---+--------+-------+
| id|deptname|Remarks|
+---+--------+-------+
|101|     CSE|     OK|
|102|      IT|     OK|
|103|     ECE| Not OK|
|104|      ME| Not OK|
|105|      EE| Not OK|
|107|     EEE| Not OK|
|106|      CE| Not OK|
+---+--------+-------+



In [0]:
final_df_employee2.show()

+-----+------+-------------+---+------+------+----------+-------------+
|empid|deptid|         name|age|gender|salary| hire_date|Default Count|
+-----+------+-------------+---+------+------+----------+-------------+
|  001|   101|     John Doe| 30|  Male| 50000|2015-01-01|            1|
|  002|   101|   Jane Smith| 25|Female| 45000|2016-02-15|            1|
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|            1|
|  004|   102|    Alice Lee| 28|Female| 48000|2017-09-30|            1|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|            1|
|  006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|            1|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|            1|
|  008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|            1|
|  009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|            1|
|  010|   104|     Lisa Lee| 27|Female| 47000|2018-08-01|            1|
|  011|   104|   David Park| 38|  Male| 65000|2015-11-01|       

Joins in PySpark

In [0]:
innerjoin_df=final_df_employee2.join(dept_data,final_df_employee2.deptid==dept_data.id,"inner")
innerjoin_df.show()
innerjoin_df.count()
innerjoin_df.distinct().show()
innerjoin_df.distinct().count() #Row count after removing duplicate rows

+-----+------+-------------+---+------+------+----------+-------------+---+--------+
|empid|deptid|         name|age|gender|salary| hire_date|Default Count| id|deptname|
+-----+------+-------------+---+------+------+----------+-------------+---+--------+
|  001|   101|     John Doe| 30|  Male| 50000|2015-01-01|            1|101|     CSE|
|  002|   101|   Jane Smith| 25|Female| 45000|2016-02-15|            1|101|     CSE|
|  007|   101|James Johnson| 42|  Male| 70000|2012-03-15|            1|101|     CSE|
|  003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|            1|102|      IT|
|  004|   102|    Alice Lee| 28|Female| 48000|2017-09-30|            1|102|      IT|
|  008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|            1|102|      IT|
|  020|   102|    Grace Kim| 32|Female| 53000|2018-11-01|            1|102|      IT|
|  005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|            1|103|     ECE|
|  006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|        

20

In [0]:
leftjoin_df=final_df_employee2.join(dept_data,final_df_employee2.deptid==dept_data.id,"left")
leftjoin_df.select('empid','deptid','deptname','name','age','gender','salary','hire_date').show()

+-----+------+--------+-------------+---+------+------+----------+
|empid|deptid|deptname|         name|age|gender|salary| hire_date|
+-----+------+--------+-------------+---+------+------+----------+
|  001|   101|     CSE|     John Doe| 30|  Male| 50000|2015-01-01|
|  002|   101|     CSE|   Jane Smith| 25|Female| 45000|2016-02-15|
|  003|   102|      IT|    Bob Brown| 35|  Male| 55000|2014-05-01|
|  004|   102|      IT|    Alice Lee| 28|Female| 48000|2017-09-30|
|  005|   103|     ECE|    Jack Chan| 40|  Male| 60000|2013-04-01|
|  006|   103|     ECE|    Jill Wong| 32|Female| 52000|2018-07-01|
|  007|   101|     CSE|James Johnson| 42|  Male| 70000|2012-03-15|
|  009|   103|     ECE|      Tom Tan| 33|  Male| 58000|2016-06-01|
|  008|   102|      IT|     Kate Kim| 29|Female| 51000|2019-10-01|
|  010|   104|      ME|     Lisa Lee| 27|Female| 47000|2018-08-01|
|  012|   105|      EE|   Susan Chen| 31|Female| 54000|2017-02-15|
|  011|   104|      ME|   David Park| 38|  Male| 65000|2015-11

In [0]:
rightjoin_df=final_df_employee2.join(dept_data,final_df_employee2.deptid==dept_data.id,"right")
rightjoin_df.select(
    col('empid').alias('eid'),
    col('deptid'),
    col('deptname'),
    col('name'),
    col('age'),
    col('gender'),
    col('salary'),
    col('hire_date')
).show()

+---+------+--------+-------------+---+------+------+----------+
|eid|deptid|deptname|         name|age|gender|salary| hire_date|
+---+------+--------+-------------+---+------+------+----------+
|007|   101|     CSE|James Johnson| 42|  Male| 70000|2012-03-15|
|002|   101|     CSE|   Jane Smith| 25|Female| 45000|2016-02-15|
|001|   101|     CSE|     John Doe| 30|  Male| 50000|2015-01-01|
|020|   102|      IT|    Grace Kim| 32|Female| 53000|2018-11-01|
|008|   102|      IT|     Kate Kim| 29|Female| 51000|2019-10-01|
|004|   102|      IT|    Alice Lee| 28|Female| 48000|2017-09-30|
|003|   102|      IT|    Bob Brown| 35|  Male| 55000|2014-05-01|
|019|   103|     ECE|  Steven Chen| 36|  Male| 62000|2015-08-01|
|009|   103|     ECE|      Tom Tan| 33|  Male| 58000|2016-06-01|
|006|   103|     ECE|    Jill Wong| 32|Female| 52000|2018-07-01|
|005|   103|     ECE|    Jack Chan| 40|  Male| 60000|2013-04-01|
|018|   104|      ME|    Nancy Liu| 29|Female| 50000|2017-06-01|
|011|   104|      ME|   D

In [0]:
fulljoin_df=final_df_employee2.join(dept_data,final_df_employee2.deptid==dept_data.id,"full")
fulljoin_df.select(
    col('empid').alias('eid'),
    col('deptid'),
    col('deptname'),
    col('name'),
    col('age'),
    col('gender'),
    col('salary'),
    col('hire_date')
).show()

+---+------+--------+-------------+---+------+------+----------+
|eid|deptid|deptname|         name|age|gender|salary| hire_date|
+---+------+--------+-------------+---+------+------+----------+
|001|   101|     CSE|     John Doe| 30|  Male| 50000|2015-01-01|
|002|   101|     CSE|   Jane Smith| 25|Female| 45000|2016-02-15|
|007|   101|     CSE|James Johnson| 42|  Male| 70000|2012-03-15|
|003|   102|      IT|    Bob Brown| 35|  Male| 55000|2014-05-01|
|004|   102|      IT|    Alice Lee| 28|Female| 48000|2017-09-30|
|008|   102|      IT|     Kate Kim| 29|Female| 51000|2019-10-01|
|020|   102|      IT|    Grace Kim| 32|Female| 53000|2018-11-01|
|005|   103|     ECE|    Jack Chan| 40|  Male| 60000|2013-04-01|
|006|   103|     ECE|    Jill Wong| 32|Female| 52000|2018-07-01|
|009|   103|     ECE|      Tom Tan| 33|  Male| 58000|2016-06-01|
|019|   103|     ECE|  Steven Chen| 36|  Male| 62000|2015-08-01|
|010|   104|      ME|     Lisa Lee| 27|Female| 47000|2018-08-01|
|011|   104|      ME|   D

In [0]:
leftanti_df=final_df_employee2.join(dept_data,final_df_employee2.deptid==dept_data.id,"leftanti")
leftanti_df.select(
    col('empid').alias('eid'),
    col('deptid'),
    col('name'),
    col('age'),
    col('gender'),
    col('salary'),
    col('hire_date')
).show()

# Left anti join: the output contains only the rows of final_df_employee2 whose deptid has no match in dept_data, with the columns of the left DataFrame only (there is no deptname column). Every deptid here has a match, so the result is empty.

leftsemi_df=final_df_employee2.join(dept_data,final_df_employee2.deptid==dept_data.id,"leftsemi")
leftsemi_df.select(
    col('empid').alias('eid'),
    col('deptid'),
    col('name'),
    col('age'),
    col('gender'),
    col('salary'),
    col('hire_date')
).show()

# Left semi join: the output contains only the rows of final_df_employee2 whose deptid has a match in dept_data, again with the columns of the left DataFrame only.

+---+------+----+---+------+------+---------+
|eid|deptid|name|age|gender|salary|hire_date|
+---+------+----+---+------+------+---------+
+---+------+----+---+------+------+---------+

+---+------+-------------+---+------+------+----------+
|eid|deptid|         name|age|gender|salary| hire_date|
+---+------+-------------+---+------+------+----------+
|001|   101|     John Doe| 30|  Male| 50000|2015-01-01|
|002|   101|   Jane Smith| 25|Female| 45000|2016-02-15|
|007|   101|James Johnson| 42|  Male| 70000|2012-03-15|
|003|   102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|004|   102|    Alice Lee| 28|Female| 48000|2017-09-30|
|008|   102|     Kate Kim| 29|Female| 51000|2019-10-01|
|020|   102|    Grace Kim| 32|Female| 53000|2018-11-01|
|005|   103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|006|   103|    Jill Wong| 32|Female| 52000|2018-07-01|
|009|   103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|019|   103|  Steven Chen| 36|  Male| 62000|2015-08-01|
|010|   104|     Lisa Lee| 27|F
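
Because every deptid in this notebook has a matching department, the join types above produce nearly identical outputs. The plain-Python model below uses two hypothetical unmatched rows (deptid 999 and department 108) to make the differences visible. The join function is an illustrative sketch of the semantics, not how Spark executes joins.

```python
employees = [
    {"empid": "001", "deptid": 101},
    {"empid": "002", "deptid": 102},
    {"empid": "003", "deptid": 999},   # hypothetical: no such department
]
departments = [
    {"id": 101, "deptname": "CSE"},
    {"id": 102, "deptname": "IT"},
    {"id": 108, "deptname": "XYZ"},    # hypothetical: no employees
]


def join(left, right, how):
    """Join on left.deptid == right.id; None plays the role of null."""
    right_ids = {r["id"] for r in right}
    if how == "leftsemi":   # left rows that have a match, left columns only
        return [l for l in left if l["deptid"] in right_ids]
    if how == "leftanti":   # left rows that have no match, left columns only
        return [l for l in left if l["deptid"] not in right_ids]

    null_left = dict.fromkeys(left[0])     # all left columns set to None
    null_right = dict.fromkeys(right[0])   # all right columns set to None
    rows, matched = [], set()
    for l in left:
        hits = [r for r in right if r["id"] == l["deptid"]]
        for r in hits:
            matched.add(r["id"])
            rows.append({**l, **r})
        if not hits and how in ("left", "full"):
            rows.append({**l, **null_right})
    if how in ("right", "full"):
        rows += [{**null_left, **r} for r in right if r["id"] not in matched]
    return rows


counts = {how: len(join(employees, departments, how))
          for how in ("inner", "left", "right", "full", "leftsemi", "leftanti")}
print(counts)
```

Note that the semi and anti results carry no id or deptname keys at all, which mirrors why the select calls above leave deptname out.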

In [0]:
#self join

self_df=dept_data.alias('dept_data1').join(dept_data.alias('dept_data2'),\
                                           col("dept_data1.id")==col("dept_data2.id"),"inner")
self_df.show()

+---+--------+---+--------+
| id|deptname| id|deptname|
+---+--------+---+--------+
|101|     CSE|101|     CSE|
|102|      IT|102|      IT|
|103|     ECE|103|     ECE|
|104|      ME|104|      ME|
|105|      EE|105|      EE|
|106|      CE|106|      CE|
|107|     EEE|107|     EEE|
+---+--------+---+--------+



In [0]:
#Union of DataFrames in PySpark

dept1=[(101,'CSE'),(102,'IT'),(103,'ECE')]
schema=StructType([StructField(name='id',dataType=IntegerType()),
                   StructField(name='deptname',dataType=StringType())
                   ])
dept_data1=spark.createDataFrame(data=dept1,schema=schema)
dept_data1.show()

dept2=[(104,'ME'),(105,'EE'),(107,'EEE'),(106,'CE'),(103,'ECE')]
schema=StructType([StructField(name='id',dataType=IntegerType()),
                   StructField(name='deptname',dataType=StringType())
                   ])
dept_data2=spark.createDataFrame(data=dept2,schema=schema)
dept_data2.show()

print("------UNION RESULT-------")

dept_union=dept_data1.union(dept_data2)
dept_union.show()
dept_union.orderBy(col('id').asc()).show()

print("------Choose distinct from union both dataframes-------")

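# unionAll is an alias of union; both keep duplicate rows
dept_unionAll=dept_data1.unionAll(dept_data2)
dept_unionAll.show()
# distinct() removes the duplicates; because it runs after orderBy, the sorted order is not guaranteed to survive
dept_unionAll.orderBy(col('id').asc()).distinct().show()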

+---+--------+
| id|deptname|
+---+--------+
|101|     CSE|
|102|      IT|
|103|     ECE|
+---+--------+

+---+--------+
| id|deptname|
+---+--------+
|104|      ME|
|105|      EE|
|107|     EEE|
|106|      CE|
|103|     ECE|
+---+--------+

------UNION RESULT-------
+---+--------+
| id|deptname|
+---+--------+
|101|     CSE|
|102|      IT|
|103|     ECE|
|104|      ME|
|105|      EE|
|107|     EEE|
|106|      CE|
|103|     ECE|
+---+--------+

+---+--------+
| id|deptname|
+---+--------+
|101|     CSE|
|102|      IT|
|103|     ECE|
|103|     ECE|
|104|      ME|
|105|      EE|
|106|      CE|
|107|     EEE|
+---+--------+

------Choose distinct from union both dataframes-------
+---+--------+
| id|deptname|
+---+--------+
|101|     CSE|
|102|      IT|
|103|     ECE|
|104|      ME|
|105|      EE|
|107|     EEE|
|106|      CE|
|103|     ECE|
+---+--------+

+---+--------+
| id|deptname|
+---+--------+
|101|     CSE|
|102|      IT|
|103|     ECE|
|104|      ME|
|105|      EE|
|107|     EEE

Window Functions in PySpark

In [0]:
df_employee.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [0]:
from pyspark.sql.window import Window
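#With orderBy() the default frame runs from the start of the partition to the current row,
#so max() is a running maximum. With salary descending, the first row already holds the department maximum.
window1=Window.partitionBy(col('department_id')).orderBy(col('salary').desc())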
maxfunc=max(col('salary')).over(window1)
df_employee_new=df_employee.withColumn('Max Salary',maxfunc)
df_employee_new.show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Max Salary|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|     70000|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|     70000|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|     70000|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|     55000|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|     55000|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|     55000|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|     55000|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|     62000|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|     62000|
|        009|   

In [0]:
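#Same window ordered ascending: the running maximum now equals each row's own salary,
#so 'Max Salary' is not the department maximum here.
#Window.partitionBy('department_id') without orderBy() gives the maximum of the whole partition.
window2=Window.partitionBy(col('department_id')).orderBy(col('salary').asc())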
maxfunc2=max(col('salary')).over(window2)
df_employee_new2=df_employee.withColumn('Max Salary',maxfunc2)
df_employee_new2.show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Max Salary|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|     45000|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|     50000|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|     70000|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|     48000|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|     51000|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|     53000|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|     55000|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|     52000|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|     58000|
|        005|   
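The two 'Max Salary' columns above differ only because of the window frame. The following is a plain-Python sketch of that behaviour, not Spark code: `running_max` and `partition_max` are made-up helper names, and the three salaries are those of department 101. It assumes there are no tied salaries. In Spark, the whole-partition frame corresponds to a window without `orderBy()`, or to `rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)`.

```python
import builtins                     # the notebook's `from pyspark.sql.functions import *` shadows max()
from itertools import accumulate

def running_max(ordered_values):
    """Frame 'start of partition .. current row': maximum of everything seen so far."""
    return list(accumulate(ordered_values, builtins.max))

def partition_max(ordered_values):
    """Frame 'whole partition': every row receives the same maximum."""
    top = builtins.max(ordered_values)
    return [top] * len(ordered_values)

dept_101 = [45000, 50000, 70000]          # salaries of department 101

asc = sorted(dept_101)                    # models orderBy(salary.asc())
desc = sorted(dept_101, reverse=True)     # models orderBy(salary.desc())

print(running_max(asc))     # [45000, 50000, 70000] -> each row's own salary
print(running_max(desc))    # [70000, 70000, 70000] -> department maximum
print(partition_max(asc))   # [70000, 70000, 70000] -> independent of the ordering
```

The ascending case reproduces the second output above, where every row shows its own salary.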

In [0]:
window2=Window.partitionBy(col('department_id')).orderBy(col('salary').desc())
rn=row_number().over(window2)
df_employee_new2=df_employee.withColumn('Row Number',rn)
df_employee_new2.show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Row Number|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|         1|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|         2|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|         3|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|         1|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|         2|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|         3|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|         4|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|         1|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|         2|
|        009|   

In [0]:
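#ORDER BY salary sorts ascending by default, so rn counts in the opposite direction to 'Row Number' (salary descending)
df_employee_new21=df_employee_new2.withColumn('rn',expr("row_number() OVER(PARTITION BY department_id ORDER BY salary)"))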
df_employee_new21.show()
df_employee_new22=df_employee_new21.withColumn('rank',expr("rank() OVER(PARTITION BY department_id ORDER BY salary)"))
df_employee_new22.show()
df_employee_new23=df_employee_new22.withColumn('dense_rank',expr("dense_rank() OVER(PARTITION BY department_id ORDER BY salary)"))
df_employee_new23.show()

+-----------+-------------+-------------+---+------+------+----------+----------+---+
|employee_id|department_id|         name|age|gender|salary| hire_date|Row Number| rn|
+-----------+-------------+-------------+---+------+------+----------+----------+---+
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|         3|  1|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|         2|  2|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|         1|  3|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|         4|  1|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|         3|  2|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|         2|  3|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|         1|  4|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|         4|  1|
|        009|          103|      Tom Tan| 33|  Male| 5
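row_number, rank and dense_rank give identical numbers in the output above because no two employees in a department share a salary. A plain-Python sketch (not Spark code) of how the three differ once there is a tie; the function name `ranking_columns` and the salary list are hypothetical.

```python
def ranking_columns(sorted_values):
    """Return (value, row_number, rank, dense_rank) for values that are already sorted."""
    rows = []
    rank = 0
    dense = 0
    previous = object()          # sentinel that compares unequal to every real value
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position      # rank jumps to the current position after a tie
            dense += 1           # dense_rank only ever grows by one
            previous = value
        rows.append((value, position, rank, dense))
    return rows

# Hypothetical salaries with a tie at 50000 (not taken from df_employee)
for row in ranking_columns([45000, 50000, 50000, 70000]):
    print(row)
# (45000, 1, 1, 1)
# (50000, 2, 2, 2)
# (50000, 3, 2, 2)
# (70000, 4, 4, 3)
```

After the tie, rank skips to 4 while dense_rank continues with 3; row_number never repeats.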

In [0]:
window3=Window.partitionBy(col('department_id')).orderBy(col('salary').desc())
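#lag(column, offset, default) reads the value `offset` rows before the current row within the window
lagvalue1=lag(col('salary'),1).over(window3)    #NULL for the first row of each department
lagvalue2=lag(col('salary'),1,0).over(window3)  #0 instead of NULL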
df_employee_new24=df_employee_new2.withColumn('Prev Emp Salary',lagvalue1)
df_employee_new24.show()
df_employee_new25=df_employee_new24.withColumn('PrevEmpSalary:DefaultValue:found_null',lagvalue2)
df_employee_new25.show(truncate=True)

+-----------+-------------+-------------+---+------+------+----------+----------+---------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Row Number|Prev Emp Salary|
+-----------+-------------+-------------+---+------+------+----------+----------+---------------+
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|         1|           NULL|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|         2|          70000|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|         3|          50000|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|         1|           NULL|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|         2|          55000|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|         3|          53000|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|         4|          51000|
|        019|       

In [0]:
window3=Window.partitionBy(col('department_id')).orderBy(col('salary').desc())
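#lead(column, offset, default) reads the value `offset` rows after the current row within the window
leadvalue1=lead(col('salary'),1).over(window3)    #NULL for the last row of each department
leadvalue2=lead(col('salary'),1,0).over(window3)  #0 instead of NULL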
df_employee_new1=df_employee.withColumn('Next Emp Salary',leadvalue1)
df_employee_new1.show()
df_employee_new2=df_employee_new1.withColumn('NextEmpSalary:Default:null',leadvalue2)
df_employee_new2.show(truncate=True)

+-----------+-------------+-------------+---+------+------+----------+---------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Next Emp Salary|
+-----------+-------------+-------------+---+------+------+----------+---------------+
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|          50000|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|          45000|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|           NULL|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|          53000|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|          51000|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|          48000|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           NULL|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|          60000|
|        005|          103|    Jack Chan| 4
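A plain-Python sketch (not Spark code) of what lag and lead compute inside one partition. The helpers are named `lag_model` and `lead_model` (hypothetical names) so that they do not shadow the Spark functions imported in the notebook; the list holds the salaries of department 101 in descending order. A non-negative offset is assumed.

```python
def lag_model(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` when there is no such row."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

def lead_model(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` when there is no such row."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]

dept_101_desc = [70000, 50000, 45000]    # department 101, salary descending

print(lag_model(dept_101_desc))          # [None, 70000, 50000]
print(lag_model(dept_101_desc, 1, 0))    # [0, 70000, 50000]
print(lead_model(dept_101_desc))         # [50000, 45000, None]
print(lead_model(dept_101_desc, 1, 0))   # [50000, 45000, 0]
```

The `None` entries correspond to the NULL cells in the 'Prev Emp Salary' and 'Next Emp Salary' columns above.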

In [0]:
#Partition - Repartition - Coalesce

print(df_employee.rdd.getNumPartitions())

#repartition(n) performs a full shuffle and can increase or decrease the number of partitions
new_df_employee=df_employee.repartition(5)
print(new_df_employee.rdd.getNumPartitions())
new_df_employee=new_df_employee.withColumn('Partition_num',spark_partition_id())
new_df_employee.show()
new_df_employee.groupBy(col('Partition_num')).count().show()

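#coalesce(n) merges existing partitions without a full shuffle, so it can only decrease the number of partitions
new_df_employee2=df_employee.coalesce(1)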
print(new_df_employee2.rdd.getNumPartitions())

new_df_employee2=new_df_employee2.withColumn('Partition_num',spark_partition_id())
new_df_employee2.show()

8
5
+-----------+-------------+-------------+---+------+------+----------+-------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Partition_num|
+-----------+-------------+-------------+---+------+------+----------+-------------+
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|            0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|            0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|            0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|            0|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|            0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|            1|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|            1|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|            1|
|        020|          102|    Grace Kim| 32|Female| 53000|20

In [0]:
#Reading CSV File

df=spark.read.format('csv').load('dbfs:/FileStore/tables/Salaries.csv')
df.show()
df.printSchema()

df1=spark.read.format('csv').option("header",True).load('dbfs:/FileStore/tables/Salaries.csv')
df1.show()
df1.printSchema()

df2=spark.read.format('csv').option("header",True).option("inferSchema",False).load('dbfs:/FileStore/tables/Salaries.csv')
df2.show()
df2.printSchema()

df3=spark.read.format('csv').option("header",True).option("inferSchema",True).load('dbfs:/FileStore/tables/Salaries.csv')
df3.show()
df3.printSchema()

'''In this example, df and df1 leave inferSchema at its default (False) and df2 sets inferSchema=False explicitly. In all three cases no schema is provided and no inference takes place, so Spark treats every column as a string. Only df3 sets inferSchema=True.

Setting inferSchema to False is useful when you supply a predefined schema through the schema parameter and want to avoid the overhead of the extra pass Spark makes over the data to infer the types. On large datasets that extra pass can be resource-intensive.

When you set inferSchema=True, you're instructing PySpark to deduce the data type of each column from the actual data.

Additional Pass Over Data: PySpark makes an extra pass over the data to analyze the values in each column and infer their types.
Automatic Data Type Inference: Based on the observed values, PySpark determines the most appropriate data type for each column (e.g., StringType, IntegerType, DoubleType, etc.).
Convenience: This is convenient when you don’t have a predefined schema for your data, as it saves you from manually specifying the data types.
Potential Inaccuracy: While convenient, automatic inference might not always give the type you expect, especially when the values within a column are not of one consistent type.'''

+--------------------+-----+--------+
|                 _c0|  _c1|     _c2|
+--------------------+-----+--------+
|            Position|Level|Salaries|
|Associate Softwar...|    1|   45000|
|   Software Engineer|    2|   50000|
| Software Engineer 1|    3|   60000|
| Software Engineer 2|    4|   80000|
|Senior Software E...|    5|  100000|
|     Product Manager|    6|  125000|
|  Consultant Manager|    7|  150000|
|   Principal Manager|    8|  200000|
|                 CTO|    9|  250000|
|                 CEO|   10|  300000|
+--------------------+-----+--------+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

+--------------------+-----+--------+
|            Position|Level|Salaries|
+--------------------+-----+--------+
|Associate Softwar...|    1|   45000|
|   Software Engineer|    2|   50000|
| Software Engineer 1|    3|   60000|
| Software Engineer 2|    4|   80000|
|Senior Software E...|    5|  100000|
|     Product 
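A simplified plain-Python model of schema inference: try the narrowest type first and fall back to a wider one as soon as a value does not fit. This is only a sketch of the idea. Spark's real inference covers more types, and the function name `infer_column_type` is made up for illustration. The sample rows are copied from the Salaries.csv output above.

```python
import csv
import io

def infer_column_type(values):
    """Narrowest of int -> double -> string that fits every non-empty value."""
    def fits(cast):
        for value in values:
            if value == "":
                continue            # empty cells are treated as nulls in this model
            try:
                cast(value)
            except ValueError:
                return False
        return True

    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"

# A few rows in the shape of Salaries.csv
text = "Position,Level,Salaries\nSoftware Engineer,2,50000\nCTO,9,250000\n"
rows = list(csv.reader(io.StringIO(text)))
header, data = rows[0], rows[1:]

# The "extra pass over the data": every value of every column is inspected
schema = {name: infer_column_type([row[i] for row in data])
          for i, name in enumerate(header)}
print(schema)   # {'Position': 'string', 'Level': 'int', 'Salaries': 'int'}
```

A single non-numeric value in Level would turn that column into a string, which is the kind of surprise the "Potential Inaccuracy" note refers to.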

In [0]:
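#Read Modes: PERMISSIVE (the default) keeps malformed rows, DROPMALFORMED drops them, FAILFAST raises an error on the first one
schema1="CustomerID int,Gen string,Age int,AnnualIncome int,SpendingScore int,_corrupt_record string" #PERMISSIVE: the _corrupt_record column receives the raw text of each malformed row
schema2="CustomerID int,Gen string,Age int,AnnualIncome int,SpendingScore int" #DROPMALFORMED: malformed rows are dropped, so no _corrupt_record column is needed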
path='dbfs:/FileStore/tables/Pandas_Dataset-2.csv'

df=spark.read.format('csv').option('header',True).option('inferSchema',True).schema(schema1).load(path)
df.show()
df.printSchema()

df1=spark.read.format('csv')\
    .option('header',True)\
    .option('inferSchema',True)\
    .option('mode','PERMISSIVE')\
    .schema(schema1)\
    .load(path)
df1.show()
df1.printSchema()

df2=spark.read.format('csv')\
    .option('header',True)\
    .option('mode','DROPMALFORMED')\
    .schema(schema2)\
    .load(path)
df2.show()
df2.printSchema()

df3=spark.read.format('csv')\
    .option('header',True)\
    .option('inferSchema',True)\
    .schema(schema1)\
    .option('mode','FAILFAST')\
    .load(path)
df3.show()
df3.printSchema()

+----------+------+----+------------+-------------+--------------------+
|CustomerID|   Gen| Age|AnnualIncome|SpendingScore|     _corrupt_record|
+----------+------+----+------------+-------------+--------------------+
|         1|  Male|  19|          15|           39|                NULL|
|         2|  Male|  21|          15|           81|                NULL|
|         3|  NULL|NULL|        NULL|            6|                NULL|
|         4|Female|  23|          16|           77|                NULL|
|         5|Female|  31|          17|           40|                NULL|
|         6|Female|  22|        NULL|         NULL|6,Female,22,"17,1...|
|         7|Female|  35|          18|            6|                NULL|
|         8|Female|  23|          18|           94|                NULL|
|         9|  NULL|  64|          19|            3|                NULL|
|        10|Female|  30|          19|           72|                NULL|
|        11|  Male|  67|          19|           14|

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1251549302140129>, line 33
     25 df2.printSchema()
     27 df3=spark.read.format('csv')\
     28     .option('header',True)\
     29     .option('inferSchema',True)\
     30     .schema(schema1)\
     31     .option('mode','FAILFAST')\
     32     .load(path)
---> 33 df3.

In [0]:
df1.where("_corrupt_record is not null").show() #filter the corrupted records in df1
#In df2, row 6 is missing because the 'DROPMALFORMED' mode was used.

+----------+------+---+------------+-------------+--------------------+
|CustomerID|   Gen|Age|AnnualIncome|SpendingScore|     _corrupt_record|
+----------+------+---+------------+-------------+--------------------+
|         6|Female| 22|        NULL|         NULL|6,Female,22,"17,1...|
+----------+------+---+------------+-------------+--------------------+
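The three read modes can be modelled in plain Python as follows. This is a sketch of the behaviour seen above, not Spark's parser: `parse`, `read` and `COLUMNS` are hypothetical names, and the second sample line imitates the malformed row 6.

```python
import csv

COLUMNS = [("CustomerID", int), ("Gen", str), ("Age", int),
           ("AnnualIncome", int), ("SpendingScore", int)]

def parse(line):
    """Return (typed_values, is_malformed) for one CSV line."""
    cells = next(csv.reader([line]))
    if len(cells) != len(COLUMNS):
        return [None] * len(COLUMNS), True
    values, malformed = [], False
    for cell, (_, cast) in zip(cells, COLUMNS):
        try:
            values.append(cast(cell) if cell != "" else None)
        except ValueError:
            values.append(None)      # the unparsable cell becomes null
            malformed = True
    return values, malformed

def read(lines, mode="PERMISSIVE"):
    rows = []
    for line in lines:
        values, malformed = parse(line)
        if malformed and mode == "FAILFAST":
            raise ValueError(f"malformed record: {line}")
        if malformed and mode == "DROPMALFORMED":
            continue
        if mode == "PERMISSIVE":
            # keep the row and store the raw text of malformed lines in an extra column
            values = values + [line if malformed else None]
        rows.append(values)
    return rows

# Hypothetical sample: the second line has text where an integer is expected
sample = ['5,Female,31,17,40', '6,Female,22,"17,101,ABC",']

print(read(sample, "PERMISSIVE"))            # both rows, the second with its raw text attached
print(len(read(sample, "DROPMALFORMED")))    # 1
```

Calling `read(sample, "FAILFAST")` raises a `ValueError` on the second line, which mirrors the exception df3 produces above.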



In [0]:
all_options={
    "header":"true",
    "inferSchema":"true",
    "mode":"PERMISSIVE"
}
readdf=spark.read.format('csv').options(**all_options).load(path)
readdf.show()

+----------+------+----+------------+--------------+
|CustomerID|   Gen| Age|AnnualIncome|Spending Score|
+----------+------+----+------------+--------------+
|         1|  Male|  19|          15|            39|
|         2|  Male|  21|          15|            81|
|         3|  NULL|NULL|        NULL|             6|
|         4|Female|  23|          16|            77|
|         5|Female|  31|          17|            40|
|         6|Female|  22|  17,101,ABC|          NULL|
|         7|Female|  35|          18|             6|
|         8|Female|  23|          18|            94|
|         9|  NULL|  64|          19|             3|
|        10|Female|  30|          19|            72|
|        11|  Male|  67|          19|            14|
|      NULL|  NULL|NULL|        NULL|          NULL|
|        13|Female|  58|          20|            15|
|        14|Female|  24|          20|            77|
|        15|  Male|NULL|          20|            13|
|        16|  Male|  22|          20|         

In [0]:
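#For different file formats like .parquet/.avro/.orc/.json
path='dbfs:/FileStore/tables/'
#recursiveFileLookup checks all parquet files recursively under one directory.
#A comment cannot follow a line-continuation backslash, so it is placed here instead.
#header and inferSchema are CSV options and are left out: Parquet files store their own schema.
df1=spark.read.format('parquet')\
    .option('recursiveFileLookup',True)\
    .schema(schema1)\
    .load(path)
df1.show()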

In [0]:
#Creating a json file structure

schema = StructType([ 
	StructField("name",StringType()), 
	StructField("marks",ArrayType(IntegerType()))
])  
data = [("A", [80, 90, 95]), 
        ("B", [70, 85, 92])] 
df = spark.createDataFrame(data, schema) 
output_path = "/dbfs/FileStore/tables/output_jsons/marks.json"
# Write the DataFrame as JSON to the specified directory
df.write.json(output_path)

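#Other write modes - each line below is an alternative, not a sequence to run from top to bottom.
#parquet_path is a hypothetical output directory. partitionBy() needs a column of df, so 'name' is used here.
parquet_path="dbfs:/FileStore/tables/output_parquet"

df.write.format("parquet").save(parquet_path) #default mode 'error': fails if the path already exists
df.write.format("parquet").partitionBy('name').save(parquet_path)
df.write.format("parquet").mode('append').partitionBy('name').save(parquet_path) #adds to existing data
df.write.format("parquet").mode('overwrite').partitionBy('name').save(parquet_path) #replaces existing data
df.write.format("parquet").mode('ignore').partitionBy('name').save(parquet_path) #does nothing if data already exists
df.write.format("parquet").mode('error').partitionBy('name').save(parquet_path) #if it exists from previously it will fail
df.repartition(1).write.format("parquet").mode('error').partitionBy('name').save(parquet_path) #a single file per partition directory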

In [0]:
#Read JSON / Flatten JSON

df_json=spark.read.format('json').load(output_path)
df_json.show()
df_json.printSchema()

df_json=df_json.withColumn('flatten',explode('marks'))
df_json.select(
    col('name'),
    col('flatten').alias('mark')
).show()

+------------+----+
|       marks|name|
+------------+----+
|[80, 90, 95]|   A|
|[70, 85, 92]|   B|
+------------+----+

root
 |-- marks: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- name: string (nullable = true)

+----+----+
|name|mark|
+----+----+
|   A|  80|
|   A|  90|
|   A|  95|
|   B|  70|
|   B|  85|
|   B|  92|
+----+----+



In [0]:
''' orders.json
[
	{
		"id":1,
		"name":"john",
		"orders":[
			{"id":101,"product":"laptop","qty":1},
			{"id":102,"product":"mobile","qty":2}
		]
	},
	{
		"id":2,
		"name":"paul",
		"orders":[
			{"id":103,"product":"camera","qty":5},
			{"id":104,"product":"gopro","qty":3}
		]
	}
]
'''
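ip="dbfs:/FileStore/orders.json"
#The file is one pretty-printed JSON array, so multiLine is required (by default Spark expects one JSON object per line)
df_jsons=spark.read.format('json').option('multiLine',True).load(ip)

flatten_df=df_jsons.select("id","name",explode("orders").alias("orders_flatten"))
flatten_df.show()
#The struct fields are addressed through the alias given to the exploded column;
#the order id is renamed so that it does not clash with the customer id
final_df=flatten_df.select(
    "id",
    "name",
    col("orders_flatten.id").alias("order_id"),
    "orders_flatten.product",
    "orders_flatten.qty"
)
final_df.show()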
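The explode-then-select pattern can be checked without a cluster. The sketch below uses only the standard json module and the corrected orders.json content; `order_id` is an illustrative column name chosen to avoid two columns called id.

```python
import json

orders_json = """
[
  {"id": 1, "name": "john",
   "orders": [{"id": 101, "product": "laptop", "qty": 1},
              {"id": 102, "product": "mobile", "qty": 2}]},
  {"id": 2, "name": "paul",
   "orders": [{"id": 103, "product": "camera", "qty": 5},
              {"id": 104, "product": "gopro", "qty": 3}]}
]
"""

customers = json.loads(orders_json)

# explode: one output row per element of the "orders" array,
# then lift the fields of each order to top-level columns
flat_rows = [
    {"id": customer["id"], "name": customer["name"],
     "order_id": order["id"], "product": order["product"], "qty": order["qty"]}
    for customer in customers
    for order in customer["orders"]
]

for row in flat_rows:
    print(row)
```

Two customers with two orders each yield four flat rows, one per order.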

In [0]:
#Handling Duplicate Rows

newdept=[(104,'ME'),(105,'EE'),(107,'EEE'),(103,'CE'),(103,'CE'),(103,'ECE')]
schema=StructType([StructField(name='id',dataType=IntegerType()),
                   StructField(name='deptname',dataType=StringType())
                   ])
newdept=spark.createDataFrame(data=newdept,schema=schema)
newdept.show()
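newdept.dropDuplicates().show()                  #rows identical in every column are collapsed
newdept.dropDuplicates(['id']).show()            #one row per id (subset passed as a list); which duplicate is kept is not guaranteed
newdept.dropDuplicates(['id','deptname']).show() #same as dropDuplicates() here, since these are all the columns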

+---+--------+
| id|deptname|
+---+--------+
|104|      ME|
|105|      EE|
|107|     EEE|
|103|      CE|
|103|      CE|
|103|     ECE|
+---+--------+

+---+--------+
| id|deptname|
+---+--------+
|104|      ME|
|105|      EE|
|107|     EEE|
|103|      CE|
|103|     ECE|
+---+--------+

+---+--------+
| id|deptname|
+---+--------+
|103|      CE|
|104|      ME|
|105|      EE|
|107|     EEE|
+---+--------+

+---+--------+
| id|deptname|
+---+--------+
|104|      ME|
|105|      EE|
|107|     EEE|
|103|      CE|
|103|     ECE|
+---+--------+
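A plain-Python sketch (not Spark code) of dropDuplicates with and without a subset. One assumption to note: this model always keeps the first row it sees for a key, whereas Spark does not guarantee which of the duplicates survives. The function name `drop_duplicates` is hypothetical; the rows are those of newdept.

```python
def drop_duplicates(rows, subset=None):
    """Keep one row per distinct combination of the `subset` columns (all columns if None)."""
    if not rows:
        return []
    columns = subset if subset is not None else list(rows[0].keys())
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in columns)
        if key not in seen:          # first occurrence of this key
            seen.add(key)
            kept.append(row)
    return kept

newdept_rows = [
    {"id": 104, "deptname": "ME"}, {"id": 105, "deptname": "EE"},
    {"id": 107, "deptname": "EEE"}, {"id": 103, "deptname": "CE"},
    {"id": 103, "deptname": "CE"}, {"id": 103, "deptname": "ECE"},
]

print(len(drop_duplicates(newdept_rows)))           # 5: only the repeated (103, CE) row is removed
print(len(drop_duplicates(newdept_rows, ["id"])))   # 4: a single row remains for id 103
```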



In [0]:
df_sal=spark.read.format('csv').option("header",True).option("inferSchema",True).load('dbfs:/FileStore/tables/Salaries.csv')
df_sal.show()

+--------------------+-----+--------+
|            Position|Level|Salaries|
+--------------------+-----+--------+
|Associate Softwar...|    1|   45000|
|   Software Engineer|    2|   50000|
| Software Engineer 1|    3|   60000|
| Software Engineer 2|    4|   80000|
|Senior Software E...|    5|  100000|
|     Product Manager|    6|  125000|
|  Consultant Manager|    7|  150000|
|   Principal Manager|    8|  200000|
|                 CTO|    9|  250000|
|                 CEO|   10|  300000|
+--------------------+-----+--------+



In [0]:
#UDF - User Defined Functions

def bonus(sal):
    return int(sal)*1.1


#udf() defaults to a StringType return type, so the return type is declared explicitly
bonus_udf=udf(bonus,DoubleType())
#Registering the function under a name makes it callable from SQL, e.g. expr("bonus_sql(Salaries)") or spark.sql(...)
spark.udf.register('bonus_sql',bonus,"double")

df_sal_new=df_sal.withColumn("Bonus",round(bonus_udf("Salaries"),2))
df_sal_new.show()

+--------------------+-----+--------+--------+
|            Position|Level|Salaries|   Bonus|
+--------------------+-----+--------+--------+
|Associate Softwar...|    1|   45000| 49500.0|
|   Software Engineer|    2|   50000| 55000.0|
| Software Engineer 1|    3|   60000| 66000.0|
| Software Engineer 2|    4|   80000| 88000.0|
|Senior Software E...|    5|  100000|110000.0|
|     Product Manager|    6|  125000|137500.0|
|  Consultant Manager|    7|  150000|165000.0|
|   Principal Manager|    8|  200000|220000.0|
|                 CTO|    9|  250000|275000.0|
|                 CEO|   10|  300000|330000.0|
+--------------------+-----+--------+--------+



In [0]:
#Cache and Persist
from pyspark import StorageLevel

df.cache()  # Marks the DataFrame for caching (lazy); nothing is stored yet
df.show()   # The first action materializes the cache; later actions on 'df' reuse it

df.unpersist()  # Release the cache before choosing a different storage level
df.persist(StorageLevel.MEMORY_AND_DISK)  # Stores the DataFrame in memory and spills to disk if needed
df.show()

'''cache() and persist() store DataFrames (or RDDs) so that iterative computations or repeated queries can reuse them instead of recomputing them.

cache():
Purpose: Caches the data using the default storage level.
Usage: It is used when the same DataFrame is accessed repeatedly and the default storage level is good enough.
Storage Level: For a DataFrame the default is a memory-and-disk level: partitions that do not fit in memory are spilled to disk. For an RDD the default is MEMORY_ONLY: partitions that do not fit in memory are recomputed when needed.

persist():
Purpose: Allows you to specify different storage levels for caching, including memory and disk.
Usage: Used when you need more control over how the data is stored (e.g., memory, disk, or both).
Storage Levels: Common options include:
MEMORY_ONLY: Stores data in memory only.
MEMORY_AND_DISK: Stores data in memory, but if memory is insufficient, it spills over to disk.
DISK_ONLY: Stores data only on disk.
MEMORY_ONLY_SER: Stores data in memory in a serialized format (more compact, at the cost of extra CPU). This level belongs to the Scala/Java API; PySpark always stores data serialized, and current PySpark versions have no separate _SER levels.
'''


In [0]:
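#Same result with a built-in expression; built-ins are usually preferable to Python UDFs because Spark can optimize them
df_sal_new2=df_sal_new.withColumn("Bonus_col_without_UDF",expr("Salaries*1.1"))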
df_sal_new2.show()               

+--------------------+-----+--------+--------+---------------------+
|            Position|Level|Salaries|   Bonus|Bonus_col_without_UDF|
+--------------------+-----+--------+--------+---------------------+
|Associate Softwar...|    1|   45000| 49500.0|              49500.0|
|   Software Engineer|    2|   50000| 55000.0|              55000.0|
| Software Engineer 1|    3|   60000| 66000.0|              66000.0|
| Software Engineer 2|    4|   80000| 88000.0|              88000.0|
|Senior Software E...|    5|  100000|110000.0|             110000.0|
|     Product Manager|    6|  125000|137500.0|             137500.0|
|  Consultant Manager|    7|  150000|165000.0|             165000.0|
|   Principal Manager|    8|  200000|220000.0|             220000.0|
|                 CTO|    9|  250000|275000.0|             275000.0|
|                 CEO|   10|  300000|330000.0|             330000.0|
+--------------------+-----+--------+--------+---------------------+



In [0]:
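#Multiple Spark Configurations

#Runtime SQL settings can be changed on an existing session
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

spark.conf.set("spark.sql.shuffle.partitions", "50")

#Resource settings such as driver and executor memory only take effect when a new session is started;
#getOrCreate() returns the already running session if there is one
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.driver.cores",8) \
    .config("spark.driver.memory","1g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()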



In [0]:
#Broadcast Join

''' A technique used in distributed computing to optimize joins between large and small datasets in PySpark. It is particularly useful when one of the datasets is small enough to fit into the memory of each worker node.

How It Works:
Broadcasting: The smaller DataFrame (or RDD) is "broadcasted" to all the worker nodes. This means that a copy of the smaller dataset is sent to every node in the cluster.
Local Join: Each worker node performs the join operation locally using its own partition of the larger DataFrame and the broadcasted smaller DataFrame. This avoids shuffling of the large DataFrame across the network, which can be costly in terms of performance.
Benefits:
Reduced Shuffling: By broadcasting the smaller DataFrame, you minimize the need to shuffle large amounts of data across the cluster, which speeds up the join operation.
Improved Performance: Broadcast joins can significantly reduce execution time for join operations when the smaller DataFrame can fit into memory.
Use Cases:
Small Lookup Tables: Ideal for cases where you have a large DataFrame and a small lookup table.
Optimization: Useful for scenarios where shuffling large amounts of data would otherwise be inefficient. '''

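#The paths and the column name "key" are placeholders; header=True is needed so that the column can be referenced by name
large_df = spark.read.csv("path/to/large.csv",header=True)
small_df = spark.read.csv("path/to/small.csv",header=True)

# Perform a broadcast join
result_df=large_df.join(broadcast(small_df),large_df["key"]==small_df["key"],"inner")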

result_df.show()
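The "broadcast, then join locally" steps described above can be sketched in plain Python. This is a model of the idea rather than Spark's implementation: the function name `broadcast_hash_join` and the sample data are hypothetical, and the nested list stands in for the partitions of the large DataFrame.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner join in which every partition of the large side probes a copy of
    the small side's lookup table; the large side is never regrouped."""
    # "Broadcast": build one lookup table from the small side
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    for partition in large_partitions:       # each partition is joined locally
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**match, **row})
    return joined

# Hypothetical data: two partitions of a large table and a small lookup table
large = [[{"key": 1, "amount": 100.0}, {"key": 2, "amount": 150.0}],
         [{"key": 1, "amount": 200.0}, {"key": 3, "amount": 50.0}]]
small = [{"key": 1, "name": "Alice"}, {"key": 2, "name": "Bob"}]

for row in broadcast_hash_join(large, small, "key"):
    print(row)
```

The row with key 3 has no match in the small table and is dropped, as expected for an inner join. Spark also chooses a broadcast join on its own when a table is smaller than `spark.sql.autoBroadcastJoinThreshold`.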


In [0]:
#Bucketing in Spark

''' A technique used in Spark to optimize data processing on large datasets. It improves data access and query performance by organizing data into a fixed number of buckets based on the hash of a specific column (the bucket column).

1. Improves Join Performance: When two tables are bucketed on the join column with the same number of buckets, rows with the same key are already in matching buckets, so Spark can join them without shuffling either side.

2. Efficient Query Execution: Aggregations grouped by the bucket column can also avoid a shuffle, because all rows for one key are in the same bucket.

3. Data Distribution: Hashing spreads distinct keys across the buckets. It does not fix skew caused by a single very frequent key, since all rows with that key still land in one bucket.

4. Optimizes Data Skipping: For equality filters on the bucket column, Spark can skip the buckets that cannot contain the requested value.


Bucket Column: Data is divided into buckets based on the hash of the bucket column. Rows with the same bucket column value are placed into the same bucket.
Number of Buckets: You specify the number of buckets when writing the data. Each bucket is stored as one or more files holding the rows whose hash maps to that bucket. '''

transactions_df = spark.createDataFrame([
    (1, "2024-01-01", 100.0),
    (2, "2024-01-02", 150.0),
    (1, "2024-01-03", 200.0),
    (2, "2024-01-04", 250.0)
], ["customer_id", "date", "amount"])

#bucketBy() is only supported together with saveAsTable(); save() to a path raises an AnalysisException.
#The table names used here are illustrative.
transactions_df.write.bucketBy(4, "customer_id").sortBy("date").format("parquet").saveAsTable("transactions_bucketed")

customer_df = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob")
], ["customer_id", "name"])

#Bucket the second table the same way so that both sides of the join line up
customer_df.write.bucketBy(4, "customer_id").format("parquet").saveAsTable("customers_bucketed")

transactions_df = spark.table("transactions_bucketed")
customer_df = spark.table("customers_bucketed")

joined_df = transactions_df.join(customer_df,on="customer_id", how="inner")
joined_df.show()

#Since both tables are bucketed by customer_id into the same number of buckets, the join can avoid a shuffle: matching keys are already co-located.
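Why bucketing both sides helps can be shown with a small plain-Python model. It is a sketch only: CRC32 stands in for Spark's own hash function, and `bucket_of` and `bucketize` are made-up names. The rows are the transactions and customers from the cell above.

```python
import zlib

NUM_BUCKETS = 4

def bucket_of(key, num_buckets=NUM_BUCKETS):
    """Deterministic bucket id for a key (CRC32 is a stand-in for Spark's hash)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets

def bucketize(rows, key, num_buckets=NUM_BUCKETS):
    """Distribute rows into num_buckets lists according to the hash of `key`."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(row[key], num_buckets)].append(row)
    return buckets

transactions = [
    {"customer_id": 1, "date": "2024-01-01", "amount": 100.0},
    {"customer_id": 2, "date": "2024-01-02", "amount": 150.0},
    {"customer_id": 1, "date": "2024-01-03", "amount": 200.0},
    {"customer_id": 2, "date": "2024-01-04", "amount": 250.0},
]
customers = [{"customer_id": 1, "name": "Alice"}, {"customer_id": 2, "name": "Bob"}]

t_buckets = bucketize(transactions, "customer_id")
c_buckets = bucketize(customers, "customer_id")

# Bucket i of one table only ever needs bucket i of the other table,
# so the join runs bucket by bucket without moving rows between buckets
joined = []
for t_bucket, c_bucket in zip(t_buckets, c_buckets):
    names = {c["customer_id"]: c["name"] for c in c_bucket}
    for t in t_bucket:
        if t["customer_id"] in names:
            joined.append({**t, "name": names[t["customer_id"]]})

print(len(joined))   # 4
```

All four transactions find their customer inside the bucket with the same index, which is the property that lets Spark skip the shuffle.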

In [0]:
#Delta Table
'''
Delta Lake is a storage layer that brings ACID transactions to Apache Spark and big data workloads. It supports schema enforcement, time travel, and efficient data reads and writes.
'''

transactions_df = spark.createDataFrame([
    (1,"2024-01-01",100.0),
    (2,"2024-01-02",150.0),
    (1,"2024-01-03",200.0),
    (2,"2024-01-04",250.0)
],["customer_id","date","amount"])

transactions_df.write.format("delta").partitionBy("customer_id").save("dbfs:/FileStore/tables/delta_transactions")

customer_df = spark.createDataFrame([
    (1,"Alice"),
    (2,"Bob")
],["customer_id","name"])

customer_df.write.format("delta").save("dbfs:/FileStore/tables/delta_customers")

transactions_df = spark.read.format("delta").load("dbfs:/FileStore/tables/delta_transactions")
customer_df = spark.read.format("delta").load("dbfs:/FileStore/tables/delta_customers")

joined_df = transactions_df.join(customer_df, on="customer_id", how="inner")

joined_df.show()

+-----------+----------+------+-----+
|customer_id|      date|amount| name|
+-----------+----------+------+-----+
|          1|2024-01-01| 100.0|Alice|
|          1|2024-01-03| 200.0|Alice|
|          2|2024-01-02| 150.0|  Bob|
|          2|2024-01-04| 250.0|  Bob|
+-----------+----------+------+-----+



In [0]:
'''
Z-Ordering is a technique used to optimize the physical layout of data files in Delta Lake tables. It co-locates related data in the same set of files, so queries that filter on the Z-Ordered columns can skip more files. It is especially useful for large datasets with frequent filter queries.
'''

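# The table path must be quoted with backticks, not single quotes.
# customer_id is the partition column of this table and cannot be Z-Ordered, so a data column (date) is used instead.
spark.sql("OPTIMIZE delta.`dbfs:/FileStore/tables/delta_transactions` ZORDER BY (date)")

transactions_df = spark.read.format("delta").load("dbfs:/FileStore/tables/delta_transactions/")

# Perform a query filtering on the Z-Ordered column
result_df=transactions_df.filter(transactions_df["date"]=="2024-01-01")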
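
The idea of co-locating related values can be sketched with a Z-order (Morton) curve, which interleaves the bits of two column values into a single sort key. This is a conceptual illustration only, not Delta Lake's actual implementation; the helper name `interleave_bits` and the 4x4 grid are assumptions made for the example.

```python
def interleave_bits(x, y, bits=8):
    """Return the Z-order (Morton) value of two non-negative integers."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)    # bits of y go to odd positions
    return z


# Sort a small 4x4 grid of (x, y) points by their Z-order value.
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: interleave_bits(p[0], p[1]))

# Split the sorted points into 4 "files" of 4 points each.
files = [z_sorted[i:i + 4] for i in range(0, len(z_sorted), 4)]
for f in files:
    print(f)
```

Each "file" ends up covering a 2x2 block of the grid, so its minimum and maximum are narrow for both x and y. A filter on either column can therefore rule out whole files from their min/max statistics, which is the effect Z-Ordering aims for.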


In [0]:
#Delta Table Creation using Spark SQL

spark.sql("""
CREATE TABLE transactions (
    customer_id INT,
    date STRING,
    amount DOUBLE
)
USING DELTA LOCATION 'dbfs:/FileStore/tables/delta_transaction' """)


spark.sql("""
INSERT INTO transactions
VALUES
    (1, '2024-01-01', 100.0),
    (2, '2024-01-02', 150.0),
    (1, '2024-01-03', 200.0),
    (2, '2024-01-04', 250.0)
""")


result_df = spark.sql("SELECT * FROM transactions WHERE customer_id = 1")
result_df.show()

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|          1|2024-01-01| 100.0|
|          1|2024-01-03| 200.0|
+-----------+----------+------+



In [0]:
#Using SQL (run in a %sql cell; the path is quoted with backticks):
# DESCRIBE HISTORY delta.`dbfs:/FileStore/tables/delta_transactions`

# View Delta table history using Spark SQL
history_df=spark.sql("DESCRIBE HISTORY delta.`dbfs:/FileStore/tables/delta_transaction`")
history_df.show(truncate=False)

#Time Travel (run in a %sql cell; the timestamp must fall within the table's retained history):
# SELECT * FROM delta.`dbfs:/FileStore/tables/delta_transactions` TIMESTAMP AS OF '2024-01-02 00:00:00'
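
Time travel is also available from the DataFrame reader through the Delta options `versionAsOf` and `timestampAsOf`. The helper below is a minimal sketch; the function name `read_delta_as_of` is hypothetical, and it assumes an existing SparkSession with Delta Lake available.

```python
def read_delta_as_of(spark, path, version=None, timestamp=None):
    """Read a Delta table at an older version or timestamp.

    `spark` is an existing SparkSession. Exactly one of `version`
    or `timestamp` must be given.
    """
    if (version is None) == (timestamp is None):
        raise ValueError("Pass exactly one of version or timestamp")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)      # commit version from DESCRIBE HISTORY
    else:
        reader = reader.option("timestampAsOf", timestamp)  # e.g. "2024-01-02 00:00:00"
    return reader.load(path)
```

For example, `read_delta_as_of(spark, "dbfs:/FileStore/tables/delta_transactions", version=0).show()` would show the table as it was after its first commit.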

In [0]:
#Data Skew and Salting

'''Data Skew - It occurs when the distribution of data across partitions is uneven. This can lead to performance bottlenecks in distributed processing systems because some nodes or partitions end up with significantly more data to process than others.

Causes of Data Skew:

1. Uneven Data Distribution: Some keys may have a much larger volume of data than others. For example, in a dataset where a specific key (e.g., a customer ID) is very common, this key will have more data associated with it, leading to skew.

2. Imbalanced Joins: When joining datasets, if one of the join keys has a disproportionately large amount of data, it can cause skew. For example, joining on a column with many repeated values can lead to some partitions handling much more data than others.

Effects of Data Skew:
- Increased Execution Time: Some tasks will take much longer than others, leading to increased overall job execution time.
- Resource Bottlenecks: Skewed data can cause certain nodes to run out of memory or CPU, affecting the performance of the entire cluster.

Detecting Data Skew:
1. Task Metrics: Examine the execution times of different tasks. Significant variations can indicate skew.
2. Data Distribution: Check the distribution of data across partitions.


Salting - A technique used to mitigate the effects of data skew by introducing randomness into the partitioning of data. The goal is to distribute data more evenly across partitions, thereby reducing skew.

Steps:
1. Add a Salt Key: Introduce a new column, often called a "salt" column, to the skewed data. This column contains random values (for example 0 to 9) that spread the rows of a hot key over several partitions.
2. Modify the Partitioning Column: When performing operations like joins, include the salt column in the join or partitioning key. For a join, the other dataset must be replicated once per salt value so that every salted key still finds its match.'''


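num_salts=10

# df1 is assumed to be the skewed (large) DataFrame: add a random salt value from 0 to 9 to every row
df1=df1.withColumn("salt",floor(rand()*num_salts).cast("int"))

# df2 is assumed to be the smaller DataFrame: replicate every row once per salt value so each salted key finds its match
df2=df2.withColumn("salt",explode(array([lit(i) for i in range(num_salts)])))

# Modify the join key to include salt (a separator keeps the key unambiguous)
df1=df1.withColumn("join_key",concat_ws("_",col("customer_id").cast("string"),col("salt").cast("string")))
df2=df2.withColumn("join_key",concat_ws("_",col("customer_id").cast("string"),col("salt").cast("string")))

# Perform the join using the modified key
joined_df=df1.join(df2,on="join_key",how="inner")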

'''Remove or adjust the salt column as needed after the join. By applying salting, you can help balance the workload across partitions and improve performance in distributed data processing environments.'''
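
The effect of salting on partition sizes can be simulated without a cluster. The sketch below is a toy model, not Spark itself: `partition_of` is a hypothetical stand-in for hash partitioning, and the data (one hot key with 900 rows plus ten small keys) is made up for illustration.

```python
import random
from collections import Counter

NUM_PARTITIONS = 8
NUM_SALTS = 10


def partition_of(customer_id, salt=0):
    """Toy partitioner standing in for Spark's hash partitioning."""
    return (customer_id * 31 + salt) % NUM_PARTITIONS


random.seed(0)  # fixed seed so the simulation is repeatable

# 900 rows for the hot key 1, plus 10 rows each for keys 2..11.
keys = [1] * 900 + [k for k in range(2, 12) for _ in range(10)]

# Without salting, every row of a key goes to the same partition.
unsalted = Counter(partition_of(k) for k in keys)

# With salting, each row gets a random salt, so a hot key is split over several partitions.
salted = Counter(partition_of(k, random.randrange(NUM_SALTS)) for k in keys)

print("largest partition without salt:", max(unsalted.values()))
print("largest partition with salt:   ", max(salted.values()))
```

On a real DataFrame, the number of rows per partition can be inspected with `df.rdd.glom().map(len).collect()`, which gives the same kind of before and after comparison.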

Dbutils

In [0]:
#dbutils - It is a set of utilities provided by Databricks for interacting with the Databricks workspace and its various services. It includes functionalities for managing files, secrets, notebooks, jobs, and more. Here’s an overview of `dbutils` and the most commonly used commands:

#List files and directories:
dbutils.fs.ls("/mnt")

#Read a file:
dbutils.fs.head("/mnt/myfile.txt")

#Copy a file:
dbutils.fs.cp("dbfs:/source_path/file", "dbfs:/destination_path/file")

#Move a file:
dbutils.fs.mv("dbfs:/source_path/file", "dbfs:/destination_path/file")

#Remove a file or directory:
dbutils.fs.rm("dbfs:/path/to/file_or_directory",recurse=True)

#Create a directory:
# dbutils.fs.mkdirs("dbfs:/path/to/directory")

#Mount storage:
dbutils.fs.mount(
    source = "s3a://bucket-name",
    mountPoint = "/mnt/mount-name",
    extraConfigs = {"<conf-key>": "<conf-value>"}
  )

#Unmount storage:
dbutils.fs.unmount("/mnt/mount-name")

#Get a secret:
dbutils.secrets.get(scope="my-scope",key="my-key")

#Run a notebook:
result=dbutils.notebook.run("/path/to/notebook",timeout_seconds=60,arguments={"arg1": "value1"})

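#Get the current notebook path (this returns the path of the running notebook, not its parameters):
dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()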

#Create a widget:
dbutils.widgets.text("my-widget","default-value","Widget Label")

#Get widget value:
widget_value = dbutils.widgets.get("my-widget")

#Remove a widget:
dbutils.widgets.remove("my-widget")
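
`dbutils.fs.ls` lists a single directory level only. A minimal sketch of a recursive listing built on top of it follows; the helper name `list_files_recursively` is hypothetical, and `dbutils` is passed in as an argument because it only exists inside a Databricks notebook.

```python
def list_files_recursively(dbutils, path):
    """Yield the path of every file under `path`, descending into subdirectories."""
    for info in dbutils.fs.ls(path):
        if info.isDir():
            # Directories are not yielded themselves, only the files inside them.
            yield from list_files_recursively(dbutils, info.path)
        else:
            yield info.path
```

In a notebook this could be used as `for p in list_files_recursively(dbutils, "/mnt"): print(p)`. On very large directory trees a listing like this can be slow, since it issues one `ls` call per directory.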

In [0]:
# The default must be a numeric string because the value is later converted with int()
dbutils.widgets.text("input_param","1","Input Parameter")
dbutils.widgets.dropdown("dropdown_param","option1",["option1", "option2", "option3"],"Dropdown Parameter")
dbutils.widgets.combobox("combobox_param","option2",["option1", "option2", "option3"],"Combobox Parameter")

# Retrieve widget values
input_value=dbutils.widgets.get("input_param")
dropdown_value=dbutils.widgets.get("dropdown_param")
combobox_value=dbutils.widgets.get("combobox_param")

# Display widget values(for debugging purposes)
print(f"Input Parameter: {input_value}")
print(f"Dropdown Parameter: {dropdown_value}")
print(f"Combobox Parameter: {combobox_value}")


data = [
    (1, "2024-01-01", 100.0),
    (2, "2024-01-02", 150.0),
    (1, "2024-01-03", 200.0),
    (2, "2024-01-04", 250.0)
]
schema=["customer_id","date","amount"]
df=spark.createDataFrame(data, schema)
filtered_df = df.filter(df["customer_id"]==int(input_value))
filtered_df.show()

# Example of running another notebook with parameters
result=dbutils.notebook.run("/path/to/other_notebook",timeout_seconds=60,arguments={
    "input_param": input_value,
    "dropdown_param": dropdown_value,
    "combobox_param": combobox_value
})

#Print result of the notebook run
print(result)

#Optional: Remove widgets if they are no longer needed
dbutils.widgets.remove("input_param")
dbutils.widgets.remove("dropdown_param")
dbutils.widgets.remove("combobox_param")
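
Widget values are always returned as strings, and a text widget accepts any input, so a direct `int(...)` conversion can fail. A small defensive helper is sketched below; the name `parse_customer_id` is hypothetical and not part of `dbutils`.

```python
def parse_customer_id(raw_value, default=None):
    """Convert a widget string to an int, falling back to `default` if it is not a whole number."""
    try:
        return int(raw_value.strip())
    except (ValueError, AttributeError):
        # ValueError: not a number; AttributeError: value was not a string (e.g. None)
        return default


print(parse_customer_id("1"))              # 1
print(parse_customer_id(" 2 "))            # 2
print(parse_customer_id("default_value"))  # None
```

The filter in the cell above could then use `parse_customer_id(input_value)` and skip or report the case where the result is `None`.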