In [168]:
import duckdb

con = duckdb.connect(database=':memory:', read_only=False)

In [169]:
query = "SELECT * exclude(month, year, site) FROM parquet_scan('../../../dados_lumet/site=CS130/**/*.parquet', hive_partitioning = true) WHERE year=2023 AND month=12 AND TIMESTAMP>'2023-12-01 12:00:00' AND TIMESTAMP<='2023-12-01 13:00:00' order by TIMESTAMP;"
result = con.execute(query)

df_hf = result.pl().sort("TIMESTAMP").upsample(time_column="TIMESTAMP", every="100ms", maintain_order=True)

In [170]:
df_hf = df_hf.with_columns((df_hf["TS"] + 273.15).alias("TS"))
df_hf = df_hf.with_columns((df_hf["Temp_LI7500A"] + 273.15).alias("Temp_LI7500A"))
df_hf = df_hf.with_columns((df_hf["Pressao_LI7500A"] * 1000).alias("Pressao_LI7500A"))
df_hf = df_hf.with_columns((df_hf["Temp_LI7700"] + 273.15).alias("Temp_LI7700"))
df_hf = df_hf.with_columns((df_hf["Pressao_LI7700"] * 1000).alias("Pressao_LI7700"))

df_hf = df_hf.rename(lambda column_name: column_name.replace("_", "-"))

In [171]:
query = "SELECT * exclude(year, site) FROM parquet_scan('../../../dados_lumet/site=CS131/**/*.parquet', hive_partitioning = true) WHERE TIMESTAMP>'2023-12-01 12:00:00' AND TIMESTAMP<='2023-12-01 13:00:00' order by TIMESTAMP;"
result = con.execute(query)

df_lf = result.pl().sort("TIMESTAMP").upsample(time_column="TIMESTAMP", every="1s", maintain_order=True)

In [172]:
from pyeddy.config_def import Anemometer3DConfig, GasAnalyzer

anemometer3d = Anemometer3DConfig(
    manufacturer="csi",
    model="csat3_1",
    sw_version="ddd",
    id="ddd",
    height=3.0,
    wformat="uvw",
    wref="na",
    north_offset=90.0,
    northward_separation=0.0,
    eastward_separation=0.0,
    vertical_separation=0.0,
    vpath_length=0.115,
    hpath_length=0.058,
    tau=0.1 # 1/60
)

co2_analyzer = GasAnalyzer(
    manufacturer="licor",
    model="li7500a_1",
    sw_version="ddd",
    id="ddd",
    tube_length=0.0,
    tube_diameter=0.0,
    tube_flowrate=0.0,
    northward_separation=20.0,
    eastward_separation=-10.0,
    vertical_separation=0.0,
    vpath_length=1.0,
    hpath_length=1.0,
    tau=0.1,
    kw=0.15,
    ko=0.0085
)

ch4_analyzer = GasAnalyzer(
    manufacturer="licor",
    model="li7700_2",
    sw_version="1.0.29",
    id="ddd",
    tube_length=0.0,
    tube_diameter=0.0,
    tube_flowrate=0.0,
    northward_separation=-20.0,
    eastward_separation=-10.0,
    vertical_separation=0.0,
    vpath_length=1.0,
    hpath_length=1.0,
    tau=0.1,
    kw=0.15,
    ko=0.0085
)

In [173]:
# alternative cov

# # Merge the dataframes on the TIMESTAMP column
# mean_df_ext = (
#     df_hf.get_column("TIMESTAMP")
#     .to_frame()
#     .join(mean_df, on="TIMESTAMP", how="left")
#     .fill_null(strategy="forward")
# )

# # In eddypro defined as Prime
# fluctuations = (
#     df_hf.get_column("TIMESTAMP")
#     .to_frame()
#     .with_columns(df_hf.select(cov_cols[1:]) - mean_df_ext.select(cov_cols[1:]))
# )

# cov_agg = [
#     (pl.col(v[0]) * pl.col(v[1])).mean().alias(f"cov_{v[0]}_{v[1]}")
#     for v in cov_combination
# ]

# # Calculate covariance of fluctuations over the original interval
# covariances = fluctuations.group_by_dynamic(
#     index_column="TIMESTAMP", every=resample_interval, closed="right"
# ).agg(cov_agg)

### Covariance BasicStats from fortran

In [174]:
import polars as pl
import numpy as np
from itertools import product

resample_interval = "30m"

