In [1]:
import os
os.environ['HADOOP_HOME'] = r'C:\hadoop'
os.environ['PATH'] = os.environ['HADOOP_HOME'] + r'\bin;' + os.environ['PATH']

In [2]:
from azure.storage.blob import BlobServiceClient
import pandas as pd
from io import BytesIO
from pathlib import Path
import pyarrow
import tempfile
import gc

from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col, when, isnan

from pyspark.sql.functions import col
from pyspark.sql.types import StringType

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

In [None]:
sas_token = "xxx"

account_url = "xxx"

container_name = "xxx"

In [4]:
# Create a BlobServiceClient using the SAS token
blob_service_client = BlobServiceClient(account_url=account_url, credential=sas_token)

# Get the container client
container_client = blob_service_client.get_container_client(container_name)

# List all blobs in the container
#for blob in container_client.list_blobs():
#    print(blob.name)

## Spark Load (problem)

In [20]:
builder = (
    SparkSession.builder
    .appName("AzureParquetToDelta")
    .config("spark.driver.memory", "16g")      
    .config("spark.executor.memory", "16g")    
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [53]:
# List blobs and read them directly as Spark DataFrames
temp_files = []

for blob in container_client.list_blobs():
    blob_client = container_client.get_blob_client(blob)
    data = blob_client.download_blob().readall()

    # Skip empty downloads
    #if len(data) == 0:
    #    print(f"Skipped empty blob: {blob.name}")
    #    continue

    # Save to temp file
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
    tmp.write(data)
    tmp.close()
    temp_files.append(tmp.name)

#schema = StructType([
#    StructField("TagName", StringType(), True),
#    StructField("EventTime", TimestampType(), True),
#    StructField("Status", StringType(), True),
#    StructField("Value", StringType(), True)  # <- force string at read time
#])

# Read all temp Parquet files in Spark
spark_df = spark.read.parquet(*temp_files)

In [52]:
#spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
#spark_df = spark.read.option("mergeSchema", "true").parquet(*temp_files)
#spark_df = spark.read.parquet(*temp_files)

In [54]:
spark_df.show(5)
spark_df.printSchema()

+----------------+--------------------+------+----------+
|         TagName|           EventTime|Status|     Value|
+----------------+--------------------+------+----------+
| :V0V8C5DP67P0.P|2024-07-05 18:01:...|  Good|0.06322985|
|O49TPU05CV86.CP:|2024-07-05 20:48:...|  Good|  96.24695|
| 6I.:V9L8P0VCP96|2024-07-06 01:24:...|  Good| 38.797314|
| .LC6P6V:P0I918V|2024-07-05 14:41:...|  Good|  170.4655|
|  V.P301:PV807CX|2024-07-05 10:12:...|  Good| 1002.4146|
+----------------+--------------------+------+----------+
only showing top 5 rows

root
 |-- TagName: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)
 |-- Status: string (nullable = true)
 |-- Value: float (nullable = true)



In [45]:
#spark_df = spark_df.withColumn("Value",col("Value").cast(StringType()))

spark_df = spark_df.withColumn(
    "Value",
    when(col("Value").isNull(), None)
    .when(isnan(col("Value")), "NaN")
    .otherwise(col("Value").cast(StringType()))
)

spark_df.printSchema()

root
 |-- TagName: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)
 |-- Status: string (nullable = true)
 |-- Value: string (nullable = true)



In [46]:
delta_path = os.path.abspath("data/delta/shell_data")
delta_path = Path(delta_path).resolve()
delta_uri = delta_path.as_uri()

print(f"Delta table will be stored at: {delta_path}")

Delta table will be stored at: C:\Users\Agando\Documents\Projects\amos2025ws03-rtdip-timeseries-forecasting\dummy\data\delta\shell_data


In [51]:
#temp_parquet = "C:/temp/intermediate.parquet"
#spark_df.write.mode("overwrite").parquet(temp_parquet)

In [50]:
#spark_df.write.format("delta").mode("overwrite").save(delta_uri)

## Pandas Load & Store as Parquet

In [5]:
dataframes = []

for blob in container_client.list_blobs():
    # Match filenames containing '.parquet'
    if ".parquet" in blob.name.lower():
        print(f"Reading: {blob.name}")
        blob_client = container_client.get_blob_client(blob)
        try:
            stream = blob_client.download_blob()
            data = stream.readall()
            df = pd.read_parquet(BytesIO(data))
            
            # Optional: add filename info
            #df["source_file"] = blob.name
            
            dataframes.append(df)
        except Exception as e:
            print(f"Failed to read {blob.name}: {e}")

