In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Check the catalog data: `loans` table

In [0]:
%sql
-- Preview all rows of the loans table (catalog `customer`, schema `customer_db`).
SELECT * FROM customer.customer_db.loans

### Convert the SQL query to Spark (DataFrame API)

In [0]:
# Run the same query from Python so the result is a DataFrame we can transform.
data = spark.sql("select * from customer.customer_db.loans")
data.display()

### Schema check and total row count

In [0]:
# Print column names/types, then leave the row count as the cell's displayed output.
data.printSchema()

data.count()


### Count null values in every column

In [0]:
# Per-column NULL count: `when` yields a non-null value only for NULL rows and
# `count` skips NULLs, so each output column holds that column's number of NULLs.
# Note: NaN in floating-point columns is not NULL and is not counted here.
null_check = data.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
)

null_check.display()

### Replace nulls in selected columns with default values

In [0]:
# Replace NULLs with 0 in the "Income" and "Expenditure" columns only;
# every other column is passed through unchanged.
fill_values = {"Income": 0, "Expenditure": 0}
null_fill = data.fillna(fill_values)

null_fill.display()

### Create a DataFrame from an explicit schema and sample data

In [0]:
# DDL-style schema string: comma-separated "<column> <type>" pairs.
schema = '''

ID int,
Name String,
Age int,
Gender String,
City String 

'''


# Use a distinct name for the sample rows: reusing `data` here would replace the
# loans DataFrame loaded earlier with a plain list, so re-running the null-check
# and fillna cells afterwards would fail (a list has no `.columns`/`.fillna`).
people_data = [
    (1, "suresh", 25, "Male", "Bangalore"),
    (2, "kumar", 30, "Male", "Mumbai"),
    (3, "murugan", 35, "Male", "Chennai"),
    (4, "Priya", 28, "Female", "Delhi"),
]

df = spark.createDataFrame(people_data, schema)
display(df)


In [0]:
# Confirm the declared DDL types were applied, then output the row count.
df.printSchema()

df.count()

### Check the catalog data: `big_mart_sales` table

In [0]:
%sql
-- Preview all rows of the big_mart_sales table (catalog `customer`, schema `customer_db`).
SELECT * FROM customer.customer_db.big_mart_sales

### Convert the SQL query to Spark (DataFrame API)

In [0]:
# Load the Big Mart sales table as a DataFrame via Spark SQL.
mart_data = spark.sql("select * from customer.customer_db.big_mart_sales")
mart_data.display()


### Schema and count

In [0]:
# Show the inferred schema, then output the total number of rows.
mart_data.printSchema()

mart_data.count()

In [0]:
# Count NULLs in every column of the sales data (same technique as for the loans table):
# `when` is non-null only on NULL rows, and `count` ignores NULLs.
check_null = mart_data.select(
    [count(when(col(name).isNull(), name)).alias(name) for name in mart_data.columns]
)

check_null.display()

In [0]:
# Fill missing values per column: Item_Weight -> 0, Outlet_Size -> "No size".
# NOTE(review): a weight of 0 is a sentinel, not a real measurement; it will pull
# down any average/sum of Item_Weight. Consider a mean/median fill if that matters.
# `fill_null` is the cleaned DataFrame used by all later transformation cells.
fill_null = mart_data.fillna(
    {
        "Item_Weight": 0,
        "Outlet_Size": "No size",
    }
)

fill_null.display()

### TRANSFORMATIONS


### SELECT

In [0]:
# Keep only three columns; plain column-name strings are equivalent to col(...).
df = fill_null.select("Item_Identifier", "Item_Weight", "Item_Fat_Content")

df.display()

### ALIAS

In [0]:
# Select two columns and give each a shorter output name.
df = fill_null.select(
    fill_null["Item_Identifier"].alias("Items"),
    fill_null["Item_Weight"].alias("Weight"),
)
df.display()

### FILTER


In [0]:
# Keep rows whose fat-content label is exactly "Low Fat".
# The column is spelled with its real casing (Item_Fat_Content, as used elsewhere in
# this notebook). A miscased name only resolves while spark.sql.caseSensitive is false.
df = fill_null.filter(col("Item_Fat_Content") == "Low Fat")
display(df)


In [0]:
# Two conditions that must both hold: chaining filters is equivalent to combining
# them with `&` (Spark merges consecutive filters into one predicate).
df = (
    fill_null
    .filter(col("Item_Type") == "Dairy")
    .filter(col("Outlet_Size") == "Small")
)

df.display()


In [0]:
# Rows whose Item_Identifier is NULL (`where` is an alias of `filter`).
df = fill_null.where(col("Item_Identifier").isNull())

df.display()

