In [1]:
import pandas as pd
from pyhive import hive
import numpy as np
from datetime import datetime, timedelta

In [2]:
# HiveServer2 endpoint and authentication settings, kept in one mapping so the
# connection parameters can be read (and edited) at a glance.
# NOTE(review): KERBEROS auth presumably needs a valid ticket in the kernel's
# environment before this cell is run -- confirm.
hive_connection_settings = {
    'host': 'hsrv01.pic.es',
    'port': '10000',
    'database': 'cosmohub',
    'auth': 'KERBEROS',
    'kerberos_service_name': 'hive',
}
conn = hive.connect(**hive_connection_settings)

In [3]:
# Single cursor reused for every statement issued by the cells below.
cursor = conn.cursor()

In [4]:
# Statement that switches the session to the `msantama` database; the
# surrounding newlines are kept so the text sent to Hive is unchanged.
sql = "\nUSE msantama\n"

In [5]:
# Run the statement currently held in `sql` on the shared cursor.
cursor.execute(sql)

In [6]:
# Outer bounds of the study period: the extraction loop stops as soon as a
# window would start before `loop_start_date` or end after `loop_end_date`.
loop_start_date = datetime(year=2021, month=1, day=1)
loop_end_date = datetime(year=2023, month=12, day=31)

In [7]:
# ---------------------------------------------------------------------------
# Sliding-window feature / label extraction.
#
# For every period index `jj` a reference date `m` is derived
# (loop_start_date + LOOKBACK_DAYS + jj days).  Features are computed from the
# rows of `study` whose access_date lies in [m - LOOKBACK_DAYS, m]; labels are
# computed from the rows in [m, m + horizon].  The joined result is written to
# one CSV file per period.
#
# Requires `cursor`, `loop_start_date` and `loop_end_date` from earlier cells.
# ---------------------------------------------------------------------------

# Length of the feature (look-back) window that ends at the reference date.
LOOKBACK_DAYS = 65
# Lengths of the two label windows that start at the reference date (y1, y2).
LABEL_HORIZON_1_DAYS = 5
LABEL_HORIZON_2_DAYS = 1

# First and last period index (inclusive) handled by this run.
# NOTE(review): periods below FIRST_PERIOD are skipped -- presumably they were
# exported by an earlier run; confirm before changing.  The LAST_PERIOD
# expression is kept exactly as originally written (it evaluates to 195).
FIRST_PERIOD = 126
LAST_PERIOD = 25*5 + 13*5 + 5

# Directory that receives one `data_<period>.csv` file per processed window.
OUTPUT_DIR = '/data/astro/scratch/msantama/tfm/Query2'

# Format used when interpolating dates into the HiveQL statements.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Temporary tables rebuilt for every window; dropped first so that a window
# can be re-run after a failure part-way through.
TEMP_TABLES = (
    'alive_pnfsids',
    'batch_study_table',
    'pnfsid_properties',
    'subtype_properties',
    'y_table1',
    'y_table2',
    'pre_x_table',
    'x_table',
)

# Start directly at the first period to process instead of stepping through
# the skipped ones one day at a time.
jj = FIRST_PERIOD
current_m_date = loop_start_date + timedelta(days=LOOKBACK_DAYS + FIRST_PERIOD)

while jj <= LAST_PERIOD:
    s_date = current_m_date - timedelta(days=LOOKBACK_DAYS)
    f_date1 = current_m_date + timedelta(days=LABEL_HORIZON_1_DAYS)
    f_date2 = current_m_date + timedelta(days=LABEL_HORIZON_2_DAYS)

    # Stop once the window would leave the configured study period.
    if s_date < loop_start_date or f_date1 > loop_end_date:
        break

    # Format the window bounds as 'YYYY-MM-DD HH:MM:SS' strings for the SQL.
    s = s_date.strftime(TIMESTAMP_FORMAT)
    m = current_m_date.strftime(TIMESTAMP_FORMAT)
    f1 = f_date1.strftime(TIMESTAMP_FORMAT)
    f2 = f_date2.strftime(TIMESTAMP_FORMAT)

    print(f"Processing window: s={s}, m={m}, f1={f1}")

    # Clear whatever the previous window (or a failed run) left behind.
    for table_name in TEMP_TABLES:
        sql = f"DROP TABLE IF EXISTS {table_name}"
        cursor.execute(sql)

    # pnfsids with at least one non-'remove' action inside the feature window.
    sql = f"""
    CREATE TEMPORARY TABLE alive_pnfsids AS (
    SELECT DISTINCT pnfsid
    FROM study
    WHERE access_date BETWEEN '{s}' AND '{m}'
      AND action != 'remove'
    )
    """
    cursor.execute(sql)

    # All rows of those pnfsids inside the feature window; access_count is
    # shifted by one.
    sql = f"""
    CREATE TEMPORARY TABLE batch_study_table AS (
    SELECT 
        s.pnfsid,
        s.access_date,
        s.type,
        s.subtype,
        s.fullsize,
        s.read_data,
        s.creation_date,
        s.access_count +1 AS access_count
    FROM study s
    JOIN alive_pnfsids a ON s.pnfsid = a.pnfsid
    WHERE s.access_date BETWEEN '{s}' AND '{m}')
    """ 
    cursor.execute(sql)

    # Per-file aggregates: highest access_count and spread of access times.
    sql = """
    CREATE TEMPORARY TABLE pnfsid_properties AS (
    SELECT 
        pnfsid, 
        MAX(access_count) AS max_access_count_pnfsid,
        STDDEV(UNIX_TIMESTAMP(access_date)) AS stddev_access_date
    FROM batch_study_table
    GROUP BY pnfsid)
    """
    cursor.execute(sql)

    # Per-subtype distribution statistics of access_count and fullsize.
    sql = """
    CREATE TEMPORARY TABLE subtype_properties AS (
    SELECT
        subtype,
        MAX(access_count) AS max_access_count_subtype,
        MIN(access_count) AS min_access_count_subtype,
        PERCENTILE_APPROX(access_count, 0.25) AS 25_percentile_access_count_subtype,
        PERCENTILE_APPROX(access_count, 0.5) AS median_access_count_subtype,
        PERCENTILE_APPROX(access_count, 0.75) AS 75_percentile_access_count_subtype,
        STDDEV(access_count) AS stddev_access_count_subtype,
        MAX(fullsize) AS max_filesize_subtype,
        MIN(fullsize) AS min_filesize_subtype,
        PERCENTILE_APPROX(fullsize, 0.25) AS 25_percentile_filesize_subtype,
        PERCENTILE_APPROX(fullsize, 0.5) AS median_filesize_subtype,
        PERCENTILE_APPROX(fullsize, 0.75) AS 75_percentile_filesize_subtype
    FROM batch_study_table
    GROUP BY subtype
    )
    """
    cursor.execute(sql)

    # Label tables: y = 1 when the pnfsid has any non-'remove' row inside the
    # label window, 0 when it only has 'remove' rows.  pnfsids without any row
    # in the window are absent here and become 0 through the COALESCE in the
    # final SELECT.  Further horizons (the notebook previously sketched
    # y_table3 / y_table4) can be added by extending this tuple, TEMP_TABLES
    # and the final SELECT.
    # NOTE(review): BETWEEN includes both bounds, so a row stamped exactly at
    # `m` is counted in the feature window and in the label windows -- confirm
    # this overlap is intended.
    for label_table, label_end in (('y_table1', f1), ('y_table2', f2)):
        sql = f"""
        CREATE TEMPORARY TABLE {label_table} AS (
        SELECT
            s.pnfsid,
            MAX(CASE WHEN s.action != 'remove' THEN 1 ELSE 0 END) AS y
        FROM study s
        JOIN alive_pnfsids a ON s.pnfsid = a.pnfsid
        WHERE s.access_date BETWEEN '{m}' AND '{label_end}'
        GROUP BY s.pnfsid
        )
        """
        cursor.execute(sql)

    # Row-level features.  The dt_*_access_date CASE expressions have no ELSE
    # branch, so they are NULL on every row except the one whose access_count
    # matches; the MAX in x_table then collapses them to one value per pnfsid.
    # NOTE(review): this relies on access_count identifying the k-th last
    # access of a pnfsid -- confirm against the `study` schema.
    # NOTE(review): the normalisation denominators are zero for a subtype whose
    # max equals its min -- confirm how the resulting value is handled later.
    sql = f"""
    CREATE TEMPORARY TABLE pre_x_table AS (
    SELECT
        b.pnfsid,
        b.type,
        b.subtype,
        p.stddev_access_date,
        UNIX_TIMESTAMP('{m}') - UNIX_TIMESTAMP(CASE WHEN b.access_count = p.max_access_count_pnfsid THEN b.access_date END) AS dt_last_access_date,
        UNIX_TIMESTAMP('{m}') - UNIX_TIMESTAMP(CASE WHEN b.access_count = p.max_access_count_pnfsid - 1 THEN b.access_date END) AS dt_second_last_access_date,
        UNIX_TIMESTAMP('{m}') - UNIX_TIMESTAMP(CASE WHEN b.access_count = p.max_access_count_pnfsid - 2 THEN b.access_date END) AS dt_third_last_access_date,
        UNIX_TIMESTAMP('{m}') - UNIX_TIMESTAMP(CASE WHEN b.access_count = p.max_access_count_pnfsid - 3 THEN b.access_date END) AS dt_fourth_last_access_date,
        UNIX_TIMESTAMP('{m}') - UNIX_TIMESTAMP(CASE WHEN b.access_count = p.max_access_count_pnfsid - 4 THEN b.access_date END) AS dt_fifth_last_access_date,
        
        (b.access_count - s.min_access_count_subtype) / (s.max_access_count_subtype - s.min_access_count_subtype) AS normalized_access_count,
        (b.fullsize - s.min_filesize_subtype) / (s.max_filesize_subtype - s.min_filesize_subtype) AS normalized_filesize,

        CASE
            WHEN p.max_access_count_pnfsid < s.25_percentile_access_count_subtype THEN 'cold'
            WHEN p.max_access_count_pnfsid >= s.25_percentile_access_count_subtype AND p.max_access_count_pnfsid < s.median_access_count_subtype THEN 'cold-warm'
            WHEN p.max_access_count_pnfsid >= s.median_access_count_subtype AND p.max_access_count_pnfsid < s.75_percentile_access_count_subtype THEN 'warm-hot'
            WHEN p.max_access_count_pnfsid >= s.75_percentile_access_count_subtype THEN 'hot'
        END AS temperature,

        CASE
            WHEN b.fullsize < s.25_percentile_filesize_subtype THEN 'small'
            WHEN b.fullsize >= s.25_percentile_filesize_subtype AND b.fullsize < s.median_filesize_subtype THEN 'medium'
            WHEN b.fullsize >= s.median_filesize_subtype AND b.fullsize < s.75_percentile_filesize_subtype THEN 'large'
            WHEN b.fullsize >= s.75_percentile_filesize_subtype THEN 'xlarge'
        END AS size_category,

        CASE WHEN (b.access_count > s.median_access_count_subtype) THEN 1 ELSE 0 END AS above_median_access_count,
        CASE WHEN (b.fullsize > s.median_filesize_subtype) THEN 1 ELSE 0 END AS above_median_filesize,
        
        (CASE WHEN (b.access_date > TIMESTAMP('{m}') - INTERVAL 1 DAY) THEN 1 ELSE 0 END) AS access_count_last_1_day,
        (CASE WHEN (b.access_date > TIMESTAMP('{m}') - INTERVAL 3 DAY) THEN 1 ELSE 0 END) AS access_count_last_3_days,
        (CASE WHEN (b.access_date > TIMESTAMP('{m}') - INTERVAL 7 DAY) THEN 1 ELSE 0 END) AS access_count_last_7_days,
        (CASE WHEN (b.access_date > TIMESTAMP('{m}') - INTERVAL 15 DAY) THEN 1 ELSE 0 END) AS access_count_last_15_days,

        UNIX_TIMESTAMP('{m}') - UNIX_TIMESTAMP(b.creation_date) AS lifetime,
        b.access_count,
        b.read_data


    FROM batch_study_table b
    JOIN pnfsid_properties p
    ON b.pnfsid=p.pnfsid
    JOIN subtype_properties s
    ON b.subtype=s.subtype)
    """
    cursor.execute(sql)

    # Collapse the row-level features to one row per pnfsid.
    # NOTE(review): MAX over the string columns (temperature, size_category)
    # is a lexicographic maximum; this only matters if one pnfsid has rows in
    # different categories -- verify.
    sql = """
    CREATE TEMPORARY TABLE x_table AS (
    SELECT
        pnfsid, 
        MAX(type) AS type,
        MAX(subtype) AS subtype,
        MAX(stddev_access_date) AS stddev_access_date,
        MAX(dt_last_access_date) AS dt_last_access_date,
        MAX(dt_second_last_access_date) AS dt_second_last_access_date,
        MAX(dt_third_last_access_date) AS dt_third_last_access_date,
        MAX(dt_fourth_last_access_date) AS dt_fourth_last_access_date,
        MAX(dt_fifth_last_access_date) AS dt_fifth_last_access_date,
        MAX(normalized_access_count) AS normalized_access_count,
        MAX(normalized_filesize) AS normalized_filesize,
        MAX(temperature) AS temperature,
        MAX(size_category) AS size_category,
        MAX(above_median_access_count) AS above_median_access_count,
        MAX(above_median_filesize) AS above_median_filesize,
        SUM(access_count_last_1_day) AS access_count_last_1_day,
        SUM(access_count_last_3_days) AS access_count_last_3_days,
        SUM(access_count_last_7_days) AS access_count_last_7_days,
        SUM(access_count_last_15_days) AS access_count_last_15_days,
        MAX(lifetime) AS lifetime,
        MAX(access_count) AS access_count,
        SUM(read_data)/MAX(lifetime) AS read_data_per_second
    FROM pre_x_table
    GROUP BY pnfsid
    )
    """
    cursor.execute(sql)

    # Features joined with both labels; a pnfsid missing from a label table
    # gets label 0.
    sql = """
    SELECT
        x.*,
        COALESCE(y_table1.y, 0) AS y1,
        COALESCE(y_table2.y, 0) AS y2
    FROM x_table x
    LEFT JOIN y_table1 
        ON x.pnfsid = y_table1.pnfsid
    LEFT JOIN y_table2 
        ON x.pnfsid = y_table2.pnfsid
    """
    cursor.execute(sql)
    names = [d[0] for d in cursor.description]
    current_df = pd.DataFrame(cursor.fetchall(), columns=names)

    # Tag every row with the window it came from.
    current_df['m_date_window'] = current_m_date.strftime('%Y-%m-%d')
    current_df['period'] = jj

    current_df.to_csv(f'{OUTPUT_DIR}/data_{jj}.csv', mode='w', header=True, index=False)

    # Advance to the next reference date / period.
    current_m_date += timedelta(days=1)
    jj += 1

