In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.functions import row_number
from datetime import datetime

In [0]:
dt_string = datetime.now().strftime("%Y/%m/%d")

metadata_path = 'dbfs:/Metadata/Metacolumns__1_.csv'
Metadata = spark.read.csv(metadata_path, header='True')
df1_path = f"dbfs:/Bronze/2_WheelSales/{dt_string}"
df2_path = f"dbfs:/Bronze/4_WheelSales/{dt_string}"
parquet_path_df1 = 'dbfs:/Silver'
parquet_path_df2 = 'dbfs:/Silver'



In [0]:
ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load('dbfs:/Metadata/Metacolumns__1_.csv').withColumn('FILECOLUMNS', trim('FILECOLUMNS')).show(30)



+----+------+------------------+-----------------+------------+-----------------+--------------+--------------+--------------+
|  ID|Schema|         TableName|       ColumnName|    FILENAME|      FILECOLUMNS| RENAMECOLUMNS|SourceDataType|TargetDataType|
+----+------+------------------+-----------------+------------+-----------------+--------------+--------------+--------------+
|   1|   Dim|             Brand|            Brand|2-WheelSales|              OEM|         brand|        string|        string|
|   3|   Dim|              Fuel|         FuelType|2-WheelSales|            Model|          null|        string|        string|
|   4|   Dim|          Location|          Country|2-WheelSales|          Segment|          null|        string|        string|
|   4|   Dim|          Location|           Region|2-WheelSales|            Month|   monthofsale|        string|        string|
|   4|   Dim|          Location|             City|2-WheelSales|             Year|    yearofsale|           int|

In [0]:
def column_comparision(srcpath, filename, parquet_path):
    print("reading the source file")
    
    dfs = spark.read.format('csv').option('Header', True).option('inferSchema', True).load(srcpath)
    
    print("reading metacolumns file")

    ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load('dbfs:/Metadata/Metacolumns__1_.csv').withColumn('FILECOLUMNS', trim('FILECOLUMNS'))

    df_columns = set(map(str.lower, dfs.columns))
    print("DF Columns:", df_columns)

    ref_filter = ref_df.filter(col('FILENAME') == filename)

    ref_columns = set(map(str.lower, ref_filter.select('FILECOLUMNS').rdd.flatMap(lambda x: x).collect()))
    print("Ref Columns:", ref_columns)

    # Compare column orders
    if df_columns == ref_columns:
        print("Column orders match. and writing to parquet path")

        timestamp = datetime.now().strftime("%Y%m/%d")
    
        dfs.coalesce(1).write.mode('overwrite').option("header", "true").format('parquet').save(f"{parquet_path}/{filename}/{timestamp}")
        print("done written to parquet file...")

    else:
        print("Column orders differ:")
        missmatched_cols = ref_columns - df_columns
        print(missmatched_cols)
        error_message = "Column names don't match for {} and missing columns are {}: ".format(filename,)
        print("DF columns:", df_columns)
        print("Ref columns:", ref_columns)

        raise ValueError(error_message)


In [0]:
column_comparision(df1_path, '2-WheelSales', parquet_path_df1)
column_comparision(f"dbfs:/Bronze/4_WheelSales/{dt_string}", '4-WheelSales', parquet_path_df2)

reading the source file
reading metacolumns file
DF Columns: {'year', 'region', 'segment', 'enginecapacity', 'mileage', 'model', 'model price', 'dealer location', 'oem', 'month', 'noofsales'}
Ref Columns: {'year', 'region', 'segment', 'enginecapacity', 'mileage', 'model', 'model price', 'dealer location', 'oem', 'month', 'noofsales'}
Column orders match. and writing to parquet path
done written to parquet file...
reading the source file
reading metacolumns file
DF Columns: {'no of cylinders', 'body type', 'month of sale', 'fuel capacity (l)', 'mileage', 'brand', 'country', 'model', 'city', 'transmission', 'engine', 'price range', 'seating capacity', 'year of sale', 'fuel type', 'top_speed in km/h'}
Ref Columns: {'no of cylinders', 'body type', 'month of sale', 'fuel capacity (l)', 'mileage', 'brand', 'country', 'model', 'city', 'transmission', 'engine', 'price range', 'seating capacity', 'year of sale', 'fuel type', 'top_speed in km/h'}
Column orders match. and writing to parquet path


In [0]:
# column_comparision(srcpath= 'dbfs:/FileStore/2-WheelSales/2_WheelSales.csv', filename='2-WheelSales', parquet_path = 'dbfs:/project')

In [0]:
# meta_path = 'dbfs:/FileStore/Metacolumns-2.csv'

def schema_comparision(srcpath, filename, parquet_path):
    print("reading the source file")
    
    dfs = spark.read.format('csv').option('Header', True).option('inferSchema', True).load(srcpath)
    
    print("reading metacolumns file")

    # ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load('dbfs:/Metadata/Metacolumns__1_.csv')
    ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load('dbfs:/Metadata/Metacolumns__1_.csv').withColumn('FILECOLUMNS', trim('FILECOLUMNS'))

    ref_filter = ref_df.filter(col('FILENAME') == filename)
 
    for x in ref_filter.collect():
        columnNames = x['FILECOLUMNS']
        # print(columnNames)
        refTypes = x['SourceDataType']
        # print(refTypes)

        columnNamesList = [x.strip().lower() for x in columnNames.split(",")]
                
        refTypesList = [x.strip().lower() for x in refTypes.split(",")]
        #print(refTypesList)

        dfsTypes = dfs.schema[columnNames].dataType.simpleString() #StringType() : string , IntergerType() : int
        
        dfsTypesList = [x.strip().lower() for x in dfsTypes.split(",")]
        
        # columnName : Row id, DataFrameType : int, reftype: int
        
        missmatchedcolumns = [(col_name, df_types, ref_types) for (col_name, df_types, ref_types) in zip(columnNamesList, dfsTypesList, refTypesList) if dfsTypesList != refTypesList]

        if missmatchedcolumns :
            error_msg = "Schema mismatch for {} in the following columns:".format(filename)
            print("schema comparision has been failed or missmatched for this {}".format(filename))
            
            for col_name, df_types, ref_types in missmatchedcolumns:
                print(f"columnName : {col_name}, DataFrameType : {df_types}, referenceType : {ref_types}")
            
            raise ValueError(error_msg)
            # error_message = "Column names don't match for {} and missing columns are: {}".format(filename, missing_columns)
                
                
        else:
            print("Schema comaprision is done and success for {}".format(filename))
            print("writing to the parquet path...")
            timestamp = datetime.now().strftime("%Y%m/%d")
            dfs.coalesce(1).write.mode('overwrite').option("header", "true").format('parquet').save(f"{parquet_path}/{filename}/{timestamp}")
            print("done written to parquet file...")


In [0]:
schema_comparision(df1_path, '2-WheelSales', parquet_path_df1)
schema_comparision(df2_path, '4-WheelSales', parquet_path_df2)

reading the source file
reading metacolumns file
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...
Schema comaprision is done and success for 2-WheelSales
writing to the parquet path...
done written to parquet file...

In [0]:
def fill_nulls_duplicates(srcpath, filename, parquet_path):

    print("reading the source file")
    
    df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load(srcpath)
    
    print("reading metacolumns file")

    # ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load('dbfs:/FileStore/Metacolumns-1.csv')
    ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load('dbfs:/Metadata/Metacolumns__1_.csv').withColumn('FILECOLUMNS', trim('FILECOLUMNS'))

    # Metadata = ref_df.filter((F.col('FILENAME') == filename))

    # total = len(df.columns) + Metadata.select('FILECOLUMNS').count()
    
    # Metadata = Metadata.withColumn("ColumnName", lower(col("ColumnName")))
    # Metadata = Metadata.withColumn("SourceDataType", lower(col("SourceDataType")))
    # Metadata = Metadata.withColumn("TargetDataType", lower(col("TargetDataType")))
    
    for col_name, data_type in zip(df.columns, df.dtypes):
        if data_type[1] == 'string':
            df = df.fillna('unknown', subset=[col_name])
        elif data_type[1] == 'int':
            df = df.fillna(0, subset=[col_name])

    null_counts = {col_name: df.filter(col(col_name).isNull()).count() for col_name in df.columns}
    for col_name, count in null_counts.items():
        print(f"Null count in {col_name}: {count}")

    print("initial df count:", df.count())
    count_duplicates_i = df.groupBy(df.columns).count().filter("count>=1")
    count_duplicates_i.show()
    print('count_duplicates_i', count_duplicates_i.count())
    df = df.dropDuplicates()
    count_duplicates = df.groupBy(df.columns).count().filter("count>1")
    print('Duplicate rows count:')
    count_duplicates.show()

    timestamp = datetime.now().strftime("%Y%m/%d")
    
    df.coalesce(1).write.mode('overwrite').option("header", "true").format('parquet').save(f"{parquet_path}/{filename}/{timestamp}")

    print("return to parquet done...")
    

