<a href="https://colab.research.google.com/github/mchl-schrdng/Cube_Guardian/blob/main/export_file.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install google-cloud-bigquery google-cloud-storage paramiko pandas

In [None]:
# Pipeline settings, kept as a JSON document so they can be persisted to
# config.json and read back by load_config().
config_data = """
{
  "BIGQUERY_PROJECT": "redeemer-420917",
  "BIGQUERY_DATASET": "export_to_ftp",
  "TABLE_NAME": "data_to_export",
  "DATE_COLUMN": "export_date",
  "GCS_PROJECT": "rational-symbol-411613",
  "GCS_BUCKET": "export_to_gcs",
  "BASE_GCS_PATH": "data/",
  "UPLOAD_FOLDER": "uploads/",
  "ERROR_FOLDER": "errors/",
  "FILENAME_PREFIX": "daily_data_export",
  "FILE_EXTENSION": ".csv",
  "SFTP_DIRECTORY": "path/to/sftp/upload/"
}
"""
# Strip the blank lines that surround the literal before writing it out.
with open('config.json', 'w') as config_file:
    config_file.write(config_data.strip())

In [None]:
import os
# Placeholder SFTP connection settings, exposed through the process
# environment (the commented-out SFTP transfer step reads them via os.getenv).
# Replace the placeholders locally; do not commit real credentials.
os.environ.update({
    'SFTP_HOST': 'your_sftp_host',
    'SFTP_USER': 'your_sftp_username',
    'SFTP_PASSWORD': 'your_sftp_password',
})

In [None]:
import json
import logging
import os
import posixpath
from datetime import datetime
from io import StringIO

import pandas as pd
import paramiko
from google.cloud import bigquery, storage
from google.cloud import exceptions as gcloud_exceptions

# Root logger setup: INFO and above, each record prefixed with a timestamp
# and its severity level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

def load_config():
    """Return the parsed contents of ``config.json``.

    The file is looked up relative to the current working directory.

    Returns:
        The decoded JSON document.
    """
    with open('config.json', 'r') as config_file:
        raw_text = config_file.read()
    return json.loads(raw_text)

def authenticate_google_clients():
    """Create the BigQuery and Cloud Storage clients.

    Sets ``GOOGLE_APPLICATION_CREDENTIALS`` to ``/content/sa.json`` in the
    process environment before the clients are constructed.

    Returns:
        A ``(bigquery.Client, storage.Client)`` tuple.
    """
    credentials_path = '/content/sa.json'
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
    bq_client = bigquery.Client()
    gcs_client = storage.Client()
    return bq_client, gcs_client

def fetch_data_from_bigquery(config, bigquery_client):
    """Fetch today's rows from the configured BigQuery table.

    Runs ``SELECT *`` against
    ``BIGQUERY_PROJECT.BIGQUERY_DATASET.TABLE_NAME`` restricted to rows where
    ``DATE(DATE_COLUMN) = CURRENT_DATE()``.

    Args:
        config: Mapping providing ``BIGQUERY_PROJECT``, ``BIGQUERY_DATASET``,
            ``TABLE_NAME`` and ``DATE_COLUMN``.
        bigquery_client: Client whose ``query(sql)`` result exposes
            ``to_dataframe()``.

    Returns:
        The query result as a DataFrame, or an empty DataFrame when the
        BigQuery API call fails (the failure is logged).
    """
    # Table and column identifiers cannot be bound as query parameters, so
    # they are interpolated; the config file must therefore be trusted.
    table = f"{config['BIGQUERY_PROJECT']}.{config['BIGQUERY_DATASET']}.{config['TABLE_NAME']}"
    query = f"SELECT * FROM `{table}` WHERE DATE({config['DATE_COLUMN']}) = CURRENT_DATE()"
    try:
        df = bigquery_client.query(query).to_dataframe()
    except gcloud_exceptions.GoogleCloudError as e:
        # API failures (missing table, permission denied, bad SQL, ...) are
        # raised as GoogleCloudError subclasses. The previous handler caught
        # bigquery.exceptions.BigQueryError, which does not cover them, and
        # read a non-existent ``e.message`` attribute.
        logging.error(f"BigQuery error: {e}")
        return pd.DataFrame()
    logging.info("Data fetched successfully from BigQuery.")
    return df

