In [None]:
# Import 
import sys
import logging
import os
import time

from google.colab import drive

# setup.py -> 

# Constant values
bucket_name = 'gcp_latam_twitter'
folder_name = 'raw'
zip_file_name = 'tweets.json.zip'
file_id = '1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis'  # Example ID of the Google Drive file
gcloud_url = f"gs://{bucket_name}/{folder_name}/"

# Notebook start timestamp (seconds since the epoch, stored as a string).
# Assigned once; it was previously set twice in a row with the same expression.
# Consider a timer library for better accuracy.
start_time = str(time.time())


# Local file paths (consider user input/environment variables for flexibility)
drive_mount_point = '/content/drive/MyDrive'
source_path = 'leonardora/de/latam-challenge/'

# Google Cloud project and dataset information (consider environment variables for better management)
project_id = "hip-rain-441704-n7"
project_name = "desafio-latam-leonardora"
dataset = "tweets_dataset"
table = "tweets"

# Logging
# The level is kept as a string (and converted back to int for basicConfig)
# so that any cell reading `logging_level` keeps seeing the same type.
logging_level = str(logging.DEBUG)
logging.basicConfig(level=int(logging_level))

# Load IPython's autoreload extension and set it to mode 2, so that edits to
# imported local .py modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# test.py -> 
from google.colab import drive
import os
import logging
import subprocess

# Mount Google Drive, make sure the project folder exists inside it, switch the
# working directory there, and then either update or clone the project repository.
# The steps below depend on each other and must run in this order.

# Always request a fresh mount (force_remount=True).
drive.mount('/content/drive', force_remount=True)

# NOTE: these two values repeat the assignments made earlier in this cell.
drive_mount_point = '/content/drive/MyDrive'
source_path = 'leonardora/de/latam-challenge/'
target_dir = os.path.join(drive_mount_point, source_path)

# Create the project folder on first use; otherwise just report that it exists.
if not os.path.exists(target_dir):
    os.makedirs(target_dir)
    print(f"Directorio creado: {target_dir}")
else:
    print(f"Directorio existente: {target_dir}")

# Change into the project folder; the git commands below are issued after this
# and the clone targets the current directory (".").
os.chdir(target_dir)
print(f"Directorio actual: {os.getcwd()}")

# A ".git" entry in the folder is treated as "repository already cloned".
if os.path.exists(os.path.join(target_dir, ".git")):
    print("Repositorio ya existe. Haciendo pull de los últimos cambios...")
    !git checkout develop
    !git pull origin develop
else:
    repo_url = "https://github.com/leoengufmg/latam-challenge.git"  # Replace with the URL of your repository
    print("Clonando el repositorio...")
    !git clone {repo_url} .

    !git checkout develop

In [None]:
# Test ->
from google.colab import auth
# Run Colab's user authentication flow for this session.
# NOTE(review): presumably needed so the Google Cloud clients created in later
# cells can obtain credentials — confirm.
auth.authenticate_user()  

In [None]:
# Function.py -> setup bucket
def create_bucket(bucket_name: str, project_id: str, location: str = "southamerica-west1") -> "storage.Bucket":
    """Return the Cloud Storage bucket ``bucket_name``, creating it if it does not exist.

    The function is safe to re-run: when a bucket with this name is already
    visible to the client it is returned as-is instead of attempting to create
    it a second time.

    Args:
        bucket_name (str): The unique name of the bucket to create.
        project_id (str): The ID of the Google Cloud project.
        location (str): The location where the bucket will be created (default is "southamerica-west1").
            Only used when the bucket has to be created.

    Returns:
        storage.Bucket: The existing or newly created bucket object.

    Raises:
        Exception: Any error raised by the storage client while looking up or
            creating the bucket is printed and then re-raised unchanged.
    """
    # Imported locally (and the return annotation quoted) because this cell runs
    # before the cell that imports `storage`; a module-level reference here
    # would raise NameError on a fresh kernel.
    from google.cloud import storage

    # Initialise the Google Cloud Storage client for the given project.
    storage_client = storage.Client(project=project_id)

    try:
        # lookup_bucket returns None when the bucket is not found, so this
        # check keeps repeated runs of the notebook from failing on creation.
        existing_bucket = storage_client.lookup_bucket(bucket_name)
        if existing_bucket is not None:
            print(f"Bucket {existing_bucket.name} already exists; reusing it.")
            return existing_bucket

        # Create the bucket.
        bucket = storage_client.create_bucket(bucket_name, location=location)
        print(f"Bucket {bucket.name} created in {location}.")
        return bucket
    except Exception as e:
        print(f"Error creating bucket: {e}")
        raise

# Call create_bucket with the notebook-level bucket_name and project_id
# (default location) and keep the returned bucket object.
new_bucket = create_bucket(bucket_name, project_id)

In [None]:
# Functions.py -> 
import importlib.util
import subprocess
import sys

def install_requirements(requirements_path: str = "./requirements.txt") -> bool:
    """
    Installs the libraries listed in a requirements file using pip.

    The file is passed to ``pip install -r`` in a single call, so comment
    lines, option lines (for example ``-r other.txt`` or ``-e .``) and any
    other requirements-file syntax are interpreted by pip itself instead of
    being passed to pip as if they were package names. pip decides on its own
    whether a requirement is already satisfied.

    Args:
        requirements_path (str): Path to the requirements.txt file. Defaults to "./requirements.txt".

    Returns:
        bool: True if pip finished successfully or the file lists nothing to
        install, False if the file is missing, pip fails, or any other error occurs.
    """
    try:
        # Read the file only to detect a missing file and to count entries;
        # decoding errors are replaced because pip parses the file itself.
        with open(requirements_path, 'r', encoding='utf-8', errors='replace') as file:
            requirements = [
                line.strip()
                for line in file
                if line.strip() and not line.lstrip().startswith('#')
            ]

        # Nothing but blanks/comments: succeed without invoking pip at all.
        if not requirements:
            print(f"No requirements listed in: {requirements_path}")
            return True

        print(f"Installing {len(requirements)} requirement(s) from {requirements_path}...")
        # sys.executable targets the interpreter running this kernel.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "-r", requirements_path],
            check=True,
        )

        print("All required libraries were installed successfully.")
        return True

    except subprocess.CalledProcessError as e:
        print(f"Error installing libraries: {e}")
        return False
    except FileNotFoundError:
        print(f"Requirements file not found at: {requirements_path}")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False