Processing window: s=2021-05-07 00:00:00, m=2021-07-11 00:00:00, f1=2021-07-16 00:00:00
Processing window: s=2021-05-08 00:00:00, m=2021-07-12 00:00:00, f1=2021-07-17 00:00:00
Processing window: s=2021-05-09 00:00:00, m=2021-07-13 00:00:00, f1=2021-07-18 00:00:00
Processing window: s=2021-05-10 00:00:00, m=2021-07-14 00:00:00, f1=2021-07-19 00:00:00
Processing window: s=2021-05-11 00:00:00, m=2021-07-15 00:00:00, f1=2021-07-20 00:00:00
Processing window: s=2021-05-12 00:00:00, m=2021-07-16 00:00:00, f1=2021-07-21 00:00:00
Processing window: s=2021-05-13 00:00:00, m=2021-07-17 00:00:00, f1=2021-07-22 00:00:00
Processing window: s=2021-05-14 00:00:00, m=2021-07-18 00:00:00, f1=2021-07-23 00:00:00
Processing window: s=2021-05-15 00:00:00, m=2021-07-19 00:00:00, f1=2021-07-24 00:00:00
Processing window: s=2021-05-16 00:00:00, m=2021-07-20 00:00:00, f1=2021-07-25 00:00:00
Processing window: s=2021-05-17 00:00:00, m=2021-07-21 00:00:00, f1=2021-07-26 00:00:00
Processing window: s=2021-05-18 

In [8]:
# Close the Hive connection opened at the top of the notebook.
conn.close()

In [9]:
# Completion marker printed once all cells above have run.
print("Finnished")

Finnished
