In [4]:
import os
import json
import requests
import logging
import io

import geopandas as gpd
import pandas as pd

from dotenv import load_dotenv
from requests.adapters import HTTPAdapter, Retry
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, Text, Date, Float
from geoalchemy2 import Geometry
from mappings import EVENT_CODES, EVENT_BASE_CODES, EVENT_ROOT_CODES, MAP_FIPS_TO_ISO2

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

load_dotenv()

class RequestHandler:
    """
    Reusable and extensible request handler. 
    
    Todo
    ----
    Provide more generalized functionality. I.E. the ability to retrieve the
    response object directly, as well as the ability to POST messages.
    """
    def __init__(self, 
                 total: int = 3, 
                 backoff: float = 0.1, 
                 redirect: int= 3) -> None:
        """RequestHandler init function.
        

        Parameters
        ----------
        total : int, optional
            Total number of times to retry the endpoint when `get_content` is 
            invoked. The default is 3.
        backoff : float, optional
            Delay added between consecutive requests in the event an endpoint 
            fails to respond with a status 200 message. The default is 0.1.
        redirect : int, optional
            Maximum number of redirects permitted before requests library fails. 
            The default is 3.

        Returns
        -------
        None

        """
        logger.info('Init request handler')
        self.sess = requests.Session()
        
        retries = Retry(
            total=total,
            backoff_factor=backoff,
            redirect=redirect
        )
        
        self.sess.mount('http://', HTTPAdapter(max_retries=retries))
        logger.info('Request handler initialized')
        
    def get_content(self, url: str) -> requests.Response:
        """Fetch response and return the content of the response object.
        

        Parameters
        ----------
        url : str
            Path to target endpoint.

        Returns
        -------
        TYPE
            DESCRIPTION.

        """
        
        resp = self.sess.get(url)
        
        if resp.status_code==200:
            return resp.content
        else:
            logger.info('Endpoint: {} returned status code: {}'.format(
                url, resp.status_code))      
        
TABLE_NAME = "threat_table"
Base = declarative_base()

class Threat(Base):
    """SQLalchemy base class for storing and retreiving objects.
    
    Todo
    ----
    Decide between Integer and Text for Event Codes.
    OPTIMIZATION Consider indexing for increased read performance.
    OPTIMIZATION String(n) not text to min. space requirements where possible
    """
    __tablename__ = TABLE_NAME
    GLOBALEVENTID = Column(Text, primary_key=True) #A
    SQLDATE = Column(Text, nullable=False) #B
    EventCode = Column(Text, nullable=False) #Z
    EventBaseCode = Column(Text, nullable=False) #AA
    EventRootCode = Column(Text, nullable=False) #AB
    ActionGeo_FullName = Column(Text, nullable=False) #BA
    ActionGeo_CountryCode = Column(Text, nullable=False) #BB
    ActionGeo_Lat = Column(Text, nullable=False) #BE
    ActionGeo_Long = Column(Text, nullable=False) #BF
    ActionGeo_Coords = Column(Geometry('POINT'), nullable=False) # BE + BF
    DATEADDED = Column(Text, nullable=False) #BH
    SOURCEURL = Column(Text, nullable=False) #BI
    
class DatabaseHandler:
    """
    Reusable and extensible handler for database connections. 
    
    Todo
    ----
    Extend fetch functionality to enable queries.
    Index database for faster searches
    OPTIMIZATION Implement connection pooling functionality for faster concurrent 
    read/writes.
    
    """
    def __init__(self) -> None:
        """DatabaseHandler init function.
        
        Instatiate connection to Postgres + PostGIS database.
        
        """
        logger.info('Init request handler')
        db_uri = "postgresql://{}:{}@{}:5432/{}".format(
            os.environ.get('DB_USER', 'default_user'), 
            os.environ.get('DB_PW', 'default_pass'), 
            os.environ.get('DB_HOST', 'localhost'), 
            os.environ.get('DB_NAME', 'test_db')
        )
        
        self.engine = create_engine(
            db_uri
        )
        
        self.conn = self.engine.connect() 
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.sess = Session()
        
        logger.info('Database handler initialized')

    def test(self):
        """Retrieve contents of database, count the results, and print a few rows.


        Returns
        -------
        None.

        """
        res = self.fetch_all()
        print('Num rows in db: {}\n'.format(len(res)))
        #logger.info('Num rows in db: {}\n'.format(len(res)))
        
        for idx, row in enumerate(res):
            if idx>= 50:
                break
            
            print(vars(row))
            #logger.info(row)
            
    def close(self) -> None:
        """Close the database connection.
        

        Returns
        -------
        None.

        """
        self.sess.close()
        self.conn.close()
        self.engine.dispose()
        logger.info('Database connection safely closed.')

    def fetch_all(self) -> list:
        """Return results matching all rows.
        

        Returns
        -------
        list
            list of rows retrieved from database.
            
        Todo
        ----
        Enable querying for particular data. This is sufficient for demo purposes.
        OPTIMIZATION Enable indexing

        """
        test_data = self.sess.query(Threat).all()
        return test_data
    
    def upload(self, gdf: gpd.GeoDataFrame()) -> None:
        """Populate table with the contents of `gdf`.
        

        Parameters
        ----------
        gdf : gpd.GeoDataFrame()
            Geodataframe containing points of interest within the United States
            as well as intel sources, dates, and event codings.

        Returns
        -------
        None
        
        Todo
        ----
        OPTIMIZATION Enable caching

        """
        gdf.to_postgis(
            TABLE_NAME, 
            self.engine, 
            if_exists='append', 
            index=False, 
            dtype={'ActionGeo_Coords': Geometry('POINT', srid=3857)}
        )  
        
        print('Adding {} rows'.format(len(gdf.index)))
        logger.info('GeoDataFrame successfully added to table.')
        