In [0]:
fill_nulls_duplicates(df1_path, '2-WheelSales', parquet_path_df1)
fill_nulls_duplicates(df2_path, '4-WheelSales', parquet_path_df2)

reading the source file
reading metacolumns file
Null count in OEM: 0
Null count in Model: 0
Null count in Segment: 0
Null count in Month: 0
Null count in Year: 0
Null count in NoOfSales: 0
Null count in EngineCapacity: 0
Null count in Mileage: 0
Null count in Model Price: 0
Null count in Dealer location: 0
Null count in Region: 0
initial df count: 25346
+-------------+---------------+-----------+-----+----+---------+--------------+----------+-------------------+---------------+------+-----+
|          OEM|          Model|    Segment|Month|Year|NoOfSales|EngineCapacity|   Mileage|        Model Price|Dealer location|Region|count|
+-------------+---------------+-----------+-----+----+---------+--------------+----------+-------------------+---------------+------+-----+
|Royal Enfield|         Bullet|Performance|  Aug|2022|     7618|           350|     35-40|1,56,800 - 1,85,800|          Delhi| North|    1|
|        Honda|           Livo| Motorcycle|  Dec|2022|     4587|        109.51|    

In [0]:
def read_and_rename(srcpath, metapath, filename, parquet_path):
    """
    srcpath : source_filepath, 
    metapath : path of metacolumns file 
    filename : is used to filter Rename and filecolumns according to file
    """
    print("reading the source file")
    df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load(srcpath)
    print("reading metacolumns file")
    ref_df = spark.read.format('csv').option('Header', True).option('inferSchema', True).load(metapath)

    reference_df = ref_df.filter((F.col('FILENAME') == filename) & (F.col('RENAMECOLUMNS')!=' ')).select('FILECOLUMNS', 'RENAMECOLUMNS')
    print("define a mapping condition")
    mappings = {row['FILECOLUMNS']: row['RENAMECOLUMNS'] for row in reference_df.collect()}

    for original_col, new_col in mappings.items():
        df = df.withColumnRenamed(original_col, new_col)
        print("done")
    
    timestamp = datetime.now().strftime("%Y%m/%d")
    
    df.coalesce(1).write.mode('overwrite').option("header", "true").format('parquet').save(f"{parquet_path}/{filename}/{timestamp}")
    
    return df

In [0]:
df1 = read_and_rename(df1_path, 'dbfs:/Metadata/Metacolumns__1_.csv', '2-WheelSales',parquet_path_df1)
df2 = read_and_rename(df2_path, 'dbfs:/Metadata/Metacolumns__1_.csv', '4-WheelSales',parquet_path_df2)


reading the source file
reading metacolumns file
define a mapping condition
done
done
done
done
reading the source file
reading metacolumns file
define a mapping condition
done
done


In [0]:
from pyspark.sql.functions import col, trim, when, to_date
from datetime import datetime

def date_comparison(srcpath, metapath, filename):
    try:
        dfs = spark.read.csv(srcpath, header=True, inferSchema=True)
        ref_df = spark.read.csv(metapath, header=True, inferSchema=True).withColumn('FILECOLUMNS', trim('FILECOLUMNS'))
        ref_filter = ref_df.filter(col('FILENAME') == filename)
        date_columns_ref = [row['FILECOLUMNS'] for row in ref_filter.select('FILECOLUMNS').filter(col('FILECOLUMNS').contains('Date')).collect()]
        date_columns_dfs = [x for x in dfs.columns if 'Date' in x]

        if date_columns_ref:
            for date_column in date_columns_ref:
                if date_column in date_columns_dfs:
                    date_format = 'yyyy-MM-dd'
                    dfs = dfs.withColumn('bad_records', when(col(date_column).isNull(), False).when(to_date(col(date_column), date_format).isNotNull(), 'Matched').otherwise('Not Matched'))
                else:
                    print(f"No date column found in the {filename}")

            bad_records_df = dfs.filter(col('bad_records') == 'Not Matched')
            good_records_df = dfs.filter(col('bad_records') == 'Matched')
            dt_string = datetime.now().strftime("%Y%m%d")
            output_bad_path = f'dbfs:/BadRecords/{filename}-DateMismatched/{dt_string}'
            output_good_path = f'dbfs:/GoodRecords/{filename}/{dt_string}'

            if bad_records_df.count() >= 1:
                bad_records_df.coalesce(1).write.mode('overwrite').option("header", "true").parquet(output_bad_path)
            else:
                print("No bad records found.")
            if good_records_df.count() >= 1:
                good_records_df.coalesce(1).write.mode('overwrite').option("header", "true").parquet(output_good_path)
            else:
                print("No good records found.")
        else:
            print("No date columns found in metadata for the given filename.")

    except Exception as e:
        print(f'An error occurred while processing date_comparison: {str(e)}')



In [0]:
df1 = date_comparison(df1_path, 'dbfs:/Metadata/Metacolumns__1_.csv', '2-WheelSales')
df2 = date_comparison(df2_path, 'dbfs:/Metadata/Metacolumns__1_.csv', '4-WheelSales')

No date columns found in metadata for the given filename.
No date columns found in metadata for the given filename.


In [0]:
# # dbutils.fs.mkdirs('BadRecords')
# dbutils.fs.ls('/')
# # path='dbfs:/BadRecords/'

Out[15]: [FileInfo(path='dbfs:/BadRecords/', name='BadRecords/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Bronze/', name='Bronze/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/', name='FileStore/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Gold/', name='Gold/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Landing/', name='Landing/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Metadata/', name='Metadata/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Silver/', name='Silver/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/', name='databricks-datasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-results/', name='databricks-results/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/delta_tables/', name='delta_tables/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/', name='user/', size=0, modificationTime=0)]

In [0]:
# from datetime import datetime 
# dt_string = datetime.now().strftime("%Y/%m/%d")
# # metadata_path = 'dbfs:/Metadata/Metacolumns__1_.csv'
# # Metadata = spark.read.csv(metadata_path, header='True')
# # df1_path = f"dbfs:/Bronze/2_WheelSales/{dt_string}"
# df2_path = spark.read.csv(f"dbfs:/Bronze/4_WheelSales/{dt_string}", header='True', inferSchema= 'True')

# print('df2_path', df2_path.count())

# df3 = df2_path.dropDuplicates()
# print("df3 count::", df3.count())

# # dbfs:/Bronze/4_WheelSales/2024/03/25
# # parquet_path_df1 = 'dbfs:/Silver'
# # parquet_path_df2 = 'dbfs:/Silver'

df2_path 91440
df3 count:: 91440


In [0]:
# column_comparision(df1_path, '2-WheelSales', parquet_path_df1)
# column_comparision(df2_path, '4-WheelSales', parquet_path_df2)

In [0]:
# schema_comparision(df1_path, '2-WheelSales', parquet_path_df1)
# schema_comparision(df2_path, '4-WheelSales', parquet_path_df2)

In [0]:
# fill_nulls_duplicates(df1_path, '2-WheelSales', parquet_path_df1)
# fill_nulls_duplicates(df2_path, '4-WheelSales', parquet_path_df2)

In [0]:
# df1 = read_and_rename(df1_path, metadata_path, '2-WheelSales',parquet_path_df1)
# df2 = read_and_rename(df2_path, metadata_path, '4-WheelSales',parquet_path_df2)

[0;36m  File [0;32m<command-3918072940602230>:2[0;36m[0m
[0;31m    df2 = read_and_rename(df2_path, metadata_path, '4-WheelSales',parquet_path_df2)[0m
[0m    ^[0m
[0;31mIndentationError[0m[0;31m:[0m unexpected indent
