
# Gas Price Forecast


## 0.0. Requirements


### 0.1. Imports

In [0]:
import pandas as pd

from mlflow import pyfunc
from pytz import timezone
from datetime import datetime

from pyspark.sql import functions as pf




### 0.2. S3 Connection

In [0]:
# dbutils.secrets is not available on the Community edition =D
#access_key = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
#secret_key = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
#bronze_bucket_name = dbutils.secrets.get(scope = "aws", key = "aws-bucket-bronze")
#silver_bucket_name = dbutils.secrets.get(scope = "aws", key = "aws-bucket-silver")

In [0]:
# encoded_secret_key = secret_key.replace("/", "%2F")
# aws_bucket_name = "example-aws-bucket"
# mount_name = "mount-name"
# 
# dbutils.fs.mount(
#     f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", 
#     f"/mnt/{mount_name}"
# )

In [0]:
#spark.sql("""CREATE EXTERNAL LOCATION [IF NOT EXISTS] <example-location-name-aws> URL 's3://<bucket-name>/<example-path>'
#     WITH (CREDENTIAL <aws-credential-name>)""")

In [0]:
# Another Way
# sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
# sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
# sc._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "3")
# sc._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
# sc._jsc.hadoopConfiguration().set("fs.s3a.disable.chunked.encoding", "True")
# sc._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
# sc._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.path.style.access", "True")
# sc._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")


### 0.3. Data Load

In [0]:
# Read the "gas" Delta table from the silver bucket.
# NOTE(review): silver_bucket_name is only assigned in the commented-out
# secrets cell of this notebook -- confirm it is defined before this runs.
df = (
    spark.read
    .format("delta")
    .load(f"s3://{silver_bucket_name}/gas")
)


## 1.0. Data Enrichment


### 1.1. Rename Columns

In [0]:
# Normalise every column name: drop one trailing underscore (if present),
# then upper-case the result.
df = df.select(
    [
        pf.col(name).alias((name[:-1] if name.endswith("_") else name).upper())
        for name in df.columns
    ]
)


### 1.2. Filter Columns

In [0]:
# Remove the DATA_BASE column; drop() accepts the column name directly.
df = df.drop("DATA_BASE")


### 1.3. Machine Learning Inference

In [0]:
# Load the forecasting model logged under this MLflow run.
model_uri = "runs:/f02169dcf3ec41f6ba59e3deec7818e9/models/sf_object"
loaded_model = pyfunc.load_model(model_uri)

# Predict, then reshape from long to wide: one row per "ds" value and one
# column per "unique_id", filled from the "CES" column.
# NOTE(review): 10 is presumably the forecast horizon -- confirm against the model.
forecast = (
    loaded_model.predict(10)
    .reset_index()
    .pivot(index="ds", columns="unique_id", values="CES")
    .reset_index()
)

# Apply the same naming rule used for df (drop one trailing underscore,
# upper-case), then expose the "ds" column as DATE.
forecast.columns = [
    (name[:-1] if name.endswith("_") else name).upper()
    for name in forecast.columns
]
forecast = forecast.rename(columns={"DS": "DATE"})

# Keep only df's columns, in df's order, so the later positional union lines up.
forecast = forecast[df.columns]

# Convert to a Spark DataFrame and turn DATE into a date column.
forecast = spark.createDataFrame(forecast).withColumn(
    "DATE", pf.to_date(pf.col("DATE"))
)

  original_result = python_builtin_import(name, globals, locals, fromlist, level)



### 1.4. Union DataFrames

In [0]:
# Append the forecast rows to the historical rows. Columns are matched by
# position, which is why forecast was reordered to df.columns beforehand.
df = df.union(forecast)


## 2.0. Save to Gold

In [0]:
# Overwrite the gold-layer Delta table with history plus forecast.
# NOTE(review): gold_bucket_name is not assigned anywhere in the visible
# notebook -- confirm it is defined before this cell runs.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save(f"s3://{gold_bucket_name}/gas_complete_forecast")
)