print(f"\nLoaded {len(dataframes)} dataframes.")

if dataframes:
    pd_df = pd.concat(dataframes, ignore_index=True)
    print("Combined dataframe shape:", pd_df.shape)
else:
    print("No dataframes were loaded.")

Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_1
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_10
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_11
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_12
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_13
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_14
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_15
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_16
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_17
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_18
Reading: Data/2024/TagMeasurements_float_2024-10-10 06:02:10.442377.parquet_DataFrame_19
Reading: Data/2024/Tag

In [6]:
pd_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 214991102 entries, 0 to 214991101
Data columns (total 4 columns):
 #   Column     Dtype         
---  ------     -----         
 0   TagName    object        
 1   EventTime  datetime64[ns]
 2   Status     object        
 3   Value      object        
dtypes: datetime64[ns](1), object(3)
memory usage: 6.4+ GB


In [7]:
pd_df.head(5)

Unnamed: 0,TagName,EventTime,Status,Value
0,A2PS64V0J.:ZUX09R,2024-01-02 20:03:46,Good,0.34
1,A2PS64V0J.:ZUX09R,2024-01-02 16:00:12,Good,0.15
2,A2PS64V0J.:ZUX09R,2024-01-02 11:56:42,Good,0.13
3,A2PS64V0J.:ZUX09R,2024-01-02 07:53:11,Good,0.12
4,A2PS64V0J.:ZUX09R,2024-01-02 03:49:45,Good,0.13


In [8]:
non_numeric = pd_df[pd.to_numeric(pd_df['Value'], errors='coerce').isna()]

print(f"Non-numeric values count: {len(non_numeric):,}")
print(f"\nUnique non-numeric values:")
print(non_numeric['Value'].value_counts())

print(f"\nSample rows with non-numeric values:")
non_numeric[['TagName', 'EventTime', 'Status', 'Value']].head(10)

Non-numeric values count: 565,977

Unique non-numeric values:
Value
Calc Failed      231627
Bad Input         98155
No Result         47516
Failed            47516
Out of Serv       43917
Bad               19920
Scan Timeout       1452
Comm Fail           694
I/O Timeout         208
Doubtful             41
Not Connect          34
Pt Created           12
Invalid Float         8
Scan Off              2
Name: count, dtype: int64

Sample rows with non-numeric values:


Unnamed: 0,TagName,EventTime,Status,Value
575872,1530P2X:VUC0.HB,2024-01-15 09:24:00,Bad,
575924,1530P2X:VUC0.HB,2024-01-15 09:22:00,Bad,
577989,1530P2X:VUC0.HB,2024-01-15 12:59:00,Bad,
577992,1530P2X:VUC0.HB,2024-01-15 12:57:00,Bad,
580091,1530P2X:VUC0.HB,2024-01-15 12:58:00,Bad,
580298,1530P2X:VUC0.HB,2024-01-15 09:20:00,Bad,
581125,1530P2X:VUC0.HB,2024-01-15 09:23:00,Bad,
581209,1530P2X:VUC0.HB,2024-01-15 09:18:00,Bad,
581908,1530P2X:VUC0.HB,2024-01-15 07:04:00,Bad,
582059,1530P2X:VUC0.HB,2024-01-15 07:05:00,Bad,


In [9]:
print("Missing values:")
df.isnull().sum()

Missing values:


TagName      0
EventTime    0
Status       0
Value        0
dtype: int64

In [14]:
#pd_df = pd.read_parquet("data/shell.parquet")

In [15]:
# To prevent any type errors / conflicts with spark
pd_df = pd_df.astype(str)

In [17]:
pd_df.to_parquet("data/shell.parquet")

In [10]:
del pd_df
gc.collect()

4914

## EDA in Spark

