In [1]:
from datetime import datetime
from urllib.parse import quote

import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from sqlalchemy import create_engine

In [2]:
# Load the credentials
def load_credentials(file_path):
    """Parse a simple ``KEY=VALUE`` credentials file (``.env`` style) into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Whitespace around keys and values is stripped, an optional leading
    ``export `` on the key is dropped, and a value wrapped in one pair of
    matching single or double quotes has those quotes removed. Only the first
    ``=`` separates key from value, so values may themselves contain ``=``.
    A key that appears more than once keeps its last value.

    Args:
        file_path: Path of the credentials file to read (decoded as UTF-8).

    Returns:
        dict[str, str] mapping each key to its value.

    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError).
    """
    credentials = {}
    with open(file_path, "r", encoding="utf-8") as file:
        for raw_line in file:
            line = raw_line.strip()
            # Skip blanks, comments and anything that is not an assignment.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)  # split only at the first "="
            key = key.strip()
            if key.startswith("export "):
                key = key[len("export "):].strip()
            if not key:
                continue
            value = value.strip()
            # Drop one pair of matching surrounding quotes: TOKEN="abc" -> abc
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                value = value[1:-1]
            credentials[key] = value
    return credentials

# KEY=VALUE credentials file, resolved relative to the notebook's working
# directory; point this elsewhere if the file lives in another location.
credentials_file = ".env"
credentials = load_credentials(credentials_file)

In [3]:
# InfluxDB connection settings; a missing key raises KeyError.
token, org, bucket, url = (
    credentials[key] for key in ("TOKEN", "ORG", "BUCKET", "URL")
)

In [4]:
# One InfluxDB client shared by every query below; timeout is in milliseconds (30 s).
client = InfluxDBClient(url=url, token=token, org=org, timeout=30_000)

In [5]:
# Station identifiers to export; kept as strings so leading zeros survive.
station_ids = "034062022 035062022 084072023".split()

In [6]:
def _escape_flux_string(value):
    """Escape ``value`` for use inside a double-quoted Flux string literal.

    Backslashes and double quotes are backslash-escaped so a value such as
    ``a"b`` cannot terminate the literal early; ordinary IDs and field names
    pass through unchanged.
    """
    return str(value).replace("\\", "\\\\").replace('"', '\\"')


def format_query(station_id, field, agg_fn, yield_name, bucket="oxus2"):
    """Build a Flux query returning one daily aggregate of one field for one station.

    The query reads ``bucket`` over ``range(start: -inf)``, keeps rows whose
    ``stationID`` equals ``station_id`` and whose ``_field`` equals ``field``,
    reduces them to 1-day windows with ``agg_fn`` (empty windows are dropped
    via ``createEmpty: false``), and yields the result under ``yield_name``.

    Args:
        station_id: Value matched against the ``stationID`` column.
        field: Value matched against ``_field`` (e.g. ``"AirT"``).
        agg_fn: Flux aggregate function name, inserted verbatim as an
            identifier (e.g. ``"min"``, ``"max"``, ``"mean"``); not escaped.
        yield_name: Name passed to ``yield()``.
        bucket: InfluxDB bucket to read from. Defaults to ``"oxus2"``; pass
            the configured bucket to avoid relying on the default.

    Returns:
        The Flux query text.

    NOTE(review): Flux's ``aggregateWindow`` labels each window with its
    ``_stop`` time by default, so a day's aggregate is stamped at the
    following midnight — confirm that is the intended date.
    """
    bucket_lit = _escape_flux_string(bucket)
    station_lit = _escape_flux_string(station_id)
    field_lit = _escape_flux_string(field)
    yield_lit = _escape_flux_string(yield_name)
    return f'''
        from(bucket: "{bucket_lit}")
        |> range(start: -inf)
        |> filter(fn: (r) => r["stationID"] == "{station_lit}")
        |> filter(fn: (r) => r["_field"] == "{field_lit}")
        |> aggregateWindow(every: 1d, fn: {agg_fn}, createEmpty: false)
        |> yield(name: "{yield_lit}")
    '''

