In [6]:
import gzip
import logging
import os
import shutil
import time
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed

import nasdaqdatalink as nd
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, Float, String, inspect
from sqlalchemy.exc import SQLAlchemyError
# Configure the root logger at DEBUG level; this also surfaces DEBUG records
# emitted by third-party libraries (e.g. the urllib3 lines in later outputs).
logging.basicConfig(level=logging.DEBUG)
# Module-level logger for this notebook.
logger = logging.getLogger(__name__)
# Must run before the os.environ / os.getenv lookups in the following cells,
# which read the settings this call is expected to load.
load_dotenv()

True

In [7]:
# Read the Nasdaq Data Link API key from the NASDAQ_API environment variable
# and hand it to the client library's global configuration.
# NOTE(review): os.environ.get returns None when the variable is unset, so
# api_key is silently assigned None in that case — confirm this is intended.
NASDAQ_API = os.environ.get('NASDAQ_API')
nd.ApiConfig.api_key = NASDAQ_API

In [8]:
# MySQL connection settings, read from the environment.
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DB = os.getenv("MYSQL_DB")

# Fail fast with a readable message instead of building a URL that contains
# the literal text "None" and only failing later, at first connection.
# An empty string is still accepted (e.g. an account without a password).
_missing_mysql_settings = [
    setting_name
    for setting_name, setting_value in (
        ("MYSQL_HOST", MYSQL_HOST),
        ("MYSQL_USER", MYSQL_USER),
        ("MYSQL_PASSWORD", MYSQL_PASSWORD),
        ("MYSQL_DB", MYSQL_DB),
    )
    if setting_value is None
]
if _missing_mysql_settings:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_mysql_settings)
    )

# Percent-encode the user and password so characters such as '@', '/' or ':'
# cannot be mistaken for URL delimiters. Credentials made only of unreserved
# characters produce exactly the same URL as before.
DATABASE_URL = (
    "mysql+mysqlconnector://"
    f"{urllib.parse.quote(MYSQL_USER, safe='')}:{urllib.parse.quote(MYSQL_PASSWORD, safe='')}"
    f"@{MYSQL_HOST}/{MYSQL_DB}"
)
engine = create_engine(DATABASE_URL)

In [9]:
# Echo the configured database name as a quick check that the setting loaded.
MYSQL_DB

'nasdaqdata'

In [10]:
def save_to_csv(dataframe, file_path):
    """Write ``dataframe`` to ``file_path`` as CSV without the index column.

    Args:
        dataframe: Object exposing a pandas-style ``to_csv`` method.
        file_path: Destination path handed straight to ``to_csv``.
    """
    include_index = False
    dataframe.to_csv(file_path, index=include_index)

def compress_file(file_path):
    """Gzip ``file_path`` into ``file_path + '.gz'`` and delete the original.

    Args:
        file_path: Path of the uncompressed file to archive.
    """
    archive_path = file_path + '.gz'
    with open(file_path, 'rb') as source, gzip.open(archive_path, 'wb') as archive:
        shutil.copyfileobj(source, archive)
    # Reached only when the copy above finished without raising.
    os.remove(file_path)

def download_data(table, **filters):
    """Fetch one datatable through ``nd.get_table``.

    Args:
        table: Datatable code, e.g. ``'QDL/FON'``.
        **filters: Keyword filters forwarded unchanged to ``nd.get_table``.

    Returns:
        A ``(table, data)`` tuple. ``data`` is whatever ``nd.get_table``
        returned, or ``None`` when the call raised; in that case the error is
        printed rather than propagated.
    """
    data = None
    try:
        data = nd.get_table(table, **filters)
    except Exception as e:
        print(f"Error downloading data for {table}: {str(e)}")
    return table, data

# Save a DataFrame to the database
def save_to_db(dataframe, table_name, engine):
    """Write ``dataframe`` to the SQL table ``table_name`` via ``to_sql``.

    The write uses ``if_exists='replace'`` and does not store the index.
    A ``SQLAlchemyError`` is reported with ``print`` instead of being raised.

    Args:
        dataframe: Object exposing a pandas-style ``to_sql`` method.
        table_name: Name of the destination table.
        engine: Connectable passed to ``to_sql`` as its second argument.
    """
    try:
        dataframe.to_sql(table_name, engine, if_exists='replace', index=False)
    except SQLAlchemyError as e:
        print(f"Error saving {table_name}: {str(e)}")
    else:
        print(f"Data saved to {table_name} table successfully.")

