In [0]:
from pyspark.sql import SparkSession
# Reuse the active SparkSession if there is one; otherwise build one named "myapp1".
spark=SparkSession.builder.appName("myapp1").getOrCreate()

### Employee table

In [0]:
# Load the employees CSV: first row supplies column names, column types are inferred.
emp_df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("/Volumes/hr_catalog/hr_schema/hr_volume/hr_directory/employees.csv")
)

In [0]:
from pyspark.sql.functions  import col

### Filtering

In [0]:
# Filter rows using equality.
# Compare against a numeric literal so the check does not depend on Spark implicitly
# casting the string "17000" to the (inferred) type of the salary column.
emp_df.filter(col("salary") == 17000).show()

In [0]:
# Preview only the hire_date column to see how its values look before filtering on it.
emp_df.select("hire_date").display()

In [0]:
# Filter with several AND-ed conditions: hire_date within 2002-2005 and salary within
# 5000-15000 (both ranges inclusive), listed by ascending salary.
display(
    emp_df
    .where(col("hire_date").between('2002-01-01', "2005-12-31"))
    .where(col("salary").between(5000, 15000))
    .sort(col("salary"))
)

In [0]:
# Filter with an OR condition: keep employees whose first name is Neena or Lex.
# Both sides reference the column with the same lower-case spelling used elsewhere in
# the notebook, so the filter does not depend on case-insensitive column resolution.
display(
    emp_df.filter(
        (col("first_name") == "Neena") |
        (col("first_name") == "Lex")
    )
)

In [0]:
# Negate a condition with ~ : everyone whose job_id is not IT_PROG.
is_it_programmer = col("job_id").isin("IT_PROG")
emp_df.where(~is_it_programmer).show()

In [0]:
# isin(): keep rows whose department_id is one of the listed values.
display(
    emp_df.where(col("department_id").isin([10, 20, 30]))
)

In [0]:
# Inclusive range filter on salary (same bounds between(1500, 20000) would apply).
emp_df.where(
    (col("salary") >= 1500) & (col("salary") <= 20000)
).show()

In [0]:
# Sample data for the "column contains substring" examples below.
# The session created at the top of the notebook is reused: calling
# SparkSession.builder.appName(...).getOrCreate() a second time only returns that same
# session (the new app name is ignored), so it is not repeated here.
data = [
    ("raj@gmail.com",),
    ("sita@yahoo.com",),
    ("john@outlook.com",),
    ("meera@gmail.com",),
    ("test@company.org",)
]

# One string column named "email".
df = spark.createDataFrame(data, ["email"])
df.show()


In [0]:
# endswith(): keep addresses whose text ends with the gmail domain.
display(df.where(col("email").endswith("@gmail.com")))

In [0]:
# SQL LIKE pattern: % matches any prefix before the gmail domain.
display(df.where(col("email").like('%@gmail.com')))

In [0]:
# contains(): keep addresses that include the gmail domain anywhere in the string.
display(df.where(col("email").contains("@gmail.com")))

In [0]:
from pyspark.sql.functions import col, instr, length, substring

# Extract the domain part of each address (from "@" to the end) and keep gmail rows.
# Column.substr accepts Column arguments for both start position and length; using the
# full string length as the upper bound returns everything from "@" onward, whatever
# the domain length, instead of relying on a hard-coded 10 characters.
display(
    df.select(
        col("email")
        .substr(instr(col("email"), "@"), length(col("email")))
        .alias("domain")
    ).filter(col("domain") == "@gmail.com")
)
  


In [0]:
# startswith(): first names beginning with "S".
display(
    emp_df.where(col("first_name").startswith("S"))
)

In [0]:
# endswith(): job ids whose last character is "K".
display(emp_df.where(col("job_id").endswith("K")))

In [0]:
# Filter using a SQL expression string: filter() also accepts the predicate as SQL
# text, which is what this example is meant to demonstrate.
display(
    emp_df.filter("salary >= 1500 AND department_id IN (10, 20, 30)")
)

### Built-in column functions (imported as `F`)

In [0]:

  from pyspark.sql import functions as F

In [0]:
# Add a bonus column worth 20% of salary and list the highest salaries first.
display(
    emp_df
    .withColumn("bonus", F.col("salary") * 0.2)
    .sort(F.col("Salary").desc())
)

