### Data Reading

## Load data

1. CSV File

In [None]:
# Location of the BigMart sales CSV inside the workspace volume.
path_csv = "/Volumes/workspace/default/csvfile/BigMart Sales.csv"

In [None]:
# Read the CSV using the first row as column names and let Spark infer column types.
df_csv = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(path_csv)
)

In [None]:
# Show the contents of df_csv as a table.
df_csv.display()

2. JSON file

In [None]:
# Location of the drivers JSON file inside the workspace volume.
path_json = "/Volumes/workspace/learning/jsonfile/drivers.json"

In [None]:
# Read the drivers file as JSON with one record per line (multiline=False).
# 'header' is a CSV-only option and the JSON source ignores it, so it is not set.
df_json = (
    spark.read.format('json')
    .option('multiline', False)
    .load(path_json)
)

In [None]:
# Show the contents of df_json as a table.
df_json.display()

### SCHEMA - DDL and StructType()


### Schema Definition

--> Used to define (and change) the data types of columns while reading

1. DDL Schema

In [None]:
# Print the schema Spark inferred for df_csv, for comparison with the explicit schemas below.
df_csv.printSchema()

In [None]:
schema_csv = '''
Item_Identifier string,
Item_Weight string,
Item_Fat_Content string,
Item_Visibility double,
Item_Type string,
Item_MRP double,
Outlet_Identifier string,
Outlet_Establishment_Year integer,
Outlet_Size string,
Outlet_Location_Type string,
Outlet_Type string,
Item_Outlet_Sales double
'''

In [None]:
# Re-read the CSV, applying the DDL schema instead of inferring column types.
df = (
    spark.read.format('csv')
    .schema(schema_csv)
    .option('header', True)
    .load(path_csv)
)

In [None]:
# Show the DataFrame read with the DDL schema.
df.display()

2. StructType() Schema

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [None]:
# StructType equivalent of the DDL schema string above. Each field is
# (name, type, nullable=True). Item_Visibility is a numeric fraction, so it is
# declared DoubleType to match the DDL version; as a string it would sort and
# aggregate lexicographically.
schema_struct = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', DoubleType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', IntegerType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', DoubleType(), True),
])

In [None]:
# Re-read the CSV using the StructType schema. When .schema() is supplied,
# Spark uses it as-is and performs no type inference, so 'inferSchema' is
# omitted; it would be ignored anyway.
df = (
    spark.read.format('csv')
    .schema(schema_struct)
    .option('header', True)
    .load(path_csv)
)

In [None]:
# Show the DataFrame read with the StructType schema.
df.display()

### Data Transformation

### SELECT

Method 1

In [None]:
# select() also accepts a single list of column-name strings.
df_csv.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']).display()

Method 2 --> By using col()

In [None]:
# Build one Column object per name with col() and select them.
df_csv.select(
    [col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')]
).display()

### ALIAS

--> Used to rename a column in the selected output


In [None]:
# Select Item_Identifier and expose it under the name Item_ID.
(
    df_csv
    .select(col('Item_Identifier').alias('Item_ID'))
    .display()
)

### FILTER / WHERE

CASE 1: Filter the data with fat content = Regular

In [None]:
# Check column names and types before writing filter conditions.
df_csv.printSchema()

In [None]:
# where() is an alias of filter(): keep rows whose fat content is exactly 'Regular'.
df_csv.where(col('Item_Fat_Content') == 'Regular').display()

CASE 2: Slice the data with item type = Soft Drinks and weight < 10

In [None]:
# Both conditions must hold; each comparison is parenthesised because & binds tighter than ==/<.
df_csv.where(
    (col('Item_Type') == 'Soft Drinks') & (col('Item_Weight') < 10)
).display()

CASE 3: Fetch the rows where Outlet_Location_Type is Tier 1 or Tier 2 and Outlet_Size is null

In [None]:
# Rows with no Outlet_Size whose location type is 'Tier 1' or 'Tier 2'.
df_csv.where(
    col('Outlet_Size').isNull()
    & col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
).display()

### withColumnRenamed

--> Used to rename a column at the DataFrame level

In [None]:
# Rename Item_Weight to Item_Wt; all other columns are kept unchanged.
(
    df_csv
    .withColumnRenamed('Item_Weight', 'Item_Wt')
    .display()
)

### withColumn

Case 1: Create a new column

In [None]:
# Add a constant string column 'Flag' holding "New" on every row.
df = df_csv.withColumn(colName='Flag', col=lit('New'))

In [None]:
# Show df with the new Flag column.
df.display()

In [None]:
# Derived column: row-wise product of Item_Weight and Item_MRP.
df = df.withColumn('multiply', df['Item_Weight'] * df['Item_MRP'])

In [None]:
# Show df with the derived 'multiply' column.
df.display()

Case 2: Modify an existing column

In [None]:
# Overwrite Item_Fat_Content in two passes: 'Regular' -> 'Reg', then 'Low Fat' -> 'LF'.
(
    df_csv
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg'))
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'LF'))
    .display()
)

### Type Casting

In [None]:
# Replace Item_Weight with its string representation ('string' is the DDL name of StringType).
df = df_csv.withColumn('Item_Weight', col('Item_Weight').cast('string'))

In [None]:
# Show df after casting Item_Weight.
df.display()

In [None]:
# Confirm that Item_Weight is now a string column.
df.printSchema()

### Sort / orderBy

Case 1: Sort data in descending order

In [None]:
# Item_Weight was cast to string in the Type Casting step, so sorting the raw
# column would be lexicographic ('9.8' > '21.35'). Cast back to double for the
# sort key only; the displayed column keeps its string type. Nulls sort last.
df.sort(col('Item_Weight').cast('double').desc()).display()

Case 2: Sort in ascending order

In [None]:
# orderBy() is an alias of sort(): ascending by Item_Visibility (nulls first).
df.orderBy(col('Item_Visibility').asc()).display()

Case 3: Sort based on multiple columns

In [None]:
# Sort by weight ascending, then by visibility descending to break ties.
# Item_Weight is a string column at this point, so it is cast to double for the
# sort key; otherwise the order would be lexicographic rather than numeric.
df.sort(
    col('Item_Weight').cast('double').asc(),
    col('Item_Visibility').desc(),
).display()

### Limit

--> Used to return only the first N rows

In [None]:
# Keep only the first 10 rows.
df.limit(num=10).display()

### DROP

Case 1: Drop one column

In [None]:
# Drop a single column. Use the exact column name: drop() with a name that
# matches nothing is a silent no-op, e.g. when spark.sql.caseSensitive is enabled.
df.drop('Item_Visibility').display()

Case 2: Drop multiple columns at a time

In [None]:
# Drop several columns in one call. Exact names are used because unmatched
# names are silently ignored by drop().
df.drop('Item_Visibility', 'Item_Type').display()

### DROP_DUPLICATES

Case 1: Drop all duplicate rows in the data

In [None]:
# drop_duplicates() is the snake_case alias of dropDuplicates(); with no subset it compares entire rows.
df.drop_duplicates().display()

Case 2: Drop duplicates based on particular columns

In [None]:
# Keep one row per distinct Item_Type.
df.dropDuplicates(['Item_Type']).display()

### De-duplicate Data (distinct)

In [None]:
# distinct() returns only unique full rows.
(
    df
    .distinct()
    .display()
)

### UNION and UNION BY NAME

Prepare the DataFrames

In [None]:
# Two small DataFrames with the same (id, name) string schema, used by the union demos.
data1 = [('1', 'cad'), ('2', 'bad')]
schema1 = 'id string, name string'
df1 = spark.createDataFrame(data1, schema=schema1)

data2 = [('3', 'mad'), ('4', 'dad')]
schema2 = 'id string, name string'
df2 = spark.createDataFrame(data2, schema=schema2)

In [None]:
# Show df1.
df1.display()

In [None]:
# Show df2.
df2.display()

### Union

In [None]:
# union() appends the rows of df2 under df1, matching columns by position.
df1.union(other=df2).display()

### Union by Name

In [None]:
# Rebuild df1 with the columns in the opposite order (name, id) to contrast union with unionByName.
data1 = [('cad', '1'), ('bad', '2')]
schema1 = 'name string, id string'
df1 = spark.createDataFrame(data1, schema=schema1)
df1.display()

In [None]:
# union() matches columns by position, not by name, so with df1 as (name, id)
# and df2 as (id, name) the values end up under the wrong headers.
(
    df1
    .union(df2)
    .display()
)

In [None]:
# unionByName() aligns columns by name; all columns must be present on both sides (the default).
df1.unionByName(df2, allowMissingColumns=False).display()

### STRING FUNCTIONS

1. Initcap()
2. lower()
3. upper()

In [None]:
# Demonstrate the three string functions listed above on Item_Type:
# title case, lower case and upper case, side by side.
df.select(
    initcap('Item_Type'),
    lower('Item_Type'),
    upper('Item_Type'),
).display()

### Date Functions

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Reload the original CSV (header row, inferred types) as a fresh df for the date examples.
df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(path_csv)
)

In [None]:
# Show the freshly reloaded df.
df.display()

1. Current_date()

In [None]:
# Stamp each row with today's date.
df = df.withColumn(colName='Curr_date', col=current_date())

In [None]:
# Show df with the Curr_date column.
df.display()

2. Date_add()

In [None]:
# Curr_date shifted 7 days forward.
df = df.withColumn('week_after', date_add(col('Curr_date'), 7))

In [None]:
# Show df with the week_after column.
df.display()

3. Date_sub()

In [None]:
# Curr_date shifted 7 days back.
df = df.withColumn('week_before', date_sub(col('Curr_date'), 7))

In [None]:
# Show df with the week_before column.
df.display()

### DATEDIFF

--> Returns the number of days between two dates (end date minus start date)

In [None]:
# datediff(end, start) returns end - start in days, so week_after - Curr_date
# gives a positive 7 for every row.
df = df.withColumn('diff', datediff('week_after', 'Curr_date'))

In [None]:
# Show df with the diff column.
df.display()


Date_Format()

In [None]:
# Replace week_before with a 'dd-MM-yyyy' formatted string.
df = df.withColumn('week_before', date_format(col('week_before'), 'dd-MM-yyyy'))

In [None]:
# Show df with the reformatted week_before column.
df.display()

### Handling NULLs

Dropping Nulls

In [None]:
# Drop only rows in which every column is null.
df.dropna(how='all').display()

In [None]:
# Drop every row that has at least one null.
df.dropna(how='any').display()

In [None]:
# dropna() returns a new DataFrame, so df itself still contains its null rows.
df.display()

Filling the Null Values

Replace all the null values (a string fill value only applies to string columns)

In [None]:
# Replace nulls with 'NotAvailable'; a string value only fills string-typed columns.
df.fillna(value='NotAvailable').display()

In [None]:
# Fill nulls only in Outlet_Size, using the same 'NotAvailable' placeholder as
# the all-columns fill above (the previous literal was misspelled).
df.fillna(value='NotAvailable', subset=['Outlet_Size']).display()

### SPLIT and Indexing

1. Split

In [None]:
# Turn outlet_type into an array of its space-separated words (the pattern is a regex).
df.withColumn('outlet_type', split(col('outlet_type'), ' ')).display()

Indexing

In [None]:
# Keep only the second word (0-based index 1) of the split outlet_type.
df.withColumn('outlet_type', split(col('outlet_type'), ' ').getItem(1)).display()

## EXPLODE

In [None]:
# Save the array form of outlet_type for the explode / array_contains examples.
df_exp = df.withColumn('outlet_type', split(col('outlet_type'), ' '))


In [None]:
# Show df_exp with outlet_type as an array column.
df_exp.display()

In [None]:
# explode() emits one row per element of the outlet_type array. Display the
# result directly instead of assigning it back to df_exp, because the
# ARRAY_CONTAINS example below still needs outlet_type as an array.
df_exp.withColumn('outlet_type', explode('outlet_type')).display()

