In [0]:
from pyspark.sql.functions import *

### Data Reading

In [0]:
df=spark.read.format('csv').options(header=True, inferSchema=True).load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
df_json=spark.read.format('json')\
    .options(header=True, inferSchema=True, multiline=False)\
    .load('/FileStore/tables/drivers.json')

### DDL Schema

In [0]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



In [0]:
my_ddl_schema = '''
Item_Identifier STRING,
Item_Weight STRING,
Item_Fat_Content STRING, 
Item_Visibility DOUBLE,
Item_Type STRING,
Item_MRP DOUBLE,
Outlet_Identifier STRING,
Outlet_Establishment_Year INT,
Outlet_Size STRING,
Outlet_Location_Type STRING, 
Outlet_Type STRING,
Item_Outlet_Sales DOUBLE
'''

In [0]:
df=spark.read.format('csv').option('header', True).schema(my_ddl_schema).load('/FileStore/tables/BigMart_Sales.csv')

### StructType Schema

In [0]:
#to import StructType and StructField, rest all
from pyspark.sql.types import *


In [0]:
my_struct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', StringType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', StringType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', StringType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', StringType(), True)
])

In [0]:
df=spark.read.format('csv').option('header', True).schema(my_struct_schema).load('/FileStore/tables/BigMart_Sales.csv')

### PySpark Transformations

In [0]:
df=spark.read.format('csv').options(header=True, inferSchema=True).load('/FileStore/tables/BigMart_Sales.csv')

### Select

In [0]:
df_sel=df.select('item_identifier', 'item_weight', 'item_fat_content', '*')

In [0]:
df_sel=df.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content'))

### Alias

In [0]:
df_alias=df.select('Item_Weight').alias('Item_Wt')

In [0]:
df_alias=df.select(col('Item_Identifier').alias('Item_Id'), col('Item_Weight').alias('Item_Wt'))

### Filter

In [0]:
df_filter1=df.filter("item_fat_content='Regular'")
#df_filter1=df.filter(col('item_fat_content')=='Regular').display()

df_filter2=df.filter("item_type='Soft Drinks' and item_weight<10")

df_filter3=df.filter((col('outlet_size').isNull()) & (col('outlet_location_type').isin('Tier 1', 'Tier 2')))



### Rounding

In [0]:
df_round=df.select(round('Item_MRP', 2).alias('MRP_Round'))

### withColumnRenamed

In [0]:
df_wcr=df.withColumnRenamed('Item_Weight', 'Item_Wt')

### withColumn

In [0]:
df_wc1=df.withColumn('Flag_New', lit('New'))

df_wc2=df.withColumn('Multiply', col('Item_Weight') * col('Item_MRP'))



In [0]:
df_wc3=df.withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg'))\
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'LF'))

| Function                                  | Use                                                     |
| ----------------------------------------- | ------------------------------------------------------- |
| `withColumn(colName, expr)`               | Add/replace **one** column                              |
| `withColumns({col1: expr1, col2: expr2})` | Add/replace **multiple** columns at once (PySpark 3.4+) |


### Type Casting

In [0]:
df_cast=df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))

### Sort

In [0]:
df_d=df.sort(col('Item_Weight').desc())
df_a=df.sort(col('Item_Visibility').asc())

In [0]:
df_dd=df.sort(['Item_Weight', 'Item_Visibility'], ascending=[0, 0])
df_da=df.sort(['Item_Weight', 'Item_Visibility'], ascending=[0, 1])

### Limit

In [0]:
df_limit=df.filter(col('Item_Weight').isNotNull()).sort(col('Item_Weight').asc()).limit(10)

### Drop

In [0]:
df_single_drop=df.drop(col('Item_Visibility'))
df_multi_drop=df.drop('Item_Weight', 'Item_Type')

### dropDuplicates/drop_duplicates/deDup

In [0]:
#Both are Same! Use any: when a complete row has the duplicate values
df_complete_row=df.drop_duplicates()
df_complete_row=df.dropDuplicates()

In [0]:
# if need to drop duplicates for specific a column or set of columns records
df_col_drop=df.dropDuplicates(subset=['Item_Type'])

In [0]:
df_dis=df.distinct()

### Union

In [0]:
data1=[(1, 'Arijeet'),
       (2, 'Himanshu')]
schema1='Id int, Name string'
df1=spark.createDataFrame(data1, schema1)

