## Data reading


In [0]:
# Location of the BigMart sales CSV inside the workspace volume.
file_path = "/Volumes/workspace/dataset_volumes/sales/BigMart Sales.csv"

# First row holds the column names; let Spark sample the data to pick column types.
salesdf = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(file_path)
)

In [0]:
# Preview the first ten rows of the inferred-schema frame.
display(salesdf.limit(num=10))


## Read JSON data


In [0]:
json_path = "/Volumes/workspace/dataset_volumes/sales/drivers.json"
#'header' and 'inferSchema' are CSV-reader options; the JSON reader ignores them
#(JSON has no header row and the schema is inferred when none is supplied), so
#they are dropped rather than suggesting they have an effect.
df_json = spark.read.json(json_path)

In [0]:
# Preview the first ten driver records.
display(df_json.limit(num=10))

In [0]:
# Show the column names and the types Spark inferred (inferSchema=True above) for the CSV.
salesdf.printSchema()

In [0]:
# Column names of the CSV as a plain Python list.
display(list(salesdf.columns))

## DDL Schema

In [0]:
# Explicit schema as a DDL string: comma-separated "column_name type" pairs.
# Passing it to the reader fixes each column's type up front, so no inferSchema
# pass is needed. The column order presumably matches the CSV header — confirm.
sales_schema_ddl ='''

Item_Identifier string,
Item_Weight Double,
Item_Fat_Content string,
Item_Visibility double,
Item_Type string,
Item_MRP double,
Outlet_Identifier string,
Outlet_Establishment_Year integer,
Outlet_Size string,
Outlet_Location_Type string,
Outlet_Type string,
Item_Outlet_Sales double

''' 

In [0]:
# Re-read the same CSV, this time with the explicit DDL schema instead of inference.
df_ddl = spark.read.schema(sales_schema_ddl).option('header', True).csv(file_path)


In [0]:
# Preview the schema-typed frame.
display(df_ddl.limit(num=10))


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


## StructType schema

In [0]:
# Field names found in drivers.json, used to build the StructType below.
display(list(df_json.columns))

In [0]:
#explicit StructType for drivers.json: every field is nullable, driverId is a
#long and every other field is read as a string
myjson_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('code', StringType()),
        ('dob', StringType()),
        ('driverId', LongType()),
        ('driverRef', StringType()),
        ('name', StringType()),
        ('nationality', StringType()),
        ('number', StringType()),
        ('url', StringType()),
    ]
])

In [0]:
#read the JSON with the explicit StructType; 'header' is a CSV-only option that
#the JSON reader ignores, so it is omitted
df_json_stru = spark.read.format('json').schema(myjson_schema).load(json_path)


In [0]:
# Preview drivers read with the explicit schema.
display(df_json_stru.limit(num=10))

### select  

In [0]:
# select() also accepts a single list of column names.
df_sel = df_ddl.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content'])
display(df_sel.limit(num=10))


