### **Importing Libraries**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

### **Data**

In [0]:
# Load the BigMart sales CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
bigmart_csv_path='/Volumes/workspace/data/bigmartdata/BigMart Sales.csv'
df=(
    spark.read
    .option('header',True)
    .option('inferSchema',True)
    .csv(bigmart_csv_path)
)
df.display()

### **Print Schema | Record Count | No of Columns**

In [0]:
# Column names and their inferred types.
df.printSchema()

# Number of records (runs a Spark job) followed by the number of columns.
record_count=df.count()
column_count=len(df.columns)
print(record_count)
print(column_count)

### **Select Columns | Alias | Limit**

In [0]:
# First four rows of two columns, referenced by name.
df.select('Item_Identifier','Item_Weight').limit(4).display()

# Same projection, with each column renamed on the fly through alias().
df.select(
    col('Item_Identifier').alias('X'),
    col('Item_Weight').alias('Y'),
).limit(4).display()

### **Implement limit and offset: getting records from rows 3 and 4**

In [0]:
# Return rows 3 and 4 of the Item_Identifier column.
# The earlier approach (limit 4 -> sort by generated id descending -> limit 3)
# returned rows 4, 3 and 2 in reverse order. DataFrame.offset (Spark 3.4+)
# skips the leading rows directly and keeps the original row order.
U=2   # number of leading rows to skip (the offset)
L=4   # 1-based position of the last row to return

df.select(col('Item_Identifier'))\
    .offset(U)\
    .limit(L-U)\
    .display()

### **Filter Data**

In [0]:
# Soft drinks with a weight below 10.
is_light_soft_drink=(col('Item_Type')=='Soft Drinks') & (col('Item_Weight')<10)
df.where(is_light_soft_drink).display()

# Tier 1 / Tier 2 outlets whose size is not null.
in_tier_1_or_2=col('Outlet_Location_Type').isin('Tier 1','Tier 2')
has_outlet_size=col('Outlet_Size').isNotNull()
df.where(in_tier_1_or_2 & has_outlet_size).display()

### **Rename Column**

In [0]:
# Rename Outlet_Size for display only (df itself is not reassigned).
# The new name must differ from the old one: renaming a column to its own
# name is a no-op and demonstrates nothing.
df.withColumnRenamed('Outlet_Size','Outlet_Size_New').display()

### **Add New Column | Constant Value: lit | Variable Value: multiply**

In [0]:
# 'flag' holds the same literal on every row; 'multiply' is computed per row
# from two existing columns.
visibility_times_mrp=col('Item_Visibility')*col('Item_MRP')
with_flag=df.withColumn('flag',lit(0))
with_flag.withColumn('multiply',visibility_times_mrp).display()

### **String Functions**

In [0]:
# Abbreviate the fat-content labels with two successive regex replacements.
low_fat_abbreviated=df.withColumn(
    'Item_Fat_Content',
    regexp_replace(col('Item_Fat_Content'),'Low Fat','LF'),
)
low_fat_abbreviated.withColumn(
    'Item_Fat_Content',
    regexp_replace('Item_Fat_Content','Regular','Rg'),
).display()

# Cast the weight column to a string type.
weight_as_string=col('Item_Weight').cast(StringType())
df.withColumn('Item_Weight',weight_as_string).display()

# Case conversions of Item_Type: initial capitals, lower case, upper case.
for change_case in (initcap,lower,upper):
    df.select(change_case(col('Item_Type'))).display()

### **Order By Asc or Desc**

In [0]:
# Sort by weight (descending) and break ties by visibility (ascending).
# Both keys go into ONE orderBy call: chaining a second orderBy re-sorts the
# whole DataFrame by the new key only, discarding the first sort.
df.orderBy(col('Item_Weight').desc(),col('Item_Visibility').asc()).display()

# Distinct item identifiers in ascending order.
df.select(col('Item_Identifier')).distinct().orderBy(col('Item_Identifier').asc()).display()

### **Aggregate Function**

