### Data Reading

In [0]:
# List the uploaded-tables folder. Uses the same absolute '/FileStore/tables/'
# prefix as every reader in this notebook instead of a relative path.
dbutils.fs.ls('/FileStore/tables/')

In [0]:
# Load the BigMart sales CSV: first row is the header, column types inferred.
df = spark.read.csv(
    '/FileStore/tables/BigMart_Sales.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Show the DataFrame that was just read from the CSV.
df.display()

### Data Reading: JSON

In [0]:
# Read the drivers file as line-delimited JSON (one object per line).
# The 'header' and 'inferSchema' options were removed: they are CSV options
# that the JSON source does not use, so they only misled the reader.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/FileStore/tables/drivers.json')
)
df_json.display()

### Schema Definition

In [0]:
# Print the column names and types of the DataFrame read with inferSchema.
df.printSchema()

In [0]:
# DDL-style schema string for the BigMart CSV: one "name type" pair per column,
# comma separated. Item_Weight is declared as String here.
my_ddl_schema = '''


                Item_Identifier String,
                Item_Weight String,
                Item_Fat_Content String,
                Item_Visibility Double,
                Item_Type String,
                Item_MRP Double,
                Outlet_Identifier String,
                Outlet_Establishment_Year Int,
                Outlet_Size String,
                Outlet_Location_Type String,
                Outlet_Type String,
                Item_Outlet_Sales Double
                '''

In [0]:
# Re-read the CSV, this time applying the DDL schema instead of inferring types.
df = spark.read.csv(
    '/FileStore/tables/BigMart_Sales.csv',
    schema=my_ddl_schema,
    header=True,
)

In [0]:
# Print the schema of the DataFrame read with my_ddl_schema.
df.printSchema()

In [0]:
# Show the rows read with the DDL schema applied.
df.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Explicit StructType schema for the BigMart CSV.
# A user-supplied CSV schema is applied to the file's columns by position, so
# all 12 columns must be listed in the same order as my_ddl_schema. The
# Item_Type field was missing, which shifted every later column by one and
# made col('Item_Type') unresolvable. Field types mirror my_ddl_schema.
my_strct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', DoubleType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', IntegerType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', DoubleType(), True),
])


In [0]:
# Re-read the CSV with the StructType schema and the header row enabled.
df = spark.read.load(
    '/FileStore/tables/BigMart_Sales.csv',
    format='csv',
    schema=my_strct_schema,
    header=True,
)

In [0]:
# Print the schema of the DataFrame read with my_strct_schema.
df.printSchema()

### Select

In [0]:
# Keep the projected DataFrame in df_sele, then render it separately.
# Chaining .display() onto the assignment stored display()'s return value
# (None) in df_sele instead of the selected columns.
df_sele = df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')
df_sele.display()

