In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [2]:
import os
import numpy as np
import pandas as pd
from datetime import date
from prophet import Prophet, serialize
from prophet.diagnostics import cross_validation, performance_metrics
from pyspark.sql.functions import to_date
import mlflow

  from .autonotebook import tqdm as notebook_tqdm
Importing plotly failed. Interactive plots will not work.


In [3]:
import cml.data_v1 as cmldata
import pyspark.pandas as ps



In [4]:
# --- Run configuration ------------------------------------------------------
# Name handed to cmldata.get_connection() to open the data connection.
CONNECTION_NAME = "telefonicabr-az-dl"
# Calendar date on which this notebook is executed.
DATE = date.today()
# Project owner taken from the environment; a missing variable raises KeyError.
USERNAME = os.environ["PROJECT_OWNER"]
# MLflow experiment name, suffixed with the project owner.
EXPERIMENT_NAME = "prophet-forecast-{0}".format(USERNAME)

In [49]:
# Select the MLflow experiment that subsequent runs are recorded under.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

# Open the named data connection and obtain a Spark session from it.
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

Spark Application Id:spark-application-1729898469089


In [87]:
# Iceberg silver table: read the two source columns, rename them to `ds` / `y`,
# and turn the timestamp in `ds` into a date, all in one chained expression.
df = (
    spark.sql("SELECT dt_prmr_atcv_lnha, GROSSVALUE FROM SPARK_CATALOG.TELCO_MEDALLION.PRODUCTS_SILVER")
    .withColumnRenamed('dt_prmr_atcv_lnha', 'ds')
    .withColumnRenamed('GROSSVALUE', 'y')
    .withColumn("ds", to_date("ds"))
)
df.printSchema()

root
 |-- ds: date (nullable = true)
 |-- y: integer (nullable = true)



In [88]:
from pyspark.sql.functions import col, to_date, sum

# NOTE(review): the import above rebinds the built-in `sum` to the Spark
# aggregate for the rest of the kernel session; it is left unchanged here
# because later cells may rely on these names.

# Keep only rows dated after 2022-12-31, then collapse to one row per `ds`
# holding the total of `y` for that day.
df = (
    df.filter(col("ds") > "2022-12-31")
    .groupby("ds")
    .agg(sum("y").alias("y"))
)

In [89]:
# Switch on Spark's Arrow setting for PySpark before converting to pandas.
ARROW_PYSPARK_KEY = "spark.sql.execution.arrow.pyspark.enabled"
spark.conf.set(ARROW_PYSPARK_KEY, "true")

# From here on `df` is a pandas DataFrame rather than a Spark DataFrame.
df = df.toPandas()

                                                                                

In [90]:
#df.head()

In [91]:
# Order rows newest-first. The sorted frame is reassigned instead of using
# inplace=True: the result (including the retained original index) is the
# same, but the cell no longer mutates an object other references may share.
df = df.sort_values(by='ds', ascending=False)

In [92]:
# Preview the first five rows of the sorted frame.
df.head(n=5)

Unnamed: 0,ds,y
42,2023-12-31,582944
67,2023-12-29,281200
125,2023-12-28,22280
194,2023-12-26,277560
145,2023-12-25,302560


In [93]:
def extract_params(pr_model):
    """Collect the scalar-valued simple attributes of a model.

    Every attribute name listed in ``serialize.SIMPLE_ATTRIBUTES`` is read
    from ``pr_model``; an attribute that does not exist raises
    ``AttributeError``. Only values that are instances of ``int``, ``float``,
    ``str`` or ``bool`` are kept.

    Returns:
        dict mapping attribute name to its value, in the order the names
        appear in ``serialize.SIMPLE_ATTRIBUTES``.
    """
    scalar_types = (int, float, str, bool)
    collected = {}
    for name in serialize.SIMPLE_ATTRIBUTES:
        value = getattr(pr_model, name)
        # Drop anything that is not a plain scalar (None, lists, frames, ...).
        if isinstance(value, scalar_types):
            collected[name] = value
    return collected

In [97]:
# Record this run under the per-user experiment name built in the config cell.
# The previous hard-coded "prophet-forecast" silently overrode that choice and
# sent every user's runs to one shared experiment.
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run():
    # Fit a Prophet model with default settings on the daily `ds` / `y` frame.
    model = Prophet().fit(df)
    # Scalar model attributes, logged below as the run's parameters.
    params = extract_params(model)

    # Prophet cross-validation, run with the thread-based parallel backend
    # and with the progress bar disabled.
    metrics_raw = cross_validation(
        model=model,
        horizon="30 days",
        period="90 days",
        initial="60 days",
        parallel="threads",
        disable_tqdm=True,
    )

    # Reduce the per-horizon metric table to one mean value per metric.
    cv_metrics = performance_metrics(metrics_raw)
    metrics = cv_metrics.drop(columns=["horizon"]).mean().to_dict()

    # The training data can be retrieved from the fit model for convenience
    train = model.history

    # Log the fitted model, using the first ten `ds` values as the input example.
    model_info = mlflow.prophet.log_model(
        model, artifact_path="prophet_model", input_example=train[["ds"]].head(10)
    )
    mlflow.log_params(params)
    mlflow.log_metrics(metrics)

INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
DEBUG:cmdstanpy:input tempfile: /tmp/tmpfijowygc/f4c2m2bo.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpfijowygc/phtxjk7w.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/home/cdsw/.local/lib/python3.10/site-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=64646', 'data', 'file=/tmp/tmpfijowygc/f4c2m2bo.json', 'init=/tmp/tmpfijowygc/phtxjk7w.json', 'output', 'file=/tmp/tmpfijowygc/prophet_modelkdat6oms/prophet_model-20241025235206.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
23:52:06 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
23:52:06 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing
INFO:prophet:Making 4 forecasts w