In [0]:
# Largest Item_Visibility over the whole DataFrame.
# NOTE: max resolves to the Spark column function brought in by the star
# import of pyspark.sql.functions, not Python's builtin max.
largest_visibility=max(col('Item_Visibility'))
df.agg(largest_visibility).display()

# A single Item_Visibility value.
df.select('Item_Visibility').limit(1).display()



### **Dropping Column**

In [0]:
# Show the DataFrame without the two outlet columns listed below.
unwanted_columns=[col('Outlet_Type'),col('Outlet_Identifier')]
df.drop(*unwanted_columns).display()

### **Dropping Duplicates**

In [0]:
# Remove rows that are duplicated across every column.
df.dropDuplicates().display()

# Keep a single row for each Item_Identifier value.
df.dropDuplicates(['Item_Identifier']).display()

### **Union between DataFrames**

In [0]:
# Two small frames holding the same columns, declared in a different order.
d1=spark.createDataFrame([(1,'A'),(2,'B')],schema='id int,name string')
d2=spark.createDataFrame([('X',10),('Y',11)],schema='name string,id int')

# unionByName matches columns by name, so the differing column order is fine.
d1.unionByName(d2).display()



### **Date Functions**

In [0]:
# Stamp every row with today's date, then derive the dates seven days before
# and seven days after it. df is reassigned, so later cells see these columns.
df=df.withColumn('curr_date',current_date())
one_week_back=date_sub(col('curr_date'),7)
one_week_ahead=date_add(col('curr_date'),7)
df=df.withColumn('last_week',one_week_back).withColumn('next_week',one_week_ahead)

# date_diff(end, start): last_week is passed as the end date here, so the
# difference comes out negative.
days_between=date_diff(col('last_week'),col('next_week'))
df.select(days_between.alias('date_diff')).display()

# Render last_week as month-year text.
df.select(date_format(col('last_week'),'MM-yyyy')).display()

### **Handling Nulls**

In [0]:
# Drop rows in which every column is null.
df.na.drop(how='all').display()

# Drop rows that have a null in any column.
df.na.drop(how='any').display()

# Drop rows only when Outlet_Size is null.
df.na.drop(subset=['Outlet_Size']).display()

# Replace nulls with a placeholder; a string fill value is applied to string
# columns only.
df.na.fill('Not Available').display()

# Same replacement, restricted to Outlet_Size.
df.na.fill('Not Available',subset=['Outlet_Size']).display()

### **Split | Explode | array_contains**

In [0]:
# Second word of Outlet_Type (index 1 of the space-split array).
df.select(split(col('Outlet_Type'),' ')[1]).display()

# One output row per word of Outlet_Type.
# new_df is built from df: nothing above this cell defines new_df, so the
# former self-referencing assignment raised NameError on a fresh kernel.
new_df=df.withColumn('Outlet_Type',explode(split(col('Outlet_Type'),' ')))
new_df.display()

# True where the split words of Outlet_Type include 'Type1'.
df.select(array_contains(split(col('Outlet_Type'),' '),'Type1')).display()

### **GroupBy**

In [0]:
# Total Item_MRP per item type.
# The column is spelled 'Item_Type' exactly as everywhere else in the notebook;
# the lower-case variant only resolved because Spark is case-insensitive by
# default, and it changed the casing of the output column header.
df.groupby(col('Item_Type')).agg(sum(col('Item_MRP'))).display()

# Total Item_MRP per (item type, outlet size) pair, exposed as column 'S'.
df.groupby(
    col('Item_Type'),col('Outlet_Size')
).agg(
    sum(col('Item_MRP')).alias('S')
).display()




### **GroupBy collect_list**

In [0]:
# (user, book) pairs and the schema describing them.
schema='user string,book string'
data=[
    ('user1','book1'),
    ('user1','book2'),
    ('user2','book2'),
    ('user2','book4'),
    ('user3','book1'),
]
df_bk=spark.createDataFrame(data,schema)

# Gather each user's books into one array per user.
books_per_user=collect_list(col('book'))
df_bk.groupBy(col('user')).agg(books_per_user).display()

