In [20]:
import ast
from datetime import date, datetime
from enum import Enum
from functools import wraps
from typing import Dict, Union, Optional, List
from uuid import UUID

import httpx
import pandas as pd
import requests
import sqlparse
from duneanalytics import DuneAnalytics
from pydantic import BaseModel, AnyHttpUrl, Field

In [2]:
# Web and API endpoints. API_URL already ends with a slash, so paths derived
# from it must not add another one (the previous f"{API_URL}/auth" produced
# "https://dune.xyz/api//auth", which also leaked into SESSION_URL/CSRF_URL).
BASE_URL = "https://dune.xyz"
LOGIN_URL = f"{BASE_URL}/auth/login"
API_URL = f"{BASE_URL}/api/"
API_AUTH_URL = f"{API_URL}auth"
SESSION_URL = f"{API_AUTH_URL}/session"
CSRF_URL = f"{API_AUTH_URL}/csrf"
GRAPH_QL_URL = "https://core-hsr.duneanalytics.com/v1/graphql"

class _VerbMeta(type(Enum)):
    """Enum metaclass that makes ``item in Verb`` accept raw values.

    ``in`` on a class is resolved through the metaclass, so the membership
    hook has to live here; defined on the enum class itself it would only be
    an instance method and would shadow ``str.__contains__`` on the members.
    """

    def __contains__(cls, item):
        """Return True if ``item`` is a member of ``cls`` or one of its values."""
        try:
            cls(item)
        except (ValueError, TypeError):
            return False
        return True


class Verb(str, Enum, metaclass=_VerbMeta):
    """HTTP verbs as a string-valued enum.

    Membership tests work for both members and plain strings, e.g.
    ``"GET" in Verb`` is True and ``"PATCH" in Verb`` is False.
    """
    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"


In [3]:
def decode_if_bytes(text: Union[bytes, str]) -> str:
    """Return ``text`` as a string, decoding it first when it is ``bytes``.

    Bytes are decoded as UTF-8; if that fails they are decoded as
    ISO-8859-1 instead. Any non-bytes value is returned unchanged.
    """
    if isinstance(text, bytes):
        try:
            return text.decode('utf-8')
        except UnicodeDecodeError:
            # ISO-8859-1 assigns a character to every byte value, so this
            # fallback cannot raise.
            return text.decode('iso-8859-1')
    return text

def raise_for_status(response: httpx.Response):
    """Raise ``httpx.HTTPError`` when ``response`` has a 4xx or 5xx status.

    The error message has the form
    ``"<status> Client Error: <reason> for url: <url>"`` for 4xx and
    ``"<status> Server Error: <reason> for url: <url>"`` for 5xx, where
    ``<reason>`` is the ``reason_phrase`` entry of ``response.extensions``
    (empty when absent). Any other status returns ``None`` without raising.

    Raises:
        httpx.HTTPError: for status codes in the range 400-599.
    """
    # `extensions` may be missing or None; treat both as "no reason phrase"
    # instead of failing with AttributeError before the real error is raised.
    extensions = getattr(response, 'extensions', None) or {}
    reason = decode_if_bytes(extensions.get('reason_phrase', b''))

    status = response.status_code
    if 400 <= status < 500:
        kind = 'Client Error'
    elif 500 <= status < 600:
        kind = 'Server Error'
    else:
        return

    # The response body is deliberately not parsed here: the previous
    # ast.literal_eval(...) result was never used and raised on bodies that
    # are not Python literals (HTML pages, JSON with true/false/null),
    # hiding the HTTP error this function exists to report.
    raise httpx.HTTPError(
        u'%s %s: %s for url: %s' % (status, kind, reason, response.url)
    )
        