In [None]:
# df_exp is never reassigned by the explode step, so this still shows outlet_type as an array.
df_exp.display()

### ARRAY_CONTAINS

In [None]:
# Boolean flag: does the outlet_type array contain the element 'Type1'?
df_exp.withColumn('type_flag', array_contains(col('outlet_type'), 'Type1')).display()

### Group_By

Case 1: Find SUM

In [None]:
# Total Item_MRP per Item_Type (sum here is pyspark.sql.functions.sum via the star import).
df.groupBy('Item_Type').agg(sum(col('Item_MRP'))).display()

Case 2: Find AVG

In [None]:
# Mean Item_MRP per Item_Type.
df.groupBy('Item_Type').agg(avg(col('Item_MRP'))).display()

Case 3: SUM grouped by multiple columns

In [None]:
# Total MRP for every (Item_Type, Outlet_Size) combination, named Total_MRP.
(
    df.groupBy('Item_Type', 'outlet_Size')
    .agg(sum('Item_MRP').alias('Total_MRP'))
    .display()
)

Case 4: Multiple aggregations (SUM and AVG) per group

In [20]:
# Two aggregates per (Item_Type, Outlet_Size) group, printed as plain text.
(
    df.groupBy('Item_Type', 'outlet_Size')
    .agg(
        sum('Item_MRP').alias('Total_MRP'),
        avg('Item_MRP').alias('Avg_MRP'),
    )
    .show()
)

+--------------------+-----------+------------------+------------------+
|           Item_Type|outlet_Size|         Total_MRP|           Avg_MRP|
+--------------------+-----------+------------------+------------------+
|       Starchy Foods|     Medium| 7124.136199999997| 148.4195041666666|
|Fruits and Vegeta...|     Medium|59047.217200000014| 142.9714702179177|
|       Starchy Foods|       NULL|         6040.6402|140.48000465116277|
|              Breads|       NULL|        10011.5004|139.04861666666667|
|        Baking Goods|       NULL|23433.838799999994|126.66939891891889|
|Fruits and Vegeta...|       NULL|49758.730999999985|142.57516045845267|
|        Frozen Foods|       High|12588.291000000001|         136.82925|
|         Soft Drinks|       High| 6456.165199999999|131.75847346938772|
|           Breakfast|      Small|3917.0407999999998|130.56802666666667|
|                Meat|     Medium| 20326.45059999999|136.41913154362408|
|Fruits and Vegeta...|       High| 20671.3475999999

### Collect_List

In [17]:
# Sample user -> book rows; some users appear more than once.
data = [
    ('User 1', 'Book 1'),
    ('User 2', 'Book 2'),
    ('User 3', 'Book 1'),
    ('User 3', 'Book 2'),
    ('User 1', 'Book 3'),
]
schema = 'user string, book string'
df_book = spark.createDataFrame(data, schema=schema)
df_book.show()

+------+------+
|  user|  book|
+------+------+
|User 1|Book 1|
|User 2|Book 2|
|User 3|Book 1|
|User 3|Book 2|
|User 1|Book 3|
+------+------+



In [19]:
# Gather each user's books into one array (duplicates kept; order not guaranteed).
df_book.groupBy('user').agg(collect_list(col('book'))).show()

+------+------------------+
|  user|collect_list(book)|
+------+------------------+
|User 2|          [Book 2]|
|User 1|  [Book 1, Book 3]|
|User 3|  [Book 1, Book 2]|
+------+------------------+



Example

In [6]:
# Install PySpark into the notebook's Python environment. The leading "!" is an
# IPython shell escape, so this line is only valid inside a notebook.
!pip install pyspark



In [11]:
from pyspark.sql import SparkSession # Import the SparkSession class from the pyspark.sql module
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create (or reuse) a local SparkSession named 'Practice'.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [12]:
# Path of the BigMart CSV in the local sample_data folder (a plain string, not a tuple).
path = '/content/sample_data/BigMart Sales.csv'

In [13]:
# Read the CSV with a header row and inferred column types.
df = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(path)
)

In [15]:
# Print the first 20 rows (show()'s default) as a text table.
df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma