### Data Reading

In [0]:
# List the contents of the volume directory that the sample files are loaded from.
dbutils.fs.ls("dbfs:/Volumes/init/infer_schema/initvolume/")


In [0]:
# Load the sample CSV, treating the first row as the header.
# The CSV option is spelled "inferSchema"; the previous "infer_schema" key is
# not a recognised option, so it was ignored and no type inference happened.
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load("/Volumes/init/infer_schema/initvolume/sample_data.csv")
)

In [0]:
# Render the CSV DataFrame as a notebook table.
df.display()

In [0]:
# Print the first rows of the DataFrame as plain text.
df.show()

## Data Reading JSON

In [0]:
# Read the multi-line JSON sample.
# 'infer_schema' and 'header' are CSV-style options that the JSON reader does
# not use, so they were removed; only 'multiLine' affects this read.
dfJson = (
    spark.read.format('json')
    .option('multiLine', True)
    .load("/Volumes/init/infer_schema/initvolume/sample_data.json")
)
 
 

In [0]:
# Render the JSON DataFrame as a notebook table.
dfJson.display()

In [0]:
# Read the same JSON file again, this time with only the multiLine option set.
dfJson2 = (
    spark.read
    .format('json')
    .option('multiLine', True)
    .load("/Volumes/init/infer_schema/initvolume/sample_data.json")
)

In [0]:
# Render the second JSON DataFrame as a notebook table.
dfJson2.display()

### Schema Definition


In [0]:
# Print the column names and types of the current DataFrame.
df.printSchema()

In [0]:
# Re-read the sample CSV with a header row and schema inference enabled,
# then print the resulting schema.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/init/infer_schema/initvolume/sample_data.csv")
)

df.printSchema()


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Explicit schema for the sample CSV: four nullable string columns.
structschema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ('ID', 'Name', 'Age', 'City')
    ]
)

In [0]:
# Read the sample CSV using the explicit StructType schema instead of
# inference, then print the resulting schema.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(structschema)
    .load("/Volumes/init/infer_schema/initvolume/sample_data.csv")
)

df.printSchema()

In [0]:
# DDL-style schema string for the sample CSV (integer ID/Age, string Name/City).
myDDLSchema = (
    "\n"
    " ID integer,\n"
    " Name string,\n"
    " Age integer,\n"
    " City string\n"
    "\n"
)

In [0]:
# Read the sample CSV using the DDL schema string.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(myDDLSchema)
    .load("/Volumes/init/infer_schema/initvolume/sample_data.csv")
)


In [0]:
# Print the column names and types of the current DataFrame.
df.printSchema()


###Select

In [0]:
# Render the full DataFrame before selecting columns from it.
df.display()

In [0]:
# Project only the ID and Name columns.
(
    df
    .select('ID', 'Name')
    .display()
)

### Alias

In [0]:
# Select the ID column under the new name 'id'.
(
    df
    .select(col('ID').alias('id'))
    .display()
)


In [0]:
# Render df again; the select/alias call above did not rebind it.
df.display()

### Filter 

In [0]:
# Keep rows whose Age is at least 30.
(
    df
    .filter(col('Age') >= 30)
    .display()
)

In [0]:
# Keep rows that satisfy both conditions: ID greater than 2 and Age at least 30.
(
    df
    .filter((col('ID') > 2) & (col('Age') >= 30))
    .display()
)
 

In [0]:
# Keep only the rows where Age is null.
(
    df
    .filter(col('Age').isNull())
    .display()
)

In [0]:
# Keep rows where Age is present and at least 30.
(
    df
    .filter((col('Age').isNotNull()) & (col('Age') >= 30))
    .display()
)

In [0]:
# Rename the Age column to lowercase 'age' (df itself is not rebound).
(
    df
    .withColumnRenamed('Age', 'age')
    .display()
)

In [0]:
# Add a constant 'nationality' column with the literal value 'Egyptian'.
(
    df
    .withColumn('nationality', lit('Egyptian'))
    .display()
)

In [0]:
# Add a 'multiply' column holding Age * ID for each row.
(
    df
    .withColumn('multiply', col('Age') * col('ID'))
    .display()
)

In [0]:
# Replace the text "22" with "33" in Age.
# regexp_replace() expects the pattern and replacement as strings (or Columns);
# the bare ints 22 / 33 used before are rejected. Age is cast to string
# explicitly because the function operates on text; the resulting Age column
# is therefore a string column.
(
    df
    .withColumn('Age', regexp_replace(col('Age').cast('string'), '22', '33'))
    .display()
)

In [0]:
# Replace "22" with "33" in Age and "Sara" with "Jana" in Name.
# regexp_replace() expects string (or Column) pattern/replacement arguments,
# so the ints 22 / 33 are passed as '22' / '33'. Age is cast to string
# explicitly because the function operates on text.
(
    df
    .withColumn('Age', regexp_replace(col('Age').cast('string'), '22', '33'))
    .withColumn('Name', regexp_replace(col('Name'), 'Sara', 'Jana'))
    .display()
)