In [11]:
builder = (
    SparkSession.builder
    .appName("AzureParquetToDelta")
    .config("spark.driver.memory", "16g")      
    .config("spark.executor.memory", "16g")    
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [49]:
spark_df = spark.read.parquet("data/shell.parquet")

In [None]:
# ============================================================================
# BASIC INFO & SHAPE
# ============================================================================

print(f"Total rows: {spark_df.count():,}")
print(f"\nSchema:")
spark_df.printSchema()
print(f"\nDataFrame info:")
spark_df.rdd.getNumPartitions()
print(f"Number of partitions: {spark_df.rdd.getNumPartitions()}")

Total rows: 214,991,102

Schema:
root
 |-- TagName: string (nullable = true)
 |-- EventTime: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Value: string (nullable = true)


DataFrame info:
Number of partitions: 22


In [None]:
# ============================================================================
# FIRST LOOK AT DATA
# ============================================================================

print("First 10 rows:")
spark_df.show(10, truncate=False)

print("\nData types:")
print(spark_df.dtypes)

First 10 rows:
+-----------------------------+-----------------------+------+-------------------+
|TagName                      |EventTime              |Status|Value              |
+-----------------------------+-----------------------+------+-------------------+
|A2PS64V0J.:ZUX09R            |2024-01-02 20:03:46.000|Good  |0.3400000035762787 |
|A2PS64V0J.:ZUX09R            |2024-01-02 16:00:12.000|Good  |0.15000000596046448|
|A2PS64V0J.:ZUX09R            |2024-01-02 11:56:42.000|Good  |0.12999999523162842|
|A2PS64V0J.:ZUX09R            |2024-01-02 07:53:11.000|Good  |0.11999999731779099|
|A2PS64V0J.:ZUX09R            |2024-01-02 03:49:45.000|Good  |0.12999999523162842|
|-4O7LSSAM_3EA02:2GT7E02I_R_MP|2024-01-02 20:09:58.053|Good  |7107.82080078125   |
|_LT2EPL-9PM0.OROTENV3:       |2024-01-02 12:27:10.518|Good  |19407.0            |
|_LT2EPL-9PM0.OROTENV3:       |2024-01-02 05:23:10.143|Good  |19403.0            |
|_LT2EPL-9PM0.OROTENV3:       |2024-01-02 01:31:10.086|Good  |19399.0   

In [None]:
# ============================================================================
# MISSING VALUES
# ============================================================================

from pyspark.sql.functions import col, count, when

print("Missing values (NULL and NaN):")
missing_stats = spark_df.select([
    count(when(col(c).isNull(), 1)).alias(f"{c}_null_count")
    for c in spark_df.columns
])
missing_stats.show()

# Also check for NaN in Value column
nan_count = spark_df.filter(isnan(col("Value"))).count()
print(f"\nValue column NaN count: {nan_count:,}")

print("\nMissing values percentage:")
missing_pct = spark_df.select([
    (count(when(col(c).isNull(), 1)) / count("*") * 100).alias(f"{c}_null_pct")
    for c in spark_df.columns
])
missing_pct.show()

nan_pct = (nan_count / spark_df.count() * 100)
print(f"Value column NaN percentage: {nan_pct:.2f}%")

Missing values (NULL and NaN):
+------------------+--------------------+-----------------+----------------+
|TagName_null_count|EventTime_null_count|Status_null_count|Value_null_count|
+------------------+--------------------+-----------------+----------------+
|                 0|                   0|                0|               0|
+------------------+--------------------+-----------------+----------------+


Value column NaN count: 74,875

Missing values percentage:
+----------------+------------------+---------------+--------------+
|TagName_null_pct|EventTime_null_pct|Status_null_pct|Value_null_pct|
+----------------+------------------+---------------+--------------+
|             0.0|               0.0|            0.0|           0.0|
+----------------+------------------+---------------+--------------+

Value column NaN percentage: 0.03%


In [53]:
print("\nExample rows with NaN values in Value column:")
spark_df.filter(isnan(col("Value"))).select(
    "TagName", "EventTime", "Status", "Value"
).show(10, truncate=False)


Example rows with NaN values in Value column:
+---------------+-----------------------+------+-----+
|TagName        |EventTime              |Status|Value|
+---------------+-----------------------+------+-----+
|1530P2X:VUC0.HB|2024-01-15 09:24:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 09:22:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 12:59:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 12:57:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 12:58:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 09:20:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 09:23:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 09:18:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 07:04:00.000|Bad   |nan  |
|1530P2X:VUC0.HB|2024-01-15 07:05:00.000|Bad   |nan  |
+---------------+-----------------------+------+-----+
only showing top 10 rows



In [54]:
### create new column value_status to perform statistics

# Create Value_Status column (capture non-numeric values before converting)
spark_df = spark_df.withColumn(
    "Value_Status",
    when(col("Value").cast("float").isNull() & col("Value").isNotNull(), col("Value"))
    .otherwise("Valid")
)

# Convert Value to float (non-numeric become null)
spark_df = spark_df.withColumn("Value", col("Value").cast("float"))

# Convert EventTime to proper timestamp format
spark_df = spark_df.withColumn("EventTime", col("EventTime").cast("timestamp"))

print("Schema:")
spark_df.printSchema()

#print("\nValue_Status distribution:")
#spark_df.groupBy("Value_Status").count().show()


Schema:
root
 |-- TagName: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)
 |-- Status: string (nullable = true)
 |-- Value: float (nullable = true)
 |-- Value_Status: string (nullable = true)



