# Information:
## Data Download: 

wget -r -N -c -np https://physionet.org/files/tappy/1.0.0/

## Data explanation:

https://physionet.org/content/tappy/1.0.0/

## Original Article:

https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0188226#pone-0188226-t003

# Importing stuff

In [37]:
import os
import pandas as pd
import glob
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType, StringType, IntegerType, FloatType


In [38]:
!pwd

/home/biomedmax/final_project_ironhack


In [39]:
!pwd



/home/biomedmax/final_project_ironhack


In [40]:
# Same but in form of function

def read_data(filepath):
    """Load one tab-separated keystroke file into a pandas DataFrame.

    The file is read without a header row and must contain exactly nine
    columns; the ninth one is labelled "Remove" and discarded before returning.
    (Presumably it comes from a trailing tab on every line -- TODO confirm.)

    Parameters
    ----------
    filepath : str
        Path of the text file to read.

    Returns
    -------
    pandas.DataFrame
        Columns: UserKey, Date, Timestamp, Hand, HoldTime, Direction,
        LatencyTime, FlightTime.
    """
    column_names = [
        'UserKey', 'Date', 'Timestamp', 'Hand',
        'HoldTime', 'Direction', 'LatencyTime', 'FlightTime',
        "Remove",
    ]

    frame = pd.read_csv(filepath, delimiter="\t", header=None)
    frame.columns = column_names

    # Return a new frame without the placeholder column
    return frame.drop(columns="Remove")

In [41]:
#data = read_data("physionetdotorg/files/tappy/1/extracted/TappyData/ZWBPPNQCUX_1608.txt")
data = read_data("physionetdotorg/files/maksymdata/MAKSYM0000_1708.txt")

In [42]:
data

Unnamed: 0,UserKey,Date,Timestamp,Hand,HoldTime,Direction,LatencyTime,FlightTime
0,MAKSYM0000,170824,10:10:10.111,R,77.96,LR,169.87,96.40
1,MAKSYM0000,170824,10:10:10.111,R,76.26,RR,375.79,297.83
2,MAKSYM0000,170824,10:10:10.111,S,101.36,LS,88.34,132.53
3,MAKSYM0000,170824,10:10:10.111,S,93.00,LS,88.34,132.53
4,MAKSYM0000,170824,10:10:10.111,L,63.72,SL,185.22,92.22
...,...,...,...,...,...,...,...,...
171,MAKSYM0000,170824,10:10:10.111,L,73.77,SL,246.07,150.86
172,MAKSYM0000,170824,10:10:10.111,L,84.69,LL,67.18,153.42
173,MAKSYM0000,170824,10:10:10.111,L,123.19,LL,67.18,153.42
174,MAKSYM0000,170824,10:10:10.111,L,99.03,LL,147.24,24.05


In [43]:
# Function to remove leading zeros from a float represented as a string
def remove_leading_zeros_from_float(value):
    """Convert a zero-padded numeric string such as "0077.96" to a float.

    Only strings made of digits with at most one decimal point are converted.
    Anything else (non-strings, signed numbers, text, strings with several
    dots) is returned unchanged so the caller can decide what to do with it.

    Parameters
    ----------
    value : object
        A cell value; typically a str or an already-parsed number.

    Returns
    -------
    float or object
        The parsed float, or ``value`` itself when it is not a plain
        unsigned decimal string.
    """
    if not isinstance(value, str):
        return value

    # Allow a single decimal point only; "1.2.3" must not reach float().
    if not value.replace('.', '', 1).isdigit():
        return value

    try:
        # float() already ignores leading zeros, so all-zero strings such as
        # "0" or "000" are converted too instead of being left as strings.
        return float(value)
    except ValueError:
        # isdigit() accepts a few characters float() cannot parse
        # (e.g. superscript digits); leave those untouched.
        return value

