## Reading a CSV file

In [0]:
# Load the BigMart sales CSV, taking column names from the header row and
# letting Spark infer each column's type.
df_csv = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Show df_csv (the CSV read with inferred column types) in the notebook output.
df_csv.display()

In [0]:
# Print the column names and types Spark inferred for df_csv (inferSchema=True).
df_csv.printSchema()

In [0]:
# Load the drivers JSON file. 'header' and 'inferSchema' are CSV-only options
# that the JSON source ignores, so they are not passed. multiLine=False means
# each record is expected on its own line (JSON Lines layout).
df_json = (
    spark.read
    .format('json')
    .option('multiLine', False)
    .load('/FileStore/tables/drivers.json')
)

In [0]:
# Show the drivers data loaded into df_json.
df_json.display()

## SCHEMA

In [0]:
# DDL-style schema string for BigMart_Sales.csv, passed to .schema() when the
# CSV is re-read below. Keep the field order identical to the CSV columns:
# Spark's CSV reader applies a user-supplied schema by position by default.
# NOTE(review): Item_Weight is declared STRING rather than DOUBLE here —
# confirm that is intentional.
my_schema = '''
                Item_Identifier STRING,
                Item_Weight STRING,
                Item_Fat_Content STRING, 
                Item_Visibility DOUBLE,
                Item_Type STRING,
                Item_MRP DOUBLE,
                Outlet_Identifier STRING,
                Outlet_Establishment_Year INT,
                Outlet_Size STRING,
                Outlet_Location_Type STRING, 
                Outlet_Type STRING,
                Item_Outlet_Sales DOUBLE 
            '''

In [0]:
# Re-read the same CSV, this time forcing the column types from my_schema
# instead of inferring them.
df_schema = (
    spark.read
    .schema(my_schema)
    .option('header', True)
    .csv('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# Show the CSV read with the explicit DDL schema (my_schema).
df_schema.display()

In [0]:
# Confirm the column types now come from my_schema rather than inference.
df_schema.printSchema()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# The BigMart schema expressed programmatically as a StructType. Every field
# is StringType, and the third StructField argument (True) marks it nullable.
# Fields must follow the CSV column order (the same order as my_schema), so
# Item_Type sits between Item_Visibility and Item_MRP.
my_strct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', StringType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', StringType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', StringType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', StringType(), True),
])

# Read the CSV with the StructType defined above.
df = (
    spark.read.format('csv')
    .schema(my_strct_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:

# Print the schema df was read with in the previous cell.
df.printSchema()

# TRANSFORMATIONS


## SELECT

In [0]:
# Project three columns by name; plain strings resolve the same as col(...).
df_csv.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']).display()

## LIMIT DISPLAY

## ALIAS

In [0]:
# Return only Item_Identifier, renamed to Item_ID in the output.
df_csv.select(df_csv['Item_Identifier'].alias('Item_ID')).display()

## FILTER

In [0]:

# Keep only rows whose fat-content label is exactly 'Regular'.
df_csv.where(col('Item_Fat_Content') == 'Regular').display()

In [0]:

# Soft drinks weighing under 10. Each comparison is parenthesised because
# & binds tighter than == and < in Python.
df_csv.where(
    (df_csv['Item_Type'] == 'Soft Drinks')
    & (df_csv['Item_Weight'] < 10)
).display()

In [0]:
# Rows with a missing Outlet_Size in Tier 1 / Tier 2 locations, showing just
# those two columns.
(
    df_csv
    .where(col('Outlet_Size').isNull() & col('Outlet_Location_Type').isin(['Tier 1', 'Tier 2']))
    .select('Outlet_Size', 'Outlet_Location_Type')
    .display()
)

In [0]:

# Rename Item_Weight to Item_Wt in a new DataFrame; df_csv itself is unchanged.
df_csv.withColumnRenamed(existing='Item_Weight', new='Item_Wt').display()

## withColumn

In [0]:

# Add a constant string column 'flag'. display() returns None, so keep the
# DataFrame in df and display it as a separate statement.
df = df_csv.withColumn('flag', lit("new"))
df.display()

In [0]:
# Row-wise product of Item_Weight and Item_MRP in a new column 'multiply'.
df_csv.withColumn('multiply', df_csv['Item_Weight'] * df_csv['Item_MRP']).display()

In [0]:
# Shorten the fat-content labels: 'Regular' -> 'Reg', then 'Low Fat' -> 'Lf'.
# display() returns None, so store the DataFrame first and show it afterwards.
df = (
    df_csv
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Regular", "Reg"))
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Low Fat", "Lf"))
)
df.display()

## TYPECASTING

In [0]:

# Cast Item_Weight to a string column. printSchema() returns None, so keep the
# DataFrame in df and print its schema as a separate statement.
df = df_csv.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))
df.printSchema()