In [0]:
# Membership test: keep rows whose Outlet_Size is one of the listed values.
wanted_sizes = ["Small", "Medium"]
df = fill_null.where(col("Outlet_Size").isin(wanted_sizes))

df.display()

### withColumnRenamed


In [0]:
# Rename two columns. Each withColumnRenamed call returns a new DataFrame,
# so renames are chained rather than assigned one at a time.
df = (
    fill_null
    .withColumnRenamed("Item_Weight", "Weight")
    .withColumnRenamed("Item_Fat_Content", "Fat")
)

display(df)


### withColumn


In [0]:
# Append a constant column: every row gets the literal string "new_values".
constant_value = lit("new_values")
df = fill_null.withColumn("new_column", constant_value)

df.display()


In [0]:
# Abbreviate "Low Fat" to "LF" in place (regexp_replace's pattern is a regex;
# "Low Fat" contains no special characters, so it matches literally).
# The column name uses its real casing. With a miscased name, withColumn would
# replace the column but output it under the miscased name, and it fails
# outright when spark.sql.caseSensitive is true.
df = fill_null.withColumn(
    'Item_Fat_Content',
    regexp_replace('Item_Fat_Content', 'Low Fat', 'LF'),
)

display(df)

### Type Casting


In [0]:
# Convert Item_Visibility to a string column (`astype` is an alias of `cast`).
df = fill_null.withColumn(
    'Item_Visibility',
    col('Item_Visibility').astype('string'),
)

df.display()


### Sort

In [0]:
# Heaviest items first (`orderBy` is an alias of `sort`).
df = fill_null.orderBy(fill_null["Item_Weight"].desc())

df.display()

In [0]:
# Cheapest items (lowest MRP) first.
df = fill_null.orderBy(fill_null["Item_MRP"].asc())

df.display()



In [0]:
# Two-key sort: Item_Weight ascending, then Item_MRP descending within equal weights.
df = fill_null.orderBy(col('Item_Weight').asc(), col('Item_MRP').desc())

df.display()

### Limit

In [0]:
# Take at most 10 rows (which 10 is not guaranteed without a sort).
df = fill_null.limit(10)

df.display()

### DROP

In [0]:
# drop() accepts one or more column names; here two columns are removed at once.
columns_to_drop = ["Item_Identifier", "Item_Weight"]
df = fill_null.drop(*columns_to_drop)

df.display()

In [0]:
# Remove rows that are identical across every column (`drop_duplicates` is an alias).
df = fill_null.drop_duplicates()

df.display()

In [0]:
# Keep one row per distinct Item_Weight; which of the duplicate rows survives is arbitrary.
df = fill_null.drop_duplicates(subset=['Item_Weight'])

df.display()

In [0]:
# distinct() is equivalent to dropDuplicates() with no subset: fully identical rows collapse.
df = fill_null.distinct()
df.display()


### Union

In [0]:
# Two DataFrames with identical column order. df2 repeats df1's rows and adds one
# more (a second 'Robert' with Id 4), so the union below shows duplicates.
schema1 = 'Id int, Name string'
data1 = [
    (1, 'James'),
    (2, 'Michael'),
    (3, 'Robert'),
]
df1 = spark.createDataFrame(data1, schema1)
df1.display()

schema2 = 'Id int, Name string'
data2 = [
    (1, 'James'),
    (2, 'Michael'),
    (3, 'Robert'),
    (4, 'Robert'),
]
df2 = spark.createDataFrame(data2, schema2)
df2.display()



In [0]:
# union matches columns by position and keeps duplicates (UNION ALL semantics).
display(df1.union(df2))



In [0]:
# Same people in both frames, but df2 declares its columns in the opposite order
# (Name first). A positional union would misalign them; unionByName will not.
schema1 = 'Id int, Name string'
data1 = [(1, 'James'), (2, 'Michael'), (3, 'Robert')]
df1 = spark.createDataFrame(data1, schema1)
df1.display()

schema2 = 'Name string,Id int'
data2 = [('James', 1), ('Michael', 2), ('Robert', 3)]
df2 = spark.createDataFrame(data2, schema2)
df2.display()



In [0]:
# unionByName aligns columns by name, so df2's (Name, Id) order is reconciled with df1's.
display(df1.unionByName(df2))

### String Functions


### UPPERCASE

In [0]:
# Upper-case every Item_Type value into a new single-column result.
df = fill_null.select(
    upper(col('Item_Type')).alias('Item_Type_upper')
)

df.display()

### lowercase

In [0]:
# Lower-case every Item_Fat_Content value into a new single-column result.
df = fill_null.select(
    lower(col('Item_Fat_Content')).alias('Item_Fat_Content_lower')
)

df.display()

### concat