def raise_on_bad_status(func):
    """Decorate a method returning a response so bad statuses are checked.

    The wrapped method's result is returned untouched when its
    ``status_code`` is below 400; otherwise the result of
    ``raise_for_status(response)`` is returned (which raises for 4xx/5xx).
    """
    @wraps(func)
    def checked(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        if result.status_code >= 400:
            # Hand the failing response over to the shared error raiser.
            return raise_for_status(result)
        return result
    return checked


In [4]:
class User(BaseModel):
    """Author record as returned in the ``user`` object of a Dune query."""
    id: int
    name: str
    # Explicit default so the field is optional regardless of how the
    # installed pydantic version treats a bare Optional annotation.
    profile_image_url: Optional[AnyHttpUrl] = None

    @property
    def handle(self) -> str:
        """The user's name prefixed with ``@``."""
        # Must read `name`; reading `self.handle` here recursed forever.
        return f'@{self.name}'

class QueryMetadata(BaseModel):
    """Descriptive fields of a Dune query.

    Built by ``Dune._fetch_query_metadata`` from the first entry of the
    ``queries`` list in the ``FindQuery`` GraphQL response.
    """
    id: int
    name: str
    description: str
    # Author of the query, parsed from the nested `user` object.
    user: User
    query: str # SQL
    # https://sqlparse.readthedocs.io/en/latest/#:~:text=sqlparse%20is%20a%20non%2Dvalidating,of%20the%20New%20BSD%20license.
    created_at: datetime
    updated_at: datetime

class RawRow(BaseModel):
    """One entry of ``get_result_by_result_id``: a row payload plus its GraphQL type name."""
    # Column-name -> value mapping for this row (read by DuneQuery when
    # building the DataFrame).
    data: Dict
    # A class-body annotation named `__typename` is name-mangled to
    # `_RawRow__typename`, and pydantic does not treat underscore-prefixed
    # names as fields, so the value was never captured. Expose it under a
    # public name and map the wire key through an alias. Optional so rows
    # without the key still validate as before.
    typename: Optional[str] = Field(default=None, alias='__typename')

class QueryResultData(BaseModel):
    """Result set of one query execution.

    Built by ``Dune._fetch_result_data`` from the first ``query_results``
    entry of the ``FindResultDataByResult`` response, with ``raw_data``
    filled from that response's ``get_result_by_result_id`` list.
    """
    id: UUID
    job_id: UUID
    runtime: int # seconds
    generated_at: datetime
    # Column names as reported by `query_results.columns`.
    columns: List[str]
    # One RawRow per entry of `get_result_by_result_id`.
    raw_data: List[RawRow]
        
class Query(BaseModel):
    """Pairs a query's metadata with its result data; consumed by ``DuneQuery``."""
    metadata: QueryMetadata
    result_data: QueryResultData


In [21]:
# Request headers installed on the HTTP client by Dune.__init__ via
# `client.headers.update(DEFAULT_HEADERS)`. `origin` is tied to BASE_URL;
# `x-hasura-api-key` is sent as an empty string.
DEFAULT_HEADERS = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,'
              'image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'dnt': '1',
    'sec-ch-ua': '"Google Chrome";v="95", "Chromium";v="95", ";Not A Brand";v="99"',
    'sec-ch-ua-mobile': '?0',
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'same-site',
    'origin': BASE_URL,
    'upgrade-insecure-requests': '1',
    'x-hasura-api-key': ''
}

# GraphQL documents posted to GRAPH_QL_URL by the Dune class.
# FIND_QUERY: sent with operation name "FindQuery" and variable `id` to look up one query by id.
FIND_QUERY = "query FindQuery($session_id: Int, $id: Int!, $favs_last_24h: Boolean! = false, $favs_last_7d: Boolean! = false, $favs_last_30d: Boolean! = false, $favs_all_time: Boolean! = true) {\n  queries(where: {id: {_eq: $id}}) {\n    ...Query\n    favorite_queries(where: {user_id: {_eq: $session_id}}, limit: 1) {\n      created_at\n      __typename\n    }\n    __typename\n  }\n}\n\nfragment Query on queries {\n  ...BaseQuery\n  ...QueryVisualizations\n  ...QueryForked\n  ...QueryUsers\n  ...QueryFavorites\n  __typename\n}\n\nfragment BaseQuery on queries {\n  id\n  dataset_id\n  name\n  description\n  query\n  private_to_group_id\n  is_temp\n  is_archived\n  created_at\n  updated_at\n  schedule\n  tags\n  parameters\n  __typename\n}\n\nfragment QueryVisualizations on queries {\n  visualizations {\n    id\n    type\n    name\n    options\n    created_at\n    __typename\n  }\n  __typename\n}\n\nfragment QueryForked on queries {\n  forked_query {\n    id\n    name\n    user {\n      name\n      __typename\n    }\n    __typename\n  }\n  __typename\n}\n\nfragment QueryUsers on queries {\n  user {\n    ...User\n    __typename\n  }\n  __typename\n}\n\nfragment User on users {\n  id\n  name\n  profile_image_url\n  __typename\n}\n\nfragment QueryFavorites on queries {\n  query_favorite_count_all @include(if: $favs_all_time) {\n    favorite_count\n    __typename\n  }\n  query_favorite_count_last_24h @include(if: $favs_last_24h) {\n    favorite_count\n    __typename\n  }\n  query_favorite_count_last_7d @include(if: $favs_last_7d) {\n    favorite_count\n    __typename\n  }\n  query_favorite_count_last_30d @include(if: $favs_last_30d) {\n    favorite_count\n    __typename\n  }\n  __typename\n}\n"
# GET_RESULT_QUERY: sent with operation name "GetResult" and variable `query_id`; the response's `result_id` is read back.
GET_RESULT_QUERY = "query GetResult($query_id: Int!, $parameters: [Parameter!]) {\n  get_result(query_id: $query_id, parameters: $parameters) {\n    job_id\n    result_id\n    __typename\n  }\n}\n"
# FIND_RESULT_DATA_BY_RESULT_QUERY: sent with operation name "FindResultDataByResult" and variable `result_id`.
FIND_RESULT_DATA_BY_RESULT_QUERY = "query FindResultDataByResult($result_id: uuid!) {\n  query_results(where: {id: {_eq: $result_id}}) {\n    id\n    job_id\n    error\n    runtime\n    generated_at\n    columns\n    __typename\n  }\n  get_result_by_result_id(args: {want_result_id: $result_id}) {\n    data\n    __typename\n  }\n}\n"

