In [1]:
import os
from minio import Minio
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from functions import get_url_content, get_links_by_extension, unzip_upload_to_minio, update_log

In [2]:
load_dotenv()

MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")
URL_WILDFIRES_BATCH = os.getenv("URL_WILDFIRES_BATCH")

# os.getenv returns None for unset variables. Without this check a missing
# value would only surface later, in a less obvious place (the MinIO client,
# the HTTP fetch, or the Spark S3A configuration). Fail fast and name them.
_missing_env_vars = [
    env_name
    for env_name, env_value in (
        ("MINIO_ACCESS_KEY", MINIO_ACCESS_KEY),
        ("MINIO_SECRET_KEY", MINIO_SECRET_KEY),
        ("MINIO_ENDPOINT", MINIO_ENDPOINT),
        ("URL_WILDFIRES_BATCH", URL_WILDFIRES_BATCH),
    )
    if not env_value
]
if _missing_env_vars:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_env_vars)
    )

# MinIO locations: target bucket, key prefix for the extracted batch files,
# and the object key of the JSON log written by update_log.
bucket_name = 'raw'
bucket_path_batch = 'wildfires/batch'
bucket_path_log = 'wildfires/log.json'

In [3]:
# MinIO client for the configured endpoint. secure=False means plain HTTP,
# matching the "http://" scheme used for the Spark S3A endpoint.
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,
)

In [4]:
# Fetch the batch listing page, then collect the links on it that end in '.zip'.
# NOTE(review): the base URL is passed alongside the page content, presumably so
# relative hrefs can be resolved; confirm in functions.get_links_by_extension.
response_wildfires_batch = get_url_content(URL_WILDFIRES_BATCH)
links_wildfires_batch = get_links_by_extension(
    URL_WILDFIRES_BATCH,
    response_wildfires_batch,
    '.zip',
)

In [5]:
# For every archive link: download it, then hand the payload to
# unzip_upload_to_minio together with the target bucket and key prefix.
for zip_link in links_wildfires_batch:
    zip_payload = get_url_content(zip_link)
    unzip_upload_to_minio(
        zip_payload, minio_client, bucket_name, bucket_path_batch
    )

In [6]:
def _build_spark_session():
    """Create (or reuse) a SparkSession whose S3A filesystem points at MinIO.

    Settings are applied in a fixed order from an insertion-ordered dict.
    Path-style access is enabled so bucket names go in the URL path rather
    than the hostname.
    """
    s3a_settings = {
        "spark.hadoop.fs.s3a.endpoint": f"http://{MINIO_ENDPOINT}",
        "spark.hadoop.fs.s3a.access.key": MINIO_ACCESS_KEY,
        "spark.hadoop.fs.s3a.secret.key": MINIO_SECRET_KEY,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }
    session_builder = SparkSession.builder.appName("ReadCSVMinio")
    for setting_key, setting_value in s3a_settings.items():
        session_builder = session_builder.config(setting_key, setting_value)
    return session_builder.getOrCreate()


spark = _build_spark_session()

In [7]:
# Read every file under the batch prefix as CSV with a header row.
# Use exactly one "/" between the bucket and the key prefix. A "//" creates an
# empty path segment and only works because Hadoop's Path normalizes it.
# inferSchema=True makes Spark scan the data an extra time to guess column types.
batch_csv_path = f"s3a://{bucket_name}/{bucket_path_batch}/*"
df_wildfires_batch = spark.read.csv(batch_csv_path, header=True, inferSchema=True)

In [8]:
# Update the JSON log object at bucket_name/bucket_path_log from the batch frame.
# NOTE(review): 'data_pas' is presumably a column of df_wildfires_batch that
# update_log inspects (e.g. a date/time field); confirm in functions.update_log.
update_log(
    df_wildfires_batch,
    'data_pas',
    minio_client,
    bucket_name,
    bucket_path_log,
)