In [46]:
import os
# Set JAVA_HOME for this process and put its bin directory first on PATH
# (presumably so pyspark can find a Java runtime — confirm for your setup).
# NOTE(review): the absolute JVM path is machine-specific.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['PATH'] = f"{os.environ['JAVA_HOME']}/bin:{os.environ['PATH']}"

# NOTE(review): the two wildcard imports below bring every public name of
# those modules into the notebook namespace; names imported later win, so
# the explicit datetime import is kept last.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timedelta

In [47]:
def create_spark_session():
    """Return the SparkSession used by this notebook.

    The session is named "FieldMillDataProcessing", requests 4g of memory
    for both the driver and the executors, and uses UTC as the SQL session
    time zone. ``getOrCreate`` is used, so an already-active session is
    returned instead of building a second one.
    """
    settings = (
        ("spark.driver.memory", "4g"),
        ("spark.executor.memory", "4g"),
        ("spark.sql.session.timeZone", "UTC"),
    )

    builder = SparkSession.builder.appName("FieldMillDataProcessing")
    for option, value in settings:
        builder = builder.config(option, value)

    return builder.getOrCreate()

In [48]:
def process_file(spark, file_path):
    """Parse one field-mill text file into a two-column DataFrame.

    The first line of the file is a header in ``MM/DD/YYYY HH:MM`` format.
    Every following line must hold at least four comma-separated values:
    hour index, minute, second and the electric field reading. Each row's
    timestamp is midnight of the header date plus
    ``header hour + (hour index - 1)`` hours and the row's minute and
    second; the header's minute is only printed, not added. Hour values
    past 23 roll over into the following day through ``timedelta``.

    Args:
        spark: Object exposing ``createDataFrame(data, column_names)``
            (the notebook passes its SparkSession).
        file_path: Path of the text file to read.

    Returns:
        The result of ``spark.createDataFrame`` with columns ``DATETIME``
        and ``ELECTRIC_FIELD``, or ``None`` when the file is empty, has no
        valid data rows, or any error occurs (the error and its traceback
        are printed rather than raised).
    """
    try:
        # Read the file content
        with open(file_path, 'r') as file:
            lines = file.readlines()

        # An empty file has no header to parse; report it as "no data"
        # instead of letting lines[0] fail with an IndexError.
        if not lines:
            print(f"No valid data in file {file_path}")
            return None

        # Parse the base date and time from the header
        date_time = datetime.strptime(lines[0].strip(), "%m/%d/%Y %H:%M")
        base_hour = date_time.hour
        print(f"Base date and time: {date_time}")

        # Midnight of the header date. Derived from the parsed datetime so
        # the function only relies on names that are explicitly imported
        # (`datetime` and `timedelta`).
        day_start = date_time.replace(hour=0, minute=0, second=0, microsecond=0)

        # Process the data lines
        data = []
        for line in lines[1:]:
            parts = line.strip().split(',')
            if len(parts) < 4:
                print(f"Skipping line due to insufficient data: {line.strip()}")
                continue

            try:
                hour_index = int(parts[0])
                minute = int(parts[1])
                second = int(parts[2])
                electric_field = float(parts[3])
            except ValueError as ve:
                print(f"Skipping line due to ValueError: {line.strip()} - {ve}")
                continue

            # hour_index is 1-based relative to the header hour; timedelta
            # carries any overflow past 23 into the next day.
            actual_hour = base_hour + (hour_index - 1)
            timestamp = day_start + timedelta(
                hours=actual_hour, minutes=minute, seconds=second
            )
            data.append((timestamp, electric_field))

        if not data:
            print(f"No valid data in file {file_path}")
            return None

        # Create a DataFrame from the processed data
        return spark.createDataFrame(data, ["DATETIME", "ELECTRIC_FIELD"])
    except Exception as e:
        # Best-effort by design: one bad file must not abort the whole run.
        print(f"Error processing file {file_path}: {str(e)}")
        import traceback
        traceback.print_exc()
        return None

In [49]:
def process_month(spark, year, month):
    """Build one time-sorted DataFrame from every file of a given month.

    Files are looked up as
    ``data/transformed/field_mill_50hz/<year>/<MM>/<day>/<name>/<name>.txt``.
    Each one is parsed with ``process_file`` and the resulting DataFrames
    are unioned and sorted by ``DATETIME``.

    Entries that are not directories (at day or hour-folder level) are
    skipped, so stray files in the data tree neither abort the month nor
    inflate the file count.

    Args:
        spark: Session object forwarded to ``process_file``.
        year: Year used in the directory path.
        month: Month number; zero-padded to two digits in the path.

    Returns:
        The combined, sorted DataFrame, or ``None`` when the month
        directory is missing or no file could be processed.
    """
    base_path = f"data/transformed/field_mill_50hz/{year}/{month:02d}"

    # isdir (rather than exists) so a plain file at this path is treated
    # as "no data" instead of crashing os.listdir below.
    if not os.path.isdir(base_path):
        print(f"No data found for {year}-{month:02d}")
        return None

    all_data = []
    total_files = 0
    processed_files = 0

    for day in sorted(os.listdir(base_path)):
        day_path = os.path.join(base_path, day)
        if not os.path.isdir(day_path):
            # e.g. .DS_Store or a README sitting next to the day folders
            continue
        for hour_folder in sorted(os.listdir(day_path)):
            hour_path = os.path.join(day_path, hour_folder)
            if not os.path.isdir(hour_path):
                continue
            # The data file carries the same name as its folder.
            file_path = os.path.join(hour_path, f"{hour_folder}.txt")
            total_files += 1
            df = process_file(spark, file_path)
            if df is not None:
                all_data.append(df)
                processed_files += 1
                print(f"Completed processing file: {file_path}")

    if not all_data:
        print(f"No data was successfully processed for {year}-{month:02d}")
        return None

    print(f"\nProcessed {processed_files} out of {total_files} files for {year}-{month:02d}.")

    # Fold the per-file DataFrames into one, then order by timestamp.
    monthly_df = all_data[0]
    for df in all_data[1:]:
        monthly_df = monthly_df.union(df)

    monthly_df = monthly_df.sort("DATETIME")

    return monthly_df

In [50]:
def process_year(spark, year):
    """Process all twelve months of ``year`` and write one CSV output per month.

    Each month is built with ``process_month``. Months that yield a
    DataFrame are written, with a header row and in overwrite mode, under
    ``data/process/field_mill_data_<year>/month_<MM>``; months without data
    are reported and skipped. The Spark cache is cleared after every month.

    Args:
        spark: Active Spark session.
        year: Year to process.
    """
    # Make sure the yearly output folder is there before any month is written
    os.makedirs(f"data/process/field_mill_data_{year}", exist_ok=True)

    for month in range(1, 13):
        month_frame = process_month(spark, year, month)

        if month_frame is None:
            print(f"Skipping {year}-{month:02d} due to no data")
        else:
            # Persist this month's rows
            output_path = f"data/process/field_mill_data_{year}/month_{month:02d}"
            month_frame.write.csv(output_path, header=True, mode="overwrite")
            print(f"Data for {year}-{month:02d} has been processed and saved to {output_path}")

        # Drop anything cached before moving on to the next month
        spark.catalog.clearCache()

    print(f"Processing complete for year {year}")

In [51]:
# Create Spark session
spark = create_spark_session()

try:
    # Process the data for 1996
    process_year(spark, 1996)
finally:
    # Stop the Spark session even when processing raised, so a failed run
    # does not leave the session running in the kernel.
    spark.stop()

No data found for 1996-01
Skipping 1996-01 due to no data
No data found for 1996-02
Skipping 1996-02 due to no data
No data found for 1996-03
Skipping 1996-03 due to no data
No data found for 1996-04
Skipping 1996-04 due to no data
No data found for 1996-05
Skipping 1996-05 due to no data
No data found for 1996-06
Skipping 1996-06 due to no data
No data found for 1996-07
Skipping 1996-07 due to no data
No data found for 1996-08
Skipping 1996-08 due to no data
No data found for 1996-09
Skipping 1996-09 due to no data
Base date and time: 1996-10-19 17:00:00
Processed DataFrame (first 5 rows):
+-------------------+--------------+
|DATETIME           |ELECTRIC_FIELD|
+-------------------+--------------+
|1996-10-19 17:18:11|292.0         |
|1996-10-19 17:18:12|296.0         |
|1996-10-19 17:18:13|296.0         |
|1996-10-19 17:18:14|288.0         |
|1996-10-19 17:18:15|292.0         |
+-------------------+--------------+
only showing top 5 rows

Completed processing file: data/transformed/

24/10/02 18:14:56 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/02 18:17:00 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/02 18:18:58 WARN DAGScheduler: Broadcasting large task binary with size 1550.6 KiB
                                                                                

Data for 1996-10 has been processed and saved to data/process/field_mill_data_1996/month_10
Base date and time: 1996-11-04 01:30:00
Processed DataFrame (first 5 rows):
+-------------------+--------------+
|DATETIME           |ELECTRIC_FIELD|
+-------------------+--------------+
|1996-11-04 02:01:52|212.0         |
|1996-11-04 02:01:53|212.0         |
|1996-11-04 02:01:54|212.0         |
|1996-11-04 02:01:55|216.0         |
|1996-11-04 02:01:56|220.0         |
+-------------------+--------------+
only showing top 5 rows

Completed processing file: data/transformed/field_mill_50hz/1996/11/04/199611040130/199611040130.txt
Base date and time: 1996-11-04 02:00:00
Processed DataFrame (first 5 rows):
+-------------------+--------------+
|DATETIME           |ELECTRIC_FIELD|
+-------------------+--------------+
|1996-11-04 02:00:00|208.0         |
|1996-11-04 02:00:01|212.0         |
|1996-11-04 02:00:02|216.0         |
|1996-11-04 02:00:03|216.0         |
|1996-11-04 02:00:04|216.0         |
+