In [0]:
from pyspark.sql.session import SparkSession

# Reuse the active Spark session if there is one; otherwise build one with this app name.
spark = (SparkSession.builder
         .appName("Spark DataFrames")
         .getOrCreate())

In [0]:
# Read the original customer extract as CSV (no schema or header options given).
read_df1 = spark.read.format("csv").load("/Volumes/workspace/default/volume1/custsmodified")
display(read_df1)

In [0]:
# Copy the extract into the ingestion source folder.
# NOTE(review): mode "append" adds another full copy on every re-run of this cell, which changes
# the row counts / duplicates seen by later cells — consider "overwrite" if re-runs are expected.
read_df1.write.mode("append").csv("/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified")

####a. Passive Data Munging

In [0]:
# Passive munging: inspect the data without changing it.
rawdf1 = (
    spark.read
    .csv("/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified",
         header=False, inferSchema=True)
    .toDF("id", "firstname", "lastname", "age", "profession")  # name the five positional columns
)
r_c = rawdf1.count()
print("Total row count", r_c)
rawdf1.show(n=20, truncate=False)      # first 20 rows, full cell text
display(rawdf1.take(21))               # first 21 rows as Row objects
display(rawdf1.sample(fraction=0.1))   # ~10% random sample; no seed, so it differs per run

In [0]:
# Tree view of the schema: column names, data types and nullability.
rawdf1.printSchema()

In [0]:
# Column names as a plain Python list.
print(rawdf1.columns)

In [0]:
# List of (column name, type name) pairs.
print(rawdf1.dtypes)

In [0]:
# Print the names of the columns whose Spark type is string.
string_columns = [name for name, dtype in rawdf1.dtypes if dtype == 'string']
for name in string_columns:
    print(name)

In [0]:
# The full StructType object, i.e. the programmatic form of the schema.
print(rawdf1.schema)

In [0]:
# Compare row counts with and without duplicates, then show basic column statistics.
# dropDuplicates() with no subset considers every column, like distinct().
count_checks = [
    ("Actual total record", rawdf1),
    ("de-duplicated record (all columns) count", rawdf1.distinct()),
    ("de-duplicated record (all columns) count", rawdf1.dropDuplicates()),
    ("de-duplicated given cid column count", rawdf1.dropDuplicates(['id'])),
]
for label, frame in count_checks:
    print(label, frame.count())
display(rawdf1.describe())
display(rawdf1.summary())

####b. Active Data Munging

In [0]:
# getOrCreate() hands back the already-active session instead of starting a new one.
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()

In [0]:
# Extraction (ingestion) methods. Every column is read as a string at this stage.
custs_path = "/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified"
struct1 = "id string, firstname string, lastname string, age string, profession string"
struct2 = struct1  # same layout, kept under its own name for the multi-path examples

# single file path
rawdf1 = spark.read.schema(struct1).csv(custs_path)

# multiple paths
# NOTE(review): the same directory is listed twice here — confirm that is intended.
rawdf2 = spark.read.schema(struct2).csv(path=[custs_path, custs_path])

# multiple files in multiple paths: search subdirectories too, keep only files named cust*
rawdf3 = spark.read.schema(struct2).csv(
    path=[custs_path, "/Volumes/workspace/default/volumewd36/ingestion_volume/source/source2/"],
    pathGlobFilter="cust*",
    recursiveFileLookup=True,
)

In [0]:
#Active data munging..
source_root = "/Volumes/workspace/default/volumewd36/ingestion_volume/source/"

# custsmodified_N* files: id, firstname, lastname, age, profession
struct5 = "id string, firstname string, lastname string, age string, profession string"
rawdf5 = spark.read.schema(struct5).csv(source_root, pathGlobFilter="custsmodified_N*", recursiveFileLookup=True)

# custsmodified_T* files: no lastname, and an extra city column at the end
struct6 = "id string, firstname string, age string, profession string,city string"
rawdf6 = spark.read.schema(struct6).csv(source_root, pathGlobFilter="custsmodified_T*", recursiveFileLookup=True)

display(rawdf5)
display(rawdf6)

# union() pairs columns by POSITION, so rawdf6's age/profession/city land under
# rawdf5's lastname/age/profession. This wrong result is shown on purpose.
rawdf7 = rawdf5.union(rawdf6)
display(rawdf7)

# unionByName() pairs columns by NAME; allowMissingColumns=True fills the columns
# that exist on only one side (lastname, city) with nulls.
rawdf8 = rawdf5.unionByName(rawdf6, allowMissingColumns=True)
display(rawdf8)

In [0]:
#Validation by doing cleansing
from pyspark.sql.types import StructType,StructField,StringType,ShortType,IntegerType

# Typed schema: id must parse as int and age as short; everything else is text.
struttype1 = StructType([
    StructField('id', IntegerType(), True),
    StructField('firstname', StringType(), True),
    StructField('lastname', StringType(), True),
    StructField('age', ShortType(), True),
    StructField('profession', StringType(), True),
])
custs_source = "/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified"

# method1 - PERMISSIVE: keep every row; a value that does not fit its type becomes null
# ("cutting the mud off the potato").
cleandf1 = spark.read.schema(struttype1).csv(path=custs_source, mode='permissive')
print("after keeping nulls on the wrong data format", cleandf1.count())  # all rows
display(cleandf1)

# method2 - DROPMALFORMED: discard the whole row when it does not fit the schema
# ("throwing away the whole potato").
cleandf1 = spark.read.schema(struttype1).csv(path=custs_source, mode='dropMalformed')
print("after cleaning wrong data (type mismatch, column number mismatch)", len(cleandf1.collect()))
display(cleandf1)

In [0]:
#method3 best methodology of applying active data munging
#Validation by doing cleansing (not at the time of creating Dataframe, rather we will clean and scrub subsequently)...
# Every column is declared as string, so no value can fail type conversion: the raw
# values are kept exactly as they are and cleaned/scrubbed in later steps.
struttype1=StructType([StructField('id', StringType(), True), StructField('firstname', StringType(), True), StructField('lastname', StringType(), True), StructField('age', StringType(), True), StructField('profession', StringType(), True)])
rawdf9=spark.read.schema(struttype1).csv(path="/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified",mode='permissive')
# Count the DataFrame that was just read (rawdf9), not an earlier one.
print("allow all data showing the real values",rawdf9.count())#all rows count
display(rawdf9)#raw values as-is: no value is nulled for a type mismatch because every column is a string

####Rejection Strategy

In [0]:
#Creating rejection dataset to send to our source system for future fix
from pyspark.sql.types import StructType,StructField,StringType,ShortType,IntegerType
# Typed schema plus an extra string column "corruptedrows"; its name must match columnNameOfCorruptRecord below.
struttype1=StructType([StructField('id', IntegerType(), True), StructField('firstname', StringType(), True), StructField('lastname', StringType(), True), StructField('age', ShortType(), True), StructField('profession', StringType(), True),StructField("corruptedrows",StringType())])
#method1 - permissive with all rows with respective nulls
# In permissive mode a row that does not fit the schema keeps its raw text in "corruptedrows".
cleandf1=spark.read.schema(struttype1).csv(path="/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified",mode='permissive',columnNameOfCorruptRecord="corruptedrows")
#Create a reject dataset
# Rows with captured raw text are the rejects.
rejectdf1=cleandf1.where("corruptedrows is not null")
display(rejectdf1)
# NOTE(review): the target path contains ".../source/source/reject" (doubled "source") — confirm the intended location.
rejectdf1.write.csv("/Volumes/workspace/default/volumewd36/ingestion_volume/source/source/reject",mode="overwrite",header=True)
# Rows without captured raw text parsed cleanly and are retained.
retaineddf1=cleandf1.where("corruptedrows is null")
# NOTE(review): len(collect()) pulls every row to the driver. Before switching to count(), check
# Spark's restriction on queries over raw CSV that reference only the corrupt-record column
# (the Spark CSV docs suggest caching/saving the parsed result first).
print("Overall rows in the source data is ",len(cleandf1.collect()))
print("Rejected rows in the source data is ",len(rejectdf1.collect()))
print("Clean rows in the source data is ",len(retaineddf1.collect()))

In [0]:
####Cleansing

#We already know how to do cleansing applying the strict Structure on method1 and method2
#Important na functions we can use to do cleansing
# na.drop(how="any") removes every ROW that has a null in any column; only rows with no nulls are kept.
cleanseddf1=rawdf9.na.drop(how="any")
display(cleanseddf1.where("age is null"))  # expected to be empty: rows with a null age were dropped
display(rawdf9.where("age is null"))       # the rows that how="any" removed because age is null
# Restrict the check to id and age: drop a row if either of them is null.
cleanseddf1=rawdf9.na.drop(how="any",subset=["id","age"])
display(cleanseddf1)
# how="all": drop a row only when BOTH lastname and profession are null.
cleanseddf1=rawdf9.na.drop(how="all",subset=["lastname","profession"])
print("Rows where lastname and profession are both null have been removed")
print("Total rows with at least one of lastname/profession populated",cleanseddf1.count())
display(cleanseddf1)#We are taking this DF further for munging..

In [0]:
# Fill nulls in lastname/profession with a placeholder so later steps always see a value.
scrubbeddf1=cleanseddf1.na.fill('not provided',subset=["lastname","profession"])
display(scrubbeddf1)
# na.replace swaps whole cell values (exact matches only) in the listed columns.
find_replace_values_dict1={'Pilot':'Captain','Actor':'Celebrity'}
find_replace_values_dict2={'not provided':'NA'}
find_replace_values_dict3={'not provided':'NA'}
scrubbeddf2=scrubbeddf1.na.replace(find_replace_values_dict1,subset=["profession"])
display(scrubbeddf2)
scrubbeddf3=scrubbeddf2.na.replace(find_replace_values_dict2,subset=["lastname"])
display(scrubbeddf3)
# NOTE(review): same input, same mapping and same column as scrubbeddf3, so this duplicates it;
# scrubbeddf3 is the frame deduplication continues from.
scrubbeddf4=scrubbeddf2.na.replace(find_replace_values_dict3,subset=["lastname"])
display(scrubbeddf4)

####Deduplication

In [0]:
# Row-level dedup: distinct() removes rows that are identical in every column.
display(scrubbeddf3.where("id in ('4000001')"))  # before row-level dedup
dedupdf1=scrubbeddf3.distinct()
display(dedupdf1.where("id in ('4000001')"))     # after row-level dedup
display(dedupdf1.where("id in ('4000003')"))

# Column-level dedup: keeps ONE row per id, but which row survives is not specified.
dedupdf2=dedupdf1.dropDuplicates(subset=["id"])
display(dedupdf2.where("id in ('4000003')"))

# Deterministic column-level dedup: keep the row with the lowest age for each id.
# Sorting before dropDuplicates does not guarantee which row is kept, so rows are ranked
# inside each id with a window and only rank 1 is kept. age is still a string here, so the
# order is lexicographic (ascending, nulls first), as in a plain orderBy. The remaining
# columns break ties; rows are already distinct, so the pick is fully deterministic.
from pyspark.sql import Window
from pyspark.sql.functions import row_number
lowest_age_first=Window.partitionBy("id").orderBy("age","firstname","lastname","profession")
dedupdf3=(dedupdf1
          .withColumn("_rn",row_number().over(lowest_age_first))
          .where("_rn = 1")
          .drop("_rn"))
display(dedupdf3.where("id in ('4000003')"))

#####Standardization1 - Adding a column

In [0]:
from pyspark.sql.functions import lit,initcap,col
# Standardization1: add a constant column that tags every row with its source system.
# The value must be a Column expression, e.g. lit('hardcoded') or initcap(col("colname")).
standarddf1 = dedupdf3.withColumns({"sourcesystem": lit("Retail")})
display(standarddf1.limit(20))

#####Column Uniformity

In [0]:
from pyspark.sql.functions import upper

# Standardization2 - column uniformity: title-case every profession value.
# initcap (like most string functions) accepts a Column or a column-name string.
profession_titlecased = initcap(col("profession"))
standarddf2 = standarddf1.withColumn("profession", profession_titlecased)
display(standarddf2.limit(20))
display(standarddf2.groupBy("profession").count())  # distinct professions after standardizing

#####Standardization3 - Format Standardization

In [0]:
# Analysis: find the format problems in id and age before fixing them.
# rlike is a regular-expression match.
ids_with_letters = standarddf2.where(col("id").rlike("[a-zA-Z]"))     # ids containing any letter
ages_with_non_digits = standarddf2.where(col("age").rlike("[^0-9]"))  # ages containing any non-digit
ids_with_letters.show()
ages_with_non_digits.show()

In [0]:
from pyspark.sql.functions import regexp_replace,replace
# Scrub id: map spelled-out numbers to digits (e.g. 'ten' -> '10'); na.replace matches whole values only.
replaceval={'one':'1','two':'2','three':'3','four':'4','five':'5','six':'6','seven':'7','eight':'8','nine':'9','ten':'10'}
# subset must be passed by keyword: the second positional parameter of na.replace is `value`,
# which is ignored when to_replace is a dict, leaving subset=None and rewriting EVERY column.
standarddf3=standarddf2.na.replace(replaceval,subset=["id"])
#standarddf3=standarddf2.withColumn("id",replace("id",lit('ten'),"10"))
# Scrub age: remove every '-' character.
standarddf3=standarddf3.withColumn("age",regexp_replace("age",'-',""))
display(standarddf3)

# Spot-check the corrected rows.
standarddf4=standarddf3.where("id='10'")
display(standarddf4)

standarddf4=standarddf3.where("age='77'")
display(standarddf4)

#####Standardization4 - Data Type Standardization

In [0]:
# id and age are still string columns here, even though they now hold numeric text.
standarddf3.printSchema()

# DataFrames are immutable: withColumn returns a NEW DataFrame, so the result must be
# assigned, otherwise the cast is silently lost.
standarddf4=standarddf3.withColumn("id",standarddf3.id.cast("long"))
standarddf4.printSchema()

# Equivalent ways to reference the column being cast.
standarddf4=standarddf3.withColumn("id",standarddf3["id"].cast("int"))
standarddf4.printSchema()

standarddf4=standarddf3.withColumn("id",col("id").cast("long"))
standarddf4.printSchema()

# Build on the previous result so both id (long) and age (short) end up cast.
standarddf4=standarddf4.withColumn("age",col("age").cast("short"))
standarddf4.printSchema()

display(standarddf4)

#####Standardization5 - Naming Standardization

In [0]:
# Naming standardization: rename several columns in one call.
# (For a single column, withColumnRenamed("id","custid") does the same job.)
column_renames = {"id": "custid", "sourcesystem": "srcsystem"}
standarddf5 = standarddf4.withColumnsRenamed(column_renames)
display(standarddf5)

######Standardization6 - Reorder Standardization

Reorder the columns by selecting them in the desired order.

In [0]:
# Put the columns into the target order; the result is the output of the munging stage.
target_order = ["custid", "age", "firstname", "lastname", "profession", "srcsystem"]
standarddf6 = standarddf5.select(*target_order)
mungeddf = standarddf6
display(mungeddf.take(10))

##**2. Data Enrichment** - Detailing of data
Makes your data rich and detailed <br>
a. Add (withColumn,select,selectExpr), Derive (withColumn,select,selectExpr), Remove(drop,select,selectExpr), Rename (withColumnRenamed,select,selectExpr), Modify/replace (withColumn,select,selectExpr) - very important spark sql functions <br>
b. split, merge/Concat <br>
c. Type Casting, reformat & Schema Migration <br>

a. Add (), Derive (), Rename (), Modify/replace (), Remove/Eliminate () - very important spark sql DF functions

Adding of columns
Let's add datadt (the date the data originated at the source, e.g. provided in the file name in yy/dd/MM format) and loaddt (the date we load the data into our system).

In [0]:
# Quick check of embedding the data date inside quotes, as the selectExpr below does.
derived_datadt = '25/30/12'
print("hello '" + derived_datadt + "'")

In [0]:
from pyspark.sql.functions import lit,current_date  # lit was imported earlier; current_date is first used here
# Derive the data date from the source file name: 'custsmodified_25/30/12.csv' -> '25/30/12' (yy/dd/MM).
original_filename='custsmodified_25/30/12.csv'
derived_datadt=original_filename.split('_')[1].split('.')[0]

# Four equivalent ways to add datadt (from the file name) and loaddt (today's date).
# All use derived_datadt, so the value follows the file name instead of a hard-coded copy.
enrichdf1=mungeddf.withColumn("datadt",lit(derived_datadt)).withColumn("loaddt",current_date())
display(enrichdf1)
enrichdf1.printSchema()

enrichdf1=mungeddf.withColumns({"datadt":lit(derived_datadt),"loaddt":current_date()})
display(enrichdf1)
enrichdf1.printSchema()

enricheddf1=mungeddf.select("*",lit(derived_datadt).alias('datadt'),current_date().alias('loaddt'))
display(enricheddf1)
enricheddf1.printSchema()

# selectExpr takes SQL text, so the date is embedded as a quoted SQL string literal.
# NOTE(review): a file name containing a single quote would break this SQL text.
enrichdf1=mungeddf.selectExpr("*",f"'{derived_datadt}' as datadt","current_date() as loaddt")#DSL(select) + SQL expression
enrichdf1.printSchema()
display(enrichdf1)

######Deriving of columns

Derive a column from part of another column's value: here substring/substr takes the first letter, because substr(colname,1,1) starts at (1-based) position 1 and takes a length of 1 character.

In [0]:
# NOTE(review): this star import also shadows Python builtins such as sum, min, max, round,
# abs and filter with Spark column functions for the rest of the notebook.
from pyspark.sql.functions import *

# Derive a one-letter profession flag. substring(col, pos, len) is 1-based,
# so (1, 1) takes the first character.
flag_expr = substring(col("profession"), 1, 1)
enrichdf2 = enrichdf1.withColumn("professionflag", flag_expr)
# or: keep all existing columns and append the aliased expression
enrichdf2 = enrichdf1.select("*", flag_expr.alias("professionflag"))
# or: the same thing in SQL (this version is carried forward)
enrichdf2 = enrichdf1.selectExpr("*", "substr(profession,1,1) as professionflag")
display(enrichdf2.take(20))

Renaming of Columns

In [0]:
# Renaming columns: alternatives from most verbose to most direct; the last result is kept.
# 1) withColumn cannot rename. It copies the column under a new name; the old column is
#    then dropped and the columns re-selected to restore their order.
enrichdf3 = (enrichdf2
             .withColumn("sourcename", col("srcsystem"))
             .drop("srcsystem")
             .select("custid","age","firstname","lastname","profession","sourcename","datadt","loaddt","professionflag"))
# 2) select with an alias: every column has to be listed.
enrichdf3 = enrichdf2.select("custid","age","firstname","lastname","profession",col("srcsystem").alias("sourcename"),"datadt","loaddt","professionflag")
# 3) selectExpr with SQL "as": every column has to be listed.
enrichdf3 = enrichdf2.selectExpr("custid","age","firstname","lastname","profession","srcsystem as sourcename","datadt","loaddt","professionflag")
# 4) withColumnRenamed: renames one column and leaves the rest untouched.
enrichdf3 = enrichdf2.withColumnRenamed("srcsystem","sourcename")
# 5) withColumnsRenamed: several renames in one call (this result is used below).
enrichdf3 = enrichdf2.withColumnsRenamed({"srcsystem":"sourcename","professionflag":"profflag"})
display(enrichdf3.take(20))

Modify/replace (withColumn, select/selectExpr)

In [0]:
# Replace: overwrite profession entirely with the sourcename value.
enrichdf4 = enrichdf3.withColumn("profession", col("sourcename"))
display(enrichdf4.take(20))

# Modify: rebuild profession as profession + '-' + profflag.
enrichdf4 = enrichdf3.withColumn("profession", concat(col("profession"), lit('-'), col("profflag")))

# The same modification with select (every column listed)...
enrichdf4 = enrichdf3.select("custid","age","firstname","lastname",
                             concat("profession",lit('-'),"profflag").alias("profession"),
                             "sourcename","datadt","loaddt","profflag")

# ...or with selectExpr and SQL concat (this result is used below).
enrichdf4 = enrichdf3.selectExpr("custid","age","firstname","lastname",
                                 "concat(profession,'-',profflag) as profession",
                                 "sourcename","datadt","loaddt","profflag")

display(enrichdf4.take(20))

Remove/Eliminate

In [0]:
# Removing a column: withColumn cannot remove one. select/selectExpr can, by listing only the
# columns to keep; drop() is the direct way (its result is the one used below).
keep_cols = ["custid","age","firstname","lastname","profession","sourcename","datadt","loaddt"]
enrichdf5 = enrichdf4.select(*keep_cols)
enrichdf5 = enrichdf4.selectExpr(*keep_cols)
enrichdf5 = enrichdf4.drop("profflag")
display(enrichdf5.take(20))

Splitting & merging (concatenating) of columns

In [0]:
# Splitting: break "profession-flag" apart on '-' and keep only the profession part.
splitdf = enrichdf5.withColumn("profflag", split("profession", '-'))
splitdf = splitdf.withColumn("profession", col("profflag").getItem(0))
display(splitdf.take(20))

# Three-letter upper-case abbreviation of the profession; the temporary array column is dropped.
splitdf = splitdf.withColumn("shortprof", upper(substring(col("profession"), 1, 3))).drop("profflag")
display(splitdf)

