#CSV Data reading

In [0]:
dbutils.fs.ls('/Volumes/workspace/default/datareading/')

In [0]:
df = spark.read.format("csv").option("inferSchema",True).option("header",True).load('/Volumes/workspace/default/datareading/BigMart Sales.csv')

In [0]:
df.display()

#JSON Data Reading

In [0]:
df1 = spark.read.format("json").option("inferSchema",True)\
                                .option("header",True)\
                                .option("multiline",False)\
                                .load('/Volumes/workspace/default/datareading/drivers.json')

In [0]:
df1.display()

#Schema Definition

In [0]:
df.printSchema()

In [0]:
df1.printSchema()

In [0]:
my_ddl_schema ='''Item_Identifier string ,
                    Item_Weight string,
                    Item_Fat_Content string ,
                    Item_Visibility double ,
                    Item_Type string ,
                    Item_MRP double ,
                    Outlet_Identifier string ,
                    Outlet_Establishment_Year integer ,
                    Outlet_Size string ,
                    Outlet_Location_Type string ,
                    Outlet_Type string ,
                    Item_Outlet_Sales double'''

In [0]:
df_struct = spark.read.format('csv')\
                .schema(my_ddl_schema)\
                .option('header',True)\
                .load('/Volumes/workspace/default/datareading/BigMart Sales.csv')\
                .display()


#StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
my_struct_schema = StructType([
    StructField('Item_Identifier',StringType(),True),
    StructField('Item_Weight',DoubleType(),True),
    StructField('Item_Fat_Content',StringType(),True)
])

In [0]:
df2 = spark.read.format('csv')\
                .schema(my_struct_schema)\
                .option('header',True)\
                .load('/Volumes/workspace/default/datareading/BigMart Sales.csv')\
                .display()


#Select

In [0]:
df_select = df.select('Item_Weight','Item_Visibility','Item_MRP','Outlet_Establishment_Year','Item_Outlet_Sales')
df_select.display()

In [0]:
df_select = df.select(col('Item_Weight').alias('weight_of_the_item') ,'Item_Visibility','Item_MRP','Outlet_Establishment_Year','Item_Outlet_Sales')
df_select.display()

#Filter

In [0]:
df.filter(col('Item_Weight')<=20).display()

In [0]:
df.filter((col('Item_Type') == 'Soft Drinks') & (col('Item_Weight')<10)).display()

In [0]:
df.filter((col('Outlet_Size').isNull()) & ~(col('Outlet_Location_Type')).isin('Tier 3'))

df.filter((col('Outlet_Size').isNull()) & ((col('Outlet_Location_Type')).isin('Tier 1', 'Tier 2')) & ~(col('Outlet_Location_Type')).isin('Tier 3')).display()

#withColumnRenamed

In [0]:
# withColumnRenamed returns a new DataFrame (df itself is unchanged), so display that result.
df.withColumnRenamed('Item_Weight','Item_Wt').display()

#withColumn

In [0]:
df.withColumn('flag',lit('new'))

df.withColumn('multiply', col('Item_MRP')*col('Item_Weight')).display()

In [0]:
df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Regular','Reg'))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Low Fat','LF'))\
    .display()

#Type Casting

In [0]:
# Cast into a new DataFrame so `df` keeps its inferred Item_Weight type: later cells sort on and
# compare Item_Weight, which would become lexicographic on a string column.
# Equivalent string form: col('Item_Weight').cast('string')
df_weight_str = df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))
df_weight_str.printSchema()

#Sort

In [0]:
# Item_Weight descending, ties broken by Item_MRP ascending.
# Other forms:
#   df.sort(col('Item_Weight')).display()                                   # ascending (default)
#   df.sort(col('Item_Weight').desc(), col('Item_MRP').desc()).display()    # both descending
df.orderBy(col('Item_Weight').desc(), col('Item_MRP').asc()).display()

#Limit

In [0]:
df.limit(10).display()

#Drop

In [0]:
df.drop('Item_Visibility','Item_MRP').display()

#Drop Duplicates

In [0]:
# df.dropDuplicates().display()
df.dropDuplicates(subset = ['Item_Type','Item_Fat_Content']).display()

#Distinct

In [0]:
df.distinct().count()