# Apply changes to the columns that need it
def apply_remove_leading_zeros_from_float(data):
    """Normalise the timing columns of ``data`` in place.

    Each of HoldTime, LatencyTime and FlightTime that exists in the frame is
    passed element-wise through ``remove_leading_zeros_from_float``. Columns
    that are absent are skipped. Nothing is returned; the frame is modified.
    """
    for timing_column in ('HoldTime', 'LatencyTime', 'FlightTime'):
        if timing_column not in data.columns:
            continue
        data[timing_column] = data[timing_column].apply(remove_leading_zeros_from_float)

apply_remove_leading_zeros_from_float(data)



In [44]:
print(data.head())


      UserKey    Date     Timestamp Hand  HoldTime Direction  LatencyTime  \
0  MAKSYM0000  170824  10:10:10.111    R     77.96        LR       169.87   
1  MAKSYM0000  170824  10:10:10.111    R     76.26        RR       375.79   
2  MAKSYM0000  170824  10:10:10.111    S    101.36        LS        88.34   
3  MAKSYM0000  170824  10:10:10.111    S     93.00        LS        88.34   
4  MAKSYM0000  170824  10:10:10.111    L     63.72        SL       185.22   

   FlightTime  
0       96.40  
1      297.83  
2      132.53  
3      132.53  
4       92.22  


In [45]:
data.dtypes

UserKey         object
Date             int64
Timestamp       object
Hand            object
HoldTime       float64
Direction       object
LatencyTime    float64
FlightTime     float64
dtype: object

In [46]:
len(data)

176

### Add quality control to each row. Delete those that don't qualify

In [47]:
# Function to validate each row
def validate_row(row):
    """Return True when every field of one keystroke record looks well-formed.

    Checks, in order:
      * UserKey     - str, exactly 10 alphanumeric characters
      * Date        - int with 6 digits that parses with the format '%d%m%y'
                      (NOTE(review): confirm this matches the date layout of
                      every input file)
      * Timestamp   - str that parses as '%H:%M:%S.%f'
      * Hand        - one of 'L', 'R', 'S'
      * HoldTime    - float or int
      * Direction   - 2-character str made only of 'L', 'R', 'S'
      * LatencyTime - float or int
      * FlightTime  - float or int

    ``row`` only needs to support ``row[column_name]`` lookups.
    """
    valid_codes = ['L', 'R', 'S']

    def is_number(value):
        return isinstance(value, (float, int))

    # --- UserKey ---
    user_key = row['UserKey']
    if not isinstance(user_key, str):
        return False
    if len(user_key) != 10 or not user_key.isalnum():
        return False

    # --- Date ---
    date_value = row['Date']
    if not isinstance(date_value, int) or len(str(date_value)) != 6:
        return False
    try:
        pd.to_datetime(str(date_value), format='%d%m%y')
    except ValueError:
        return False

    # --- Timestamp ---
    timestamp = row['Timestamp']
    if not isinstance(timestamp, str):
        return False
    if pd.to_datetime(timestamp, format='%H:%M:%S.%f', errors='coerce') is pd.NaT:
        return False

    # --- Hand / HoldTime ---
    if row['Hand'] not in valid_codes:
        return False
    if not is_number(row['HoldTime']):
        return False

    # --- Direction ---
    direction = row['Direction']
    if not isinstance(direction, str) or len(direction) != 2:
        return False
    if any(char not in valid_codes for char in direction):
        return False

    # --- LatencyTime / FlightTime ---
    if not is_number(row['LatencyTime']):
        return False
    return is_number(row['FlightTime'])

# Flag every row with the outcome of validate_row
data['Valid'] = data.apply(validate_row, axis=1)

# Keep only the rows flagged as valid, then discard the helper column
data = data.loc[data['Valid'] == True].drop(columns=['Valid'])

In [48]:
len(data)

176

In [49]:
print(data.head())


      UserKey    Date     Timestamp Hand  HoldTime Direction  LatencyTime  \