In [0]:
# when / otherwise: label each employee "high" when salary exceeds 10000, else "low".
# The new column is named "status" (previously misspelled as "staus").
display(
    emp_df.withColumn(
        "status",
        F.when(F.col("salary") > 10000, "high").otherwise("low")
    )
)

In [0]:
# upper(): upper-cased copy of the first name.
display(
    emp_df.withColumn("name_upper", F.upper(F.col("First_name")))
)

In [0]:
# trim(): strip leading/trailing spaces from first_name; highest salaries first.
display(
    emp_df
    .withColumn("name_trimmed", F.trim("first_name"))
    .sort(col("salary").desc())
)

In [0]:
# concat(): first and last name joined with a single space.
display(
    emp_df.withColumn(
        "fullname",
        F.concat("first_name", F.lit(" "), "last_name"),
    )
)

In [0]:
# Substring: first three characters of first_name (positions are 1-based).
display(
    emp_df.withColumn("first_3", F.col("first_name").substr(1, 3))
)

In [0]:
# length(): number of characters in first_name.
display(
    emp_df.withColumn("name_len", F.length("first_name"))
)

In [0]:
# split(): break job_id into its parts.
# The job_id values seen in this notebook (e.g. "IT_PROG") are underscore-separated,
# so split on "_"; splitting on a space would leave every value as a single element.
display(
    emp_df.withColumn(
        "job_parts", F.split(col("job_id"), "_")
    )
)

In [0]:
# current_date(): stamp every row with today's date.
display(
    emp_df.withColumn("today", F.current_date())
)

In [0]:
# date_add(): the date seven days after hire_date.
display(
    emp_df.withColumn("next_week", F.date_add("hire_date", 7))
)

In [0]:
# date_sub(): the date seven days before hire_date.
display(
    emp_df.withColumn("sub_day", F.date_sub("hire_date", 7))
)

In [0]:
# datediff(end, start): number of days from start to end.
# No other cell in this notebook uses end_date/start_date columns on emp_df; hire_date
# is the date column used throughout, so measure the days from hire_date until today.
display(
    emp_df.withColumn(
        "date_diff",
        F.datediff(F.current_date(), F.col("hire_date"))
    )
)

In [0]:
# year(): calendar year of hire_date.
display(emp_df.withColumn("year", F.year("hire_date")))

In [0]:
# month(): month number of hire_date.
display(emp_df.withColumn("month", F.month("hire_date")))

In [0]:
# dayofmonth(): day-of-month number of hire_date.
display(emp_df.withColumn("day", F.dayofmonth("hire_date")))

In [0]:
# Replace missing commission_pct values with 0 (fillna is the same operation as na.fill).
display(emp_df.fillna({"commission_pct" : 0}))

In [0]:
# isNotNull(): only employees that have a commission_pct value.
display(emp_df.where(F.col("commission_pct").isNotNull()))

In [0]:
# groupBy + avg: average salary per department.
display(
    emp_df
    .groupBy("department_id")
    .agg(F.avg("salary").alias("avg_sal"))
)


In [0]:
# groupBy + sum: total salary per department.
display(
    emp_df
    .groupBy("department_id")
    .agg(F.sum("salary").alias("sum_sal"))
)


In [0]:
# Order by salary, highest first.
emp_df.orderBy("salary", ascending=False).show()

In [0]:
# Employees earning at least 15000.
emp_df.where(col("salary") >= 15000).show()


#### Department table

In [0]:
# Load the departments CSV: first row supplies column names, column types are inferred.
dept_df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("/Volumes/hr_catalog/hr_schema/hr_volume/hr_directory/departments.csv")
)

In [0]:
# Left join on the shared department_id column: every employee is kept.
display(
    emp_df.join(dept_df, on="department_id", how="left")
)

In [0]:
# Plain-text preview of the employee rows (show() prints the first 20 by default).
emp_df.show()

In [0]:
# Inner join written with an explicit equality condition; unlike joining on a column
# name, this keeps the department_id column from both sides in the result.
emp_df.join(
    dept_df,
    on=emp_df.department_id == dept_df.department_id,
    how="inner",
).show()

