<a href="https://colab.research.google.com/github/joypodder/pyspark/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark



In [24]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()
sc = spark.sparkContext

In [25]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Creating dummy dataframe

In [None]:
data = [('Joy','ZS','M',46004),
        ('ABC','ZS','M','12345')]

columns = ['Name','Company','Gender','ID']

df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

+----+-------+------+-----+
|Name|Company|Gender|ID   |
+----+-------+------+-----+
|Joy |ZS     |M     |46004|
|ABC |ZS     |M     |12345|
+----+-------+------+-----+



# Importing dataframe from csv

In [None]:
df2 = spark.read.option("header","true").csv("QUERY4.csv")

In [None]:
df2.printSchema()

root
 |-- geo_name: string (nullable = true)
 |-- total_trx_units_sales_district: string (nullable = true)



In [None]:
df2.show()

+-----------+------------------------------+
|   geo_name|total_trx_units_sales_district|
+-----------+------------------------------+
| District 1|                         86509|
| District 1|                         86509|
|District 21|                         68442|
|District 31|                         68270|
|District 16|                         68242|
|District 13|                         68231|
|District 30|                         68230|
| District 6|                         68214|
|District 10|                         68214|
|District 26|                         68190|
| District 3|                         68185|
+-----------+------------------------------+



In [None]:
df2 = df2.withColumnRenamed("total_trx_units_sales_district","sales")


In [None]:
df2 = df2.withColumn('ID',df2['sales'].cast('int'))

Creating temporary view from a df

In [None]:
df2.createOrReplaceTempView("Temp_table")

In [None]:
querydf = spark.sql("SELECT sum(sales) from temp_table group by geo_name")

In [None]:
querydf.show()

+----------+
|sum(sales)|
+----------+
|   68214.0|
|   68190.0|
|   68231.0|
|   68442.0|
|   68270.0|
|   68242.0|
|   68185.0|
|   68214.0|
|  173018.0|
|   68230.0|
+----------+



# RDD

In [None]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5])

In [None]:
print(rdd.collect())

[1, 2, 3, 4, 5]


In [None]:
dataseq = [("Java",20000),("Python", 10000),("Scala",30000)]
rdd = sc.parallelize(dataseq)

In [None]:
print(rdd.collect())

[('Java', 20000), ('Python', 10000), ('Scala', 30000)]


In [None]:
rdd_csv = sc.textFile("QUERY4.csv")

In [None]:
print(rdd_csv.collect())

['geo_name,total_trx_units_sales_district', 'District 1,86509', 'District 1,86509', 'District 21,68442', 'District 31,68270', 'District 16,68242', 'District 13,68231', 'District 30,68230', 'District 6,68214', 'District 10,68214', 'District 26,68190', 'District 3,68185']


In [None]:
rdd_csv = rdd_csv.map(lambda x: x.split(","))

In [None]:
for element in rdd_csv.collect():
    print( element[0],"\t",element[1])

geo_name 	 total_trx_units_sales_district
District 1 	 86509
District 1 	 86509
District 21 	 68442
District 31 	 68270
District 16 	 68242
District 13 	 68231
District 30 	 68230
District 6 	 68214
District 10 	 68214
District 26 	 68190
District 3 	 68185


# Pyspark tutorial

In [26]:
df = spark.read.format('csv').option('header',True).option('InferSchema',True).load('/content/drive/MyDrive/Pyspark/BigMart Sales.csv')

In [None]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

## Select transformation

In [None]:
df.select(col('Item_Identifier'),col('Item_Weight')).show()

+---------------+-----------+
|Item_Identifier|Item_Weight|
+---------------+-----------+
|          FDA15|        9.3|
|          DRC01|       5.92|
|          FDN15|       17.5|
|          FDX07|       19.2|
|          NCD19|       8.93|
|          FDP36|     10.395|
|          FDO10|      13.65|
|          FDP10|       NULL|
|          FDH17|       16.2|
|          FDU28|       19.2|
|          FDY07|       11.8|
|          FDA03|       18.5|
|          FDX32|       15.1|
|          FDS46|       17.6|
|          FDF32|      16.35|
|          FDP49|          9|
|          NCB42|       11.8|
|          FDP49|          9|
|          DRI11|       NULL|
|          FDU02|      13.35|
+---------------+-----------+
only showing top 20 rows



