### Data Reading

In [0]:
# List the files uploaded to DBFS under /FileStore/tables/.
dbutils.fs.ls('/FileStore/tables/')

In [0]:
# Read the BigMart sales CSV: the first line supplies the column names ('header')
# and Spark samples the data to choose column types ('inferSchema').
df = spark.read.format('csv').option('inferSchema', True).option('header', True).load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
# Plain-text preview of the rows.
df.show()

In [0]:
# Databricks rendered (interactive) table view of the same data.
df.display()

### Reading JSON Files

In [0]:
dbutils.fs.ls('/FileStore/tables/')

In [0]:
df_json = spark.read.format('json').option('inferschema', True).option('header', True).load('/FileStore/tables/drivers.json')

In [0]:
df_json.display()

In [0]:
df_json_2 = spark.read.format('json').option('inferSchema',True).option('header',True)\
                                        .option('multiline',False).load('/FileStore/tables/drivers.json')

In [0]:
df_json_2.display()

In [0]:
# Schema Spark inferred for the CSV, as a StructType object.
df.schema

In [0]:
# The same schema printed as a readable tree.
df.printSchema()

In [0]:
# Explicit schema as a DDL string: comma-separated "column_name type" pairs.
# Item_Weight is declared as string here, presumably on purpose to show
# overriding an inferred type (other cells treat it as numeric) — confirm.
my_ddl_schema = '''
Item_Identifier string,
Item_Weight string,
Item_Fat_Content string,
Item_Visibility double,
Item_Type string,
Item_MRP double,
Outlet_Identifier string,
Outlet_Establishment_Year integer,
Outlet_Size string,
Outlet_Location_Type string,
Outlet_Type string,
Item_Outlet_Sales double
'''

In [0]:
# Read the CSV with the schema above instead of inferring one; 'header' keeps
# the first line from being read as a data row.
df_schema_change = spark.read.format('csv')\
                        .schema(my_ddl_schema)\
                            .option('header',True)\
                                .load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
df_schema_change.display()

In [0]:
# Check that the declared types (e.g. Item_Weight as string) were applied.
df_schema_change.printSchema()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:

# Disabled example, kept as an unused string literal: read the CSV with a
# programmatic StructType schema instead of a DDL string. Only two fields are
# declared. Remove the surrounding triple quotes to run it.
'''
my_struct_schema = StructType([
    StructField('Item_Identifier', StringType(),True),
    StructField('Item_Weight', StringType(), True)
])


df3 = spark.read.format('csv').schema(my_struct_schema).option('header', True).load('/FileStore/tables/BigMart_Sales.csv')
'''


In [0]:
# Select columns by name using plain strings.
df.select('Item_Identifier','Item_Weight').display()

In [0]:
# Same selection using Column objects from col(); the listed order is the output order.
df.select(col('Item_Weight'), col('Item_Identifier')).display()

In [0]:
# alias() renames the selected column in the result only.
df.select(col('Item_Identifier').alias('new_id')).display()

In [0]:
# df itself is unchanged by the selects above (they were only displayed).
df.display()

### Filters

In [0]:
# Equality filter: keep rows whose Item_Fat_Content is exactly 'Regular' (case-sensitive).
df.filter(col('Item_Fat_Content')=='Regular').display()

In [0]:
# Combine conditions with & (not Python's `and`). Each comparison needs its own
# parentheses because & binds tighter than < and ==.
df.filter((col('Item_Weight') < 10) & (col('Item_Type') == 'Soft Drinks')).display()

In [0]:
# isNull() matches missing values; isin() matches any of the listed values.
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).display()

In [0]:
# Rename a column in the displayed result only; df is not reassigned.
df.withColumnRenamed('Item_Weight','Item_Wt').display()

In [0]:
df.display()

In [0]:
# df is reassigned: from here on it carries a constant 'Flag' column built with lit().
df = df.withColumn('Flag', lit('new_val'))

In [0]:
df.display()

In [0]:
# New column computed from two existing columns (shown only, not stored on df).
df.withColumn('multiply', col('Item_Weight')*col('Item_MRP')).display()

In [0]:
df.display()

In [0]:
# regexp_replace treats its pattern as a regular expression. The two passes
# abbreviate 'Low Fat' -> 'Lf' and 'Regular' -> 'Reg'; other spellings are untouched.
df.withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'Lf'))\
                    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg')).display()

In [0]:
df.printSchema()

In [0]:
# Cast Item_Weight to string in the displayed result; df keeps its original type.
df.withColumn('Item_Weight', col('Item_Weight').cast(StringType())).display()

In [0]:
df.display()

In [0]:
# sort() is ascending unless told otherwise.
df.sort(col('Item_Weight')).display()

In [0]:
# Descending order.
df.sort(col('Item_Weight').desc()).display()

In [0]:
# Explicitly ascending; same ordering as the default above.
df.sort(col('Item_Weight').asc()).display()

In [0]:
df.sort?


In [0]:
df.sort(['Item_Weight', 'Item_Visibility'], ascending = [0,1]).display()

### Limit

In [0]:
# Only the first 10 rows.
df.limit(10).display()

In [0]:
# drop() accepts a Column object...
df.drop(col('Item_Weight')).display()

In [0]:
# df is unchanged: drop() returned a new DataFrame that was only displayed.
df.display()

In [0]:
# ...or one or more column names given as strings.
df.drop('Item_Weight','Item_Fat_Content').display()

In [0]:
df.limit(5).display()

### Drop Duplicates

In [0]:
# Per-column summary statistics (an overview of the data, not a de-duplication step).
df.describe().display()

In [0]:
# With no subset, a row is a duplicate only if it matches another row in every column.
df.dropDuplicates().display()

In [0]:
# drop_duplicates is an alias of dropDuplicates. With a subset, one row is kept
# per distinct Item_Type; which row is kept is not specified.
df.drop_duplicates(subset=['Item_Type']).display()

In [0]:
# One row per distinct (Item_Fat_Content, Item_Type) combination.
df.drop_duplicates(subset=['Item_Fat_Content','Item_Type']).display()

In [0]:
# distinct() considers all columns, like dropDuplicates() with no subset.
df.distinct().display()

In [0]:
df.distinct().count()

In [0]:
# Number of distinct Item_Type values.
df.dropDuplicates(subset=['Item_Type']).count()

In [0]:
df.dropDuplicates(subset=['Item_Type']).limit(50).display()

In [0]:
# Two small DataFrames built from Python tuples plus a DDL schema string,
# used to demonstrate unions.
data_1 = [
    ('1','kad'),
    ('2','sid')
]

schema_1 = '''
            id STRING,
            name STRING
'''

df_11 = spark.createDataFrame(data=data_1, schema=schema_1)

df_11.display()

In [0]:
data_2  = [
    ('3', 'rahul'),
    ('4','jas')
]

schema_2 = '''
            id STRING,
            name STRING
'''

df_22 = spark.createDataFrame(data_2,schema_2)

df_22.display()

In [0]:
# union() appends rows by column POSITION; duplicate rows are kept (like SQL UNION ALL).
df_11.union(df_22).display()

In [0]:
# Same shape as df_11, but the first column is named roll_number instead of id.
data_3  = [
    ('3', 'rahul23'),
    ('4','jas')
]

schema_3 = '''
            roll_number STRING,
            name STRING
'''

df_23 = spark.createDataFrame(data_3,schema_3)

df_23.display()

In [0]:
# Still works: union() ignores column names. The result takes its column names
# from the left-hand DataFrame (df_11).
df_11.union(df_23).display()

In [0]:
# Three columns: this cannot be union()-ed with the two-column DataFrames above.
data_4  = [
    ('3', 'rahul24','py'),
    ('4','jas','json')
]

schema_4 = '''
            roll_number STRING,
            name STRING,
            sub STRING
'''

df_24 = spark.createDataFrame(data_4,schema_4)

df_24.display()

In [0]:
# Positional union again; here the result's column names come from df_23.
df_23.union(df_11).display()

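`df_24.union(df_11).display()` fails with an `AnalysisException`: `union` requires both DataFrames to have the same number of columns (df_24 has 3, df_11 has 2). To combine them, use `df_24.unionByName(df_11, allowMissingColumns=True)`, which fills the columns missing on either side with nulls.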

In [0]:
# Same columns as df_11 (id, name) but declared in the opposite order.
data_5  = [
    ('rahul','3'),
    ('jas','4')
]

schema_5 = '''
            name STRING,
            id STRING
'''

df_25 = spark.createDataFrame(data_5,schema_5)

df_25.display()

In [0]:
# Positional union: df_25's names land in df_11's id column and its ids in the
# name column. There is no error because both columns are strings.
df_11.union(df_25).display()

In [0]:
# unionByName() matches columns by name, so the values line up correctly.
df_11.unionByName(df_25).display()

### String Functions

In [0]:
df.limit(5).display()

In [0]:
# initcap(): capitalize the first letter of each word.
df.select(initcap('Item_Type')).display()

In [0]:
# lower(): all lowercase.
df.select(lower('Item_Type')).display()

In [0]:
# Case-insensitive match: lowercase the column before comparing.
df.filter(lower('Item_Type')=='meat').display()

In [0]:
# upper(), with alias() giving the result column a readable name.
df.select(upper('Item_Type').alias('new_upper_Item_type')).display()

### Date Functions

In [0]:
# Add the current date as a column. df is reassigned so later cells can use curr_dt.
df = df.withColumn('curr_dt',current_date())
df.display()

In [0]:
# next_week = curr_dt + 7 days (stored on df).
df = df.withColumn('next_week', date_add('curr_dt',7))

In [0]:
df.display()

In [0]:
# date_sub() subtracts days; this result is only displayed, not stored.
df.withColumn('last_week', date_sub('curr_dt',7)).display()

In [0]:
# Same value via date_add() with a negative day count; this time it is stored on df.
df = df.withColumn('last_week', date_add('curr_dt', -7))
df.display()

In [0]:
# Number of days between two dates. datediff(end, start) returns end - start,
# so the later date goes first: next_week - curr_dt == 7. The reversed argument
# order would give -7.
df = df.withColumn('datediff', datediff('next_week', 'curr_dt'))
df.display()

In [0]:
# date_format() renders a date as a string using a pattern. 'MM' is month;
# lowercase 'mm' would mean minutes.
df.withColumn('date_format', date_format('last_week','dd-MM-yyyy' )).display()

In [0]:
df.withColumn('date_format_2', date_format('last_week','MM-dd-yyyy' )).display()

In [0]:
# Default dropna(): drop any row that has a null in any column.
df.dropna().display()

In [0]:
df.display()

In [0]:
# 'all': drop only rows where every column is null. df now has always-populated
# columns (Flag from lit(), curr_dt from current_date()), so no row is removed here.
df.dropna('all').display()

In [0]:
# Consider only Item_Weight when deciding which rows to drop.
df.dropna(subset=['Item_Weight']).display()

In [0]:
df.fillna('Not_Available').display()

In [0]:
df.fillna('Not_available', subset=['Item_Weight']).display()

In [0]:
# Fill Item_Weight nulls with a numeric sentinel value.
df_fillna = df.fillna(11111111, subset=['Item_Weight'])
df_fillna.display()

In [0]:
# Find the filled rows; this assumes no real weight equals the sentinel.
df_fillna.filter(col('Item_Weight') == 11111111).display()

In [0]:
# Case-insensitive match on Item_Fat_Content.
df_fillna.filter(lower(col('Item_Fat_Content')) == 'regular').display()

In [0]:
# split() turns the string into an array of words (the separator argument is a regex).
df.withColumn('splited_item_type', split('Item_Type',' ')).display()

In [0]:
# [1] takes the second word (0-based index). NOTE(review): for single-word
# Item_Type values this is expected to yield null with ANSI mode off and to
# raise an error with ANSI mode on — confirm for the cluster's settings.
df.withColumn('item_split_col', split('Item_Type', ' ')).withColumn('ind_split_col', split('Item_Type', ' ')[1]).display()

In [0]:
# explode() emits one output row per element of the split array.
df.withColumn('split_col', split('Item_Type', ' ')).withColumn('explode_col', explode('split_col')).display()

In [0]:
df_split = df.withColumn('split_col', split('Item_Type', ' '))
df_split.display()

In [0]:
df_split.withColumn('arr_flay', array_contains('split_col', 'Drinks')).display()

In [0]:
# `sum` here is pyspark.sql.functions.sum: the wildcard import of
# pyspark.sql.functions shadows Python's built-in sum. Without an alias, the
# output column gets an auto-generated name.
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

In [0]:
# Group by two columns; alias() names the aggregate column.
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

In [0]:
# groupBy also accepts a list of column names; same result as the previous cell.
df.groupBy(['Item_Type','Outlet_Size']).agg(sum('Item_MRP').alias('Total_MRP')).display()

In [0]:
# Several aggregates computed in one agg() call.
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP'), avg('Item_MRP').alias('Avg_MRP')).display()

In [0]:
# Gather each Item_Type group's Outlet_Size values into one array column.
df_collect = df.groupBy('Item_Type').agg(collect_list('Outlet_Size').alias('collect_col'))

In [0]:
# size() gives the array length. NOTE(review): collect_list is documented to
# skip nulls, so col_len can be smaller than the group's row count — confirm.
df_collect.withColumn('col_len', size(col('collect_col'))).display()