### Data Reading ###


In [0]:
# BigMart sales CSV: the first row holds the column names and Spark infers
# each column's type by scanning the data.
bigmart_path = "/Volumes/workspace/default/my_volume/BigMart Sales.csv"
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(bigmart_path)
)

df.display()


In [0]:
# The JSON source always derives its schema from the data; `inferSchema` is a
# CSV-reader option, so it is not passed here (the JSON reader would ignore it).
df_json1 = spark.read.format("json").load('/Volumes/workspace/default/my_volume/drivers.json')


In [0]:
# Preview the drivers JSON data read in the previous cell.
df_json1.display()


In [0]:
# Show the column names and the types Spark inferred for the BigMart CSV.
df.printSchema()

### DDL Schema ###

In [0]:
# Explicit DDL schema: every column gets the declared type instead of an
# inferred one (note that Item_Weight is declared as a string here).
# The result goes into a separate DataFrame so `df` keeps its inferred schema
# for the rest of the notebook, where Item_Weight is compared numerically
# (e.g. `Item_Weight < 10`).
my_ddl_schema = '''Item_Identifier string, Item_Weight string, Item_Fat_Content string, Item_Visibility double, Item_Type string, Item_MRP double, Outlet_Identifier string, Outlet_Establishment_Year long, Outlet_Size string, Outlet_Location_Type string, Outlet_Type string, Item_Outlet_Sales double'''
df_ddl = spark.read \
    .format("csv") \
    .schema(my_ddl_schema) \
    .option("header", "true") \
    .load("/Volumes/workspace/default/my_volume/BigMart Sales.csv")

df_ddl.printSchema()

In [0]:
# `df` here is still the schema-inferred DataFrame read at the top of the notebook.
df.display()

# The sections below keep using the original DataFrame `df` (inferred schema) #

### Select Command ###

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Project three columns by name.
picked_cols = ["Item_Identifier", "Item_Weight", "Item_Fat_Content"]
df.select(*picked_cols).display()

### Alias ###

In [0]:
# Select a single column under a new output name.
item_id = col("Item_Identifier").alias("Item_Id")
df.select(item_id).display()

### Filter ###

### Scenario -1 ###
Filter the rows where Item_Fat_Content is 'Regular'

In [0]:
# Keep only rows whose fat content is exactly "Regular" (`where` is an alias of `filter`).
df.where(col("Item_Fat_Content") == "Regular").display()


### Scenario - 2 ###

Filtering using 2 conditions (Item_Type is 'Soft Drinks' and Item_Weight is below 10)

In [0]:
# Both predicates must hold: soft drinks that weigh less than 10.
is_soft_drink = col("Item_Type") == 'Soft Drinks'
is_light = col('Item_Weight') < 10
df.where(is_soft_drink & is_light).display()

### Scenario - 3 ###
Filtering with multiple conditions: Outlet_Size is null and Outlet_Location_Type is 'Tier 1' or 'Tier 2'

In [0]:
# Outlets with no recorded size located in Tier 1 or Tier 2 areas.
no_outlet_size = col('Outlet_Size').isNull()
tier_1_or_2 = col('Outlet_Location_Type').isin(['Tier 1', 'Tier 2'])
df.where(no_outlet_size & tier_1_or_2).display()

### With column renamed ###

In [0]:
# Preview a rename; `df` itself is not reassigned, so it keeps "Item_Weight".
df.withColumnRenamed(existing="Item_Weight", new="Item Wt").display()

### withColumn ###
scenario - 1

In [0]:
df = df.withColumn('flag',lit('new'))

In [0]:
df.display()

In [0]:
df.withColumn('multiply',col('Item_Weight')*col('Item_MRP')).display()

### scenario -2 ###
In Item_Fat_Content, replace 'Regular' with 'Reg' and 'Low Fat' with 'Lf'

In [0]:
# Shorten the fat-content labels in place: "Regular" -> "Reg", then "Low Fat" -> "Lf".
# regexp_replace treats the pattern as a regex; these patterns contain no metacharacters.
fat_col = 'Item_Fat_Content'
df = (
    df.withColumn(fat_col, regexp_replace(col(fat_col), "Regular", "Reg"))
      .withColumn(fat_col, regexp_replace(col(fat_col), "Low Fat", "Lf"))
)

