In [0]:
from pyspark.sql import SparkSession

In [0]:
# Get the active Spark session, or start a new one named "Test" if none exists.
spark = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [0]:

# Read the uploaded CSV into a DataFrame: the first row supplies the column
# names and Spark infers the column types from the data.
file_location = "/FileStore/tables/test-4.csv"
file_type = "csv"

# Reader options, passed to Spark as strings.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# display() is not a PySpark function; it is supplied by the notebook
# environment (presumably Databricks, given the /FileStore path).
display(df)

name,Dept,salary
chandu,iot,10000
chandu,DS,5000
rashmi,big data,2000
rashmi,iot,4000
krish,DS,20000
krish,iot,5000
krish,big data,9000


In [0]:
# GROUP BY: rows sharing the same "Dept" value are collected into one group,
# and sum() adds up the salaries inside each group (one output row per Dept).
(
    df.groupBy("Dept")
    .sum("salary")
    .show()
)


+--------+-----------+
|    Dept|sum(salary)|
+--------+-----------+
|big data|      11000|
|     iot|      19000|
|      DS|      25000|
+--------+-----------+



In [0]:
# Lowest salary within each department.
(
    df.groupBy("Dept")
    .min("salary")
    .show()
)

+--------+-----------+
|    Dept|min(salary)|
+--------+-----------+
|big data|       2000|
|     iot|       4000|
|      DS|       5000|
+--------+-----------+



In [0]:
# Highest salary within each department.
(
    df.groupBy("Dept")
    .max("salary")
    .show()
)

+--------+-----------+
|    Dept|max(salary)|
+--------+-----------+
|big data|       9000|
|     iot|      10000|
|      DS|      20000|
+--------+-----------+



In [0]:
# Average salary within each department.
(
    df.groupBy("Dept")
    .avg("salary")
    .show()
)


+--------+-----------------+
|    Dept|      avg(salary)|
+--------+-----------------+
|big data|           5500.0|
|     iot|6333.333333333333|
|      DS|          12500.0|
+--------+-----------------+



In [0]:
# mean() is an alias of avg(); the result column is still named avg(salary).
(
    df.groupBy("Dept")
    .mean("salary")
    .show()
)

+--------+-----------------+
|    Dept|      avg(salary)|
+--------+-----------------+
|big data|           5500.0|
|     iot|6333.333333333333|
|      DS|          12500.0|
+--------+-----------------+



In [0]:
# Number of rows in each department; the result column is named "count".
(
    df.groupBy("Dept")
    .count()
    .show()
)

+--------+-----+
|    Dept|count|
+--------+-----+
|big data|    2|
|     iot|    3|
|      DS|    2|
+--------+-----+



In [0]:
# The same per-department salary total as groupBy("Dept").sum("salary"),
# written with agg() and a {column: aggregate-function-name} dictionary.
(
    df.groupBy("Dept")
    .agg({"salary": "sum"})
    .show()
)

+--------+-----------+
|    Dept|sum(salary)|
+--------+-----------+
|big data|      11000|
|     iot|      19000|
|      DS|      25000|
+--------+-----------+



In [0]:
# PIVOT: group by "Dept", turn every distinct value of "name" into its own
# column, and fill each cell with the summed salary. Dept/name combinations
# that have no rows come out as NULL.
(
    df.groupBy("Dept")
    .pivot("name")
    .sum("salary")
    .show()
)

+--------+------+-----+------+
|    Dept|chandu|krish|rashmi|
+--------+------+-----+------+
|big data|  NULL| 9000|  2000|
|     iot| 10000| 5000|  4000|
|      DS|  5000|20000|  NULL|
+--------+------+-----+------+



In [0]:
# HANDLING MISSING VALUES IN PYSPARK
# dropna() (the same operation as df.na.drop()) removes every row that has at
# least one null value; its default is how="any".
df.dropna().show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
|chandu|     iot| 10000|
|chandu|      DS|  5000|
|rashmi|big data|  2000|
|rashmi|     iot|  4000|
| krish|      DS| 20000|
| krish|     iot|  5000|
| krish|big data|  9000|
+------+--------+------+



In [0]:
# how="all": a row is dropped only when every one of its values is null.
df.dropna(how="all").show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
|chandu|     iot| 10000|
|chandu|      DS|  5000|
|rashmi|big data|  2000|
|rashmi|     iot|  4000|
| krish|      DS| 20000|
| krish|     iot|  5000|
| krish|big data|  9000|
+------+--------+------+



In [0]:
# thresh=2: keep only rows that have at least 2 non-null values; rows with
# fewer non-null values are dropped. When thresh is given it overrides `how`,
# so `how` is not passed here.
df.na.drop(thresh=2).show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
|chandu|     iot| 10000|
|chandu|      DS|  5000|
|rashmi|big data|  2000|
|rashmi|     iot|  4000|
| krish|      DS| 20000|
| krish|     iot|  5000|
| krish|big data|  9000|
+------+--------+------+



