In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

expected_schema = StructType([
    StructField("ACCT_CD", IntegerType(), nullable=True),
    StructField("EXT_SEC_ID", IntegerType(), nullable=True),
    StructField("ASOF_DATE", StringType(), nullable=True),
    StructField("LONG_SHORT_IND", StringType(), nullable=True),
    StructField("QTY_SOD", IntegerType(), nullable=True),
    StructField("COST", DoubleType(), nullable=True),
    StructField("UNIT_COST", DoubleType(), nullable=True),
    StructField("MKT_VAL_SOD", DoubleType(), nullable=True),
    StructField("UNREALIZED_GAIN_LOSS", StringType(), nullable=True)
])

df = spark.read.csv("CRD_Position_202212124.csv", nullValue="", header=True, inferSchema=True)

print('Data frame schema: ', df.schema)
print('Expected Schema: ', expected_schema)

if df.schema == expected_schema:
    print("Schema validation successful")
else:
    mismatched_row = df.filter(~(df["ACCT_CD"].cast(IntegerType()).isNotNull() &
                                 df["EXT_SEC_ID"].cast(StringType()).isNotNull() &
                                 df["ASOF_DATE"].cast(IntegerType()).isNotNull()&
                                 df["LONG_SHORT_IND"].cast(IntegerType()).isNotNull() &
                                 df["QTY_SOD"].cast(StringType()).isNotNull() &
                                 df["COST"].cast(IntegerType()).isNotNull()&
                                 df["UNIT_COST"].cast(IntegerType()).isNotNull() &
                                 df["MKT_VAL_SOD"].cast(StringType()).isNotNull() &
                                 df["UNREALIZED_GAIN_LOSS"].cast(IntegerType()).isNotNull()))
    if  mismatched_row.count() > 0:
        print("Schema validation failed.")
        print("Mismatched row:")
        mismatched_row.show()
        mismatched_columns = set(df.schema.fieldNames()).symmetric_difference(set(expected_schema.fieldNames()))
    if  len(mismatched_columns)>0:
        print("Schema validation failed.")
        print("Mismatched columns: ", mismatched_columns)
        print("Sample data from mismatched columns:")
        # Show sample data from mismatched columns
        for column_name in mismatched_columns:
            print(f"Column: {column_name}")
            df.select(column_name).show(10, truncate=False)

# if all(col in df.columns for col in expected_schema.fieldNames()):
#     mismatched_row = df.filter(~(df["Id"].cast(IntegerType()).isNotNull() &
#                                  df["Name"].cast(StringType()).isNotNull() &
#                                  df["Name_id"].cast(IntegerType()).isNotNull()))

#     if mismatched_row.count() > 0:
#         print("Schema validation failed.")
#         print("Mismatched row:")
#         mismatched_row.show()
#     else:
#         print("Schema validation successful")
# else:
#     print("Schema validation failed. Mismatched columns.")


Data frame schema:  StructType([StructField('ACCT_CD', IntegerType(), True), StructField('EXT_SEC_ID', IntegerType(), True), StructField('ASOF_DATE', StringType(), True), StructField('LONG_SHORT_IND', StringType(), True), StructField('QTY_SOD', IntegerType(), True), StructField('COST', DoubleType(), True), StructField('UNIT_COST', DoubleType(), True), StructField('MKT_VAL_SOD', DoubleType(), True), StructField('UNREALIZED_GAIN_LOSS', StringType(), True), StructField('ACCRUED_INTEREST', StringType(), True)])
Expected Schema:  StructType([StructField('ACCT_CD', IntegerType(), True), StructField('EXT_SEC_ID', IntegerType(), True), StructField('ASOF_DATE', StringType(), True), StructField('LONG_SHORT_IND', StringType(), True), StructField('QTY_SOD', IntegerType(), True), StructField('COST', DoubleType(), True), StructField('UNIT_COST', DoubleType(), True), StructField('MKT_VAL_SOD', DoubleType(), True), StructField('UNREALIZED_GAIN_LOSS', StringType(), True)])
Schema validation failed.
Mis

In [11]:
ACCT_CD	EXT_SEC_ID	ASOF_DATE	LONG_SHORT_IND	QTY_SOD	COST	UNIT_COST	MKT_VAL_SOD	UNREALIZED_GAIN_LOSS	ACCRUED_INTEREST


SyntaxError: invalid syntax (726517269.py, line 1)

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

expected_schema = StructType([
    StructField("ACCT_CD", IntegerType(), nullable=True),
    StructField("EXT_SEC_ID", IntegerType(), nullable=True),
    StructField("ASOF_DATE", StringType(), nullable=True),
    StructField("LONG_SHORT_IND", StringType(), nullable=True),
    StructField("QTY_SOD", IntegerType(), nullable=True),
    StructField("COST", DoubleType(), nullable=True),
    StructField("UNIT_COST", DoubleType(), nullable=True),
    StructField("MKT_VAL_SOD", DoubleType(), nullable=True),
    StructField("UNREALIZED_GAIN_LOSS", StringType(), nullable=True)
])

df = spark.read.csv("CRD_Position_202212124.csv", nullValue="", header=True, inferSchema=True)

print('Data frame schema: ', df.schema)
print('Expected Schema: ', expected_schema)

try:
    if (col in df.columns for col in expected_schema.fieldNames()):
        mismatched_row = df.filter(~(df["ACCT_CD"].cast(IntegerType()).isNotNull() &
                                    df["EXT_SEC_ID"].cast(IntegerType()).isNotNull() &
                                    df["ASOF_DATE"].cast(StringType()).isNotNull()&
                                    df["LONG_SHORT_IND"].cast(StringType()).isNotNull() &
                                    df["QTY_SOD"].cast(IntegerType()).isNotNull() &
                                    df["COST"].cast(DoubleType()).isNotNull()&
                                    df["UNIT_COST"].cast(DoubleType()).isNotNull() &
                                    df["MKT_VAL_SOD"].cast(DoubleType()).isNotNull() &
                                    df["UNREALIZED_GAIN_LOSS"].cast(StringType()).isNotNull()))
        if  mismatched_row.count() > 0:
            print("Schema validation failed.")
            print("Mismatched row:")
            mismatched_row.show()
        else:
            print("Schema validation successful.")
    else:
        print("Schema validation failed.")
except : 
    print("")


Data frame schema:  StructType([StructField('ACCT_CD', IntegerType(), True), StructField('ASOF_DATE', IntegerType(), True), StructField('LONG_SHORT_IND', StringType(), True), StructField('QTY_SOD', StringType(), True), StructField('COST', IntegerType(), True), StructField('UNIT_COST', DoubleType(), True), StructField('MKT_VAL_SOD', DoubleType(), True), StructField('UNREALIZED_GAIN_LOSS', DoubleType(), True), StructField('ACCRUED_INTEREST', StringType(), True)])
Expected Schema:  StructType([StructField('ACCT_CD', IntegerType(), True), StructField('EXT_SEC_ID', IntegerType(), True), StructField('ASOF_DATE', StringType(), True), StructField('LONG_SHORT_IND', StringType(), True), StructField('QTY_SOD', IntegerType(), True), StructField('COST', DoubleType(), True), StructField('UNIT_COST', DoubleType(), True), StructField('MKT_VAL_SOD', DoubleType(), True), StructField('UNREALIZED_GAIN_LOSS', StringType(), True)])


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `EXT_SEC_ID` cannot be resolved. Did you mean one of the following? [`ACCT_CD`, `ASOF_DATE`, `LONG_SHORT_IND`, `QTY_SOD`, `COST`, `UNIT_COST`, `MKT_VAL_SOD`, `UNREALIZED_GAIN_LOSS`, `ACCRUED_INTEREST`].

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import to_date
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

constraint_expr = functions.expr("LONG_SHORT_IND IN ('L', 'S')")

expected_schema = StructType([
    StructField("ACCT_CD", IntegerType(), nullable=True),
    StructField("EXT_SEC_ID", IntegerType(), nullable=True),
    # StructField("ASOF_DATE", StringType(), nullable=True),
    StructField("ASOF_DATE", DateType(), nullable=True),
    StructField("LONG_SHORT_IND", StringType(), nullable=True, metadata={"constraint": constraint_expr}),
    StructField("QTY_SOD", IntegerType(), nullable=True),
    StructField("COST", DoubleType(), nullable=True),
    StructField("UNIT_COST", DoubleType(), nullable=True),
    StructField("MKT_VAL_SOD", DoubleType(), nullable=True),
    StructField("UNREALIZED_GAIN_LOSS", StringType(), nullable=True)
])

df = spark.read.csv("CRD_Position_202212124.csv", nullValue="", header=True, inferSchema=True)

print('Data frame schema: ', df.schema)
print('Expected Schema: ', expected_schema)

# Check if all expected columns exist in the DataFrame
expected_columns = set([field.name for field in expected_schema.fields])
df_columns = set(df.columns)

if expected_columns.issubset(df_columns):
    # Perform schema validation
    allowed_values = ['L','S']
    mismatched_row = df.filter(~(
        df["ACCT_CD"].cast(IntegerType()).isNotNull() &
        df["EXT_SEC_ID"].cast(IntegerType()).isNotNull() &
        to_date(df["ASOF_DATE"], "dd/MM/yyyy").isNotNull()&
        # df["ASOF_DATE"].cast(StringType()).isNotNull() &
        df["LONG_SHORT_IND"].cast(StringType()).isin(allowed_values) &
        df["QTY_SOD"].cast(IntegerType()).isNotNull() &
        df["COST"].cast(DoubleType()).isNotNull() &
        df["UNIT_COST"].cast(DoubleType()).isNotNull() &
        df["MKT_VAL_SOD"].cast(DoubleType()).isNotNull() &
        df["UNREALIZED_GAIN_LOSS"].cast(StringType()).isNotNull()
    ))
    
    if mismatched_row.count() > 0:
        print("Schema validation failed.")
        print("Mismatched row:")
        mismatched_row.show()
        mismatched_rows_str = "\n".join([str(row) for row in mismatched_row.collect()])
        mismatched_row.write.csv("erroneous_rows.csv", header=True, mode="overwrite")
        error_message = f"Schema validation failed. Mismatched rows:\n{mismatched_rows_str}"
        with open("error_message.txt", "w+") as file:
            file.write(error_message)
        cleaned_df = df.exceptAll(mismatched_row)
        # cleaned_df = df.dropna(subset=[col.name for col in expected_schema.fields])
        print(cleaned_df.show())

    else:
        print("Schema validation successful.")
else:
    print("Schema validation failed. Column mismatch between DataFrame and expected schema.")


Data frame schema:  StructType([StructField('ACCT_CD', IntegerType(), True), StructField('EXT_SEC_ID', StringType(), True), StructField('ASOF_DATE', StringType(), True), StructField('LONG_SHORT_IND', StringType(), True), StructField('QTY_SOD', IntegerType(), True), StructField('COST', DoubleType(), True), StructField('UNIT_COST', DoubleType(), True), StructField('MKT_VAL_SOD', DoubleType(), True), StructField('UNREALIZED_GAIN_LOSS', StringType(), True), StructField('ACCRUED_INTEREST', StringType(), True)])
Expected Schema:  StructType([StructField('ACCT_CD', IntegerType(), True), StructField('EXT_SEC_ID', IntegerType(), True), StructField('ASOF_DATE', DateType(), True), StructField('LONG_SHORT_IND', StringType(), True), StructField('QTY_SOD', IntegerType(), True), StructField('COST', DoubleType(), True), StructField('UNIT_COST', DoubleType(), True), StructField('MKT_VAL_SOD', DoubleType(), True), StructField('UNREALIZED_GAIN_LOSS', StringType(), True)])
Schema validation failed.
Mismat

In [45]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

directory_path = "testcsv/"
file_pattern = "De*.csv"
full_path = f"{directory_path}/{file_pattern}"
df = spark.read.csv(full_path, header=True, inferSchema=True)
df.count()

100

In [8]:
import pandas as pd
import datetime as dt
from pandas_schema import Column, Schema
from pandas_schema.validation import CanConvertValidation, DateFormatValidation, InListValidation, CustomElementValidation

start_time = dt.datetime.now()

df = pd.read_csv("erroneous_CRD_Position_202212124_200.csv")

def validate_not_empty(value):
    if isinstance(value, str):
        return value.strip() != '' 
    else:
        return pd.notnull(value)

validation_functions = CustomElementValidation(validate_not_empty, 'Mandatory col should not be empty')

expected_schema = Schema([
    Column("ACCT_CD", [CanConvertValidation(int), validation_functions]),
    Column("EXT_SEC_ID", [CanConvertValidation(int)]),
    Column("ASOF_DATE", [DateFormatValidation(date_format='%Y-%m-%d %H:%M:%S.%f')]),
    Column("LONG_SHORT_IND", [InListValidation(['L','S']), validation_functions]),
    Column("QTY_SOD", [CanConvertValidation(int)]),
    Column("COST", [CanConvertValidation(float)]),
    Column("UNIT_COST", [CanConvertValidation(float)]),
    Column("MKT_VAL_SOD", [CanConvertValidation(float)])
])

# Validate DataFrame against the expected schema
errors = expected_schema.validate(df, columns=['ACCT_CD', 'EXT_SEC_ID', 'ASOF_DATE', 'LONG_SHORT_IND', 'QTY_SOD', 'COST', 'UNIT_COST', 'MKT_VAL_SOD'])
# erdf = pd.DataFrame(errors)
# print(erdf)
if errors:
    erdf = pd.DataFrame(errors)
    print(erdf)
    # erdf.to_('err_message_csv.csv')
    print(erdf.loc[:])
    # erstr = ''+f"Row: {erdf[0].loc[:]}, Column: {erdf[1].loc[:]} - Error: {erdf[2].loc[:]}"
    # erstr = ''+ erdf.to_string
    # error_file = open("err_mes.txt", "w")
    # error_file.write(erstr + "\n")
    
    # error_message = f"Schema validation failed. Mismatched rows:\n{erdf}"
    with open("err_mes.txt", "w+") as file:
        file.write(erdf.to_string())
    # Create a file for writing errors into
    error_file = open("err_messages.txt", "w")
    
    # Create an empty DataFrame to store erroneous rows
    erroneous_rows = pd.DataFrame(columns=df.columns)
    
    # Create a set to store the indices of erroneous rows
    erroneous_indices = set()
    
    print("Schema validation failed. Mismatched rows:")
    print(type(errors))
    for error in errors:
        error_message = f"Row: {error.row}, Column: {error.column} - Error: {error.message}"
        print(error_message)
        
        error_file.write(error_message + "\n")
        
        if error.row not in erroneous_indices:
            # Collect the erroneous row into the DataFrame
            erroneous_rows = erroneous_rows._append(df.iloc[error.row])
            erroneous_indices.add(error.row)
            
    erroneous_rows.to_csv("er_rows.csv", index=False)
    error_file.close()
    
    df = df.drop(list(erroneous_indices))
    df = df.reset_index(drop=True)

else:
    print("Schema validation successful")
    
print("Dataframe after dropping erroneous rows: ")
print(df)  
    
print(f"Validation Time using pandas: {(dt.datetime.now() - start_time).total_seconds()} seconds")


# git pe repo
# timesheet

                                                     0
0    {row: 1, column: "ACCT_CD"}: "nan" cannot be c...
1    {row: 1, column: "ACCT_CD"}: "nan" Mandatory c...
2    {row: 2, column: "ASOF_DATE"}: "1970-01-01" do...
3    {row: 2, column: "LONG_SHORT_IND"}: "M" is not...
4    {row: 5, column: "ASOF_DATE"}: "1970-01-01 05:...
..                                                 ...
211  {row: 195, column: "ACCT_CD"}: "nan" Mandatory...
212  {row: 195, column: "LONG_SHORT_IND"}: "K" is n...
213  {row: 196, column: "ASOF_DATE"}: "1970-01-01 0...
214  {row: 197, column: "ASOF_DATE"}: "19-01-01 05:...
215  {row: 198, column: "ASOF_DATE"}: "19-01-01 05:...

[216 rows x 1 columns]
                                                     0
0    {row: 1, column: "ACCT_CD"}: "nan" cannot be c...
1    {row: 1, column: "ACCT_CD"}: "nan" Mandatory c...
2    {row: 2, column: "ASOF_DATE"}: "1970-01-01" do...
3    {row: 2, column: "LONG_SHORT_IND"}: "M" is not...
4    {row: 5, column: "ASOF_DATE"}: "1970

In [24]:
l=[1,2,3,4,5]
print(l[:])

[1, 2, 3, 4, 5]


In [30]:
import pandas as pd
import datetime as dt
from pandas_schema import Column, Schema
from pandas_schema.validation import CanConvertValidation, DateFormatValidation, InListValidation, CustomElementValidation

start_time = dt.datetime.now()

df = pd.read_csv("erroneous_CRD_Position_202212124_200.csv")

def validate_not_empty(value):
    if isinstance(value, str):
        return value.strip() != ''
    else:
        return pd.notnull(value)

validation_functions = CustomElementValidation(validate_not_empty, 'Mandatory col should not be empty')

expected_schema = Schema([
    Column("ACCT_CD", [CanConvertValidation(int), validation_functions]),
    Column("EXT_SEC_ID", [CanConvertValidation(int)]),
    Column("ASOF_DATE", [DateFormatValidation(date_format='%Y-%m-%d %H:%M:%S.%f')]),
    Column("LONG_SHORT_IND", [InListValidation(['L','S']), validation_functions]),
    Column("QTY_SOD", [CanConvertValidation(int)]),
    Column("COST", [CanConvertValidation(float)]),
    Column("UNIT_COST", [CanConvertValidation(float)]),
    Column("MKT_VAL_SOD", [CanConvertValidation(float)])
])

# Validate DataFrame against the expected schema
errors = expected_schema.validate(df, columns=['ACCT_CD', 'EXT_SEC_ID', 'ASOF_DATE', 'LONG_SHORT_IND', 'QTY_SOD', 'COST', 'UNIT_COST', 'MKT_VAL_SOD'])

if errors:
    # Create a file for writing errors into
    error_file = open("er_messages.txt", "w")

    # Create a list of error messages using list comprehension
    error_messages = [f"Row: {error.row}, Column: {error.column} - Error: {error.message}" for error in errors]

    # Join the error messages using newline character
    error_message_text = "\n".join(error_messages)

    # Write the error messages to the file
    error_file.write(error_message_text)

    print("Schema validation failed. Mismatched rows:")
    print(error_message_text)

    error_file.close()

    # Get the indices of erroneous rows
    erroneous_indices = {error.row for error in errors}

    # Filter erroneous rows using boolean indexing
    erroneous_rows = df[df.index.isin(erroneous_indices)]

    erroneous_rows.to_csv("er_rows.csv", index=False)
    
    # Drop erroneous rows from the original DataFrame
    df = df.drop(list(erroneous_indices))
    df = df.reset_index(drop=True)
else:
    print("Schema validation successful")

print("Dataframe after dropping erroneous rows: ")
print(df)  

print(f"Validation Time using pandas: {(dt.datetime.now() - start_time).total_seconds()} seconds")


Schema validation failed. Mismatched rows:
Row: 1, Column: ACCT_CD - Error: cannot be converted to type <class 'int'>
Row: 1, Column: ACCT_CD - Error: Mandatory col should not be empty
Row: 2, Column: ASOF_DATE - Error: does not match the date format string "%Y-%m-%d %H:%M:%S.%f"
Row: 2, Column: LONG_SHORT_IND - Error: is not in the list of legal options (L, S)
Row: 5, Column: ASOF_DATE - Error: does not match the date format string "%Y-%m-%d %H:%M:%S.%f"
Row: 5, Column: LONG_SHORT_IND - Error: is not in the list of legal options (L, S)
Row: 5, Column: LONG_SHORT_IND - Error: Mandatory col should not be empty
Row: 7, Column: LONG_SHORT_IND - Error: is not in the list of legal options (L, S)
Row: 7, Column: LONG_SHORT_IND - Error: Mandatory col should not be empty
Row: 9, Column: ACCT_CD - Error: cannot be converted to type <class 'int'>
Row: 9, Column: ACCT_CD - Error: Mandatory col should not be empty
Row: 9, Column: LONG_SHORT_IND - Error: is not in the list of legal options (L, S)
R