In [None]:
import os
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from tqdm.notebook import tqdm  # Changed from tqdm.notebook
import logging
from typing import List
import time

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# InfluxDB configuration
bucket = "crypto"
org = os.environ["INFLUXDB_ORG"]
token = os.environ["INFLUXDB_TOKEN"]
url = os.environ["INFLUXDB_URL"]

# Optimized client configuration
client = InfluxDBClient(
    url=url, 
    token=token, 
    org=org,
    timeout=30000,  # 30 second timeout
    retries=3
)

# Use synchronous write API with batching
write_api = client.write_api(write_options=SYNCHRONOUS)

column_names = [
    "open_time", "open", "high", "low", "close", "volume",
    "close_time", "quote_asset_volume", "number_of_trades",
    "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
]

# Columns to convert to numeric (excluding time columns and ignore)
numeric_columns = [
    "open", "high", "low", "close", "volume", "quote_asset_volume",
    "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume"
]

def create_points_batch(df: pd.DataFrame, symbol: str) -> List[Point]:
    """Create a batch of InfluxDB points from DataFrame rows."""
    points = []
    
    for row in df.itertuples(index=False):
        try:
            point = (
                Point("klines")
                .tag("symbol", symbol)
                .field("open", float(row.open))
                .field("high", float(row.high))
                .field("low", float(row.low))
                .field("close", float(row.close))
                .field("volume", float(row.volume))
                .field("quote_asset_volume", float(row.quote_asset_volume))
                .field("number_of_trades", int(row.number_of_trades))
                .field("taker_buy_base_asset_volume", float(row.taker_buy_base_asset_volume))
                .field("taker_buy_quote_asset_volume", float(row.taker_buy_quote_asset_volume))
                .time(int(row.open_time), WritePrecision.MS)
            )
            points.append(point)
        except (ValueError, TypeError, AttributeError) as e:
            logger.warning(f"Skipping row due to data error: {e}")
            continue
    
    return points

def process_csv_file(fpath: str, symbol: str, batch_size: int = 5000) -> int:
    """Process a single CSV file and write to InfluxDB in batches."""
    try:
        logger.info(f"Processing file: {fpath}")
        
        # First, peek at the first line to check for headers
        with open(fpath, 'r') as f:
            first_line = f.readline().strip()
        
        # Check if first line contains headers (non-numeric values in timestamp position)
        skip_header = False
        if first_line:
            first_value = first_line.split(',')[0].strip()
            if not first_value.isdigit():
                skip_header = True
                logger.info(f"Detected header row in {fpath}, skipping first row")
        
        # Read CSV with optimized settings
        df = pd.read_csv(
            fpath, 
            names=column_names, 
            header=None,
            skiprows=1 if skip_header else 0,  # Skip header if detected
            dtype={
                'open_time': 'str',  # Read as string first for validation
                'close_time': 'str',  # Also handle close_time
                'number_of_trades': 'Int64'  # Handle potential NaN values
            }
        )
        
        logger.info(f"Read {len(df)} rows from {fpath}")
        
        # Filter valid timestamps more efficiently - handle both string and numeric
        if len(df) > 0:
            # Convert to string if not already, then filter
            df['open_time_str'] = df['open_time'].astype(str)
            df = df[df["open_time_str"].str.isdigit()].copy()
            df = df.drop(columns=['open_time_str'])  # Clean up temp column
        
        if df.empty:
            logger.warning(f"No valid data in {fpath}")
            return 0
        
        # Drop ignore column if it exists
        if "ignore" in df.columns:
            df = df.drop(columns=["ignore"])
        
        # Convert numeric columns efficiently with better error handling
        for col in numeric_columns:
            if col in df.columns:
                # First attempt standard conversion
                df[col] = pd.to_numeric(df[col], errors='coerce')
                # Check for any remaining non-numeric values that might be headers
                non_numeric_mask = df[col].isna() & df[col].notna()
                if non_numeric_mask.any():
                    logger.warning(f"Found non-numeric values in column {col}, they will be dropped")
        
        # Convert open_time to int64 with better error handling
        df['open_time'] = pd.to_numeric(df['open_time'], errors='coerce')
        
        # Drop rows where open_time conversion failed (likely header rows that slipped through)
        df = df[df['open_time'].notna()].copy()
        df['open_time'] = df['open_time'].astype('int64')
        
        # Drop rows with any NaN values in critical columns
        initial_rows = len(df)
        critical_columns = ['open_time'] + numeric_columns
        df = df.dropna(subset=[col for col in critical_columns if col in df.columns])
        
        if len(df) < initial_rows:
            logger.info(f"Dropped {initial_rows - len(df)} rows with invalid data in {fpath}")
        
        if df.empty:
            logger.warning(f"No valid rows remaining after cleaning in {fpath}")
            return 0
        
        # Process in batches with retry logic
        total_rows = len(df)
        rows_written = 0
        max_retries = 3
        
        # Create progress bar for batches
        batch_count = (total_rows + batch_size - 1) // batch_size  # Ceiling division
        batch_pbar = tqdm(
            range(0, total_rows, batch_size), 
            desc=f"Writing batches for {os.path.basename(fpath)}", 
            unit="batch",
            leave=False
        )
        
        for i in batch_pbar:
            batch_df = df.iloc[i:i + batch_size]
            points = create_points_batch(batch_df, symbol)
            
            if points:
                # Retry logic for writing batches
                for attempt in range(max_retries):
                    try:
                        write_api.write(bucket=bucket, org=org, record=points)
                        rows_written += len(points)
                        batch_pbar.set_postfix({"rows_written": rows_written})
                        break
                    except Exception as write_error:
                        logger.warning(f"Write attempt {attempt + 1} failed: {write_error}")
                        if attempt == max_retries - 1:
                            logger.error(f"Failed to write batch after {max_retries} attempts")
                            raise
                        time.sleep(1)  # Brief pause before retry
        
        batch_pbar.close()
        
        logger.info(f"Successfully imported {rows_written} rows from {fpath}")
        return rows_written
        
    except FileNotFoundError:
        logger.error(f"File not found: {fpath}")
        return 0
    except pd.errors.EmptyDataError:
        logger.error(f"Empty or invalid CSV file: {fpath}")
        return 0
    except Exception as e:
        logger.error(f"Error processing {fpath}: {e}")
        return 0