### **GroupBy Pivot**

In [0]:
# Rows: Item_Type; columns: one per Outlet_Size value; cells: summed Item_MRP.
mrp_total=sum(col('Item_MRP'))
df.groupBy(col('Item_Type')).pivot('Outlet_Size').agg(mrp_total).display()

### **When Clause**

In [0]:
# Map each outlet size to a one-letter code; anything that matches none of the
# three sizes falls through to 'N/A'.
outlet_size=col('Outlet_Size')
size_code=(
    when(outlet_size=='High','H')
    .when(outlet_size=='Medium','M')
    .when(outlet_size=='Small','S')
    .otherwise('N/A')
)
df.select(outlet_size,size_code.alias('out')).display()

### **Joins | Left | Right | Anti | Full**

In [0]:
# Two frames sharing the key column 'id'; only id 2 appears in both.
df1=spark.createDataFrame([(1,'a'),(2,'b')],schema='id int,v1 string')
df2=spark.createDataFrame([(2,'m'),(3,'p')],schema='id int,v2 string')

# Show the result of each join type in turn, always joining on 'id'.
for join_type in ('inner','left','right','left_anti','full'):
    df1.join(df2,on='id',how=join_type).display()

### **Window Functions | Row Number | Rank | Dense Rank | Total Sum | Cumulative Sum**

In [0]:
# Every window below orders rows by Item_Identifier. No partitionBy is given,
# so each window spans the whole DataFrame.
by_item=Window.orderBy(col('Item_Identifier'))

# Ranking functions over the ordered window.
new_df.withColumn('row_number',row_number().over(by_item)).display()

new_df.withColumn('rank',rank().over(by_item)).display()

new_df.withColumn('dense_rank',dense_rank().over(by_item)).display()

# Total sum: the frame must be stated explicitly. An ordered window without a
# frame defaults to "unbounded preceding .. current row", which yields a
# running sum instead of the grand total.
new_df.withColumn('total_sum', sum(col('Item_MRP'))
    .over(by_item.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
).display()

# Cumulative sum from the first row up to and including the current row.
new_df.withColumn('cum_sum', sum(col('Item_MRP'))
    .over(by_item.rowsBetween(Window.unboundedPreceding, Window.currentRow))
).display()

# Reverse cumulative sum: from the current row through the last row.
new_df.withColumn('cum_sum', sum(col('Item_MRP'))
    .over(by_item.rowsBetween(Window.currentRow, Window.unboundedFollowing))
).display()


### **User Defined Functions | UDF | Not optimised like the built-in API**

In [0]:
def my_func(x):
    """Insert an underscore after the first three characters of ``x``.

    Args:
        x: A string, or None (a Spark UDF receives None for null values).

    Returns:
        ``x[:3] + '_' + x[3:]``, or None when ``x`` is None so that a null
        input stays null instead of raising TypeError.
    """
    if x is None:
        return None
    return x[:3]+'_'+x[3:]

# Wrap the plain Python function so it can be applied to a Spark column.
pyspark_func=udf(my_func)

# Apply the UDF to every Item_Identifier.
formatted_identifier=pyspark_func(col('Item_Identifier'))
new_df.select(formatted_identifier).display()

### **Writing to File and Table**

In [0]:
# append | overwrite | error | ignore
new_df.write.format('delta')\
    .mode('overwrite')\
    .save('/Volumes/workspace/data/bigmartdata/delta_file')

new_df.write.mode('overwrite').saveAsTable('My_Table')

### **DataFrame -> TempView -> Execute SQL -> DataFrame**

In [0]:
# Register new_df as a session-scoped view for SQL queries.
# createOrReplaceTempView keeps this cell re-runnable: createTempView raises
# when a view named 'My_View' already exists in the session.
new_df.createOrReplaceTempView('My_View')

%sql
-- Return every row of the My_View temporary view.
select * from My_View;


# Run the same query through the Python API; the result is a DataFrame.
view_query='select * from My_View'
new_sql_df=spark.sql(view_query)
new_sql_df.display()