## Databricks Tutorial

### Reading Data

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load the Big Mart sales CSV from the workspace volume. The first row is
# read as the header and Spark is asked to infer the column types.
df = (
    spark.read.format("CSV")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/workspace/default/my_volume/big_mart_sales.csv")
)

df.display()



### DDL Schema

In [0]:
# DDL-formatted schema string: one "column_name type" pair per column.
# Item_Weight, Item_Visibility and Item_Outlet_Sales are declared as string here.
# NOTE(review): no visible cell references my_schema — presumably it is meant to
# be passed to the reader via .schema(my_schema) instead of inferSchema; confirm.
my_schema = '''
                Item_Identifier string,
                Item_Weight string,
                Item_Fat_Content string,
                Item_Visibility string,
                Item_Type string,
                Item_MRP double,
                Outlet_Identifier string,
                Outlet_Establishment_Year integer,
                Outlet_Size string,
                Outlet_Location_Type string,
                Outlet_Type string,
                Item_Outlet_Sales string
            '''

### Select

#### method 1 

In [0]:
# Method 1: choose columns by passing their names as plain strings.
df.select(
    "Item_Fat_Content",
    "Item_Identifier",
    "Item_MRP",
    "Outlet_Establishment_Year",
).display()

#### method 2 | standard way

In [0]:
# Method 2: wrap each name in col() so select() receives Column objects.
df.select(
    col("Item_Identifier"),
    col("Item_Weight"),
    col("Item_Fat_Content"),
    col("Item_Outlet_Sales"),
).display()

### Alias
Renames a column (temporary name in result)

In [0]:
# alias() renames the column in this result only; df is not reassigned here.
df.select(
    col("Item_Identifier").alias("Item_Id"),
).display()

### Filter/Where

#### Scenario 1

In [0]:
# Soft drinks with a weight below 10, lightest first, capped at ten rows.
(
    df.filter((col("Item_Type") == "Soft Drinks") & (col("Item_Weight") < 10))
    .orderBy(col("Item_Weight").asc())
    .limit(10)
    .display()
)

#### Scenario 2

In [0]:
# Rows whose outlet size is known and whose location is Tier 1 or Tier 2,
# ordered by location type, first twenty rows only.
(
    df.filter(
        col("Outlet_Size").isNotNull()
        & col("Outlet_Location_Type").isin("Tier 1", "Tier 2")
    )
    .orderBy(col("Outlet_Location_Type").asc())
    .limit(20)
    .display()
)

### WithColumnRenamed
Renames a column at the DataFrame level.

In [0]:
# Rename Item_Weight to Item_wt. The result is assigned back to df, so every
# cell that runs after this one sees the column under its new name.
df = df.withColumnRenamed(
    "Item_Weight",
    "Item_wt",
)
df.display()

### WithColumn   
Add or modify a column.

#### Scenario 1

In [0]:
# Add a constant column named Flag; lit() turns the Python value into a Column.
df = df.withColumn(
    "Flag",
    lit("new"),
)

#### Scenario 2

In [0]:
# Derive product_wt_mrp as weight * MRP.
# The weight column is referenced by its exact name, Item_wt (it was renamed
# from Item_Weight earlier). The previous spelling "Item_Wt" only resolved
# because Spark's column resolution is case-insensitive by default, and would
# fail with spark.sql.caseSensitive=true.
df = df.withColumn("product_wt_mrp", col("Item_wt") * col("Item_MRP"))
df.display()


#### Scenario 3

In [0]:
from pyspark.sql.functions import *
# Shorten the fat-content labels in two passes over the same column:
# "Regular" becomes "Reg", then "Low Fat" becomes "Lf".
df = (
    df.withColumn(
        "Item_Fat_Content",
        regexp_replace(col("Item_Fat_Content"), "Regular", "Reg"),
    )
    .withColumn(
        "Item_Fat_Content",
        regexp_replace(col("Item_Fat_Content"), "Low Fat", "Lf"),
    )
)

df.display()

### Typecasting

In [0]:
# Cast Item_Visibility to string and keep the result in df; printSchema()
# shows the column's type after the cast.
df = df.withColumn(
    "Item_Visibility",
    col("Item_Visibility").cast("string"),
)
df.display()
df.printSchema()

### Sort

In [0]:
# Heaviest items first.
# df was reassigned by the earlier withColumnRenamed("Item_Weight", "Item_wt")
# cell, so the weight column must be referenced as Item_wt; "Item_Weight" no
# longer exists in df and cannot be resolved.
df.sort(col("Item_wt").desc()).display()

In [0]:
# Ascending sort on a single column.
# NOTE(review): Item_Visibility was cast to string in the Typecasting cell, so
# this presumably orders the values as text rather than numerically — confirm.
df.sort(col('Item_Visibility').asc()).display()

In [0]:
# Multi-column sort: weight descending, then visibility ascending.
# The weight column is Item_wt after the earlier rename; "Item_Weight" no longer
# exists in df. Booleans replace 0/1 so the direction of each key is explicit.
df.sort(["Item_wt", "Item_Visibility"], ascending=[False, True]).display()

