# How to read a CSV file as a Spark DataFrame

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 46.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=a852c809176ea5703b9466766525debf046a93f3786a98d59e405bc6317ed16b
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
# Initializing PySpark: one SparkSession for the whole notebook, with the other
# entry points derived from it so every cell shares the same context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Spark config. The app name is a static setting, so it is applied once, when the
# session (and its SparkContext) is first built.
conf = SparkConf().setAppName("sample_app")
# getOrCreate() makes this cell safe to re-run; constructing SparkContext(conf=conf)
# directly raises an error if a context is already active in the kernel.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# SQLContext is deprecated since Spark 3.0 (use `spark` instead); it is kept only so
# any later cell that still references `sqlContext` keeps working.
sqlContext = SQLContext(sc)



In [None]:
# Display the SparkContext handle from the setup cell.
sc

In [None]:
# Load input.csv: the first line supplies the column names, and Spark infers column
# types from the values (id and salary come back as int).
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/input.csv")
)
df.show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
+---+-------+---+------+



# Read a CSV with a `|` delimiter (the file must actually be pipe-delimited; otherwise each line lands in a single column)

In [None]:
# Same file, but fields are split on "|" instead of the default ",".
# NOTE(review): this only yields four columns if input.csv is pipe-delimited when the
# cell runs; the comma-based read above shows the same columns, so the file was
# presumably edited between the two runs — confirm.
df = spark.read.csv(
    "/content/input.csv",
    sep="|",
    header=True,
    inferSchema=True,
)
df.show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
+---+-------+---+------+



#getting column names

In [None]:
# Column names, in schema order, as a plain Python list.
df.schema.names

['id', 'name', 'sex', 'salary']

#getting datatypes of columns

In [None]:
# (column name, Spark SQL type string) pairs, read straight from the schema fields.
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]

[('id', 'int'), ('name', 'string'), ('sex', 'string'), ('salary', 'int')]

#getting complete info of the columns 

In [None]:
# Print the schema as a tree: each column's name, type, and nullability.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salary: integer (nullable = true)



#select specific columns from dataframe

In [None]:
# Project just the id and sex columns, referenced as Column objects rather than names.
df.select(df["id"], df["sex"]).show()

+---+---+
| id|sex|
+---+---+
|  1|  M|
|  2|  F|
|  3|  F|
|  4|  F|
+---+---+



#select columns based on list provided

In [None]:
# Column names to keep, unpacked into select() as separate arguments.
col_list = ["id", "name", "salary"]
df.select(*col_list).show()

+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|    Tom|   100|
|  2|  Jenny|   150|
|  3| Andrew|    50|
|  4|Michael|   120|
+---+-------+------+



#select all columns from dataframe

In [None]:
# "*" expands to every column, in the same order as df.columns.
df.select("*").show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
+---+-------+---+------+



# Get the Spark session (`getOrCreate()` reuses the session already created at the top) and read a pipe-delimited file

In [None]:
# Reuse the notebook's active SparkSession. getOrCreate() returns the session built in
# the setup cell; static settings such as master/appName cannot be changed on a running
# SparkContext (Spark ignores them with a warning), so none are passed here.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# input1.csv is pipe-delimited; its header row gives the column names.
df1 = spark.read.csv("/content/input1.csv", sep="|", inferSchema=True, header=True)
df1.show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
|  5|   null|  F|   500|
+---+-------+---+------+



# Filter on a single condition

In [None]:
# Filter on a single column: keep rows where sex is "M". where() is an alias of filter().
from pyspark.sql import functions as f
df1.where(f.col("sex") == "M").show()

+---+----+---+------+
| id|name|sex|salary|
+---+----+---+------+
|  1| Tom|  M|   100|
+---+----+---+------+



# Filter on multiple conditions

In [None]:
# Filter on multiple columns: sex is female AND salary >= 100, written as a SQL predicate.
df1.filter("sex = 'F' AND salary >= 100").show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  2|  Jenny|  F|   150|
|  4|Michael|  F|   120|
|  5|   null|  F|   500|
+---+-------+---+------+



# Filter records whose value is in a list

In [None]:
# Name values to match (despite the variable name, these are names, not columns).
col_list = ["Tom", "Michael"]
df1.where(f.col("name").isin(*col_list)).show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  4|Michael|  F|   120|
+---+-------+---+------+



# Filter records whose value is not in a list (rows where the value is null are excluded as well)

In [None]:
# Keep rows whose name is NOT in `col_list`. `~` negates the boolean column directly;
# comparing it to the string "False" only worked through an implicit string->boolean cast.
# Rows with a null name are dropped too: `null IN (...)` is null, and its negation is still null.
df1.filter(~f.col("name").isin(col_list)).show()

+---+------+---+------+
| id|  name|sex|salary|
+---+------+---+------+
|  2| Jenny|  F|   150|
|  3|Andrew|  F|    50|
+---+------+---+------+



#filter records where name is null

In [None]:
# Rows whose name is missing, expressed as a SQL predicate string.
df1.filter("name IS NULL").show()

+---+----+---+------+
| id|name|sex|salary|
+---+----+---+------+
|  5|null|  F|   500|
+---+----+---+------+



#filter records where name is not null

In [None]:
# Rows that have a name (null names excluded).
df1.where(df1["name"].isNotNull()).show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
+---+-------+---+------+



In [None]:
# Reuse the notebook's active SparkSession. getOrCreate() returns the session built in
# the setup cell; static settings such as master/appName cannot be changed on a running
# SparkContext (Spark ignores them with a warning), so none are passed here.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# input2.csv (comma-delimited) contains nulls in both name and salary.
df = spark.read.csv("/content/input2.csv", inferSchema=True, header=True)
df.show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
|  5|   null|  F|   500|
|  6|   Bill|  M|  null|
+---+-------+---+------+



# Replace null values in `name` with the default name `New_user`

In [None]:
# Replace nulls in the name column only; other columns (e.g. salary) keep their nulls.
df.na.fill("New_user", subset=["name"]).show()

+---+--------+---+------+
| id|    name|sex|salary|
+---+--------+---+------+
|  1|     Tom|  M|   100|
|  2|   Jenny|  F|   150|
|  3|  Andrew|  F|    50|
|  4| Michael|  F|   120|
|  5|New_user|  F|   500|
|  6|    Bill|  M|  null|
+---+--------+---+------+



# Replace null values in multiple columns (one default value per column)

In [None]:
# A dict maps each column to its own null replacement; only the listed columns change.
df.na.fill({"name": "new_user", "salary": 250}).show()

+---+--------+---+------+
| id|    name|sex|salary|
+---+--------+---+------+
|  1|     Tom|  M|   100|
|  2|   Jenny|  F|   150|
|  3|  Andrew|  F|    50|
|  4| Michael|  F|   120|
|  5|new_user|  F|   500|
|  6|    Bill|  M|   250|
+---+--------+---+------+



In [None]:
# Reuse the notebook's active SparkSession. getOrCreate() returns the session built in
# the setup cell; static settings such as master/appName cannot be changed on a running
# SparkContext (Spark ignores them with a warning), so none are passed here.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# input3.csv contains exact duplicate rows and rows duplicated only on (id, name).
df = spark.read.csv("/content/input3.csv", inferSchema=True, header=True)
df.show()


+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  2|  Jenny|  F|    10|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
|  5|   null|  F|   500|
|  6|   Bill|  M|  null|
+---+-------+---+------+



#drop duplicate records

In [None]:
# Remove rows that are identical across every column; output row order is not guaranteed.
df.dropDuplicates().show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  2|  Jenny|  F|   150|
|  4|Michael|  F|   120|
|  1|    Tom|  M|   100|
|  6|   Bill|  M|  null|
|  3| Andrew|  F|    50|
|  5|   null|  F|   500|
|  2|  Jenny|  F|    10|
+---+-------+---+------+



#drop duplicate records based on specific columns

In [None]:
# Deduplicate on (id, name) only. When duplicates differ in other columns (Jenny appears
# with salary 150 and 10), Spark does not guarantee which of the rows is kept.
df.dropDuplicates(subset=["id", "name"]).show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|   100|
|  2|  Jenny|  F|   150|
|  3| Andrew|  F|    50|
|  4|Michael|  F|   120|
|  5|   null|  F|   500|
|  6|   Bill|  M|  null|
+---+-------+---+------+



# Update the value of a column (multiply salary by 10)

In [None]:
# withColumn with an existing name replaces that column: salary becomes salary * 10.
df1.withColumn("salary", df1["salary"] * 10).show()

+---+-------+---+------+
| id|   name|sex|salary|
+---+-------+---+------+
|  1|    Tom|  M|  1000|
|  2|  Jenny|  F|  1500|
|  3| Andrew|  F|   500|
|  4|Michael|  F|  1200|
|  5|   null|  F|  5000|
+---+-------+---+------+



# Add a new column derived from an existing column (name in uppercase, then in lowercase)

In [None]:
# Append name_upper as the last column; a null name stays null.
df1.select("*", f.upper("name").alias("name_upper")).show()

+---+-------+---+------+----------+
| id|   name|sex|salary|name_upper|
+---+-------+---+------+----------+
|  1|    Tom|  M|   100|       TOM|
|  2|  Jenny|  F|   150|     JENNY|
|  3| Andrew|  F|    50|    ANDREW|
|  4|Michael|  F|   120|   MICHAEL|
|  5|   null|  F|   500|      null|
+---+-------+---+------+----------+



In [None]:
# Append name_lower as the last column; a null name stays null.
df1.select("*", f.lower("name").alias("name_lower")).show()

+---+-------+---+------+----------+
| id|   name|sex|salary|name_lower|
+---+-------+---+------+----------+
|  1|    Tom|  M|   100|       tom|
|  2|  Jenny|  F|   150|     jenny|
|  3| Andrew|  F|    50|    andrew|
|  4|Michael|  F|   120|   michael|
|  5|   null|  F|   500|      null|
+---+-------+---+------+----------+



# Add a new column with a constant value (a department column set to "Technology")

In [None]:
# Add a constant "Technology" value to every row. The column is spelled "Department"
# (previously the typo "Departmnt"), matching the Department column in input4.csv.
df1.withColumn("Department", f.lit("Technology")).show()

+---+-------+---+------+----------+
| id|   name|sex|salary| Departmnt|
+---+-------+---+------+----------+
|  1|    Tom|  M|   100|Technology|
|  2|  Jenny|  F|   150|Technology|
|  3| Andrew|  F|    50|Technology|
|  4|Michael|  F|   120|Technology|
|  5|   null|  F|   500|Technology|
+---+-------+---+------+----------+



#change dtype of the column

In [None]:
# Inspect df1's inferred types before casting; salary was inferred as integer.
df1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salary: integer (nullable = true)



Salary should be a floating-point type, but it was inferred as integer, so cast it:

In [None]:
# Cast salary from integer to FloatType and print the resulting schema.
# NOTE(review): for currency values DecimalType is usually safer than a binary float — confirm intent.
from pyspark.sql.types import FloatType
salary_as_float = df1["salary"].cast(FloatType())
df1.withColumn("salary", salary_as_float).printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salary: float (nullable = true)



In [None]:
# Reuse the notebook's active SparkSession. getOrCreate() returns the session built in
# the setup cell; static settings such as master/appName cannot be changed on a running
# SparkContext (Spark ignores them with a warning), so none are passed here.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# input4.csv adds a Department column used by the aggregation cells below.
df4 = spark.read.csv("/content/input4.csv", inferSchema=True, header=True)
df4.show()

+---+-------+---+------+----------+
| id|   name|sex|salary|Department|
+---+-------+---+------+----------+
|  1|    Tom|  M|   100|     sales|
|  2|  Jenny|  F|   150|     sales|
|  3| Andrew|  F|    50|     sales|
|  4|Michael|  F|   120|        IT|
|  5|Jasmine|  F|   500|        IT|
|  6|   Bill|  M|   200|        IT|
+---+-------+---+------+----------+



# Count the records in the DataFrame

In [None]:
# Number of rows in df4.
df4.count()

6

#getting sum of salary from dataframe

In [None]:
# Whole-table aggregate (no grouping): agg() is shorthand for groupBy().agg().
df4.agg(f.sum("salary").alias("total_Salary")).show()

+------------+
|total_Salary|
+------------+
|        1120|
+------------+



# Get the max and min salary from the DataFrame

In [None]:
# Two independent single-value aggregations, each rendered as its own table.
for salary_stat in (f.max("salary").alias("Max_Salary"), f.min("salary").alias("Min_Salary")):
    df4.agg(salary_stat).show()

+----------+
|Max_Salary|
+----------+
|       500|
+----------+

+----------+
|Min_Salary|
+----------+
|        50|
+----------+



# Get the mean salary from the DataFrame

In [None]:
# Average salary over all rows; avg() and mean() are the same aggregate.
df4.agg(f.avg("salary").alias("mean_Salary")).show()

+------------------+
|       mean_Salary|
+------------------+
|186.66666666666666|
+------------------+



#using multiple aggregate functions

In [None]:
# Several aggregates computed in a single pass, one output column each.
df4.agg(
    f.max("salary").alias("Max_Salary"),
    f.min("salary").alias("Min_Salary"),
).show()

+----------+----------+
|Max_Salary|Min_Salary|
+----------+----------+
|       500|        50|
+----------+----------+



#getting aggregate salary by department

In [None]:
# Total salary per department; the dict form names the result column "sum(salary)".
df4.groupBy("Department").agg({"salary": "sum"}).show()

+----------+-----------+
|Department|sum(salary)|
+----------+-----------+
|     sales|        300|
|        IT|        820|
+----------+-----------+



In [None]:
# Highest salary per department; the dict form names the result column "max(salary)".
df4.groupBy("Department").agg({"salary": "max"}).show()

+----------+-----------+
|Department|max(salary)|
+----------+-----------+
|     sales|        150|
|        IT|        500|
+----------+-----------+



In [None]:
# Lowest salary per department. Must use df4: `df` holds input3.csv, which has no
# Department column, so grouping it fails on a fresh Restart & Run All.
df4.groupby("Department").min("salary").show()

+----------+-----------+
|Department|min(salary)|
+----------+-----------+
|     sales|         50|
|        IT|        120|
+----------+-----------+



In [None]:
# Average salary per department (the result column is named "avg(salary)"). Must use
# df4: `df` holds input3.csv, which has no Department column.
df4.groupby("Department").mean("salary").show()

+----------+-----------------+
|Department|      avg(salary)|
+----------+-----------------+
|     sales|            100.0|
|        IT|273.3333333333333|
+----------+-----------------+



#groupby with multiple aggregate functions

In [None]:
# Several aggregates per department in one pass. Must use df4: `df` holds input3.csv,
# which has no Department column.
(
    df4.groupby("Department")
    .agg(
        f.sum("salary").alias("total_salary"),
        f.max("salary").alias("max_Salary"),
    )
    .show()
)

+----------+------------+----------+
|Department|total_salary|max_Salary|
+----------+------------+----------+
|     sales|         300|       150|
|        IT|         820|       500|
+----------+------------+----------+



#multiple column groupby 

In [None]:
# Aggregates per (Department, sex) combination. Must use df4: `df` holds input3.csv,
# which has no Department column.
(
    df4.groupby("Department", "sex")
    .agg(
        f.sum("salary").alias("total_salary"),
        f.max("salary").alias("max_Salary"),
    )
    .show()
)

+----------+---+------------+----------+
|Department|sex|total_salary|max_Salary|
+----------+---+------------+----------+
|     sales|  F|         200|       150|
|        IT|  M|         200|       200|
|        IT|  F|         620|       500|
|     sales|  M|         100|       100|
+----------+---+------------+----------+



In [None]:
# Reuse the notebook's active SparkSession. getOrCreate() returns the session built in
# the setup cell; static settings such as master/appName cannot be changed on a running
# SparkContext (Spark ignores them with a warning), so none are passed here.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# stars.csv: several rows per (id, name), one per date/company/city.
dfs = spark.read.csv("/content/stars.csv", inferSchema=True, header=True)
dfs.show()

+---+----------+-------------------+-------+---------+
| id|      name|               date|company|     city|
+---+----------+-------------------+-------+---------+
|  1|Tom Cruise|2016-04-01 00:00:00|    tcx|   mumbai|
|  2|     Rajix|2016-04-01 00:00:00|    tcx|   mumbai|
|  3|   Tamanna|2016-04-01 00:00:00|    tcx|   mumbai|
|  3|   Tamanna|2017-12-01 00:00:00|    tcx|bangalore|
|  2|     Rajix|2018-10-01 00:00:00|   infi|     pune|
|  3|   Tamanna|2018-10-01 00:00:00|    tcx|    delhi|
|  2|     Rajix|2018-12-01 00:00:00| lobant|     pune|
|  1|Tom Cruise|2021-01-01 00:00:00| ractal|   mumbai|
+---+----------+-------------------+-------+---------+



# dropDuplicates based on id & name (default method)

In [None]:
# Deduplicate on (id, name). Spark keeps an arbitrary row from each group; here the
# 2016 rows survived, but dropDuplicates makes no promise about which row is kept.
dfs.dropDuplicates(subset=["id", "name"]).show()

+---+----------+-------------------+-------+------+
| id|      name|               date|company|  city|
+---+----------+-------------------+-------+------+
|  1|Tom Cruise|2016-04-01 00:00:00|    tcx|mumbai|
|  2|     Rajix|2016-04-01 00:00:00|    tcx|mumbai|
|  3|   Tamanna|2016-04-01 00:00:00|    tcx|mumbai|
+---+----------+-------------------+-------+------+



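The default method does **not** guarantee which record is retained when there are duplicates. Here the earliest (2016) rows happened to survive, but Spark makes no ordering promise. To keep the first (or latest) record deterministically, rank the rows within each (id, name) group with a window function ordered by `date` and keep rank 1.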