0  MAKSYM0000  170824  10:10:10.111    R     77.96        LR       169.87   
1  MAKSYM0000  170824  10:10:10.111    R     76.26        RR       375.79   
2  MAKSYM0000  170824  10:10:10.111    S    101.36        LS        88.34   
3  MAKSYM0000  170824  10:10:10.111    S     93.00        LS        88.34   
4  MAKSYM0000  170824  10:10:10.111    L     63.72        SL       185.22   

   FlightTime  
0       96.40  
1      297.83  
2      132.53  
3      132.53  
4       92.22  


In [50]:
len(data["Date"])

176

### The datatypes are weird. Going to change them

In [51]:
def variable_conversion_function(data):
    """Cast the columns of ``data`` to their working types, in place.

    * Timestamp is parsed with the format '%H:%M:%S.%f' and reduced to
      ``datetime.time`` objects.
    * UserKey, Hand and Direction are cast to str.
    * HoldTime, LatencyTime and FlightTime are cast to float.

    The frame passed in is modified and also returned.
    """
    # Parse 'HH:MM:SS.fff' strings and keep only the time-of-day part
    parsed_timestamps = pd.to_datetime(data['Timestamp'], format='%H:%M:%S.%f')
    data['Timestamp'] = parsed_timestamps.dt.time

    # Identifier-like columns become strings
    for text_column in ('UserKey', 'Hand', 'Direction'):
        data[text_column] = data[text_column].astype(str)

    # Timing measurements become floats
    for timing_column in ('HoldTime', 'LatencyTime', 'FlightTime'):
        data[timing_column] = data[timing_column].astype(float)

    return data

variable_conversion_function(data)

# Verify the changes
print(data.dtypes)
print(data)

UserKey         object
Date             int64
Timestamp       object
Hand            object
HoldTime       float64
Direction       object
LatencyTime    float64
FlightTime     float64
dtype: object
        UserKey    Date        Timestamp Hand  HoldTime Direction  \
0    MAKSYM0000  170824  10:10:10.111000    R     77.96        LR   
1    MAKSYM0000  170824  10:10:10.111000    R     76.26        RR   
2    MAKSYM0000  170824  10:10:10.111000    S    101.36        LS   
3    MAKSYM0000  170824  10:10:10.111000    S     93.00        LS   
4    MAKSYM0000  170824  10:10:10.111000    L     63.72        SL   
..          ...     ...              ...  ...       ...       ...   
171  MAKSYM0000  170824  10:10:10.111000    L     73.77        SL   
172  MAKSYM0000  170824  10:10:10.111000    L     84.69        LL   
173  MAKSYM0000  170824  10:10:10.111000    L    123.19        LL   
174  MAKSYM0000  170824  10:10:10.111000    L     99.03        LL   
175  MAKSYM0000  170824  10:10:10.111000   

In [52]:
data["Date"]

0      170824
1      170824
2      170824
3      170824
4      170824
        ...  
171    170824
172    170824
173    170824
174    170824
175    170824
Name: Date, Length: 176, dtype: int64

### Calculate the metrics

In [53]:
# Mean of every numeric column per (UserKey, Hand, Direction) group.
# The first positional argument of groupby(...).mean() is `numeric_only`, not a
# column list, so the flag is spelled out instead of relying on a truthy list.
datagrouped = data.groupby(["UserKey", "Hand", "Direction"]).mean(numeric_only=True)

In [54]:
datagrouped

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Date,HoldTime,LatencyTime,FlightTime
UserKey,Hand,Direction,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
MAKSYM0000,L,LL,170824.0,93.4112,175.358,189.4208
MAKSYM0000,L,RL,170824.0,84.521071,201.381429,124.586429
MAKSYM0000,L,SL,170824.0,85.125333,265.536667,174.632667
MAKSYM0000,R,LR,170824.0,82.6152,164.6608,101.4848
MAKSYM0000,R,RR,170824.0,89.300952,196.14619,114.13381
MAKSYM0000,R,SR,170824.0,456.628824,268.998824,252.660588
MAKSYM0000,S,LS,170824.0,94.524483,250.620345,205.966897
MAKSYM0000,S,RS,170824.0,109.202,154.417333,190.259333
MAKSYM0000,S,SS,170824.0,42.15,519.53,397.24