df.display()

### TypeCasting ###

In [0]:
df = df.withColumn('Item_Weight', col('Item_Weight').cast('String'))

In [0]:
df.printSchema()

### Sort/orderBy ###

In [0]:
df.sort(col('Item_Weight').desc()).display()


In [0]:
df.sort(col('Item_Visibility').asc()).display()


### Scenario 3 ###
sorting based on multiple columns

In [0]:
# Item_Weight descending, ties broken by Item_Visibility ascending.
df.orderBy(col('Item_Weight').desc(), col('Item_Visibility').asc()).display()

### LIMIT Function ###

In [0]:
# At most 10 rows; no ordering is applied first, so which rows appear is not fixed.
df.limit(num=10).display()

### Drop ###


In [0]:
# Preview df without two columns (df itself is unchanged).
unwanted = ['Item_Visibility', 'Item_Type']
df.drop(*unwanted).display()

### Drop Duplicates ###
scenario 1

In [0]:
# Remove rows that are exact duplicates across all columns (preview only).
df.dropDuplicates().display()


### Drop Duplicates ###
scenario 2

In [0]:
# Keep one (arbitrary) row per distinct Item_Type.
df.dropDuplicates(subset=['Item_Type']).display()


###UNION and UNION BY NAME###
Preparing DataFrames


In [0]:
data1 = [('1','kad'),
        ('2','sid')]
schema1 = 'id STRING, name STRING' 

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING, name STRING' 

df2 = spark.createDataFrame(data2,schema2)

In [0]:
df1.display()
df2.display()


###Union###


In [0]:
# Stack df2's rows under df1's (`unionAll` is an alias of `union`; duplicates are kept).
df1.unionAll(df2).display()

In [0]:
data1 = [('kad','1',),
        ('sid','2',)]
schema1 = 'name STRING, id STRING' 

df1 = spark.createDataFrame(data1,schema1)

df1.display()

In [0]:
df1.union(df2).display()

###Union by Name###


In [0]:
# Columns are matched by name, so the reordered df1 lines up with df2 correctly.
df1.unionByName(df2, allowMissingColumns=False).display()


###String Functions###
initcap
upper
lower

In [0]:
# Case-conversion helpers applied to the same column: initcap capitalises the
# first letter of each word, while upper/lower convert the whole string.
df.select(
    initcap('Item_Type').alias('initcap_Item_Type'),
    upper('Item_Type').alias('upper_Item_Type'),
    lower('Item_Type').alias('lower_Item_Type'),
).display()


###Date Functions###
current date

In [0]:
# Stamp every row with today's date and keep the column in df.
today = current_date()
df = df.withColumn('curr_date', today)

df.display()

Date_Add()


In [0]:
# Date seven days after curr_date, stored in df.
one_week_later = date_add(col('curr_date'), 7)
df = df.withColumn('week_after', one_week_later)

df.display()

Date_Sub()


In [0]:
# Preview only: date seven days before curr_date.
one_week_earlier = date_sub(col('curr_date'), 7)
df.withColumn('week_before', one_week_earlier).display()


In [0]:
# Adding a negative number of days gives the same date as date_sub(..., 7); stored in df.
minus_one_week = date_add(col('curr_date'), -7)
df = df.withColumn('week_before', minus_one_week)

df.display()

DateDIFF


In [0]:
# Number of days from curr_date to week_after (end minus start).
days_between = datediff(end=col('week_after'), start=col('curr_date'))
df = df.withColumn('datediff', days_between)

df.display()

Date_Format()


In [0]:
# Replace week_before with its text rendering in day-month-year form.
formatted = date_format(col('week_before'), 'dd-MM-yyyy')
df = df.withColumn('week_before', formatted)

df.display()

###handling nulls###
dropping nulls

In [0]:
# how='all': a row is removed only when every one of its columns is null.
df.na.drop(how='all').display()


In [0]:
# how='any': a row is removed as soon as at least one of its columns is null.
df.na.drop(how='any').display()