###Cast

In [0]:
# Rebind df with Age cast to an integer column.
df = df.withColumn(
    'Age',
    col('Age').cast(IntegerType()),
)

In [0]:
# Render df after the Age cast.
df.display()

###Sort

In [0]:
# Order rows by Age, largest first.
(
    df
    .sort(col('Age').desc())
    .display()
)
 

In [0]:
# Order rows by ID and then Age, both descending.
(
    df
    .sort(['ID', 'Age'], ascending=[False, False])
    .display()
)

###Limit

In [0]:
# Show only the first two rows.
(
    df
    .limit(2)
    .display()
)

###Drop

In [0]:
# Show the DataFrame without the Age column (df itself is not rebound).
(
    df
    .drop('Age')
    .display()
)

In [0]:
# DataFrame.drop() removes columns, not rows: given the boolean expression
# `df.Age == 25` it drops nothing. To remove the rows whose Age is 25, filter
# them out instead. eqNullSafe is used so rows with a null Age are kept,
# which a plain `!=` comparison would discard.
(
    df
    .filter(~col('Age').eqNullSafe(25))
    .display()
)

In [0]:
# Remove rows that are exact duplicates across all columns.
(
    df
    .dropDuplicates()
    .display()
)

In [0]:
# Keep a single row per distinct Age value.
(
    df
    .drop_duplicates(subset=['Age'])
    .display()
)

In [0]:
# Two small DataFrames with the same (ID, Name) layout, used for the union below.
schema1 = "\nID integer,\nName string\n\n"
df1 = spark.createDataFrame(
    [
        (1, 'Ahmed'),
        (2, 'Shawky'),
    ],
    schema1,
)

schema2 = "\nID integer,\nName string\n\n"
df2 = spark.createDataFrame(
    [
        (3, 'Omer'),
        (4, 'Hema'),
    ],
    schema2,
)



In [0]:
# Stack the rows of df2 underneath the rows of df1.
(
    df1
    .union(df2)
    .display()
)

###Upper & Lower & initcap 

In [0]:
# Name converted to upper case.
(
    df
    .select(upper('Name'))
    .display()
)

In [0]:
# Name converted to lower case.
(
    df
    .select(lower('Name'))
    .display()
)

In [0]:
# Name with the first letter of each word capitalised.
(
    df
    .select(initcap('Name'))
    .display()
)

In [0]:
# Render df; the upper/lower/initcap selects above did not rebind it.
df.display()

### new df with null 

In [0]:
# Two extra rows whose last field (City) is null, built with df's schema.
extra_rows = [
    (6, "Sara", 30, None),
    (7, "Omar ebrahim", 28, None),
]
extra_df = spark.createDataFrame(extra_rows, schema=df.schema)
extra_df.display()
 

### fillna and dropna

In [0]:
# Fill the missing City values with "Cairo".
# The fill is restricted to City: without `subset`, a string fill value is
# applied to every string column, so a null Name would also become "Cairo".
extra_df = extra_df.fillna("Cairo", subset=['City'])
extra_df.display()


In [0]:
# Drop only the rows in which every column is null.
extra_df = extra_df.dropna(how='all')
extra_df.display()



In [0]:
# Drop the rows in which at least one column is null.
extra_df = extra_df.dropna(how='any')
extra_df.display()

In [0]:
# Drop the rows whose City is null; nulls in other columns are ignored.
extra_df = extra_df.dropna(
    subset=['City'],
)
extra_df.display()


### split & array_contains

In [0]:
from pyspark.sql.functions import *
# Show Name split on spaces into an array of words.
(
    extra_df
    .withColumn('Name', split('Name', ' '))
    .display()
)

In [0]:
# Show only the first word of Name (element 0 of the split array).
(
    extra_df
    .withColumn('Name', split('Name', ' ')[0])
    .display()
)

In [0]:
# Show one row per word of Name.
# display() returns None, so its result must not be assigned back to extra_df:
# doing so replaced the DataFrame with None and broke the later cells that call
# extra_df.withColumn(...). The exploded view is displayed without rebinding.
(
    extra_df
    .withColumn('Name', explode(split('Name', ' ')))
    .display()
)


In [0]:
# Rebind extra_df with Name replaced by its array of space-separated words.
extra_df = extra_df.withColumn(
    'Name',
    split('Name', ' '),
)

In [0]:
# Add a boolean 'status' column: does the Name array contain 'ebrahim'?
extra_df = extra_df.withColumn(
    'status',
    array_contains('Name', 'ebrahim'),
)


In [0]:
# Render extra_df, including the 'status' column added above.
extra_df.display()

###Group By

