In [11]:
import pandas as pd
import numpy as np
import requests
from requests import Request
import json
import csv
import dotenv
import os
from dotenv import load_dotenv, find_dotenv
from flipside import Flipside
import logging
import time
# from degate.spot import Spot as Client
# from degate.lib.utils import config_logging
from concurrent.futures import ThreadPoolExecutor, as_completed

In [12]:
# Load environment variables from .env files before any key is read below.
# First call uses python-dotenv's default lookup (no explicit path is given).
load_dotenv()
# Second call loads the .env file located two directories above the working directory.
# NOTE(review): presumably variables already set by the first call are not
# overridden here -- confirm against python-dotenv's `override` default.
dotenv.load_dotenv('../../.env')
# Dune API key; os.environ[...] raises KeyError if the variable is not set.
api_key = os.environ["DUNE_API_KEY"]
# Request headers carrying the Dune API key.
headers = {"X-Dune-API-Key": api_key}
# Flipside API key, read the same way (KeyError if missing).
flipside_key = os.environ["FLIPSIDE_API_KEY"]

In [None]:
# 28/02: latest version of the step that cleans and prepares trading data for upload to Dune
import time
def clean_and_prepare_data(input_file):
    """
    Clean and prepare the trading data from the input file.

    Reads the CSV at ``input_file``, renames its positional column headers
    ('0' .. '12') to descriptive names, drops the columns that are not needed
    for the upload, and writes two CSV files into the current working
    directory:

    * ``test_degate_trades_updates.csv`` -- the renamed/trimmed data written
      *before* ``date`` and ``close_time`` are converted, so the stored
      ``close_time`` values can be reused for the next fetch of trading data.
    * ``degate_updates_<YYYYmmdd_HHMMSS>.csv`` -- the same data after
      ``date`` and ``close_time`` have been converted to datetimes
      (interpreted with ``unit='ms'``).

    Args:
        input_file: Path (or any other source ``pd.read_csv`` accepts) of the
            raw trading data.

    Returns:
        The cleaned ``pandas.DataFrame``, or ``None`` if any step failed. On
        failure the exception is logged (with traceback) rather than raised.
    """
    # Positional CSV headers -> descriptive column names.
    column_names = {
        '0': 'date', '1': 'open', '2': 'high', '3': 'low', '4': 'close',
        '5': 'volume', '6': 'close_time', '7': 'quote_volume', '8': 'trades',
        '9': 'taker_buy_base_vol', '10': 'taker_buy_quote_vol',
        '11': 'ignore', '12': '_avg_price',
    }
    # Columns that are renamed but not kept in the output.
    unused_columns = ['taker_buy_base_vol', 'taker_buy_quote_vol', 'ignore', '_avg_price']
    intermediate_file = 'test_degate_trades_updates.csv'

    try:
        df = pd.read_csv(input_file)
        logging.info(f"File {input_file} loaded successfully with shape {df.shape}")

        # rename() returns a new frame, so the freshly loaded `df` is left untouched.
        base = df.rename(columns=column_names)
        logging.info(f"Columns renamed successfully. New columns: {list(base.columns)}")

        base = base.drop(columns=unused_columns)
        logging.info(f"Columns dropped successfully. New columns: {list(base.columns)}")

        # Save file with timestamps to use for taking the close_time for the
        # next fetching of trading data. DataFrame.to_csv returns None when it
        # is given a path, so the file name itself is what gets logged.
        base.to_csv(intermediate_file, index=False)
        logging.info(f"Intermediate file saved as: {intermediate_file}")

        # Prepare df for further uploads
        base['date'] = pd.to_datetime(base['date'], unit='ms')
        base['close_time'] = pd.to_datetime(base['close_time'], unit='ms')
        logging.info("Converted 'date' and 'close_time' to datetime format.")

        # Save the final cleaned data to a new, timestamped file
        output_file = f'degate_updates_{time.strftime("%Y%m%d_%H%M%S")}.csv'
        base.to_csv(output_file, index=False)
        logging.info("Data cleaned and prepared for upload to Dune.")

        return base

    except Exception as e:
        # Best-effort contract: report the failure and signal it with None.
        logging.error(f"Error during data cleaning process: {e}", exc_info=True)
        return None



In [None]:
#  Step 3: Upload the data to Dune 
# url = "https://api.dune.com/api/v1/table/kemasan/degate_trades/insert"
# headers = {
#     "X-DUNE-API-KEY": api_key,
#     "Content-Type": "text/csv"
# }

# with open(f"degate_updates_{time.strftime('%Y%m%d_%H%M%S')}.csv", "rb") as data:
#     response = requests.request("POST", url, data=data, headers=headers)

# print(response.text)

