In [None]:
import datetime
import os
import shutil

import duckdb
from dotenv import load_dotenv
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

In [None]:
class GDriveFolder:
    """Authenticate against Google Drive and download new files of one folder.

    Constructing an instance authenticates immediately and then downloads
    every file of the Drive folder ``folder_id`` that does not already exist
    (by name) in either the raw or the processed local folder.

    Attributes:
        credentials: Directory that holds the OAuth files (see the class
            constants below).
        raw_folder_items: Local directory that receives the downloads.
        processed_folder_items: Local directory checked to avoid downloading
            a file that is already there.
        folder_id: Id of the Drive folder to list.
        service: Drive v3 client, or ``None`` when authentication failed.
        new_files: ``True`` when this run downloaded at least one file,
            ``False`` otherwise (including when an error occurred before any
            file was written).
    """

    # Cached authorized-user token, read from and written to ``credentials``.
    TOKEN_FILE = 'token.json'
    # OAuth client secrets used for the interactive flow.
    # NOTE(review): assumes the Google quickstart file name; confirm it matches
    # the file actually stored in the credentials directory.
    CLIENT_SECRETS_FILE = 'credentials.json'

    def __init__(self, credentials, raw_folder_items, processed_folder_items, folder_id):
        self.credentials = credentials
        self.raw_folder_items = raw_folder_items
        self.processed_folder_items = processed_folder_items
        self.folder_id = folder_id
        self.SCOPES = ['https://www.googleapis.com/auth/drive']
        self.service = self._authentication()
        self.new_files = self._download_files_gdrive()

    def _authentication(self):
        """Build a Drive v3 client from cached or freshly obtained credentials.

        Returns:
            The client returned by ``build('drive', 'v3', ...)``, or ``None``
            when any step raised (the error is printed).
        """
        try:
            creds = None
            token_path = os.path.join(self.credentials, self.TOKEN_FILE)

            if os.path.exists(token_path):
                creds = Credentials.from_authorized_user_file(token_path, self.SCOPES)

            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    creds.refresh(Request())
                else:
                    # The interactive flow needs the client secrets file, not
                    # the token cache: the token file has a different format.
                    secrets_path = os.path.join(self.credentials, self.CLIENT_SECRETS_FILE)
                    flow = InstalledAppFlow.from_client_secrets_file(secrets_path, self.SCOPES)
                    creds = flow.run_local_server(port=0)

                # Persist the refreshed/new credentials for the next run.
                with open(token_path, 'w') as token:
                    token.write(creds.to_json())

            return build('drive', 'v3', credentials=creds)

        except Exception as e:
            print('Error authenticating: ', e)
            return None

    def _list_folder_files(self):
        """Return every ``{'id', 'name'}`` entry of the folder, across all pages."""
        files = []
        page_token = None

        while True:
            response = self.service.files().list(
                q=f"'{self.folder_id}' in parents",
                fields="nextPageToken, files(id, name)",
                pageToken=page_token,
            ).execute()

            files.extend(response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                return files

    def _download_files_gdrive(self):
        """Download the folder's files that are not present locally yet.

        A file is skipped when a file with the same name exists in the raw or
        in the processed folder. Errors are printed and stop the loop, but
        files downloaded before the error are still reported.

        Returns:
            ``True`` if at least one file was written to the raw folder,
            ``False`` otherwise.
        """
        new_files = False

        if not self.service:
            print('Error authenticating.')
            return new_files

        try:
            items = self._list_folder_files()

            if not items:
                print('No files found.')

            for item in items:
                file_name = item['name']
                raw_path = os.path.join(self.raw_folder_items, file_name)
                processed_path = os.path.join(self.processed_folder_items, file_name)

                if os.path.exists(raw_path) or os.path.exists(processed_path):
                    continue

                # Fetch the content first so a failed request does not leave
                # an empty file behind that would be skipped on later runs.
                content = self.service.files().get_media(fileId=item['id']).execute()
                with open(raw_path, 'wb') as f:
                    f.write(content)
                new_files = True

        except HttpError as e:
            print("An HTTP error occurred during the download:", e)

        except OSError as e:
            print("A system error occurred during the download:", e)

        except Exception as e:
            print("An error occurred during the download:", e)

        # Returned after the handlers on purpose: files written before an
        # error must still trigger the downstream processing.
        return new_files

In [None]:
class ProcessingPersistence:
    """Load the raw CSV files through DuckDB and persist them to an attached database.

    Constructing an instance runs the whole pipeline once: an in-memory
    DuckDB connection is opened, the ``db_plugin`` extension is installed and
    loaded, the target database is attached as ``db``, ``db_query`` is
    executed (it is expected to create ``temp_table``), every ``.csv`` file
    of the raw folder is inserted into ``temp_table``, ``temp_table`` is
    copied into ``db.duckdb_ibge`` and, only after that succeeded, the loaded
    files are moved to the processed folder.

    Attributes:
        raw_folder_items: Directory scanned for ``.csv`` files.
        processed_folder_items: Directory the loaded files are moved to.
        db_plugin: DuckDB extension name, also used as the ATTACH ``TYPE``.
        db_name, db_user, db_pass, db_host, db_port: Connection settings
            placed in the ATTACH connection string.
        db_query: SQL executed once before the files are loaded.
        date_now: Timestamp taken at construction, stored in ``DT_CADASTRO``.
    """

    def __init__(self, raw_folder_items, processed_folder_items, db_plugin, db_name, db_user, db_pass, db_host, db_port, db_query):
        self.raw_folder_items = raw_folder_items
        self.processed_folder_items = processed_folder_items
        self.db_plugin = db_plugin
        self.db_name = db_name
        self.db_user = db_user
        self.db_pass = db_pass
        self.db_host = db_host
        self.db_port = db_port
        self.db_query = db_query
        self.date_now = datetime.datetime.now()
        self.processing_persistence()

    @staticmethod
    def _sql_literal(value):
        """Render ``value`` as a single-quoted SQL string literal (quotes doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    def processing_persistence(self):
        """Run the load-and-persist pipeline; errors are printed, not raised.

        Files are moved to the processed folder only after the insert into
        ``db.duckdb_ibge`` succeeded, so a failed run leaves them in the raw
        folder for the next attempt.
        """
        lit = self._sql_literal
        conn = None

        try:
            conn = duckdb.connect(database=':memory:')
            conn.execute(f"INSTALL {self.db_plugin}")
            conn.execute(f"LOAD {self.db_plugin}")

            conn_info = (
                f"dbname={self.db_name} user={self.db_user} password={self.db_pass} "
                f"host={self.db_host} port={self.db_port}"
            )
            conn.execute(f"ATTACH {lit(conn_info)} AS db (TYPE {self.db_plugin})")
            conn.execute(self.db_query)

            loaded = []
            # Sorted so the load order does not depend on the file system.
            for filename in sorted(os.listdir(self.raw_folder_items)):
                if not filename.endswith(".csv"):
                    continue

                csv_path = os.path.join(self.raw_folder_items, filename)
                conn.execute(
                    "INSERT INTO temp_table "
                    f"SELECT A.*, {lit(filename)} NOME_ARQUIVO, {lit(self.date_now)} DT_CADASTRO "
                    f"FROM read_csv({lit(csv_path)}, delim = ';', header = true) A"
                )
                loaded.append(filename)

            conn.execute("INSERT INTO db.duckdb_ibge SELECT * FROM temp_table")

            # Archive only what has actually been persisted.
            for filename in loaded:
                shutil.move(
                    os.path.join(self.raw_folder_items, filename),
                    os.path.join(self.processed_folder_items, filename),
                )

        except Exception as e:
            print("An error occurred during processing:", e)

        finally:
            if conn is not None:
                conn.close()

In [None]:
def main():
    """Download new Drive files and, if any arrived, load them into the database.

    ``FOLDER_ID`` is read from the environment after loading the ``.env``
    file; when it is missing the function prints a message and returns
    without contacting Drive. The database settings default to the values
    below and can be overridden with the environment variables ``DB_NAME``,
    ``DB_USER``, ``DB_PASS``, ``DB_HOST`` and ``DB_PORT`` (for example from
    the same ``.env`` file), which keeps secrets out of the notebook.
    """
    # GDriveFolder class configs
    credentials = 'C://Tecnology//Projects//dpp-duckdb-processing-persistence//config//credentials'
    raw_folder_items = "C://Tecnology//Projects//dpp-duckdb-processing-persistence//data//raw"
    processed_folder_items = "C:/Tecnology//Projects//dpp-duckdb-processing-persistence//data//processed"
    dotenv_path = 'C://Tecnology//Projects//dpp-duckdb-processing-persistence//config//.env'
    load_dotenv(dotenv_path)
    folder_id = os.getenv("FOLDER_ID")

    # Without a folder id the Drive query would search for the literal 'None'.
    if not folder_id:
        print('FOLDER_ID is not set; nothing to download.')
        return

    # GdriveFolder class
    gdrive = GDriveFolder(credentials, raw_folder_items, processed_folder_items, folder_id)

    if gdrive.new_files:
        # ProcessingPersistence class configs
        db_plugin = 'postgres'
        db_name = os.getenv('DB_NAME', 'unifor_duckdb')
        db_user = os.getenv('DB_USER', 'unifor')
        db_pass = os.getenv('DB_PASS', 'unifor')
        db_host = os.getenv('DB_HOST', 'localhost')
        db_port = os.getenv('DB_PORT', '5437')
        db_query = 'CREATE TEMPORARY TABLE temp_table (COD_UF VARCHAR, COD_MUN VARCHAR, COD_ESPECIE VARCHAR, LATITUDE VARCHAR, LONGITUDE VARCHAR, NV_GEO_COORD VARCHAR, NOME_ARQUIVO VARCHAR, DT_CADASTRO DATETIME)'

        # ProcessingPersistence class (the constructor runs the pipeline)
        ProcessingPersistence(raw_folder_items, processed_folder_items, db_plugin, db_name, db_user, db_pass, db_host, db_port, db_query)

In [None]:
# Entry point: call main() only when __name__ is '__main__', i.e. not when imported.
if __name__ == '__main__':
    main()