data2=[(3, 'Vivek'),
       (4, 'Chandan')]
schema2='Id int, Name string'
df2=spark.createDataFrame(data2, schema2)

In [0]:
df_union=df1.union(df2).display()

Id,Name
1,Arijeet
2,Himanshu
3,Vivek
4,Chandan


### unionByName

In [0]:
data1=[('Arijeet', 1),
       ('Himanshu', 2)]
schema1='Name string, Id int'
df1=spark.createDataFrame(data1, schema1) 

In [0]:
df3=df1.union(df2).display()

Name,Id
Arijeet,1
Himanshu,2
3,Vivek
4,Chandan


In [0]:
df4=df1.unionByName(df2).display()

Name,Id
Arijeet,1
Himanshu,2
Vivek,3
Chandan,4


## String Functions

### initcap

In [0]:
#converts the column data in Title format!
df_initcap=df.select(initcap('Item_Fat_Content'))

### lower & upper

In [0]:
df_lower=df.select(lower('Item_Type').alias('Lower_Item_Type'))

df_upper=df.select(upper('Item_Type').alias('Upper_Item_Type'))

## Date Functions

### current_date()-YYYY-MM-DD

In [0]:
df_curr_date=df.withColumn('curr_date', current_date())

### date_add

In [0]:
#It adds the day
df_day_add=df_curr_date.withColumn('Week_After', date_add('Curr_Date', 7))

### date_sub

In [0]:
#It subtracts the day
df_day_sub=df_curr_date.withColumn('Week_After', date_sub('curr_date', 7))

### DATEDIFF

In [0]:
#if need to find the gap/interval between two dates
df_datediff=df_day_sub.withColumn('DateDiff', datediff('curr_date', 'week_after'))

### date_format-'yyyy-MM-dd'/'dd-MM-yyyy'

In [0]:
df_date_format=df_datediff.withColumn('Formating_Date', date_format('Week_After', 'dd-MM-yyyy'))

### Handling Nulls

In [0]:
#drop only those records in which all columns contain null values
df_drop_null1=df.dropna('all')
#drop all those records in which any columns contain null values
df_drop_null2=df.dropna('any') #df.dropna(), if we don't pass it will treat as 'any' byDefault
#drop only those records in which column 'Outlet_Size' contain null values and rest columns have null just ignore
df_drop_null3=df.dropna(subset=['Outlet_Size'])

In [0]:
#applies to String datatype for other columns need to firstly change datatype to perform
df_fill_null1=df.fillna('N/A')

df_fill_null2=df.fillna('N/A', subset=['Outlet_Size', 'Item_Weight'])

### Split and Indexing

In [0]:
#returns as a list
df_split=df.withColumn('Outlet_Type', split('Outlet_Type', ' '))

#returns as a list
df_split=df.withColumn('Outlet_Type', split('Outlet_Type', ' ')[1])

### Explode

In [0]:
'''
Tier 1 ['SuperMarket', 'Type1']

Tier 1 SuperMarket
Tier 1 Type1
'''
#first break into the list and then separate the values of the list using explode
df_list=df.withColumn('Outlet_Type', split('Outlet_Type', ' '))
df_explode=df_list.withColumn('Outlet_Type', explode('Outlet_Type'))

### Array Contains

In [0]:
#it will check for that a value contains in a list or not and return true/false: need a list for this function
df_arr_contains=df_list.withColumn('Type1_Flag', array_contains('Outlet_Type', 'Type1'))

### Group By

In [0]:
df_g_sum=df.groupBy('Item_Type').agg(sum('Item_MRP'))

df_g_avg=df.groupBy('Item_Type').agg(avg('Item_MRP'))

df_g_multiple_col=df.groupBy('Item_Type', 'Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP'))

df_g_multiple_agg_col=df.groupBy('Item_Type', 'Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP'), avg('Item_MRP').alias('Avg_MRP'))

### Alternative of group concat: Collect_List

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string, book string'

df_g_c=spark.createDataFrame(data, schema)

df_grp_c=df_g_c.groupBy('User').agg(collect_list('Book')).display()

User,collect_list(Book)
user1,"List(book1, book2)"
user2,"List(book2, book4)"
user3,List(book1)


### Pivot

In [0]:
df_pivot=df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).limit(3).display()