### Joins


#### Show the tables of another schema with a Spark SQL command

In [0]:
# List the tables registered under hr_catalog.hr_schema.
spark.sql('SHOW TABLES IN hr_catalog.hr_schema').show()

In [0]:
%sql
-- Employees matched to their departments (explicit inner join on department_id).
SELECT *
FROM hr_catalog.hr_schema.employees e
INNER JOIN hr_catalog.hr_schema.departments d
    ON e.department_id = d.department_id;

In [0]:
# Inner join in Spark SQL.
# Employees are matched to departments on department_id (the previous condition
# compared e.employee_id with d.department_id, pairing unrelated rows).
display(
    spark.sql(
        """
        SELECT *
        FROM hr_catalog.hr_schema.employees e
        INNER JOIN hr_catalog.hr_schema.departments d
            ON e.department_id = d.department_id
        """
    )
)

In [0]:
# Inner join in PySpark: only employees with a matching department.
emp_df.join(dept_df, "department_id", "inner").display()

In [0]:
# Left join: all employees, with department columns null when there is no match.
display(emp_df.join(dept_df, "department_id", "left"))

In [0]:
# Right join: all departments, with employee columns null when there is no match.
display(emp_df.join(dept_df, "department_id", "Right"))

In [0]:
# Full outer join: unmatched rows from both sides are kept.
display(emp_df.join(dept_df, "department_id", "full"))

In [0]:
# Left semi join: employees that have a matching department; only employee columns.
display(emp_df.join(dept_df, "department_id", "left_semi"))

In [0]:
# Left anti join: employees with no matching department.
display(emp_df.join(dept_df, "department_id", "left_anti"))

In [0]:
# Join on multiple columns: `on` accepts a list of column names shared by both sides;
# append further key names to the list to join on more than one column.
# `how` belongs to join() (it was previously passed to display() by a misplaced
# parenthesis).
display(
    emp_df.join(
        dept_df, on=["department_id"], how="inner"
    )
)

In [0]:
# Join, then keep one column from each side.
display(
    emp_df
    .join(dept_df, on="department_id", how="inner")
    .select(emp_df.first_name, dept_df.department_name)
)

In [0]:
from pyspark.sql.functions import col

In [0]:
# Self join: alias the employee table twice ("a" = employee, "b" = manager) and match
# each employee's manager_id to the manager's employee_id.
display(
    emp_df.alias("a")
    .join(
        emp_df.alias("b"),
        on=col("a.manager_id") == col("b.employee_id"),
        how="inner",
    )
    .select(
        col("a.first_name"),
        col("b.first_name").alias("manager"),
    )
)

In [0]:
%sql
-- Employees with their department and that department's location.
SELECT *
FROM hr_catalog.hr_schema.employees e
INNER JOIN hr_catalog.hr_schema.departments d
    ON e.department_id = d.department_id
INNER JOIN hr_catalog.hr_schema.locations l
    ON d.location_id = l.location_id;

#### Location table


In [0]:
# Load the locations CSV (header row, inferred types), then show its rows and schema.
loc_df = (
    spark.read.format("csv")
    .option("header", "True")
    .option("inferSchema", "True")
    .load("/Volumes/hr_catalog/hr_schema/hr_volume/hr_directory/locations.csv")
)
loc_df.display()
loc_df.printSchema()

In [0]:
# Chain two inner joins: employees -> departments (department_id) -> locations (location_id).
display(
    emp_df
    .join(dept_df, "department_id", "inner")
    .join(loc_df, "location_id", "inner")
)

### Column operations


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, concat

In [0]:
# Sample rows for the column-operation examples; each tuple matches the
# ["id", "name", "age", "salary", "department"] column list defined in the next cell.
data = [
    (1, "Alice", 25, 50000.0, "HR"),
    (2, "Bob", 30, 65000.0, "IT"),
    (3, "Charlie", 35, 75000.0, "Finance"),
    (4, "David", 22, 45000.0, "HR"),
    (5, "Eve", 40, 90000.0, "IT")
]

In [0]:
# Column names, in the same order as the tuple fields of the sample rows.
columns = ["id", "name", "age", "salary", "department"]