In [None]:
# ============================================================================
# CATEGORICAL COLUMNS (TagName, Status, Value_Status)
# ============================================================================

print("Unique TagNames:")
tag_counts = spark_df.groupBy("TagName").count().orderBy(col("count").desc())
tag_counts.show(10, truncate=False)
print(f"Total unique TagNames: {tag_counts.count()}")

print("\nUnique Status values:")
status_counts = spark_df.groupBy("Status").count().orderBy(col("count").desc())
status_counts.show(truncate=False)
print(f"Total unique Status: {status_counts.count()}")

print("\nValue_Status distribution:")
value_status_counts = spark_df.groupBy("Value_Status").count().orderBy(col("count").desc())
value_status_counts.show(truncate=False)
print(f"Total unique Value_Status: {value_status_counts.count()}")

Unique TagNames:
+---------------------------+------+
|TagName                    |count |
+---------------------------+------+
|T:4FU4PRBV3NO0V_0PLM40R-.34|553540|
|P0TON:H53Y3S.C1CL          |518277|
|0.3HN:CPOC5Y18BTPS         |518277|
|HO.T:C4F5C0O5U1YN          |518277|
|5H4C:S.YPLOTC1N00          |518277|
|S.LY7O:3PC0N5T1HC          |518277|
|OTHS1:5.TP3C0N9CY          |518277|
|4:HTSN1C4T0.CPY5O          |518277|
|5.CF:1H45CP0OYSNT          |518277|
|H951ASC.MO:YNEQ1I1         |518277|
+---------------------------+------+
only showing top 10 rows

Total unique TagNames: 7759

Unique Status values:
+-----------------+---------+
|Status           |count    |
+-----------------+---------+
|Good             |213642667|
|Bad              |1347992  |
|Substituted, Good|253      |
|Questionable     |190      |
+-----------------+---------+

Total unique Status: 4

Value_Status distribution:
+-------------+---------+
|Value_Status |count    |
+-------------+---------+
|Valid        |214

In [None]:
# ============================================================================
# CREATE CLEAN DATAFRAME (remove NULL and NaN from Value column)
# ============================================================================

spark_df_clean = spark_df.filter(col("Value").isNotNull()).drop("Value_Status")
spark_df_clean = spark_df_clean.withColumn("Value", col("Value").cast("float"))

# Remove NaN values
spark_df_clean = spark_df_clean.filter(~isnan(col("Value")))

print(f"\nOriginal rows: {spark_df.count():,}")
print(f"Clean rows: {spark_df_clean.count():,}")
print(f"Rows dropped: {spark_df.count() - spark_df_clean.count():,}")

print("\nClean schema:")
spark_df_clean.printSchema()

print("\nSample clean data:")
spark_df_clean.show(10)

Original rows: 214,991,102
Clean rows: 214,500,000
Rows dropped: 491,102

Clean schema:
root
 |-- TagName: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)
 |-- Status: string (nullable = true)
 |-- Value: float (nullable = true)