## Alias

In [None]:
df.select(col('Item_Identifier').alias('ID'),col('Item_Weight').alias('Weight')).show()

+-----+------+
|   ID|Weight|
+-----+------+
|FDA15|   9.3|
|DRC01|  5.92|
|FDN15|  17.5|
|FDX07|  19.2|
|NCD19|  8.93|
|FDP36|10.395|
|FDO10| 13.65|
|FDP10|  NULL|
|FDH17|  16.2|
|FDU28|  19.2|
|FDY07|  11.8|
|FDA03|  18.5|
|FDX32|  15.1|
|FDS46|  17.6|
|FDF32| 16.35|
|FDP49|     9|
|NCB42|  11.8|
|FDP49|     9|
|DRI11|  NULL|
|FDU02| 13.35|
+-----+------+
only showing top 20 rows



## Filter

In [None]:

#df.filter(col('Item_Fat_Content')=='Regular').show()
df.filter((col('Item_Type')=='Soft Drinks') & (col('Item_Weight') < 10)).show()

+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|  Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|          DRC01|       5.92|         Regular|    0.019278216|Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228|2025-01-28|2025-02-07|      10|
|          DRZ11|       8.85|         Regular|    0.113123893|Soft Drinks|122.5388|     

In [None]:
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDH17|       16.2|         Regular|    0.016687114|        Frozen Foods| 96.9726|           OUT045|                     2002|       NULL|              Tier 2|Supermarket Type1|        1076.5986|
|          FDU28|       19.2|         Regular|     0.09444959|        Frozen Foods|187.8214|           OUT017|                     2007|       NULL|              Tier 2|Superma

## With column rename

In [None]:
df.withColumnRenamed('Item_Weight','Wt').show()

+---------------+------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|    Wt|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|   9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|  5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4

## with column

### new column

In [None]:
df = df.withColumn('Flag',lit(1))

In [None]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|   1|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|        

In [None]:
df.withColumn('Multiply',col('Item_Weight')*col('Item_MRP')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+------------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|          Multiply|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+------------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|   1|2323.2255600000003|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drin

### modify existing column

In [None]:
df.withColumn('Item_Fat_Content',regexp_replace('Item_Fat_Content','Low Fat','LF'))\
      .withColumn('Item_Fat_Content',regexp_replace('Item_Fat_Content','Regular','Reg')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|              LF|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|   1|
|          DRC01|       5.92|             Reg|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|        

## typecasting

In [None]:
df.withColumn('Item_Weight',col('Item_Weight').cast(StringType())).printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)
 |-- Flag: integer (nullable = false)



## Sort/OrderBy

In [None]:
df.sort(col('Item_Weight').desc()).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDR13|      9.895|         Regular|    0.028696932|      Canned|117.0492|           OUT013|                     1987|       High|              Tier 3|Supermarket Type1|         810.9444|   1|
|          DRD49|      9.895|         Low Fat|    0.167799329| Soft Drinks|239.4564|           OUT035|                     2004|      Small|              Tier 2|Supermarket Type1|        5

In [None]:
df.sort(col('Item_Visibility').asc()).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          DRB48|      16.75|         Regular|              0|         Soft Drinks| 39.3822|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|         353.5398|   1|
|          FDS32|      17.75|         Regular|              0|Fruits and Vegeta...|139.9838|           OUT045|                     2002|       NULL|        

In [None]:
#df.sort([col('Item_Visibility').asc(),col('Item_MRP').desc()]).show()
df.sort(['Item_Visibility','Item_MRP'],ascending=[1,0]).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDU35|       6.44|         low fat|              0|              Breads|   99.87|           OUT045|                     2002|       NULL|              Tier 2|Supermarket Type1|          1498.05|   1|
|          NCH29|       5.51|         Low Fat|              0|  Health and Hygiene| 98.9726|           OUT018|                     2009|     Medium|        

## Limit

In [None]:
df.limit(10).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|   1|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|        

## Drop

In [None]:
df.drop('Item_Visibility','Item_Visibility').show()

+---------------+-----------+----------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          FDA15|        9.3|         Low Fat|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|   1|
|          DRC01|       5.92|         Regular|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228|   1|
|          FDN15|       1

## Drop duplicates

In [None]:
# de-dup
df.dropDuplicates().show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          NCB06|       17.6|         Low Fat|    0.082316506|  Health and Hygiene| 160.692|           OUT035|                     2004|      Small|              Tier 2|Supermarket Type1|          1597.92|   1|
|          NCR05|       10.1|         Low Fat|    0.054630834|  Health and Hygiene|200.2084|           OUT046|                     1997|      Small|        

In [None]:
df.dropDuplicates(['Item_Identifier']).show()

+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|  Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Flag|
+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          DRA12|       11.6|         Low Fat|    0.041177505|Soft Drinks|140.3154|           OUT017|                     2007|       NULL|              Tier 2|Supermarket Type1|        2552.6772|   1|
|          DRA24|      19.35|         Regular|    0.040154087|Soft Drinks|164.6868|           OUT017|                     2007|       NULL|              Tier 2|Supermarket Type1|        1146.5

## Union

In [None]:
data1 = [('a','1'),
         ('b',2)]
data2 = [('c','1'),
         ('d','2')]
schema = 'id1 STRING,id2 STRING'
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)