cov_cols = ["TIMESTAMP", "u", "v", "w", "TS", "CO2", "H2O", "CH4"]

mean_df = (
    df_hf.select(cov_cols)
    .group_by_dynamic(
        "TIMESTAMP", every=resample_interval, closed="right", label="datapoint"
    )
    .agg(pl.all().exclude("TIMESTAMP").mean())
)

cov_combination = list(product(cov_cols[1:], cov_cols[1:]))

# Merge the dataframes on the TIMESTAMP column
mean_df_ext = (
    df_hf.get_column("TIMESTAMP")
    .to_frame()
    .join(mean_df, on="TIMESTAMP", how="left")
    .fill_null(strategy="forward")
)


mean_var_df = (
    df_hf.get_column("TIMESTAMP")
    .to_frame()
    .with_columns(
        (df_hf.select(cov_cols[1:])
        - mean_df_ext.select(cov_cols[1:]))
    ).group_by_dynamic(
        "TIMESTAMP", every=resample_interval, closed="right", label="datapoint"
    )
    .agg(pl.all().exclude("TIMESTAMP").sum()/(pl.all().exclude("TIMESTAMP").count() - pl.all().exclude("TIMESTAMP").null_count()))
)

mean_df = mean_var_df.select(cov_cols[1:]) + mean_df.select(cov_cols[1:])



# In eddypro defined as Prime
fluctuations = (
    df_hf.get_column("TIMESTAMP")
    .to_frame()
    .with_columns(df_hf.select(cov_cols[1:]) - mean_df_ext.select(cov_cols[1:]))
)

cov_agg = [
    (pl.col(v[0]) * pl.col(v[1])).mean().alias(f"cov_{v[0]}_{v[1]}")
    for v in cov_combination
]

# Calculate covariance of fluctuations over the original interval
covariances = fluctuations.group_by_dynamic(
    index_column="TIMESTAMP", every=resample_interval, closed="right"
).agg(cov_agg)

# # Faster but less precise
# cov_agg = [pl.cov(v[0], v[1]).alias(f"cov_{v[0]}_{v[1]}") for v in cov_combination]
# covariances = df_hf.select(cov_cols).group_by_dynamic("TIMESTAMP", every=resample_interval, closed="right").agg(cov_agg)

skewness = (
    df_hf.select(cov_cols)
    .group_by_dynamic(index_column="TIMESTAMP", every=resample_interval, closed="right")
    .agg(pl.all().exclude("TIMESTAMP").skew())
)

kurtosis = (
    df_hf.select(cov_cols)
    .group_by_dynamic(index_column="TIMESTAMP", every=resample_interval, closed="right")
    .agg(pl.all().exclude("TIMESTAMP").kurtosis(fisher=False))
)


# TKE (e.g. Stull, 1988)
TKE = df_hf.select(["TIMESTAMP", "u", "v", "w"]).group_by_dynamic(
    index_column="TIMESTAMP", every=resample_interval, closed="right"
).agg(
    pl.all().exclude("TIMESTAMP").pow(2).sum()
    / pl.all().exclude("TIMESTAMP").is_not_null().sum()
).with_columns(
    ((pl.col("u") + pl.col("v") + pl.col("w"))*0.5).alias("TKE")
).select(["TIMESTAMP", "TKE"])


offset = anemometer3d.north_offset - 180.0
wind_dir = (
    df_hf.select(["TIMESTAMP", "u", "v"])
    .with_columns(
        (180 - pl.arctan2d(pl.col("v"), pl.col("u")) + offset)
        .mod(360)
        .alias("wind_dir")
    )
    .select(["TIMESTAMP", "wind_dir"])
)


wind_dir_mean = wind_dir.group_by_dynamic(
    "TIMESTAMP", every=resample_interval, closed="right", label="datapoint"
).agg(
    pl.arctan2d(
        pl.col("wind_dir").radians().sin().mean().degrees(),
        pl.col("wind_dir").radians().cos().mean().degrees(),
    )
)

# Approximate standard deviation of wind direction (see Yamartino, 1984:
# https://journals.ametsoc.org/doi/pdf/10.1175/1520-0450%281984%29023%3C1362%3AACOSPE%3E2.0.CO%3B2)

wind_dir_std_dev = wind_dir.group_by_dynamic(
    "TIMESTAMP", every=resample_interval, closed="right", label="datapoint"
).agg(
    (
        1
        - (
            pl.col("wind_dir").radians().sin().mean().pow(2)
            + pl.col("wind_dir").radians().cos().mean().pow(2)
        )
    )
    .sqrt()
    .alias("eps")
).with_columns(
    (pl.col("eps").arcsin()
    * (1 + (2 / np.sqrt(3) - 1) * pl.col("eps").pow(3))).degrees().alias("std_dev")
).select(["TIMESTAMP", "std_dev"])

In [175]:
mean_df

u,v,w,TS,CO2,H2O,CH4
f64,f64,f64,f64,f64,f64,f64
1.063175,1.993309,0.062974,306.377361,16.28678,1201.593074,0.06722
1.437255,1.723983,0.065025,307.205385,16.268556,1210.264405,0.066968


### User basic stats

In [176]:
user_data_cols = [
    "TIMESTAMP",
    "Temp-LI7500A",
    "Pressao-LI7500A",
    "CH4-mole-fraction",
    "Temp-LI7700",
    "Pressao-LI7700",
]

user_data = df_hf.select(user_data_cols)

cov_combination = list(product(user_data_cols[1:], user_data_cols[1:]))


# Compute the mean
user_mean_df = (
    df_hf.select(user_data_cols)
    .group_by_dynamic(
        "TIMESTAMP", every=resample_interval, closed="right", label="datapoint"
    )
    .agg(pl.all().exclude("TIMESTAMP").mean())
)

cov_combination = list(product(user_data_cols[1:], user_data_cols[1:]))

# Merge the dataframes on the TIMESTAMP column
mean_df_ext = (
    df_hf.get_column("TIMESTAMP")
    .to_frame()
    .join(user_mean_df, on="TIMESTAMP", how="left")
    .fill_null(strategy="forward")
)

mean_var_df = (
    df_hf.get_column("TIMESTAMP")
    .to_frame()
    .with_columns(
        (df_hf.select(user_data_cols[1:])
        - mean_df_ext.select(user_data_cols[1:]))
    ).group_by_dynamic(
        "TIMESTAMP", every=resample_interval, closed="right", label="datapoint"
    )
    .agg(pl.all().exclude("TIMESTAMP").sum()/(pl.all().exclude("TIMESTAMP").count() - pl.all().exclude("TIMESTAMP").null_count()))
)

user_mean_df = mean_var_df.select(user_data_cols[1:]) + user_mean_df.select(user_data_cols[1:])

# In eddypro defined as Prime
user_fluctuations = (
    df_hf.get_column("TIMESTAMP")
    .to_frame()
    .with_columns(
        df_hf.select(user_data_cols[1:]) - mean_df_ext.select(user_data_cols[1:])
    )
)

cov_agg = [
    (pl.col(v[0]) * pl.col(v[1])).mean().alias(f"cov_{v[0]}_{v[1]}")
    for v in cov_combination
]

# Calculate covariance of fluctuations over the original interval
user_covariances = user_fluctuations.group_by_dynamic(
    index_column="TIMESTAMP", every=resample_interval, closed="right"
).agg(cov_agg)


std_cols = [f"cov_{c}_{c}" for c in user_data_cols[1:]]
std_agg = [pl.col(v).sqrt().alias(v.split("_")[-1]) for v in std_cols]

user_std_dev = user_covariances.with_columns(std_agg).select(user_data_cols)

user_skewness = (
    df_hf.select(user_data_cols)
    .group_by_dynamic(index_column="TIMESTAMP", every=resample_interval, closed="right")
    .agg(pl.all().exclude("TIMESTAMP").skew())
)

user_kurtosis = (
    df_hf.select(user_data_cols)
    .group_by_dynamic(index_column="TIMESTAMP", every=resample_interval, closed="right")
    .agg(pl.all().exclude("TIMESTAMP").kurtosis(fisher=False))
)

In [178]:
# Replace temperature with user_mean

mean_df.filter((pl.col("TS") < 220) | (pl.col("TS") > 340))

u,v,w,TS,CO2,H2O,CH4
f64,f64,f64,f64,f64,f64,f64
1.063175,1.993309,0.062974,306.377361,16.28678,1201.593074,0.06722
1.437255,1.723983,0.065025,307.205385,16.268556,1210.264405,0.066968


### StatisticalScreening

In [180]:
df_hf.select(cov_cols)

TIMESTAMP,u,v,w,TS,CO2,H2O,CH4
datetime[μs],f32,f32,f32,f32,f32,f32,f32
2023-12-01 12:00:00.100,0.61125,1.57325,0.40625,306.981934,16.338131,1206.360962,0.06759
2023-12-01 12:00:00.200,0.989,1.75575,0.202,306.326752,16.341631,1206.800049,0.06743
2023-12-01 12:00:00.300,1.0555,1.89975,-0.094,305.958435,16.35182,1206.004028,0.067372
2023-12-01 12:00:00.400,0.83975,2.026,0.0265,305.965393,16.325371,1208.625977,0.067487
2023-12-01 12:00:00.500,1.11725,1.8765,-0.1125,306.056152,16.31052,1205.051025,0.067512
…,…,…,…,…,…,…,…
2023-12-01 12:59:59.600,1.50225,1.74275,0.7265,309.244537,16.13073,1264.366943,0.065951
2023-12-01 12:59:59.700,1.281,2.134,0.2055,309.226959,16.182671,1249.436035,0.066012
2023-12-01 12:59:59.800,1.65275,2.249,0.581,309.083099,16.16498,1255.180054,0.065895
2023-12-01 12:59:59.900,1.21425,2.186,0.58625,309.333984,16.16173,1257.842041,0.066062


In [None]:


# Calculate fluctuations
fluctuations = fluctuations.with_columns([
    (pl.col("w") - pl.col("mean_w")).alias("fluc_w"),
    (pl.col("CO2") - pl.col("mean_CO2")).alias("fluc_CO2"),
    (pl.col("H2O") - pl.col("mean_H2O")).alias("fluc_H2O")
])

# Calculate covariance of fluctuations over the original interval
covariances = fluctuations.groupby_dynamic(
    index_column="datetime",
    every=resample_interval,
    period="backward",
    offset="0ns",  # Adjust as needed
    closed="left"
).agg([
    (pl.col("fluc_w") * pl.col("fluc_CO2")).mean().alias("cov_w_CO2"),
    (pl.col("fluc_w") * pl.col("fluc_H2O")).mean().alias("cov_w_H2O")
])


In [None]:
import polars as pl

interval = "30m"

average_df = df.group_by_dynamic("TIMESTAMP", every=interval).agg(pl.all().exclude("TIMESTAMP").mean())

ts_col = df.get_column("TIMESTAMP")

fluctuations = df-average_df.upsample(time_column="TIMESTAMP", every="100ms", maintain_order=True).select(pl.all().forward_fill())
fluctuations = fluctuations.replace_column(0, df.get_column("TIMESTAMP"))

cov_w_CO2 = (fluctuations['w'] * fluctuations['CO2']).to_frame().with_columns(ts_col).group_by_dynamic("TIMESTAMP", every=interval).agg(pl.all().exclude("TIMESTAMP").mean())
cov_w_H2O = (fluctuations['w'] * fluctuations['H2O']).to_frame().with_columns(ts_col).group_by_dynamic("TIMESTAMP", every=interval).agg(pl.all().exclude("TIMESTAMP").mean())

In [None]:
import polars as pl

# Define your resampling interval (e.g., '5m' for 5 minutes)
resample_interval = '5m'  # Change '5m' to your desired interval

# Group by interval and calculate means
resampled_df = df.groupby_dynamic(
    index_column="datetime",
    every=resample_interval,
    period="backward",
    offset="0ns",  # Adjust as needed
    closed="left"
).agg([
    pl.col("w").mean().alias("mean_w"),
    pl.col("CO2").mean().alias("mean_CO2"),
    pl.col("H2O").mean().alias("mean_H2O")
])

# Calculate fluctuations based on mean
# This requires joining back the means to the original data, or you can calculate directly if only means are needed.
fluctuations = df.join(
    resampled_df,
    on=pl.col("datetime").cast(pl.Date).dt.truncate(resample_interval),
    how="left"
)

# Calculate fluctuations
fluctuations = fluctuations.with_columns([
    (pl.col("w") - pl.col("mean_w")).alias("fluc_w"),
    (pl.col("CO2") - pl.col("mean_CO2")).alias("fluc_CO2"),
    (pl.col("H2O") - pl.col("mean_H2O")).alias("fluc_H2O")
])

# Calculate covariance of fluctuations over the original interval
covariances = fluctuations.groupby_dynamic(
    index_column="datetime",
    every=resample_interval,
    period="backward",
    offset="0ns",  # Adjust as needed
    closed="left"
).agg([
    (pl.col("fluc_w") * pl.col("fluc_CO2")).mean().alias("cov_w_CO2"),
    (pl.col("fluc_w") * pl.col("fluc_H2O")).mean().alias("cov_w_H2O")
])