In [0]:
# subset=["salary"]: only the salary column is checked for nulls, so a row is
# dropped when its salary is null, whatever its other columns contain.
df.na.drop(how="any", subset=["salary"]).show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
|chandu|     iot| 10000|
|chandu|      DS|  5000|
|rashmi|big data|  2000|
|rashmi|     iot|  4000|
| krish|      DS| 20000|
| krish|     iot|  5000|
| krish|big data|  9000|
+------+--------+------+



In [0]:
# SORTING ON A SINGLE COLUMN: orderBy() (an alias of sort()) orders the rows by
# one or more columns, ascending by default.
df.orderBy("salary").show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
|rashmi|big data|  2000|
|rashmi|     iot|  4000|
|chandu|      DS|  5000|
| krish|     iot|  5000|
| krish|big data|  9000|
|chandu|     iot| 10000|
| krish|      DS| 20000|
+------+--------+------+



In [0]:
# SORTING IN DESCENDING ORDER: call .desc() on the column expression.
# Rows with equal salaries (the two 5000 rows) are not guaranteed to come out
# in any particular relative order unless a tie-breaker column is added.
df.orderBy(df["salary"].desc()).show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
| krish|      DS| 20000|
|chandu|     iot| 10000|
| krish|big data|  9000|
|chandu|      DS|  5000|
| krish|     iot|  5000|
|rashmi|     iot|  4000|
|rashmi|big data|  2000|
+------+--------+------+



In [0]:
# SORTING ON TWO COLUMNS: order by "salary" first, then by "name" to break ties
# between equal salaries (e.g. the two 5000 rows), both ascending.
df.sort("salary", "name").show()

+------+--------+------+
|  name|    Dept|salary|
+------+--------+------+
|rashmi|big data|  2000|
|rashmi|     iot|  4000|
|chandu|      DS|  5000|
| krish|     iot|  5000|
| krish|big data|  9000|
|chandu|     iot| 10000|
| krish|      DS| 20000|
+------+--------+------+



In [0]:
# PySpark JOIN combines two DataFrames on a join condition.
# Sample data: each employee points at a department through emp_dept_id.
# Employee 6 references department "50", which is not in deptDF, and
# department 30 (Sales) has no employees, so the join types below differ
# visibly in how they treat unmatched rows.
# NOTE(review): emp_dept_id holds strings ("10") while dept_id holds ints (10);
# the join conditions rely on Spark implicitly casting one side to compare them.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]
empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.show()

dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.show()


+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [0]:
# INNER JOIN: keep only employee/department pairs whose emp_dept_id matches a
# dept_id. Employee 6 (dept 50) and the Sales department (30) drop out.
empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="inner",
).show()


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# FULL OUTER JOIN ("outer"): keep every row from both sides. Where a row has no
# match, the other side's columns are NULL (the Sales row and employee 6).
empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="outer",
).show()


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|  NULL|    NULL|           NULL|       NULL|       NULL|  NULL|  NULL|    Sales|     30|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
|     6|   Brown|              2|       2010|         50|      |    -1|     NULL|   NULL|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# LEFT (OUTER) JOIN: keep every employee. Employee 6 has no matching department,
# so its dept_name/dept_id come out as NULL.
empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="left",
).show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
|     6|   Brown|              2|       2010|         50|      |    -1|     NULL|   NULL|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# RIGHT (OUTER) JOIN: keep every department. Sales has no employees, so its
# employee columns are NULL, and employee 6 (no matching department) is dropped.
empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="right",
).show()


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|  NULL|    NULL|           NULL|       NULL|       NULL|  NULL|  NULL|    Sales|     30|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [0]:
# LEFT SEMI JOIN: return the employees that have a matching department, with
# only empDF's columns (nothing from deptDF is added to the result).
empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="leftsemi",
).show()


+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     5|   Brown|              2|       2010|         40|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+



In [0]:
# LEFT ANTI JOIN: return only the employees with no matching department
# (employee 6 here), again with only empDF's columns.
empDF.join(
    deptDF,
    on=empDF.emp_dept_id == deptDF.dept_id,
    how="leftanti",
).show()

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     6|Brown|              2|       2010|         50|      |    -1|
+------+-----+---------------+-----------+-----------+------+------+



In [0]:
# SELECTING AND RENAMING COLUMNS IN PYSPARK
# Build a small DataFrame used by the renaming/selection cells that follow.
from pyspark.sql import SparkSession