In [None]:
df1.union(df2).show()

+---+---+
|id1|id2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  1|
|  d|  2|
+---+---+



## Union by name

In [None]:
data1 = [('a',1),('b',2)]
schema1 = 'id1 STRING, id2 INT'


data2 = [(3,'c'),(4,'d')]
schema2 = 'id2 INT, id1 STRING'

df1 = spark.createDataFrame(data=data1,schema=schema1)
df2 = spark.createDataFrame(data=data2,schema=schema2)

In [None]:
print("DF1")
df1.show()

print("DF2")
df2.show()

DF1
+---+---+
|id1|id2|
+---+---+
|  a|  1|
|  b|  2|
+---+---+

DF2
+---+---+
|id2|id1|
+---+---+
|  3|  c|
|  4|  d|
+---+---+



In [None]:
df1.unionByName(df2).show()

+---+---+
|id1|id2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
|  d|  4|
+---+---+



## String functions

In [None]:
df.select(lower(col('Item_Type').alias('String fns'))).show()

+--------------------------------+
|lower(Item_Type AS `String fns`)|
+--------------------------------+
|                           dairy|
|                     soft drinks|
|                            meat|
|            fruits and vegeta...|
|                       household|
|                    baking goods|
|                     snack foods|
|                     snack foods|
|                    frozen foods|
|                    frozen foods|
|            fruits and vegeta...|
|                           dairy|
|            fruits and vegeta...|
|                     snack foods|
|            fruits and vegeta...|
|                       breakfast|
|              health and hygiene|
|                       breakfast|
|                     hard drinks|
|                           dairy|
+--------------------------------+
only showing top 20 rows



## Date functions

### current_date()

In [None]:
df = df.withColumn('currdate',current_date())

In [None]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-01-28|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2

### date_add()

In [None]:
df  = df.withColumn('newdate',date_add(col('currdate'),10))

In [None]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-01-28|2025-02-07|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2

### Datediff

In [None]:
df = df.withColumn('datediff',datediff(col('newdate'),col('currdate')))

In [None]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-01-28|2025-02-07|      10|
|          DRC01|       5.92|         Regular|    0.

### date format functions

In [None]:
df.withColumn('currdate',date_format(col('currdate'),'dd-MM-yyyy')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|28-01-2025|2025-02-07|      10|
|          DRC01|       5.92|         Regular|    0.

## Handling null values

### dropping nulls

In [None]:
df.count()

8523

In [None]:
## Dropping rows with any missing value
df.dropna('any').count()

4650

In [None]:
## Dropping rows in which all values are missing
df.dropna('all').count()

8523

In [None]:
## Dropping rows which have null values in a certain column
df.dropna(subset = ['Item_Weight']).count()

7060

### Filling Null values


In [None]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)
 |-- currdate: date (nullable = false)
 |-- newdate: date (nullable = false)
 |-- datediff: integer (nullable = false)



In [None]:
df.fillna('not available').show()

