# **Data Reading**

In [0]:
# Load the BigMart sales CSV, treating the first row as the header and
# letting Spark infer the column types, then render it.
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/FileStore/tables/BigMart_Sales.csv")
)
df.display()

In [0]:
# List the files under /FileStore/tables.
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Render the current DataFrame.
display(df)

In [0]:
# List the uploaded files again to check what is available under /FileStore/tables.
dbutils.fs.ls("/FileStore/tables")

# **Reading JSON File**

In [0]:
# Read the Titanic JSON file into df.
# 'header' and 'inferSchema' are CSV reader options and have no effect on the
# JSON source (its schema is always inferred), so they are dropped here.
# multiLine=false means each line of the file holds one JSON record.
df = (
    spark.read.format("json")
    .options(multiLine="false")
    .load("/FileStore/tables/titanic.json")
)


In [0]:
# Render the DataFrame loaded from the JSON file.
display(df)

In [0]:
# Print the column names and types, then show the first rows as plain text.
df.printSchema()
df.show()

In [0]:
# Read the same Titanic JSON file into a separate DataFrame, df_json.
# 'header' and 'inferSchema' are CSV reader options and have no effect on the
# JSON source (its schema is always inferred), so they are dropped here.
# multiLine=false means each line of the file holds one JSON record.
df_json = (
    spark.read.format("json")
    .options(multiLine="false")
    .load("/FileStore/tables/titanic.json")
)

In [0]:
# Print the schema of the JSON DataFrame.
df_json.printSchema()

# Select


In [0]:
# Project three columns, referring to them by name.
display(df.select(['Age', 'Cabin', 'Embarked']))

In [0]:
from pyspark.sql.functions import col

In [0]:
# Same projection as above, built from Column objects instead of plain names.
display(df.select(*[col(name) for name in ('Age', 'Cabin', 'Embarked')]))


# Alias

In [0]:
# alias() renames a column inside a projection, so it belongs in select();
# filter() expects a boolean condition and rejects a plain aliased column.
df.select(col('Age').alias('Age_1')).display()

In [0]:
# Render df again; it still has its original column names.
display(df)

# Filters

In [0]:
# Keep only the rows whose Embarked value is 'S'.
display(df.filter(col('Embarked') == 'S'))

In [0]:
# Apply both conditions in a single filter. The original combined a DataFrame
# with the None returned by display() using '&', which fails.
# Each comparison is parenthesised because '&' binds tighter than '==' and '>'.
df.filter((col('Embarked') == 'S') & (col('Age') > 30)).display()



In [0]:
# Render the unfiltered DataFrame.
display(df)

In [0]:
# Rows that embarked at 'S' and have no Age recorded.
display(df.filter((col('Embarked') == 'S') & col('Age').isNull()))

In [0]:
# Render the full DataFrame.
display(df)

In [0]:
# Show df with the Age column renamed to Age_1 (df itself is not reassigned).
display(df.withColumnRenamed('Age', 'Age_1'))


# withColumn

In [0]:
# Render df before applying withColumn transformations.
display(df)

In [0]:
from pyspark.sql.functions import regexp_replace

In [0]:
# Overwrite Embarked with a copy in which every 'S' is replaced by 'Southampton'.
embarked_expanded = regexp_replace(col('Embarked'), 'S', 'Southampton')
display(df.withColumn('Embarked', embarked_expanded))



In [0]:
# Reverse mapping: replace 'Southampton' with 'S' in the Embarked column.
embarked_short = regexp_replace(col('Embarked'), 'Southampton', 'S')
display(df.withColumn('Embarked', embarked_short))

In [0]:
from pyspark.sql import SparkSession



# Type Casting

In [0]:
# Print the current column types before casting anything.
df.printSchema()


In [0]:
# Render df before casting.
display(df)

In [0]:
# Cast Embarked to string using the type's name; only the displayed copy is
# affected because the result is not assigned back to df.
display(df.withColumn('Embarked', col('Embarked').cast('string')))

In [0]:
# Print df's schema again (df itself has not been reassigned).
df.printSchema()

In [0]:
from pyspark.sql.types import StringType

In [0]:
# Cast Fare to string, this time passing a DataType object instead of a name.
fare_as_string = col('Fare').cast(StringType())
display(df.withColumn('Fare', fare_as_string))

In [0]:
# Print df's schema once more (df itself has not been reassigned).
df.printSchema()

# Sort/Order by

In [0]:
# Render df in its current (unsorted) order.
display(df)

In [0]:
# Sort by Age in ascending order.
display(df.sort('Age', ascending=True))

In [0]:
# Sort by PassengerId, then Pclass, both in descending order.
display(df.sort('PassengerId', 'Pclass', ascending=[False, False]))


# Limit

In [0]:
# Show at most 10 rows.
display(df.limit(10))

In [0]:
# Render the complete DataFrame for comparison with the limited view.
display(df)

# Drop

In [0]:
# Show df without the cabin column (referenced as a Column object).
display(df.drop(col('cabin')))


In [0]:
# Keep the cabin-less result in its own variable so df stays untouched.
undrop = df.drop(col('cabin'))
display(undrop)


In [0]:
# Same drop as above, naming the column with a plain string instead of col().
undrop = df.drop('cabin')

