In [None]:
import os
from getpass import getpass

# Credentials must not live in the notebook: read them from the environment and
# prompt for the password when ARCGIS_PASSWORD is not set.
gis = GIS('https://ndhagsb01.esri.com/portal',
          os.environ.get('ARCGIS_USERNAME', 'admin'),
          os.environ.get('ARCGIS_PASSWORD') or getpass('Portal password: '),
          profile="your_enterprise_portal",
          # NOTE(review): disables TLS certificate checks; only acceptable for an internal portal.
          verify_cert=False)

In [None]:
geoanalytics_supported = arcgis.geoanalytics.is_supported()  # does the connected GIS offer GeoAnalytics?
geoanalytics_supported

In [None]:
# Datastore manager used below to register and list big data file shares.
geoanalytics_module = arcgis.geoanalytics
bigdata_datastore_manager = geoanalytics_module.get_datastores()
bigdata_datastore_manager

In [None]:
# Register the shared folder holding the 2019 air-quality files as a big data file share.
share_name, share_path = "air_quality_2019", r"\\DELDEVD014\store"
data_item = bigdata_datastore_manager.add_bigdata(share_name, share_path)

In [None]:
# List the big data file shares now registered with the datastore manager.
registered_shares = bigdata_datastore_manager.search()
bigdata_fileshares = registered_shares
bigdata_fileshares

In [None]:
# Portal items of type "big data file share" (at most 40).
search_result = gis.content.search(
    "",
    item_type="big data file share",
    max_items=40,
)
search_result

In [None]:
# Select the share registered above by name instead of trusting search ranking;
# fall back to the last result (the previous behaviour) if no title matches.
air_item = next((item for item in search_result if "air_quality_2019" in item.title),
                search_result[-1])

In [None]:
air_layers = air_item.layers
air_lyr = air_layers[0]  # the share's first dataset

In [None]:
# Summarise the dataset (with extent and a 1000-feature sample); the microsecond
# suffix keeps the output name unique across runs.
run_suffix = str(dt.now().microsecond)
description = describe_dataset(
    input_layer=air_lyr,
    extent_output=True,
    sample_size=1000,
    output_name="Description of air quality 2019 data" + run_suffix,
    return_tuple=True,
)

In [None]:
description_json = description.output_json  # JSON summary returned by describe_dataset
description_json

In [None]:
# Pull the sampled features locally as a spatially enabled DataFrame.
sample_features = description.sample_layer.query()
df = sample_features.sdf

In [None]:
def data_processsing():
    """Pivot one monitoring site's readings into a wide time-series table.

    Executed on the GeoAnalytics server by ``run_python_script``, which provides
    ``layers``; ``layers[0]`` is the air-quality big data file share.  The output
    has one row per (``datetime``, ``unique_id``) and one column per distinct
    ``Parameter Name`` holding the mean ``Sample Measurement``.  It is saved to
    the portal as ``timeseries_data<microseconds>``.
    """
    from datetime import datetime as dt
    import pyspark.sql.functions as F
    from pyspark.sql.functions import concat, col, lit

    # Load the big data file share layer into a DataFrame.
    df = layers[0]
    cols = ['Site Num', 'County Code', 'State Code', 'Date Local', 'Time Local',
            'Parameter Name', 'Sample Measurement']
    df = df.select(cols)

    # Site identifier = zero-padded state (2) + county (3) + site number (4).
    df = df.withColumn('Site_Num', F.lpad(df['Site Num'], 4, '0'))
    df = df.withColumn('County_Code', F.lpad(df['County Code'], 3, '0'))
    df = df.withColumn('State_Code', F.lpad(df['State Code'], 2, '0'))
    df = df.withColumn('unique_id', F.concat(F.col('State_Code'), F.col('County_Code'), F.col('Site_Num')))
    df = df.drop('Site_Num', 'County_Code', 'State_Code', 'Site Num', 'County Code', 'State Code')

    # Combine local date and time into one "<date> <time>" string key.
    df = df.withColumn('datetime', concat(col("Date Local"), lit(" "), col("Time Local")))
    df = df.drop('Time Local', 'Date Local')

    # Keep a single site so the result is one time series.
    # NOTE(review): first() on an unordered DataFrame is not guaranteed to pick the
    # same row on every run; pin an explicit site id if reproducibility matters.
    site_id = df.first().unique_id
    df = df.where(col("unique_id") == site_id)

    # Pivot: one column per 'Parameter Name', averaged per (datetime, site).
    df = df.groupby(df['datetime'], df['unique_id']).pivot("Parameter Name").avg("Sample Measurement")

    df.write.format("webgis").save("timeseries_data" + str(dt.now().microsecond))