# Merging: join first and last name with a space. One select adds, drops and orders columns at once.
fullname_col = concat_ws(" ", col("firstname"), col("lastname")).alias("fullname")
mergeddf = splitdf.select("custid", "age", fullname_col, "profession", "sourcename", "datadt", "loaddt", "shortprof")
display(mergeddf.limit(10))

In [0]:
# Check the column types after merging; datadt is still a plain string at this point.
mergeddf.printSchema()

Formatting and Typecasting:

In [0]:
# datadt is text in yy/dd/MM order (e.g. '25/30/12'); parse it into a real date column.
datadt_as_date = to_date(col('datadt'), 'yy/dd/MM')
formateddf = mergeddf.withColumn("datadt", datadt_as_date)
formateddf.printSchema()
display(formateddf.limit(10))

##3. Data Customization - Application of Tailored Business specific Rules
a. User Defined Functions<br/>
b. Building of Frameworks & Reusable Functions (We will learn very next)

In [0]:
def upperadd(addingupper_case):
    """Return the upper-cased form of a string, passing None through unchanged.

    This function is wrapped as a Spark UDF in the next cell. A null column value
    arrives as None, which would otherwise raise AttributeError on ``.upper()``.
    """
    if addingupper_case is None:
        return None
    return addingupper_case.upper()
print(upperadd("hello"))

In [0]:
from pyspark.sql.functions import upper, col, udf

# Built-in upper(): evaluated by Spark itself.
formateddf1 = formateddf.withColumn("fullname", upper("fullname"))
display(formateddf1.limit(10))

# Same result through a Python UDF built from upperadd. Use a UDF only when no built-in
# exists, because every value has to round-trip through Python.
udfupper = udf(upperadd)
formateddf1 = formateddf.withColumn("fullname", udfupper("fullname"))
formateddf1.explain()  # the plan shows the Python UDF evaluation step
display(formateddf1.take(10))


#####Create a Python custom function with complex logic

In [0]:
#Calculating age category from the given age of the customer
def pythonAgeCat(dfcol):
    """Map an age value to a coarse age-category label.

    Used as a Spark UDF below, so ``dfcol`` may be None (a null age) or any value
    that ``int()`` accepts (an int, or a numeric string).

    Returns:
        "Unknown" for None; otherwise "child" (<=10), "teenager" (11-18),
        "young" (19-30), "middleaged" (31-50) or "senior" (>50).
    """
    # Check for null BEFORE converting: int(None) raises TypeError, which would make the
    # "Unknown" branch unreachable and fail the Spark task on any null age.
    if dfcol is None:
        return "Unknown"
    age = int(dfcol)
    if age <= 10:
        return "child"
    if age <= 18:
        return "teenager"
    if age <= 30:
        return "young"
    if age <= 50:
        return "middleaged"
    return "senior"

In [0]:
from pyspark.sql.functions import udf

# Promote pythonAgeCat to a Spark UDF and derive the age-category column from age.
sparkudfageCat = udf(pythonAgeCat)
agecat_col = sparkudfageCat("age")
customdf = formateddf1.withColumn("agecat", agecat_col)
display(customdf.take(10))

####4. Data Curation/Processing (Pre Wrangling Stage) - Applying different levels of business logics, transformation, filtering, grouping, aggregation and limits applying different transformation functions<br/>
Select, Filter<br/>
Derive flags & Columns<br/>
Format<br/>
Group & Aggregate<br/>
Limit<br/>

1.Select, Filter<br/>
In terms of performance optimization, I made sure to apply pushdown optimization by doing select (projection) & filter (predicate) on only the data that is needed.

In [0]:
# Project only the columns needed downstream, renaming profession to prof.
kept_before = ["custid", "fullname"]
kept_after = ["sourcename", "age", "datadt", "loaddt", "shortprof", "agecat"]

selectdf = customdf.select(*kept_before, col("profession").alias("prof"), *kept_after)
display(selectdf.take(10))

# The same projection written as SQL expressions.
selectdf = customdf.selectExpr(*kept_before, "profession as prof", *kept_after)
display(selectdf.take(10))

#####Filter

In [0]:
# Keep customers aged strictly between 40 and 50.
filterdf=selectdf.filter((col("age")>40) & (col("age")<50))
display(filterdf.take(10))

# where() is an alias of filter(). It takes a Column condition or a SQL expression string,
# not the `col` function itself.
filterdf=selectdf.where("age > 40 and age < 50")
display(filterdf.take(10))
