# Phase 1: Dirty Data Diagnostics ü©∫

**Goal**: Diagnose data quality issues (gaps, noise, format errors) in the Raw Bronze Layer (S3/MinIO) to define the "Cleaning Rules" for Phase 2.

**Dataset Context**:
*   **Source**: Nature Scientific Data (2022) - Pharma Manufacturing.
*   **Components**:
    1.  `Laboratory.csv`: Quality Targets (CQAs) & Metadata.
    2.  `Process.csv`: Aggregated features (1 row per batch).
    3.  `Process/*.csv`: **Raw Time Series** (Sensor data @ 10s frequency).

## 1. Setup & Ingestion

In [1]:
# Import Cloud-Agnostic Config
import sys
import os
sys.path.append(os.path.abspath('../src'))

from config import get_spark_session, get_data_path, get_boto3_client
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark
spark = get_spark_session("DirtyEDA_Pharma")

bucket = os.getenv("BUCKET_BRONZE", "bronze")
print(f"üåç Reading from Bucket: {bucket}")

üîß Configuring specific S3 endpoint for MinIO: http://minio:9000


:: loading settings :: url = jar:file:/usr/local/lib/python3.11/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-701fd2c6-3cac-49be-b0ed-ac23f0f7fa2c;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central


:: resolution report :: resolve 210ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-701fd2c6-3cac-49be-b0ed-ac23f0f7fa2c
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/6ms)


26/01/21 18:38:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


26/01/21 18:38:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


üåç Reading from Bucket: bronze


## 2. Metadata & Aggregated Data (Batch Level)
First, we check the metadata files (`Laboratory`, `Process` aggregated).

In [2]:
# Define Paths
path_lab = f"s3a://{bucket}/Laboratory.csv"
path_proc_agg = f"s3a://{bucket}/Process.csv"
path_norm = f"s3a://{bucket}/Normalization.csv"

# Load Data (Explicit Semicolon Delimiter for CSVs from these authors)
df_lab = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv(path_lab)
df_proc_agg = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv(path_proc_agg)

print(f"üìä Laboratory Count: {df_lab.count()} rows (Batches)")
print(f"üìä Process Aggregated Count: {df_proc_agg.count()} rows (Batches)")

# Sample Check
df_lab.select("batch", "code", "dissolution_av").show(3)

26/01/21 18:38:37 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


üìä Laboratory Count: 1005 rows (Batches)
üìä Process Aggregated Count: 1005 rows (Batches)


+-----+----+--------------+
|batch|code|dissolution_av|
+-----+----+--------------+
|    1|  25|         93.83|
|    2|  25|         99.67|
|    3|  25|         97.33|
+-----+----+--------------+
only showing top 3 rows



## 3. Raw Time Series Analysis (`Process/*.csv`)
This is the **High-Frequency Sensor Data** (100s of files). We load them all into a single DataFrame.

*Note: In production, we assume keys are partitioned like `Process/file.csv`.*

In [3]:
path_ts = f"s3a://{bucket}/Process/*.csv"

# Load ALL Time Series files
# Note: The user confirmed TS files also use semicolon delimiter.

try:
    df_ts = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv(path_ts)
except Exception as e:
    print(f"‚ö†Ô∏è Error loading TS: {e}")

print(f"üìà Time Series Total Rows: {df_ts.count()}")
df_ts.printSchema()

[Stage 12:>                                                         (0 + 4) / 5]







                                                                                



