In [0]:
rawdf2=spark.read.csv("/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified",header=True,inferSchema=True)
display(rawdf2.take(20))
#rawdf3=spark.read.csv("/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified",header=True,inferSchema=True)
#display(rawdf3.take(20)



In [0]:
rawdf1=spark.read.csv("/Volumes/workspace/default/volumewd36/ingestion_volume/source/custsmodified",header=False,inferSchema=True).toDF("id","firstname","lastname","age","profession")
rawdf1.show(20,False)
display(rawdf1.take(20))
display(rawdf1.sample(.01))
display(rawdf1.sample(.3))

In [0]:
#Important passive EDA structure functions we can use
#rawdf1.printSchema()#I am realizing the id & age columns are having some non numerical values (supposed to be numeric)
#rawdf1.show(50)
print(rawdf1.columns)#I am understanding the column numbers/order and the column names
print(rawdf1.dtypes)#Realizing the datatype of every columns (even we can do programattic column & type identification for dynamic programming)
for i in rawdf1.dtypes:
    print(i)
    if i[1]=='string':
            print(i[0])

print(rawdf1.schema)#To identify the structure of the data in the StructType and StructField format

In [0]:
#Important passive EDA data functions we can use
#We identified few patterns on this data
#1. Deduplication of rows and given column(s)
#2. Null values ratio across all columns
#3. Distribution (Dense) of the data across all number columns
#4. Min, Max values
#5. StdDeviation - 
#6. Percentile - Distribution percentage from 0 to 100 in 4 quadrants of 25%
print("actual count of the data",rawdf1.count())
print("de-duplicated record (all columns) count",rawdf1.distinct().count())#de duplicate the entire columns of the given  dataframe
print("de-duplicated record (all columns) count",rawdf1.dropDuplicates().count())#de duplicate the entire columns of the given  dataframe
print("de-duplicated given cid column count",rawdf1.dropDuplicates(['id']).count())#de duplicate the entire columns of the given  dataframe
display(rawdf1.describe())
display(rawdf1.summary())

###**b. Active Data Munging**
1. Combining Data + Schema Evolution/Merging (Structuring)
2. Validation, Cleansing, Scrubbing - Cleansing (removal of unwanted datasets), Scrubbing (convert raw to tidy)
3. De Duplication and Levels of Standardization () of Data to make it in a usable format (Dataengineers/consumers)

In [0]:
from pyspark.sql.session import SparkSession#15lakhs
spark=SparkSession.builder.appName("WD36 - ETL Pipeline - Bread & Butter").getOrCreate()#3 lakhs LOC by Databricks (for eg. display, delta, xml)

In [0]:
#Extraction (Ingestion) methodologies
#1. Single file
struct1="id string, firstname string, lastname string, age string, profession string"
rawdf1=spark.read.schema(struct1).csv(path="/Volumes/workspace/wd36schema/ingestion_volume/source/custsmodified")
#2. Multiple files (with same or different names)
rawdf1=spark.read.schema(struct1).csv(path=["/Volumes/workspace/wd36schema/ingestion_volume/source/custsmodified","/Volumes/workspace/wd36schema/ingestion_volume/source/custsmodified"])
#3. Multiple files in multiple paths or sub paths
rawdf1=spark.read.schema(struct1).csv(path=["/Volumes/workspace/wd36schema/ingestion_volume/source/","/Volumes/workspace/wd36schema/ingestion_volume/staging/"],recursiveFileLookup=True,pathGlobFilter="custsm*")


In [0]:
#Active Data munging...
#When you go for Schema Merging/Melting and Schema Evolution?
#Schema Merging/Melting (unionByName,allowMissingColumns)- If we get multiple files
#Schema Evolution (orc/parquet with mergeSchema) - If no. of columns are keeps added by the source system
#when we know structure of the file already - schema merge/ schema not known earlier  - schema evolution

#COMBINING OR SCHEMA MERGING or SCHEMA MELTING of Data from different sources(Important interview question also as like schema evolution...)
#4. Multiple files with different structure in multiple paths or sub paths
strt1="id string, firstname string, lastname string, age string, profession string"
rawdf1=spark.read.schema(strt1).csv(path=["/Volumes/workspace/wd36schema/ingestion_volume/source/"],recursiveFileLookup=True,pathGlobFilter="custsmodified_N*")
#recursiveFileLookup - This allows Spark to read files from a hierarchical folder structure, including all subfolders, using a single base path. 
strt2="id string, firstname string, age string, profession string,city string"
rawdf2=spark.read.schema(strt2).csv(path=["/Volumes/workspace/wd36schema/ingestion_volume/source/"],recursiveFileLookup=True,pathGlobFilter="custsmodified_T*")
display(rawdf1)
display(rawdf2)
rawdf_merged=rawdf1.union(rawdf2)#Use union only if the dataframes are having same columns in the same order with same datatype..
display(rawdf_merged)
#Expected right approach to follow
rawdf_merged=rawdf1.unionByName(rawdf2,allowMissingColumns=True)
display(rawdf_merged)

#Here, we are merging two files because both are in CSV format. If one file is CSV and the other file is in a different format, what should we do in this scenario? it will be handled automatically
#rawdf2.write.json("/Volumes/workspace/wd36schema/ingestion_volume/staging/csvjson")
rawdf3=spark.read.json("/Volumes/workspace/wd36schema/ingestion_volume/staging/csvjson")
rawdf_merged=rawdf_merged.unionByName(rawdf3,allowMissingColumns=True)
display(rawdf_merged)#Expected dataframe to proceed further munging on a single dataframe


In [0]:
#Validation by doing cleansing
from pyspark.sql.types import StructType,StructField,StringType,ShortType,IntegerType
#print(rawdf1.schema)
struttype1=StructType([StructField('id', IntegerType(), True), StructField('firstname', StringType(), True), StructField('lastname', StringType(), True), StructField('age', ShortType(), True), StructField('profession', StringType(), True)])
#method1 - permissive with all rows with respective nulls
cleandf1=spark.read.schema(struttype1).csv(path="/Volumes/workspace/wd36schema/ingestion_volume/source/custsmodified",mode='permissive')
print("after keeping nulls on the wrong data format",cleandf1.count())#all rows count
display(cleandf1)#We are making nulls where ever data format mismatch is there (cutting down mud portition from potato)
#or
#method2 - drop malformed rows
cleandf1=spark.read.schema(struttype1).csv(path="/Volumes/workspace/wd36schema/ingestion_volume/source/custsmodified",mode='dropMalformed')
print("after cleaning wrong data (type mismatch, column number mismatch)",len(cleandf1.collect()))
display(cleandf1)#We are removing the entire row, where ever data format mismatch is there (throwing away the entire potato)
print(cleandf1.count())#count will return the original count of the raw data
print(len(cleandf1.collect()))#collect+len will return the dropmalformed count of the raw data