
Data profiling is critical for the success of the data loads.

Profile across file is more important.

In many scenarios, I have observed that the attribute behavior change between multiple files sent.

There can be multiple reasons for that - 
e.g. 
1. Change in upstream environment DEV/UAT/PROD i.e. first file was generated from DEV env and later from different env and hence data difference may occur
2. Change in business logic
3. 

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName('PySpark_Tutorial').getOrCreate()

In [3]:
spark

In [4]:
import platform, sys, os
print('Platform = ',platform.platform())  
print('Version of Spark = ',spark.version)
print('Python version = ',sys.version)

Platform =  Windows-10-10.0.19041-SP0
Version of Spark =  3.2.0
Python version =  3.7.11 (default, Jul 27 2021, 09:42:29) [MSC v.1916 64 bit (AMD64)]


In [5]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col

def dataprofile(data_all_df,data_cols):
    data_df = data_all_df.select(data_cols)
    columns2Bprofiled = data_df.columns
    global schema_name, table_name
    if not 'schema_name' in globals():
        schema_name = 'schema_name'
    if not 'table_name' in globals():
        table_name = 'table_name' 
    dprof_df = pd.DataFrame({'schema_name':[schema_name] * len(data_df.columns),\
                             'table_name':[table_name] * len(data_df.columns),\
                             'column_names':data_df.columns,\
                             'data_types':[x[1] for x in data_df.dtypes]}) 
    dprof_df = dprof_df[['schema_name','table_name','column_names', 'data_types']]
    dprof_df.set_index('column_names', inplace=True, drop=False)
    # ======================
    num_rows = data_df.count()
    dprof_df['num_rows'] = num_rows
    # ======================    
    # number of rows with nulls and nans   
    df_nacounts = data_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_df.columns \
                                  if data_df.select(c).dtypes[0][1]!='timestamp']).toPandas().transpose()
    df_nacounts = df_nacounts.reset_index()  
    df_nacounts.columns = ['column_names','num_null']
    dprof_df = pd.merge(dprof_df, df_nacounts, on = ['column_names'], how = 'left')
    # ========================
    # number of rows with white spaces (one or more space) or blanks
    num_spaces = [data_df.where(F.col(c).rlike('^\\s+$')).count() for c in data_df.columns]
    dprof_df['num_spaces'] = num_spaces
    num_blank = [data_df.where(F.col(c)=='').count() for c in data_df.columns]
    dprof_df['num_blank'] = num_blank
    # =========================
    # using the in built describe() function 
    desc_df = data_df.describe().toPandas().transpose()
    desc_df.columns = ['count', 'mean', 'stddev', 'min', 'max']
    desc_df = desc_df.iloc[1:,:]  
    desc_df = desc_df.reset_index()  
    desc_df.columns.values[0] = 'column_names'  
    desc_df = desc_df[['column_names','count', 'mean', 'stddev']] 
    dprof_df = pd.merge(dprof_df, desc_df , on = ['column_names'], how = 'left')
    # ===========================================
    allminvalues = [data_df.select(F.min(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmaxvalues = [data_df.select(F.max(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmincounts = [data_df.where(col(x) == y).count() for x,y in zip(columns2Bprofiled, allminvalues)]
    allmaxcounts = [data_df.where(col(x) == y).count() for x,y in zip(columns2Bprofiled, allmaxvalues)]    
    df_counts = dprof_df[['column_names']]
    df_counts.insert(loc=0, column='min', value=allminvalues)
    df_counts.insert(loc=0, column='counts_min', value=allmincounts)
    df_counts.insert(loc=0, column='max', value=allmaxvalues)
    df_counts.insert(loc=0, column='counts_max', value=allmaxcounts)
    df_counts = df_counts[['column_names','min','counts_min','max','counts_max']]
    dprof_df = pd.merge(dprof_df, df_counts , on = ['column_names'], how = 'left')  
    # ==========================================
    # number of distinct values in each column
    dprof_df['num_distinct'] = [data_df.select(x).distinct().count() for x in columns2Bprofiled]
    # ============================================
    # most frequently occuring value in a column and its count
    dprof_df['most_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=False).limit(1).\
                                       toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['most_freq_value'] = [x[0] for x in dprof_df['most_freq_valwcount']]
    dprof_df['most_freq_value_count'] = [x[1] for x in dprof_df['most_freq_valwcount']]
    dprof_df = dprof_df.drop(['most_freq_valwcount'],axis=1)
    # least frequently occuring value in a column and its count
    dprof_df['least_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=True).limit(1).\
                                        toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['least_freq_value'] = [x[0] for x in dprof_df['least_freq_valwcount']]
    dprof_df['least_freq_value_count'] = [x[1] for x in dprof_df['least_freq_valwcount']]
    dprof_df = dprof_df.drop(['least_freq_valwcount'],axis=1)

    return dprof_df

In [6]:
import pandas as pd

In [8]:
df = spark.read.text(r'file:/D:/Projects/saket1471/learn-spark/data/netflix_titles.csv')

In [9]:
df.show()

+--------------------+
|               value|
+--------------------+
|show_id,type,titl...|
|s1,Movie,Dick Joh...|
|s2,TV Show,Blood ...|
|s3,TV Show,Gangla...|
|s4,TV Show,Jailbi...|
|s5,TV Show,Kota F...|
|s6,TV Show,Midnig...|
|s7,Movie,My Littl...|
|s8,Movie,Sankofa,...|
|s9,TV Show,The Gr...|
|s10,Movie,The Sta...|
|s11,TV Show,"Vend...|
|s12,TV Show,Bangk...|
|s13,Movie,Je Suis...|
|s14,Movie,Confess...|
|s15,TV Show,Crime...|
|s16,TV Show,Dear ...|
|s17,Movie,Europe'...|
|s18,TV Show,Falsa...|
|s19,Movie,Intrusi...|
+--------------------+
only showing top 20 rows



In [10]:
df = spark.read.text(r'file:/D:/Projects/saket1471/learn-spark/data/netflix_titles.csv', sep=",")

TypeError: text() got an unexpected keyword argument 'sep'

In [11]:
help(spark.read.text)

Help on method text in module pyspark.sql.readwriter:

text(paths, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads text files and returns a :class:`DataFrame` whose schema starts with a
    string column named "value", and followed by partitioned columns if there
    are any.
    The text files must be encoded as UTF-8.
    
    By default, each line in the text file is a new row in the resulting DataFrame.
    
    .. versionadded:: 1.6.0
    
    Parameters
    ----------
    paths : str or list
        string, or list of strings, for input path(s).
    
    Other Parameters
    ----------------
    Extra options
        For the extra options, refer to
        `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option>`_
        in the version you use.
    
        .. # noqa
    
    Examples
    ------

In [13]:
df = spark.read.text(r'file:/D:/Projects/saket1471/learn-spark/data/netflix_titles.csv').option(sep,",")

AttributeError: 'DataFrame' object has no attribute 'option'

In [16]:
df = spark.read.options(delimiter=",", header=True).text(r'file:/D:/Projects/saket1471/learn-spark/data/netflix_titles.csv')

In [17]:
df.show()

+--------------------+
|               value|
+--------------------+
|show_id,type,titl...|
|s1,Movie,Dick Joh...|
|s2,TV Show,Blood ...|
|s3,TV Show,Gangla...|
|s4,TV Show,Jailbi...|
|s5,TV Show,Kota F...|
|s6,TV Show,Midnig...|
|s7,Movie,My Littl...|
|s8,Movie,Sankofa,...|
|s9,TV Show,The Gr...|
|s10,Movie,The Sta...|
|s11,TV Show,"Vend...|
|s12,TV Show,Bangk...|
|s13,Movie,Je Suis...|
|s14,Movie,Confess...|
|s15,TV Show,Crime...|
|s16,TV Show,Dear ...|
|s17,Movie,Europe'...|
|s18,TV Show,Falsa...|
|s19,Movie,Intrusi...|
+--------------------+
only showing top 20 rows



In [18]:
df = spark.read.options(delimiter=",", header=True).csv(r'file:/D:/Projects/saket1471/learn-spark/data/netflix_titles.csv')

In [21]:
df.columns

['show_id',
 'type',
 'title',
 'director',
 'cast',
 'country',
 'date_added',
 'release_year',
 'rating',
 'duration',
 'listed_in',
 'description']

In [19]:
df.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

In [20]:
# Driver code for the data profle function
import time
start = time.time()
cols2profile = df.columns  # select all or some columns from the table
dprofile = dataprofile(df, cols2profile)
end = time.time()
print('Time taken to execute dataprofile function ', (end - start)/60,' minutes')

ValueError: 'column_names' is both an index level and a column label, which is ambiguous.