In [55]:
# Drop the averaged Date column since it is irrelevant.
# errors="ignore" keeps the cell re-runnable once the column is already gone.
datagrouped = datagrouped.drop(columns="Date", errors="ignore")

In [56]:
datagrouped

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,HoldTime,LatencyTime,FlightTime
UserKey,Hand,Direction,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
MAKSYM0000,L,LL,93.4112,175.358,189.4208
MAKSYM0000,L,RL,84.521071,201.381429,124.586429
MAKSYM0000,L,SL,85.125333,265.536667,174.632667
MAKSYM0000,R,LR,82.6152,164.6608,101.4848
MAKSYM0000,R,RR,89.300952,196.14619,114.13381
MAKSYM0000,R,SR,456.628824,268.998824,252.660588
MAKSYM0000,S,LS,94.524483,250.620345,205.966897
MAKSYM0000,S,RS,109.202,154.417333,190.259333
MAKSYM0000,S,SS,42.15,519.53,397.24


### I generate 27 features this way. I want each feature to be a column instead of a row. So I need to reprocess the dataframe a little bit

In [57]:
# Turn the (UserKey, Hand, Direction) index levels back into ordinary columns
datagrouped = datagrouped.reset_index()

# One row per user, one column per (metric, Hand, Direction) combination
metric_columns = ["HoldTime", "LatencyTime", "FlightTime"]
pivoted = datagrouped.pivot_table(
    index="UserKey",
    columns=["Hand", "Direction"],
    values=metric_columns,
)

# Collapse the three column levels into a single "Hand-Direction-Metric" label
flat_labels = []
for metric, hand, direction in pivoted.columns:
    flat_labels.append('-'.join((hand, direction, metric)))
pivoted.columns = flat_labels

# Bring 'UserKey' back as a regular column
pivoted = pivoted.reset_index()

# Show the resulting DataFrame
print(pivoted)


      UserKey  L-LL-FlightTime  L-RL-FlightTime  L-SL-FlightTime  \
0  MAKSYM0000         189.4208       124.586429       174.632667   

   R-LR-FlightTime  R-RR-FlightTime  R-SR-FlightTime  S-LS-FlightTime  \
0         101.4848        114.13381       252.660588       205.966897   

   S-RS-FlightTime  S-SS-FlightTime  ...  S-SS-HoldTime  L-LL-LatencyTime  \
0       190.259333           397.24  ...          42.15           175.358   

   L-RL-LatencyTime  L-SL-LatencyTime  R-LR-LatencyTime  R-RR-LatencyTime  \
0        201.381429        265.536667          164.6608         196.14619   

   R-SR-LatencyTime  S-LS-LatencyTime  S-RS-LatencyTime  S-SS-LatencyTime  
0        268.998824        250.620345        154.417333            519.53  

[1 rows x 28 columns]


In [58]:
#In form of function

def generate_new_features(data):
    """Build one wide feature row per user from per-keystroke records.

    The mean of HoldTime, LatencyTime and FlightTime is computed for every
    (UserKey, Hand, Direction) group and then spread into columns named
    ``"<Hand>-<Direction>-<Metric>"``.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain UserKey, Hand, Direction and the three numeric metric
        columns. Any other column (Date, Timestamp, ...) is ignored.

    Returns
    -------
    pandas.DataFrame
        One row per UserKey; the first column is 'UserKey', followed by one
        column per metric and Hand/Direction combination present in ``data``.
    """
    group_keys = ["UserKey", "Hand", "Direction"]
    metrics = ["HoldTime", "LatencyTime", "FlightTime"]

    # Select the metric columns explicitly: groupby(...).mean() takes
    # `numeric_only` as its first positional argument, not a list of columns,
    # so passing the list there averaged every numeric column (Date included).
    datagrouped = data.groupby(group_keys)[metrics].mean().reset_index()

    # Pivot so each (metric, Hand, Direction) combination becomes a column
    pivoted = datagrouped.pivot_table(index="UserKey", columns=["Hand", "Direction"], values=metrics)

    # Flatten the multi-level column index
    pivoted.columns = ['-'.join([hand, direction, metric]) for metric, hand, direction in pivoted.columns]

    # Reset index to make 'UserKey' a column again
    return pivoted.reset_index()