## SORT

In [0]:
# Heaviest items first.
df_csv.orderBy(df_csv['Item_Weight'].desc()).display()

In [0]:

# Least visible items first.
df_csv.orderBy(df_csv['Item_Visibility'].asc()).display()

In [0]:

# Sort by Item_Weight, then by Item_Visibility, both in descending order.
df_csv.orderBy(col('Item_Weight').desc(), col('Item_Visibility').desc()).display()

In [0]:
# Sort by Item_Weight descending, then Item_Visibility ascending. The column
# name uses its real casing so the sort also works when
# spark.sql.caseSensitive is enabled; booleans state the directions explicitly.
df_csv.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, True]).display()

## DROP

In [0]:
# Remove the Item_Visibility and Item_Type columns (one drop per column).
df_csv.drop('Item_Visibility').drop('Item_Type').display()

In [0]:
# Remove rows that are identical across every column
# (drop_duplicates is the snake_case alias of dropDuplicates).
df_csv.drop_duplicates().display()


In [0]:
# Keep one row per distinct Item_Type value; which row survives is not specified.
df_csv.dropDuplicates(['Item_Type']).display()


In [0]:
# Distinct rows across all columns; same result as dropDuplicates() with no subset.
df_csv.distinct().display()


## UNION and UNION BY NAME

In [0]:

# Two small DataFrames with the same (id, name) column order, used to
# demonstrate union().
schema1 = schema2 = 'id STRING, name STRING'
data1 = [('1', 'kad'), ('2', 'sid')]
data2 = [('3', 'rahul'), ('4', 'jas')]

df1, df2 = (
    spark.createDataFrame(rows, schema=ddl)
    for rows, ddl in ((data1, schema1), (data2, schema2))
)

df1.display()
df2.display()

In [0]:

# union() appends df2's rows by column position. Both DataFrames use the
# (id, name) order here, so the rows line up correctly.
df1.union(df2).display()

In [0]:

# Rebuild df1 with its columns swapped to (name, id), to show how union()
# behaves when the column order differs from df2.
schema1 = 'name STRING, id STRING'
data1 = [
    ('kad', '1'),
    ('sid', '2'),
]
df1 = spark.createDataFrame(data1, schema=schema1)
df1.display()

In [0]:
# df1 is now (name, id) while df2 is (id, name). union() matches columns by
# position, so df2's ids land under 'name' and its names under 'id'.
df1.union(df2).display()


In [0]:
# unionByName() matches columns by name, so df2's rows are aligned to df1's
# (name, id) layout correctly.
df1.unionByName(df2).display()


## String Functions

In [0]:
from pyspark.sql.functions import upper

# Upper-case the Outlet_Type values. The alias names the output column after
# its real source column (Outlet_Type).
df = df_csv.select(upper('Outlet_Type').alias('upper_Outlet_Type'))

df.display()


## Date Functions

In [0]:
from pyspark.sql.functions import *

# Append the current date, as computed by Spark, in a new column 'curr_date'.
df = df_csv.select('*', current_date().alias('curr_date'))
df.display()

In [0]:
from pyspark.sql.functions import *

# 'week_after' is curr_date plus 7 days.
df = df.withColumn('week_after', date_add(col('curr_date'), 7))
df.display()

In [0]:
# Preview curr_date minus 7 days as 'week_before' (displayed only, not stored).
df.withColumn('week_before', date_sub(df['curr_date'], 7)).display()


In [0]:
 df.withColumn('week_before',date_add('curr_date',-7)).display()

In [0]:
# Number of days from curr_date to week_after (datediff takes end, then start).
df = df.withColumn('datediff', datediff(col('week_after'), col('curr_date')))
df.display()

In [0]:
# 'week_before' is curr_date minus 7 days. date_sub already subtracts, so
# the offset is positive; a negative offset would move the date forward a week.
df = df.withColumn('week_before', date_sub('curr_date', 7))
# Render the date as a dd-MM-yyyy string (the column becomes a string).
df = df.withColumn('week_before', date_format('week_before', 'dd-MM-yyyy'))

df.display()

## Handling Nulls


In [0]:
# Drop only rows in which every column is null.
df.na.drop('all').display()


In [0]:
# Drop every row that contains at least one null.
df.na.drop(how='any').display()


In [0]:
# Drop rows only when Outlet_Size is null; nulls in other columns are kept.
df.na.drop(subset=['Outlet_Size']).display()