In [0]:
# Drop only the rows whose 'Item_Weight' is null; nulls in other columns are kept.
# dropna returns a new DataFrame, so display that result (calling it and then
# displaying `df` would show the unfiltered data).
df.dropna('any', subset=['Item_Weight']).display()

###filling nulls###


In [0]:
# Replace nulls with 'NotAvailable'. A string fill value applies only to
# string-typed columns; nulls in numeric or date columns are left as they are.
df.na.fill('NotAvailable').display()


In [0]:
# Fill nulls only in Outlet_Size; every other column is untouched.
df.na.fill('NotAvailable', subset=['Outlet_Size']).display()


###SPLIT and Indexing###
##Split##

In [0]:
# Turn Outlet_Type into an array of its space-separated words (preview only).
outlet_words = split(col('Outlet_Type'), ' ')
df.withColumn('Outlet_Type', outlet_words).display()


####Indexing####

In [0]:
# Keep only the second word of Outlet_Type (array index 1, zero-based).
second_word = split(col('Outlet_Type'), ' ').getItem(1)
df.withColumn('Outlet_Type', second_word).display()


###Explode###

In [0]:
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))

df_exp.display()

In [0]:
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()


In [0]:
df_exp.display()


In [0]:
# Boolean flag: does the Outlet_Type word array contain 'Type1'?
has_type1 = array_contains(col('Outlet_Type'), 'Type1')
df_exp.withColumn('Type1_flag', has_type1).display()


### GroupBy
Scenario - 1

In [0]:
# Total MRP per item type; the result column is named sum(Item_MRP).
df.groupBy('Item_Type').sum('Item_MRP').display()


Scenario - 2


In [0]:
# Mean MRP per item type; the result column is named avg(Item_MRP).
df.groupBy('Item_Type').avg('Item_MRP').display()


### Scenario - 3


In [0]:
# Total MRP for each (Item_Type, Outlet_Size) combination, under a readable name.
total_mrp = sum('Item_MRP').alias('Total_MRP')
df.groupBy(['Item_Type', 'Outlet_Size']).agg(total_mrp).display()


### Scenario - 4


In [0]:
# Several aggregates over the same grouping in one pass.
mrp_stats = [sum('Item_MRP'), avg('Item_MRP')]
df.groupBy(['Item_Type', 'Outlet_Size']).agg(*mrp_stats).display()


### collect_list

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string, book string'

df_book = spark.createDataFrame(data,schema)

df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()


###Pivot


In [0]:
df.select('Item_Type','Outlet_Size','Item_MRP').display()


In [0]:
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()


###When-Otherwise

In [0]:
# Rows whose Item_Type is exactly 'Meat' are tagged 'Non-Veg'; every other row,
# including one with a null Item_Type, falls through to 'Veg'.
# NOTE(review): any other non-vegetarian Item_Type values would be tagged 'Veg' — confirm intended.
is_meat = col('Item_Type') == 'Meat'
df = df.withColumn('veg_flag', when(is_meat, 'Non-Veg').otherwise('Veg'))

display(df)



scenario-2

In [0]:
df.withColumn('veg_exp_flag',when(((col('veg_flag')=='Veg') & (col('Item_MRP')<100)),'Veg_Inexpensive')\
                            .when((col('veg_flag')=='Veg') & (col('Item_MRP')>100),'Veg_Expensive')\
                            .otherwise('Non_Veg')).display()

In [0]:
df = df.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') > 100), 'Veg_Expensive')
    .otherwise('Non_Veg')
)

display(df)

####JOINS

In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)

In [0]:
df1.display()
df2.display()


###Inner Join

In [0]:
# Inner join: only employees whose dept_id exists in df2 (both dept_id columns are kept).
df1.join(df2, on=df1.dept_id == df2.dept_id, how='inner').display()


###Left Join


In [0]:
# Left join: every employee; department columns are null where dept_id has no match.
df1.join(df2, on=df1.dept_id == df2.dept_id, how='left').display()

###Right Join

In [0]:
# Right join: every department; employee columns are null where nobody works there.
df1.join(df2, on=df1.dept_id == df2.dept_id, how='right').display()

###Anti Join