Sample clean data:
+--------------------+--------------------+------+--------+
|             TagName|           EventTime|Status|   Value|
+--------------------+--------------------+------+--------+
|   A2PS64V0J.:ZUX09R| 2024-01-02 20:03:46|  Good|    0.34|
|   A2PS64V0J.:ZUX09R| 2024-01-02 16:00:12|  Good|    0.15|
|   A2PS64V0J.:ZUX09R| 2024-01-02 11:56:42|  Good|    0.13|
|   A2PS64V0J.:ZUX09R| 2024-01-02 07:53:11|  Good|    0.12|
|   A2PS64V0J.:ZUX09R| 2024-01-02 03:49:45|  Good|    0.13|
|-4O7LSSAM_3EA02:2...|2024-01-02 20:09:...|  Good|7107.821|
|_LT2EPL-9PM0.OROT...|2024-01-02 12:27:...|  Good| 19407.0|
|_LT2EPL-9PM0.OROT...|2024-01-02 05:23:...|  Good| 19403.0|
|_LT2EPL-9PM0.OROT...|2024-01-02 01:31:...|  Good| 19399.0|
|1N325T3M

In [56]:
# ============================================================================
# CATEGORICAL COLUMNS (TagName, Status) after cleaning
# ============================================================================

print("Unique TagNames:")
tag_counts = spark_df_clean.groupBy("TagName").count().orderBy(col("count").desc())
tag_counts.show(10, truncate=False)
print(f"Total unique TagNames: {tag_counts.count()}")

print("\nUnique Status values:")
status_counts = spark_df_clean.groupBy("Status").count().orderBy(col("count").desc())
status_counts.show(truncate=False)
print(f"Total unique Status: {status_counts.count()}")


Unique TagNames:
+---------------------------+------+
|TagName                    |count |
+---------------------------+------+
|T:4FU4PRBV3NO0V_0PLM40R-.34|553540|
|P0TON:H53Y3S.C1CL          |518277|
|0.3HN:CPOC5Y18BTPS         |518277|
|HO.T:C4F5C0O5U1YN          |518277|
|5H4C:S.YPLOTC1N00          |518277|
|S.LY7O:3PC0N5T1HC          |518277|
|OTHS1:5.TP3C0N9CY          |518277|
|4:HTSN1C4T0.CPY5O          |518277|
|5.CF:1H45CP0OYSNT          |518277|
|H951ASC.MO:YNEQ1I1         |518277|
+---------------------------+------+
only showing top 10 rows

Total unique TagNames: 6529

Unique Status values:
+-----------------+---------+
|Status           |count    |
+-----------------+---------+
|Good             |213642667|
|Bad              |782015   |
|Substituted, Good|253      |
|Questionable     |190      |
+-----------------+---------+

Total unique Status: 4


In [58]:
# ============================================================================
# NUMERIC COLUMN (Value) - DESCRIPTIVE STATS
# ============================================================================

from pyspark.sql.functions import mean, stddev, min as spark_min, max as spark_max, when

print("Value statistics (clean data):")
spark_df_clean.describe("Value").show()


Value statistics (clean data):
+-------+--------------------+
|summary|               Value|
+-------+--------------------+
|  count|           214425125|
|   mean|-6.90248730437452...|
| stddev|7.114647230934678...|
|    min|        -7.366718E20|
|    max|        6.2658613E12|
+-------+--------------------+



In [59]:
# ============================================================================
# TIME SERIES INFO
# ============================================================================

from pyspark.sql.functions import min as spark_min, max as spark_max, datediff

print("EventTime range:")
time_stats = spark_df_clean.select(
    spark_min("EventTime").alias("earliest"),
    spark_max("EventTime").alias("latest")
)
time_stats.show(truncate=False)

earliest = spark_df_clean.agg(spark_min("EventTime")).collect()[0][0]
latest = spark_df_clean.agg(spark_max("EventTime")).collect()[0][0]
duration_days = (latest - earliest).days
print(f"Duration: {duration_days} days")

EventTime range:
+-------------------+-----------------------+
|earliest           |latest                 |
+-------------------+-----------------------+
|2023-12-31 00:00:00|2024-07-27 23:59:57.573|
+-------------------+-----------------------+

Duration: 209 days


In [60]:
# ============================================================================
# RECORDS PER TAG
# ============================================================================

print("Records per TagName:")
records_per_tag = spark_df_clean.groupBy("TagName").count().orderBy(col("count").desc())
records_per_tag.show(15, truncate=False)