print(covariances)

In [None]:
from pyeddy.filters import FluxDataFrame

df.flux.config("CS_130.metadata")

In [None]:
import re

def read_metadata_block(metadata, block):
    block_data = re.search(fr'\[{block}\]\n(.*?)(?=\n\n|\Z)', metadata, re.DOTALL)
    if not block_data:
        return None
    rows = block_data[0].split('\n')[1:]
    output = {}
    for r in rows:
        field = r.split("=")
        if len(field) == 2:
            output[field[0]] = field[1]
        elif len(field) == 1:
            output[field[0]] = None
        else:
            raise ValueError(f"Invalid field: {r}")
    return output

with open("CS_130.metadata") as f:
    metadata = f.read()

read_metadata_block(metadata, "Site")

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def potential_radiation(latit):
    """
    Calculate potential radiation based on latitude on a 30-minute basis.
    Returns an array of 17,568 radiation values.
    """
    # Constants
    solar_constant = 1376.0
    p = np.pi
    num_points = 17568
    RP = np.zeros(num_points)  # Output array for potential radiation

    # Calculate day of year (DOY), theta, and other parameters
    lDOY = np.arange(1, num_points + 1) // 48 + 1
    theta = 2 * p * (lDOY - 1) / 365

    ET = (0.000075 + 0.001868 * np.cos(theta) - 0.032077 * np.sin(theta) -
          0.014615 * np.cos(2 * theta) - 0.040849 * np.sin(2 * theta))

    delta = (0.006918 - 0.399912 * np.cos(theta) + 0.070257 * np.sin(theta) -
             0.006758 * np.cos(2 * theta) + 0.000907 * np.sin(2 * theta) -
             0.002697 * np.cos(3 * theta) + 0.00148 * np.sin(3 * theta))

    LAS = 12 - (np.mod(np.arange(1, num_points + 1), 48) / 2 + 0.25)
    LAS = np.abs(LAS)

    omega = -15 * LAS
    lat_rad = latit * p / 180

    omega_rad = omega * p / 180
    theta_rad = np.arccos(
        np.sin(delta) * np.sin(lat_rad) +
        np.cos(delta) * np.cos(lat_rad) * np.cos(omega_rad)
    )

    rpot = (solar_constant * (1.00011 + 0.034221 * np.cos(theta) +
                              0.00128 * np.sin(theta) + 0.000719 * np.cos(2 * theta) +
                              0.000077 * np.sin(2 * theta)))

    RP = rpot * np.cos(theta_rad)
    RP = np.maximum(RP, 0)

    return RP

plt.plot(potential_radiation(-30))

In [None]:
from datetime import datetime
import numpy as np
import pandas as pd

def potential_radiation(latit, dt, flux_window=30):
    """
    Calculate potential radiation based on latitude on a 30-minute basis.
    Returns an array of 17,568 radiation values.
    """
    # Constants
    solar_constant = 1376.0

    # Calculate day of year (DOY), theta, and other parameters
    # lDOY = np.arange(1, num_points + 1) // 48 + 1
    lDOY = dt.timetuple().tm_yday
    theta = 2 * np.pi * (lDOY - 1) / 365

    delta = (0.006918 - 0.399912 * np.cos(theta) + 0.070257 * np.sin(theta) -
             0.006758 * np.cos(2 * theta) + 0.000907 * np.sin(2 * theta) -
             0.002697 * np.cos(3 * theta) + 0.00148 * np.sin(3 * theta))

    LAS = 12 - dt.timetuple().tm_hour + dt.timetuple().tm_min/60 + flux_window/2/60 

    omega = -15 * LAS
    lat_rad = np.radians(latit)

    omega_rad = np.radians(omega)
    theta_rad = np.arccos(
        np.sin(delta) * np.sin(lat_rad) +
        np.cos(delta) * np.cos(lat_rad) * np.cos(omega_rad)
    )

    rpot = (solar_constant * (1.00011 + 0.034221 * np.cos(theta) +
                              0.00128 * np.sin(theta) + 0.000719 * np.cos(2 * theta) +
                              0.000077 * np.sin(2 * theta)))

    pot_rad = rpot * np.cos(theta_rad)
    pot_rad = np.maximum(pot_rad, 0)

    return pot_rad

periods = pd.date_range(start='2022-01-01', periods=48, freq='30T')

periods.to_series().apply(lambda x: potential_radiation(-30, x))