# Download one table and store it in the database
def download_and_store_data(table, filters, engine):
    """Download ``table`` with ``download_data`` and persist it with ``save_to_db``.

    Args:
        table: Datatable code such as ``'QDL/FON'``.
        filters: Mapping expanded into keyword filters for ``download_data``.
        engine: Database connectable forwarded to ``save_to_db``.
    """
    table_name, data = download_data(table, **filters)
    if data is None:
        print(f"Failed to download data for table: {table}")
        return
    # Replace '/' with '_' to derive the SQL table name from the datatable code.
    sql_table_name = table_name.replace('/', '_')
    save_to_db(data, sql_table_name, engine)

# Download and store all tables using a thread pool
def download_all_tables(tables_filters, max_workers=None):
    """Download and store every configured table concurrently.

    Each ``(table, filters)`` pair is submitted to a thread pool as a
    ``download_and_store_data(table, filters, engine)`` call, where ``engine``
    is the module-level database engine. An exception raised by one task is
    printed and does not stop the remaining tasks. The total elapsed time is
    printed at the end.

    Args:
        tables_filters: Mapping of datatable code -> dict of filter kwargs.
        max_workers: Optional upper bound on concurrent threads. ``None``
            (the default) keeps the original behaviour of one thread per
            table. A smaller value limits how many requests are in flight at
            once; the recorded run of this notebook shows an HTTP 429
            response when all ten tables were requested simultaneously.
    """
    start_time = time.time()  # start timing
    requested_workers = len(tables_filters) if max_workers is None else max_workers
    # ThreadPoolExecutor raises ValueError for max_workers <= 0, which is what
    # an empty mapping used to trigger; clamp to a minimum of one worker.
    num_workers = max(1, requested_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        future_to_table = {
            executor.submit(download_and_store_data, table, filters, engine): table
            for table, filters in tables_filters.items()
        }
        for future in as_completed(future_to_table):
            table = future_to_table[future]
            try:
                future.result()
            except Exception as e:
                print(f"Error processing table {table}: {str(e)}")
    end_time = time.time()  # stop timing
    print(f"Total time taken: {end_time - start_time:.2f} seconds")


def retrieve_data(query, engine):
    """Run ``query`` on a connection from ``engine`` and return the result.

    Args:
        query: SQL text passed to ``pd.read_sql``.
        engine: Object whose ``connect()`` returns a context-managed connection.

    Returns:
        The object produced by ``pd.read_sql`` for the query.
    """
    with engine.connect() as db_connection:
        return pd.read_sql(query, db_connection)

In [11]:

# Datatable codes to download, each mapped to its own (initially empty) dict
# of filter keyword arguments.
tables_filters = {
    table_code: {}
    for table_code in (
        'QDL/ODA',
        'QDL/FON',
        'QDL/OPEC',
        'QDL/JODI',
        'QDL/BITFINEX',
        'QDL/BCHAIN',
        'QDL/LME',
        'ZILLOW/DATA',
        'WASDE/DATA',
        'WB/DATA',
    )
}
# Example of narrowing a single download with filters (kept for reference):
# filters = {
#     # 'date': '2019-10-08',
#     # 'contract_code': '967654'
# }
# data = download_data(table, **filters)

In [11]:
# data.head()

In [12]:
# save_to_db(data, 'QDL_FON')

In [12]:
# Download every table listed in tables_filters and write each one to the database.
download_all_tables(tables_filters)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/datatables/QDL/OPEC HTTP/1.1" 429 None
DEBUG:urllib3.connectionpool:https://data.nasdaq.co

Data saved to WASDE_DATA table successfully.


DEBUG:urllib3.connectionpool:Retry: /api/v3/datatables/ZILLOW/DATA
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/datatables/ZILLOW/DATA HTTP/1.1" 200 None
  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to QDL_JODI table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to QDL_LME table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to QDL_OPEC table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to WB_DATA table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to QDL_BCHAIN table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to QDL_ODA table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to ZILLOW_DATA table successfully.
Data saved to QDL_FON table successfully.


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


Data saved to QDL_BITFINEX table successfully.
Total time taken: 21.91 seconds


  dataframe.to_sql(table_name, engine, if_exists='replace', index=False)


In [13]:
# Read back the QDL_FON table (written from the 'QDL/FON' download) and preview it.
query = "SELECT * FROM QDL_FON "
retrieved_data = retrieve_data(query, engine)
# Only the first rows are displayed, but the query above has no LIMIT clause.
retrieved_data.head()

Unnamed: 0,contract_code,type,date,market_participation,producer_merchant_processor_user_longs,producer_merchant_processor_user_shorts,swap_dealer_longs,swap_dealer_shorts,swap_dealer_spreads,money_manager_longs,money_manager_shorts,money_manager_spreads,other_reportable_longs,other_reportable_shorts,other_reportable_spreads,total_reportable_longs,total_reportable_shorts,non_reportable_longs,non_reportable_shorts
0,967654,F_OTR_OI,2019-10-08,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,967654,F_OTR_OI,2019-10-01,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,967654,F_OTR_OI,2019-09-24,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,967654,F_OTR_OI,2019-08-27,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,967654,F_OTR_OI,2019-08-20,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [36]:
# Read back the ZILLOW_DATA table (written from the 'ZILLOW/DATA' download) and preview it.
# Note: this rebinds `query` and `retrieved_data` from the earlier QDL_FON cell.
query = "SELECT * FROM ZILLOW_DATA"
retrieved_data = retrieve_data(query, engine)
# Only the first rows are displayed, but the query above has no LIMIT clause.
retrieved_data.head()

Unnamed: 0,indicator_id,region_id,date,value
0,ZSFH,99999,2024-04-30,471998.338021
1,ZSFH,99999,2024-03-31,470671.863494
2,ZSFH,99999,2024-02-29,471144.364833
3,ZSFH,99999,2024-01-31,474834.034632
4,ZSFH,99999,2023-12-31,478670.136789


In [None]:
# TEST

In [27]:
def test_download_data():
    table = 'QDL/FON'
    filters = {'date': '2019-10-08', 'contract_code': '967654'}
    data = download_data(table, **filters)
    assert not data.empty, "Downloaded data is empty"

# Run the check immediately so a failure surfaces in this cell's output.
test_download_data()

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/datatables/QDL/FON?date=2019-10-08&contract_code=967654 HTTP/1.1" 200 None


In [28]:
def test_save_to_db():
    table = 'QDL/FON'
    filters = {'date': '2019-10-08', 'contract_code': '967654'}
    data = download_data(table, **filters)
    save_to_db(data, 'test_table')
    # Further assertions to verify the data saved to the database

# Run the check immediately so a failure surfaces in this cell's output.
test_save_to_db()

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/datatables/QDL/FON?date=2019-10-08&contract_code=967654 HTTP/1.1" 200 None


Data saved to test_table table successfully.


In [29]:
def test_retrieve_data():
    query = "SELECT * FROM test_table WHERE date='2019-10-08'"
    data = retrieve_data(query)
    assert not data.empty, "Retrieved data is empty"

# Run the check immediately so a failure surfaces in this cell's output.
test_retrieve_data()

In [27]:
# nd.Database('WIKI').bulk_download_to_file('/data')


In [26]:
# db = nd.Database('WIKI')
# db.database_code

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/databases/WIKI HTTP/1.1" 200 None


'WIKI'

In [21]:
# dt = nd.Datatable('ZACKS/FC')

In [24]:
# dt.data_fields()

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/datatables/ZACKS/FC/metadata HTTP/1.1" 200 None


['vendor_code',
 'datatable_code',
 'name',
 'description',
 'columns',
 'filters',
 'primary_key',
 'premium',
 'status',
 'data_version']

In [30]:
# database = nd.Database('WIKI')
# database.data_fields()


DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/databases/WIKI HTTP/1.1" 200 None


['id',
 'name',
 'database_code',
 'description',
 'datasets_count',
 'downloads',
 'premium',
 'image',
 'favorite',
 'url_name',
 'exclusive']

In [31]:
# databases = nd.Datatable('RATE')
# databases.values

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): data.nasdaq.com:443
DEBUG:urllib3.connectionpool:https://data.nasdaq.com:443 "GET /api/v3/databases HTTP/1.1" 410 None


DataLinkError: (Status 410) Something went wrong. Please try again. If you continue to have problems, please contact us at connect@data.nasdaq.com.