Item_Type,null,High,Medium,Small
Starchy Foods,140.48000465116277,158.15707368421053,148.4195041666666,150.2701736842105
Breads,139.04861666666667,133.75896,140.8610385542169,145.5236507042254
Baking Goods,126.66939891891889,129.20204383561642,126.1785684729064,125.21336363636368


### When-Otherwise

In [0]:
df_flag1 = df.withColumn(
    'veg_flag', when(col('item_type') == 'Meat', 'Non-Veg').otherwise('Veg')
)

df_flag2=df_flag1.withColumn(
    'food_expensivness',
    when(((col('veg_flag') == 'Veg') & (col('item_mrp') < 100)), 'Veg-Inexpensive')
    .when(((col('veg_flag') == 'Veg') & (col('item_mrp') > 100)), 'Veg-Expensive')
    .otherwise('Non-Veg'),
)

### Joins

In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)

In [0]:
df_inner=df1.join(df2, on='dept_id', how='inner')
#df1.join(df2, df1['dept_id']==df2['dept_id'],'inner').display()

In [0]:
df_left=df1.join(df2, on='dept_id', how='left')
#df1.join(df2,df1['dept_id']==df2['dept_id'],'left').display()

In [0]:
df_right=df1.join(df2, on='dept_id', how='right')
#df1.join(df2,df1['dept_id']==df2['dept_id'],'right').display()

In [0]:
#whenever we want to fetch the data from one df which is not available in second df
df_anti=df1.join(df2, on='dept_id', how='anti')
#df1.join(df2,df1['dept_id']==df2['dept_id'],'anti').display()

In [0]:
df_outer=df1.join(df2, on='dept_id', how='full')

### Window Functions

In [0]:
from pyspark.sql.window import Window

In [0]:
df_rowNum=df.withColumn('Row_Num', row_number().over(Window.orderBy('Item_Identifier')))

df_rank=df_rowNum.withColumn('Rank', rank().over(Window.orderBy(col('Item_Identifier').desc())))\
    .withColumn('Dense_Rank', dense_rank().over(Window.orderBy(col('Item_Identifier').desc())))

### Cumulative Sum

In [0]:
#sum the previous value and store it in the current row: similar to fibonacci series
df_cum_sum=df.withColumn('cumsum', sum('item_mrp').over(Window.orderBy('item_type').rowsBetween(Window.unboundedPreceding, Window.currentRow)))

df_cum_total_sum=df.withColumn('totalsum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))

### UDF (User Defined Function)

In [0]:
def my_fun(x):
  return x*x

In [0]:
my_udf=udf(my_fun)

df_sqrt=df.withColumn('Sqrt_Item_Price', my_udf('Item_MRP'))

### Data Writing

In [0]:
df.write.format('csv').save('/FileStore/tables/CSV/data.csv')

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2064302435562264>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43mcsv[39;49m[38;5;124;43m'[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msave[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43m/FileStore/tables/CSV/data.csv[39;49m[38;5;124;43m'[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;

In [0]:
#if a same file already exists still it will add another file
df.write.format('csv').mode('append').save('/FileStore/tables/CSV/data.csv')



In [0]:
#if a same file already exists it will delete that file and add this new file version
df.write.format('csv').mode('overwrite').save('/FileStore/tables/CSV/data.csv')



In [0]:
'''
#if a same file already exists it will throws an error if we try to add the same file
df.write.format('csv').mode('error').save('/FileStore/tables/CSV/data.csv')
'''



In [0]:
#if a same file already exists it will just ignore the error and add a new file
df.write.format('csv').mode('ignore').save('/FileStore/tables/CSV/data.csv')



In [0]:
df.write.format('parquet').mode('overwrite').save('/FileStore/tables/CSV/data.csv')



In [0]:
df.write.format('parquet').mode('overwrite').saveAsTable('my_table')



delta lake / delta file format

In [0]:
'''
it is built on top of parquet format, the real difference is in Parquet file format header/metadata will store at footer of the file
whereas in delta file format metadata will not be stored in the files! it will use a transaction/delta log which will actually hold all the information and metadata
'''



### Spark SQL

In [0]:
df.createOrReplaceTempView('my_view')



In [0]:
%sql
select * from my_view where Item_Fat_Content='LF' limit 3



In [0]:
df_spark_sql=spark.sql("select * from my_view where Item_Fat_Content='LF'")



In [0]:
df_spark_sql.display()