In [0]:
# Join two string columns with a '-' separator.
# Note: concat returns NULL if any input is NULL (concat_ws would skip NULLs instead).
combined = concat(col('Item_Fat_Content'), lit('-'), col('Item_Type'))
df = fill_null.select(combined.alias('Item_Fat_Content_Item_Type'))

df.display()

### substring

In [0]:
# Show each Item_Fat_Content value next to its first 3 characters.
# substring(str, pos, len) is 1-based, so (1, 3) takes characters 1..3.
# The alias is attached to the substring column expression. Calling `.alias` on
# the DataFrame only names the DataFrame and leaves the column auto-named
# "substring(Item_Fat_Content, 1, 3)".
# (No extra import needed: `substring` comes from pyspark.sql.functions; the unused
# `substr` import was dropped because it does not exist before PySpark 3.5.)
df1 = fill_null.select(
    col('Item_Fat_Content'),
    substring(col('Item_Fat_Content'), 1, 3).alias('Item'),
)
display(df1)

### Date Functions


### Current_Date


In [0]:
# Stamp every row with today's date (evaluated once per query) in a new column.
df1 = fill_null.withColumn(
    'curr_dates',
    current_date(),
)

df1.display()

### Date_Add()


In [0]:
# Check that the new curr_dates column has a date type.
df1.printSchema()

In [0]:
# Date 7 days after curr_dates.
df2 = df1.withColumn(
    'one_week_after',
    date_add(col('curr_dates'), 7),
)

df2.display()

### Date_Sub

In [0]:
# Date 7 days before curr_dates.
df3 = df2.withColumn(
    'one_week_before',
    date_sub(col('curr_dates'), 7),
)

df3.display()

### DateDiff


In [0]:
# datediff(end, start) counts days from start to end; here it is always 7.
df4 = df3.withColumn(
    'date_difference',
    datediff(col('one_week_after'), col('curr_dates')),
)

df4.display()

### Date_Format()


In [0]:
# Reformat one_week_after as day-month-year text; the column becomes a string, not a date.
df5 = df4.withColumn(
    'one_week_after',
    date_format(col('one_week_after'), 'dd-MM-yyyy'),
)

df5.display()

### SPLIT

In [0]:
# Replace Outlet_Type with an array of its space-separated words.
# (split's second argument is a regex; ' ' matches a single literal space.)
df1 = fill_null.withColumn('Outlet_Type', split(col('Outlet_Type'), ' '))

df1.display()


### Indexing

In [0]:
# Split, then keep only the second word (array indexes start at 0).
words = split(col('Outlet_Type'), ' ')
df = fill_null.withColumn('Outlet_Type', words.getItem(1))

df.display()

### Explode

In [0]:
# explode emits one row per array element, so the row count grows.
# Rows whose array is NULL or empty are dropped (explode_outer would keep them).
# Input df1 is the split-array DataFrame from the SPLIT section.
df2 = df1.withColumn('Outlet_Type', explode(col('Outlet_Type')))

df2.display()

df2.count()

In [0]:
# Boolean flag: true when the exploded word contains the substring 'Type1'.
is_type1 = col('Outlet_Type').contains('Type1')
df3 = df2.withColumn('Type1_flag', is_type1)
df3.display()

### GroupBy


In [0]:
# Re-display the cleaned sales data before the grouping examples.
fill_null.display()

In [0]:
# Average MRP per fat-content label.
# The group key uses the column's real casing (Item_Fat_Content). A miscased name
# only resolves while spark.sql.caseSensitive is false.
df = fill_null.groupBy('Item_Fat_Content').agg(avg('Item_MRP').alias('avg_MRP'))

display(df)


In [0]:
# Total MRP per Item_Type. `sum` here must be the Spark aggregate from the
# pyspark.sql.functions star import (Python's builtin sum cannot take a column name).
df = (
    fill_null
    .groupBy('Item_Type')
    .agg(sum('Item_MRP').alias('sum_MRP'))
)

df.display()

In [0]:
# Total MRP for every (fat content, item type) combination.
df = (
    fill_null
    .groupBy('Item_Fat_Content', 'Item_Type')
    .agg(sum('Item_MRP').alias('sum_MRP'))
)

df.display()

In [0]:
# Several aggregates in one pass over each (fat content, item type) group.
# Aliases match the sum_MRP/avg_MRP names used by the other groupBy cells; without
# them the output columns are auto-named "sum(Item_MRP)" and "avg(Item_MRP)".
df = fill_null.groupBy('Item_Fat_Content', 'Item_Type').agg(
    sum('Item_MRP').alias('sum_MRP'),
    avg('Item_MRP').alias('avg_MRP'),
)

display(df)

### Collect_List

In [0]:
# Small user -> book dataset; user1 and user2 appear on several rows.
schema_new = 'Users string,Books string'

