### Reading CSV Data

#### Finding the available files

In [0]:
# List the files uploaded to DBFS under /FileStore/tables
table_files = dbutils.fs.ls('/FileStore/tables')
table_files

Out[129]: [FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales.csv', name='BigMart_Sales.csv', size=869537, modificationTime=1741159322000),
 FileInfo(path='dbfs:/FileStore/tables/CSV/', name='CSV/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/drivers.json', name='drivers.json', size=180812, modificationTime=1741160520000),
 FileInfo(path='dbfs:/FileStore/tables/test.csv', name='test.csv', size=136, modificationTime=1741155396000),
 FileInfo(path='dbfs:/FileStore/tables/tips.csv', name='tips.csv', size=8188, modificationTime=1741155915000)]

### Method 1: Reading a CSV File into a DataFrame

In [0]:
# Method 1: the spark.read.csv shortcut, using the first row as the header
# and letting Spark infer column types
file_location = 'dbfs:/FileStore/tables/BigMart_Sales.csv'
df = spark.read.csv(
    file_location,
    header=True,
    inferSchema=True,
)

### Method 2: Reading a CSV File with the Generic Reader (`format().load()`)

In [0]:
# Method 2: the generic reader — pick the format, set options, then load
df = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load("dbfs:/FileStore/tables/BigMart_Sales.csv")
)

## Reading a JSON File

In [0]:
# Read the drivers JSON file. The JSON source infers its schema on its own;
# 'header' and 'inferSchema' are CSV-only options that the JSON reader
# silently ignored, so they are dropped here to avoid implying they matter.
# multiline=False (the default) expects one JSON record per line.
df_json = (
    spark.read.format('json')
    .option('multiline', False)
    .load('dbfs:/FileStore/tables/drivers.json')
)

In [0]:
# Print the first 20 rows as text, truncating long values (the show() defaults)
df_json.show(n=20, truncate=True)

### Defining a Schema – DDL String & StructType

In [0]:
# Print the column names and types of df as read with inferSchema=True above
df.printSchema()

In [0]:
# Render df as an interactive Databricks table
display(df)

### Method 1: Defining the Schema as a DDL String

In [0]:
# DDL-style schema: comma-separated "column_name TYPE" pairs. With an explicit
# schema, CSV columns are matched to these fields by position, so the order
# must follow the file's column order (assumed to match BigMart_Sales.csv's
# header — verify if the file changes).
# Item_Weight is used numerically later (compared with < 10, multiplied,
# sorted), so it is declared DOUBLE. As STRING, sorts on it were lexicographic.
my_schema = '''
    Item_Identifier STRING,
    Item_Weight DOUBLE,
    Item_Fat_Content STRING,
    Item_Visibility DOUBLE,
    Item_Type STRING,
    Item_MRP DOUBLE,
    Outlet_Identifier STRING,
    Outlet_Establishment_Year INT,
    Outlet_Size STRING,
    Outlet_Location_Type STRING,
    Outlet_Type STRING,
    Item_Outlet_Sales DOUBLE
'''

In [0]:
# Read the CSV with the explicit DDL schema instead of inferring column types
df = (
    spark.read.format('csv')
    .option('header', True)
    .schema(my_schema)
    .load('dbfs:/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Show the data read with the DDL schema
display(df)

### Method 2: Defining the Schema with StructType

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Programmatic schema: StructType takes an ordered *list* of StructFields.
# The original used a set literal ({...}), which has no defined iteration
# order, so the field order could change between runs. With an explicit
# schema, CSV columns are mapped to fields by position, so order matters.
# Item_Type was also missing, which shifted every later column by one.
# Every field is read as a string here (third argument True = nullable).
my_struct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', StringType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', StringType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', StringType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', StringType(), True),
])