In [0]:
# Render df; the drop results were stored in undrop, not in df.
display(df)

# Drop Duplicates

In [0]:
# Remove rows that are exact duplicates across all columns.
display(df.dropDuplicates())


In [0]:
pip install databricks-sql-connector

In [0]:
# 'subset' is a keyword argument of DataFrame.dropDuplicates()/dropna()/fillna(),
# not a function in pyspark.sql.functions, so there is nothing to import.


In [0]:
from pyspark.sql import SparkSession

In [0]:
# Keep one row per distinct Embarked value.
display(df.dropDuplicates(['Embarked']))

In [0]:
# Two small DataFrames with the same (id, name) column layout.
schema1 = 'id string,name string'
data1 = [('1', 'mukthar'), ('2', 'Basha')]
df1 = spark.createDataFrame(data1, schema1)
display(df1)

schema2 = 'id string,name string'
data2 = [('3', 'Akthar'), ('4', 'Basha')]
df2 = spark.createDataFrame(data2, schema2)
display(df2)

In [0]:
# Append df2's rows to df1's rows; both frames have the same column order.
display(df1.union(df2))

In [0]:
# df3 has columns (id, name); df4 holds the same kind of data with the
# columns in the opposite order (name, id).
schema3 = 'id string,name string'
df3 = spark.createDataFrame([('1', 'mukthar'), ('2', 'Basha')], schema3)
display(df3)

schema4 = 'name string,id string'
df4 = spark.createDataFrame([('Mukthar', '3'), ('Basha', '4')], schema4)
display(df4)

**Union vs. unionByName**

In [0]:
# union() matches columns by position, so df4's (name, id) values are placed
# under df3's (id, name) columns.
display(df3.union(df4))

In [0]:
# unionByName() matches columns by name, so id and name line up correctly.
display(df3.unionByName(df4))

**String Functions**

Initcap()

In [0]:
# Render df before applying string functions.
display(df)


In [0]:
from pyspark.sql.functions import *

In [0]:
# display() only renders its input and returns None, so keep the transformed
# DataFrames in the variables and display them as a separate step.
upper_case = df.select(upper('Name'))
lower_case = df.select(lower('Name'))
# initcap() is the function this section is named after; it capitalises the
# first letter of each word.
init_cap = df.select(initcap('Name'))

upper_case.display()
lower_case.display()
init_cap.display()


In [0]:
# Render df; the string functions above did not modify it.
display(df)

In [0]:
from pyspark.sql.functions import current_date

In [0]:
# Render df before adding date columns.
display(df)

In [0]:
# Show df with an extra column holding today's date (result is not stored).
display(df.withColumn('current_date', current_date()))

In [0]:
from pyspark.sql.functions import  current_date

In [0]:
# df has no stored 'current_date' column (the earlier withColumn result was
# only displayed), so call current_date() directly instead of relying on the
# name 'current_date' being resolved implicitly. Adds 7 days to today.
df.withColumn('week_date', date_add(current_date(), 7)).display()

In [0]:
# Render df; it still has no date columns of its own.
display(df)

In [0]:
# Show df with today's date appended as 'current_date'.
display(df.withColumn('current_date', current_date()))

In [0]:
# df has no stored 'current_date' column (the earlier withColumn result was
# only displayed), so call current_date() directly instead of relying on the
# name 'current_date' being resolved implicitly. Subtracts 7 days from today.
df.withColumn('week_before', date_sub(current_date(), 7)).display()

In [0]:
# Display df together with a 'current_date' column holding today's date.
display(df.withColumn('current_date', current_date()))


In [0]:
# Render df as it currently stands.
display(df)

In [0]:
# Store the frame with today's date in df_date so later cells can build on it.
df_date = df.withColumn('current_date', current_date())
display(df_date)


In [0]:
# Add 'week_before' = current_date minus 7 days, keeping the result in df_date.
week_before_col = date_sub('current_date', 7)
df_date = df_date.withColumn('week_before', week_before_col)
display(df_date)

In [0]:
from pyspark.sql.functions import datediff

In [0]:
# Render the frame that carries the date columns.
display(df_date)

In [0]:
# The 'current_date' and 'week_before' columns live on df_date, not df, so the
# difference must be computed there. The result is displayed because the
# original expression discarded it.
df_date.withColumn('datediff', datediff('current_date', 'week_before')).display()


In [0]:
# Render df, which was not reassigned by the date cells.
df.display()

# **Handling Nulls**

In [0]:
# Render df before handling nulls.
display(df)

Dropping Nulls

In [0]:
# Drop every row whose cabin value is null.
display(df.dropna(subset=['cabin']))

In [0]:
# Replace nulls with 'Not Available'; a string fill value is applied only to
# string-typed columns.
display(df.fillna('Not Available'))

In [0]:
# fillna() with a string value skips non-string columns in 'subset'.
# NOTE(review): Age is presumably numeric here (confirm with printSchema); if
# so, the original call left it unchanged. Cast it to string first so the
# placeholder can be applied, and display the result, which the original
# expression discarded.
df.withColumn('Age', col('Age').cast('string')) \
    .fillna('Not available', subset=['Age']) \
    .display()