In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.utils import AnalysisException

In [0]:
# Load the BigMart sales CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load('/Volumes/spakr_data/default/datastorage/BigMart Sales.csv')
)

## Conditional columns with `when` / `otherwise`

In [0]:
# Label every item: Item_Type 'Meat' becomes 'NonVeg', anything else 'Veg'.
is_meat = col('Item_Type') == 'Meat'
df_veg_flag = df.withColumn(
    'Veg_nonVeg_flag',
    when(is_meat, 'NonVeg').otherwise('Veg'),
)
df_veg_flag.display()

In [0]:
# Split the vegetarian items by price: below 100 is 'Veg_non_expensive',
# 100 and above is 'Veg_expensive'. The upper branch uses >= so that an item
# priced at exactly 100 is not left unmatched and mislabelled as 'NonVeg'.
# NOTE: a Veg row whose Item_MRP is null matches neither branch and still
# falls through to otherwise('NonVeg').
df_veg_flag.withColumn(
    'Veg_expensive_flag',
    when((col('Veg_nonVeg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_non_expensive')
    .when((col('Veg_nonVeg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_expensive')
    .otherwise('NonVeg'),
).display()

## Joins


In [0]:
# Employee sample rows, one tuple per employee: (Emp_id, Emp_Name, DepartmentId).
dataj1 = [
    (1, 'gaur', 'd01'), (2, 'kit', 'd02'), (3, 'sam', 'd03'), (4, 'tim', 'd03'),
    (5, 'aman', 'd05'), (6, 'nad', 'd06'), (7, 'sam', 'd07'),
]
schema1 = "Emp_id INT, Emp_Name STRING, DepartmentId STRING"

# Build the employee DataFrame from the rows and the DDL-style schema string.
df1 = spark.createDataFrame(dataj1, schema=schema1)
display(df1)

In [0]:
# Department sample rows, one tuple per department: (DepartmentId, Department).
dataj2 = [
    ('d01', 'hr'), ('d02', 'it'), ('d03', 'Marketing'), ('d04', 'Sales'),
    ('d05', 'Finance'), ('d06', 'Production'), ('d08', 'Distribution'),
]
schema2 = "DepartmentId STRING, Department STRING"

# Build the department DataFrame from the rows and the DDL-style schema string.
df2 = spark.createDataFrame(dataj2, schema=schema2)
display(df2)

In [0]:
# Inner join: keep only the employees whose DepartmentId also appears in df2.
# The join columns are referenced with attribute syntax here.
df1.join(
    df2,
    on=df1.DepartmentId == df2.DepartmentId,
    how='inner',
).display()

In [0]:
# Inner join, second spelling: the same join, with the columns referenced
# through bracket syntax instead of attribute syntax.
df1.join(
    df2,
    on=df1['DepartmentId'] == df2['DepartmentId'],
    how='inner',
).display()


In [0]:
# Left join: keep every employee from df1; the department columns are null
# for an employee whose DepartmentId has no match in df2.
df1.join(
    df2,
    on=df1.DepartmentId == df2.DepartmentId,
    how='left',
).display()

In [0]:
# Right join: keep every department from df2; the employee columns are null for a department with no matching employee in df1
df1.join(df2, df1.DepartmentId == df2.DepartmentId, 'right').display()

In [0]:
# Anti join, employees side: rows of df1 whose DepartmentId has NO match in df2 (only df1 columns are returned)
df1.join(df2, df1.DepartmentId == df2.DepartmentId, 'anti').display()
# Anti join, departments side: rows of df2 whose DepartmentId has NO match in df1 (only df2 columns are returned)
df2.join(df1, df2.DepartmentId == df1.DepartmentId, 'anti').display()

## Window functions


In [0]:
from pyspark.sql.window import Window

In [0]:
# row_number(): assigns a consecutive, unique number to each row in
# Item_Identifier order. No partitionBy is given, so the whole DataFrame is
# numbered as a single window.
whole_frame_by_item = Window.orderBy(col('Item_Identifier'))
df.withColumn('RowNum', row_number().over(whole_frame_by_item)).display()

In [0]:
# row_number() restarted for every outlet: rows are numbered in
# Item_Identifier order within each Outlet_Identifier partition.
per_outlet_by_item = (
    Window
    .partitionBy('Outlet_Identifier')
    .orderBy(col('Item_Identifier'))
)
df.withColumn('RowNum', row_number().over(per_outlet_by_item)).display()

In [0]:
# rank(): ranks rows in Item_Identifier order. Tied rows share the same rank
# and the ranks that follow a tie are skipped (1, 1, 3, ...).
rank_window = Window.orderBy(col('Item_Identifier'))
df.withColumn('Rank', rank().over(rank_window)).display()

In [0]:
# dense_rank(): ranks rows in Item_Identifier order. Tied rows share the same
# rank and no rank is skipped after a tie (1, 1, 2, ...).
dense_rank_window = Window.orderBy(col('Item_Identifier'))
df.withColumn('DenseRank', dense_rank().over(dense_rank_window)).display()

In [0]:
# rank() and dense_rank() side by side over the same Item_Identifier ordering,
# so the difference in how ties are numbered is visible in a single table.
by_item_identifier = Window.orderBy(col('Item_Identifier'))
(
    df
    .withColumn('rank', rank().over(by_item_identifier))
    .withColumn('DenseRank', dense_rank().over(by_item_identifier))
    .display()
)

In [0]:
# Running total of Item_MRP in Item_Type order, using the window's default frame.
# With orderBy and no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
# so all rows that share the same Item_Type (ordering peers) receive the same cumulative value.

df.withColumn('CumSum',sum(col("Item_MRP")).over(Window.orderBy('Item_Type'))).display()

In [0]:
# Row-by-row running total: the explicit ROWS frame runs from the first row up
# to the current row, so the value grows on every row rather than per Item_Type.
running_rows = (
    Window
    .orderBy('Item_Type')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn('Cumsum', sum('Item_MRP').over(running_rows)).display()

In [0]:
# Grand total on every row: the frame spans from the first row to the last row,
# so each row carries the sum of Item_MRP over the whole window.
all_rows = (
    Window
    .orderBy('Item_Type')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df.withColumn('cumsum', sum('Item_MRP').over(all_rows)).display()

## User-defined functions (UDF)

In [0]:
def square(x):
    """Return ``x`` multiplied by itself, propagating ``None``.

    Args:
        x: Any value that supports ``*`` with itself (a number, or a Spark
            Column), or ``None``.

    Returns:
        ``x * x``, or ``None`` when ``x`` is ``None``.
    """
    # When this function runs as a Spark UDF, SQL NULLs arrive as None.
    # Return None instead of letting None * None raise a TypeError.
    if x is None:
        return None
    return x * x

In [0]:
# Sanity-check the plain Python function before using it with Spark: prints 25.
print(square(5))

In [0]:

# Wrap the Python function as a Spark UDF. udf() defaults to a StringType
# return, which would store the squares as text, so the numeric return type is
# declared explicitly.
# NOTE(review): assumes Item_MRP was inferred as a double — confirm the schema.
square_udf = udf(square, DoubleType())
df.withColumn('Item_MRP_square', square_udf(col('Item_MRP'))).display()

In [0]:
# The same function applied directly to a Column, without udf(): x*x on a Column
# builds the column expression Item_MRP * Item_MRP instead of calling Python per row.
df.withColumn('Item_MRP_square',square(col('Item_MRP'))).display()

## Writing data


In [0]:
# Write df as CSV with no explicit save mode and no header option.
# NOTE(review): Spark's default save mode is 'error' / 'errorifexists', so re-running this
# cell once the target directory exists is expected to fail — confirm before a full re-run.
df.write.format('csv').save('/Volumes/spakr_data/default/datastorage/BigMart SalesTransformed')

In [0]:
# Append mode: add new CSV part files to whatever already exists at the path.
(
    df.write
    .mode('append')
    .format('csv')
    .save('/Volumes/spakr_data/default/datastorage/BigMart SalesTransformed/Data.csv')
)

In [0]:
# Append mode again, with the target supplied through the 'path' option
# so that save() can be called without an argument.
(
    df.write
    .format('csv')
    .mode('append')
    .option('path', '/Volumes/spakr_data/default/datastorage/BigMart SalesTransformed/Data.csv')
    .save()
)


In [0]:
# Overwrite mode: replace whatever already exists at the path with this write.
(
    df.write
    .mode('overwrite')
    .format('csv')
    .save('/Volumes/spakr_data/default/datastorage/BigMart SalesTransformed/Data.csv')
)

In [0]:
#error mode
# mode('error') refuses to write when the target already exists. The earlier
# cells have already written to this path, so the failure is the expected
# outcome of this demonstration. It is caught and printed so that a
# top-to-bottom run of the notebook is not aborted here.
try:
    df.write.format('csv')\
        .mode('error')\
        .save('/Volumes/spakr_data/default/datastorage/BigMart SalesTransformed/Data.csv')
except AnalysisException as exc:
    print(f"Write rejected by mode('error'): {exc}")

In [0]:
# Parquet output: same DataFrame, written in parquet format, replacing any
# previous output at the path.
df.write.save(
    '/Volumes/spakr_data/default/datastorage/BigMart SalesTransformed/Data.parquet',
    format='parquet',
    mode='overwrite',
)

In [0]:
# Persist df as a table named BigMartSales in Delta format, replacing the
# table's contents if it already exists.
df.write.saveAsTable(
    "BigMartSales",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- List the tables in the `default` schema.
SHOW TABLES IN default;


## SQL views

In [0]:
# Register df as a temporary view so it can be queried with SQL.
# createOrReplaceTempView keeps this cell re-runnable: createTempView raises
# if 'myview' is already registered from an earlier run of the cell.
df.createOrReplaceTempView('myview')

# Keep only the rows whose Item_Fat_Content is exactly 'Low Fat'.
result = spark.sql("select * from myview where Item_Fat_Content = 'Low Fat'")

In [0]:
# Render the rows returned by the SQL query on 'myview'.
result.display()