In [25]:
from datetime import datetime, timedelta
import pandas as pd
import os
from dotenv import load_dotenv
import yaml

from client import hit_nasa_api
from adapter import parse_response
from connector.cassandra_connector import get_session, create_and_set_keyspace, create_table, save_dataframe_to_cassandra, get_min_max_dates


load_dotenv()
with open(os.getenv("CONF_PATH"), 'r') as file:
    conf = yaml.load(file, Loader=yaml.FullLoader)
session = get_session()
create_and_set_keyspace(session, conf['cassandra_keyspace_name'])
create_table(session, conf['cassandra_table_name'])

date_offset, _ = get_min_max_dates(session)

def clear_buffer():
    file_path = conf['catchup_buffer_path']
    try:
        os.remove(file_path)
        print(f"File '{file_path}' has been deleted.")
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
    except Exception as e:
        print(f"An error occurred: {e}")


def get_payload(date_offset, conf):
    if(date_offset < datetime.strptime(conf['catch_upto'], "%Y-%m-%d")):
        raise Exception("CAUGHT UP!!, [you can disable the dag now]")
    
    end_date = date_offset.strftime('%Y-%m-%d')
    start_date = (date_offset - timedelta(days=6)).strftime('%Y-%m-%d')
    date_offset = date_offset - timedelta(days=6)
    # print(start_date, end_date)

    try:
        df = hit_nasa_api(start_date, end_date)
        df.to_csv(conf['catchup_buffer_path'], index=False)
        return
    except Exception as e:
        raise e

def preprocess_payload():
    try:
        df = pd.read_csv(conf['catchup_buffer_path'], header=0)
        df = parse_response(df)
        # adapter takes care of most of the parsing and all, while more can be added in this task later on
        clear_buffer()
        df.to_csv(conf['catchup_buffer_path'], index=False)
    except Exception as e:
        raise e

def dump_payload():
    try:
        df = pd.read_csv(conf['catchup_buffer_path'])
        save_dataframe_to_cassandra(session, df, conf['cassandra_table_name'])
        clear_buffer()
    except Exception as e:
        raise e


if True:
    get_payload(date_offset, conf)
    preprocess_payload()
    # dump_payload()


ValueError: dictionary update sequence element #0 has length 1; 2 is required

In [2]:
df = pd.read_csv(conf['catchup_buffer_path'])

In [24]:

list(df['links'])

["{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3015695?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3263235?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3409960?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3463371?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3529625?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3744853?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3763547?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3801936?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://api.nasa.gov/neo/rest/v1/neo/3837640?api_key=sgxVzybGnW1my1DO9j7xqgL1OQGYcwctKOrn8Xg9'}",
 "{'self': 'http://

In [22]:
df['link'] = df['links'].map(lambda x: dict(x)['links'])

ValueError: dictionary update sequence element #0 has length 1; 2 is required

In [19]:
df

Unnamed: 0,links,id,neo_reference_id,name,nasa_jpl_url,absolute_magnitude_h,estimated_diameter,is_potentially_hazardous_asteroid,close_approach_data,is_sentry_object,date_system_recorded
0,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,3015695,3015695,(1998 WB2),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,21.850,{'kilometers': {'estimated_diameter_min': 0.11...,True,"[{'close_approach_date': '2023-12-03', 'close_...",False,2023-12-03
1,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,3263235,3263235,(2004 XO),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,21.620,{'kilometers': {'estimated_diameter_min': 0.12...,True,"[{'close_approach_date': '2023-12-03', 'close_...",False,2023-12-03
2,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,3409960,3409960,(2008 HA38),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,20.300,{'kilometers': {'estimated_diameter_min': 0.23...,False,"[{'close_approach_date': '2023-12-03', 'close_...",False,2023-12-03
3,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,3463371,3463371,(2009 MW),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,20.890,{'kilometers': {'estimated_diameter_min': 0.17...,False,"[{'close_approach_date': '2023-12-03', 'close_...",False,2023-12-03
4,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,3529625,3529625,(2010 LM68),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,20.740,{'kilometers': {'estimated_diameter_min': 0.18...,False,"[{'close_approach_date': '2023-12-03', 'close_...",False,2023-12-03
...,...,...,...,...,...,...,...,...,...,...,...
148,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,54415874,54415874,(2023 XD8),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,24.644,{'kilometers': {'estimated_diameter_min': 0.03...,False,"[{'close_approach_date': '2023-12-09', 'close_...",False,2023-12-09
149,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,54415881,54415881,(2023 XB10),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,23.029,{'kilometers': {'estimated_diameter_min': 0.06...,False,"[{'close_approach_date': '2023-12-09', 'close_...",False,2023-12-09
150,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,54416580,54416580,(2023 XY10),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,21.405,{'kilometers': {'estimated_diameter_min': 0.13...,True,"[{'close_approach_date': '2023-12-09', 'close_...",False,2023-12-09
151,{'self': 'http://api.nasa.gov/neo/rest/v1/neo/...,54416718,54416718,(2023 XO12),https://ssd.jpl.nasa.gov/tools/sbdb_lookup.htm...,23.742,{'kilometers': {'estimated_diameter_min': 0.04...,False,"[{'close_approach_date': '2023-12-09', 'close_...",False,2023-12-09