generate_new_features(data)

Unnamed: 0,UserKey,L-LL-FlightTime,L-RL-FlightTime,L-SL-FlightTime,R-LR-FlightTime,R-RR-FlightTime,R-SR-FlightTime,S-LS-FlightTime,S-RS-FlightTime,S-SS-FlightTime,...,S-SS-HoldTime,L-LL-LatencyTime,L-RL-LatencyTime,L-SL-LatencyTime,R-LR-LatencyTime,R-RR-LatencyTime,R-SR-LatencyTime,S-LS-LatencyTime,S-RS-LatencyTime,S-SS-LatencyTime
0,MAKSYM0000,189.4208,124.586429,174.632667,101.4848,114.13381,252.660588,205.966897,190.259333,397.24,...,42.15,175.358,201.381429,265.536667,164.6608,196.14619,268.998824,250.620345,154.417333,519.53


In [59]:
data.shape[1]

8

### Join the patient information to the tappy information

In [60]:
# Function to read the file and get the value from the Nth row
def get_value_from_file(user_key, row_number, users_dir='physionetdotorg/files/maksymdata'):
    """Return the value stored on one line of a user's ``User_<key>.txt`` file.

    Each line is expected to look like ``"Label: value"``. Everything after the
    first colon is returned with only the trailing newline removed, so any
    leading space after the colon is preserved.

    Parameters
    ----------
    user_key : str
        Key used to build the file name ``User_<user_key>.txt``.
    row_number : int
        Zero-based index of the line to read.
    users_dir : str, optional
        Directory holding the user files. Defaults to the directory this
        function previously had hard-coded.

    Returns
    -------
    str or None
        The text after the first colon, or None when the file does not exist,
        has too few lines, or the requested line contains no colon.
    """
    file_path = f'{users_dir}/User_{user_key}.txt'

    try:
        with open(file_path, 'r') as file:
            lines = file.readlines()
    except FileNotFoundError:
        return None  # No file for this user

    if len(lines) < row_number + 1:
        return None  # The file has fewer lines than requested

    # Split on the first colon only, so a value that itself contains ':' is
    # kept whole and a line without any colon cannot raise IndexError.
    _label, separator, value = lines[row_number].partition(":")
    if not separator:
        return None

    return value.strip("\n")

# Apply the function to each row to get the new column value
def join_all_info_apply(data):
    """Add the patient-information columns to ``data`` and return it.

    For every row, ``get_value_from_file`` is called with the row's UserKey
    and the line index of each field; the results are stored in new columns.
    The frame that is passed in is modified in place and also returned.

    Parameters
    ----------
    data : pandas.DataFrame
        Feature frame containing a 'UserKey' column.

    Returns
    -------
    pandas.DataFrame
        The same object as ``data``, with twelve extra columns.
    """
    # Column name -> zero-based line index in the user file.
    # Insertion order is the order in which the columns are appended.
    field_rows = {
        'BirthYear': 0,
        'Gender': 1,
        'Tremors': 3,
        'DiagnosisYear': 4,
        'Sided': 5,
        'UPDRS': 6,
        'Impact': 7,
        'Levadopa': 8,
        'DA': 9,
        'MAOB': 10,
        'Other': 11,
        'Parkinsons': 2,
    }

    # Operate on the argument rather than a module-level frame, so the
    # function also works when the caller's frame lives in a local variable.
    for column_name, row_number in field_rows.items():
        data[column_name] = data['UserKey'].apply(get_value_from_file, args=(row_number,))

    return data

join_all_info_apply(pivoted)

# Display the resulting DataFrame
print(pivoted)

      UserKey  L-LL-FlightTime  L-RL-FlightTime  L-SL-FlightTime  \