#Count

In [0]:
df.count()

#Union

In [0]:
data1 = [('1','kad'),
        ('2','sid')]
schema1 = 'id STRING, name STRING' 

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING, name STRING' 

df2 = spark.createDataFrame(data2,schema2)

In [0]:
df1.display()

In [0]:
df2.display()

In [0]:
df1.union(df2).display()

In [0]:
#for jumbles schema/columns
df1.unionByName(df2).display()


#String functions

In [0]:
# Other case conversions follow the same pattern: initcap('Item_Type'), lower('Item_Type').
df.select(upper(col('Item_Type')).alias('Item_Type')).display()

#Date Functions

In [0]:
df = df.withColumn('curr_date', current_date())
df.display()

In [0]:
# df.withColumn('week_after', date_add('curr_date', 7)).display()
# df.withColumn('week_before', date_sub('curr_date', 7)).display()
df = df.withColumn('week_before', date_add('curr_date', -7)).display()

In [0]:
df = df.withColumn(col('date_difference'), datediff('curr_date','week_before'))
df.display()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-8240497651175401>, line 1[0m
[0;32m----> 1[0m df [38;5;241m=[39m df[38;5;241m.[39mwithColumn(col([38;5;124m'[39m[38;5;124mdate_difference[39m[38;5;124m'[39m), datediff([38;5;124m'[39m[38;5;124mcurr_date[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124mweek_before[39m[38;5;124m'[39m))
[1;32m      2[0m df[38;5;241m.[39mdisplay()

[0;31mAttributeError[0m: 'NoneType' object has no attribute 'withColumn'

In [0]:
df.withColumn('chnage format week', date_format('curr_date','yyyy-MM-dd')).display()

#Handle Nulls

In [0]:
df.display()

In [0]:
df.dropna('all').display()

In [0]:
df.dropna('any').display()

In [0]:
df.dropna('any', subset=['Outlet_Size']).display()

In [0]:
df.fillna('NA').display()
# df.fullna("NA", subset=['Outlet_Size']).display()

#Split and Indexing

In [0]:
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')).display()

In [0]:
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')[0]).display()

#Explode

In [0]:
df_explode = df.withColumn('Outlet_Type', split('Outlet_Type', ' '))
df_explode.display()
# df_explode.withColumn('Outlet_Type', explode('Outlet_Type')).display()

#Array Contains

In [0]:
df_explode.withColumn('Type1_Flag',array_contains('Outlet_Type', 'Type1')).display()

#Group by

In [0]:
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

In [0]:
df.groupBy('Item_Type').agg(avg("Item_MRP").alias("average")).display()

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Sum'), avg("Item_MRP").alias("avg")).display()

#collect_list

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string, book string'

df_book = spark.createDataFrame(data,schema)

df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()

#Pivot

In [0]:
# Rows: Item_Type; one column per distinct Outlet_Size value; cells: average Item_MRP.
df.groupBy('Item_Type')\
    .pivot('Outlet_Size')\
    .agg(avg('Item_MRP'))\
    .display()

#When otherwise

In [0]:
df_veg = df.withColumn('veg_flag', when(col('Item_Type')=='Meat', 'Non-veg').otherwise('Veg')).display()

In [0]:
df_veg.withColumn('veg_flag',when(((col('veg_flag')=='Veg') & (col('Item_MRP')<100)),'Veg_Inexpensive')\
                            .when((col('veg_flag')=='Veg') & (col('Item_MRP')>100),'Veg_Expensive')\
                            .otherwise('Non_Veg')).display() 

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-4755155056066264>, line 1[0m
[0;32m----> 1[0m df_veg[38;5;241m.[39mwithColumn([38;5;124m'[39m[38;5;124mveg_flag[39m[38;5;124m'[39m,when(((col([38;5;124m'[39m[38;5;124mveg_flag[39m[38;5;124m'[39m)[38;5;241m==[39m[38;5;124m'[39m[38;5;124mVeg[39m[38;5;124m'[39m) [38;5;241m&[39m (col([38;5;124m'[39m[38;5;124mItem_MRP[39m[38;5;124m'[39m)[38;5;241m<[39m[38;5;241m100[39m)),[38;5;124m'[39m[38;5;124mVeg_Inexpensive[39m[38;5;124m'[39m)\
[1;32m      2[0m                             [38;5;241m.[39mwhen((col([38;5;124m'[39m[38;5;124mveg_flag[39m[38;5;124m'[39m)[38;5;241m==[39m[38;5;124m'[39m[38;5;124mVeg[39m[38;5;124m'[39m) [38;5;241m&[39m (col([38;5;124m'[39m[38;5;124mItem_MRP[39m[38;5;124m'[39m)[38;5;241m>[39m[38;5;241m100[39m),[38;

#JOIN

##Inner Join

In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)

In [0]:
df1.display()

In [0]:
df2.display()

In [0]:
df1.join(df2, df1['dept_id']==df2['dept_id'],'inner').display()

##Left join

In [0]:
df1.join(df2, df1['dept_id']==df2['dept_id'],'left').display()

##Anti join

In [0]:
df1.join(df2, df1['dept_id']==df2['dept_id'],'anti').display()

#Window Functions

In [0]:
from pyspark.sql.window import *

##Row Number

In [0]:
df.withColumn('rowCol', row_number().over(Window.orderBy('Item_identifier'))).display()

##Rank

In [0]:
df.withColumn('rank', rank().over(Window.orderBy(col('Item_identifier').desc())))\
            .withColumn('dense_rank', dense_rank().over(Window.orderBy(col('Item_Identifier').desc())))\
            .display()

##Cumulative Sum

In [0]:
# Running total: the frame ends at the current row. A frame ending at unboundedFollowing
# would give every row the grand total instead, which is shown separately as 'totalsum'.
# Item_Type alone has ties, which would make the running total order-dependent, so ties are
# broken by Item_Identifier/Outlet_Identifier (NOTE(review): assumes that pair identifies a row — confirm).
# Without partitionBy, Spark evaluates these windows on a single partition.
running_window = Window.orderBy('Item_Type', 'Item_Identifier', 'Outlet_Identifier')\
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow)
total_window = Window.orderBy('Item_Type')\
                     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('cumsum', sum('Item_MRP').over(running_window))\
    .withColumn('totalsum', sum('Item_MRP').over(total_window))\
    .display()

#User Defined Functions

In [0]:
def my_func(x):
    """Return the square of ``x``.

    A SQL NULL reaches a Python UDF as ``None``; it is passed through as ``None`` instead of
    raising ``TypeError`` (which would fail the whole Spark job).
    """
    if x is None:
        return None
    return x ** 2

my_udf = udf(my_func)

In [0]:
df.withColumn('mynewcol', my_udf('Item_MRP')).display()

#Data Writing

In [0]:
df.write.format('csv').save('/Volumes/workspace/default/datareading/data.csv')

In [0]:
# df.write.format('csv').mode('error').save('/Volumes/workspace/default/datareading/data.csv')
df.write.format('csv').mode('append').save('/Volumes/workspace/default/datareading/data.csv')
# df.write.format('csv').mode('overwrite').save('/Volumes/workspace/default/datareading/data.csv')
# df.write.format('csv').mode('ignore').save('/Volumes/workspace/default/datareading/data.csv')

#Parquet

In [0]:
# Parquet gets its own directory: writing under data.csv mixed parquet files into the CSV output.
# 'overwrite' keeps re-runs from piling up duplicate rows.
df.write.format('parquet').mode('overwrite').save('/Volumes/workspace/default/datareading/data.parquet')


#Table

In [0]:
# Save df as Delta table 'my_table' (no path given, so a managed table), replacing any existing one.
(df.write
   .format('delta')
   .mode('overwrite')
   .saveAsTable('my_table'))

#Managed VS External Tables

#####External tables are managed by the user: when an external table is dropped, only the table's metadata (its schema) is removed and the actual data files are kept. A managed table is managed by the platform's metastore (e.g. Hive / Unity Catalog on Azure Databricks): when a managed table is dropped, its data is deleted as well.

In [0]:
df.display()

#Spark SQL

In [0]:
df.createTempView('my_view')

In [0]:
%sql
select * from my_view;

In [0]:
df_sql = spark.sql('select * from my_view where Item_weight= 10')
df_sql.display()