class DataHandler:
    """
    Custom handler for acquiring, prepping, and storing data from `tar_url` and
    associated export files.
    
    Todo
    ----
    
    
    """
    tar_url = 'http://data.gdeltproject.org/gdeltv2/lastupdate.txt'
    
    def __init__(self) -> None:
        """DataHandler init function.
        
        Instatiate DBHandler and RequestHandler classes.

        """
        logger.info('Init data handler')
        self.db = DatabaseHandler()
        self.reqs = RequestHandler()
        logger.info('Data handler initialized')
        
    def run(self) -> None:
        pass
        
    def _data_prep(self, df: pd.DataFrame) -> gpd.GeoDataFrame:
        """Perform transforms on received data to prepare it for storage.
        
        1. Creates a GeoDataFrame out of the data received from the target 
            endpoint. Uses Long and Lat to generate a point object containing
            coordinates.
        2. Retrieves and loads json file containing US counties and point 
            boundaries and adds this to a GeoDataFrame.
        3. Produces a final GeoDataFrame (`joined_df`) using sjoin. That 
            contains only values from the `global_df` that contain coordinates
            that exist within the bounds of one of the counties loaded into
            `records_df` in step 2.
        4. General cleanup.

        Parameters
        ----------
        df : pd.DataFrame
            Dataset retrieved from the most recent GDelta export.

        Returns
        -------
        joined_df : Geopandas GeoDataFrame
            Dataframe containing all values from the original df that have 
            coordinates within the United States.

        """
        logger.info('Prepping DataFrame with dimensions {}'.format(df.shape))
        global_gdf = gpd.GeoDataFrame(
            df, geometry=gpd.points_from_xy(df.ActionGeo_Long, df.ActionGeo_Lat),
            crs=3857
        )
        global_gdf['geometry'].set_crs(epsg=3857, inplace=True)
        logger.info('Confirming creation of new GeoDataFrame, shape {}'.format(
            global_gdf.shape))
        
        with open("./geojson-counties-fips.json", 'r') as j:
            records = json.loads(j.read())
            
        records_gdf =  gpd.GeoDataFrame.from_features(records['features'], 
                      columns=['geometry'])        
        records_gdf['geometry'].set_crs(epsg=3857, inplace=True)
        logger.info('Created fips-records GeoDataFrame successfully')
                                                              
        joined_df = gpd.sjoin(left_df=global_gdf, right_df=records_gdf, how='inner')
        #Map values using mappings
        
        joined_df.rename_geometry('ActionGeo_Coords', inplace=True)
        joined_df.set_geometry('ActionGeo_Coords', inplace=True)
        joined_df.drop('index_right', axis=1, inplace=True)
        
        logger.info('Stripped non-US datapoints from GeoDataFrame; shape {}.'.format(
            joined_df.shape))
        
        return joined_df
    
    
    def _process_latest(self, results: str) -> list:
        """Process results from `DataHandler.tar_url` to acquire the url of the
        most recent export.
        

        Parameters
        ----------
        results : str
            list of ids and urls associated with the most recent batch of GDelta 
            exports.

        Returns
        -------
        list
            list of urls retrieved from GDelta referencing the most recent batch
            of data.

        """
        res_list = results.split('\n')        
        return [line.split(' ')[-1] for line in res_list if len(line) > 0]
    
    def _extract(self, zipped: str) -> pd.DataFrame:
        """Load the content of the zipped export csv into a Pandas DataFrame.
        
        We use the columns listed in the spec sheet, ignoring the rest of the 
        data.

        Parameters
        ----------
        zipped : str
            Zipped file data as retrieved from the endpoint, converted from a 
            bytes string to a regular string for processing.

        Returns
        -------
        pandas.DataFrame
            Dataframe containing the desired columns from the exported data.

        """
        df = pd.read_csv(zipped, sep='\t', 
                 usecols=[0, 1, 26, 27, 28, 52, 53, 56, 57, 59, 60],
                 names=[
                     'GLOBALEVENTID',
                     'SQLDATE',
                     'EventCode',
                     'EventBaseCode',
                     'EventRootCode',
                     'ActionGeo_FullName',
                     'ActionGeo_CountryCode',
                     'ActionGeo_Lat',
                     'ActionGeo_Long',
                     'DATEADDED',
                     'SOURCEURL'
                     ],
                 converters={
                     'EventCode': self._event_conv, 
                     'EventBaseCode': self._eventbase_conv,
                     'EventRootCode': self._eventroot_conv,
                     'ActionGeo_CountryCode': self._fips_iso2_conv},
                 encoding='latin-1'
                 )
        return df
    
    def _event_conv(self, event_id: str) -> str:
        """Converts event codes to the mapped strings in `MAPPINGS[event_codes]`.

        Parameters
        ----------
        event_id : str
            Event id to be processed.

        Returns
        -------
        str
            Corresponding plaintext version of the event code provided if one
            exists, otherwise returns the original code.

        """
        eid = str(event_id)
        if eid in EVENT_CODES:
            return EVENT_CODES[eid]
        else:
            logger.info('Event code not found in mapping: {}'.format(eid))
            return eid
    
    def _eventbase_conv(self, event_id: int) -> str:
        """Converts event base codes to the mapped strings in `MAPPINGS[eventbase_codes]`.

        Parameters
        ----------
        event_id : str
            Event id to be processed.

        Returns
        -------
        str
            Corresponding plaintext version of the event code provided if one
            exists, otherwise returns the original code.

        """
        eid = str(event_id)
        if eid in EVENT_CODES:
            return EVENT_BASE_CODES[eid]
        else:
            logger.info('Event code not found in mapping: {}'.format(eid))
            return eid
    
    def _eventroot_conv(self, event_id: int) -> str:
        """Converts event root codes to the mapped strings in `MAPPINGS[event_codes]`.

        Parameters
        ----------
        event_id : str
            Event id to be processed.

        Returns
        -------
        str
            Corresponding plaintext version of the event code provided if one
            exists, otherwise returns the original code.

        """
        eid = str(event_id)
        if eid in EVENT_CODES:
            return EVENT_ROOT_CODES[eid]
        else:
            logger.info('Event code not found in mapping: {}'.format(eid))
            return eid
    
    def _fips_iso2_conv(self, fips_id: str) -> str:
        """Converts FIPS codes to ISO2 codes using `MAPPING[map_fips_to_iso2]`.

        Parameters
        ----------
        event_id : str
            FIPS id to be processed.

        Returns
        -------
        str
            Corresponding plaintext version of the FIPS code provided if one
            exists, otherwise returns the original code.

        """
        fid = str(fips_id)
        if fid in EVENT_CODES:
            return MAP_FIPS_TO_ISO2[fid]
        else:
            logger.info('FIPS code not found in mapping: {}'.format(fid))
            return fid