In [0]:
# Build the sample DataFrame; column types are inferred from the Python values.
# Note: this rebinds `df`, which earlier held the e-mail sample data.
df = spark.createDataFrame(data, columns)

In [0]:
# Inspect the inferred column types, then the rows themselves.
df.printSchema()
df.show()

In [0]:
# Add a column holding the same literal value on every row.
region_value = lit("india")
df.withColumn("Region", region_value).display()

In [0]:
# Add a new column based on a calculation: 20% of the salary.
df.withColumn("hike", df["salary"] * 0.20).display()

In [0]:
# Rename "name" to "employee_name"; all other columns are unchanged.
display(df.withColumnRenamed(existing="name", new="employee_name"))

In [0]:
# Drop a column: the result has every column except "department"; df itself is unchanged.
display(df.drop("department"))

In [0]:
# Conditional column with when()/otherwise(): salaries above 60000 are "high_sal",
# everything else "low_sal".
display(
    df.withColumn(
        "status",
        when(col("salary") > 60000, "high_sal").otherwise("low_sal"),
    )
)

In [0]:
# Create a column from existing columns using concat(): "<first_name> <last_name>".
display(
    emp_df.withColumn(
        "full_name", concat("first_name", lit(" "), "last_name")
    )
)


In [0]:

# Re-check the schema and rows of df before changing a column's data type below.
df.printSchema()
df.show()

In [0]:
# Change a column's data type: cast age to integer and print the resulting schema.
# printSchema() prints directly and returns None, so it is not wrapped in display().
df.withColumn("age", col("age").cast("integer")).printSchema()



In [0]:
#Replace values in a column
from pyspark.sql.functions import col, when

# Map department codes to full names; codes without a mapping keep their original
# value. ("HumanResourse" was a misspelling of the replacement label.)
display(
    df.withColumn(
        "dept_name",
        when(col("department") == "HR", "HumanResources")
        .when(col("department") == "IT", "InformationTechnology")
        .otherwise(col("department"))
    )
)

In [0]:
# Use alias() to rename a column inside a select.
dept_column = col("department").alias("dept")
display(df.select(dept_column))


### Ordering

In [0]:
from pyspark.sql.functions import desc, asc

In [0]:
# Sort the DataFrame in ascending order of salary.
# (The previous code called .desc(), contradicting this example's purpose; the
# descending variant is shown in the next cell.)
display(df.orderBy(col("salary").asc()))

In [0]:
# Sort in descending order of salary.
display(emp_df.orderBy("salary", ascending=False))

In [0]:
# Sort by multiple columns: salary descending, ties broken by ascending age.
display(
    df.orderBy(col("salary").desc(), "age")
)

In [0]:
# Sort using sort() instead of orderBy(); ascending is the default direction.
display(df.sort(col("salary").asc()))

In [0]:
# Sort with mixed asc() and desc(): id ascending, then name descending.
# (Both keys previously used asc(), so nothing "mixed" was demonstrated.)
display(
    df.orderBy(df["id"].asc(), df["name"].desc())
)

In [0]:
# Sort by column names given as a list of strings, with one ascending flag per column:
# department ascending, salary descending.
# `ascending` is an orderBy() argument (it was previously passed to display()).
display(
    df.orderBy(["department", "salary"], ascending=[True, False])
)

In [0]:
from pyspark.sql.functions import expr

In [0]:
# Sort using a SQL expression, highest salary first.
# expr() parses a single expression, so expr("salary DESC") is read as the column
# salary aliased "DESC" and sorts ascending; the direction has to be applied with
# .desc() on the parsed expression.
display(
    df.sort(expr("salary").desc())
)

In [0]:
# Sort descending with null commission_pct values placed first.
display(
    emp_df
    .sort(col("commission_pct").desc_nulls_first())
    .select("commission_pct")
)

In [0]:
# Plain-text preview of the sample DataFrame.
df.show()

In [0]:
# Preview only the age column.
df.select("age").show()

In [0]:
# Retrieve the top n rows after sorting: the two highest salaries.
display(
    df.orderBy("salary", ascending=False).limit(2)
)

## Null handling

In [0]:
# Drop rows that contain a null in any column.
# Keep the DataFrame in the variable and display it separately: .display() returns
# None, so chaining it onto the assignment left drop_null_df holding None.
drop_null_df = emp_df.na.drop()
display(drop_null_df)

