# Data Processing using PySpark

In [None]:
#import SparkSession
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse, via getOrCreate) the SparkSession used by every later cell
spark = (
    SparkSession.builder
    .appName('data_processing')
    .getOrCreate()
)

In [None]:
# Load the CSV dataset: the first row supplies the column names, and
# inferSchema makes Spark scan the data once to choose column types
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('s3://<bucket/dir>/sample_data.csv')
)

In [None]:
#columns of dataframe
df.columns

In [None]:
#check number of columns
len(df.columns)

In [None]:
#number of records in dataframe
df.count()

In [None]:
#shape of dataset
print((df.count(),len(df.columns)))

In [None]:
#printSchema
df.printSchema()

In [None]:
#fisrt few rows of dataframe
df.show(5)

In [None]:
#select only 2 columns
df.select('age','mobile').show(5)

In [None]:
#info about dataframe
df.describe().show()

In [None]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [None]:
#with column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)

In [None]:
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)

In [None]:
#with column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)

In [None]:
#filter the records 
df.filter(df['mobile']=='Vivo').show()

In [None]:
#filter the records 
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

In [None]:
#filter the multiple conditions
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()

In [None]:
#filter the multiple conditions
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()

In [None]:
#Distinct Values in a column
df.select('mobile').distinct().show()

In [None]:
#distinct value count
df.select('mobile').distinct().count()

In [None]:
df.groupBy('mobile').count().show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').mean().show(5,False)

In [None]:
df.groupBy('mobile').sum().show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').max().show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').min().show(5,False)

In [None]:
#Aggregation
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

In [None]:
# UDF
from pyspark.sql.functions import udf


In [None]:
# plain Python function that buckets a phone brand into a price tier
def price_range(brand):
    """Return the price-tier label for *brand*.

    'Samsung' and 'Apple' map to 'High Price', 'MI' maps to 'Mid Price',
    and anything else (including None) maps to 'Low Price'.
    Matching is exact and case-sensitive.
    """
    premium_brands = ('Samsung', 'Apple')
    if brand in premium_brands:
        return 'High Price'
    if brand == 'MI':
        return 'Mid Price'
    return 'Low Price'

In [None]:
# wrap price_range as a row-at-a-time Spark UDF producing strings
brand_udf = udf(price_range, returnType=StringType())
# add a price_range column derived from each row's mobile brand
df.withColumn('price_range', brand_udf(df['mobile'])).show(10, truncate=False)

In [None]:
# lambda-based UDF. Spark passes SQL nulls to Python UDFs as None, and
# `None <= 30` raises TypeError in Python 3 (failing the whole job), so a
# null age yields a null age_group instead.
age_udf = udf(
    lambda age: None if age is None else ("young" if age <= 30 else "senior"),
    StringType(),
)
# apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10, False)

In [None]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
# plain Python function; works on scalars and element-wise on pandas Series
def remaining_yrs(age):
    """Return ``100 - age``.

    Because it only uses arithmetic, it applies element-wise to a pandas
    Series, which lets it back a pandas UDF.
    """
    return 100 - age

In [None]:
# wrap remaining_yrs as a pandas UDF with an integer result type
# NOTE(review): the name says "length" but it computes years left; also, if
# 'age' can contain nulls the batch arrives as floats with NaN, which may not
# convert to IntegerType — confirm against the data.
length_udf = pandas_udf(remaining_yrs, returnType=IntegerType())
# add a yrs_left column computed from the age column
df.withColumn("yrs_left", length_udf(df['age'])).show(10, truncate=False)

In [None]:
# plain Python function combining two columns' values
def prod(rating, exp):
    """Return ``rating * exp``.

    Element-wise on pandas Series, so it can back a two-argument pandas UDF.
    """
    return rating * exp

In [None]:
# wrap prod as a pandas UDF with a double result type
prod_udf = pandas_udf(prod, returnType=DoubleType())
# apply the pandas UDF to two columns at once
df.withColumn("product", prod_udf(df['ratings'], df['experience'])).show(10, truncate=False)

In [None]:
#duplicate values
df.count()

In [None]:
#drop duplicate values
df=df.dropDuplicates()

In [None]:
#validate new count
df.count()

In [None]:
#drop column of dataframe
df_new=df.drop('mobile')

In [None]:
df_new.show(10)

In [None]:
# Saving the DataFrame as CSV

In [None]:
# current working directory. `pwd` is an IPython magic that raises NameError
# in plain Python; os.getcwd() works everywhere. (The s3:// output paths in
# this notebook are absolute, so they do not depend on this directory.)
import os
os.getcwd()

In [None]:
#target directory 
write_uri='s3://<bucket/dir>/df_csv'

In [None]:
#save the dataframe as single csv 
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

In [None]:
# Saving the DataFrame as Parquet

In [None]:
#target location
parquet_uri='s3://<bucket/dir>/df_parquet'

In [None]:
#save the data into parquet format 
df.write.format('parquet').save(parquet_uri)