class Client(httpx.Client):
    """``httpx.Client`` whose ``send`` is routed through ``raise_on_bad_status``."""

    @raise_on_bad_status
    def send(self, *args, **kwargs) -> httpx.Response:
        """Delegate to ``httpx.Client.send`` with the arguments unchanged."""
        outcome = super().send(*args, **kwargs)
        return outcome
            

class DuneQuery:
    """Read-only convenience view over a fetched ``Query``.

    Exposes selected metadata and result fields as properties and builds a
    pandas DataFrame from the raw rows the first time ``df`` is accessed.
    """

    def __init__(self, query: Query):
        self.metadata: QueryMetadata = query.metadata
        self.result_data: QueryResultData = query.result_data
        # Populated lazily by the `df` property.
        self._df = None

    def __repr__(self) -> str:
        return f'<DuneQuery query_id={self.query_id} name={self.name} length={self.length} rows>'

    @property
    def query_id(self) -> int:
        """Id taken from the query metadata."""
        meta = self.metadata
        return meta.id

    @property
    def name(self) -> str:
        """Name taken from the query metadata."""
        meta = self.metadata
        return meta.name

    @property
    def length(self) -> int:
        """Number of raw result rows."""
        rows = self.result_data.raw_data
        return len(rows)

    @property
    def author(self) -> str:
        """Handle of the user attached to the metadata."""
        owner = self.metadata.user
        return owner.handle

    @property
    def columns(self) -> List[str]:
        """Column names reported with the result data."""
        result = self.result_data
        return result.columns

    @property
    def raw(self) -> List[RawRow]:
        """The raw result rows."""
        result = self.result_data
        return result.raw_data

    @property
    def raw_sql(self) -> str:
        """The query text stored in the metadata."""
        meta = self.metadata
        return meta.query

    @property
    def df(self) -> pd.DataFrame:
        """Result rows as a DataFrame, built once and then reused."""
        if self._df is not None:
            return self._df
        self._df = self._process_to_df(self.raw)
        return self._df

    def _process_to_df(self, results: List) -> pd.DataFrame:
        """Build a DataFrame from the ``data`` attribute of each row."""
        records = []
        for row in results:
            records.append(row.data)
        return pd.DataFrame(records)

    def to_csv(self, filename: str) -> None:
        """Write the DataFrame to ``filename`` via ``DataFrame.to_csv``."""
        frame = self.df
        return frame.to_csv(filename)
    
    
