In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *

In [0]:
# Read the BigMart sales CSV: the first row is treated as the header and
# Spark infers each column's type from the data.
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('/Volumes/workspace/landing/datafiles/BigMartSales.csv')
)

In [0]:
# Preview the DataFrame that was just loaded from the CSV.
df.display()

### Union and Union by name

### Preparing Dataframes

In [0]:
# Two small DataFrames that share the same column order (id, name); they are
# the inputs for the union examples.
schema_one = "id String, name string"
data_one = [("1", "John"), ("2", "Mary")]
df_one = spark.createDataFrame(data_one, schema=schema_one)

schema2 = "id STRING, name STRING"
data2 = [("3", "rahul"), ("4", "jas")]
df_two = spark.createDataFrame(data2, schema=schema2)


In [0]:
# Show both input frames before combining them.
df_one.display()
df_two.display()

In [0]:
# union() stacks the rows of df_two under df_one, matching columns by position.
# Both frames are (id, name) at this point, so the columns line up.
df_one.union(df_two).display()

In [0]:
# Rebuild df_one with the columns swapped to (name, id) so that its column
# order no longer matches df_two's (id, name).
schema_one = "name String, id string"
data_one = [("John", "1"), ("Mary", "2")]
df_one = spark.createDataFrame(data_one, schema=schema_one)
df_one.display()

In [0]:
# Demonstrates the pitfall of union(): columns are matched by position, not by
# name. df_one is now (name, id) while df_two is (id, name), so df_two's ids end
# up under "name" and its names under "id".
df_one.union(df_two).display()

### Union by name

In [0]:
# unionByName() matches columns by name, so the rows line up correctly even
# though df_one and df_two list their columns in a different order.
df_one.unionByName(df_two).display()

### String Functions


### Initcap

In [0]:
# Show df before applying the string functions below.
df.display()

In [0]:

# initcap converts Item_Type to proper case (first letter of each word upper-cased).
df.select(
    initcap(col("Item_Type"))
).display()

In [0]:
# lower converts Item_Type to all lower case.
df.select(
    lower(col("Item_Type"))
).display()

In [0]:
# upper converts Item_Type to all upper case; the result column is renamed to
# upper_Item_Type via alias().
df.select(upper('Item_Type').alias('upper_Item_Type')).display()

### Date Functions

### Current_Date

In [0]:
# Show df before the date columns are added in the cells below.
df.display()

In [0]:
# Add a curr_date column filled with current_date().
# df is reassigned, so the cells below see the new column.
df = df.withColumn('curr_date', current_date())
df.display()

In [0]:
# week_after = curr_date plus 7 days.
df = df.withColumn(
    'week_after',
    date_add(col('curr_date'), 7),
)
df.display()

In [0]:
# week_before = curr_date minus 7 days. date_sub(d, 7) yields the same date as
# date_add(d, -7) and states the intent directly.
df = df.withColumn(
    'week_before',
    date_sub(col('curr_date'), 7),
)
df.display()

In [0]:
# date_diff = number of days from curr_date to week_after
# (datediff takes the end date first, then the start date).
df = df.withColumn(
    'date_diff',
    datediff(col('week_after'), col('curr_date')),
)
df.display()

In [0]:
# Format week_before as day-month-year text.
# In Spark datetime patterns 'MM' is month-of-year whereas 'mm' is
# minute-of-hour, so the month needs upper-case 'MM'; with 'mm' the middle
# field would show minutes instead of the month.
# NOTE: this replaces week_before with a formatted string column, so re-running
# this cell on its own (without re-running the cells above) will not reproduce
# the same result.
df = df.withColumn('week_before', date_format(col('week_before'), 'dd-MM-yyyy'))
df.display()

### Handling Nulls

In [0]:
# Dropping nulls: remove every row that has a null in at least one column
# (how='any'). The result is only displayed; df itself is not reassigned here.
df.dropna(how='any').display()
# Dropping duplicates (left disabled):
# df.dropDuplicates().display()

In [0]:
# Keep only the rows whose Outlet_Size is not null. df is reassigned, so
# subsequent cells work on the filtered data until df is loaded again.
df = df.dropna(subset=['Outlet_Size'])
df.display()

### Filling Nulls

In [0]:
# Reload the CSV from scratch (header row, inferred types) so the null-filling
# examples start from the original data rather than the modified df above.
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('/Volumes/workspace/landing/datafiles/BigMartSales.csv')
)
df.display()

In [0]:
# Replace nulls with 'NotAvailable'. Because the fill value is a string, only
# string-typed columns are filled; nulls in numeric columns are left as-is.
df.fillna('NotAvailable').display()

In [0]:
# Replace nulls in the Outlet_Size column only with 'NotAvailable'.
# A string fill value cannot be applied to integer (or other non-string) columns.
df.fillna('NotAvailable',subset=['Outlet_Size']).display()