## FILLING NULLS

In [0]:
# Replace nulls with 'NotAvailable'. A string fill value only applies to
# string-typed columns.
df.na.fill('NotAvailable').display()


In [0]:
# Replace nulls with 'NotAvailable' in Outlet_Size only (column -> value mapping).
df.fillna({'Outlet_Size': 'NotAvailable'}).display()


## SPLIT, INDEXING and EXPLODE

In [0]:
# Replace Outlet_Type with an array of its space-separated words.
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')).display()


In [0]:
# Keep only the second word of Outlet_Type (array index 1).
df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ').getItem(1)).display()


In [0]:
# Keep the split result in df_exp. display() returns None, so display it as
# a separate statement instead of assigning its result.
df_exp = df.withColumn('Outlet_Type', split('Outlet_Type', ' '))
df_exp.display()

In [0]:
# explode() emits one row per word of Item_Type in 'split_Item_Type'. Store
# the DataFrame in dx before displaying it, because the next cell builds on dx
# and display() returns None.
dx = df.withColumn('split_Item_Type', explode(split('Item_Type', ' ')))
dx.display()

In [0]:
# Also split Outlet_Type into an array of words. Keep dx a DataFrame
# (display() returns None) and show it separately.
dx = dx.withColumn('Outlet_Type', split('Outlet_Type', ' '))
dx.display()



In [0]:
# Show df; none of the split cells above reassigned it.
df.display()


## GROUPBY

In [0]:
df_csv.display()

from pyspark.sql.functions import *
# Add a string copy of Item_MRP; the aggregation below still uses the numeric column.
df = df_csv.withColumn("Item_MRP_string", df_csv["Item_MRP"].cast("string"))
# Total Item_MRP per Item_Type, using the GroupedData.sum shortcut.
df.groupBy('Item_Type').sum('Item_MRP').display()



In [0]:
# Mean Item_MRP per Item_Type.
df.groupBy('Item_Type').avg('Item_MRP').display()


In [0]:
# Total Item_MRP for every (Item_Type, Outlet_Size) pair, named Total_MRP.
df.groupBy(['Item_Type', 'Outlet_Size']).agg(sum(col('Item_MRP')).alias('Total_MRP')).display()


In [0]:
# Sum and mean of Item_MRP for every (Item_Type, Outlet_Size) pair.
# sum/avg here are the pyspark.sql.functions versions (star import).
df.groupBy('Item_Type', 'Outlet_Size').agg(*[agg_fn('Item_MRP') for agg_fn in (sum, avg)]).display()


## PIVOT

In [0]:
# One row per Item_Type, one column per Outlet_Size value, cells = mean Item_MRP.
df.groupBy('Item_Type').pivot('Outlet_Size').avg('Item_MRP').display()


## COLLECT_LIST

In [0]:

# Small (user, book) dataset for the collect_list example.
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]
schema = 'user string, book string'

df_book = spark.createDataFrame(data=data, schema=schema)
df_book.display()

In [0]:
# Gather each user's books into one array column.
df_book.groupBy(col('user')).agg(collect_list(col('book'))).display()


## When-Otherwise



In [0]:
# Label 'Meat' items 'Non-Veg' and everything else 'Veg'. display() returns
# None, so keep the DataFrame in df and display it separately.
df = df_csv.withColumn('veg_flag', when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'))
df.display()


In [0]:
# Add 'veg_flag' to df_csv: 'Green' for the 'Fruit' item type, 'Veg' otherwise.
# Assign the DataFrame, not display()'s None result; otherwise df_csv would be
# lost and the next cell would fail.
# NOTE(review): confirm 'Fruit' is an actual Item_Type value in this dataset;
# if it is not, every row is flagged 'Veg'.
df_csv = df_csv.withColumn('veg_flag', when(col('Item_Type') == 'Fruit', 'Green').otherwise('Veg'))
df_csv.display()


In [0]:
# Bucket rows by veg_flag and price:
#   veg_flag == 'Veg' and Item_MRP <  100 -> 'Veg_Inexpensive'
#   veg_flag == 'Veg' and Item_MRP >= 100 -> 'Veg_Expensive'
#   anything else                          -> 'Non_Veg'
# Using >= makes an MRP of exactly 100 count as expensive instead of falling
# through to 'Non_Veg'.
# NOTE(review): veg_flag is set in the previous cell to 'Green'/'Veg', so
# 'Green' rows end up as 'Non_Veg' here — confirm that is intended.
df_csv = df_csv.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non_Veg'),
)
df_csv.display()