In [None]:
run_python_script(data_processsing, [air_lyr])  # pivot one site's readings on the server

In [None]:
def predict():
    """Fit a random-forest regressor for PM2.5 and report its hold-out RMSE.

    Executed on the GeoAnalytics server by ``run_python_script``; ``layers[0]``
    is presumably the pivoted time-series table from ``data_processsing`` —
    verify.  Prints the RMSE on a 10 % hold-out split and writes the model's
    feature importances to ``feature_importance.txt`` on the shared folder.
    """
    import os
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import VectorAssembler

    # One seed for both the split and the forest so the reported RMSE is reproducible.
    seed = 14389
    label = "PM2_5___Local_Conditions"

    # Only rows with an observed PM2.5 value can be trained on or evaluated.
    data = layers[0]
    data = data.filter(data[label].isNotNull())

    # Combine explanatory columns into a single column called "features";
    # handleInvalid="skip" filters out rows whose explanatory values are invalid (e.g. null).
    assembler = VectorAssembler(inputCols=['Outdoor_Temperature',
                                           'Relative_Humidity',
                                           'Wind_Direction___Resultant',
                                           'Wind_Speed___Resultant',
                                           'PM10_Total_0_10um_STP'],
                                outputCol='features')
    data = assembler.setHandleInvalid("skip").transform(data)

    # Keep 10% for validation.  Seeded: an unseeded split changes the RMSE every run.
    # NOTE(review): a random split of a time series lets neighbouring hours leak
    # between train and test; a chronological split would be stricter.
    (trainingData, testData) = data.randomSplit([0.9, 0.1], seed=seed)

    # Create the Random Forest model and fit it using the training data.
    rf = RandomForestRegressor(featuresCol="features", labelCol=label, numTrees=100, seed=seed)
    model = rf.fit(trainingData)

    # Apply the model to the held-out rows.
    predictions = model.transform(testData)

    # RMSE between predicted and observed PM2.5 on the held-out rows.
    evaluator = RegressionEvaluator(labelCol=label, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

    # Write a summary of feature importance to a shared folder.
    out_path = r"\\DELDEVD014\store"
    with open(os.path.join(out_path, 'feature_importance.txt'), 'w') as w:
        w.write(str(model.featureImportances))

In [None]:
# `series_data` was never defined in this notebook.  Look up the table written by
# `data_processsing` explicitly so the cell survives Restart & Run All.
series_item = gis.content.search('timeseries_data')[0]
series_data = series_item.tables[0]
run_python_script(code=predict, layers=[series_data])

In [None]:
COUNTIES_ITEM_ID = '10308a3e7dd7424592a1e3c648ac37b0'  # portal item whose first layer is used as `boundary`
counties = gis.content.get(COUNTIES_ITEM_ID)

In [None]:
county_layers = counties.layers
boundary = county_layers[0]

In [None]:
def average():
    """Attach the mean PM2.5 measurement to each boundary polygon and save it.

    Executed by ``run_python_script``: ``layers[0]`` holds the readings and
    ``layers[1]`` the target polygons.  Each polygon is joined one-to-one with the
    PM2.5 readings it contains, summarised as the mean of ``Sample Measurement``.
    """
    from datetime import datetime as dt

    readings = layers[0]
    pm25_only = readings.filter(readings['Parameter Name'] == 'PM2.5 - Local Conditions')
    mean_measurement = {'statisticType': 'mean', 'onStatisticField': 'Sample Measurement'}
    joined = geoanalytics.join_features(
        target_layer=layers[1],
        join_layer=pm25_only,
        join_operation="JoinOneToOne",
        summary_fields=[mean_measurement],
        spatial_relationship='Contains',
    )
    output_name = "average_pm_by_boundaryFinal" + str(dt.now().microsecond)
    joined.write.format("webgis").save(output_name)

In [None]:
run_python_script(code=average, layers=[air_lyr, boundary])  # readings first, polygons second

In [None]:
def date_parse():
    """Add calendar columns to the PM2.5 readings and save them as a new layer.

    Executed by ``run_python_script`` with ``layers[0]`` as the readings.  Keeps
    only 'PM2.5 - Local Conditions' rows, parses ``Date GMT`` (``yyyy-MM-dd``)
    into a ``date`` timestamp, then derives ``month``, ``dayofmonth`` and
    ``dayofweek`` from it.
    """
    from datetime import datetime as dt
    from pyspark.sql import functions as F

    raw = layers[0]
    pm25 = raw.filter(raw['Parameter Name'] == 'PM2.5 - Local Conditions')
    parsed_date = F.unix_timestamp('Date GMT', 'yyyy-MM-dd').cast('timestamp')
    enriched = (
        pm25
        .withColumn('date', parsed_date)
        .withColumn('month', F.month(F.col('date')))
        .withColumn('dayofmonth', F.dayofmonth(F.col('date')))
        .withColumn('dayofweek', F.dayofweek(F.col('date')))
    )
    enriched.write.format("webgis").save("date_parsed" + str(dt.now().microsecond))

In [None]:
run_python_script(date_parse, [air_lyr])  # derive calendar columns on the server

In [None]:
def process_df():
    """Put one site's readings on a regular hourly grid and clean the gaps.

    Executed by ``run_python_script``; ``layers[0]`` is presumably the pivoted
    table from ``data_processsing`` (one column per parameter plus ``datetime``) —
    verify.  Every row is tagged ``flag = 1`` so a single pandas call receives the
    whole series.  The result (its first 10 rows only) is saved as
    ``merged_df_subset<microseconds>``.
    """
    from datetime import datetime as dt
    from pyspark.sql.functions import lit
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType
    import warnings
    warnings.filterwarnings('ignore')

    df1 = layers[0]
    # Constant grouping key: groupby('flag').apply(...) hands every row to one pandas call.
    df1 = df1.withColumn('flag', lit(1))
    schema = StructType([StructField('ds', TimestampType(), True),
                         StructField('Outdoor_Temperature', FloatType(), True),
                         StructField('Ozone', FloatType(), True),
                         StructField('PM10_Total_0_10um_STP', FloatType(), True),
                         StructField('y', FloatType(), True),
                         StructField('Relative_Humidity', FloatType(), True),
                         StructField('Wind_Direction___Resultant', FloatType(), True),
                         StructField('Wind_Speed___Resultant', FloatType(), True),
                         StructField('datetime', StringType(), True),
                         StructField('flag', IntegerType(), True),
                         StructField('year', IntegerType(), True)])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def forecast_pm25(df):
        """Reindex to an hourly grid, interpolate, and clean non-positive readings."""

        def carry_forward_positive(series):
            """Replace each non-positive value with the most recent positive one.

            Leading non-positive values (nothing to carry forward yet) take the
            first positive value; a series with no positive value is returned as-is.
            """
            positive = series.where(series > 0)
            return positive.ffill().bfill().fillna(series)

        cols = ['Outdoor_Temperature', 'Ozone', 'PM10_Total_0_10um_STP',
                'PM2_5___Local_Conditions', 'Relative_Humidity',
                'Wind_Direction___Resultant',
                'Wind_Speed___Resultant', 'datetime', 'flag']
        # Build a new frame instead of assigning into a column slice of the input.
        df = df[cols].assign(Date=lambda d: d['datetime'].astype('datetime64[ns]'))
        df['year'] = df['Date'].dt.year
        df = df.set_index('Date').sort_index()

        # Hourly grid starting after 2016-12-31 23:00 (closed='right' drops the start).
        # NOTE(review): `closed=` is deprecated since pandas 1.4 (`inclusive=` replaces
        # it); kept for the server's pandas version — confirm before upgrading.
        v = pd.date_range(start='2016-12-31 23:00:00', periods=18265, freq='H', closed='right')
        newdf = pd.DataFrame(index=v)

        merge = pd.merge(newdf, df, how='left', left_index=True, right_index=True)
        merge = merge.interpolate(method='time')
        merge = merge.reset_index()
        merge = merge.rename(columns={'index': 'ds', 'PM2_5___Local_Conditions': 'y'})
        merge = merge.interpolate('nearest')

        # Zero-fill the remaining gaps, then carry the last positive reading over
        # zero/negative values.  Column assignment (not chained `.iloc`) so the update
        # also sticks under pandas Copy-on-Write, and row 0 never borrows the last row.
        for name in ('y', 'PM10_Total_0_10um_STP', 'Wind_Direction___Resultant', 'Wind_Speed___Resultant'):
            merge[name] = carry_forward_positive(merge[name].fillna(0))

        # NOTE(review): only the first 10 rows are returned (hence "merged_df_subset");
        # drop the slice to keep the full series for forecasting.
        return merge[:10]

    res = df1.groupby(['flag']).apply(forecast_pm25)
    res.write.format("webgis").save("merged_df_subset" + str(dt.now().microsecond))

In [None]:
run_python_script(process_df, [series_data])  # build the gap-filled hourly table

In [None]:
# Each run of `process_df` creates another "merged_df_subset<microseconds>" item,
# so take the most recently created one rather than the top-ranked search hit.
merged_candidates = gis.content.search('merged_df_subset')
if not merged_candidates:
    raise LookupError("No 'merged_df_subset' item found; run process_df first.")
merge_item = max(merged_candidates, key=lambda item: item.created)

In [None]:
merge_tables = merge_item.tables
merge_lyr = merge_tables[0]  # the gap-filled table (no geometry, so it is a table)

In [None]:
#univariate
def fbprophet():
    """Univariate Prophet forecast of PM2.5 (``y``) for 2019.

    Executed by ``run_python_script``; ``layers[0]`` is expected to be the
    ``merged_df_subset`` table from ``process_df`` (``ds``, ``y``, ``year``,
    ``flag``, ...).  Rows from years other than 2019 train the model, the 2019
    rows are forecast, and ``ds``/``yhat`` are saved as
    ``predicted_pm25_univariate<microseconds>``.
    """
    from datetime import datetime as dt
    import pandas as pd
    from fbprophet import Prophet
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, FloatType, TimestampType
    import warnings
    warnings.filterwarnings('ignore')

    df1 = layers[0]
    schema = StructType([StructField('ds', TimestampType(), True),
                         StructField('yhat', FloatType(), True)])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def forecast_pm25(df):
        # The group's rows arrive as `df` (there is no `merge` in scope here).
        train_df = df[df.year != 2019]
        # drop() returns a new frame: no in-place mutation of a slice of `df`.
        test_df = df[df.year == 2019].drop(columns='y')

        m = Prophet(daily_seasonality=True, weekly_seasonality=True)
        m.fit(train_df)

        forecast = (m.predict(test_df)[["ds", "yhat"]]
                    .assign(ds=lambda x: pd.to_datetime(x["ds"])))
        return forecast

    res = df1.groupby(['flag']).apply(forecast_pm25)
    # Spark is lazy: without a write the forecast is never computed or kept.
    res.write.format("webgis").save("predicted_pm25_univariate" + str(dt.now().microsecond))

In [None]:
# Run the univariate Prophet forecast (defined just above) on the gap-filled table;
# this cell previously re-ran `process_df` on the raw series by mistake.
run_python_script(code=fbprophet, layers=[merge_lyr])

In [None]:
#multivariate
def fbprophet():
    """Multivariate Prophet forecast of PM2.5 for 2019 with weather/PM10 regressors.

    Executed by ``run_python_script``; ``layers[0]`` is expected to be the
    ``merged_df_subset`` table from ``process_df``.  Non-2019 rows train a Prophet
    model with PM10, wind speed and wind direction as extra regressors.  The fitted
    model is serialized to the shared folder, and the full 2019 forecast is saved
    as ``predicted_pm25<microseconds>``.
    NOTE(review): shares its name with the univariate ``fbprophet`` above and
    shadows it once this cell runs.
    """
    from datetime import datetime as dt
    import pandas as pd
    from fbprophet import Prophet
    # NOTE(review): needs an fbprophet release that ships `fbprophet.serialize` (0.6+) — verify on the server.
    from fbprophet.serialize import model_to_json
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, FloatType, TimestampType
    import warnings
    warnings.filterwarnings('ignore')

    df1 = layers[0]
    schema = StructType([StructField('ds', TimestampType(), True),
                         StructField('trend', FloatType(), True),
                         StructField('yhat_lower', FloatType(), True),
                         StructField('yhat_upper', FloatType(), True),
                         StructField('trend_lower', FloatType(), True),
                         StructField('trend_upper', FloatType(), True),
                         StructField('PM10_Total_0_10um_STP', FloatType(), True),
                         StructField('PM10_Total_0_10um_STP_lower', FloatType(), True),
                         StructField('PM10_Total_0_10um_STP_upper', FloatType(), True),
                         StructField('Wind_Direction___Resultant', FloatType(), True),
                         StructField('Wind_Direction___Resultant_lower', FloatType(), True),
                         StructField('Wind_Direction___Resultant_upper', FloatType(), True),
                         StructField('Wind_Speed___Resultant', FloatType(), True),
                         StructField('Wind_Speed___Resultant_lower', FloatType(), True),
                         StructField('Wind_Speed___Resultant_upper', FloatType(), True),
                         StructField('additive_terms', FloatType(), True),
                         StructField('additive_terms_lower', FloatType(), True),
                         StructField('additive_terms_upper', FloatType(), True),
                         StructField('daily', FloatType(), True),
                         StructField('daily_lower', FloatType(), True),
                         StructField('daily_upper', FloatType(), True),
                         StructField('extra_regressors_additive', FloatType(), True),
                         StructField('extra_regressors_additive_lower', FloatType(), True),
                         StructField('extra_regressors_additive_upper', FloatType(), True),
                         StructField('weekly', FloatType(), True),
                         StructField('weekly_lower', FloatType(), True),
                         StructField('weekly_upper', FloatType(), True),
                         StructField('multiplicative_terms', FloatType(), True),
                         StructField('multiplicative_terms_lower', FloatType(), True),
                         StructField('multiplicative_terms_upper', FloatType(), True),
                         StructField('yhat', FloatType(), True)])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def forecast_pm25(df):
        # The group's rows arrive as `df` (there is no `merge` in scope here).
        train_df = df[df.year != 2019]
        # drop() returns a new frame: no in-place mutation of a slice of `df`.
        test_df = df[df.year == 2019].drop(columns='y')

        m = Prophet(daily_seasonality=True,
                    weekly_seasonality=True)
        m.add_regressor('PM10_Total_0_10um_STP')
        m.add_regressor('Wind_Speed___Resultant')
        m.add_regressor('Wind_Direction___Resultant')
        m.fit(train_df)

        # Prophet has no `save` method; persist the fitted model as JSON instead.
        with open(r'\\DELDEVD014\store\forecast_model', 'w') as fout:
            fout.write(model_to_json(m))

        forecast = m.predict(test_df)
        return forecast

    res = df1.groupby(['flag']).apply(forecast_pm25)
    res.write.format("webgis").save("predicted_pm25" + str(dt.now().microsecond))