# Daily statistics to compute: (InfluxDB field, Flux aggregate function, output column name).
agg_fns = [
    ("AirT", "min", "daily_min_temp"),
    ("AirT", "max", "daily_max_temp"),
    ("AirH", "mean", "daily_avg_hum"),
    ("AirT", "mean", "daily_avg_temp"),
]

# Rendered Flux query text per station, one entry per agg_fns row in the same order.
# Kept for inspection; the fetch cell below formats its own queries.
station_queries = {
    station_id: [
        format_query(station_id, field, agg_fn, yield_name)
        for field, agg_fn, yield_name in agg_fns
    ]
    for station_id in station_ids
}

In [7]:
# Fetch every daily aggregate for every station and join them into one DataFrame
# per station, indexed by timestamp, with a leading "week" column.
station_data = {}

# A single query API object serves every request; no need to rebuild it per query.
query_api = client.query_api()


def _records_to_frame(tables, column):
    """Flatten Flux result tables into a one-column DataFrame indexed by ``time``.

    Args:
        tables: Result of ``query_api.query(...)``: tables whose ``records``
            expose ``get_time()`` and ``get_value()``.
        column: Name of the value column to create.

    Returns:
        DataFrame with a ``time`` index and a single ``column``. Repeated
        timestamps keep only their first row and values are rounded to 2
        decimals. If the query returned no rows, an empty frame with a UTC
        ``DatetimeIndex`` is returned so the concat and week computation
        below still work.
    """
    rows = [
        {"time": record.get_time(), column: record.get_value()}
        for table in tables
        for record in table.records
    ]
    if not rows:
        # pd.DataFrame([]) has no "time" column, so set_index("time") would raise KeyError.
        return pd.DataFrame(
            columns=[column],
            index=pd.DatetimeIndex([], tz="UTC", name="time"),
            dtype="float64",
        )
    frame = pd.DataFrame(rows).set_index("time")
    # NOTE(review): duplicates presumably come from several series (other tags or
    # measurements) sharing the same stationID/_field — confirm which should win.
    frame = frame[~frame.index.duplicated(keep="first")]
    return frame.round(2)


for station_id in station_ids:
    frames = []
    for field, agg_fn, yield_name in agg_fns:
        query = format_query(station_id, field, agg_fn, yield_name)
        frames.append(_records_to_frame(query_api.query(query), yield_name))

    # Outer-join the per-aggregate columns on their timestamps.
    combined_df = pd.concat(frames, axis=1)
    # 1-based 7-day block of the year: days 1-7 -> 1, 8-14 -> 2, ..., days 365/366 -> 53.
    week = (combined_df.index.dayofyear - 1) // 7 + 1
    combined_df.insert(0, "week", week)

    station_data[station_id] = combined_df

In [8]:
# MySQL connection settings; a missing key raises KeyError.
db_user, db_password, db_host, db_port, db_name = (
    credentials[key] for key in ("USER", "PASSWORD", "HOST", "PORT", "DATABASE")
)

In [9]:
# SQLAlchemy parses this URL, so the user and password must be percent-encoded:
# characters such as "@", ":", "/" or "#" in a password would otherwise corrupt it.
engine = create_engine(
    f"mysql+pymysql://{quote(db_user, safe='')}:{quote(db_password, safe='')}"
    f"@{db_host}:{db_port}/{db_name}"
)

try:
    # One table per station, named after its ID; if_exists="replace" drops any existing table first.
    for station_id in station_ids:
        station_data[station_id].to_sql(station_id, engine, if_exists="replace")
finally:
    # Release the engine's pooled connections instead of holding them for the kernel's lifetime.
    engine.dispose()

print("Data is saved successfully!")

Data is saved successfully!