### limit

In [0]:
# Show only the first ten rows of df.
df.limit(10).display()

### Drop

In [0]:
# Drop a single column from the displayed result (df itself is not reassigned).
# The weight column is named Item_wt after the earlier rename. drop() ignores
# names that are not in the schema, so asking for "Item_Weight" would silently
# drop nothing.
df.drop("Item_wt").display()

In [0]:
# Drop several columns at once by listing their names.
# Item_wt is the current name of the weight column (renamed earlier); the old
# name "Item_Weight" would be ignored by drop() and only Item_Visibility would
# actually be removed.
df.drop("Item_wt", "Item_Visibility").display()

### Drop Duplicates

In [0]:
# With no arguments, rows are de-duplicated across all columns.
df.dropDuplicates().display()

In [0]:
# De-duplicate on the weight column only: one row is kept per distinct Item_wt.
# The column is spelled exactly as it was named in the rename cell, so the call
# does not depend on case-insensitive column resolution.
df.drop_duplicates(subset=["Item_wt"]).display()

### Union and UnionByname

In [0]:
from pyspark.sql.functions import *
# Three small frames for the union examples.
# df1 and df2 declare their columns as (id, name); df3 declares them in the
# opposite order, (name, id).
data1 = [("1", "Aditi"), ("2", "Harshal")]
schema1 = "id string, name string"
df1 = spark.createDataFrame(data1, schema1)

data2 = [("3", "Prajwal"), ("4", "Saurabh")]
schema2 = "id string,name string"
df2 = spark.createDataFrame(data2, schema2)

data3 = [("Tanmay", "5"), ("Rajlaxmi", "6")]
schema3 = "name string, id string"
df3 = spark.createDataFrame(data3, schema3)

In [0]:
# Show both inputs, then stack df2 under df1. Both frames use the same
# (id, name) column order, and the combined frame is kept for the next cells.
df1.display()
df2.display()
df1Uniondf2 = df1.union(df2)


In [0]:
# union() pairs columns by position, not by name. df3 is declared as (name, id),
# so its names land in the id column and its ids in the name column.
df1Uniondf2.union(df3).display()

### unionByName
As the cell above shows, `union` combined the rows but put df3's values in the wrong columns, because it matches columns by position. <br>
--> That is when we use `unionByName`, which matches columns by name.

In [0]:
# unionByName() matches columns by name, so df3's (name, id) order is handled.
df1Uniondf2.unionByName(df3).display()

### String Functions

#### initcap()

In [0]:
# initcap(): capitalise the first letter of each word in Item_Type (5 rows).
(
    df.select(initcap("Item_Type"))
    .limit(5)
    .display()
)

#### lower()

In [0]:
# lower(): Item_Type converted to lower case (5 rows).
(
    df.select(lower("Item_Type"))
    .limit(5)
    .display()
)

#### upper()

In [0]:
# upper(): Item_Type converted to upper case (5 rows).
(
    df.select(upper("Item_Type"))
    .limit(5)
    .display()
)

### Date Functions

#### current_date()

In [0]:
# Read 1_tutorial_bigmart.csv (header row, inferred types) and point df at it,
# replacing whatever df held from the earlier cells.
check_point1 = (
    spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load("/Volumes/workspace/default/my_volume/1_tutorial_bigmart.csv")
)
df = check_point1

In [0]:
# Add today's date as a column named current_date.
# current_date() already yields a date-typed column, so no cast is needed.
df = df.withColumn("current_date", current_date())
df.display()

#### date_add()

In [0]:
# week_after = the current_date column plus seven days.
df = df.withColumn(
    "week_after",
    date_add("current_date", 7),
)
df.display()

#### date_sub()

In [0]:
# week_before = the current_date column minus seven days.
# Written here with date_add and a negative offset; the date_sub form would be
#   df.withColumn("weekbefore", date_sub(col('current_date'), 7))
df = df.withColumn(
    "week_before",
    date_add(col("current_date"), -7),
)
df.display()


#### datediff()

In [0]:

# date_diff = number of days from current_date to week_after
# (datediff takes the end date first, then the start date).
df = df.withColumn(
    "date_diff",
    datediff("week_after", "current_date"),
)
df.display()

#### date format

In [0]:
# Overwrite week_before with its dd-MM-yyyy rendering; date_format() returns
# text, so the column holds formatted strings from here on.
df = df.withColumn(
    "week_before",
    date_format("week_before", "dd-MM-yyyy"),
)

df.display()
     

### Handling Nulls 

In [0]:
# df = df.withColumn('Outlet_Size',regexp_replace('Outlet_Size','null',None))
# df.display()

In [0]:
# how="any": drop a row if at least one of its columns is null.
df.dropna(how="any").display()

In [0]:
# how="all": drop a row only when every column in it is null.
df.dropna(how="all").display()

In [0]:
# Only Outlet_Type is inspected: rows where it is null are dropped.
df.dropna(
    subset=["Outlet_Type"],
).display()

#### Filling Nulls

