In [0]:
from pyspark.sql import SparkSession
import time

def validate_and_map(row):
    """Turn an indexed row into a ``(key, (line_number, position))`` pair.

    ``row`` is expected to look like ``[line_number, c0, c1]``. Rows with
    fewer than three fields yield ``None`` so callers can filter them out;
    any fields beyond the third are ignored.
    """
    if len(row) < 3:
        return None
    line_number, key, position = row[0], row[1], row[2]
    return (key, (line_number, position))

def inside_accomodation(x_position, left, right):
    """Return True when ``float(x_position)`` lies strictly between ``left`` and ``right``."""
    return left < float(x_position) < right

def inside_incident(x_position, left, right):
    """Return True when ``float(x_position)`` is in the open interval ``(left, right)``."""
    position = float(x_position)
    return left < position and position < right

def find_accomodation_or_incident(id, values, cutoff_index, accom_left, accom_right, inc_left, inc_right):
    """Count incident and accommodation transitions in one key's samples.

    ``values`` holds ``(line_number, x_position)`` pairs; they are walked in
    ascending ``int(line_number)`` order through a small state machine.
    Interval membership is decided by ``inside_accomodation`` and
    ``inside_incident``:

    * a sample in the accommodation interval while in state 1 counts one
      accommodation and moves to state 2;
    * otherwise, a sample in the incident interval while in state 0 counts one
      incident and moves to state 1, but only if its line number is
      ``<= cutoff_index``;
    * a sample in neither interval resets the state to 0.

    The walk starts in state -1, so nothing is counted until a sample outside
    both intervals has been seen. ``id`` is accepted but not used.

    Returns:
        ``(incident_count, accommodation_count)``; ``(0, 0)`` for empty input.
    """
    if not values:
        return 0, 0

    # Walker states (numeric values match the original encoding).
    UNSEEN, OUTSIDE, INCIDENT, ACCOMMODATED = -1, 0, 1, 2

    incidents = accommodations = 0
    state = UNSEEN

    for line_number, position in sorted(values, key=lambda pair: int(pair[0])):
        if inside_accomodation(position, accom_left, accom_right):
            # Only an accommodation that follows a counted incident is recorded.
            if state == INCIDENT:
                accommodations += 1
                state = ACCOMMODATED
            continue
        if not inside_incident(position, inc_left, inc_right):
            state = OUTSIDE
            continue
        # In the incident interval: count only a fresh entry from outside,
        # and only for lines up to the cutoff.
        if state == OUTSIDE and int(line_number) <= cutoff_index:
            incidents += 1
            state = INCIDENT

    return incidents, accommodations

# Start
# `spark` is predefined in Databricks notebooks; getOrCreate() returns that same
# session there and builds one if this code runs outside a notebook.
spark = SparkSession.builder.getOrCreate()

# perf_counter() is a monotonic clock meant for measuring elapsed time.
start_time = time.perf_counter()
printable_string = ''

# How many trailing NaN lines the incident cutoff is pushed back by (see below).
set_count_to_deduct_for_incident = 500

# Read from Databricks table
df = spark.table("hive_metastore.default.mydata_2_txt")
# Keep columns _c0 (id) and _c1 (x position), passed through unchanged.
# Presumably both are strings from a headerless text import -- positions are
# converted with float() downstream.
rdd = df.rdd.map(lambda row: (row['_c0'], row['_c1']))

# Attach 1-based line numbers: each element becomes [line_number, c0, c1].
# This RDD feeds several Spark actions below (take, count, top, groupByKey);
# cache it so the table scan and zipWithIndex are not recomputed for each one.
indexed_rdd = rdd.zipWithIndex().map(lambda x: [x[1] + 1, x[0][0], x[0][1]]).cache()

# Keep only lines where some field equals the exact string 'NaN'
# (list membership, not a substring test).
nan_lines_rdd = indexed_rdd.filter(lambda x: 'NaN' in x)

# Get the first NaN line if one exists, else use the dataset length.
# take(1) fetches a single element rather than collecting the whole RDD.
first_nan = nan_lines_rdd.take(1)
if first_nan:
    cutoff_index = first_nan[0][0]  # line number of the first NaN occurrence
else:
    # No NaN found: the cutoff is the entire dataset length.
    total_lines = indexed_rdd.count()
    cutoff_index = total_lines

# When there are at least `set_count_to_deduct_for_incident` NaN lines, move the
# cutoff back to the N-th NaN line counted from the end. Incidents on later
# lines are then not counted.
# NOTE(review): with 0 < nan_count < N the cutoff stays at the FIRST NaN line;
# confirm that is intended.
nan_line_numbers_rdd = nan_lines_rdd.map(lambda x: x[0])
nan_count = nan_line_numbers_rdd.count()

if set_count_to_deduct_for_incident > 0 and nan_count >= set_count_to_deduct_for_incident:
    # top(N) returns the N largest NaN line numbers; the smallest of them is the cutoff.
    last_500_nan = nan_line_numbers_rdd.top(set_count_to_deduct_for_incident)
    new_cutoff = min(last_500_nan)
    cutoff_index = new_cutoff

# Drop lines containing NaN, key the rest by id, and group each id's samples.
accomodation_cleaned_rdd = indexed_rdd.filter(lambda row: 'NaN' not in row)
accomodation_key_value_rdd = accomodation_cleaned_rdd.map(validate_and_map).filter(lambda x: x is not None)
accomodation_grouped_rdd = accomodation_key_value_rdd.groupByKey()

# Define planes: the incident band extends 20 units past the accommodation band on each side.
accomodation_plane_Right = 73.2186
accomodation_plane_Left = 2.709
incident_plane_Right = accomodation_plane_Right + 20
incident_plane_Left = accomodation_plane_Left - 20

accomodation_rdd_result = accomodation_grouped_rdd.map(
    lambda x: [x[0], find_accomodation_or_incident(
        x[0], list(x[1]), cutoff_index,
        accomodation_plane_Left, accomodation_plane_Right,
        incident_plane_Left, incident_plane_Right
    )]
)

# fold() with a (0, 0) zero value also copes with an empty RDD, where
# reduce() would raise ValueError("Can not reduce() empty RDD").
total_counts = accomodation_rdd_result.map(lambda x: x[1]).fold(
    (0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1])
)
total_incidents, total_accommodations = total_counts
# An accommodation is only counted after an incident for the same id, so zero
# incidents implies zero accommodations: the ratio is 0/0, not infinity.
overall_ratio = (
    total_accommodations / total_incidents if total_incidents != 0 else "undefined (no incidents)"
)

# All Spark actions on the cached data are done.
indexed_rdd.unpersist()

# Print results
printable_string += (
    f"Total Accommodations: {total_accommodations}\n"
    f"Total Incidents: {total_incidents}\n"
    f"Overall Accommodation-to-Incident Ratio: {overall_ratio}\n"
)

end_time = time.perf_counter()
elapsed_time = end_time - start_time
printable_string += f"Elapsed time: {elapsed_time:.6f} seconds\n"

print(printable_string)

Total Accommodations: 28
Total Incidents: 30
Overall Accommodation-to-Incident Ratio: 0.9333333333333333
Elapsed time: 266.804359 seconds