def validate_environment():
    """Validate required environment variables."""
    required_vars = ["INFLUXDB_ORG", "INFLUXDB_TOKEN", "INFLUXDB_URL"]
    missing_vars = [var for var in required_vars if not os.environ.get(var)]
    
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {missing_vars}")

def test_influxdb_connection():
    """Test InfluxDB connection before processing."""
    try:
        # Test connection with a simple query
        query_api = client.query_api()
        # Simple ping query
        result = query_api.query(f'buckets() |> filter(fn: (r) => r.name == "{bucket}") |> limit(n: 1)')
        logger.info("InfluxDB connection test successful")
        return True
    except Exception as e:
        logger.error(f"InfluxDB connection test failed: {e}")
        return False

def main():
    """Main execution function."""
    try:
        # Validate environment
        validate_environment()
        
        # Test InfluxDB connection
        if not test_influxdb_connection():
            logger.error("Cannot proceed without valid InfluxDB connection")
            return
        
        data_dir = "/Users/orentapiero/DATA/binance_klines"
        
        if not os.path.exists(data_dir):
            logger.error(f"Data directory does not exist: {data_dir}")
            return
        
        total_files_processed = 0
        total_rows_imported = 0
        
        # Get list of symbol directories
        symbol_dirs = [d for d in os.listdir(data_dir) 
                       if os.path.isdir(os.path.join(data_dir, d))]
        
        if not symbol_dirs:
            logger.error(f"No symbol directories found in {data_dir}")
            return
        
        logger.info(f"Found {len(symbol_dirs)} symbol directories to process")
        
        for symbol in tqdm(symbol_dirs, desc="Processing symbols"):
            symbol_dir = os.path.join(data_dir, symbol)
            
            # Get CSV files in symbol directory
            csv_files = [f for f in os.listdir(symbol_dir) if f.endswith(".csv")]
            
            if not csv_files:
                logger.warning(f"No CSV files found in {symbol_dir}")
                continue
            
            logger.info(f"Processing {len(csv_files)} files for symbol {symbol}")
            
            for fname in tqdm(csv_files, desc=f"Files in {symbol}", leave=False):
                fpath = os.path.join(symbol_dir, fname)
                rows_imported = process_csv_file(fpath, symbol)
                total_rows_imported += rows_imported
                total_files_processed += 1
        
        logger.info(f"Import completed! Processed {total_files_processed} files, "
                   f"imported {total_rows_imported} total rows")
        
    except KeyboardInterrupt:
        logger.info("Import interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error during import: {e}")
    finally:
        try:
            client.close()
            logger.info("InfluxDB client closed")
        except Exception as e:
            logger.warning(f"Error closing client: {e}")

if __name__ == "__main__":
    main()

In [2]:
import os
import pandas as pd
from influxdb_client import InfluxDBClient

# InfluxDB configuration
bucket = "crypto"
org = os.environ["INFLUXDB_ORG"]
token = os.environ["INFLUXDB_TOKEN"]
url = os.environ["INFLUXDB_URL"]

