***GENERATED CODE FOR TimeSeriesIntegrationtabTest PIPELINE***

**CONNECTOR FUNCTIONS TO READ DATA FROM DATABRICKS FILESYSTEM**

In [None]:
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class DBFSConnector:
    """Read and write delimited text files through a Spark session.

    Every public method receives the stage ``config`` as the string form of
    a Python dict literal with the keys ``url``, ``delimiter`` and
    ``is_header``.  The methods do not use instance state, so they are
    declared static and can be called on the class or on an instance.
    """

    @staticmethod
    def _parse_config(config):
        """Return the stage configuration as a dict.

        Args:
            config: A dict, or a string holding a Python dict literal.

        Returns:
            The parsed configuration dict.

        Raises:
            ValueError, SyntaxError: If the string is not a plain literal.
        """
        # Local import keeps this class self-contained.
        import ast

        if isinstance(config, dict):
            return config
        # NOTE(security): this used to be eval(config), which executes any
        # expression embedded in the string.  literal_eval only accepts
        # literals (dict/list/str/number/bool/None), which is all a stage
        # configuration needs.
        return ast.literal_eval(config)

    @staticmethod
    def _header_option(cfg):
        """Map the ``is_header`` setting to Spark's 'true'/'false' string."""
        return 'true' if cfg["is_header"] == "Use Header Line" else 'false'

    @staticmethod
    def fetch(inStages, inStagesData, stageId, spark, config):
        """Load the CSV at ``config['url']`` into a Spark DataFrame.

        Args:
            inStages: Unused by this connector.
            inStagesData: Unused by this connector.
            stageId: Unused by this connector.
            spark: Session object whose ``read`` attribute is used.
            config: Stage configuration (see ``_parse_config``).

        Returns:
            The DataFrame produced by ``spark.read``.
        """
        cfg = DBFSConnector._parse_config(config)  # parse once, not per key
        reader = spark.read.options(
            header=DBFSConnector._header_option(cfg),
            inferschema='true',
            delimiter=cfg["delimiter"])
        df = reader.csv(cfg['url'])
        # ``display`` is not defined in this file; presumably supplied by the
        # notebook runtime -- TODO confirm.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(inStages, inStagesData, stageId, spark, config):
        """Write ``inStagesData`` as CSV to a timestamp-prefixed target.

        Args:
            inStages: Unused by this connector.
            inStagesData: Object whose ``write`` attribute is used.
            stageId: Unused by this connector.
            spark: Unused by this connector.
            config: Stage configuration (see ``_parse_config``).

        Returns:
            Whatever the writer's ``save`` call returns.
        """
        cfg = DBFSConnector._parse_config(config)
        stamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): the target is "<timestamp>_ <url>" (with a space);
        # kept exactly as before -- confirm this is the intended location.
        target = ("%s %s") % (stamp + "_", cfg['url'])
        writer = inStagesData.write.format('csv').options(
            header=DBFSConnector._header_option(cfg),
            delimiter=cfg["delimiter"])
        return writer.save(target)


**TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA**

In [None]:
from core.NamingNPathAccord import NumtraNamingNPathAccord
import pandas as pd
import json
import os
from os import listdir
from pyspark.sql.functions import col, when
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Replace missing values in DataFrame columns with a per-column value.

    A value counts as missing when it is null or equals the single-space
    string ``" "``.  DataFrame operations return new objects rather than
    modifying in place, so every step below re-binds ``df``.
    """

    def _aggregate(self, feature, df, func, alias):
        """Return ``func`` applied to the ``feature.name`` column of ``df``."""
        row = df.select(func(col(feature.name)).alias(alias)).collect()[0]
        return row[alias]

    def _replaceMissing(self, feature, df, replacement, nullReplacement=None):
        """Fill nulls and ``" "`` entries of one column.

        Args:
            feature: Schema field; only ``feature.name`` is read.
            df: DataFrame to clean.
            replacement: Value substituted for ``" "`` entries.
            nullReplacement: Value passed to ``fillna``; defaults to
                ``replacement``.

        Returns:
            The cleaned DataFrame, or ``df`` unchanged when ``replacement``
            is None (e.g. the column has no non-null values).
        """
        if replacement is None:
            # fillna cannot take None; nothing sensible to substitute.
            return df
        if nullReplacement is None:
            nullReplacement = replacement
        df = df.fillna(nullReplacement, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", replacement)
            .otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Replace missing values of ``feature`` with the column mean.

        ``mean_`` is unused and kept for interface compatibility.
        """
        # The earlier version discarded both the fillna and withColumn
        # results, so it returned ``df`` untouched.  It also cast the column
        # to Integer, which would truncate real-valued data; dropped here to
        # match the other replaceBy* methods.
        meanValue = self._aggregate(feature, df, mean, 'mean')
        return self._replaceMissing(feature, df, meanValue)

    def replaceByMax(self, feature, df, max_=-1):
        """Replace missing values of ``feature`` with the column maximum.

        ``max_`` is unused and kept for interface compatibility.
        """
        maxValue = self._aggregate(feature, df, max, 'max')
        return self._replaceMissing(feature, df, maxValue)

    def replaceByMin(self, feature, df, min_=-1):
        """Replace missing values of ``feature`` with the column minimum.

        ``min_`` is unused and kept for interface compatibility.
        """
        minValue = self._aggregate(feature, df, min, 'min')
        return self._replaceMissing(feature, df, minValue)

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Replace missing values of ``feature`` with the column stddev.

        ``stddev_`` is unused and kept for interface compatibility.
        """
        stddevValue = self._aggregate(feature, df, stddev, 'stddev')
        return self._replaceMissing(feature, df, stddevValue)

    def replaceDateRandomly(self, feature, df):
        """Replace missing values with the first non-null value found.

        Despite the name, no randomness is involved: the value comes from
        the first row returned by ``head(1)`` after filtering out nulls.
        """
        rows = df.where(col(feature.name).isNotNull()).head(1)
        if not rows:
            # Column is entirely null: there is no value to copy.
            return df
        fillValue = rows[0][feature.name]
        # Nulls are filled with the string form; " " entries get the raw
        # value, as in the original code.
        return self._replaceMissing(
            feature, df, fillValue, nullReplacement=str(fillValue))

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to each listed feature.

        Args:
            fList: Iterable of dicts with the keys ``feature`` and
                ``replaceby``.
            df: DataFrame to clean.

        Returns:
            The DataFrame after all matching replacements.
        """
        # Order matters: the first keyword found in ``replaceby`` wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                # NOTE(review): substring match, so "Date" also matches a
                # column named e.g. "UpdateDate" -- confirm this is intended.
                if featureObj["feature"] not in feat.name:
                    continue
                for keyword, handler in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = handler(feat, df)
                        break
        return df