üìà Time Series Total Rows: 4720208
root
 |-- timestamp: string (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- batch: integer (nullable = true)
 |-- code: integer (nullable = true)
 |-- tbl_speed: double (nullable = true)
 |-- fom: double (nullable = true)
 |-- main_comp: double (nullable = true)
 |-- tbl_fill: double (nullable = true)
 |-- SREL: double (nullable = true)
 |-- pre_comp: double (nullable = true)
 |-- produced: integer (nullable = true)
 |-- waste: integer (nullable = true)
 |-- cyl_main: double (nullable = true)
 |-- cyl_pre: double (nullable = true)
 |-- stiffness: integer (nullable = true)
 |-- ejection: integer (nullable = true)



                                                                                

## 4. Gap Analysis (Temporal Continuity)
We need to find missing data points in the Time Series.
*   **Expected Frequency**: 10 seconds.
*   **Risk**: If Delta > 10s, we have a gap.

In [4]:
# Ensure 'time_stamp' column exists (name varies in raw data: 'Time', 'timestamp', 'time_stamp')
ts_col = [c for c in df_ts.columns if 'time' in c.lower()]
if ts_col:
    target_ts_col = ts_col[0]
    df_ts_clean = df_ts.withColumn("ts_parsed", F.to_timestamp(F.col(target_ts_col)))
    
    w = Window.orderBy("ts_parsed")
    df_gaps = df_ts_clean.withColumn("prev_ts", F.lag("ts_parsed").over(w)) \
        .withColumn("delta_sec", F.col("ts_parsed").cast("long") - F.col("prev_ts").cast("long"))
    
    print("‚ö†Ô∏è Top 5 Gaps (Seconds):")
    df_gaps.filter("delta_sec > 11").orderBy(F.desc("delta_sec")).show(5)
else:
    print("‚ùå No Timestamp column found in Time Series schema!")

‚ö†Ô∏è Top 5 Gaps (Seconds):


26/01/21 18:38:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 18:38:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 18:38:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 16:>                                                         (0 + 4) / 5]







26/01/21 18:38:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 18:38:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 18:>                                                         (0 + 1) / 1]

+-------------------+--------+-----+----+---------+---+---------+--------+----+--------+--------+-----+--------+-------+---------+--------+-------------------+-------------------+---------+
|          timestamp|campaign|batch|code|tbl_speed|fom|main_comp|tbl_fill|SREL|pre_comp|produced|waste|cyl_main|cyl_pre|stiffness|ejection|          ts_parsed|            prev_ts|delta_sec|
+-------------------+--------+-----+----+---------+---+---------+--------+----+--------+--------+-----+--------+-------+---------+--------+-------------------+-------------------+---------+
|2018-11-13 06:16:33|       3|    1|  25|      0.0|0.0|      0.8|    5.68| 0.0|     0.0|       0|    0|    1.79|    6.0|       50|     107|2018-11-13 06:16:33|2018-06-12 22:10:55| 13248338|
|2020-04-28 21:40:41|     131|  622|  21|      0.0|0.0|      4.6|    6.01| 6.0|     0.0|       0|    0|     1.9|    8.0|       41|     177|2020-04-28 21:40:41|2020-01-30 17:48:56|  7703505|
|2020-08-06 02:42:01|     135|  667|  21|      0.0

                                                                                

## 5. Noise & Outlier Profiling (ALL SENSORS)
We analyze **ALL** relevant numeric sensors for negatives and statistical outliers (> 5 Sigma).

In [5]:
# List of sensors to check
sensors = ["main_comp", "pre_comp", "tbl_speed", "tbl_fill", "ejection"]

for sensor in sensors:
    if sensor in df_ts.columns:
        print(f"\nüéØ Analyzing Sensor: {sensor.upper()}")
        
        # Cast to Double
        df_ts = df_ts.withColumn(sensor, F.col(sensor).cast("double"))
        
        # Negative Values
        neg_count = df_ts.filter(F.col(sensor) < 0).count()
        if neg_count > 0:
            print(f"   ‚ùå Negative Values: {neg_count}")
        else:
            print(f"   ‚úÖ No Negative Values")
        
        # 5-Sigma Outliers
        stats = df_ts.select(F.mean(sensor).alias("mu"), F.stddev(sensor).alias("sigma")).collect()[0]
        mu, sigma = stats["mu"], stats["sigma"]
        
        if mu is not None and sigma is not None:
            upper = mu + 5 * sigma
            lower = mu - 5 * sigma
            outliers = df_ts.filter((F.col(sensor) > upper) | (F.col(sensor) < lower)).count()
            print(f"   üìâ Outliers (> 5œÉ): {outliers} / {df_ts.count()} ({outliers/df_ts.count():.4%})")
            print(f"   üìä Mean: {mu:.2f}, StdDev: {sigma:.2f}")
        else:
            print("   ‚ö†Ô∏è Stats is None (All Nulls?)")
    else:
        print(f"‚ùå {sensor} column not found!")


