# Reading Data

In [None]:
# Read the avocado CSV from a path on the distributed file system.
file_location = "/FileStore/tables/avocado.csv"
file_type = "csv"
# With inferSchema off (and no explicit schema) Spark loads every column as a
# string; columns needed as numbers must be cast explicitly before use.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Alternative: read from an existing Hive table instead of the CSV file.
# Kept commented out - running it unconditionally would replace the CSV-backed
# df above with the contents of the placeholder table "tablename".
# df = spark.sql('select * from tablename')

# Metadata of the dataframe

In [None]:
# Print the column names, data types and nullability of df as a tree.
df.printSchema()

# Glimpse of the data

In [None]:
# Print the first rows of df (show() defaults to 20 rows, truncating long values).
df.show()

# Count the number of records

In [None]:
# Count every row in df (this triggers a Spark job) and report the total.
count_rows = df.count()
print(f'Total data count is {count_rows}')

# Subset Data

In [None]:
# Method 1 - using a list of column names, unpacked into select()
columns_to_subset = ['Total Volume', 'AveragePrice']
df1 = df.select(*columns_to_subset)

# Method 2 - column names passed directly
df1 = df.select('Total Volume', 'AveragePrice')

# Method 3 - positional indexes: df[i] is the column at 0-based position i
df1 = df.select(df[2], df[3])

# Count Missing Values

In [None]:
# Number of rows whose 'Total Volume' is null (the cell displays the result).
df.filter(df['Total Volume'].isNull()).count()

# One-way Frequency

In [None]:
# Row count for each distinct value of the 'type' column.
df.groupBy('type').count().show()

# Crosstab

In [None]:
# Contingency table of type vs. region, restricted to rows from Albany.
albany = df.filter(df.region == 'Albany')
albany.crosstab('type', 'region').show()

# Summary Statistics

In [None]:
# describe() gives a quick glimpse of the data: count, mean, stddev, min and max.
# Called without column arguments it covers all numeric and string columns;
# here it is restricted to the columns listed below.
columns_to_analyze = ['Total Volume', 'AveragePrice']
df.select(*columns_to_analyze).describe().show()

# Casting a variable

In [None]:
# Replace 'Total Volume' with a float-typed version of itself, then confirm
# the new type in the schema.
total_volume_as_float = df['Total Volume'].cast("float")
df = df.withColumn('Total Volume', total_volume_as_float)
df.printSchema()

# Median Value Calculation

In [None]:
# approxQuantile takes three parameters:
# 1. col - the name of the numerical column
# 2. probabilities - a list of quantile probabilities, each in [0, 1]
#    (0 is the minimum, 0.5 the median, 1 the maximum)
# 3. relativeError - the relative target precision to achieve (>= 0). Zero
#    computes exact quantiles, which can be very expensive; values greater
#    than 1 are accepted but give the same result as 1.
# The returned value's rank is only guaranteed to lie within
# (p +/- relativeError) * row_count, so 0.1 would allow anything between the
# 40th and 60th percentile. A tighter bound is used for a meaningful median.
relative_error = 0.01
quantiles = df.approxQuantile('Total Volume', [0.5], relative_error)
# approxQuantile returns one value per requested probability, or an empty
# list when the column holds only nulls; unwrap the single median value.
median = quantiles[0] if quantiles else None
print('The median of Total Volume is ' + str(median))

# Number of distinct levels

In [None]:
from pyspark.sql.functions import col, countDistinct
column_name = 'region'
# Alias the aggregate itself (not its input column) so the result column is
# actually named "distinct_counts".
count_distinct = (
    df.agg(countDistinct(col(column_name)).alias("distinct_counts"))
    .head()[0]
)
print('The number of distinct values of ' + column_name + ' is ' + str(count_distinct))

# Distinct Levels

In [None]:
# Show each distinct value of the chosen column once.
column_name = 'type'
distinct_values = df.select(column_name).distinct()
distinct_values.show()

# Filter Data

In [None]:
# Let's count the organic avocados within the Albany region
is_organic_in_albany = (df['type'] == 'organic') & (df['region'] == 'Albany')
filtered_count = df.where(is_organic_in_albany).count()
print(f'subset data count is {filtered_count}')

# Rename Columns

In [None]:
# Replace spaces in these column names with underscores, one
# withColumnRenamed() call per column, then show the resulting schema.
rename_map = {
    "Total Volume": "Total_Volume",
    "Total Bags": "Total_Bags",
    "Small Bags": "Small_Bags",
    "Large Bags": "Large_Bags",
}
for old_name, new_name in rename_map.items():
    df = df.withColumnRenamed(old_name, new_name)
df.printSchema()

# Create new columns

In [None]:
# Method 1 - withColumn on a two-column projection
df1 = df.select('Total_Volume', 'Total_Bags').withColumn(
    'avg_volume_bag', df['Total_Volume'] / df['Total_Bags']
)

# Alternative method - cast Total_Bags to float first, then compute the ratio
# directly inside select() and name it with alias()
df = df.withColumn('Total_Bags', df['Total_Bags'].cast("float"))
volume_per_bag = (df['Total_Volume'] / df['Total_Bags']).alias('avg_volume_bag')
df1 = df.select('_c0', 'Total_Volume', 'Total_Bags', volume_per_bag)
df1.show()

# Create multiple columns

In [None]:
# Import Necessary data types
from pyspark.sql.functions import udf,split
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

# Create a function for all the data manipulations
def new_cols(Total_Volume, AveragePrice):
    """Bucket a row's total volume and average price into category labels.

    Args:
        Total_Volume: Total volume as a number or numeric string, or None.
        AveragePrice: Average price as a number or numeric string, or None.

    Returns:
        Tuple (Volume_Category, Price_Category) where
        volume < 44245 -> 'Small', < 850644 -> 'Medium', otherwise 'Large';
        price < 1.25 -> 'Low', < 1.4 -> 'Mid', otherwise 'High'.
        A category is None when its input is None (Spark passes SQL nulls to a
        Python UDF as None), instead of raising TypeError on the comparison.
    """
    Volume_Category = None
    if Total_Volume is not None:
        # float() also accepts numeric strings, e.g. columns read without inferSchema.
        volume = float(Total_Volume)
        if volume < 44245:
            Volume_Category = 'Small'
        elif volume < 850644:
            Volume_Category = 'Medium'
        else:
            Volume_Category = 'Large'

    Price_Category = None
    if AveragePrice is not None:
        price = float(AveragePrice)
        if price < 1.25:
            Price_Category = 'Low'
        elif price < 1.4:
            Price_Category = 'Mid'
        else:
            Price_Category = 'High'

    return Volume_Category, Price_Category

# Apply the user defined function on the dataframe.
# The UDF returns a struct holding both category labels.
udfB = udf(new_cols, StructType([
    StructField("Volume_Category", StringType(), True),
    StructField("Price_Category", StringType(), True),
]))

# AveragePrice was loaded as a string (inferSchema is "false") and is never
# cast earlier, so cast it here; otherwise the UDF compares str with float and
# raises TypeError. double (not float) keeps values such as 1.4 exact at the
# category boundaries.
df2 = (
    df.select('_c0', 'Total_Volume', df['AveragePrice'].cast('double').alias('AveragePrice'))
    .withColumn("newcat", udfB("Total_Volume", "AveragePrice"))
)

# Unbundle the struct into individual columns: "newcat.*" expands each struct
# field into its own column, and the struct column itself is not kept.
df3 = df2.select('_c0', 'Total_Volume', 'AveragePrice', 'newcat.*')
df3.show()

# String Operations - Concatenation

In [None]:
from pyspark.sql.functions import concat
# Join the two category labels into one string with no separator
# (e.g. "Small" + "Low" -> "SmallLow"); concat yields null if either is null.
volume_cat = df3['Volume_Category']
price_cat = df3['Price_Category']
df3 = df3.withColumn('VolumePrice_Category', concat(volume_cat, price_cat))
df3.show()

# String Operations - Changing Case

In [None]:
from pyspark.sql.functions import concat,trim,upper
# Normalise Price_Category: upper-case it, then strip surrounding whitespace.
price_upper = upper(df3['Price_Category'])
df3 = df3.withColumn('Price_Category', trim(price_upper))
df3.show()

# Update a column value

In [None]:
# Import only the names needed: a wildcard import from pyspark.sql.functions
# would shadow Python builtins such as sum, min, max, abs and round in this
# namespace. monotonically_increasing_id is also imported here because a later
# cell in this notebook uses it.
from pyspark.sql.functions import monotonically_increasing_id, when
# Recode 'Medium' volume labels as 'Mid'; every other value (including null)
# is carried over unchanged.
df4 = df3.withColumn(
    'Volume_Category',
    when(df3['Volume_Category'] == 'Medium', 'Mid').otherwise(df3['Volume_Category']),
)
df4.show()

# Drop a column

In [None]:
# Remove the combined category column, then display the remaining column names.
columns_to_drop = ['VolumePrice_Category']
df4 = df4.drop(*columns_to_drop)
df4.columns

# Sorting

In [None]:
# Order rows by Total_Volume, largest first.
df4 = df4.orderBy('Total_Volume', ascending=False)
df4.show()

# Save as Hive table

In [None]:
# Persist df3 as a table named "avocado". No save mode is set, so the writer's
# default (error if the table already exists) applies and re-running this cell fails.
# NOTE(review): this saves df3, not df4 (the frame with 'Medium' recoded to
# 'Mid' and VolumePrice_Category dropped) - confirm which frame is intended.
df3.write.saveAsTable('avocado')

# Save as a pipe-delimited text (CSV) file

In [None]:
# Write df3 as '|'-delimited CSV. Spark writes a directory named
# 'avocado_textfile' containing part files. No header row is written unless
# .option("header", "true") is set, and the default save mode errors if the path exists.
df3.write.format("csv").option("delimiter", "|").save('avocado_textfile')

# Convert to Pandas

In [None]:
# NOTE(review): pd is not referenced in this cell; toPandas() only needs pandas installed.
import pandas as pd
# toPandas() collects every row of df3 onto the driver, so it is only suitable
# when the result is small enough to fit in driver memory.
pandas_df=df3.toPandas()
# Display the pandas DataFrame as the cell output.
pandas_df

# Add a monotonically increasing id

In [None]:
# Import explicitly so this cell does not depend on a wildcard import elsewhere.
from pyspark.sql.functions import monotonically_increasing_id
# The generated ID is guaranteed to be monotonically increasing and unique, but
# not consecutive: Spark encodes the partition ID in the upper bits, so IDs
# jump between partitions and depend on how df4 is partitioned.
df5 = df4.withColumn("new_id", monotonically_increasing_id())

# Joins

In [None]:
# Join on the key column's name rather than on df1['_c0'] == df3['_c0']: the
# expression form keeps both _c0 columns, so any later reference to _c0 is
# ambiguous. It also avoids self-join ambiguity, because df1 and df3 are both
# projections of df. Total_Volume exists in both inputs and still appears twice;
# select or rename it before referring to it by name.
# Other join types (outer, left_outer, right_outer, leftsemi, ...) go in `how`.
joined_df = df3.join(df1, on='_c0', how='inner')