#     @property
#     def raw_sql(self) -> str:
          
    
class Dune:
    """Thin client for Dune's web login flow and GraphQL endpoint.

    Owns an ``httpx.Client`` preloaded with ``DEFAULT_HEADERS``. When both
    ``username`` and ``password`` are given, ``login`` is attempted during
    construction and its outcome is stored in ``logged_in``; otherwise no
    login is attempted and ``logged_in`` is ``False``.

    The instance can be used as a context manager (``with Dune() as dune:``)
    so the underlying HTTP client is closed on exit; ``close()`` does the
    same explicitly.
    """

    def __init__(self, username: Optional[str] = None, password: Optional[str] = None):
        self.client = httpx.Client()
        self.client.headers.update(DEFAULT_HEADERS)
        if username is not None and password is not None:
            self.logged_in = self.login(username, password)
        else:
            self.logged_in = False

    def close(self) -> None:
        """Close the underlying ``httpx.Client``."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def create_graphql_request(self, operation: str, query: str, variables: dict) -> httpx.Request:
        """Build (without sending) a POST request for a GraphQL operation.

        Args:
            operation: value for ``operationName``.
            query: the GraphQL document text.
            variables: variables for the operation.

        Returns:
            An ``httpx.Request`` targeting ``GRAPH_QL_URL`` with a JSON body.
        """
        # Previously this method had no `self` and referenced the undefined
        # names `query_string` and `GRAPH_URL`, so it could never run.
        data = {
            "operationName": operation,
            "query": query,
            "variables": variables
        }
        return self.client.build_request("POST", GRAPH_QL_URL, json=data)

    def login(self, username: str, password: str) -> bool:
        """Log in with ``username``/``password`` and install a bearer token.

        Returns:
            True when a token was obtained and set as the ``authorization``
            header on the client; False when the session request returned a
            status of 400 or above, or its JSON body carried no token.
        """
        # Load the login page, request a CSRF token, then read it back from
        # the client's cookie jar.
        self.client.get(LOGIN_URL)
        self.client.post(CSRF_URL)
        csrf_token = self.client.cookies.get('csrf')

        form_data = {
            'action': 'login',
            'username': username,
            'password': password,
            'csrf': csrf_token,
            'next': BASE_URL
        }
        self.client.post(API_AUTH_URL, data=form_data)

        # Fetch AUTH token
        response = self.client.post(SESSION_URL)
        if response.status_code >= 400:
            print("Dune Login Failed: Defaulting to No User/Pass")
            return False

        token = response.json().get('token')
        if not token:
            # Without this guard a missing token became the literal header
            # "Bearer None" and the login was reported as successful.
            print("Dune Login Failed: Defaulting to No User/Pass")
            return False

        self.client.headers.update(
            {'authorization': f'Bearer {token}'}
        )
        return True

    def _post_graph_ql(self, operation: str, query: str, variables: dict) -> dict:
        """Send a GraphQL operation and return the decoded JSON response."""
        # Reuse the request builder so the body shape is defined in one place.
        request = self.create_graphql_request(operation, query, variables)
        response = self.client.send(request)
        return response.json()

    def _fetch_query_metadata(self, query_id: int) -> QueryMetadata:
        """Fetch metadata for ``query_id`` via the ``FindQuery`` operation.

        Raises:
            IndexError: when the response's ``queries`` list is empty.
        """
        raw_metadata = self._post_graph_ql(
            "FindQuery",
            FIND_QUERY,
            {"id": query_id}
        )
        queries = raw_metadata['data']['queries']
        if not queries:
            raise IndexError(f'no query returned for query_id={query_id}')
        return QueryMetadata(**queries[0])

    def _fetch_result_id(self, query_id: int) -> str:
        """Return the ``result_id`` reported by the ``GetResult`` operation."""
        result_id_data = self._post_graph_ql(
            "GetResult",
            GET_RESULT_QUERY,
            {"query_id": query_id}
        )
        return result_id_data\
            .get('data')\
            .get('get_result')\
            .get('result_id')

    def _fetch_result_data(self, query_id: int) -> QueryResultData:
        """Fetch the result rows and result metadata for ``query_id``.

        Resolves the result id first, then runs ``FindResultDataByResult``
        and merges its two parts into a single ``QueryResultData``.

        Raises:
            IndexError: when the response's ``query_results`` list is empty.
        """
        result_id = self._fetch_result_id(query_id)
        raw_result = self._post_graph_ql(
            "FindResultDataByResult",
            FIND_RESULT_DATA_BY_RESULT_QUERY,
            {"result_id": result_id}
        )

        query_results = raw_result['data']['query_results']
        if not query_results:
            raise IndexError(f'no query_results returned for result_id={result_id}')
        query_result_data = query_results[0]
        # The rows arrive under a separate key; attach them as `raw_data`.
        query_result_data['raw_data'] = raw_result['data']['get_result_by_result_id']
        return QueryResultData(**query_result_data)

    # Should return a DuneQuery Object, that can be used to grab the table, charts etc
    # Streaming Responses??
    def fetch_query(self, query_id: int) -> DuneQuery:
        """Fetch metadata and result data for ``query_id`` as a ``DuneQuery``."""
        metadata = self._fetch_query_metadata(query_id)
        result_data = self._fetch_result_data(query_id)
        query = Query(metadata=metadata, result_data=result_data)
        return DuneQuery(query)
        
        

In [22]:
# No username/password is passed, so Dune.__init__ sets logged_in to False
# without calling login.
dune = Dune()
# Fetch metadata and result rows for query id 441535 (performs network requests).
query = dune.fetch_query(441535)
# Show the first rows of the result DataFrame.
query.df.head()

Unnamed: 0,Plants Sold,LPs Harvested,LPs per Plant Harvested,Plants Bought,Plants Compounded,Plants Planted,day
0,,,,,,,2021-12-16T00:00:00+00:00
1,,,,,,,2021-12-17T00:00:00+00:00
2,,,,,,,2021-12-18T00:00:00+00:00
3,,,,,,,2021-12-19T00:00:00+00:00
4,,,,,,,2021-12-20T00:00:00+00:00


In [36]:
# Put the query's SQL text on a single line by replacing each newline with a space.
raw = query.raw_sql.replace('\n', ' ')


In [24]:
# query.sql

<Statement ' WITH ...' at 0x11BF07430>

In [None]:
# NOTE(review): `query` is bound above to the DuneQuery returned by
# dune.fetch_query, and DuneQuery defines no __getitem__, so this subscript
# looks like a leftover from when `query` held a raw response dict — confirm
# before running.
query['data']

In [None]:
# NOTE(review): same concern as any subscript on `query`: it is bound to a
# DuneQuery by dune.fetch_query and DuneQuery defines no __getitem__; the
# equivalent rows are available as `query.raw` — confirm intent.
query['data']['get_result_by_result_id']

In [None]:
'''
{'id': 441535,
 'dataset_id': 9,
 'name': 'User wallet plant strategy',
 'description': "Gives you the user wallet's past plant strategies",
 'query': '\nWITH\nseeds_planted AS (\n    SELECT\n        tr.evt_block_time,\n        DATE_TRUNC(\'day\', tr.evt_block_time) AS DAY,\n        tr."seedsUsed" as amount,\n        tr.contract_address,\n        tr.sender as sender\n    FROM drip."Garden_evt_SeedsPlanted" tr\n    WHERE sender = CONCAT(\'\\x\', substring(\'{{Wallet}}\' from 3))::bytea\n    ORDER BY evt_block_time DESC\n),\nseeds_planted_daily AS (\n    SELECT \n        day,\n        SUM(amount) AS "Seeds Planted",\n        --SUM(amount)/2592000 AS "Plants Planted"\n        SUM(TRUNC(amount/2592000, 0)) AS "Plants Planted"\n    FROM seeds_planted\n    GROUP BY day\n    ORDER BY day ASC\n),\nseeds_bought AS (\n    SELECT\n        tr.evt_block_time,\n        DATE_TRUNC(\'day\', tr.evt_block_time) AS DAY,\n        tr."amountEggs" as amount,\n        tr.contract_address,\n        tr.sender as sender\n    FROM drip."Garden_evt_SeedsBought" tr\n    WHERE sender = CONCAT(\'\\x\', substring(\'{{Wallet}}\' from 3))::bytea\n    ORDER BY evt_block_time DESC\n),\nseeds_bought_daily AS (\n    SELECT \n        day,\n        SUM(amount) AS "Seeds Bought",\n        SUM(amount)/2592000 AS "Plants Bought"\n    FROM seeds_bought\n    GROUP BY day\n    ORDER BY day ASC\n),\nseeds_sold AS (\n    SELECT\n        tr.evt_block_time,\n        DATE_TRUNC(\'day\', tr.evt_block_time) AS DAY,\n        tr."seedsSold" as amount,\n        tr.contract_address,\n        tr.sender as sender,\n        tr."seedValue"\n    FROM drip."Garden_evt_SeedsSold" tr\n    WHERE sender = CONCAT(\'\\x\', substring(\'{{Wallet}}\' from 3))::bytea\n    ORDER BY evt_block_time DESC\n),\nseeds_sold_daily AS (\n    SELECT \n        day,\n        SUM(amount) AS "Seeds Sold",\n        SUM(amount)/2592000 AS "Plants Sold"\n    FROM seeds_sold\n    GROUP BY day\n    ORDER BY day ASC\n),\n\nseeds_sold_value_daily AS (\n    SELECT \n        day,\n        SUM("seedValue") AS "Seeds Value",\n        SUM("seedValue")/1e18 AS "Plants Value"\n    FROM seeds_sold\n    
GROUP BY day\n    ORDER BY day ASC\n),\n\ndays AS\n(\n    SELECT generate_series((SELECT DATE_TRUNC(\'day\', MIN(evt_block_time)) FROM drip."Garden_evt_SeedsBought")::timestamp, DATE_TRUNC(\'day\', NOW()), \'1 day\') AS DAY\n)\n\nSELECT\n    d.day,\n    sb."Plants Bought" AS "Plants Bought",\n    sp."Plants Planted" AS "Plants Planted",\n    ss."Plants Sold" AS " Plants Sold",\n    ROUND(sp."Plants Planted" - COALESCE(sb."Plants Bought", 0), 0) AS "Plants Compounded",\n    ssv."Plants Value" AS "LPs Harvested",\n    ssv."Plants Value" / ss."Plants Sold" AS "LPs per Plant Harvested"\nFROM\n    days d\nLEFT JOIN\n    seeds_planted_daily sp ON sp.day = d.day\nLEFT JOIN\n    seeds_bought_daily sb ON sb.day = d.day\nLEFT JOIN\n    seeds_sold_daily ss ON ss.day = d.day\nLEFT JOIN\n    seeds_sold_value_daily ssv ON ssv.day = ss.day\nWHERE\n    d.day >= NOW() - interval \'{{Period}}\'\n',
 'private_to_group_id': None,
 'is_temp': False,
 'is_archived': False,
 'created_at': '2022-02-21T16:48:28.266724+00:00',
 'updated_at': '2022-03-17T16:59:30.838907+00:00',
 'schedule': None,
 'tags': [],
 'parameters': [{'key': 'Period',
   'type': 'enum',
   'value': '12 month',
   'enumOptions': ['1 month', '3 month', '6 month', '9 month', '12 month']},
  {'key': 'Wallet',
   'type': 'text',
   'value': '0x000000000000000000000000000000000000dead'}],
 '__typename': 'queries',
 'visualizations': [{'id': 839573,
   'type': 'table',
   'name': 'Query results',
   'options': {},
   'created_at': '2022-02-21T16:48:28.266724+00:00',
   '__typename': 'visualizations'},
  {'id': 840866,
   'type': 'chart',
   'name': 'Chart',
   'options': {'sortX': True,
    'xAxis': {'type': '-'},
    'yAxis': [{'type': 'linear'}],
    'legend': {'enabled': True},
    'series': {'percentValues': False},
    'reverseX': False,
    'columnMapping': {'day': 'x',
     'Sold': 'y',
     'Bought': 'y',
     'LP Sold': 'y',
     'Planted': 'y',
     'Compounded': 'y',
     'Plants Sold': 'y',
     ' Plants Sold': 'y',
     'LPs Harvested': 'y',
     'Plants Bought': 'y',
     'Plants Planted': 'y',
     'LP per Plant Sold': 'y',
     'Plants Compounded': 'y',
     'LPs per Plant Harvested': 'y'},
    'seriesOptions': {' Plants Sold': {'type': 'column',
      'yAxis': 0,
      'zIndex': 2},
     'LPs Harvested': {'type': 'scatter', 'yAxis': 0, 'zIndex': 3},
     'Plants Bought': {'type': 'column', 'yAxis': 0, 'zIndex': 0},
     'Plants Planted': {'type': 'column', 'yAxis': 0, 'zIndex': 1},
     'Plants Compounded': {'type': 'column', 'yAxis': 0, 'zIndex': 2},
     'LPs per Plant Harvested': {'type': 'scatter', 'yAxis': 0, 'zIndex': 4}},
    'valuesOptions': {},
    'showDataLabels': False,
    'globalSeriesType': 'line'},
   'created_at': '2022-02-21T20:58:56.851956+00:00',
   '__typename': 'visualizations'}],
 'forked_query': None,
 'user': {'id': 90167,
  'name': 'crypto586',
  'profile_image_url': None,
  '__typename': 'users'},
 'query_favorite_count_all': {'favorite_count': 3,
  '__typename': 'query_favorite_count_all'},
 'favorite_queries': [{'created_at': '2022-03-20T07:06:38.238884+00:00',
   '__typename': 'favorite_queries'}]}
'''