In [0]:
# Same three-column projection, built from col() objects rather than names.
df.select(
    [col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')]
).display()


#### Alias

In [0]:
# Select Item_Identifier and expose it under the name Item_ID.
df.select(col('Item_Identifier').alias('Item_ID')).display()

### FILTER

#### Scenario 1

In [0]:
# Keep only rows whose Item_Fat_Content is exactly 'Regular'.
df.where(col('Item_Fat_Content') == 'Regular').display()

#### Scenario 2

In [0]:
# Rows that are Soft Drinks AND have Item_Weight below 10.
# Each comparison is parenthesized because & binds tighter than == and <.
df.filter((col('Item_Type')=='Soft Drinks') & (col('Item_Weight')<10)).display()

#### Scenario 3

In [0]:
# Rows located in Tier 1 or Tier 2 whose Outlet_Size is null.
df.where(
    col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
    & col('Outlet_Size').isNull()
).display()

### WithColumnRenamed

In [0]:
# Show df with Item_Weight renamed to Item_Wt; the result is not assigned,
# so df itself keeps the original column name.
df.withColumnRenamed('Item_Weight','Item_Wt').display()

### WithColumn

In [0]:
# Add a constant column 'flag' holding the literal "new" and keep it on df.
df = df.withColumn('flag',lit("new"))
df.display()

In [0]:
# Show a derived column 'multiply' = Item_Weight * Item_MRP.
# The result is only displayed, not assigned back to df.
df.withColumn('multiply',col('Item_Weight')*col('Item_MRP')).display()

In [0]:
# Show df again; the 'multiply' column from the previous cell was not stored.
df.display()

#### Scenario 2

In [0]:
# Abbreviate fat-content labels: "Regular" -> "Reg", then "Low Fat" -> "LF".
df.withColumn(
    'Item_Fat_Content',
    regexp_replace(
        regexp_replace(col('Item_Fat_Content'), "Regular", "Reg"),
        "Low Fat",
        "LF",
    ),
).display()

### Type Casting

In [0]:
# Replace Item_Weight with a StringType cast of itself and keep it on df.
df = df.withColumn('Item_Weight',col('Item_Weight').cast(StringType()))
df.display()

### Sort or Order By

#### Scenario 1

In [0]:
# Sort by weight, heaviest first. Item_Weight is a string column in this
# notebook (schema + the cast above), so it is cast to double for the sort
# key; otherwise the order is lexicographic ("9.8" before "21.35").
df.sort(col('Item_Weight').cast('double').desc()).display()

In [0]:
# Sort by Item_Visibility in ascending order.
df.sort(col('Item_Visibility').asc()).display()

In [0]:
# Sort by weight then visibility, both descending. Item_Weight is a string
# column here, so it is cast to double to get numeric rather than
# lexicographic ordering.
df.sort(
    [col('Item_Weight').cast('double'), col('Item_Visibility')],
    ascending=[False, False],
).display()

In [0]:
# Sort by weight descending, then visibility ascending. Item_Weight is a
# string column here, so it is cast to double to get numeric rather than
# lexicographic ordering.
df.sort(
    [col('Item_Weight').cast('double'), col('Item_Visibility')],
    ascending=[False, True],
).display()

### Limit

In [0]:
# Show at most 10 rows of df.
df.limit(10).display()

### Drop

In [0]:
# Show df without the Item_Visibility column (df itself is not modified).
df.drop('Item_Visibility').display()

In [0]:
# Show df without both Item_Visibility and Item_Type.
df.drop('Item_Visibility','Item_Type').display()

### Drop Duplicates

In [0]:
# Remove rows that are duplicated across every column.
df.drop_duplicates().display()

In [0]:
# De-duplicate considering only the Item_Type column.
df.dropDuplicates(subset=['Item_Type']).display()

In [0]:
# Show the distinct rows of df.
df.distinct().display()

In [0]:
# Two small (id, name) frames with identical column order, used for union().
data1 = [
    ('1', 'kad'),
    ('2', 'sid'),
]
schema1 = 'id String , name string'
df1 = spark.createDataFrame(data1, schema1)

data2 = [
    ('3', 'rahul'),
    ('4', 'jas'),
]
schema2 = 'id String , name string'
df2 = spark.createDataFrame(data2, schema2)


In [0]:
# Show both sample frames before combining them.
df1.display()
df2.display()

In [0]:
# Stack df2's rows under df1's rows.
df1.union(df2).display()

In [0]:
# Rebuild df1 with the columns in the opposite order (name, id) so that the
# next cells can contrast it with df2's (id, name) layout.
data1 = [
    ('kad', '1'),
    ('sid', '2'),
]
schema1 = 'name String , id string'
df1 = spark.createDataFrame(data1, schema1)


In [0]:
# Show the re-ordered df1 (name first, id second).
df1.display()

In [0]:
# Combine df1 and df2 by column name; their column orders differ here.
df1.unionByName(df2).display()

In [0]:
# Back to the main BigMart DataFrame.
df.display()

### String Functions

#### initcap

In [0]:
# Apply initcap to Item_Type and show it as CapitalizedType.
df.select(initcap('Item_Type').alias('CapitalizedType')).display()

#### lower() and upper()

In [0]:
# Distinct Item_Type values in lower case, then in upper case.
df.select(lower('Item_Type').alias('LowerType')).distinct().display()
df.select(upper('Item_Type').alias('UpperType')).distinct().display()

### Date Functions

In [0]:
# Add a curr_date column from current_date() and keep it on df; the date
# cells that follow build on this column.
df = df.withColumn('curr_date',current_date())
df.display()

In [0]:
# Add one_week_later = curr_date + 7 days and keep it on df.
df = df.withColumn(
    'one_week_later',
    date_add('curr_date', 7),
)
df.display()

In [0]:
# Add one_week_before = curr_date - 7 days using date_sub.
df= df.withColumn('one_week_before',date_sub(col('curr_date'),7))
df.display()

In [0]:
# Same seven-day subtraction, expressed as date_add with a negative offset.
df= df.withColumn('one_week_before_using_date_add',date_add(col('curr_date'),-7))
df.display()

#### datediff

In [0]:
# Show Date_Difference = datediff(one_week_later, curr_date); not stored on df.
df.withColumn('Date_Difference',datediff('one_week_later','curr_date')).display()


#### date format

In [0]:
# Overwrite one_week_before with its 'dd-MM-yyyy' formatted form and keep it
# on df.
df=df.withColumn('one_week_before',date_format('one_week_before','dd-MM-yyyy'))
df.display()

#### handling nulls

##### dropping nulls

In [0]:
# Drop rows using the 'all' policy of dropna.
df.dropna(how='all').display()

In [0]:
# Drop rows using the 'any' policy of dropna.
df.dropna(how='any').display()

In [0]:
# Drop rows based on nulls in the Outlet_Size column only.
df.dropna(subset=['Outlet_Size']).display()

##### Filling Nulls

In [0]:
# Cast Item_Weight to string so the string fill value used in the next cells
# can apply to it. withColumn returns a new DataFrame, so the result must be
# assigned back; previously it was discarded and printSchema showed the
# unchanged df.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))
df.printSchema()

In [0]:
# Replace nulls with the string 'NA'.
df.na.fill('NA').display()

In [0]:
# Replace nulls with 'NA' in the Outlet_Size column only.
df.fillna('NA',subset=['Outlet_Size']).display()

### Split and Indexing

In [0]:
# Show Outlet_Type split on single spaces (the column becomes an array).
df.withColumn('Outlet_Type',split("Outlet_Type",' ')).display()

In [0]:
# Split Outlet_Type on spaces and keep only the element at index 1.
df.withColumn(
    'Outlet_Type',
    split("Outlet_Type", ' ').getItem(1),
).display()

### explode

In [0]:
# Keep the split (array) version of Outlet_Type in df_exp for the explode and
# array_contains cells that follow.
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
df_exp.display()

In [0]:
# Explode the Outlet_Type array so each element lands on its own row.
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()

#### Array_contains

In [0]:
# Show df_exp, then add Type1_Flag: whether the Outlet_Type array contains
# the element 'Type1'.
df_exp.display()
df_exp.withColumn('Type1_Flag',array_contains('Outlet_Type','Type1')).display()

### Group by

In [0]:
# Peek at 10 rows before aggregating.
df.limit(10).display()

In [0]:
# Total Item_MRP per Item_Type.
(
    df.groupBy('Item_Type')
    .agg(sum('Item_MRP').alias('SUM_MRP'))
    .display()
)

In [0]:
# Average Item_MRP per Item_Type.
df.groupBy(col('Item_Type')).agg(avg(col('Item_MRP')).alias('Avg_MRP')).display()

In [0]:
# Total Item_MRP per (Item_Type, Outlet_Size) pair, ordered by Item_Type.
(
    df.groupBy('Item_Type', 'Outlet_Size')
    .agg(sum('Item_MRP').alias('Sum_MRP'))
    .orderBy('Item_Type')
    .display()
)

In [0]:
# Sum and average of Item_MRP per (Item_Type, Outlet_Size), ordered by
# Item_Type.
(
    df.groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum('Item_MRP').alias('Sum_MRP'),
        avg('Item_MRP').alias('Avg_MRP'),
    )
    .orderBy('Item_Type')
    .display()
)