data_new = [
    ("user1", "book1"),
    ("user2", "book5"),
    ("user1", "book3"),
    ("user3", "book4"),
    ("user2", "book1"),
    ("user2", "book2"),
]

df_new = spark.createDataFrame(data_new, schema_new)

df_new.display()

In [0]:
# Gather each user's books into an array (duplicates kept; element order not guaranteed).
df = (
    df_new
    .groupBy('Users')
    .agg(collect_list('Books').alias('Books'))
)

df.display()


### When-Otherwise


In [0]:
# Re-display the cleaned sales data before the when/otherwise examples.
fill_null.display()


In [0]:
# Label rows "Non_veg" when Item_Type is exactly "Meat", otherwise "Veg".
# NOTE(review): every other category (and NULL) becomes "Veg". If the data has other
# non-vegetarian categories, they should be added, e.g. with isin(...). Confirm.
is_meat = col('Item_Type') == "Meat"
df = fill_null.withColumn('Item_Types', when(is_meat, "Non_veg").otherwise("Veg"))

df.display()


In [0]:
# "Big" for High outlets, "Normal" for everything else (including the "No size" fill value).
is_high = col('Outlet_Size') == "High"
df = fill_null.withColumn('Outlet_Sizes', when(is_high, "Big").otherwise("Normal"))

df.display()

### JOINS


In [0]:
# Employee rows: id, name and the department code used as the join key.
sch1 = 'id int,name string,dept_id string'

dat1 = [
    (1, "suresh", "d01"),
    (2, "rajesh", "d02"),
    (3, "kumar", "d03"),
    (4, "murugan", "d03"),
    (5, "hari", "d04"),
    (6, "ram", "d05"),
    (7, "ganesh", "d06"),
]

df1 = spark.createDataFrame(dat1, sch1)

df1.display()


    

In [0]:

# Department lookup: code -> department name. Includes "d07", which no employee row uses.
sch2 = 'dept_id string,dept string'

dat2 = [
    ("d01", "HR"),
    ("d02", "IT"),
    ("d03", "FINANCE"),
    ("d04", "MARKETING"),
    ("d05", "SALES"),
    ("d07", "ADMIN"),
]

df2 = spark.createDataFrame(dat2, sch2)

df2.display()

In [0]:
# Inner join: only employees whose dept_id exists in df2 (both dept_id columns are kept).
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='inner'))

In [0]:
# Left join: every employee; df2 columns are NULL where no department matches.
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='left'))

In [0]:
# Right join: every department; df1 columns are NULL where no employee matches.
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='right'))


In [0]:
# Anti join: employees with no matching department; only df1's columns are returned.
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='anti'))


### WINDOW FUNCTIONS


### ROW_NUMBER()


In [0]:
# Re-display the cleaned sales data before the window-function examples.
fill_null.display()

In [0]:
from pyspark.sql.window import Window


In [0]:
# Number rows 1..N in Item_Identifier order across the whole DataFrame.
# No partitionBy is given, so Spark moves all rows into a single partition for this window.
# The column is named with an underscore: names containing spaces are awkward to
# reference and are rejected by Delta table writes unless column mapping is enabled.
# NOTE(review): if Item_Identifier repeats, numbering among equal values is not
# deterministic. Add a tie-breaker column to orderBy if stable numbers are needed.
df = fill_null.withColumn('row_no', row_number().over(Window.orderBy('Item_Identifier')))

display(df)

### RANK

In [0]:
# rank(): equal Item_Identifier values share a rank, and the next rank skips ahead (gaps).
by_id_desc = Window.orderBy(fill_null['Item_Identifier'].desc())
df = fill_null.withColumn('ranks', rank().over(by_id_desc))

df.display()

### DENSE RANK

In [0]:
# dense_rank(): like rank() but without gaps after ties.
by_id_desc = Window.orderBy(fill_null['Item_Identifier'].desc())
df = fill_null.withColumn('dense', dense_rank().over(by_id_desc))

df.display()


### Cumulative Sum


In [0]:
# With orderBy and no explicit frame, the frame defaults to
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. All rows with the same
# Item_Type are peers, so they receive the same running total.
df = fill_null.withColumn(
    'sums',
    sum('Item_MRP').over(Window.orderBy('Item_Type')),
)

df.display()

In [0]:
# ROWS frame: a true row-by-row cumulative sum. Each row adds only itself to the total
# so far. (Row order within the same Item_Type is arbitrary.)
rows_to_here = (
    Window.orderBy('Item_Type')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df = fill_null.withColumn('cumsum', sum('Item_MRP').over(rows_to_here))

df.display()


In [0]:
# Frame spans the whole ordered window, so every row receives the grand total of Item_MRP.
# Builds on `df` from the previous cell so cumsum and totalsum appear side by side.
display(df.withColumn('totalsum', sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))))