0  MAKSYM0000         189.4208       124.586429       174.632667   

   R-LR-FlightTime  R-RR-FlightTime  R-SR-FlightTime  S-LS-FlightTime  \
0         101.4848        114.13381       252.660588       205.966897   

   S-RS-FlightTime  S-SS-FlightTime  ...  Tremors  DiagnosisYear  Sided  \
0       190.259333           397.24  ...    False         ------   None   

         UPDRS   Impact  Levadopa      DA    MAOB   Other  Parkinsons  
0   Don't know   ------     False   False   False   False       False  

[1 rows x 40 columns]


# !!! ONLY FOR MAKSYM DATA !!!

In [61]:
# Save the resulting DataFrame to a CSV file
output_file_path = 'physionetdotorg/files/maksymdata/maksy_info.csv'
pivoted.to_csv(output_file_path, index=False)

# Display the resulting DataFrame
print(pivoted)

      UserKey  L-LL-FlightTime  L-RL-FlightTime  L-SL-FlightTime  \
0  MAKSYM0000         189.4208       124.586429       174.632667   

   R-LR-FlightTime  R-RR-FlightTime  R-SR-FlightTime  S-LS-FlightTime  \
0         101.4848        114.13381       252.660588       205.966897   

   S-RS-FlightTime  S-SS-FlightTime  ...  Tremors  DiagnosisYear  Sided  \
0       190.259333           397.24  ...    False         ------   None   

         UPDRS   Impact  Levadopa      DA    MAOB   Other  Parkinsons  
0   Don't know   ------     False   False   False   False       False  

[1 rows x 40 columns]


In [62]:
print(pivoted["UserKey"])

0    MAKSYM0000
Name: UserKey, dtype: object


In [63]:
print(pivoted)

      UserKey  L-LL-FlightTime  L-RL-FlightTime  L-SL-FlightTime  \
0  MAKSYM0000         189.4208       124.586429       174.632667   

   R-LR-FlightTime  R-RR-FlightTime  R-SR-FlightTime  S-LS-FlightTime  \
0         101.4848        114.13381       252.660588       205.966897   

   S-RS-FlightTime  S-SS-FlightTime  ...  Tremors  DiagnosisYear  Sided  \
0       190.259333           397.24  ...    False         ------   None   

         UPDRS   Impact  Levadopa      DA    MAOB   Other  Parkinsons  
0   Don't know   ------     False   False   False   False       False  

[1 rows x 40 columns]


In [36]:
pivoted["Gender"]

KeyError: 'Gender'

# Do it for all the files in directory

The following piece of code is basically the amalgamation of everything that came before it, but with loops applied so that it is done for ALL the files in the directory, and it also writes the file that stores the joined data.

In [30]:
# Define the path to the directory containing the .txt files
directory_path = "physionetdotorg/files/tappy/1/extracted/TappyData/"
file_list = glob.glob(os.path.join(directory_path, "*.txt"))

# Define the maximum number of lines allowed
max_lines = 8_000_000

# A complete feature row has UserKey plus 9 Hand/Direction combinations x 3 metrics
expected_columns = 28

output_path = "physionetdotorg/files/tappy/1/processed_10082024.csv"

# The output is opened in append mode, so the header is only needed when the
# file is new or still empty. Without this check, re-running the cell would
# write a second header row in the middle of the CSV.
header_written = os.path.exists(output_path) and os.path.getsize(output_path) > 0

# NOTE(review): get_value_from_file resolves user files under
# physionetdotorg/files/maksymdata/, so the patient columns for TappyData users
# may come back as None -- confirm the user-file directory before using this output.