Records per TagName:
+---------------------------+------+
|TagName                    |count |
+---------------------------+------+
|T:4FU4PRBV3NO0V_0PLM40R-.34|553540|
|CHP.1TSA:OCN0P358Y         |518277|
|5H4C:S.YPLOTC1N00          |518277|
|2C4O5H01TS:Y.NPCF          |518277|
|C.OH0TU:C1Y4HON35          |518277|
|0:HC5Y013CP.SOTTN          |518277|
|H951ASC.MO:YNEQ1I1         |518277|
|YOP3T5.NC:112HSCT          |518277|
|5.CF:1H45CP0OYSNT          |518277|
|4:HTSN1C4T0.CPY5O          |518277|
|0.3HN:CPOC5Y18BTPS         |518277|
|:PCT01SN0LH.5Y3CO          |518277|
|P0TON:H53Y3S.C1CL          |518277|
|S:CF5PO.5H03Y1NTC          |518277|
|10YLCH3NS:.PTOC55          |518277|
+---------------------------+------+
only showing top 15 rows



In [61]:
# ============================================================================
# RECORDS OVER TIME
# ============================================================================

from pyspark.sql.functions import year, month, dayofmonth, hour

print("Records by year:")
spark_df_clean.groupBy(year("EventTime")).count().orderBy("year(EventTime)").show()

print("\nRecords by month (last 12 months):")
spark_df_clean.groupBy(year("EventTime"), month("EventTime")).count().orderBy(col("year(EventTime)").desc(), col("month(EventTime)").desc()).show(12)

Records by year:
+---------------+---------+
|year(EventTime)|    count|
+---------------+---------+
|           2023|  1086413|
|           2024|213338712|
+---------------+---------+


Records by month (last 12 months):
+---------------+----------------+--------+
|year(EventTime)|month(EventTime)|   count|
+---------------+----------------+--------+
|           2024|               7|27521992|
|           2024|               6|29788540|
|           2024|               5|31597385|
|           2024|               4|30520851|
|           2024|               3|31533923|
|           2024|               2|29902461|
|           2024|               1|32473560|
|           2023|              12| 1086413|
+---------------+----------------+--------+



In [62]:
# ============================================================================
# VALUE DISTRIBUTION BY TAG
# ============================================================================

print("Average Value by TagName:")
value_by_tag = spark_df_clean.groupBy("TagName").agg(
    mean("Value").alias("avg_value"),
    spark_min("Value").alias("min_value"),
    spark_max("Value").alias("max_value"),
    stddev("Value").alias("stddev_value")
).orderBy(col("avg_value").desc())
value_by_tag.show(10, truncate=False)

Average Value by TagName:
+-----------------------+--------------------+---------+------------+---------------------+
|TagName                |avg_value           |min_value|max_value   |stddev_value         |
+-----------------------+--------------------+---------+------------+---------------------+
|.3XVL3TP3:295PFX       |1.4524630515993804E8|3.3175936|6.2658613E12|3.0161312472654926E10|
|COG3NF02RQA7P0D.1:TV_CT|7132779.659987816   |5852479.0|8318120.5   |733436.6184894337    |
|C_TGD00:R7QA2VF1T.OCPN3|5910953.948570976   |302.0    |7861627.5   |1112582.0924773156   |
|TCF3O.RQ_:2AV01GP07DCNT|4098994.2432386125  |313.0    |5852479.0   |1061281.3811492713   |
|TA7F_2TPGCQ3DC:.OR1N0V0|3547827.2700363146  |0.0      |9022103.0   |3511438.738997856    |
|3QDRNC._:A7F2CGTOT0V0P1|3297653.7527593696  |0.0      |9024536.0   |3520902.7952661323   |
|NCAF_7P0T.CRDT103V2:GQO|3195330.259663534   |1956780.5|4424218.5   |721799.0720803908    |
|NG1VATP2DFO_C3T00.7:RCQ|2465483.455718564   |881127.2

In [63]:
# ============================================================================
# VALUE DISTRIBUTION BY STATUS
# ============================================================================

print("Value statistics by Status:")
value_by_status = spark_df_clean.groupBy("Status").agg(
    count("Value").alias("count"),
    mean("Value").alias("avg_value"),
    spark_min("Value").alias("min_value"),
    spark_max("Value").alias("max_value"),
    stddev("Value").alias("stddev_value")
).orderBy(col("count").desc())
value_by_status.show(truncate=False)