In [0]:
# Drop only the rows in which every column is null.
# The DataFrame is assigned first and displayed afterwards, because .display()
# returns None and would otherwise be what the variable holds.
drop_all_col_null_df = emp_df.na.drop(how="all")
display(drop_all_col_null_df)

In [0]:
# Drop rows that have a null in any of the listed columns.
# The DataFrame is assigned first and displayed afterwards, because .display()
# returns None and would otherwise be what the variable holds.
drop_null_df1 = emp_df.na.drop(subset=["commission_pct", "manager_id"])
display(drop_null_df1)

In [0]:
from pyspark.sql.functions import *

In [0]:
# Fill nulls with the constant 0 (only columns whose type matches the value are filled).
display(emp_df.fillna(0))

In [0]:
# Fill nulls with a per-column constant: a default first name and manager_id 0.
# The DataFrame is assigned first and displayed afterwards, because .display()
# returns None and would otherwise be what the variable holds.
null_values_fill_df = emp_df.na.fill({"first_name": "Stevan", "manager_id": 0})
display(null_values_fill_df)

In [0]:
# Replace specific values: turn the literal text "Null" into a real null.
display(emp_df.replace("Null", None))

In [0]:
# Filter out rows whose commission_pct is null, then show just that column.
display(
    emp_df
    .where(col("commission_pct").isNotNull())
    .select("commission_pct")
)

In [0]:
# Keep only the rows whose commission_pct is null, then show just that column.
display(
    emp_df
    .where(col("commission_pct").isNull())
    .select("commission_pct")
)

In [0]:
# Count the rows whose commission_pct is null; count() returns a plain Python int.
display(
    emp_df.where(col("commission_pct").isNull()).count()
)


# Data cleaning

In [0]:
# Current contents of df before new rows are appended to it.
df.show()

In [0]:
new_df = [
    (6, "Alice", 25, 50000.0, "HR"),
    (7, "Bob", 30, 65000.0, "IT"),
    (8, "Charlie", 35, 75000.0, "Finance"),
    (9, "David", 22, 45000.0, "HR"),
    (10, "Eve", 40, 90000.0, "IT")
]


In [0]:
columns = ["id", "name", "age", "salary", "department"]

In [0]:
 new_df=spark.createDataFrame(data, columns)
 df = df.unionByName(new_df)
 df.show()

In [0]:
# Drop duplicates: rows that are identical in every column are collapsed to one.
deduplicated = new_df.dropDuplicates()
display(deduplicated)

In [0]:
# Drop duplicates on specific columns: one row per distinct job_id.
display(
    emp_df
    .dropDuplicates(["job_id"])
    .select("job_id")
)

In [0]:
# Convert text to upper or lower case: upper-cased first name, lower-cased last name.
display(
    emp_df.select(
        upper("first_name").alias("upper_case"),
        lower("last_name").alias("lower_case"),
    )
)


In [0]:
# Flag missing values: status is 0 when commission_pct is null, otherwise 1.
display(
    emp_df.select(
        when(col("commission_pct").isNull(), 0).otherwise(1).alias("status")
    )
)

In [0]:
# Small gender/department sample; note that this rebinds both `data` and `df`.
data = [
    (1, "Male", "HR"),
    (2, "Female", "HR"),
    (3, "Male", "IT"),
    (4, "Female", "IT"),
    (5, "Unknown", "IT"),
]
df = spark.createDataFrame(data, ["Emp_id", "Gender", "Department"])

In [0]:
# Preview the gender/department sample rows.
df.show()

In [0]:
# Number of rows per gender value.
df_count = df.groupby("gender").count()
display(df_count)

In [0]:
# Map each gender value to a one-letter flag; anything other than Male/Female is "invalid".
df_newcol = df_count.withColumn(
    "gender_flag",
    when(col("gender") == "Male", "M")
    .when(col("gender") == "Female", "F")
    .otherwise("invalid"),
)

df_newcol.display()

# String functions

In [0]:
# Convert strings to lower case: first_name next to its lower-cased form.
display(
    emp_df.select(
        "first_name",
        lower("first_name").alias("lower_case"),
    )
)

