In [0]:
import pandas as pd
from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient
from io import StringIO

# Azure Storage connection string handed to BlobServiceClient.from_connection_string.
# NOTE(review): this is a placeholder value — supply the real string from an
# environment variable or secret store rather than committing it to source.
connection_string = "ADL CONNECTION STRING"
# Container that holds both the raw input files and the transformed output files.
container_name = "gold-forecasting-container"
# Blob-name prefix ("folder") under which the monthly raw CSV files are read.
raw_folder = "raw-data"
# Blob-name prefix ("folder") under which the transformed CSV files are written.
transformed_folder = "transformed-data"

def get_current_month_file(now=None):
    """Return the raw-data file name for a month, formatted as ``YYYYMM.csv``.

    Args:
        now: Optional ``datetime`` (or ``date``) selecting the month. When
            omitted, the current local time is used, which matches the
            original zero-argument behaviour. Passing a value makes it
            possible to reprocess a past month and to test deterministically.

    Returns:
        The file name as a string, e.g. ``"202412.csv"``.
    """
    if now is None:
        now = datetime.now()
    return f"{now.year}{now.month:02d}.csv"

def get_transformed_file_name(now=None):
    """Return the transformed-data file name for a month (``YYYYMM_transformed.csv``).

    Args:
        now: Optional ``datetime`` (or ``date``) selecting the month. When
            omitted, the current local time is used, which matches the
            original zero-argument behaviour. Passing a value makes it
            possible to reprocess a past month and to test deterministically.

    Returns:
        The file name as a string, e.g. ``"202412_transformed.csv"``.
    """
    if now is None:
        now = datetime.now()
    return f"{now.year}{now.month:02d}_transformed.csv"

def download_blob_to_dataframe(blob_service_client, container_name, blob_name):
    """Fetch a CSV blob and parse it into a Pandas DataFrame.

    Args:
        blob_service_client: Client object exposing ``get_blob_client``.
        container_name: Name of the container that holds the blob.
        blob_name: Full blob name (including any folder prefix).

    Returns:
        The blob's contents, decoded as UTF-8 and parsed by ``pd.read_csv``.

    Raises:
        FileNotFoundError: If the blob does not exist in the container.
    """
    client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Bail out early so the happy path below stays unindented.
    if not client.exists():
        raise FileNotFoundError(f"The file {blob_name} does not exist in the container {container_name}.")

    raw_bytes = client.download_blob().readall()
    csv_text = raw_bytes.decode('utf-8')
    return pd.read_csv(StringIO(csv_text))

def save_dataframe_to_blob(blob_service_client, container_name, blob_name, dataframe):
    """Serialize a DataFrame to CSV text and upload it as a blob.

    Args:
        blob_service_client: Client object exposing ``get_blob_client``.
        container_name: Name of the destination container.
        blob_name: Full destination blob name (including any folder prefix).
        dataframe: The DataFrame to write; its index is not included.

    Any existing blob with the same name is overwritten.
    """
    # With no target given, to_csv returns the CSV as a string.
    csv_text = dataframe.to_csv(index=False)
    target = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    target.upload_blob(csv_text, overwrite=True)

def transform_data(dataframe):
    """Build a gap-free daily price series from the raw dataset.

    Steps:
      1. Keep only the ``date`` and ``price`` columns; coerce prices to
         numbers and dates to calendar days (unparseable values become
         missing).
      2. Drop rows whose date could not be parsed, since they cannot be
         placed on the timeline.
      3. Collapse duplicate dates to a single row, keeping the first
         non-missing price seen for that date.
      4. Sort by date and insert a row for every calendar day missing between
         the earliest and latest date.
      5. Linearly interpolate missing prices. Because the rows are now evenly
         spaced one day apart, positional interpolation equals interpolation
         in time. Missing prices before the first known price stay missing.

    Args:
        dataframe: Raw data containing at least ``date`` and ``price`` columns.

    Returns:
        A new DataFrame with columns ``date`` (``datetime.date`` objects) and
        ``price``, one row per calendar day in ascending order. If no row has
        a parseable date, the result has those two columns and no rows.

    Raises:
        KeyError: If ``date`` or ``price`` is missing from ``dataframe``.
    """
    cleaned = dataframe[["date", "price"]].copy()
    cleaned["price"] = pd.to_numeric(cleaned["price"], errors="coerce")
    # Work in datetime64 (truncated to midnight) so sorting, grouping and
    # date_range all operate on one consistent type; convert to date at the end.
    cleaned["date"] = pd.to_datetime(cleaned["date"], errors="coerce").dt.normalize()

    cleaned = cleaned.dropna(subset=["date"])
    if cleaned.empty:
        # Nothing to build a range from; return the (empty) expected schema.
        cleaned["date"] = cleaned["date"].dt.date
        return cleaned.reset_index(drop=True)

    # groupby sorts by date and .first() takes the first non-null price per
    # day, which removes duplicate dates even when their prices differ.
    daily = cleaned.groupby("date")["price"].first()

    # Fill gaps unconditionally: a missing day is an absent row, not a null.
    every_day = pd.date_range(start=daily.index.min(), end=daily.index.max(), freq="D")
    daily = daily.reindex(every_day).interpolate(method="linear")

    result = daily.rename_axis("date").reset_index()
    result["date"] = result["date"].dt.date
    return result

def main():
    """Run the monthly pipeline: download the raw CSV, transform it, upload the result.

    Progress is reported on stdout. Errors raised while downloading,
    transforming or uploading are printed rather than propagated, so the
    function returns normally in those cases.
    """
    service = BlobServiceClient.from_connection_string(connection_string)

    raw_file_name = get_current_month_file()
    transformed_file_name = get_transformed_file_name()

    # Full blob names: folder prefix plus this month's file name.
    source_blob = f"{raw_folder}/{raw_file_name}"
    target_blob = f"{transformed_folder}/{transformed_file_name}"

    try:
        print(f"Downloading raw data file: {raw_file_name}")
        raw_frame = download_blob_to_dataframe(service, container_name, source_blob)

        print("Transforming data...")
        clean_frame = transform_data(raw_frame)

        print(f"Uploading transformed data file: {transformed_file_name}")
        save_dataframe_to_blob(service, container_name, target_blob, clean_frame)
    except FileNotFoundError as missing:
        # The raw file for this month is absent; the message names the blob.
        print(missing)
    except Exception as error:
        print(f"An error occurred: {error}")
    else:
        print("Transformation and upload completed successfully.")

# Run the pipeline only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()


Downloading raw data file: 202412.csv
Transforming data...
Uploading transformed data file: 202412_transformed.csv
Transformation and upload completed successfully.