In [0]:
# Same projection, built from Column objects via col().
df = df_ddl.select(*[col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')])
display(df.limit(num=10))

In [0]:
from pyspark.sql.functions import col, sum
from pyspark.sql.types import DoubleType

# Total Item_Weight per Item_Fat_Content, counting only items heavier than 15.
# Built from df_ddl directly (not the repeatedly reassigned `df`) so the result
# does not depend on which earlier cell last set `df`, and stored under its own
# name instead of `df2`, which a later cell reuses for unrelated data.
# Item_Weight is already a double in sales_schema_ddl; the cast is kept only to
# demonstrate cast().
df_weight_by_fat = (
    df_ddl
    .withColumn('Item_Weight_org', col('Item_Weight').cast(DoubleType()))
    .filter(col('Item_Weight_org') > 15)
    .groupBy('Item_Fat_Content')
    .agg(sum('Item_Weight_org').alias('Item_Weight_total'))
    .orderBy('Item_Fat_Content')
)

display(df_weight_by_fat)

### Alias

In [0]:
# Column.name() is an alias of Column.alias(): expose Item_Identifier as Item_ID.
df = df_ddl.select(col('Item_Identifier').name('Item_ID'))
display(df.limit(num=10))

### filter / where

- filter the rows where Item_Fat_Content = Regular
- slice the rows where Item_Type = Soft Drinks and Item_Weight < 10
- fetch the rows where Outlet_Location_Type is Tier 1 or Tier 2 and Outlet_Size is null

In [0]:
# Sample of the unfiltered data for reference.
display(df_ddl.limit(num=10))

In [0]:
# Scenario 1: rows whose fat content is exactly 'Regular' (where is an alias of filter).
df_regular = df_ddl.where(col('Item_Fat_Content') == 'Regular')
display(df_regular.limit(num=10))


In [0]:
# Scenario 2: soft drinks lighter than 10; each condition needs its own parentheses with &.
df_senario2 = df_ddl.where(
    (col('Item_Type') == 'Soft Drinks')
    & (col('Item_Weight') < 10)
)

display(df_senario2.limit(num=10))

In [0]:
# Scenario 3: Tier 1 / Tier 2 outlets whose size is missing.
df_senario3 = df_ddl.where(
    col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
    & col('Outlet_Size').isNull()
)
display(df_senario3.limit(num=10))

### withColumnRenamed

In [0]:
# Rename one column; every other column passes through unchanged.
df_renamecol = df_ddl.withColumnRenamed('Item_Weight', 'Item_Weight_new')
display(df_renamecol.limit(num=10))

### withColumn

In [0]:
# Add a constant column: every row gets the literal 'new' in 'flag'.
df_withcol = df_ddl.withColumn('flag', lit('new'))
display(df_withcol.limit(num=10))

In [0]:
# Derived column: Item_Weight times Item_MRP, referencing columns through the DataFrame.
df_withcol_mult = df_ddl.withColumn('multiply', df_ddl["Item_Weight"] * df_ddl["Item_MRP"])
display(df_withcol_mult.limit(num=10))

In [0]:
from pyspark.sql.functions import col, regexp_replace

# Abbreviate fat-content labels: 'Regular' -> 'Reg', then 'Low Fat' -> 'LF'.
df_replace = df_ddl \
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Regular", "Reg")) \
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Low Fat", "LF"))

display(df_replace.limit(num=10))

### type casting

In [0]:
#cast to string using a DataType object...
df_typecast = df_ddl.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))
display(df_typecast.limit(10))

#...or using a type-name string. Use the column's exact name so withColumn
#replaces it in place; 'Item_visibility' only matched via case-insensitive
#resolution and changed the column's name to that casing.
df_cast = df_ddl.withColumn('Item_Visibility', col('Item_Visibility').cast('string'))
display(df_cast.limit(10))

### sort / orderBy


In [0]:
# Scenario 1: heaviest items first (orderBy is an alias of sort).
df = df_ddl.orderBy(col("Item_Weight").desc())
display(df.limit(num=10))



In [0]:
# Scenario 2: least visible items first; orderBy sorts ascending by default.
df_sort = df_ddl.orderBy('Item_Visibility')
display(df_sort.limit(num=10))

In [0]:
# Scenario 3: sort on multiple columns -
# Item_Weight descending, ties broken by Item_Visibility ascending.
df_sort_multi = df_ddl.orderBy(col('Item_Weight').desc(), col('Item_Visibility').asc())

display(df_sort_multi.limit(num=10))

### limit

In [0]:
#limit(n) returns a new DataFrame with at most n rows.
#.display() returns None, so keep the DataFrame and display it separately.
df_limit = df_ddl.limit(10)
df_limit.display()

### drop

In [0]:
#senario1: drop a single column
#.display() returns None, so keep the DataFrame and display it separately.
df_drop = df_ddl.drop('Item_Visibility')
df_drop.limit(10).display()

In [0]:
#senario 2: drop multiple columns in one call
#.display() returns None, so keep the DataFrame and display it separately.
df_drop_multi = df_ddl.drop('Item_Visibility', 'Item_Type')
df_drop_multi.limit(10).display()


### drop duplicates

In [0]:
# Row count of the full dataset, for comparison with the de-duplicated result.
df_count = df_ddl.count()
display(df_count)

# Scenario 1: with no column list, rows are duplicates only if every column matches.
df_drop_dup = df_ddl.dropDuplicates()
display(df_drop_dup)

In [0]:
# Scenario 2: de-duplicate on Item_Type only, keeping one row per Item_Type value.
df_drop_subset = df_ddl.dropDuplicates(['Item_Type'])
display(df_drop_subset)

In [0]:
#senario 3
#distinct(): drop rows that are exact duplicates across ALL columns
#(same result as dropDuplicates() with no column list).
df_drop_distinct = df_ddl.distinct()
df_drop_distinct.display()