### Collect List

In [0]:
# Small (user, book) sample frame for the collect_list demo.
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]
schema = 'user string, book string'

df_book = spark.createDataFrame(data=data, schema=schema)
df_book.display()

In [0]:
# Gather each user's books into one list per user.
df_book.groupBy(col('user')).agg(collect_list(col('book'))).display()

#### Pivot

In [0]:
# Average Item_MRP per Item_Type, pivoted on Outlet_Size.
df.groupBy(col('Item_Type')).pivot('Outlet_Size').agg(avg(col('Item_MRP'))).display()

#### when-otherwise

In [0]:
# Tag each row: 'Non-Veg' when Item_Type is 'Meat', otherwise 'Veg'.
df_veg = df.withColumn('VegFlag',when(col('Item_Type')=='Meat','Non-Veg').otherwise('Veg'))
df_veg.display()

In [0]:
# Classify Veg rows by price: above 100 is 'Veg Expensive', 100 or below is
# 'Veg InExpensive'; everything else is 'Non Veg'.
# The lower branch now uses <= so a Veg item priced at exactly 100 is no
# longer mislabelled 'Non Veg' by falling through to otherwise().
# NOTE(review): a Veg row with a null Item_MRP still reaches otherwise().
df_veg.withColumn(
    'Veg_Exp_Flag',
    when((col('VegFlag') == 'Veg') & (col('Item_MRP') > 100), 'Veg Expensive')
    .when((col('VegFlag') == 'Veg') & (col('Item_MRP') <= 100), 'Veg InExpensive')
    .otherwise('Non Veg')
).display()

