### Import data as a df

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse, via getOrCreate) the SparkSession used by every later cell.
# The app name is what appears in the Spark UI.
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [None]:
# Read the JSON file into a DataFrame; Spark infers the schema.
df = spark.read.json('people.json')
# Display the first rows of the DataFrame.
df.show()
# Print the schema as a tree.
df.printSchema()
# df.columns is a plain Python list of column names. Only the last expression
# of a notebook cell is displayed, so print it explicitly.
print(df.columns)
# describe() returns a new DataFrame of summary statistics (count, mean,
# stddev, min, max). It is lazy, so call .show() to actually see it.
df.describe().show()

### Define a schema

Sometimes the incoming data does not have a clear schema, or the schema Spark infers is not the one you want. In that case you can define the schema yourself and pass it to the reader.

In [None]:
# StructType/StructField describe a schema; IntegerType/StringType are column types.
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

In [None]:
# Imported here as well so this cell is self-contained.
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

# Each StructField is (column name, data type, nullable).
data_schema = [StructField('age', IntegerType(), True),
               StructField('name', StringType(), True)]
# Wrap the field list into the schema object that the reader accepts.
final_struc = StructType(fields=data_schema)

In [None]:
# Re-read the file, enforcing the explicit schema (final_struc) instead of
# letting Spark infer one.
df = spark.read.schema(final_struc).json('people.json')

### Selecting and Adding Columns

In [None]:
# df['age'] is a Column expression, not data. Only the last expression of a
# notebook cell is displayed, so print the intermediate results explicitly.
print(df['age'])
# select() returns a new DataFrame; printing it shows only its schema.
print(df.select('age'))
# .show() runs the query and displays the rows.
df.select('age').show()
# Select multiple columns by passing a list of names.
print(df.select(['age', 'name']))
df.select(['age', 'name']).show()
# head(n) returns the first n rows as a list of Row objects.
df.head(2)

In [None]:
# withColumn returns a NEW DataFrame with the extra column; df is unchanged.
df.withColumn('newage', df['age']).show()
# withColumnRenamed also returns a NEW DataFrame; df keeps its 'age' column.
# Reassign (df = df.withColumnRenamed(...)) to keep the rename.
df.withColumnRenamed('age', 'new_age').show()


### Interact with SQL

In [None]:
# Register df as a temporary view named 'people' (replacing any existing one)
# so it can be queried with SQL.
df.createOrReplaceTempView('people')
# spark.sql returns a DataFrame; nothing executes until an action such as
# show() is called.
results = spark.sql("select * From people")
results.show()

### Filtering

In [None]:
# Return all rows satisfying the condition.
df.filter(df['age'] < 20).show()
df.filter(df['age'] == 20).show()
# Filter rows, then keep only the selected columns.
df.filter(df['age'] < 20).select('name').show()

# collect() brings the result to the driver as a list of Row objects,
# so it can be used later in plain Python.
results = df.filter(df['age'] < 20).select('name').collect()
# Row.asDict() converts a Row to a plain dict, which is easier to use.
# Guard the empty case: results[0] on an empty list raises IndexError.
result = results[0].asDict() if results else None

# Filtering on multiple conditions.
# Logical operators: and -> &, or -> |, not -> ~
# Each condition must be wrapped in parentheses because &, | and ~ bind more
# tightly than comparison operators such as < and > in Python.
df.filter((df['age'] > 5) & (df['age'] < 20)).show()

### Joins

In [None]:
# df_a and df_b are assumed to be two DataFrames that both have an 'id'
# column (they are not defined in these notes). `how` selects the join type.
join_cond = df_a.id == df_b.id
df_ab_inner = df_a.join(df_b, on=join_cond, how="inner")
df_ab_outer = df_a.join(df_b, on=join_cond, how="outer")

### Aggregate Functions, GroupBy, and Window Functions

In [None]:
# NOTE: importing `sum` this way shadows Python's built-in sum() for the rest
# of the notebook; the groupBy cell below uses it as the Spark aggregate.
from pyspark.sql.functions import count,countDistinct, sum

In [None]:
# Aggregate functions: each select() below produces a one-row DataFrame.
# count("*") counts every row.
df.select(count("*")).show()
# count(col) counts the values in that column.
df.select(count("col_name")).show()
# countDistinct(col) counts the distinct values.
df.select(count("col_name"), countDistinct("col_name")).show()
# The same aggregates, restricted to rows where name == 'Kim'.
kim_rows = df.where(df['name'] == 'Kim')
kim_rows.select(count("col_name"), countDistinct("col_name")).show()

In [None]:
# Group By
# groupBy(...).max() with no arguments applies max to every numeric column
# per group; these shortcut methods apply a single aggregate.
df.groupBy('Column_Name').max().show()
# To compute several aggregates at once, use agg(); alias() names the outputs.
df.groupBy('Column_Name').agg(
    sum("points").alias("Total_points"),
    count("ID").alias("Total_number"),
).show()

In [None]:
# Window specifications plus the ordering/ranking helpers used below.
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

In [None]:
# Window functions
# Example: rank rows within each col_1 partition by col_2, highest first.
# Define the window: one partition per col_1 value, ordered by col_2 descending.
RankSpec = Window.partitionBy("col_1").orderBy(desc("col_2"))
# Apply rank() over the window. withColumn returns a new DataFrame, so keep
# it and call .show() to see the result.
df_ranked = df.withColumn("Rank", rank().over(RankSpec))
df_ranked.show()

### Order by

In [None]:
# Sort ascending (the default direction); sort() is an alias of orderBy().
df.sort('Column_Name').show()
# Sort descending.
df.sort('Column_Name', ascending=False).show()

### Functions

In [None]:
from pyspark.sql.functions import countDistinct, avg, stddev, format_number

In [None]:
# avg() computes the mean of a column, returned as a one-row DataFrame.
df.select(avg('Column_Name')).show()
# alias() gives the computed column a readable name.
avg_col = avg('Column_Name').alias('Average_value')
df.select(avg_col).show()

In [None]:
# format_number(col, 2) renders the column as a string rounded to 2 decimal places.
df.select(format_number('Column_Name', 2)).show()

### Missing Values

In [None]:
# df.dropna(...) and df.na.drop(...) are aliases of each other.
# Default: drop any row that contains at least one null.
df.dropna().show()
# thresh=2: keep only rows with at least 2 non-null values; all other rows
# are dropped (when thresh is given it overrides `how`).
df.dropna(thresh=2).show()

# how='any': drop a row if any of its values is null.
df.dropna(how='any').show()
# how='all': drop a row only if all of its values are null.
df.dropna(how='all').show()
# subset: only look for nulls in the listed columns.
df.dropna(subset=['Column_Name']).show()


In [None]:
# Imported here so this cell does not depend on earlier import cells.
from pyspark.sql.functions import mean

# Fill missing values.
# na.fill(value) only fills nulls in columns whose type matches the fill
# value (a string fills string columns, a number fills numeric columns).
df.na.fill('Fill Value').show()

# Restrict the fill to specific columns with subset.
df.na.fill('Zoe', subset=['name']).show()

# Fill nulls in 'Sales' with the column mean. A global aggregate returns a
# single row, so [0][0] is the mean value itself.
mean_val = df.select(mean(df['Sales'])).collect()
mean_sales = mean_val[0][0]
# The mean is null when every 'Sales' value is null, and None is not an
# accepted fill value, so skip the fill in that case.
if mean_sales is not None:
    df.na.fill(mean_sales, subset=['Sales']).show()