In [0]:
# Read the same CSV, this time applying the StructType schema
df1 = (
    spark.read.format('csv')
    .option('header', True)
    .schema(my_struct_schema)
    .load('dbfs:/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Print the schema of df1; every field was declared StringType in my_struct_schema
df1.printSchema()

## Transformations

In [0]:
# select() also accepts a list of column names
df.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']).show()

In [0]:
# Same projection, built from Column objects via col()
selected_cols = [col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')]
df.select(*selected_cols).show()

### Renaming a Column

In [0]:
# Rename a column while keeping all the other columns
display(df.withColumnRenamed('Item_Identifier', 'Item_ID'))

In [0]:
# alias() renames inside a select, so only the selected column is returned
item_id_col = col('Item_Identifier').alias('Item_ID')
display(df.select(item_id_col))

### Filtering Data

#### Scenario 1: One Condition

In [0]:
# where() is an alias of filter(); keep only 'Regular' fat-content items
regular_items = df.where(col('Item_Fat_Content') == 'Regular')
regular_items.display()

#### Scenario 2: Two Conditions

In [0]:
# Both conditions must hold. Each comparison is parenthesised because `&`
# binds tighter than `==` / `<` in Python. The column name is spelled exactly
# as in the schema ('Item_Type'), instead of relying on Spark's
# case-insensitive resolution. Item_Weight is cast explicitly so the
# comparison is numeric even if the schema reads it as STRING.
df.filter(
    (col('Item_Type') == 'Soft Drinks')
    & (col('Item_Weight').cast('double') < 10)
).display()

#### Scenario 3: Null Check Combined with `isin`

In [0]:
# Rows with no Outlet_Size whose outlet is in Tier 1 or Tier 2
missing_size = col('Outlet_Size').isNull()
tier_1_or_2 = col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
df.filter(missing_size & tier_1_or_2).display()

#### Column renaming

In [0]:
# Shorten Item_Weight to Item_Wt (display only; df itself is not reassigned)
display(df.withColumnRenamed('Item_Weight', 'Item_Wt'))

#### Creating or Modifying a Column Using `withColumn`

In [0]:
## lit() turns a Python constant into a Column, so every row gets Flag = 'new'
constant_flag = lit('new')
df = df.withColumn('Flag', constant_flag)

#### Scenario 1: Deriving a Column from Two Others

In [0]:
# New column holding Item_Weight * Item_MRP for each row
# (the column name was previously misspelled 'Mutiply').
df.withColumn('Multiply', col('Item_Weight') * col('Item_MRP')).display()

#### Scenario 2: Replacing Values with `regexp_replace`

In [0]:
# Abbreviate the fat-content labels: 'Regular' -> 'Reg', then 'Low Fat' -> 'Lf'
fat_abbrev = regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg')
fat_abbrev = regexp_replace(fat_abbrev, 'Low Fat', 'Lf')
df.withColumn('Item_Fat_Content', fat_abbrev).display()

### Type Casting

In [0]:
# Cast Item_Weight to string; the result is only displayed, not assigned back to df
display(df.withColumn('Item_Weight', col('Item_Weight').cast(StringType())))

In [0]:
# df is unchanged by the cast in the previous cell (its result was not assigned back)
df.printSchema()

### Sorting / Order By

In [0]:
## Scenario 1: heaviest items first. The column name is spelled exactly as in
## the schema ('Item_Weight'), instead of relying on case-insensitive resolution.
df.sort(col('Item_Weight').desc()).display()

In [0]:
# orderBy() is an alias of sort(); least visible items first
df.orderBy(col('Item_Visibility').asc()).display()

In [0]:
# Sort by Item_Weight, then Item_Visibility, both descending
df.sort(col('Item_Weight').desc(), col('Item_Visibility').desc()).display()

In [0]:
# Item_Weight descending, ties broken by Item_Visibility ascending
df.sort(col('Item_Weight').desc(), col('Item_Visibility').asc()).display()

#### Limiting Data

In [0]:
# Keep at most 5 rows
display(df.limit(5))

### Dropping Columns

#### Dropping a single Column

In [0]:
# Drop one column, passed as a Column object. The name was previously
# misspelled 'Item_Visibilty'. drop() does not reject unknown names, so the
# typo silently dropped nothing.
df.drop(col('Item_Visibility')).display()

#### Dropping Multiple Columns

In [0]:
# Drop several columns by name. 'Item_Visibility' was previously misspelled
# ('Item_Visibilty'). drop() ignores names that are not in the schema, so
# only Item_Identifier was actually being removed.
df.drop('Item_Visibility', 'Item_Identifier').display()

### Dropping Duplicates

In [0]:
# Remove rows that are identical across every column
display(df.dropDuplicates())

### Scenario 2: Dropping Duplicates Based on a Subset of Columns

In [0]:
# Keep one row per Item_Type (drop_duplicates is an alias of dropDuplicates)
df.dropDuplicates(['Item_Type']).display()

In [0]:
# distinct() returns only unique rows (same as dropDuplicates() with no subset)
display(df.distinct())

### Union and Union By Name


### Preparing DataFrames

In [0]:

# Two tiny DataFrames with the same schema, used to compare union and unionByName
schema1 = 'id STRING, name STRING'
schema2 = 'id STRING, name STRING'

data1 = [('1', 'kad'), ('2', 'sid')]
data2 = [('3', 'rahul'), ('4', 'jas')]

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

In [0]:
# Show both inputs before combining them
display(df1)
display(df2)

In [0]:
# union() stacks rows, matching columns by position
stacked = df1.union(df2)
stacked.display()

In [0]:
# unionByName() stacks rows, matching columns by name instead of position
stacked_by_name = df1.unionByName(df2)
stacked_by_name.display()

## String Functions

In [0]:
# Capitalise the first letter of each word in Item_Type
display(df.select(initcap(col('Item_Type'))))

In [0]:
# Lower-case Item_Type under a readable output name
lower_type = lower(col('Item_Type')).alias('lower_item_type')
display(df.select(lower_type))

In [0]:
# Upper-case Item_Type under a readable output name
upper_type = upper(col('Item_Type')).alias('upper_item_type')
display(df.select(upper_type))

## Date Functions – Current Date, Date_add, Date_sub

In [0]:
# Stamp every row with today's date
today_col = current_date()
df = df.withColumn('Curr_Date', today_col)

In [0]:
# Date 7 days after Curr_Date
df = df.withColumn('Week_After', date_add(col('Curr_Date'), 7))
display(df)

In [0]:
# Date 7 days before Curr_Date
df = df.withColumn('Week_Before', date_sub(col('Curr_Date'), 7))
display(df)

### DateDiff

In [0]:
# Number of days from Curr_Date to Week_After (end date first, start date second)
df = df.withColumn('Date_Diff', datediff(col('Week_After'), col('Curr_Date')))
display(df)

In [0]:
# 'MMM' formats the month as its abbreviated name
display(df.withColumn('Month_Name', date_format(col('Curr_Date'), 'MMM')))

In [0]:
# 'yyyy' formats the four-digit year
display(df.withColumn('Year_Name', date_format(col('Curr_Date'), 'yyyy')))

In [0]:
# Re-format Curr_Date as a day-month-year string
display(df.withColumn('New_Date', date_format(col('Curr_Date'), 'dd-MM-yyyy')))

## Handling Nulls

In [0]:
# Drop every row that has a null in any column (df.na.drop is the same as dropna)
display(df.na.drop(how='any'))

In [0]:

# Drop only the rows whose Outlet_Size is null
display(df.na.drop(subset=['Outlet_Size']))

## Filling the Null Values

In [0]:
# Replace nulls in string columns with 'Not Available' (df.na.fill is the same as fillna)
display(df.na.fill('Not Available'))

In [0]:
# Fill nulls only in Outlet_Size
display(df.na.fill('Not Available', subset=['Outlet_Size']))

## Split and Indexing

In [0]:
# split() turns Outlet_Type into an array of its space-separated words.
# The target column name is spelled exactly like the source column so the
# existing column is replaced. The previous 'Outlet_type' only did that
# through case-insensitive matching; with spark.sql.caseSensitive=true it
# would add a second column instead.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')).display()

## Indexing

In [0]:
# Take the second word of Outlet_Type (array indexing is 0-based). The
# target name matches the source column exactly, so the column is replaced
# without relying on case-insensitive name resolution.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')[1]).display()

## Explode

In [0]:
# Keep a copy of df whose Outlet_Type is the array of its words; the explode
# and array_contains examples below use it. The exact column name is reused
# so Outlet_Type is replaced, not duplicated, even under case-sensitive
# resolution.
df_exp = df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' '))

df_exp.display()

In [0]:
# explode() emits one output row per element of the Outlet_Type array
exploded = df_exp.withColumn('Outlet_Type', explode(col('Outlet_Type')))
exploded.display()

## Array_Contains

In [0]:
# Re-display the array-valued Outlet_Type before testing membership
display(df_exp)

In [0]:
# Boolean flag: does the Outlet_Type array contain the word 'Type1'?
has_type1 = array_contains(col('Outlet_Type'), 'Type1')
df_exp.withColumn('Type1_flag', has_type1).display()

### Group By

In [0]:
# Total MRP per item type. `sum` here is pyspark.sql.functions.sum, which
# shadows the builtin through the `from pyspark.sql.functions import *` cell.
total_price = sum(col('Item_MRP')).alias('Total_Price')
display(df.groupBy('Item_Type').agg(total_price))

In [0]:
# Average MRP per item type, most expensive first
avg_price = df.groupBy('Item_Type').agg(avg(col('Item_MRP')).alias('Avg_Price'))
avg_price.orderBy(col('Avg_Price').desc()).display()

In [0]:
# Total and average MRP for each (Item_Type, Outlet_Type) pair
aggregations = [
    sum(col('Item_MRP')).alias('Total_Price'),
    avg(col('Item_MRP')).alias('Avg_price'),
]
display(df.groupBy('Item_Type', 'Outlet_Type').agg(*aggregations))

## Collect_List

In [0]:

# (user, book) pairs used to demonstrate collect_list
users = ['user1', 'user1', 'user2', 'user2', 'user3']
books = ['book1', 'book2', 'book2', 'book4', 'book1']
data = list(zip(users, books))

schema = 'user string, book string'

In [0]:
# Build the books DataFrame from the (user, book) tuples and DDL schema
df_book = spark.createDataFrame(data, schema=schema)

In [0]:
# Show the raw (user, book) rows
display(df_book)

In [0]:
# Gather each user's books into an array (duplicates kept)
books_per_user = df_book.groupBy('user').agg(collect_list(col('book')))
books_per_user.display()

### Pivot

In [0]:
# One row per Item_Type, one column per Outlet_Size value, cells = average MRP
pivoted = df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg(col('Item_MRP')))
pivoted.display()