In [0]:
# Explicit DDL schema for the CSV instead of inferSchema.
# NOTE(review): Item_Weight is declared STRING here, presumably so that the
# string fill in the next cell can be applied to it - confirm.
my_ddl_schema = """
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE
"""

# Read the same file again, this time applying the schema above.
df = (
    spark.read.format('csv')
    .schema(my_ddl_schema)
    .option('header', True)
    .load('/Volumes/workspace/landing/datafiles/BigMartSales.csv')
)
df.display()
df.printSchema()

In [0]:
# Item_Weight was read as STRING by the explicit schema above, so a string
# fill value can now be applied to it.
df.fillna('NotAvailable',subset=['Item_Weight']).display()

### Split and Indexing

In [0]:
# split turns each Outlet_Type string into an array of values, splitting on a
# single space. Only the displayed result changes; df is not reassigned.
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')).display()

In [0]:
# Indexing: [1] picks the element at index 1 (the second value) of the array
# produced by split.
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')[1]).display()

# Alternative spelling using getItem (left disabled):
# df.withColumn('Outlet_Type', split('Outlet_Type', ' ').getItem(1)).display()

### Explode

In [0]:
# df_exp keeps Outlet_Type as an array (split on spaces); it is the input for
# the explode and array_contains examples.
df_exp = df.withColumn(
    'Outlet_Type',
    split('Outlet_Type', ' '),
)
df_exp.display()

In [0]:
# explode expands the Outlet_Type array into rows: one output row per array
# element, with the other columns repeated.
df_exp.withColumn('Outlet_Type', explode('Outlet_Type')).display()

In [0]:
# array_contains adds a boolean Type_Flag column that is true when the
# Outlet_Type array contains the element 'Type1'. It flags rows; it does not
# filter them.
df_exp.withColumn('Type_Flag', array_contains('Outlet_Type','Type1')).display()

### Group By

In [0]:
# Show the DataFrame that the aggregations below operate on.
df.display()

In [0]:
# Group by Item_Type and total Item_MRP per group.
# `sum` here is the Spark column function brought in by the wildcard import of
# pyspark.sql.functions, not Python's built-in sum.
df.groupBy("Item_Type").agg(sum("Item_MRP").alias('Total_MRP')).display()

In [0]:
# Group by Item_Type and compute the average Item_MRP per group.
df.groupBy("Item_Type").agg(avg("Item_MRP").alias('Avg_MRP')).display()

In [0]:
# df is not reassigned by the aggregations above; shown again for reference
# before grouping on more than one column.
df.display()

In [0]:
# Group by multiple columns (Item_Type, Outlet_Size) and apply the sum function
# to get the total Item_MRP of each combination.
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

In [0]:
# Group by Item_Type and Outlet_Size and compute both the total and the
# average Item_MRP for each combination in a single agg() call.
(
    df.groupBy("Item_Type", "Outlet_Size")
    .agg(
        sum("Item_MRP").alias("Total_MRP"),
        avg("Item_MRP").alias("Avg_MRP"),
    )
    .display()
)

### Collect_List


In [0]:
# Sample (user, book) pairs used to demonstrate collect_list, which gathers a
# column's values into a list.
data_list = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
    ('user3', 'book2'),
]
data_schema = ['user', 'book']

df_collect_list = spark.createDataFrame(data_list, schema=data_schema)
df_collect_list.display()


In [0]:
# For each user, collect that user's book values into a single list column
# named "books".
df_collect_list.groupBy("user").agg(collect_list("book").alias("books")).display()

### PIVOT

In [0]:
# Show just the three columns that the pivot below uses.
df.select('Item_Type','Outlet_Size','Item_MRP').display()

In [0]:
# Pivot: one row per Item_Type, one column per distinct Outlet_Size value,
# each cell holding the average Item_MRP for that combination.
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()

### When-Otherwise

In [0]:
# Show df before the conditional (when/otherwise) columns are derived from it.
df.display()

In [0]:
# when/otherwise works like an if/else on a column: rows whose Item_Type is
# exactly 'Meat' are tagged 'Non-Veg', every other row is tagged 'Veg'.
df_when_otherwisecase = df.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)
df_when_otherwisecase.display()

In [0]:

# Derive veg_exp_flag from veg_flag and Item_MRP:
#   Veg and Item_MRP <  100 -> 'Veg_Inexpensive'
#   Veg and Item_MRP >= 100 -> 'Veg_Expensive'
#   anything else           -> 'Non_Veg'
# The second branch uses >= (not >) so that a Veg item priced at exactly 100 is
# classified as 'Veg_Expensive' instead of falling through to 'Non_Veg'.
# NOTE(review): a Veg row with a null Item_MRP still matches neither branch and
# ends up as 'Non_Veg' - confirm whether the data can contain such rows.
df_when_otherwisecase.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non_Veg'),
).display()
                