In [0]:
# Convert a string to upper case: last_name next to its upper-cased form.
display(
    emp_df.select(
        "last_name",
        upper("last_name").alias("upper_name"),
    )
)


In [0]:
# Get a substring from a column: the first two characters of first_name.
display(
    emp_df.select(substring("first_name", 1, 2).alias("name"))
)

In [0]:
# Concatenate multiple columns without a separator.
# concat() is the right function here: concat_ws() expects a separator string as its
# first argument, so passing a Column there fails. salary is cast to string explicitly
# so the concatenation does not rely on an implicit cast.
display(
    emp_df.withColumn(
        "full_name_sal",
        concat(
            col("first_name"),
            col("last_name"),
            col("salary").cast("string"))
    )
)

In [0]:
# Concatenate with a separator (two spaces), highest salaries first.
display(
    emp_df
    .withColumn("full_name", concat_ws("  ", col("first_name"), col("last_name")))
    .sort(col("salary").desc())
    .select("salary", "full_name")
)

In [0]:
# Replace a value using a regular expression: turn the gender "Male" into "Murugan".
# The pattern is anchored and spelled like the data: the previous lower-case "male"
# never matched "Male" but did match the tail of "Female" (producing "FeMurugan").
display(
    df.withColumn(
        "name",
        regexp_replace(col("gender"), "^Male$", "Murugan")
    )
)
    


In [0]:
# Split a string into an array, using a single space as the delimiter.
display(
    df.withColumn("name_parts", split("gender", " "))
)


## Date and time

In [0]:
# Get the current date and timestamp as two new columns.
display(
    df
    .withColumn("current_date", current_date())
    .withColumn("current_timestamp", current_timestamp())
)


In [0]:
# Calculate the difference in days between two dates: datediff(end, start).
# No other cell in this notebook uses end_date/start_date columns on emp_df; hire_date
# is the date column used throughout, so measure the days from hire_date until today.
display(
    emp_df.withColumn(
        "date_diff", datediff(current_date(), col("hire_date"))
    )
)

In [0]:
# Add or subtract days from a date: one week after and one week before hire_date.
display(
    emp_df
    .withColumn("next_week", date_add("hire_date", 7))
    .withColumn("last_week", date_sub("hire_date", 7))
)

In [0]:
# Extract year, month and day-of-month from hire_date.
display(
    emp_df
    .withColumn("year", year("hire_date"))
    .withColumn("month", month("hire_date"))
    .withColumn("day", dayofmonth("hire_date"))
)

In [0]:
# Calculate the months between two dates.
# months_between(date1, date2) needs both dates (the single-argument call raised a
# TypeError); here it measures the months from hire_date up to today.
display(
    emp_df.withColumn(
        "months_between", months_between(current_date(), col("hire_date"))
    )
)

In [0]:
# Convert string to date.
# NOTE(review): this cell only prints the schema of df; no string-to-date conversion
# is actually performed here.
df.printSchema()

In [0]:
# Format a date as a string in month/day/year form.
# In Spark datetime patterns "MM" is month-of-year while "mm" is minute-of-hour, so
# the month field must be upper-case.
display(
    emp_df.withColumn(
        "formatted_date", date_format(col("hire_date"), "MM/dd/yyyy")
    ).select("hire_date", "formatted_date")
)

In [0]:

# Preview the hire_date values before filtering on a date range.
emp_df.select("hire_date").display()

In [0]:
# Filter rows by date range (inclusive on both ends).
# The bounds were previously inverted (>= 2025-09-30 AND <= 2002-12-07), which can
# never be true; the earlier date is the lower bound and the later date the upper.
display(
    emp_df.filter(
        col("hire_date").between(lit("2002-12-07"), lit("2025-09-30"))
    )
)
       

In [0]:
# Schema fields of emp_df, one entry per column.
emp_df.schema.fields

In [0]:
# Column names taken from the schema.
emp_df.schema.names

In [0]:
# Column names taken directly from the DataFrame.
emp_df.columns