üéØ Analyzing Sensor: MAIN_COMP


[Stage 19:>                                                         (0 + 4) / 5]





                                                                                

   ‚úÖ No Negative Values


[Stage 22:>                                                         (0 + 4) / 5]





                                                                                

[Stage 25:>                                                         (0 + 4) / 5]











   üìâ Outliers (> 5œÉ): 2499 / 4720208 (0.0529%)
   üìä Mean: 6.21, StdDev: 2.13

üéØ Analyzing Sensor: PRE_COMP


[Stage 34:>                                                         (0 + 4) / 5]





                                                                                

   ‚úÖ No Negative Values


[Stage 37:>                                                         (0 + 4) / 5]





                                                                                

[Stage 40:>                                                         (0 + 4) / 5]





                                                                                



   üìâ Outliers (> 5œÉ): 28 / 4720208 (0.0006%)
   üìä Mean: 1.00, StdDev: 1.59

üéØ Analyzing Sensor: TBL_SPEED


[Stage 49:>                                                         (0 + 4) / 5]





                                                                                

   ‚úÖ No Negative Values


[Stage 52:>                                                         (0 + 4) / 5]





                                                                                

[Stage 55:>                                                         (0 + 4) / 5]





                                                                                

   üìâ Outliers (> 5œÉ): 1 / 4720208 (0.0000%)
   üìä Mean: 80.32, StdDev: 56.66

üéØ Analyzing Sensor: TBL_FILL


[Stage 64:>                                                         (0 + 4) / 5]





                                                                                

   ‚úÖ No Negative Values


[Stage 67:>                                                         (0 + 4) / 5]





[Stage 70:>                                                         (0 + 4) / 5]





                                                                                



   üìâ Outliers (> 5œÉ): 78 / 4720208 (0.0017%)
   üìä Mean: 6.11, StdDev: 1.71

üéØ Analyzing Sensor: EJECTION


[Stage 79:>                                                         (0 + 4) / 5]





                                                                                

   ‚úÖ No Negative Values


[Stage 82:>                                                         (0 + 4) / 5]





                                                                                

[Stage 85:>                                                         (0 + 4) / 5]





                                                                                



   üìâ Outliers (> 5œÉ): 3878 / 4720208 (0.0822%)
   üìä Mean: 185.38, StdDev: 124.55




## 5b. Deep Dive: Nulls, Duplicates & Stats üïµÔ∏è
We investigate Duplicate rows and Skewness (Mean vs Median) for imputation strategy.

In [6]:
# 1. Duplicate Investigation
print("üîç Analyzing Duplicates...")
initial_count = df_ts.count()
dedup_df = df_ts.dropDuplicates()
dedup_count = dedup_df.count()
dups = initial_count - dedup_count

if dups > 0:
    print(f"   üóëÔ∏è Exact Duplicates Found: {dups} rows ({dups/initial_count:.2%})")
else:
    print("   ‚úÖ No exact duplicates found.")

# 2. Mean vs Median (Skewness Check) for ALL Sensors
print("\nüìä Mean vs Median for Sensors:")
for col in sensors:
    if col in df_ts.columns:
        dd = df_ts.withColumn(col, F.col(col).cast("double"))
        # Calc median using approxQuantile
        # Check if column is not all null first
        if dd.filter(F.col(col).isNotNull()).count() > 0:
            median = dd.approxQuantile(col, [0.5], 0.01)[0]
            mean_val = dd.select(F.mean(col)).collect()[0][0]
            
            if mean_val is not None:
                diff = abs(mean_val - median)
                print(f"   {col.upper()}: Mean={mean_val:.2f}, Median={median:.2f} (Diff: {diff:.2f})")
        else:
             print(f"   {col.upper()}: All Nulls")

üîç Analyzing Duplicates...




[Stage 97:>                                                         (0 + 4) / 5]

26/01/21 18:39:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/21 18:39:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


26/01/21 18:39:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


26/01/21 18:39:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/21 18:39:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.








26/01/21 18:39:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[Stage 99:>                                                         (0 + 4) / 4]26/01/21 18:39:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/21 18:39:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/21 18:39:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


                                                                                

   üóëÔ∏è Exact Duplicates Found: 760505 rows (16.11%)