# Open the output file in append mode (CSV format)
with open(output_path, 'a') as output_file:
    for filepath in file_list:
        # Count the number of lines in the file
        with open(filepath, 'r') as file:
            line_count = sum(1 for line in file)

        # Check if the file exceeds the maximum allowed lines
        if line_count > max_lines:
            print(f"Skipping {filepath} as it exceeds {max_lines} lines.")
            continue  # Skip to the next file

        print(f"Working on: {filepath}")
        data1 = read_data(filepath)

        apply_remove_leading_zeros_from_float(data1)

        # Apply the validation function to each row
        data1['Valid'] = data1.apply(validate_row, axis=1)

        # Filter out invalid rows
        data1 = data1[data1['Valid'] == True]

        # Drop the 'Valid' column as it's no longer needed
        data1 = data1.drop(columns=['Valid'])

        if len(data1) > 0:
            data3 = variable_conversion_function(data1)
            pivoted = generate_new_features(data3)

            print(pivoted.shape[1])
            # Files that do not cover every Hand/Direction combination are skipped
            if pivoted.shape[1] == expected_columns:

                pivoted = join_all_info_apply(pivoted)

                # Write to the output CSV file; the header goes out at most once
                pivoted.to_csv(output_file, index=False, header=not header_written)
                header_written = True

                print("Processed data written to output file.")


Working on: physionetdotorg/files/tappy/1/extracted/TappyData/QAH9IVALVC_1612.txt
Working on: physionetdotorg/files/tappy/1/extracted/TappyData/ZWBPPNQCUX_1608.txt
16
Working on: physionetdotorg/files/tappy/1/extracted/TappyData/MG8XVA5BFA_1611.txt


KeyboardInterrupt: 

# Comparing code execution with vs. without Spark

I will use a smaller subset of files so the run is not so intense. I will count the time and try to parallelize processes with spark.

## No Spark

In [31]:
start_time = time.time()

# Define the path to the directory containing the .txt files
directory_path = "physionetdotorg/files/tappy/1/extracted/TappyData/"
file_list = glob.glob(os.path.join(directory_path, "*.txt"))

# Define the maximum number of lines allowed
max_lines = 80000

# A complete feature row has UserKey plus 9 Hand/Direction combinations x 3 metrics
expected_columns = 28

output_path = "physionetdotorg/files/tappy/1/processed_test_no_spark.csv"

# The output is opened in append mode, so the header is only needed when the
# file is new or still empty. Without this check, re-running the cell would
# write a second header row in the middle of the CSV.
header_written = os.path.exists(output_path) and os.path.getsize(output_path) > 0

# Open the output file in append mode (CSV format)
with open(output_path, 'a') as output_file:
    for filepath in file_list:
        # Count the number of lines in the file
        with open(filepath, 'r') as file:
            line_count = sum(1 for line in file)

        # Check if the file exceeds the maximum allowed lines
        if line_count > max_lines:
            continue  # Skip to the next file

        data1 = read_data(filepath)

        apply_remove_leading_zeros_from_float(data1)

        # Apply the validation function to each row
        data1['Valid'] = data1.apply(validate_row, axis=1)

        # Filter out invalid rows
        data1 = data1[data1['Valid'] == True]

        # Drop the 'Valid' column as it's no longer needed
        data1 = data1.drop(columns=['Valid'])

        if len(data1) > 0:
            data3 = variable_conversion_function(data1)
            pivoted = generate_new_features(data3)

            # Files that do not cover every Hand/Direction combination are skipped
            if pivoted.shape[1] == expected_columns:

                pivoted = join_all_info_apply(pivoted)

                # Write to the output CSV file; the header goes out at most once
                pivoted.to_csv(output_file, index=False, header=not header_written)
                header_written = True


print("With no Spark it took --- %s seconds ---" % (time.time() - start_time))


  data = pd.read_csv(filepath, delimiter="\t", header = None)
  data = pd.read_csv(filepath, delimiter="\t", header = None)
  data = pd.read_csv(filepath, delimiter="\t", header = None)
  data = pd.read_csv(filepath, delimiter="\t", header = None)


With no Spark it took --- 2783.942713022232 seconds ---


## With Spark

In [32]:
# Start the clock before the Spark session is created, so start-up cost is included
start_time = time.time()

# Create (or reuse) the Spark session for this run
spark = (
    SparkSession.builder
    .appName("TappyDataProcessing")
    .getOrCreate()
)