In [0]:
# Load the BigMart sales CSV, treating the first row as the header.
# The CSV option is spelled "inferSchema"; the previous "infer_schema" key is
# not a recognised option, so it was ignored and no type inference happened.
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load("/Volumes/init/infer_schema/initvolume/BigMart Sales.csv")
)

In [0]:
# Render the BigMart DataFrame as a notebook table.
df.display()

In [0]:
# DDL schema string for the BigMart sales CSV: one "name TYPE" entry per column.
# NOTE(review): Item_Weight is declared STRING while the other measures are
# DOUBLE — confirm this is intentional.
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE 

                ''' 

In [0]:

# Read the BigMart sales CSV using the explicit DDL schema.
df = (
    spark.read
    .format('csv')
    .schema(my_ddl_schema)
    .option('header', True)
    .load('/Volumes/init/infer_schema/initvolume/BigMart Sales.csv')
)

In [0]:
# Print the column names and types of the BigMart DataFrame.
df.printSchema()

In [0]:
from pyspark.sql.functions import sum as spark_sum

# Total Item_MRP per Item_Type, reported as Total_MRP.
(
    df
    .groupBy("Item_Type")
    .agg(spark_sum("Item_MRP").alias("Total_MRP"))
    .display()
)


In [0]:
# Total Item_MRP for each (Item_Type, Outlet_Size) combination.
(
    df
    .groupBy('Item_Type', 'Outlet_Size')
    .agg(spark_sum('Item_MRP'))
    .display()
)

In [0]:

# Sample (user, book) pairs used by the collect_list example.
schema = 'user string, book string'
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]

df_book = spark.createDataFrame(data, schema)
df_book.display()

In [0]:
from pyspark.sql.functions import * 


In [0]:
# Gather each user's books into a single list column named total_books.
(
    df_book
    .groupBy('user')
    .agg(collect_list('book').alias('total_books'))
    .display()
)

### collect_list

In [0]:

# Rebuild the (user, book) sample DataFrame for the collect_list example.
schema = 'user string, book string'
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]

df_book = spark.createDataFrame(data, schema=schema)
df_book.display()

In [0]:
from pyspark.sql.functions import *
# Gather each user's books into one list per user (default column name).
(
    df_book
    .groupBy('user')
    .agg(collect_list('book'))
    .display()
)

###When

In [0]:
# Add veg_flag: 'Non-Veg' when Item_Type is 'Meat', otherwise 'Veg'.
df = df.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)
   


In [0]:
     df.display()

###Join

In [0]:
  dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)

In [0]:
# Inner join: only employees whose dept_id matches a department row.
(
    df1
    .join(df2, on=df1.dept_id == df2.dept_id, how='inner')
    .display()
)

In [0]:
# Left join: every employee, with department columns null when unmatched.
(
    df1
    .join(df2, on=df1.dept_id == df2.dept_id, how='left')
    .display()
)

In [0]:
# Right join: every department, with employee columns null when unmatched.
(
    df1
    .join(df2, on=df1.dept_id == df2.dept_id, how='right')
    .display()
)

In [0]:
# Anti join: employees whose dept_id has no matching department row.
(
    df1
    .join(df2, on=df1['dept_id'] == df2['dept_id'], how='anti')
    .display()
)


###WINDOW FUNCTIONS


In [0]:
from pyspark.sql.window import Window
# Number the rows in Item_Identifier order and expose the result as rowCol.
(
    df
    .withColumn(
        'rowCol',
        row_number().over(Window.orderBy('Item_Identifier')),
    )
    .display()
)


###UDF

In [0]:
def func(x):
    """Return the square of ``x``, or ``None`` when ``x`` is ``None``.

    This function is wrapped as a Spark UDF further down. A null cell reaches
    a Python UDF as ``None``, and ``None * None`` raises ``TypeError``, so a
    missing value is passed through unchanged instead of being multiplied.
    """
    if x is None:
        return None
    return x * x

In [0]:
# Wrap func as a Spark UDF with an explicit return type.
# udf() defaults to a StringType result, which would turn the squared value
# into a string column. The UDF is applied to the DOUBLE column Item_MRP, so
# DoubleType matches what func returns there.
my_udf = udf(func, DoubleType())

In [0]:
# Apply the UDF to Item_MRP and show the result as newCol.
(
    df
    .withColumn('newCol', my_udf(col('Item_MRP')))
    .display()
)

In [0]:

# Write df out in CSV format to the volume.
# - header=True keeps the column names in the output, matching how the CSV
#   files in this notebook are read (header=True).
# - mode('overwrite') lets the notebook be re-run; with the default save mode
#   the write fails once the target path already exists.
(
    df.write
    .format('csv')
    .mode('overwrite')
    .option('header', True)
    .save("/Volumes/init/infer_schema/initvolume/data.csv")
)