+---------------+-------------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-------------+--------------------+-----------------+-----------------+----------+----------+--------+
|Item_Identifier|  Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|  Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-------------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-------------+--------------------+-----------------+-----------------+----------+----------+--------+
|          FDA15|          9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|       Medium|              Tier 1|Supermarket Type1|         3735.138|2025-01-28|2025-02-07|      10|
|          DRC01|         5.92|     

In [None]:
df.fillna('not available',subset=['Item_Fat_Content']).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|2025-01-28|2025-02-07|      10|
|          DRC01|       5.92|         Regular|    0.

## Split and indexing

In [None]:
## Splitting column based on delimiter

df.withColumn('Outlet_Type',split(col('Outlet_Type'),' ')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|         Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|[Supermarket, Type1]|         3735.138|2025-01-28|2025-02-07|      10|
|          DRC01|       5.92|         Re

In [None]:
## Indexing

df.withColumn('Outlet_Type',split(col('Outlet_Type'),' ')[1]).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|      Type1|         3735.138|2025-01-28|2025-02-07|      10|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft 

## Explode function

In [None]:
df_exp = df.withColumn('Outlet_Type',split(col('Outlet_Type'),' '))

In [None]:
df_exp.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|         Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|[Supermarket, Type1]|         3735.138|2025-01-28|2025-02-07|      10|
|          DRC01|       5.92|         Re

In [None]:
df_exp.withColumn('Outlet_Type',explode(col('Outlet_Type'))).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|  currdate|   newdate|datediff|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+----------+----------+--------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket|         3735.138|2025-01-28|2025-02-07|      10|
|          FDA15|        9.3|         Low Fat|    0.016047301|              

## Array Contains

In [None]:
df_split = df.withColumn('Outlet_Type',split(col('Outlet_Type'),' '))

In [None]:
df_split.withColumn('Type1present',array_contains(col('Outlet_Type'),'Type1')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|         Outlet_Type|Item_Outlet_Sales|Type1present|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+--------------------+-----------------+------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|[Supermarket, Type1]|         3735.138|        true|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|  

## Group by

In [22]:
df.groupby(col('Item_Type')).agg(sum('Item_MRP')).show()

+--------------------+------------------+
|           Item_Type|     sum(Item_MRP)|
+--------------------+------------------+
|       Starchy Foods|21880.027399999995|
|        Baking Goods| 81894.73640000001|
|              Breads| 35379.11979999999|
|Fruits and Vegeta...|178124.08099999998|
|                Meat|59449.863799999956|
|         Hard Drinks|29334.676599999995|
|         Soft Drinks|58514.164999999964|
|           Household|135976.52539999998|
|           Breakfast|        15596.6966|
|               Dairy|101276.45959999996|
|         Snack Foods|175433.92040000003|
|              Others|22451.891600000006|
|             Seafood| 9077.870000000003|
|              Canned|  90706.7269999999|
|        Frozen Foods|118558.88140000001|
|  Health and Hygiene|        68025.8388|
+--------------------+------------------+



In [None]:
df.groupby(col('Item_Type')).agg(sum('Item_MRP')).sort(col('sum(Item_MRP)').desc()).show()

+--------------------+------------------+
|           Item_Type|     sum(Item_MRP)|
+--------------------+------------------+
|Fruits and Vegeta...|178124.08099999998|
|         Snack Foods|175433.92040000003|
|           Household|135976.52539999998|
|        Frozen Foods|118558.88140000001|
|               Dairy|101276.45959999996|
|              Canned|  90706.7269999999|
|        Baking Goods| 81894.73640000001|
|  Health and Hygiene|        68025.8388|
|                Meat|59449.863799999956|
|         Soft Drinks|58514.164999999964|
|              Breads| 35379.11979999999|
|         Hard Drinks|29334.676599999995|
|              Others|22451.891600000006|
|       Starchy Foods|21880.027399999995|
|           Breakfast|        15596.6966|
|             Seafood| 9077.870000000003|
+--------------------+------------------+



## Collect list

In [4]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book1'),
        ('user2','book3'),
        ('user3','book2'),
        ('user3','book1')]

schema = 'user STRING, book STRING'

df_coll = spark.createDataFrame(data=data,schema=schema)

In [6]:
df_coll.groupby(col('user')).agg(collect_list(col('book'))).show()

+-----+------------------+
| user|collect_list(book)|
+-----+------------------+
|user1|    [book1, book2]|
|user3|    [book2, book1]|
|user2|    [book1, book3]|
+-----+------------------+



## Pivot

In [18]:
df_pivot =  df.select(col('Item_Type'),col('Item_MRP').cast(DoubleType()),col('Outlet_Size'))

In [19]:
df_pivot.printSchema()

root
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Size: string (nullable = true)



In [20]:
df_pivot.show()

+--------------------+--------+-----------+
|           Item_Type|Item_MRP|Outlet_Size|
+--------------------+--------+-----------+
|               Dairy|249.8092|     Medium|
|         Soft Drinks| 48.2692|     Medium|
|                Meat| 141.618|     Medium|
|Fruits and Vegeta...| 182.095|       NULL|
|           Household| 53.8614|       High|
|        Baking Goods| 51.4008|     Medium|
|         Snack Foods| 57.6588|       High|
|         Snack Foods|107.7622|     Medium|
|        Frozen Foods| 96.9726|       NULL|
|        Frozen Foods|187.8214|       NULL|
|Fruits and Vegeta...| 45.5402|     Medium|
|               Dairy|144.1102|      Small|
|Fruits and Vegeta...|145.4786|     Medium|
|         Snack Foods|119.6782|      Small|
|Fruits and Vegeta...|196.4426|       High|
|           Breakfast| 56.3614|      Small|
|  Health and Hygiene|115.3492|     Medium|
|           Breakfast| 54.3614|     Medium|
|         Hard Drinks|113.2834|     Medium|
|               Dairy|230.5352| 

In [25]:
df_pivot.groupby(col('Item_Type')).pivot('Outlet_Size').agg(sum(col('Item_MRP'))).show()

+--------------------+------------------+------------------+------------------+------------------+
|           Item_Type|              null|              High|            Medium|             Small|
+--------------------+------------------+------------------+------------------+------------------+
|       Starchy Foods|         6040.6402|3004.9844000000003| 7124.136199999997| 5710.266599999999|
|              Breads|        10011.5004|          3343.974|        11691.4662|10332.179200000002|
|        Baking Goods|23433.838799999994| 9431.749199999998|25614.249399999997| 23414.89900000001|
|Fruits and Vegeta...|49758.730999999985| 20671.34759999999|59047.217200000014|48646.785199999984|
|                Meat|16158.166000000005| 5627.036400000002| 20326.45059999999|        17338.2108|
|         Hard Drinks| 8869.577199999998|3264.3329999999996|10712.827199999996|         6487.9392|
|         Soft Drinks|17745.318000000003| 6456.165199999999|17572.946399999997|16739.735400000005|
|         

## When otherwise

In [38]:
df = df.withColumn('veg_nonveg',when(col('Item_Type')=='Meat','Nonveg').otherwise('Veg'))

In [33]:
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|veg_nonveg|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|       Veg|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2

### Multiple conditions

In [40]:
df.withColumn('veg_nonveg',when((col('veg_nonveg')=='Veg') & (col('Item_MRP') > 100),'ExpensiveVeg')\
              .when((col('veg_nonveg')=='Veg') & (col('Item_MRP') <= 100),'CheapVeg')\
              .otherwise('NonVeg')).show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|  veg_nonveg|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|ExpensiveVeg|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|              

## Joins

In [30]:
## left right inner same as sql
## syntax
## df_left.join(df_right,df_left['key'] == df_right['key'],'type of join').show()
## type of join left,right,inner

## Anti Join
## returns records of df_left which are not available in df_right


## Window functions

In [9]:
from pyspark.sql.window import Window

### RowNumber()

In [19]:
df.withColumn('rowCol',row_number().over(Window.orderBy('Item_Identifier')))\
.select('Item_Identifier','rowCol').show()

+---------------+------+
|Item_Identifier|rowCol|
+---------------+------+
|          DRA12|     1|
|          DRA12|     2|
|          DRA12|     3|
|          DRA12|     4|
|          DRA12|     5|
|          DRA12|     6|
|          DRA24|     7|
|          DRA24|     8|
|          DRA24|     9|
|          DRA24|    10|
|          DRA24|    11|
|          DRA24|    12|
|          DRA24|    13|
|          DRA59|    14|
|          DRA59|    15|
|          DRA59|    16|
|          DRA59|    17|
|          DRA59|    18|
|          DRA59|    19|
|          DRA59|    20|
+---------------+------+
only showing top 20 rows



### Rank/Dense Rank

In [18]:
df.withColumn('rank',rank().over(Window.orderBy('Item_Identifier')))\
  .withColumn('dense_rank',dense_rank().over(Window.orderBy('Item_Identifier')))\
  .select('Item_Identifier','rank','dense_rank').show()


+---------------+----+----------+
|Item_Identifier|rank|dense_rank|
+---------------+----+----------+
|          DRA12|   1|         1|
|          DRA12|   1|         1|
|          DRA12|   1|         1|
|          DRA12|   1|         1|
|          DRA12|   1|         1|
|          DRA12|   1|         1|
|          DRA24|   7|         2|
|          DRA24|   7|         2|
|          DRA24|   7|         2|
|          DRA24|   7|         2|
|          DRA24|   7|         2|
|          DRA24|   7|         2|
|          DRA24|   7|         2|
|          DRA59|  14|         3|
|          DRA59|  14|         3|
|          DRA59|  14|         3|
|          DRA59|  14|         3|
|          DRA59|  14|         3|
|          DRA59|  14|         3|
|          DRA59|  14|         3|
+---------------+----+----------+
only showing top 20 rows



### Cumulative sum

In [21]:
df.withColumn('total_sum',sum(col('Item_MRP')).over(Window.orderBy('Item_Identifier')))\
  .select('Item_Identifier','Item_MRP','total_sum').show()

+---------------+--------+------------------+
|Item_Identifier|Item_MRP|         total_sum|
+---------------+--------+------------------+
|          DRA12|140.3154|          851.1924|
|          DRA12|141.6154|          851.1924|
|          DRA12|142.3154|          851.1924|
|          DRA12|141.9154|          851.1924|
|          DRA12|142.0154|          851.1924|
|          DRA12|143.0154|          851.1924|
|          DRA24|164.6868|1999.8000000000002|
|          DRA24|163.2868|1999.8000000000002|
|          DRA24|163.8868|1999.8000000000002|
|          DRA24|165.7868|1999.8000000000002|
|          DRA24|163.3868|1999.8000000000002|
|          DRA24|165.0868|1999.8000000000002|
|          DRA24|162.4868|1999.8000000000002|
|          DRA59|184.8924|         3481.2392|
|          DRA59|183.6924|         3481.2392|
|          DRA59|185.9924|         3481.2392|
|          DRA59|183.2924|         3481.2392|
|          DRA59|186.6924|         3481.2392|
|          DRA59|186.2924|        

In [23]:
df.withColumn('Cumulative',sum(col('Item_MRP')).over(Window.orderBy('Item_Identifier').rowsBetween(Window.unboundedPreceding,Window.currentRow)))\
  .select('Item_Identifier','Item_MRP','Cumulative').show()

+---------------+--------+------------------+
|Item_Identifier|Item_MRP|        Cumulative|
+---------------+--------+------------------+
|          DRA12|140.3154|          140.3154|
|          DRA12|141.6154|          281.9308|
|          DRA12|142.3154|          424.2462|
|          DRA12|141.9154|          566.1616|
|          DRA12|142.0154|           708.177|
|          DRA12|143.0154|          851.1924|
|          DRA24|164.6868|1015.8792000000001|
|          DRA24|163.2868|1179.1660000000002|
|          DRA24|163.8868|1343.0528000000002|
|          DRA24|165.7868|1508.8396000000002|
|          DRA24|163.3868|1672.2264000000002|
|          DRA24|165.0868|1837.3132000000003|
|          DRA24|162.4868|1999.8000000000002|
|          DRA59|184.8924|2184.6924000000004|
|          DRA59|183.6924|2368.3848000000003|
|          DRA59|185.9924|2554.3772000000004|
|          DRA59|183.2924|         2737.6696|
|          DRA59|186.6924|          2924.362|
|          DRA59|186.2924|        

## Spark SQL

In [27]:
df.createOrReplaceTempView('temp_table')

In [28]:
## can be extended for any sql query
result_df = spark.sql('SELECT * FROM temp_table')

In [29]:
result_df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma