In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(f"nb_test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/22 18:16:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def get_latest_50_weeks_efficient(spark, silver_root, category, current_date):
    """
    Only read the specific 50 weekly directories you need
    """
    from datetime import datetime, timedelta
    import glob
    
    # Calculate the 50 weekly directories you need
    current_dt = datetime.strptime(current_date, "%Y-%m-%d")
    start_date = current_dt - timedelta(weeks=50)
    print(start_date)
    # Generate the 50 weekly directory paths
    weekly_paths = []
    current_week = start_date
    
    while current_week <= current_dt:
        week_str = current_week.strftime('%Y_%m_%d')
        week_path = f"{silver_root}/{category}/{category}_week_{week_str}"
        weekly_paths.append(week_path)
        current_week += timedelta(weeks=1)
    
    # Read only those specific directories
    df = spark.read.parquet(*weekly_paths)
    
    print(f"Read {len(weekly_paths)} weekly directories from {start_date} to {current_dt}")
    return df

In [3]:
from datetime import datetime

df2 = get_latest_50_weeks_efficient(spark, "datamart/gold/", "feature_store", "2023-01-01")
print(f'Feature store records: {df2.count()}')

df3 = get_latest_50_weeks_efficient(spark, "datamart/gold/", "label_store", "2023-01-01")
print(f'Label store records: {df3.count()}')

2022-01-16 00:00:00


                                                                                

Read 51 weekly directories from 2022-01-16 00:00:00 to 2023-01-01 00:00:00


                                                                                

Feature store records: 736135
2022-01-16 00:00:00
Read 51 weekly directories from 2022-01-16 00:00:00 to 2023-01-01 00:00:00


[Stage 7:====>                                                    (1 + 12) / 13]

Label store records: 736135


                                                                                

In [8]:
def analyze_gold_data():
    """Analyze feature store and label store data"""
    
    # Initialize Spark
    spark = SparkSession.builder.appName("GoldDataAnalysis").getOrCreate()
    
    # Paths
    feature_store_path = "/opt/airflow/datamart/gold/feature_store"
    label_store_path = "/opt/airflow/datamart/gold/label_store"
    
    print("=== GOLD LAYER DATA ANALYSIS ===\n")
    
    # Analyze Feature Store
    print("1. FEATURE STORE ANALYSIS")
    print("-" * 50)
    
    # Get all feature store directories
    feature_dirs = sorted(glob.glob(f"{feature_store_path}/feature_store_week_*"))
    print(f"Found {len(feature_dirs)} feature store directories")
    
    feature_data = []
    for dir_path in feature_dirs:
        try:
            # Extract date from directory name
            date_str = dir_path.split("_week_")[-1]
            date_obj = datetime.strptime(date_str, "%Y_%m_%d")
            
            # Read parquet and count records
            df = spark.read.parquet(dir_path)
            record_count = df.count()
            
            # Feature store doesn't have snapshot_date column, so we use directory date
            feature_data.append({
                'directory': dir_path.split('/')[-1],
                'date': date_obj,
                'date_str': date_str,
                'record_count': record_count,
                'snapshot_dates': [date_obj]  # Use directory date as snapshot date
            })
            
            print(f"  {dir_path.split('/')[-1]}: {record_count} records")
            
        except Exception as e:
            print(f"  Error reading {dir_path}: {e}")
    
    # Analyze Label Store
    print("\n2. LABEL STORE ANALYSIS")
    print("-" * 50)
    
    # Get all label store directories
    label_dirs = sorted(glob.glob(f"{label_store_path}/label_store_week_*"))
    print(f"Found {len(label_dirs)} label store directories")
    
    label_data = []
    for dir_path in label_dirs:
        try:
            # Extract date from directory name
            date_str = dir_path.split("_week_")[-1]
            date_obj = datetime.strptime(date_str, "%Y_%m_%d")
            
            # Read parquet and count records
            df = spark.read.parquet(dir_path)
            record_count = df.count()
            
            # Check if label store has snapshot_date column
            has_snapshot_date = 'snapshot_date' in df.columns
            
            if has_snapshot_date:
                # Get distinct snapshot dates in this file
                distinct_dates = df.select("snapshot_date").distinct().collect()
                date_list = [row['snapshot_date'] for row in distinct_dates]
            else:
                # Use directory date if no snapshot_date column
                date_list = [date_obj]
            
            label_data.append({
                'directory': dir_path.split('/')[-1],
                'date': date_obj,
                'date_str': date_str,
                'record_count': record_count,
                'snapshot_dates': date_list,
                'has_snapshot_date': has_snapshot_date
            })
            
            print(f"  {dir_path.split('/')[-1]}: {record_count} records, {len(date_list)} snapshot dates")
            
        except Exception as e:
            print(f"  Error reading {dir_path}: {e}")
    
    # Compare data
    print("\n3. COMPARISON ANALYSIS")
    print("-" * 50)
    
    # Create DataFrames for comparison
    feature_df = pd.DataFrame(feature_data)
    label_df = pd.DataFrame(label_data)
    
    print(f"Feature store directories: {len(feature_df)}")
    print(f"Label store directories: {len(label_df)}")
    
    # Compare dates
    feature_dates = set(feature_df['date_str'].tolist())
    label_dates = set(label_df['date_str'].tolist())
    
    print(f"\nFeature store dates: {len(feature_dates)}")
    print(f"Label store dates: {len(label_dates)}")
    
    # Find mismatches
    feature_only = feature_dates - label_dates
    label_only = label_dates - feature_dates
    common_dates = feature_dates & label_dates
    
    print(f"\nCommon dates: {len(common_dates)}")
    print(f"Feature store only: {len(feature_only)}")
    print(f"Label store only: {len(label_only)}")
    
    if feature_only:
        print(f"\nDates only in feature store: {sorted(feature_only)}")
    if label_only:
        print(f"Dates only in label store: {sorted(label_only)}")
    
    # Compare record counts for common dates
    print("\n4. RECORD COUNT COMPARISON (Common Dates)")
    print("-" * 50)
    
    comparison_data = []
    for date_str in sorted(common_dates):
        feature_row = feature_df[feature_df['date_str'] == date_str].iloc[0]
        label_row = label_df[label_df['date_str'] == date_str].iloc[0]
        
        feature_count = feature_row['record_count']
        label_count = label_row['record_count']
        difference = feature_count - label_count
        
        comparison_data.append({
            'date': date_str,
            'feature_count': feature_count,
            'label_count': label_count,
            'difference': difference,
            'feature_snapshots': len(feature_row['snapshot_dates']),
            'label_snapshots': len(label_row['snapshot_dates'])
        })
        
        print(f"  {date_str}: Feature={feature_count}, Label={label_count}, Diff={difference}")
    
    # Summary statistics
    print("\n5. SUMMARY STATISTICS")
    print("-" * 50)
    
    total_feature_records = feature_df['record_count'].sum()
    total_label_records = label_df['record_count'].sum()
    
    print(f"Total feature store records: {total_feature_records}")
    print(f"Total label store records: {total_label_records}")
    print(f"Total difference: {total_feature_records - total_label_records}")
    
    if comparison_data:
        comp_df = pd.DataFrame(comparison_data)
        print(f"\nAverage difference per week: {comp_df['difference'].mean():.2f}")
        print(f"Max difference: {comp_df['difference'].max()}")
        print(f"Min difference: {comp_df['difference'].min()}")
    
    # Show sample of differences
    print("\n6. SAMPLE DIFFERENCES (First 10 weeks)")
    print("-" * 50)
    
    for i, comp in enumerate(comparison_data[:10]):
        print(f"  Week {i+1} ({comp['date']}): Feature={comp['feature_count']}, Label={comp['label_count']}, Diff={comp['difference']}")
    
    # Check for patterns in differences
    print("\n7. DIFFERENCE PATTERN ANALYSIS")
    print("-" * 50)
    
    if comparison_data:
        comp_df = pd.DataFrame(comparison_data)
        positive_diff = comp_df[comp_df['difference'] > 0]
        negative_diff = comp_df[comp_df['difference'] < 0]
        zero_diff = comp_df[comp_df['difference'] == 0]
        
        print(f"Weeks with more feature records: {len(positive_diff)}")
        print(f"Weeks with more label records: {len(negative_diff)}")
        print(f"Weeks with equal records: {len(zero_diff)}")
        
        if len(positive_diff) > 0:
            print(f"Average excess in feature store: {positive_diff['difference'].mean():.2f}")
        if len(negative_diff) > 0:
            print(f"Average excess in label store: {abs(negative_diff['difference'].mean()):.2f}")
    
    spark.stop()
    return feature_data, label_data, comparison_data

In [38]:
feature_data, label_data, comparison_data = analyze_gold_data()

=== GOLD LAYER DATA ANALYSIS ===

1. FEATURE STORE ANALYSIS
--------------------------------------------------
Found 157 feature store directories


25/06/22 08:56:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


  feature_store_week_2022_01_02: 9855 records
  feature_store_week_2022_01_09: 11963 records
  feature_store_week_2022_01_16: 14544 records
  feature_store_week_2022_01_23: 14588 records
  feature_store_week_2022_01_30: 14352 records
  feature_store_week_2022_02_06: 14370 records
  feature_store_week_2022_02_13: 14570 records
  feature_store_week_2022_02_20: 14358 records
  feature_store_week_2022_02_27: 14288 records
  feature_store_week_2022_03_06: 14541 records
  feature_store_week_2022_03_13: 14462 records
  feature_store_week_2022_03_20: 14444 records
  feature_store_week_2022_03_27: 14433 records
  feature_store_week_2022_04_03: 14330 records
  feature_store_week_2022_04_10: 14429 records
  feature_store_week_2022_04_17: 14347 records
  feature_store_week_2022_04_24: 14471 records
  feature_store_week_2022_05_01: 14324 records
  feature_store_week_2022_05_08: 14271 records
  feature_store_week_2022_05_15: 14381 records
  feature_store_week_2022_05_22: 14415 records
  feature_stor

In [11]:
import glob
import pandas as pd
feature_data, label_data, comparison_data = analyze_gold_data()

=== GOLD LAYER DATA ANALYSIS ===

1. FEATURE STORE ANALYSIS
--------------------------------------------------
Found 157 feature store directories
  feature_store_week_2022_01_02: 14577 records
  feature_store_week_2022_01_09: 14564 records
  feature_store_week_2022_01_16: 14670 records
  feature_store_week_2022_01_23: 14672 records
  feature_store_week_2022_01_30: 14464 records
  feature_store_week_2022_02_06: 14440 records
  feature_store_week_2022_02_13: 14752 records
  feature_store_week_2022_02_20: 14414 records
  feature_store_week_2022_02_27: 14470 records
  feature_store_week_2022_03_06: 14611 records
  feature_store_week_2022_03_13: 14546 records
  feature_store_week_2022_03_20: 14570 records
  feature_store_week_2022_03_27: 14573 records
  feature_store_week_2022_04_03: 14428 records
  feature_store_week_2022_04_10: 14513 records
  feature_store_week_2022_04_17: 14473 records
  feature_store_week_2022_04_24: 14625 records
  feature_store_week_2022_05_01: 14450 records
  featu

In [18]:
# Read only those specific directories
df_feature = spark.read.parquet("datamart/gold/feature_store/feature_store_week_2024_12_29")
df_label = spark.read.parquet("datamart/gold/label_store/label_store_week_2024_12_29")
print(f"Feature store: {df_feature.count()}")
print(f"Label store: {df_label.count()}")

Feature store: 6267
Label store: 6211


In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

def debug_missing_records():
    """Debug which records are missing from label store compared to feature store"""
    
    # Initialize Spark
    spark = SparkSession.builder.appName("MissingRecordsDebug").getOrCreate()
    
    # Read the data
    df_feature = spark.read.parquet("datamart/gold/feature_store/feature_store_week_2024_12_22")
    df_label = spark.read.parquet("datamart/gold/label_store/label_store_week_2024_12_22")
    
    print("=== DEBUGGING MISSING RECORDS ===")
    print(f"Feature store: {df_feature.count()} records")
    print(f"Label store: {df_label.count()} records")
    print(f"Difference: {df_feature.count() - df_label.count()} records")
    
    # Check if 'id' column exists in both datasets
    print("\n=== COLUMN CHECK ===")
    feature_has_id = 'id' in df_feature.columns
    label_has_id = 'id' in df_label.columns
    
    print(f"Feature store has 'id' column: {feature_has_id}")
    print(f"Label store has 'id' column: {label_has_id}")
    
    if not feature_has_id or not label_has_id:
        print("ERROR: Both datasets must have 'id' column for comparison")
        return
    
    # Get the IDs from both datasets
    feature_ids = df_feature.select("id").distinct()
    label_ids = df_label.select("id").distinct()
    
    print(f"\nFeature store unique IDs: {feature_ids.count()}")
    print(f"Label store unique IDs: {label_ids.count()}")
    
    # Find IDs that are in feature store but not in label store
    missing_in_label = feature_ids.subtract(label_ids)
    missing_in_feature = label_ids.subtract(feature_ids)
    
    print(f"\nIDs in feature store but missing in label store: {missing_in_label.count()}")
    print(f"IDs in label store but missing in feature store: {missing_in_feature.count()}")
    
    # Show some examples of missing IDs
    if missing_in_label.count() > 0:
        print("\n=== SAMPLE MISSING IDs (in feature but not in label) ===")
        missing_in_label.show(10, truncate=False)
        
        # Get the full records for some missing IDs from feature store
        print("\n=== SAMPLE MISSING RECORDS FROM FEATURE STORE ===")
        sample_missing_ids = missing_in_label.limit(5)
        missing_records = df_feature.join(sample_missing_ids, "id", "inner")
        missing_records.show(5, truncate=False)
    
    if missing_in_feature.count() > 0:
        print("\n=== SAMPLE MISSING IDs (in label but not in feature) ===")
        missing_in_feature.show(10, truncate=False)
        
        # Get the full records for some missing IDs from label store
        print("\n=== SAMPLE MISSING RECORDS FROM LABEL STORE ===")
        sample_missing_ids = missing_in_feature.limit(5)
        missing_records = df_label.join(sample_missing_ids, "id", "inner")
        missing_records.show(5, truncate=False)
    
    # Check for any patterns in the missing records
    print("\n=== PATTERN ANALYSIS ===")
    
    # Check if missing records have any common characteristics
    if missing_in_label.count() > 0:
        print("Analyzing patterns in records missing from label store...")
        
        # Get all missing records from feature store
        all_missing_records = df_feature.join(missing_in_label, "id", "inner")
        
        # Check for null values in key columns
        print("\nNull value analysis for missing records:")
        for col_name in all_missing_records.columns[:10]:  # Check first 10 columns
            null_count = all_missing_records.filter(col(col_name).isNull()).count()
            total_count = all_missing_records.count()
            if total_count > 0:
                null_percentage = (null_count / total_count) * 100
                print(f"{col_name}: {null_count} nulls ({null_percentage:.2f}%)")
    
    spark.stop()

if __name__ == "__main__":
    debug_missing_records() 

=== DEBUGGING MISSING RECORDS ===
Feature store: 14407 records
Label store: 14407 records
Difference: 0 records

=== COLUMN CHECK ===
Feature store has 'id' column: True
Label store has 'id' column: True

Feature store unique IDs: 14407
Label store unique IDs: 14407

IDs in feature store but missing in label store: 0
IDs in label store but missing in feature store: 0

=== PATTERN ANALYSIS ===


In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

def find_duplicate_ids():
    """Finds and displays records with duplicate IDs in the feature store,
       and analyzes which columns are unique vs. non-unique."""
    
    spark = SparkSession.builder.appName("FindDuplicateIDs").getOrCreate()
    
    feature_store_path = "datamart/gold/feature_store/feature_store_week_2024_12_29"
    print(f"Reading feature store from: {feature_store_path}")
    
    df_feature = spark.read.parquet(feature_store_path)
    df_feature.cache() # Cache for better performance
    
    print(f"\nTotal records in feature store: {df_feature.count()}")
    
    # Group by 'id' and count occurrences
    id_counts = df_feature.groupBy("id").count()
    
    # Filter for IDs that appear more than once
    duplicate_ids_df = id_counts.filter(col("count") > 1)
    
    num_duplicate_ids = duplicate_ids_df.count()
    print(f"Number of IDs with duplicates: {num_duplicate_ids}")
    
    if num_duplicate_ids > 0:
        print("\n--- IDs with Duplicate Records ---")
        duplicate_ids_df.orderBy(col("count").desc()).show()
        
        print("\n--- Analysis of Duplicate Records (Sample) ---")
        # Take a sample of 5 duplicate IDs to inspect
        duplicate_ids_to_check = [row.id for row in duplicate_ids_df.limit(5).collect()]
        
        for dup_id in duplicate_ids_to_check:
            print(f"\n--- Analyzing duplicates for ID: {dup_id} ---")
            
            # Get all records for this specific duplicate ID
            duplicate_records = df_feature.filter(col("id") == dup_id).collect()
            
            if not duplicate_records:
                continue
                
            # Display the duplicate records
            print("Duplicate rows found:")
            df_feature.filter(col("id") == dup_id).show(truncate=False)

            first_record = duplicate_records[0].asDict()
            all_columns = list(first_record.keys())
            non_unique_cols = set()
            
            # Compare the first record with all other duplicate records for this ID
            for i in range(1, len(duplicate_records)):
                current_record = duplicate_records[i].asDict()
                for col_name in all_columns:
                    if first_record[col_name] != current_record[col_name]:
                        non_unique_cols.add(col_name)

            unique_cols = set(all_columns) - non_unique_cols
            
            print(f"For ID {dup_id}:")
            if non_unique_cols:
                print(f"  Columns with DIFFERENT values: {sorted(list(non_unique_cols))}")
            else:
                print("  All columns have the same values (exact duplicates).")

            if unique_cols:
                print(f"  Columns with the SAME values: {sorted(list(unique_cols))}")

    else:
        print("\nNo duplicate IDs found in the feature store.")
        
    df_feature.unpersist()
    spark.stop()

if __name__ == "__main__":
    find_duplicate_ids()

Reading feature store from: datamart/gold/feature_store/feature_store_week_2024_12_29

Total records in feature store: 6211
Number of IDs with duplicates: 0

No duplicate IDs found in the feature store.


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(f"nb_test").getOrCreate()

df_feature = spark.read.parquet("datamart/gold/feature_store/feature_store_week_2023_01_01")
df_feature.show(1)

+-----------+---------+-----------+---------------+--------+-----------+------+------+------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+------------+------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+----------+----------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+--------------------+-------------------------+----------------+----------------------+--------------------+--------------------+---