Value statistics by Status:
+-----------------+---------+---------------------+------------+------------+---------------------+
|Status           |count    |avg_value            |min_value   |max_value   |stddev_value         |
+-----------------+---------+---------------------+------------+------------+---------------------+
|Good             |213642667|-6.927767397004301E12|-7.366718E20|6.2658613E12|7.1276638835902336E16|
|Bad              |782015   |281.0936158513583    |238.0       |317.0       |30.63950909112452    |
|Substituted, Good|253      |27558.52905172887    |-43.131046  |381496.12   |90485.9167599752     |
|Questionable     |190      |15.39002905770352    |0.0         |202.5       |21.104189224728955   |
+-----------------+---------+---------------------+------------+------------+---------------------+



In [64]:
# ============================================================================
# OUTLIER DETECTION (Values > 3 std devs from mean)
# ============================================================================

from pyspark.sql.functions import lit

mean_val = spark_df_clean.agg(mean("Value")).collect()[0][0]
stddev_val = spark_df_clean.agg(stddev("Value")).collect()[0][0]

outlier_threshold_high = mean_val + (3 * stddev_val)
outlier_threshold_low = mean_val - (3 * stddev_val)

outliers = spark_df_clean.filter((col("Value") > outlier_threshold_high) | (col("Value") < outlier_threshold_low))
print(f"Outliers (>3 std devs): {outliers.count():,}")
print(f"Outlier percentage: {outliers.count() / spark_df_clean.count() * 100:.2f}%")
print("\nSample outliers:")
outliers.select("TagName", "EventTime", "Value", "Status").show(10)

Outliers (>3 std devs): 6
Outlier percentage: 0.00%

Sample outliers:
+---------------+-------------------+--------------+------+
|        TagName|          EventTime|         Value|Status|
+---------------+-------------------+--------------+------+
|V0R0.:0X715PNSX|2024-02-08 02:30:22|-1.64307321E18|  Good|
|0N:P7RXV031S0.X|2024-02-08 02:31:13|-1.64307321E18|  Good|
|0N:P7RXV031S0.X|2024-02-08 02:30:17|-1.64307321E18|  Good|
|VX0XN07P1.1:3RS|2024-02-08 02:31:13|  -7.366718E20|  Good|
|V0R0.:0X715PNSX|2024-02-08 02:31:18|-1.64307321E18|  Good|
|VX0XN07P1.1:3RS|2024-02-08 02:30:17|  -7.366718E20|  Good|
+---------------+-------------------+--------------+------+



In [65]:
# ============================================================================
# DATA QUALITY ISSUES
# ============================================================================

print("Data Quality Report:")
print(f"Total records: {spark_df_clean.count():,}")
print(f"Duplicate records (all columns): {spark_df_clean.count() - spark_df_clean.dropDuplicates().count():,}")
print(f"Records with null Value: {spark_df_clean.filter(col('Value').isNull()).count():,}")
print(f"Records with null EventTime: {spark_df_clean.filter(col('EventTime').isNull()).count():,}")
print(f"Records with null TagName: {spark_df_clean.filter(col('TagName').isNull()).count():,}")
print(f"Records with null Status: {spark_df_clean.filter(col('Status').isNull()).count():,}")

Data Quality Report:
Total records: 214,425,125
Duplicate records (all columns): 1,572
Records with null Value: 0
Records with null EventTime: 0
Records with null TagName: 0
Records with null Status: 0


In [66]:
# ============================================================================
# CORRELATION BETWEEN VALUE AND TIME (by tag)
# ============================================================================

from pyspark.sql.functions import unix_timestamp

print("Sample of data with unix timestamp for correlation analysis:")
spark_df_ts = spark_df_clean.withColumn("timestamp_unix", unix_timestamp("EventTime"))
spark_df_ts.select("TagName", "EventTime", "timestamp_unix", "Value").show(5)

Sample of data with unix timestamp for correlation analysis:
+-----------------+-------------------+--------------+-----+
|          TagName|          EventTime|timestamp_unix|Value|
+-----------------+-------------------+--------------+-----+
|A2PS64V0J.:ZUX09R|2024-01-02 20:03:46|    1704222226| 0.34|
|A2PS64V0J.:ZUX09R|2024-01-02 16:00:12|    1704207612| 0.15|
|A2PS64V0J.:ZUX09R|2024-01-02 11:56:42|    1704193002| 0.13|
|A2PS64V0J.:ZUX09R|2024-01-02 07:53:11|    1704178391| 0.12|
|A2PS64V0J.:ZUX09R|2024-01-02 03:49:45|    1704163785| 0.13|
+-----------------+-------------------+--------------+-----+
only showing top 5 rows