# getOrCreate() returns the session already running in this notebook when there
# is one, so this does not start a second session.
spark = SparkSession.builder.appName('pyspark - example join').getOrCreate()

# Each tuple is (Name, DOB, Gender, salary); the names are plain strings.
data = [('Ram', '1991-04-01', 'M', 3000),
        ('Mike', '2000-05-19', 'M', 4000),
        ('Rohini', '1978-09-05', 'M', 4000),
        ('Maria', '1967-12-01', 'F', 4000),
        ('Jenis', '1980-02-17', 'F', 1200)]

columns = ["Name", "DOB", "Gender", "salary"]

# Create the Spark DataFrame. This rebinds `df`, replacing the CSV-backed
# DataFrame used by the earlier cells.
df = spark.createDataFrame(data=data, schema=columns)
df.show()


+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



In [0]:
# Rename a single column. withColumnRenamed() returns a new DataFrame and
# leaves df itself unchanged.
(
    df.withColumnRenamed("DOB", "DateOfBirth")
    .show()
)

+------+-----------+------+------+
|  Name|DateOfBirth|Gender|salary|
+------+-----------+------+------+
|   Ram| 1991-04-01|     M|  3000|
|  Mike| 2000-05-19|     M|  4000|
|Rohini| 1978-09-05|     M|  4000|
| Maria| 1967-12-01|     F|  4000|
| Jenis| 1980-02-17|     F|  1200|
+------+-----------+------+------+



In [0]:
# RENAME MULTIPLE COLUMNS by chaining withColumnRenamed() calls.
(
    df.withColumnRenamed("Gender", "Sex")
    .withColumnRenamed("salary", "Amount")
    .show()
)


+------+----------+---+------+
|  Name|       DOB|Sex|Amount|
+------+----------+---+------+
|   Ram|1991-04-01|  M|  3000|
|  Mike|2000-05-19|  M|  4000|
|Rohini|1978-09-05|  M|  4000|
| Maria|1967-12-01|  F|  4000|
| Jenis|1980-02-17|  F|  1200|
+------+----------+---+------+



In [0]:
# Rename with selectExpr(): each argument is a SQL expression, so "Name as name"
# renames Name while the other columns pass through under their own names.
data = df.selectExpr(
    "Name as name",
    "DOB",
    "Gender",
    "salary",
)
data.show()


+------+----------+------+------+
|  name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



In [0]:
# USING select(): pick the columns with col() and rename salary to Amount via
# alias(); the remaining columns keep their original names.
from pyspark.sql.functions import col

data = df.select(
    col("Name"),
    col("DOB"),
    col("Gender"),
    col("salary").alias('Amount'),
)
data.show()

+------+----------+------+------+
|  Name|       DOB|Gender|Amount|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



In [0]:
# USING toDF(): replace all column names at once, positionally, with the names
# in Data_list (one new name per existing column).
# NOTE(review): " Gender-m/f" starts with a space, so the column can only be
# referenced with that leading space; confirm this is intentional.
Data_list = [
    "Emp Name",
    "Date of Birth",
    " Gender-m/f",
    "Paid salary",
]
new_df = df.toDF(*Data_list)
new_df.show()


+--------+-------------+-----------+-----------+
|Emp Name|Date of Birth| Gender-m/f|Paid salary|
+--------+-------------+-----------+-----------+
|     Ram|   1991-04-01|          M|       3000|
|    Mike|   2000-05-19|          M|       4000|
|  Rohini|   1978-09-05|          M|       4000|
|   Maria|   1967-12-01|          F|       4000|
|   Jenis|   1980-02-17|          F|       1200|
+--------+-------------+-----------+-----------+



In [0]:
# UNION AND UNION ALL
import pyspark
from pyspark.sql import SparkSession

# getOrCreate() returns the session already running in this notebook when
# there is one.
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# First input for the union examples (rebinds `df` once more).
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
]
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+



In [0]:
# Second input with the same columns. The James and Maria rows are identical to
# rows in df, which makes the duplicate handling in the union cells visible.
simpleData2 = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]
columns2 = ["employee_name", "department", "state", "salary", "age", "bonus"]
df2 = spark.createDataFrame(data=simpleData2, schema=columns2)
df2.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [0]:
# union() appends df2's rows after df's rows, matching columns by position, and
# keeps duplicate rows (SQL UNION ALL semantics).
unionDF = (
    df.union(df2)
)
unionDF.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [0]:
# unionAll() is an alias of union(): it also keeps duplicate rows, so the result
# is the same as the union() output above.
unionAllDF = (
    df.unionAll(df2)
)
unionAllDF.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [0]:
# For SQL-style UNION (no duplicate rows), follow union() with distinct().
# The row order of the de-duplicated result is not guaranteed.
disDF = (
    df.union(df2)
    .distinct()
)
disDF.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----