# In [1]:
import pandas as pd
import os
import dask.bag as db
from dask.distributed import Client

# Directory whose immediate ".csv" entries are scanned by main()
directory = r'D:\SamsungSTF\Processed_Data\TripByTrip'
# CSV file that main() writes the matching file names to (single "filename" column)
output_path = r'C:\Users\BSL\Desktop\result_files.csv'

# Function to check if a file contains a period where speed is 0 for 300 seconds or more
def _has_long_stop(df, min_stop_seconds):
    """Return True if *df* holds a zero-speed period of at least *min_stop_seconds*.

    *df* must have a datetime ``time`` column and a ``speed`` column.  Rows are
    ordered by ``time`` first.  A period starts at the first row of a run of
    ``speed == 0`` and ends at the first following row whose speed is not 0,
    or at the last row when the run reaches the end of the data.
    """
    if df.empty:
        return False

    ordered = df.sort_values(by='time')
    times = ordered['time'].reset_index(drop=True)
    stopped = ordered['speed'].eq(0).reset_index(drop=True)
    # State of the previous row; the row before the first one counts as moving.
    was_stopped = stopped.shift(1, fill_value=False).astype(bool)

    run_starts = times[stopped & ~was_stopped]
    run_ends = times[~stopped & was_stopped]
    if stopped.iloc[-1]:
        # A run still open at the end of the data is closed at the last row.
        run_ends = pd.concat([run_ends, times.iloc[[-1]]])

    # Starts and ends alternate, so pairing them by position matches each run.
    durations = run_ends.reset_index(drop=True) - run_starts.reset_index(drop=True)
    return bool((durations.dt.total_seconds() >= min_stop_seconds).any())


def check_file(filepath, min_stop_seconds=300):
    """Return the file's base name if it contains a long zero-speed period.

    Args:
        filepath: Path of the CSV file to inspect.  It is only examined when
            it has both a ``speed`` and a ``time`` column.
        min_stop_seconds: Minimum duration, in seconds, that a zero-speed
            period must last to count.  Defaults to 300.

    Returns:
        ``os.path.basename(filepath)`` when at least one qualifying period is
        found; otherwise ``None`` (also when a required column is missing or
        the file cannot be processed).

    Any exception raised while reading or analysing the file is reported with
    ``print`` and treated as "no match", so one bad file does not abort a
    batch run.
    """
    try:
        df = pd.read_csv(filepath)
        if 'speed' in df.columns and 'time' in df.columns:
            # Convert the time column to datetime so durations can be computed
            df['time'] = pd.to_datetime(df['time'])
            if _has_long_stop(df, min_stop_seconds):
                return os.path.basename(filepath)
    except Exception as e:
        print(f"Error processing file {filepath}: {e}")
    return None

def main():
    """Scan every CSV file in ``directory`` and record those with long stops.

    Each file is passed to ``check_file`` through a Dask bag.  The base names
    of the files for which ``check_file`` returned a value are written to
    ``output_path`` as a one-column CSV (``filename``).

    Returns:
        The list of matching file names.
    """
    # List the inputs first so a bad directory fails before any workers start
    csv_files = [
        os.path.join(directory, filename)
        for filename in os.listdir(directory)
        if filename.endswith(".csv")
    ]

    # Start a Dask client with a specified dashboard port (any free port works).
    # The context manager closes the client once the computation finishes,
    # including when it raises.
    with Client(dashboard_address=':7868'):
        # Create a Dask bag of the file paths
        file_bag = db.from_sequence(csv_files, npartitions=32)

        # Map the check_file function to the file paths
        results = file_bag.map(check_file).compute()

    # Drop the None results that check_file returns for non-matching files
    files_with_long_stops = [result for result in results if result]

    # Save results to a CSV file
    result_df = pd.DataFrame(files_with_long_stops, columns=['filename'])
    result_df.to_csv(output_path, index=False)

    return files_with_long_stops

# Run the scan only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    # Keep the list returned by main() (names of the matching files) at module level
    result_files = main()