#distinct values of a single column: select it first, then distinct().
#.display() returns None, so keep the DataFrame and display it separately.
df_dist_col = df_ddl.select('Item_Type').distinct()
df_dist_col.display()

### union and unionByName

In [0]:
# Two small (id, name) datasets with the same schema, used by the union examples.
data1 = list(enumerate(['alice', 'bob', 'charlie', 'david', 'eve'], start=1))
data2 = list(enumerate(['frank', 'grace', 'hannah', 'ian', 'judy'], start=6))

schema1 = schema2 = 'id int, name string'

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

display(df1)
display(df2)


In [0]:
# union() stacks rows by column position (like SQL UNION ALL; duplicates are kept).
df_union = df1.union(other=df2)
display(df_union)

In [0]:
#unionByName: columns are matched by NAME, so it works even when the
#column order differs between the two frames (here name/id vs id/name)
data3 = [
    ('bob',6),
    ('charlie',7),
    ('david',8),
    ('eve',9)
]

schema3 = 'name string, id int'
df3 = spark.createDataFrame(data3,schema3)
df3.display()

#.display() returns None, so keep the DataFrame and display it separately.
df_uni = df1.unionByName(df3)
df_uni.display()



### String functions

In [0]:
# initcap: capitalise the first letter of each word in Item_Type.
df_init = df_ddl.select(initcap('Item_Type').alias('Item_Type'))
display(df_init.limit(num=5))

In [0]:
# Lower-case and upper-case versions of Item_Type.
df_lower = df_ddl.select(lower('Item_Type').alias('Item_Type_lower'))
display(df_lower.limit(num=5))

df_upper = df_ddl.select(upper('Item_Type').alias('Item_Type_upper'))
display(df_upper.limit(num=5))

### Date functions

In [0]:
# Stamp every row with today's date in a new 'current_date' column.
df_currentdate = df_ddl.withColumn('current_date', current_date())
display(df_currentdate.limit(10))


#### date_add()

In [0]:
# Seven days after current_date.
df_weekafter = df_currentdate.withColumn('week_after', date_add('current_date', 7))
display(df_weekafter.limit(10))

#### date_sub()

In [0]:
# Seven days before current_date, using date_sub.
df_datesub = df_weekafter.withColumn('date_sub', date_sub('current_date', 7))
display(df_datesub.limit(10))

In [0]:
# Same result as date_sub: date_add with a negative number of days.
df_weekbefore = df_datesub.withColumn('week_before', date_add('current_date', -7))
display(df_weekbefore.limit(10))

#### datediff()

In [0]:
#datediff(end, start) returns the number of days from start to end, so the later
#date goes first to get +7; the previous argument order produced -7.
df_datediff = df_weekbefore.withColumn('date_diff',datediff('week_after','current_date'))
df_datediff.limit(10).display()

### date_format

In [0]:
# Render week_before as a month-day-year string.
df_dateformat = df_weekbefore.withColumn('date_format', date_format('week_before', 'MM-dd-yyyy'))
display(df_dateformat.limit(10))

### handling nulls

In [0]:
# how='all': a row is removed only when every one of its columns is null.
df_org = df_dateformat

df_drop = df_org.na.drop(how='all')
display(df_drop)


In [0]:

# how='any': a row is removed if at least one of its columns is null.
df_drop_any = df_org.na.drop(how='any')
display(df_drop_any)



In [0]:
#dropna(subset=...): only nulls in the listed columns cause a row to be dropped.
#.display() returns None, so keep the DataFrame and display it separately.
df_drop_subset = df_dateformat.dropna(subset=['Outlet_Size'])
df_drop_subset.display()

#### fillna

In [0]:
#fillna: replace nulls with a fixed value. A string value only fills string
#columns; numeric columns keep their nulls.
#.display() returns None, so keep the DataFrame and display it separately.
df_fillna = df_dateformat.fillna("not available")
df_fillna.display()

In [0]:
#fillna on selected columns. A string fill value is silently ignored for
#non-string columns, so listing the double column Item_Weight had no effect;
#only Item_Type is filled. The fill text now matches the cell above.
#.display() returns None, so keep the DataFrame and display it separately.
df_filnasub = df_dateformat.fillna('not available', subset=['Item_Type'])
df_filnasub.display()