#### Joins

In [0]:
# Bare reference to df: the cell's value is the DataFrame object itself.
df

In [0]:
# Employee rows (left side of the joins). Note d06 has no matching department.
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
df1 = spark.createDataFrame(data=dataj1, schema=schemaj1)

# Department rows (right side of the joins). Note d04 has no employees.
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
schemaj2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(data=dataj2, schema=schemaj2)
     


In [0]:
# Show the employee and department frames before joining them.
df1.display()
df2.display()

In [0]:
# Inner join of employees and departments on dept_id.
df1.join(
    df2,
    on=df1.dept_id == df2.dept_id,
    how='inner',
).display()

In [0]:
# Left join of employees and departments on dept_id.
df1.join(df2,df1['dept_id']==df2['dept_id'],'left').display()

In [0]:
# Right join of employees and departments on dept_id.
df1.join(df2,df1['dept_id']==df2['dept_id'],'right').display()

In [0]:
# Anti join of employees against departments on dept_id.
df1.join(df2,df1['dept_id']==df2['dept_id'],'anti').display()

In [0]:
from pyspark.sql.window import Window

In [0]:
# Add rn = row_number() over a window ordered by Item_Identifier
# (no partitioning is specified on the window).
df.withColumn('rn',row_number().over(Window.orderBy('Item_Identifier'))).display()

In [0]:
# Add rnk = rank() over a window ordered by Item_Identifier.
df.withColumn('rnk',rank().over(Window.orderBy('Item_Identifier'))).display()

