### Setting up pyspark

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pyspark_example_03').getOrCreate()

data = [('James','Smith','M',3000),
  ('Anna','Rose','F',4100),
  ('Robert','Williams','M',6200), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()


if 'salary1' not in df.columns:
    print("aa")

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+

aa


### adding a new column

In [3]:
from pyspark.sql.functions import lit
df.withColumn('bonus_percent', lit(0.3)).show()

+---------+--------+------+------+-------------+
|firstname|lastname|gender|salary|bonus_percent|
+---------+--------+------+------+-------------+
|    James|   Smith|     M|  3000|          0.3|
|     Anna|    Rose|     F|  4100|          0.3|
|   Robert|Williams|     M|  6200|          0.3|
+---------+--------+------+------+-------------+



### updating an existing column

In [8]:
df.withColumn('bonus_percent', lit(df.salary*4)).show()

+---------+--------+------+------+-------------+
|firstname|lastname|gender|salary|bonus_percent|
+---------+--------+------+------+-------------+
|    James|   Smith|     M|  3000|        12000|
|     Anna|    Rose|     F|  4100|        16400|
|   Robert|Williams|     M|  6200|        24800|
+---------+--------+------+------+-------------+



### Concatenating two columns

In [13]:
from pyspark.sql.functions import concat_ws
df.withColumn('Full_Name', concat_ws(' ','firstname','lastname')).show()

+---------+--------+------+------+---------------+
|firstname|lastname|gender|salary|      Full_Name|
+---------+--------+------+------+---------------+
|    James|   Smith|     M|  3000|    James Smith|
|     Anna|    Rose|     F|  4100|      Anna Rose|
|   Robert|Williams|     M|  6200|Robert Williams|
+---------+--------+------+------+---------------+



### Conditional adding of columns

In [14]:
from pyspark.sql.functions import when
df.withColumn("grade", when((df.salary < 4000), lit("A")).when((df.salary >= 4000) & (df.salary <= 5000), lit("B")) \
     .otherwise(lit("C"))).show()

+---------+--------+------+------+-----+
|firstname|lastname|gender|salary|grade|
+---------+--------+------+------+-----+
|    James|   Smith|     M|  3000|    A|
|     Anna|    Rose|     F|  4100|    B|
|   Robert|Williams|     M|  6200|    C|
+---------+--------+------+------+-----+



### pyspark sql queries output

In [17]:
from pyspark.sql.functions import current_date
# Add column using select
df.select("firstname","salary", lit(0.3).alias("bonus")).show()

+---------+------+-----+
|firstname|salary|bonus|
+---------+------+-----+
|    James|  3000|  0.3|
|     Anna|  4100|  0.3|
|   Robert|  6200|  0.3|
+---------+------+-----+



In [18]:
df.select("firstname","salary", lit(df.salary * 0.3).alias("bonus_amount")).show()

+---------+------+------------+
|firstname|salary|bonus_amount|
+---------+------+------------+
|    James|  3000|       900.0|
|     Anna|  4100|      1230.0|
|   Robert|  6200|      1860.0|
+---------+------+------------+



In [19]:
df.select("firstname","salary", current_date().alias("today_date")).show()

+---------+------+----------+
|firstname|salary|today_date|
+---------+------+----------+
|    James|  3000|2024-02-06|
|     Anna|  4100|2024-02-06|
|   Robert|  6200|2024-02-06|
+---------+------+----------+



In [20]:
#Add columns using SQL
df.createOrReplaceTempView("PER")
spark.sql("select firstname,salary, '0.3' as bonus from PER").show()

+---------+------+-----+
|firstname|salary|bonus|
+---------+------+-----+
|    James|  3000|  0.3|
|     Anna|  4100|  0.3|
|   Robert|  6200|  0.3|
+---------+------+-----+



In [21]:
spark.sql("select firstname,salary, salary * 0.3 as bonus_amount from PER").show()

+---------+------+------------+
|firstname|salary|bonus_amount|
+---------+------+------------+
|    James|  3000|       900.0|
|     Anna|  4100|      1230.0|
|   Robert|  6200|      1860.0|
+---------+------+------------+



In [22]:
spark.sql("select firstname,salary, current_date() as today_date from PER").show()

+---------+------+----------+
|firstname|salary|today_date|
+---------+------+----------+
|    James|  3000|2024-02-06|
|     Anna|  4100|2024-02-06|
|   Robert|  6200|2024-02-06|
+---------+------+----------+



In [23]:
spark.sql("select firstname,salary, " +
          "case salary when salary < 4000 then 'A' "+
          "else 'B' END as grade from PER").show()

+---------+------+-----+
|firstname|salary|grade|
+---------+------+-----+
|    James|  3000|    B|
|     Anna|  4100|    B|
|   Robert|  6200|    B|
+---------+------+-----+



### Renaming a RDD Column

In [24]:
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+



In [25]:
df.withColumnRenamed('salary','Monthly_Salary').show()

+---------+--------+------+--------------+
|firstname|lastname|gender|Monthly_Salary|
+---------+--------+------+--------------+
|    James|   Smith|     M|          3000|
|     Anna|    Rose|     F|          4100|
|   Robert|Williams|     M|          6200|
+---------+--------+------+--------------+



### Dropping a column using df.drop(column_name).

In [30]:
# dropping columns
#drop ("column_name")
remo_cols = df.drop("firstname") # firstname is dropped
remo_cols.show()

+--------+------+------+
|lastname|gender|salary|
+--------+------+------+
|   Smith|     M|  3000|
|    Rose|     F|  4100|
|Williams|     M|  6200|
+--------+------+------+



In [6]:
temp_list = [('Aman',"1000","Permanant"),('Raj',"5000","Temp"),('Ketan',"3000","Permanant"),(None, None, None),(None,'4000','Temp')]
columns = ['Name', 'Monthly_Salary','Employee_Type']
temp_rdd = spark.createDataFrame(data=temp_list, schema=columns)
temp_rdd.show()

+-----+--------------+-------------+
| Name|Monthly_Salary|Employee_Type|
+-----+--------------+-------------+
| Aman|          1000|    Permanant|
|  Raj|          5000|         Temp|
|Ketan|          3000|    Permanant|
| NULL|          NULL|         NULL|
| NULL|          4000|         Temp|
+-----+--------------+-------------+



In [7]:
drop_col2 = temp_rdd.na.drop(how="all")
drop_col2.show()

+-----+--------------+-------------+
| Name|Monthly_Salary|Employee_Type|
+-----+--------------+-------------+
| Aman|          1000|    Permanant|
|  Raj|          5000|         Temp|
|Ketan|          3000|    Permanant|
| NULL|          4000|         Temp|
+-----+--------------+-------------+



In [8]:
drop_col2 = temp_rdd.na.drop(how="any")
drop_col2.show()

+-----+--------------+-------------+
| Name|Monthly_Salary|Employee_Type|
+-----+--------------+-------------+
| Aman|          1000|    Permanant|
|  Raj|          5000|         Temp|
|Ketan|          3000|    Permanant|
+-----+--------------+-------------+



In [9]:
# in subset you can mention which column to check the null values.
m1_drop_col = temp_rdd.na.drop(how="any", subset=["Employee_Type"])
m1_drop_col.show()

+-----+--------------+-------------+
| Name|Monthly_Salary|Employee_Type|
+-----+--------------+-------------+
| Aman|          1000|    Permanant|
|  Raj|          5000|         Temp|
|Ketan|          3000|    Permanant|
| NULL|          4000|         Temp|
+-----+--------------+-------------+



In [11]:
# filling missing values
data1 = temp_rdd.na.fill('missing',['Name'])
data1.show()

+-------+--------------+-------------+
|   Name|Monthly_Salary|Employee_Type|
+-------+--------------+-------------+
|   Aman|          1000|    Permanant|
|    Raj|          5000|         Temp|
|  Ketan|          3000|    Permanant|
|missing|          NULL|         NULL|
|missing|          4000|         Temp|
+-------+--------------+-------------+