### split and indexing

In [0]:
# Split Outlet_Type on single spaces into an array column.
df_split = df_ddl.withColumn('Outlet_Type_splill', split(col('Outlet_Type'), ' '))
display(df_split.limit(10))


In [0]:
from pyspark.sql.functions import col

# getItem(1) picks the second element (arrays are 0-indexed); same as [1].
df_index = df_split.withColumn('Outlet_TypeIndex', col('Outlet_Type_splill').getItem(1))
display(df_index.limit(10))

### explode

In [0]:
#split Outlet_Type into an array for the explode examples. The column is named
#with its exact casing ('Outlet_Type'); 'outlet_Type' only resolved because
#Spark's default column resolution is case-insensitive.
df_exp = df_ddl.withColumn('combine_outlet',split('Outlet_Type',' '))
df_exp.limit(10).display()

In [0]:
# explode: one output row per element of the combine_outlet array.
df_explode = df_exp.withColumn('explode_outlet', explode(col('combine_outlet')))
display(df_explode)

### array_contains

In [0]:
# True when the split Outlet_Type array contains the word 'Supermarket'.
df_arrycon = df_explode.withColumn('Type_flag', array_contains(col('combine_outlet'), 'Supermarket'))
display(df_arrycon)

### groupBy

In [0]:
# Scenario 1: total Item_MRP and number of rows for each Item_Type.
df_sales_of_item = (
    df_ddl
    .groupBy("Item_Type")
    .agg(
        sum('Item_MRP').alias('sum_Item_MRP'),
        count("Item_Type").alias('count_Item_Type'),
    )
)
df_sales_of_item.display(truncate=False)


#### Find the average

In [0]:
# Average Item_MRP (rounded to 3 decimals) and row count per Item_Type.
df_avg = (
    df_ddl.groupBy("Item_Type")
    .agg(
        round(avg("Item_MRP"), 3).alias("avg_Item_MRP"),
        count("Item_Type").alias("count_Item_Type"),
    )
)

df_avg.display(truncate=True)

#### Group by two or more columns

In [0]:
# MRP total and row count for every (Item_Type, Outlet_Size) pair.
df_group = (
    df_ddl.groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum('Item_MRP').alias('Total_MRP'),
        count('Item_Type').alias('count_Item_Type'),
    )
)
df_group.display(truncate=False)

### Collect List

In [0]:
# (user, book) pairs: user1 and user3 own two books each, user2 owns one.
data = [
    ('user1', 'book1'), ('user1', 'book2'),
    ('user2', 'book3'),
    ('user3', 'book4'), ('user3', 'book5'),
]

schema = 'user string , books string'

df = spark.createDataFrame(data, schema)
display(df)

In [0]:
# Gather each user's books into a single array column.
df_colllist = (
    df.groupBy('user')
    .agg(collect_list(col('books')).alias('books'))
)
df_colllist.display(truncate=True)

In [0]:
# explode turns the books array back into one row per (user, book).
df_explo = df_colllist.withColumn('books', explode(col('books')))
df_explo.display(truncate=True)


### pivot

In [0]:
# Pivot: one row per Item_Type, one column per distinct Outlet_Size value,
# each cell holding the average Item_MRP.
df_pivot = (
    df_ddl.groupBy('Item_Type')
    .pivot('Outlet_Size')
    .agg(avg('Item_MRP'))
)

df_pivot.display()

### when / otherwise

Use `when(...).otherwise(...)` for conditional logic; it works like a SQL `CASE WHEN` statement.

In [0]:
# Rows whose Item_Type is exactly 'Meat' are tagged 'Non-Veg'; everything else 'Veg'.
df_when = df_ddl.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', lit('Non-Veg')).otherwise(lit('Veg')),
)
df_when.display(truncate=False)
                            

In [0]:
# Sub-classify veg items by price. The second branch uses `<= 100`: with the
# previous strict `< 100`, a veg item priced exactly 100 matched neither branch
# and was mislabelled 'Non-Veg'. The column is referenced with its exact name
# ('veg_flag') so this also works when case-sensitive resolution is enabled.
# NOTE(review): a veg row with a null Item_MRP still falls through to 'Non-Veg'.
df_condwhen = df_when.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') > 100), 'Veg_Expensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') <= 100), 'Veg_inexpensive')
    .otherwise('Non-Veg')
)