In [None]:
# test -> installing requirements
# Install the requirements and report the outcome. This only prints a message;
# it does not stop later cells from running when installation fails.
print(
    "Procediendo a descargar datos..."
    if install_requirements()
    else "Error al instalar los requisitos. Abortando acciones adicionales."
)

In [None]:
# functions.py -> 
import io
import logging
from google.colab import auth
from google.colab import drive
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google.cloud import storage
from typing import Any

# Request INFO-level logging for the helpers in this cell.
# NOTE(review): logging.basicConfig does nothing when the root logger already
# has handlers (for example after the basicConfig call in the setup cell)
# unless force=True is passed — confirm which level is actually intended.
logging.basicConfig(level=logging.INFO)

def authenticate_google_drive() -> None:
    """Authenticate the current Colab user.

    Thin wrapper around ``google.colab.auth.authenticate_user()``; it takes no
    arguments and returns nothing.

    NOTE(review): presumably this is what allows the Drive and Cloud Storage
    clients created afterwards to obtain credentials — confirm.
    """
    auth.authenticate_user()

def mount_google_drive(mount_point: str = '/content/drive') -> None:
    """Mount Google Drive at ``mount_point`` via ``google.colab.drive.mount``.

    The mount is always requested with ``force_remount=True``.

    Args:
        mount_point (str): Directory where Drive is mounted. Defaults to '/content/drive'.
    """
    drive.mount(mount_point, force_remount=True)

def download_file_from_drive(drive_service: Any, file_id: str) -> io.BytesIO:
    """Fetch a Google Drive file into memory, chunk by chunk.

    Args:
        drive_service (Any): Drive API service object; it must provide
            ``files().get_media(fileId=...)``.
        file_id (str): ID of the Drive file to download.

    Returns:
        io.BytesIO: Buffer holding the downloaded bytes, rewound to position 0.

    Raises:
        Exception: Any error raised while building the request or downloading
            is logged and re-raised unchanged.
    """
    buffer = io.BytesIO()
    try:
        media_request = drive_service.files().get_media(fileId=file_id)
        chunk_reader = MediaIoBaseDownload(buffer, media_request)

        # Keep pulling chunks until the downloader reports completion,
        # printing the progress percentage after each one.
        finished = False
        while not finished:
            progress, finished = chunk_reader.next_chunk()
            percent = int(progress.progress() * 100)
            print(f'Downloading {percent}%')

        # Rewind so the caller reads from the start of the data.
        buffer.seek(0)
    except Exception as exc:
        logging.error(f"Error downloading file: {exc}")
        raise
    else:
        return buffer

def upload_drive_file_to_cloud_storage(bucket: "storage.Bucket", folder_name: str, file_data: io.BytesIO, file_name: str) -> "storage.Blob":
    """Uploads an in-memory file to Google Cloud Storage.

    The object is written to ``{folder_name}/{file_name}`` in ``bucket``. Its
    content type is guessed from ``file_name`` (for example ``.zip`` maps to
    ``application/zip``) and passed to the upload, so the returned blob's
    ``content_type`` reflects the kind of file. When no type can be guessed,
    ``None`` is passed, which is the same as not specifying one.

    Args:
        bucket (storage.Bucket): Destination bucket; must provide ``blob(name)``.
        folder_name (str): Prefix ("folder") inside the bucket.
        file_data (io.BytesIO): Data to upload; it is read from the beginning.
        file_name (str): Object name appended to ``folder_name``.

    Returns:
        storage.Blob: The blob object the data was uploaded to.
    """
    # Standard-library helper needed only by this function.
    import mimetypes

    blob = bucket.blob(f"{folder_name}/{file_name}")
    content_type, _ = mimetypes.guess_type(file_name)
    file_data.seek(0)  # Reset the file pointer so the whole buffer is uploaded
    blob.upload_from_file(file_data, content_type=content_type)
    return blob

# Download the Drive file into memory and copy it into the Cloud Storage bucket.
# The buffer is created up front so the `finally` clause can always close it,
# even when a step fails before the download starts.
downloaded: io.BytesIO = io.BytesIO()

try:
    authenticate_google_drive()
    mount_google_drive()
    drive_service: Any = build('drive', 'v3')

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    downloaded = download_file_from_drive(drive_service, file_id)

    if downloaded.getbuffer().nbytes == 0:
        logging.info("Skipping upload as the file is empty.")
    else:
        uploaded_blob = upload_drive_file_to_cloud_storage(bucket, folder_name, downloaded, zip_file_name)

        if uploaded_blob.content_type == 'application/zip':
            logging.info(f"File {zip_file_name} uploaded as a ZIP file.")

        # Report success only when an upload actually happened; this message
        # used to be logged even when the upload was skipped for an empty file.
        logging.info("File transfer successful!")

except Exception as e:
    # Failures are logged rather than re-raised, so the cell itself does not
    # error out; check the log output to see whether the transfer worked.
    logging.error(f"An error occurred: {e}")

finally:
    downloaded.close()
    print("File transfer process completed.")