## When / Otherwise (Similar to SQL CASE WHEN)

In [0]:
# 'Meat' items are tagged Non-Veg; every other row (including a null Item_Type) is tagged Veg
is_meat = col('Item_Type') == 'Meat'
df = df.withColumn('Veg_Flag', when(is_meat, 'Non-Veg').otherwise('Veg'))

### Scenario 2: Chaining Multiple `when` Conditions

In [0]:
# Split Veg items by price at 100: below 100 is inexpensive, 100 and above is
# expensive. Previously an MRP of exactly 100 matched neither branch (< 100 /
# > 100) and fell through to 'Non-Veg'. Every other row is labelled Non-Veg.
is_veg = col('Veg_Flag') == 'Veg'
df.withColumn(
    'Veg_Cost_Flag',
    when(is_veg & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when(is_veg & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non-Veg'),
).display()

### Joins (Inner, Left, Right, Anti)

In [0]:

# Employees (df1) and departments (df2) for the join examples. Employee 6 has
# dept_id 'd06', which has no department row, and department 'd04' has no
# employees, so the different join types return visibly different rows.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
df1 = spark.createDataFrame(dataj1, schemaj1)

schemaj2 = 'dept_id STRING, department STRING'
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
df2 = spark.createDataFrame(dataj2, schemaj2)
     

In [0]:
# Show the employee and department tables before joining
display(df1)
display(df2)

In [0]:
# Inner join: only employees whose dept_id has a matching department
same_dept = df1['dept_id'] == df2['dept_id']
display(df1.join(df2, on=same_dept, how='inner'))

In [0]:
# Left join: every employee; department columns are null where there is no match
same_dept = df1['dept_id'] == df2['dept_id']
display(df1.join(df2, on=same_dept, how='left'))

In [0]:
# Right join: every department; employee columns are null where there is no match
same_dept = df1['dept_id'] == df2['dept_id']
display(df1.join(df2, on=same_dept, how='right'))

In [0]:
# Anti join: employees with no matching department (only df1's columns are returned)
same_dept = df1['dept_id'] == df2['dept_id']
display(df1.join(df2, on=same_dept, how='anti'))

### Window Functions

#### Row_Number()

In [0]:
from pyspark.sql.window import *

In [0]:
# Sequential row number ordered by Item_Identifier. With no partitionBy, the
# window spans the whole DataFrame.
by_item_id = Window.orderBy('Item_Identifier')
display(df.withColumn('Row_Num', row_number().over(by_item_id)))

### Rank & Dense Rank

In [0]:
# rank() leaves gaps after ties; dense_rank() assigns consecutive ranks
by_item_id_desc = Window.orderBy(col('Item_Identifier').desc())
(
    df.withColumn('col_rank', rank().over(by_item_id_desc))
    .withColumn('col_dense_rank', dense_rank().over(by_item_id_desc))
    .display()
)

### Cumulative Sum

In [0]:
# Running total ordered by Item_Type. With orderBy and no explicit frame, the
# default frame is RANGE from unbounded preceding to the current row, so all
# rows with the same Item_Type share one running total.
by_type = Window.orderBy('Item_Type')
display(df.withColumn('Cum_Sum', sum('Item_MRP').over(by_type)))

In [0]:
# Row-by-row running total: ROWS frame from the first row up to the current row
running_rows = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.currentRow)
display(df.withColumn('Cum_Sum', sum('Item_MRP').over(running_rows)))

In [0]:
# Frame covering every row, so each row carries the grand total of Item_MRP
all_rows = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
display(df.withColumn('cum_sum', sum('Item_MRP').over(all_rows)))

## Data Writing

### Writing into a CSV

In [0]:
# Write df as CSV. 'error' (errorifexists) is the default save mode, spelled
# out here: the write fails if the target path already exists.
(
    df.write.format('csv')
    .mode('error')
    .save('/FileStore/tables/CSV/transformed.csv')
)

### Writing Modes – Append, Overwrite, Error, Ignore

#### Append


In [0]:
# 'append' adds new part files alongside whatever already exists at the path
(
    df.write.format('csv')
    .mode('append')
    .save('/FileStore/tables/CSV/transformed.csv')
)

#### Overwrite

In [0]:
# 'overwrite' replaces any existing data at the path
(
    df.write.format('csv')
    .mode('overwrite')
    .save('/FileStore/tables/CSV/transformed.csv')
)

#### Ignore

In [0]:
# 'ignore' silently skips the write when the path already exists
(
    df.write.format('csv')
    .mode('ignore')
    .save('/FileStore/tables/CSV/transformed.csv')
)

### Parquet Format – Stores Data in a Columnar Format

In [0]:
# Write df as Parquet into its own directory. It previously targeted
# '/FileStore/tables/CSV/transformed.csv', so mode('overwrite') replaced the
# CSV output written earlier with Parquet files under a misleading .csv name.
df.write.format('parquet')\
    .mode('overwrite')\
    .option('path', '/FileStore/tables/Parquet/transformed.parquet')\
    .save()

### Writing as a table

In [0]:

# Register df as a table named my_table (stored as Parquet), replacing any previous contents
(
    df.write
    .mode('overwrite')
    .format('parquet')
    .saveAsTable('my_table')
)

## Spark SQL

In [0]:
# Expose df to SQL as the session-scoped view my_temp_view.
# createOrReplaceTempView is safe to re-run; createTempView raises an error if
# the view already exists (e.g. on a second "Run All").
df.createOrReplaceTempView('my_temp_view')

In [0]:
%sql
-- Query the temporary view from a SQL cell
SELECT * FROM my_temp_view

In [0]:
# Run the same kind of query from Python; the result is a regular DataFrame
low_fat_query = "select * from my_temp_view where Item_Fat_Content = 'Low Fat'"
df_new = spark.sql(low_fat_query)

In [0]:
# Show the Low Fat rows returned by the SQL query
display(df_new)