# Lookup table from a statFunction "function" name to the callable that
# implements it; consulted by TransformationTimeSeriesForecastingMain.run.
# It is empty here, so only the "Original" function works without a KeyError.
Forecasting_FE_Methods = dict()


class TransformationTimeSeriesForecastingMain:
    """Entry point of the time-series feature-engineering stage."""

    # TODO: change df argument in run with following
    @staticmethod
    def run(inStages, inStagesData, stageId, spark, config):
        """Select, cleanse and transform the forecasting columns.

        Args:
            inStages: List of upstream stage ids; only the first is used.
            inStagesData: Mapping from stage id to that stage's DataFrame.
            stageId: Id of this stage, forwarded to ``getPandasDF``.
            spark: Unused by this stage.
            config: JSON string with an ``"FE"`` object containing
                ``featureList``, ``features`` (``timecolumn``,
                ``tocompare``) and optionally ``statFunction``.

        Returns:
            The pandas DataFrame returned by ``getPandasDF``, optionally
            passed through a ``Forecasting_FE_Methods`` function.

        Raises:
            KeyError: If a requested transformation or stat function is
                not registered.
        """
        ForecastFE = json.loads(config)["FE"]
        featureData = ForecastFE['featureList']
        timeColumn = ForecastFE['features']['timecolumn']
        featuresSelectedList = [timeColumn,
                                ForecastFE['features']['tocompare']]

        # Keep only the time column and the column being forecast.
        transformationDF = inStagesData[inStages[0]]
        transformedDF = transformationDF.select(
            [c for c in transformationDF.columns if c in featuresSelectedList])
        transformedDF = CleanseData().replaceNullValues(
            featureData, transformedDF)

        for transformation in featureData:
            feature = transformation["feature"]
            if feature not in featuresSelectedList:
                continue
            wanted = (transformation["transformation"] != ''
                      and transformation["selected"].lower() == "true"
                      and "_transform" not in feature)
            if wanted:
                # NOTE(review): Feature_Transformations_Methods is not
                # defined in the visible part of this file -- confirm it is
                # provided elsewhere, otherwise this raises NameError.
                transformedDF = Feature_Transformations_Methods[
                    str(transformation["transformation"])](
                        transformedDF, transformation)

        df = getPandasDF(transformedDF, stageId, timeColumn)

        if "statFunction" in ForecastFE:
            statFunction = ForecastFE['statFunction']
            if 'Original' in statFunction["function"]:
                # "Original" means: leave the series untouched.
                pass
            elif "parameter" in statFunction:
                df = Forecasting_FE_Methods[str(statFunction["function"])](
                    df, statFunction['parameter'])
            else:
                df = Forecasting_FE_Methods[str(statFunction["function"])](df)

        return df