In [0]:
# Anti join: employees whose dept_id has no match in df2; only df1's columns are returned.
df1.join(df2, on=df1.dept_id == df2.dept_id, how='anti').display()


###WINDOW FUNCTIONS
row_number()


In [0]:
# Inspect df before the window-function examples below.
df.display()


In [0]:
from pyspark.sql.window import Window


In [0]:
# Sequential row numbers over the whole DataFrame ordered by Item_Identifier
# (no partitionBy, so all rows are numbered in a single window).
by_item_id = Window.orderBy(col('Item_Identifier'))
df.withColumn('rowCol', row_number().over(by_item_id)).display()


###Rank vs Dense Rank

In [0]:
# rank() skips numbers after ties; dense_rank() numbers consecutively.
by_item_id_desc = Window.orderBy(desc('Item_Identifier'))
ranked = df.withColumn('rank', rank().over(by_item_id_desc))
ranked.withColumn('denseRank', dense_rank().over(by_item_id_desc)).display()
     

In [0]:
# Row-by-row running total of Item_MRP in Item_Identifier order.
running_frame = Window.orderBy('Item_Identifier').rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
df.withColumn('dum', sum('Item_MRP').over(running_frame)).display()


###Cumulative Sum


In [0]:
df.withColumn('cumsum',sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()


In [0]:
df.withColumn('cumsum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()


In [0]:
df.withColumn('totalsum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()


###USER DEFINED FUNCTIONS (UDF)
Step - 1

In [0]:
def my_func(x):
    """Return the square of *x*, or ``None`` when *x* is ``None``.

    This function is wrapped as a Spark UDF, where SQL nulls arrive as ``None``.
    Squaring ``None`` would raise ``TypeError`` and fail the task, so the null
    is passed through instead.
    """
    if x is None:
        return None
    return x * x

### STEP - 2


In [0]:
# Without an explicit returnType a Python UDF is typed as StringType, so the
# squared value would come back as text. DoubleType assumes the input column
# (Item_MRP) is a double — NOTE(review): confirm; a Python result that does not
# match the declared type is returned as null by Spark.
my_udf = udf(my_func, DoubleType())


In [0]:
# Apply the UDF to every Item_MRP value (preview only; df is unchanged).
squared_mrp = my_udf(col('Item_MRP'))
df.withColumn('mynewcol', squared_mrp).display()


###DATA WRITING
CSV

In [0]:
# No mode is set, so Spark's default ('errorifexists') applies: this fails if
# the path already exists, e.g. when the cell is re-run.
df.write.csv('/FileStore/tables/CSV/data.csv')

###APPEND

In [0]:
# Append: add new part files next to whatever is already at the path.
df.write.csv('/FileStore/tables/CSV/data.csv', mode='append')

In [0]:
# Same append, but the destination is supplied through the 'path' option.
df.write.options(path='/FileStore/tables/CSV/data.csv').format('csv').mode('append').save()

###Overwrite

In [0]:
# Overwrite: replace the existing contents of the path with this DataFrame.
df.write.save(path='/FileStore/tables/CSV/data.csv', format='csv', mode='overwrite')

###Error


In [0]:
# 'error' mode: the cells above already wrote to this path, so this raises.
df.write.save(path='/FileStore/tables/CSV/data.csv', format='csv', mode='error')

###Ignore

In [0]:
# 'ignore' mode: the path already exists (written above), so nothing is written.
df.write.save(path='/FileStore/tables/CSV/data.csv', format='csv', mode='ignore')
     

###PARQUET

In [0]:
# Write Parquet to its own directory. Reusing the CSV output path together with
# mode 'overwrite' would wipe the CSV files written above and leave Parquet
# files under a directory named '.csv'.
df.write.format('parquet')\
.mode('overwrite')\
.option('path','/FileStore/tables/PARQUET/data.parquet')\
.save()

###TABLE

In [0]:
# Create (or replace, via 'overwrite') the table 'my_table' stored as Parquet.
# NOTE(review): some catalogs only accept Delta for managed tables — confirm for this workspace.
df.write.saveAsTable('my_table', format='parquet', mode='overwrite')

In [0]:
# Show df as it stands after all the column additions made in this notebook.
df.display()