In [0]:
# Order-update sample: several rows can share an Order_no, each with its own
# updated_at timestamp (kept as text) and amount. Rebinds both `data` and `df`.
data = [
    (1, "A100", "2024-05-01 10:00:00", 250.00),
    (2, "A100", "2024-05-01 12:30:00", 260.00),
    (3, "A100", "2024-05-02 09:15:00", 265.00),
    (4, "B200", "2024-05-03 14:10:00", 300.00),
    (5, "C300", "2024-05-02 08:50:00", 150.00),
    (6, "C300", "2024-05-02 09:00:00", 155.00),
    (7, "D400", "2024-05-01 16:00:00", 180.00),
]
df = spark.createDataFrame(
    data,
    ["Order_id", "Order_no", "updated_at", "amount"],
)
df.display()

In [0]:
from pyspark.sql import functions as f

In [0]:
# Window is needed here but the notebook only imports it further down (in the window
# function section), so on a fresh top-to-bottom run this cell raised a NameError.
from pyspark.sql.window import Window

# Keep the most recent row per order_no: number the rows newest-first within each
# order, then keep row number 1.
w = Window.partitionBy("order_no").orderBy(col("updated_at").desc())
display(df.withColumn("rn", f.row_number().over(w)).filter("rn=1"))

In [0]:
# Order numbers that appear on more than one row.
rows_per_order = df.groupBy("order_no").count()
display(rows_per_order.filter("count >1"))
                                                                             
                                          

## Aggregate functions

In [0]:
# Total of all salaries (sum here is the pyspark.sql.functions aggregate).
display(emp_df.agg(sum("salary")))

In [0]:
# Highest salary.
display(emp_df.agg(max(col("salary"))))

In [0]:
# Lowest salary.
display(emp_df.agg(min(col("salary"))))

In [0]:
# Average salary.
display(emp_df.agg(avg(col("salary"))))

In [0]:
# count() on a column counts only its non-null values.
display(emp_df.agg(count(col("commission_pct"))))

In [0]:
# Departments whose total salary exceeds 10000, ordered by department_id.
display(
    emp_df
    .groupBy("department_id")
    .agg(sum("salary").alias("total_sal"))
    .where(col("total_sal") > 10000)
    .sort(asc("department_id"))
)

In [0]:
# Total number of employees: count() returns a plain Python int, which is then
# handed to display().
display(
    emp_df.count()
)

In [0]:
# Highest and lowest salary computed with Spark SQL against the catalog table.
spark.sql("select max(salary),min(salary)from hr_catalog.hr_schema.employees").display()

In [0]:
# Highest and lowest salary in a single aggregation.
display(
    emp_df.agg(
        max("salary").alias("max_sal"),
        min("salary").alias("min_sal"),
    )
)

In [0]:
from pyspark.sql.functions import *

In [0]:
# Total salary paid by the company.
display(emp_df.agg(sum("salary")))


In [0]:
# Number of employees in each department.
display(
    emp_df
    .groupBy("department_id")
    .agg(count("employee_id"))
)

In [0]:
# Average salary per department, rounded to a whole number.
# alias() is applied to the aggregate column inside agg(); previously it was applied
# to the resulting DataFrame, which left the column with its generated name.
display(
    emp_df.groupBy("department_id")
    .agg(round(avg("salary")).alias("avg_sal"))
)

In [0]:
# Departments with more than five employees.
display(
    emp_df
    .groupBy("department_id")
    .agg(count("employee_id").alias("num_employees"))
    .where(col("num_employees") > 5)
)

In [0]:
# Average salary by job title (job_id).
display(
    emp_df
    .groupBy("job_id")
    .agg(avg("salary"))
)

In [0]:
# Maximum salary in each department.
display(
    emp_df
    .groupBy("department_id")
    .agg(max("salary"))
)

In [0]:
# Number of employees hired each year.
# Group by the year extracted from hire_date; grouping by hire_date itself counted
# hires per individual date instead of per year.
display(
    emp_df.groupBy(year(col("hire_date")).alias("hire_year"))
    .agg(count("employee_id").alias("num_hired"))
    .orderBy("hire_year")
)

In [0]:
# Departments where the average salary is greater than 80000.
# The threshold filter was missing, so every department was returned.
display(
    emp_df.groupBy("department_id")
    .agg(avg("salary").alias("avg_sal"))
    .filter(col("avg_sal") > 80000)
)

In [0]:
# Total salary by job, excluding commission (only the salary column is summed).
display(
    emp_df
    .groupBy("job_id")
    .agg(sum("salary"))
)

In [0]:
# Count of employees receiving a commission.
# Keep only rows that have a commission_pct and count them; grouping by
# commission_pct instead produced one count per distinct percentage (plus a null group).
display(
    emp_df.filter(col("commission_pct").isNotNull())
    .agg(count("employee_id").alias("employees_with_commission"))
)

In [0]:
# Employee count per department, joined to dept_df to bring in the department columns.
display(
    emp_df
    .groupBy("department_id")
    .agg(count("employee_id"))
    .join(dept_df, "department_id")
)

In [0]:
# Highest salary in each department, shown with the department name.
display(
    emp_df
    .groupBy("department_id")
    .agg(max("salary").alias("max_sal"))
    .join(dept_df, "department_id")
    .select("department_id", "department_name", "max_sal")
)


## Window functions

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [0]:
# Rank employees by salary within each department.
# The window orders by salary ascending, so rank 1 is the lowest salary.
w = Window.partitionBy("department_id").orderBy("salary")
display(emp_df.withColumn("rank", F.rank().over(w)))

In [0]:
# Find the highest-paid employee(s) in each department: attach the department's
# maximum salary to every row, then keep the rows whose salary equals it.
display(
    emp_df
    .withColumn(
        "max_sal",
        F.max("salary").over(Window.partitionBy("department_id")),
    )
    .where(F.col("salary") == F.col("max_sal"))
    .select("department_id", "max_sal", "employee_id")
)

In [0]:
# Calculate the running total of salary within each department, using Spark SQL.
# The window is partitioned by department_id and ordered by salary; the ROWS frame
# from UNBOUNDED PRECEDING to CURRENT ROW accumulates row by row within that order.
display(
    spark.sql(
        """
        SELECT
            employee_id,
            department_id,
            salary,
            SUM(salary) OVER (
                PARTITION BY department_id
                ORDER BY salary
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) AS running_total
        FROM hr_catalog.hr_schema.employees
                """
    )
)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
# Calculate rank(), dense_rank(), row_number(), lead() and lag() over one window:
# per department, highest salary first.
windowspec = Window.partitionBy("department_id").orderBy(desc("salary"))
display(
    emp_df
    .withColumn("rank", rank().over(windowspec))
    .withColumn("denserank", dense_rank().over(windowspec))
    .withColumn("rownum", row_number().over(windowspec))
    .withColumn("lead_sal", lead("salary", 1).over(windowspec))
    .withColumn("lag_sal", lag("salary", 1).over(windowspec))
    .select(
        "department_id", "salary", "rank", "denserank",
        "rownum", "lead_sal", "lag_sal",
    )
)

In [0]:
# Frame specification with rowsBetween(): rolling sum over the current row and the
# two rows before it, per department, highest salary first.
windowspecframe = (
    Window.partitionBy("department_id")
    .orderBy(col("salary").desc())
    .rowsBetween(-2, 0)
)
display(
    emp_df
    .withColumn("rolling_sum", sum("salary").over(windowspecframe))
    .select("department_id", "salary", "rolling_sum")
)

In [0]:
# rangeBetween(): the frame is defined by offsets on the ORDER BY value (salary)
# rather than by row positions.
# NOTE(review): with a descending order, offsets 1000..24000 presumably select rows
# whose salary is 1000 to 24000 below the current row's salary - confirm this is the
# intended frame.
window_spec_frame=Window.partitionBy("department_id").orderBy((col("salary")).desc()).rangeBetween(1000,24000)
display(
    emp_df.withColumn(
        "rangebetween",sum("salary").over(window_spec_frame)).
            select("department_id","salary","rangebetween")
)
    

In [0]:
from pyspark.sql.functions import *

In [0]:
# Maximum salary among each row and its immediate neighbours (one row before, one
# after) when all employees are ordered by hire_date; the window has no partitioning.
w_date = Window.orderBy("hire_date").rowsBetween(-1, 1)
display(
    emp_df
    .withColumn("maxsal", max("salary").over(w_date))
    .select("hire_date", "salary", "maxsal")
)