In [0]:
# Replace nulls with the text "NA" and keep the result in df. No subset is
# given, so the fill is not limited to particular columns.
df = df.fillna("NA")
df.display()

In [0]:
# Fill nulls in Outlet_Size only.
# fillna() returns a new DataFrame and leaves df untouched, so the result has
# to be displayed (or assigned) directly; previously it was discarded and the
# unmodified df was shown instead.
# NOTE: the preceding cell already stored a fully filled df, so run this against
# a frame that still contains nulls to see the subset-only effect.
df.fillna("NA", subset=["Outlet_Size"]).display()

### Split and Indexing

#### split

In [0]:
# split(): break Outlet_Type on single spaces, producing an array column.
df.withColumn(
    "Outlet_Type",
    split("Outlet_Type", " "),
).display()

#### indexing

In [0]:
# Indexing: keep only the element at position 1 of the split result
# (positions start at 0, so this is the second piece).
df.withColumn(
    "Outlet_Type",
    split("Outlet_Type", " ")[1],
).display()

### Explode
It works on array columns: each element of the array becomes its own row. A separate DataFrame is created for this so the main DataFrame is not altered.

In [0]:
# Work on a separate frame: Outlet_Type becomes an array of its
# space-separated pieces, while df keeps the original text column.
df_explode = df.withColumn(
    "Outlet_Type",
    split("Outlet_Type", " "),
)
df_explode.display()

In [0]:
# explode(): emit one output row per element of the Outlet_Type array.
# The source column is referenced with the same spelling as the target
# ("Outlet_Type"), so the call does not rely on case-insensitive resolution.
df_explode.withColumn("Outlet_Type", explode("Outlet_Type")).display()

### Array_Contains

In [0]:
# array_contains(): type1_flag is true when the Outlet_Type array holds the
# element "Type1". The column name is spelled exactly as in df_explode's
# schema, so the call does not rely on case-insensitive resolution.
df_explode.withColumn(
    "type1_flag",
    array_contains("Outlet_Type", "Type1"),
).display()

### Group By

#### scenario 1 sum

In [0]:
# Total MRP per item type. sum here is the Spark aggregate brought in by the
# wildcard import from pyspark.sql.functions, not Python's builtin.
(
    df.groupby("item_type")
    .agg(sum("item_mrp"))
    .display()
)

#### scenario 2 avg

In [0]:
# Average MRP per item type.
(
    df.groupby("item_type")
    .agg(avg("item_mrp"))
    .display()
)

#### scenario 3 

In [0]:
# Total MRP per (item type, outlet size), with the aggregate named Total_MRP.
# alias() must be applied to the aggregate Column inside agg(). Calling it on
# the DataFrame returned by agg() only aliases the frame and leaves the column
# with its generated name.
(
    df.groupby("Item_Type", "Outlet_Size")
    .agg(sum("Item_MRP").alias("Total_MRP"))
    .display()
)

#### scenario 4

In [0]:
# Two aggregates in one pass: total and average MRP per
# (item type, outlet size) group.
(
    df.groupby("Item_Type", "Outlet_Size")
    .agg(
        sum("Item_MRP"),
        avg("Item_MRP"),
    )
    .display()
)

### Collect list

In [0]:
# Small (user, book) frame for the collect_list example.
data = [("user1", "book1"), ("user2", "book2"), ("user3", "book3"),
        ("user4", "book4"), ("user5", "book5")]
schema = "user String, book String"
dfbook = spark.createDataFrame(data, schema)
dfbook.display()


In [0]:
# Gather each user's books into a single array value, one row per user.
dfbook = (
    dfbook.groupby("user")
    .agg(collect_list("book"))
)
dfbook.display()

### Pivot

In [0]:
# Show df as it stands before pivoting.
df.display()

In [0]:
# One row per Item_Type and one column per distinct Outlet_Size value; each
# cell is the average Item_MRP rounded to 4 decimal places.
(
    df.groupBy("Item_Type")
    .pivot("Outlet_Size")
    .agg(round(avg("Item_MRP"), 4))
    .display()
)

### When Otherwise

In [0]:
# veg_flag: "Non-Veg" when Item_Type is exactly "Meat", otherwise "Veg".
df = df.withColumn(
    "veg_flag",
    when(col("Item_Type") == "Meat", "Non-Veg").otherwise("Veg"),
)
df.display()

In [0]:
# veg_exp_flag: split Veg items by price, everything else is "Non-Veg".
#   Veg and Item_MRP <  100 -> "Veg_Inexpensive"
#   Veg and Item_MRP >= 100 -> "Veg_Expensive"
# The second branch uses >= so a Veg item priced at exactly 100 is classified as
# expensive; with a strict > it matched neither branch and fell through to the
# "Non-Veg" default.
df.withColumn(
    "veg_exp_flag",
    when((col("veg_flag") == "Veg") & (col("Item_MRP") < 100), "Veg_Inexpensive")
    .when((col("veg_flag") == "Veg") & (col("Item_MRP") >= 100), "Veg_Expensive")
    .otherwise("Non-Veg"),
).display()

### Joins
#### Inner Join
#### Left Join
#### Right Join
#### Anti Join