In [0]:
# Add dense_rnk = dense_rank() over Item_Identifier in descending order.
df.withColumn('dense_rnk',dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()

### Cumulative Sum

In [0]:
# Sum of Item_MRP over a window ordered by Item_Type, with no explicit frame.
# NOTE(review): compare with the next cell, which spells out the row frame.
df.withColumn('cum_sum',sum(col('Item_MRP')).over(Window.orderBy(col('Item_Type')))).display()

In [0]:
# Running sum of Item_MRP: frame runs from the first row of the window up to
# and including the current row.
df.withColumn('cum_sum',sum(col('Item_MRP')).over(Window.orderBy(col('Item_Type')).rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [0]:
# Sum of Item_MRP over the whole window: frame is unbounded in both
# directions, so the value does not depend on the current row.
df.withColumn('total_sum',sum(col('Item_MRP')).over(Window.orderBy(col('Item_Type')).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()

### User Defined Functions UDF

In [0]:
def my_func(x):
    """Return ``x + 1``, passing ``None`` through unchanged.

    This function is wrapped as a Spark UDF below. Spark hands SQL nulls to
    Python UDFs as ``None``, and ``None + 1`` raises ``TypeError``, so a null
    input is returned as a null output instead of failing.

    Args:
        x: A numeric value, or ``None``.

    Returns:
        ``x + 1``, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return x + 1

In [0]:
# Register my_func as a UDF with an explicit double return type. Without a
# returnType, udf() defaults to StringType and the incremented price comes
# back as a string column.
my_udf = udf(my_func, DoubleType())

In [0]:
# Apply the UDF to Item_MRP and show the result as my_new_col.
df.withColumn('my_new_col',my_udf('Item_MRP')).display()

### Data Writing

#### csv

In [0]:
# Write df as CSV with the writer's default save mode.
df.write.csv('/FileStore/tables/CSV/data.csv')

### Append

In [0]:
# Write df as CSV in 'append' mode to the same path as the previous cell.
df.write.format('csv')\
    .mode('append')\
        .save('/FileStore/tables/CSV/data.csv')

In [0]:
# Append df as CSV once more to the same target path.
(
    df.write
    .mode('append')
    .format('csv')
    .save('/FileStore/tables/CSV/data.csv')
)

### overwrite

In [0]:
# Write df as CSV in 'overwrite' mode; the target is given via the 'path'
# option instead of as an argument to save().
df.write.format('csv')\
    .mode('overwrite')\
        .option('path','/FileStore/tables/CSV/data.csv').save()

### error

In [0]:
# Write df as CSV in 'error' mode. Earlier cells in this notebook already
# wrote to this same path.
df.write.format('csv')\
    .mode('error')\
        .save('/FileStore/tables/CSV/data.csv')

#### ignore

In [0]:
# Write df as CSV in 'ignore' mode to the same path as the cells above.
df.write.format('csv')\
    .mode('ignore')\
        .save('/FileStore/tables/CSV/data.csv')

### Parquet `file`

In [0]:
# Write df as Parquet to its own location. The previous target was the
# copy-pasted CSV path, so this overwrite replaced the CSV output with
# Parquet files under a '.csv' name.
(
    df.write.format('parquet')
    .mode('overwrite')
    .option('path', '/FileStore/tables/PARQUET/data.parquet')
    .save()
)

#### Table

In [0]:
# Save df as the table my_table1 in Parquet format, overwriting any existing
# table data.
df.write.saveAsTable(
    'my_table1',
    format='parquet',
    mode='overwrite',
)

In [0]:
# Show df once more after the write cells.
df.display()

### Spark SQL

In [0]:
# Register df as the temporary view my_view for SQL queries.
# createOrReplaceTempView keeps this cell re-runnable: createTempView raises
# once a view with that name already exists in the session.
df.createOrReplaceTempView('my_view')

In [0]:
%sql
-- Return every row and column of the my_view temporary view.
select * from my_view;

In [0]:
# Run SQL against my_view from Python and keep the Dairy rows as a DataFrame.
df_new = spark.sql("select * from my_view where Item_type='Dairy'")

In [0]:
# Show the result of the SQL query.
df_new.display()