# Collect every .txt file found in the keystroke data directory
directory_path = "physionetdotorg/files/tappy/1/extracted/TappyData/"
txt_pattern = os.path.join(directory_path, "*.txt")
file_list = glob.glob(txt_pattern)

# Files with more lines than this are skipped
max_lines = 80_000

# Function to process each file
def process_file(filepath):
    """Run the full per-file pipeline and return the feature row, or None.

    Steps: count lines (skip files above ``max_lines``), read the file,
    normalise the timing columns, drop rows rejected by ``validate_row``,
    convert dtypes, build the wide feature frame and, when it has exactly
    28 columns, attach the patient information.

    Returns
    -------
    pandas.DataFrame or None
        The frame returned by ``join_all_info_apply``; None when the file is
        too long, has no valid rows, or does not yield 28 feature columns.
    """
    print(f"Processing file: {filepath}")  # Log the file being processed

    # Count lines first so oversized files are never parsed
    with open(filepath, 'r') as handle:
        line_count = sum(1 for _ in handle)

    if line_count > max_lines:
        print(f"Skipping file {filepath} due to excess line count: {line_count}")
        return None

    # Load and clean the raw records
    data1 = read_data(filepath)
    apply_remove_leading_zeros_from_float(data1)
    data1['Valid'] = data1.apply(validate_row, axis=1)
    valid_rows = data1.loc[data1['Valid'] == True]
    data1 = valid_rows.drop(columns=['Valid'])

    # Nothing survived validation
    if len(data1) == 0:
        return None

    data3 = variable_conversion_function(data1)
    features = generate_new_features(data3)

    # Only frames with the expected number of columns are kept
    if len(features.columns) != 28:
        return None

    return join_all_info_apply(features)

# Everything that uses the Spark session runs inside try/finally, so the
# session is stopped even when processing or writing raises.
try:
    # Parallelize the file list and process files
    rdd = spark.sparkContext.parallelize(file_list)

    # Process each file and filter out None results
    processed_data_rdd = rdd.map(process_file).filter(lambda x: x is not None)

    # Collect the processed Pandas DataFrames
    processed_data_list = processed_data_rdd.collect()

    # Convert the list of Pandas DataFrames to a single Pandas DataFrame
    if processed_data_list:
        print(f"Number of processed DataFrames: {len(processed_data_list)}")

        # Concatenate all the Pandas DataFrames
        combined_df = pd.concat(processed_data_list, ignore_index=True)

        # Define the output CSV file path
        output_csv_path = "physionetdotorg/files/tappy/1/processed_test_spark.csv"

        # New file: write with the header. Existing file: append without it.
        output_exists = os.path.exists(output_csv_path)
        combined_df.to_csv(
            output_csv_path,
            index=False,
            mode='a' if output_exists else 'w',
            header=not output_exists,
        )

    print("With Spark it took --- %s seconds ---" % (time.time() - start_time))
finally:
    # Stop the Spark session
    spark.stop()

24/08/20 20:52:11 WARN Utils: Your hostname, MojoDojoCasaPC resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/08/20 20:52:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/20 20:52:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Processing file: physionetdotorg/files/tappy/1/extracted/TappyData/9ALACHPPUR_1702.txtProcessing file: physionetdotorg/files/tappy/1/extracted/TappyData/UUIZH9TDXR_1610.txt
Processing file: physionetdotorg/files/tappy/1/extracted/TappyData/FXC5YFXZ0K_1610.txt

Processing file: physionetdotorg/files/tappy/1/extracted/TappyData/UDCY90VKYN_1610.txt
Processing file: physionetdotorg/files/tappy/1/extracted/TappyData/STW9PUPHWN_1609.txt
Processing file: physionetdotorg/files/tappy/1/extract

Number of processed DataFrames: 394
With Spark it took --- 152.65561532974243 seconds ---


With no Spark it took --- 2783.942713022232 seconds ---

With Spark it took --- 152.65561532974243 seconds ---