display(df_condwhen)

### Joins

- inner join
- left join
- right join
- full join
- anti join

In [0]:
# Employees: (emp_id, emp_name, edept_id). gary's 'do6' has no department and
# department 'do4' has no employees, so the joins below show non-matching rows.
schema_emp = 'emp_id int,emp_name string,edept_id string'
data_emp = [
    (1, "alice", 'do1'),
    (2, "bob", 'do2'),
    (3, "charlie", 'do3'),
    (4, "david", 'do3'),
    (5, "eve", 'do2'),
    (6, "fred", 'do5'),
    (7, "gary", 'do6'),
]

# Departments: (ddept_id, dept_name).
schema_dept = 'ddept_id string,dept_name string'
data_dept = [
    ('do1', 'HR'),
    ('do2', 'Marketing'),
    ('do3', 'accounts'),
    ('do4', 'IT'),
    ('do5', 'Finace'),
]

df_emp, df_dept = (
    spark.createDataFrame(data_emp, schema_emp),
    spark.createDataFrame(data_dept, schema_dept),
)

display(df_emp)
display(df_dept)


In [0]:
# Inner join: keep only employee/department pairs whose department ids match.
df_innerjoin = df_emp.join(
    df_dept,
    on=df_emp['edept_id'] == df_dept['ddept_id'],
    how='inner',
)
df_innerjoin.display()

In [0]:
#left outer join: keeps EVERY row of the left table (df_emp). Where an employee
#has no matching department, the right-hand (df_dept) columns are null.
#Departments with no employees do not appear.
df_leftjoin = df_emp.join(df_dept,df_emp['edept_id']==df_dept['ddept_id'],'leftouter')
df_leftjoin.display()


In [0]:
from pyspark.sql.functions import col, when, lit

# Right outer join: every department is kept; employees attach where edept_id matches.
df_rightjoin = df_emp.join(
    df_dept,
    on=df_emp['edept_id'] == df_dept['ddept_id'],
    how='rightouter',
)
display(df_rightjoin)

# 'yes' when the department row found an employee; unmatched rows have a null
# edept_id, the comparison is null, and they fall through to 'no'.
df_lit_join = df_rightjoin.withColumn(
    'new_col',
    when(col('edept_id') == col('ddept_id'), lit('yes')).otherwise(lit('no')),
)
display(df_lit_join)

In [0]:
# Left anti join: only employees whose edept_id has NO match in df_dept;
# the result carries df_emp's columns only.
df_anti = df_emp.join(
    df_dept,
    on=df_emp['edept_id'] == df_dept['ddept_id'],
    how='anti',
)
df_anti.display()

### Window functions

In [0]:
# row_number over the whole frame, ordered by Item_Identifier.
# NOTE: a window without partitionBy moves all rows into a single partition.
from pyspark.sql.window import Window

df_rownum = df_ddl.withColumn(
    'row_num',
    row_number().over(Window.orderBy(col('Item_Identifier').asc())),
)
df_rownum.display()

#### rank and dense_rank

In [0]:
# rank(): tied Item_Identifier values share a rank, and the next rank skips ahead.
df_rank = df_ddl.withColumn(
    'rank_column',
    rank().over(Window.orderBy(col('Item_Identifier').asc())),
)
df_rank.display()



In [0]:
# dense_rank(): ties share a rank, and the next rank follows without gaps.
df_denserank = df_ddl.withColumn(
    'dense_rank_col',
    dense_rank().over(Window.orderBy(col('Item_Identifier').asc())),
)
df_denserank.display()

#### Cumulative sum

