In [None]:
# Trading-activity profile: for every (symbol, weekday name, hour of day)
# bucket in `spot_data`, average the per-row trade count and volume, and list
# the buckets with the highest average volume first.
activity_query = """
SELECT 
    symbol,
    date_format(open_timestamp, 'EEEE') as day_of_week,
    date_format(open_timestamp, 'HH') as hour_of_day,
    avg(number_of_trade) as avg_trades,
    avg(volume) as avg_volume
FROM spot_data
GROUP BY 
    symbol, 
    date_format(open_timestamp, 'EEEE'),
    date_format(open_timestamp, 'HH')
ORDER BY avg_volume DESC
"""
activity_df = spark.sql(activity_query)

# Preview the ten highest-volume buckets.
print("Top 10 most active trading periods:")
activity_df.show(n=10)

# Trailing expression: the notebook renders the total number of buckets.
activity_df.count()

In [None]:



#########################################

# Daily volatility per symbol, computed from the rows of `spot_data`.
#
# The CTE collapses each (symbol, calendar day) into one row, then three
# percentage metrics are derived from it:
#   price_range_volatility = (daily_high - daily_low) / avg_price  * 100
#   hl_volatility          = (daily_high - daily_low) / daily_low  * 100
#   oc_volatility          = |daily_close - daily_open| / daily_open * 100
#
# daily_open / daily_close are selected with min_by / max_by keyed on
# open_timestamp. The plain first() / last() aggregates are non-deterministic
# inside a GROUP BY (they return whichever row arrives first/last after the
# shuffle), which can yield a wrong open/close and results that change between
# runs. NOTE(review): min_by / max_by need Spark 3.0+ -- confirm the cluster
# version.
#
# `symbol` is added as a secondary sort key so that rows sharing a trade_date
# come back in a stable order and the preview below is reproducible.
daily_vol = spark.sql("""
WITH daily_prices AS (
    SELECT 
        symbol,
        date_format(open_timestamp, 'yyyy-MM-dd') as trade_date,
        max(high_price) as daily_high,
        min(low_price) as daily_low,
        min_by(open_price, open_timestamp) as daily_open,
        max_by(close_price, open_timestamp) as daily_close,
        avg(close_price) as avg_price
    FROM spot_data
    GROUP BY symbol, date_format(open_timestamp, 'yyyy-MM-dd')
)
SELECT 
    symbol,
    trade_date,
    ((daily_high - daily_low) / avg_price) * 100 as price_range_volatility,
    ((daily_high - daily_low) / daily_low) * 100 as hl_volatility,
    abs((daily_close - daily_open) / daily_open) * 100 as oc_volatility
FROM daily_prices
ORDER BY trade_date, symbol
""")

# Preview the first ten (trade_date, symbol) rows.
print("Daily Volatility Analysis:")
daily_vol.show(10)

# Trailing expression: the notebook renders the number of (symbol, day) rows.
daily_vol.count()

In [None]:
# Intraday volatility: for every (symbol, calendar day, hour) bucket in
# `spot_data`, express the bucket's price range (max high - min low) as a
# percentage of its mean close, alongside the average trade count and volume.
# Results are ordered chronologically by day and then hour.
intraday_query = """
SELECT 
    symbol,
    date_format(open_timestamp, 'yyyy-MM-dd') as trade_date,
    date_format(open_timestamp, 'HH') as hour_of_day,
    ((max(high_price) - min(low_price)) / avg(close_price)) * 100 as hourly_volatility,
    avg(number_of_trade) as avg_trades,  -- Changed from number_of_trades to number_of_trade
    avg(volume) as avg_volume
FROM spot_data
GROUP BY 
    symbol,
    date_format(open_timestamp, 'yyyy-MM-dd'),
    date_format(open_timestamp, 'HH')
ORDER BY trade_date, hour_of_day
"""
intraday_vol = spark.sql(intraday_query)

# Preview the earliest ten hourly buckets.
print("Intraday Volatility Analysis:")
intraday_vol.show(n=10)

# Trailing expression: the notebook renders the total number of hourly buckets.
intraday_vol.count()