class RunHandler(DataHandler):
    """Custom runner use with the `DataHandler` class.    
    """
    
    def __init__(self) -> None:
        """RunHandler init function.
        
        Trigger parent classed initialization.

        """        
        DataHandler.__init__(self)    
    
    def run(self) -> None:
        results = str(self.reqs.get_content(DataHandler.tar_url), 'UTF-8')
        urls_list = self._process_latest(results)
        print(urls_list)
        
        '''
        Instead of hardcoding (i.e. url = url_list[0]) use list comprehension
        plus if statement to find the desired url. This protects against 
        failures if/when endpoint changes result in the returned list being in
        an unexpected order.
        '''
        url = [url for url in urls_list if 'export' in url][0]
        print(url)
        
        results = io.BytesIO(self.reqs.get_content(url))
        
        #Unpack zip
        df_content = self._extract(results)
        print(len(df_content.index))
        
        #Prep
        gdf = self._data_prep(df_content)
        
        #Load into db
        self.db.upload(gdf)
        self.db.test()
        self.db.close()
    
rh = RunHandler()
rh.run()

['http://data.gdeltproject.org/gdeltv2/20231110150000.export.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20231110150000.mentions.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20231110150000.gkg.csv.zip']
http://data.gdeltproject.org/gdeltv2/20231110150000.export.CSV.zip


ParserError: Too many columns specified: expected 11 and found 7