In [0]:
# Put the single word into an RDD (despite the _df suffix, word_df is an RDD created via
# sparkContext.parallelize, not a DataFrame), split it into characters and count the 'a's.
word_df = spark.sparkContext.parallelize(["akshaaya"])

characters = word_df.flatMap(list)  # list("abc") -> ['a', 'b', 'c']: one element per character
a_count = characters.filter(lambda ch: ch == 'a').count()

print(f"Number of 'a' characters: {a_count}")

In [0]:
from pyspark.sql.functions import explode, split

In [0]:
# Single input row whose 'value' column packs several '#'-separated tokens.
input_schema = ['id', 'value']
input_data = [(1, '10#23#45')]

input_df = spark.createDataFrame(input_data, input_schema)

# split turns the packed string into an array of tokens; explode then emits one
# output row per token, repeating the row's id on each of them.
input_df.withColumn(
    'value',
    explode(split('value', '#')),
).show()

In [0]:
# Minute-by-minute status readings for a single host, starting at 12:00:00.
# Entry i of status_sequence is the reading taken at 12:<i>:00.
# updated_time is kept as a fixed-width 'YYYY-MM-DD HH:MM:SS' string (not a timestamp
# type), so ordering the strings also orders the readings chronologically.
test_schema = ['host', 'updated_time', 'status']

status_sequence = ['up', 'up', 'down', 'down', 'down', 'down', 'down', 'up', 'up', 'down', 'down']
test_record = [
    ('hdfs', f'2025-03-06 12:{minute:02d}:00', status)
    for minute, status in enumerate(status_sequence)
]

test_df = spark.createDataFrame(test_record, test_schema)
display(test_df)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, col

# One window per host, walking its status history in time order.
window = Window.partitionBy('host').orderBy('updated_time')

# Attach each reading's neighbouring statuses so run boundaries can be detected row by row.
lvl1 = (
    test_df
    .withColumn('prev_status', lag('status').over(window))
    .withColumn('next_status', lead('status').over(window))
)

# A 'down' reading starts a down-run when the previous reading is 'up' (or there is none),
# and ends a down-run when the next reading is 'up' (or there is none).
is_run_start = (col('prev_status') == 'up') | col('prev_status').isNull()
is_run_end = (col('next_status') == 'up') | col('next_status').isNull()

# The OR of both boundary tests must be grouped before applying '&': Python's '&' binds
# tighter than '|', so without the outer parentheses the status == 'down' check only
# guarded the next_status test and 'up' readings leaked into the result.
lvl1.filter((is_run_start | is_run_end) & (col('status') == 'down')).show()

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col, when, count, first, last, row_number, sum

# Aliased so the notebook's builtin min/max are not shadowed by the Spark aggregates.
from pyspark.sql.functions import min as spark_min, max as spark_max

# One window spec per host, ordered by time; reused for both the lag and the running sum.
w = Window.partitionBy('host').orderBy('updated_time')

# Flag readings whose status differs from the previous reading. For the first reading
# of a host lag() is null, the comparison is null, and otherwise(0) applies.
df_with_changes = test_df.withColumn(
    'status_changed',
    when(lag('status').over(w) != col('status'), 1).otherwise(0)
)

# Running total of the change flags: every reading in a run of identical statuses
# ends up with the same group_id, and each new run gets the next id.
df_with_groups = df_with_changes.withColumn(
    'group_id',
    sum('status_changed').over(w)
)

# Summarise each 'down' run. min/max are used instead of first/last because first/last
# inside groupBy().agg() depend on row order after the shuffle and are non-deterministic.
# updated_time is compared as a string here; this assumes the fixed-width
# 'YYYY-MM-DD HH:MM:SS' format, where string order equals time order.
# 'duration' is the number of readings in the run, not elapsed time.
down_sequences = (
    df_with_groups
    .filter(col('status') == 'down')
    .groupBy('host', 'group_id')
    .agg(
        spark_min('updated_time').alias('start_time'),
        spark_max('updated_time').alias('end_time'),
        count('*').alias('duration'),
    )
)

# Longest down-run; ties are broken by the earliest start so limit(1) is deterministic.
max_downtime = (
    down_sequences
    .orderBy(col('duration').desc(), col('start_time'))
    .limit(1)
    .select('host', 'start_time', 'end_time')
)

max_downtime.show(truncate=False)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, count, min, max

# Window over all of a host's readings, in time order.
w = Window.partitionBy('host').orderBy('updated_time')
# Window over a host's readings of a single status, in time order.
w_status = Window.partitionBy('host', 'status').orderBy('updated_time')

# Gaps-and-islands: inside a run of identical statuses, the row number over all readings
# and the row number over same-status readings advance in lockstep, so their difference
# is constant for the run and changes whenever another status interrupts it.
# Both row numbers must be computed BEFORE filtering to 'down'. The previous version
# subtracted a row number from the updated_time string, which Spark cannot interpret as a
# number (typically null, or an error with ANSI mode), so the down runs were not separated.
max_downtime = (
    test_df
    .withColumn('group_time', row_number().over(w) - row_number().over(w_status))
    .filter(col('status') == 'down')
    .groupBy('host', 'group_time')
    .agg(
        min('updated_time').alias('start_time'),
        max('updated_time').alias('end_time'),
        count('*').alias('duration'),  # number of readings in the run, not elapsed time
    )
    # Longest run first; the earliest start breaks ties so limit(1) is deterministic.
    .orderBy(col('duration').desc(), col('start_time'))
    .limit(1)
    .select('host', 'start_time', 'end_time')
)

max_downtime.show(truncate=False)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col, when, count, first, last, sum

# Aliased so the notebook's builtin min/max are not shadowed by the Spark aggregates.
from pyspark.sql.functions import min as spark_min, max as spark_max

# Single window spec since the lag and the running sum use the same partitioning/ordering
w = Window.partitionBy('host').orderBy('updated_time')

max_downtime = (
    test_df
    # 1 where the status differs from the previous reading; the first reading (lag null) -> 0.
    # Computed as its own column because Spark does not support nesting one window
    # function inside another windowed aggregate.
    .withColumn('status_changed',
                when(lag('status').over(w) != col('status'), 1).otherwise(0))
    # Running count of changes labels each run of identical statuses with its own group_id.
    .withColumn('group_id', sum('status_changed').over(w))
    .filter(col('status') == 'down')
    .groupBy('host', 'group_id')
    .agg(
        # min/max rather than first/last: first/last after a shuffle are non-deterministic.
        spark_min('updated_time').alias('start_time'),
        spark_max('updated_time').alias('end_time'),
        # The run length must be aggregated here: after groupBy each row is already one
        # group, so a count over a (host, group_id) window would always be 1.
        count('*').alias('duration'),
    )
    # Longest run first; the earliest start breaks ties so limit(1) is deterministic.
    .orderBy(col('duration').desc(), col('start_time'))
    .limit(1)
    .select('host', 'start_time', 'end_time')
)

max_downtime.show(truncate=False)

In [0]:
# Path of the input JSON file.
input_file_name = '/FileStore/tables/file5.json'

# Keep only the final path component: everything after the last '/'
# (the whole string when it contains no '/').
file_name = input_file_name.rpartition('/')[2]

print(file_name)