üìä Mean vs Median for Sensors:


[Stage 103:>                                                        (0 + 4) / 5]





                                                                                

[Stage 106:>                                                        (0 + 4) / 5]





                                                                                

[Stage 107:>                                                        (0 + 4) / 5]







                                                                                

   MAIN_COMP: Mean=6.21, Median=5.90 (Diff: 0.31)


[Stage 110:>                                                        (0 + 4) / 5]





                                                                                

[Stage 113:>                                                        (0 + 4) / 5]





                                                                                

[Stage 114:>                                                        (0 + 4) / 5]





                                                                                

   PRE_COMP: Mean=1.00, Median=0.00 (Diff: 1.00)


[Stage 117:>                                                        (0 + 4) / 5]





                                                                                

[Stage 120:>                                                        (0 + 4) / 5]





                                                                                

[Stage 121:>                                                        (0 + 4) / 5]





                                                                                

   TBL_SPEED: Mean=80.32, Median=120.00 (Diff: 39.68)


[Stage 124:>                                                        (0 + 4) / 5]





                                                                                

[Stage 127:>                                                        (0 + 4) / 5]







[Stage 128:>                                                        (0 + 4) / 5]





                                                                                

   TBL_FILL: Mean=6.11, Median=6.08 (Diff: 0.03)


[Stage 131:>                                                        (0 + 4) / 5]



                                                                                

[Stage 134:>                                                        (0 + 4) / 5]





                                                                                

[Stage 135:>                                                        (0 + 4) / 5]





   EJECTION: Mean=185.38, Median=179.00 (Diff: 6.38)


                                                                                

## 5c. Laboratory (Targets) & Normalization Diagnostics üß™
We inspect the quality of the CQA labels (`Laboratory.csv`) and the metadata (`Normalization.csv`).

In [7]:
print("\nüß™ Analyzing Laboratory Data (Quality Targets):")
cqa_cols = ["dissolution_av", "content_uniformity_av", "assay_av", "hardness_av", "mass_av"]

for col in cqa_cols:
    if col in df_lab.columns:
        df_lab_clean = df_lab.withColumn(col, F.col(col).cast("double"))
        stats = df_lab_clean.select(F.min(col), F.max(col), F.mean(col)).collect()[0]
        print(f"   üíä {col}: Range=[{stats[0]}, {stats[1]}], Mean={stats[2]:.2f}")
        if stats[0] is not None and stats[0] < 0:
             print(f"      ‚ö†Ô∏è Found Negative Values in {col}!")

print("\nüìè Analyzing Normalization Factors:")
try:
    df_norm = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv(f"s3a://{bucket}/Normalization.csv")
    print(f"   Total Products Scoped: {df_norm.count()}")
    df_norm.show(3)
except Exception as e:
    print(f"   ‚ö†Ô∏è Normalization.csv not found or readable: {e}")


üß™ Analyzing Laboratory Data (Quality Targets):
   üíä dissolution_av: Range=[82.5, 102.67], Mean=90.65

üìè Analyzing Normalization Factors:


   Total Products Scoped: 25
+------------+--------------------+--------------------+
|Product code|Batch Size (tablets)|Normalisation factor|
+------------+--------------------+--------------------+
|           1|              240000|                 2.4|
|           2|             1920000|                19.2|
|           3|              960000|                 9.6|
+------------+--------------------+--------------------+
only showing top 3 rows



## 6. The Cleaning Manifesto üìú
Updated with multi-sensor and lab findings.

| ID | Data Quality Issue | Findings | Action Rule |
|----|--------------------|----------|-------------|
| **R3** | **Sensor Outliers** | Checked ALL sensors. Outliers exist primarily in `main_comp` & `pre_comp`. | **Cap/Clip** all numeric sensors at 5$\sigma$. |
| **R6** | **Redundancy** | ~16% Exact Duplicates confirmed. | **Drop Duplicates**. |
| **R7** | **Completeness** | Skew low for most sensors. | **Impute Mean** confirmed safe. |
| **R8** | **Targets (Lab)** | Ranges compliant (e.g. Dissolution 90-100%). | **Keep as is** (Reference for Gold Layer). |