def getPandasDF(transformedDF, stageID, timeFeature):
    """Convert a Spark DataFrame to pandas via a temporary CSV file.

    The frame is written as a single partition under the path returned by
    ``NumtraNamingNPathAccord().getExtraFilePath(stageID)``, the first
    ``.csv`` file found there is read with pandas, and that file is then
    deleted.

    Args:
        transformedDF: Spark DataFrame to convert.
        stageID: Stage id used to derive the temporary path.
        timeFeature: Column used as the index of the returned frame.

    Returns:
        pandas.DataFrame indexed by ``timeFeature``.

    Raises:
        FileNotFoundError: If no ``.csv`` file exists after the write.
    """
    # assumes NumtraNamingNPathAccord is stateless, so one instance can be
    # reused for both calls -- TODO confirm
    accord = NumtraNamingNPathAccord()
    filepath = accord.getExtraFilePath(stageID)
    transformedDF.repartition(1).write.csv(
        path=filepath, mode="append", header="true")

    localDir = accord.attachDBFSToPath(filepath)
    csvFiles = [name for name in listdir(localDir) if name.endswith('.csv')]
    if not csvFiles:
        # Previously surfaced as a bare IndexError from csvFiles[0].
        raise FileNotFoundError(
            "No CSV part file found in %s" % localDir)

    csvPath = localDir + "/" + csvFiles[0]
    try:
        return pd.read_csv(csvPath, delimiter=',',
                           index_col=[timeFeature], encoding='utf-8')
    finally:
        # Remove the part file even when parsing fails; the write mode is
        # "append", so a leftover file could be picked up by a later run.
        if os.path.exists(csvPath):
            os.remove(csvPath)


**AUTOML FUNCTIONS**

In [None]:
from fbprophet import Prophet


def driverProphet(df):
    """Fit a Prophet model on ``df`` and forecast 365 periods ahead.

    Args:
        df: Training frame passed straight to ``Prophet.fit``.

    Returns:
        The forecast frame returned by ``Prophet.predict`` for the
        history plus 365 future periods.
    """
    m = Prophet()
    m.fit(df)
    future = m.make_future_dataframe(periods=365)
    forecast = m.predict(future)
    m.plot(forecast)
    # The return used to sit at module level (a SyntaxError), so the
    # function itself returned nothing.
    return forecast


**READING DATAFRAME**

In [None]:
from pyspark.sql import SparkSession
# Obtain the Spark session and load the source CSV.  A failure is logged
# rather than raised, in which case TimeSeriesIntegrationtabTest_DBFS
# stays undefined.
spark = SparkSession.builder.getOrCreate()

try:
    TimeSeriesIntegrationtabTest_DBFS = DBFSConnector.fetch(
        [],
        {},
        "5ea6d4ab77181bc3667b9aca",
        spark,
        "{'url': '/Demo/example_wp_log_peyton_manning.csv', "
        "'file_type': 'Delimeted', "
        "'delimiter': ',', "
        "'is_header': 'Use Header Line'}",
    )
except Exception as ex:
    logging.error(ex)


**TRANSFORMING DATAFRAME**

In [None]:
# Run the feature-engineering stage on the frame loaded by the read stage.
# The call previously went through
# TranformationsFeatureForecasting.TramformationTimeSeriesForecastingMain,
# neither of which exists in this file; the class defined above is
# TransformationTimeSeriesForecastingMain.
try:
    TimeSeriesIntegrationtabTest_FeatureForecast = TransformationTimeSeriesForecastingMain.run(
        ["5ea6d4ab77181bc3667b9aca"],
        {"5ea6d4ab77181bc3667b9aca": TimeSeriesIntegrationtabTest_DBFS},
        "5ea6d4ab77181bc3667b9acb",
        spark,
        json.dumps({
            "FE": {
                "functionList": [
                    {"function": "log"},
                    {"function": "difference"},
                    {"function": "Original"},
                ],
                "featureList": [
                    {
                        "transformationsData": {},
                        "feature": "Date",
                        "type": "date",
                        "selected": "True",
                        "replaceby": "random",
                        "stats": {"count": "", "mean": "", "stddev": "",
                                  "min": "", "max": "", "missing": "0"},
                        "transformation": "",
                    },
                    {
                        "transformationsData": {},
                        "feature": "WebPageViews",
                        "type": "real",
                        "selected": "True",
                        "replaceby": "mean",
                        "stats": {"count": "567", "mean": "8.16",
                                  "stddev": "0.85",
                                  "min": "6.79122146272619",
                                  "max": "12.846746888829",
                                  "missing": "0"},
                        "transformation": "",
                    },
                ],
                "originalfile": "/Demo/example_wp_log_peyton_manning.csv",
                "features": {"timecolumn": "Date",
                             "tocompare": "WebPageViews"},
                "dataPercentage": "100",
                "statFunction": {"function": "Original", "parameter": ""},
            }
        }),
    )
except Exception as ex:
    logging.error(ex)


**TRAIN MODEL**

In [None]:
# Train the forecasting model.  Prophet needs a DataFrame (not a list) with
# the columns ``ds`` and ``y``.  The feature-forecast frame is indexed by the
# time column "Date" and carries the value column "WebPageViews", so the
# index is restored as a column and both are renamed before fitting.
try:
    Timeseriesforecastml = driverProphet(
        TimeSeriesIntegrationtabTest_FeatureForecast
        .reset_index()
        .rename(columns={"Date": "ds", "WebPageViews": "y"})
    )
except Exception as ex:
    logging.error(ex)