def get_btc_ohlcv(days_back=7):
    """
    Fetch OHLCV data for BTCUSDT from InfluxDB
    
    Args:
        days_back (int): Number of days to look back (default: 7)
    
    Returns:
        pandas.DataFrame: DataFrame with columns [time, open, high, low, close, volume]
    """
    
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = client.query_api()
    
    query = f'''
    from(bucket: "{bucket}")
    |> range(start: -{days_back}d)
    |> filter(fn: (r) => r._measurement == "klines")
    |> filter(fn: (r) => r.symbol == "BTCUSDT")
    |> filter(fn: (r) => r._field == "open" or r._field == "high" or r._field == "low" or r._field == "close" or r._field == "volume")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> sort(columns: ["_time"])
    |> keep(columns: ["_time", "open", "high", "low", "close", "volume"])
    '''
    
    try:
        result = query_api.query_data_frame(query)
        
        if not result.empty:
            # Rename _time to time for cleaner output
            result = result.rename(columns={'_time': 'time'})
            # Convert time to datetime if it isn't already
            result['time'] = pd.to_datetime(result['time'])
            # Reset index for cleaner display
            result = result.reset_index(drop=True)
            
        return result
        
    except Exception as e:
        print(f"Error fetching data: {e}")
        return pd.DataFrame()
    
    finally:
        client.close()

# Usage examples
if __name__ == "__main__":
    
    # Get last 7 days of BTCUSDT data
    btc_data = get_btc_ohlcv(days_back=7)
    
    if not btc_data.empty:
        print("BTCUSDT OHLCV Data:")
        print("=" * 50)
        print(f"Total records: {len(btc_data)}")
        print(f"Time range: {btc_data['time'].min()} to {btc_data['time'].max()}")
        print("\nFirst 5 rows:")
        print(btc_data.head())
        print("\nLast 5 rows:")
        print(btc_data.tail())
        
        # Basic statistics
        print("\nPrice Statistics:")
        print(f"Highest price: ${btc_data['high'].max():,.2f}")
        print(f"Lowest price: ${btc_data['low'].min():,.2f}")
        print(f"Latest close: ${btc_data['close'].iloc[-1]:,.2f}")
        print(f"Total volume: {btc_data['volume'].sum():,.2f}")
        
    else:
        print("No data found for BTCUSDT")

# Alternative: Get specific number of latest records
def get_btc_latest(limit=100):
    """Get the latest N records for BTCUSDT"""
    
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = client.query_api()
    
    query = f'''
    from(bucket: "{bucket}")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "klines")
    |> filter(fn: (r) => r.symbol == "BTCUSDT")
    |> filter(fn: (r) => r._field == "open" or r._field == "high" or r._field == "low" or r._field == "close" or r._field == "volume")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> sort(columns: ["_time"], desc: true)
    |> limit(n: {limit})
    |> sort(columns: ["_time"])
    |> keep(columns: ["_time", "open", "high", "low", "close", "volume"])
    '''
    
    try:
        result = query_api.query_data_frame(query)
        if not result.empty:
            result = result.rename(columns={'_time': 'time'})
            result['time'] = pd.to_datetime(result['time'])
            result = result.reset_index(drop=True)
        return result
    except Exception as e:
        print(f"Error fetching data: {e}")
        return pd.DataFrame()
    finally:
        client.close()

# Quick one-liner usage:
# btc_data = get_btc_ohlcv(days_back=1)  # Last 24 hours
# btc_latest = get_btc_latest(limit=50)   # Latest 50 records

BTCUSDT OHLCV Data:
Total records: 4648
Time range: 2025-07-28 18:32:00+00:00 to 2025-07-31 23:59:00+00:00

First 5 rows:
    result  table                      time     close      high       low  \
0  _result      0 2025-07-28 18:32:00+00:00  117464.7  117465.4  117440.0   
1  _result      0 2025-07-28 18:33:00+00:00  117439.8  117464.8  117400.0   
2  _result      0 2025-07-28 18:34:00+00:00  117423.8  117467.5  117388.0   
3  _result      0 2025-07-28 18:35:00+00:00  117462.1  117500.0  117423.9   
4  _result      0 2025-07-28 18:36:00+00:00  117520.0  117540.0  117462.2   

       open   volume  
0  117464.7  200.595  
1  117464.8  267.729  
2  117438.5  152.327  
3  117423.9   69.893  
4  117462.2   55.829  

Last 5 rows:
       result  table                      time     close      high       low  \
4643  _result      0 2025-07-31 23:55:00+00:00  115665.0  115665.0  115500.0   
4644  _result      0 2025-07-31 23:56:00+00:00  115675.8  115768.7  115588.6   
4645  _result      0 20