def upload_file_to_gcs(config, storage_client, df):
    """Upload ``df`` as a CSV object to the configured GCS bucket.

    The object is named
    ``BASE_GCS_PATH/UPLOAD_FOLDER/FILENAME_PREFIX_<YYYY-MM-DD>FILE_EXTENSION``.

    Args:
        config: Mapping providing ``FILENAME_PREFIX``, ``FILE_EXTENSION``,
            ``BASE_GCS_PATH``, ``UPLOAD_FOLDER`` and ``GCS_BUCKET``.
        storage_client: Client exposing ``bucket(name).blob(path)``.
        df: DataFrame to serialise (without the index column).

    Returns:
        The bare file name (without folders) on success, or ``None`` when
        ``df`` is empty or the upload fails.
    """
    if df.empty:
        logging.error("DataFrame is empty. No data to upload.")
        return None

    # NOTE(review): this is the local wall-clock date, while the export query
    # filters on BigQuery's CURRENT_DATE() -- confirm both use the same zone.
    current_date = datetime.now().strftime("%Y-%m-%d")
    filename = f"{config['FILENAME_PREFIX']}_{current_date}{config['FILE_EXTENSION']}"
    # GCS object names always use "/" separators, so join with posixpath
    # instead of the OS-dependent os.path.
    file_path = posixpath.join(config['BASE_GCS_PATH'], config['UPLOAD_FOLDER'], filename)
    try:
        blob = storage_client.bucket(config['GCS_BUCKET']).blob(file_path)
        blob.upload_from_string(df.to_csv(index=False), content_type='text/csv')
    except gcloud_exceptions.GoogleCloudError as e:
        # storage.exceptions.GoogleCloudError does not exist; the shared
        # google.cloud.exceptions module provides the class API errors use.
        logging.error(f"Failed to upload file to GCS: {e}")
        return None
    logging.info(f"File uploaded to GCS at {file_path}")
    return filename

# def transfer_file_to_sftp(config, storage_client, filename):
#     if not filename:
#         logging.error("No filename provided for SFTP transfer.")
#         return
#     remote_path = os.path.join(config['SFTP_DIRECTORY'], filename)
#     try:
#         with paramiko.Transport((os.getenv('SFTP_HOST'), 22)) as transport:
#             transport.connect(username=os.getenv('SFTP_USER'), password=os.getenv('SFTP_PASSWORD'))
#             with paramiko.SFTPClient.from_transport(transport) as sftp:
#                 with StringIO(storage_client.bucket(config['GCS_BUCKET']).blob(os.path.join(config['UPLOAD_FOLDER'], filename)).download_as_string().decode('utf-8')) as file_stream:
#                     sftp.putfo(file_stream, remote_path)
#                 logging.info(f"File transferred to SFTP at {remote_path}")
#     except paramiko.SSHException as e:
#         logging.error(f"SFTP transfer failed: {e}")

def main():
    """Run the export: load settings, query BigQuery, upload the CSV to GCS."""
    settings = load_config()
    bq_client, gcs_client = authenticate_google_clients()
    todays_rows = fetch_data_from_bigquery(settings, bq_client)
    uploaded_name = upload_file_to_gcs(settings, gcs_client, todays_rows)
    # The SFTP hand-off is currently disabled:
    # transfer_file_to_sftp(settings, gcs_client, uploaded_name)

# Entry point: run the export pipeline when this code is executed directly
# (for example as a script) rather than imported as a module.
if __name__ == "__main__":
    main()