In [0]:
# Running total of Item_MRP in Item_Identifier order (row frame: first row .. current row).
# NOTE(review): rows that tie on Item_Identifier have no defined order among
# themselves, so the running value inside a tie may differ between runs.
df_cumsum = df_ddl.withColumn(
    'cumsum',
    sum('Item_MRP').over(
        Window.orderBy(col('Item_Identifier'))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
)

df_cumsum.display()


In [0]:

# The frame spans every row, so each row gets the grand total of Item_MRP
# (the 'cumsum' column here is a total, not a running sum).
df_cumsum_total = df_ddl.withColumn(
    'cumsum',
    sum('Item_MRP').over(
        Window.orderBy(col('Item_Identifier'))
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ),
)

display(df_cumsum_total)

### User-defined functions (UDFs)

In [0]:
#step 1
#create a udf - a plain python function that squares its input.
#Spark hands SQL NULL to a Python UDF as None; returning None keeps the null
#instead of raising TypeError (None * None) and failing the task.
def my_fun(x):
    """Return x squared, or None when x is None (SQL NULL)."""
    if x is None:
        return None
    return x * x

#step 2
#wrap the python function as a Spark UDF. udf() defaults to returnType=StringType,
#which would turn the squared Item_MRP (a double) into a string column, so the
#return type is declared as DoubleType explicitly.
my_udf = udf(my_fun, DoubleType())

df_itemsqu = df_ddl.withColumn('new_col', my_udf(col('Item_MRP')))
df_itemsqu.display()

### Data writing

In [0]:
#csv
#save() returns None, so its result is not assigned to a name.
#header=True writes the column names, so the output can be read back with
#option('header', True) the same way the source CSV is read.
df_ddl.write.format('csv').mode('overwrite').option('header', True)\
    .save('/Volumes/workspace/dataset_volumes/output_writes/csv_data.csv')

#### Writing modes


In [0]:
#append mode: add new part files alongside whatever already exists at the path.
#save() returns None, so its result is not assigned; header=True keeps the
#files readable with option('header', True).
df_ddl.write.mode('append').format("csv").option('header', True)\
    .save("/Volumes/workspace/dataset_volumes/output_writes/csv_data")



In [0]:
#overwrite mode: replace whatever is already at the path.
#save() returns None, so its result is not assigned; header=True keeps the
#files readable with option('header', True).
df_ddl.write.mode('overwrite').format("csv")\
    .option('header', True)\
    .option('path','/Volumes/workspace/dataset_volumes/output_writes/overwrite')\
    .save()

In [0]:
#error mode (alias of 'errorifexists'): the write fails if the path already exists.
#The first run succeeds; every re-run raises PATH_ALREADY_EXISTS, so the error is
#caught and shown instead of stopping Restart & Run All.
from pyspark.errors import AnalysisException

try:
    df_ddl.write.mode('error').format("csv")\
        .option('header', True)\
        .option('path','/Volumes/workspace/dataset_volumes/output_writes/error1')\
        .save()
except AnalysisException as exc:
    # expected on re-runs, e.g. "[PATH_ALREADY_EXISTS] Path .../output_writes/error1
    # already exists. Set mode as overwrite to overwrite the existing path."
    print(exc)

In [0]:
#ignore mode: silently skip the write when data already exists at the path.
#save() returns None, so its result is not assigned.
df_ddl.write.mode('ignore').format("csv")\
    .option('header', True)\
    .option('path','/Volumes/workspace/dataset_volumes/output_writes/csv_data')\
    .save()

### File formats

In [0]:
#parquet format (columnar, keeps the schema with the data).
#save() returns None, so its result is not assigned.
df_ddl.write.mode('overwrite').format("parquet")\
    .option('path','/Volumes/workspace/dataset_volumes/output_writes/parquet')\
    .save()




In [0]:
#delta format written to a path.
#save() returns None, so its result is not assigned.
df_ddl.write.mode('overwrite').format("delta")\
    .option('path','/Volumes/workspace/dataset_volumes/output_writes/delta')\
    .save()

In [0]:
#table format: save as a Delta table named workspace.dataset_volumes.Mytable.
#saveAsTable() returns None, so its result is not assigned.
df_ddl.write.mode('overwrite').format("delta")\
    .saveAsTable('workspace.dataset_volumes.Mytable')

### PySpark SQL

In [0]:
#register df_ddl as a session temp view so it can be queried with SQL.
#createOrReplaceTempView replaces any existing view of the same name, so the
#cell can be re-run without dropping the view first.
df_ddl.createOrReplaceTempView('mytemp_view')


In [0]:
%sql
-- rows from the temp view whose outlet sales exceed 2000
SELECT *
FROM mytemp_view
WHERE Item_outlet_Sales > 2000


In [0]:
# Query the temp view with SQL, then persist the resulting DataFrame as parquet.
df_sql = spark.sql("SELECT * FROM mytemp_view where Item_Type = 'Dairy' and item_fat_content = 'Low Fat'")

(
    df_sql.write
    .mode('overwrite')
    .format("parquet")
    .option('path', '/Volumes/workspace/dataset_volumes/output_writes/sql')
    .save()
)