In [None]:
# 02.03 test: insert the cleaned trading data into the Dune table
def update_dune_table(df, timeout=60):
    """
    Insert new trading data to Dune table.

    The DataFrame is first written to ``degate_updates_<YYYYmmdd_HHMMSS>.csv``
    in the current working directory, and that file is then POSTed as
    ``text/csv`` to the Dune insert endpoint for ``kemasan/degate_trades``.
    The Dune key is read from the module-level ``api_key`` variable.

    Args:
        df: DataFrame to upload; it is serialised with ``df.to_csv(index=False)``.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block indefinitely;
            a timeout error is caught and logged like any other failure.

    Returns:
        None. Success or failure is reported only through logging: a 200
        response is logged at INFO level, any other status or any raised
        exception at ERROR level.
    """
    try:
        # Ensure the DataFrame is saved to a CSV file
        output_file = f'degate_updates_{time.strftime("%Y%m%d_%H%M%S")}.csv'
        df.to_csv(output_file, index=False)

        # Set the URL and headers for Dune API
        url = "https://api.dune.com/api/v1/table/kemasan/degate_trades/insert"
        headers = {
            "X-DUNE-API-KEY": api_key,
            "Content-Type": "text/csv"
        }

        # Upload the CSV to Dune; the file handle is streamed as the request body.
        with open(output_file, "rb") as data:
            response = requests.post(url, data=data, headers=headers, timeout=timeout)

        # Check the response status
        if response.status_code == 200:
            logging.info(f"Data uploaded successfully to Dune. Response: {response.text}")
        else:
            logging.error(f"Error uploading data to Dune. Response: {response.text}")

    except Exception as e:
        # Best-effort upload: log the failure (including timeouts) and carry on.
        logging.error(f"Error during upload to Dune: {e}", exc_info=True)
    return None

def main(input_file):
    """
    Run the pipeline for one input file: clean it, then upload the result.

    The upload step is skipped when cleaning fails, i.e. when
    ``clean_and_prepare_data`` returns ``None``.
    """
    prepared = clean_and_prepare_data(input_file)

    # Nothing to upload if the cleaning step did not produce a DataFrame.
    if prepared is None:
        return

    update_dune_table(prepared)


In [None]:
# # 2/03/25 gpt version
# def clean_and_prepare_data(input_file):
#     """
#     Clean and prepare the trading data from the input file.
#     """
#     try:
#         df = pd.read_csv(input_file)
#         logging.info(f"File {input_file} loaded successfully with shape {df.shape}")

#         base = df.copy()
#         base.rename(columns={'0':'date','1':'open','2':'high','3':'low','4':'close','5':'volume','6':'close_time',
#                              '7':'quote_volume','8':'trades','9':'taker_buy_base_vol','10':'taker_buy_quote_vol',
#                              '11':'ignore','12':'_avg_price'}, inplace=True)
#         logging.info(f"Columns renamed successfully. New columns: {list(base.columns)}")

#         base = base.drop(['taker_buy_base_vol','taker_buy_quote_vol','ignore','_avg_price'], axis=1)
#         logging.info(f"Columns dropped successfully. New columns: {list(base.columns)}")

#         # Save intermediate file with timestamps to use for close_time for next fetch
#         base.to_csv('test_degate_trades_updates.csv', index=False)
#         logging.info(f"Intermediate file saved as: test_degate_trades_updates.csv")

#         # Convert timestamps to datetime format
#         base['date'] = pd.to_datetime(base['date'], unit='ms')
#         base['close_time'] = pd.to_datetime(base['close_time'], unit='ms')
#         logging.info(f"Converted 'date' and 'close_time' to datetime format.")

#         # Save cleaned data to a new file
#         base.to_csv(f'degate_updates_{time.strftime("%Y%m%d_%H%M%S")}.csv', index=False)
#         logging.info(f"Data cleaned and prepared for upload to Dune.")

#         # Return cleaned DataFrame for further processing
#         return base

#     except Exception as e:
#         logging.error(f"Error during data cleaning process: {e}", exc_info=True)
#     return None


# def upload_to_dune(data):
#     """
#     Upload the cleaned data to Dune Analytics.
#     This is a placeholder function, you need to implement the upload logic here.
#     """
#     try:
#         # Example: Upload the data to Dune (replace with actual upload code)
#         logging.info("Uploading data to Dune...")
#         # Your upload logic goes here
#         logging.info(f"Successfully uploaded data: {data.shape[0]} rows.")
#     except Exception as e:
#         logging.error(f"Error during uploading process: {e}", exc_info=True)


# def main():
#     # Assume `output_file` is the file generated in your earlier process
#     output_file = "trading_data.csv"  # Update with actual filename if needed

#     # Clean and prepare data
#     cleaned_data = clean_and_prepare_data(output_file)
    
#     if cleaned_data is not None:
#         # Upload the cleaned data to Dune
#         upload_to_dune(cleaned_data)
#     else:
#         logging.warning("No data to upload to Dune.")
