# PM2.5 prediction on big data using pyspark and facebook model 

## Table of Contents
* [Introduction](#Introduction)
* [Necessary imports](#Necessary-imports)
* [Connect to your ArcGIS Enterprise organization](#Connect-to-your-ArcGIS-Enterprise-organization)
* [Ensure your GIS supports GeoAnalytics](#Ensure-your-GIS-supports-GeoAnalytics)
* [Create a big data file share](#Create-a-big-data-file-share)
* [Get data for analysis](#Get-data-for-analysis)
    * [Search for big data file shares](#Search-for-big-data-file-shares)
    * [Search for feature layers](#Search-for-feature-layers)
* [Describe data](#Describe-data)
 ..fill it
* [Conclusion](#Conclusion)

## Introduction

In [None]:
intro lines

## Necessary imports

In [2]:
%matplotlib inline
import matplotlib.pyplot as plt

from datetime import datetime as dt
import pandas as pd

import arcgis
from arcgis.gis import GIS
from arcgis.geoanalytics.manage_data import run_python_script


## Connect to your ArcGIS Enterprise organization

In [4]:
gis = GIS(profile="your_enterprise_portal")

## Ensure your GIS supports GeoAnalytics
After connecting to Enterprise portal, we need to ensure an ArcGIS Enterprise GIS is set up with a licensed GeoAnalytics server. To do so, we will call the `is_supported()` method.

In [5]:
arcgis.geoanalytics.is_supported()

True

## Prepare the data
To register a file share or an HDFS, we need to format datasets as subfolders within a single parent folder and register the parent folder. This parent folder becomes a datastore, and each subfolder becomes a dataset. Our folder hierarchy would look like below:

Learn more about preparing your big data file share datasets [here](https://enterprise.arcgis.com/en/server/latest/get-started/windows/what-is-a-big-data-file-share.htm)

The `get_datastores()` method of the geoanalytics module returns a `DatastoreManager` object that lets you search for and manage the big data file share items as Python API `Datastore` objects on your GeoAnalytics server.

In [7]:
bigdata_datastore_manager = arcgis.geoanalytics.get_datastores()
bigdata_datastore_manager

<DatastoreManager for https://deldevld016.esri.com:6443/arcgis/admin>

We will register air quality data as a big data file share using the `add_bigdata()` function on a `DatastoreManager` object. 

When we register a directory, all subdirectories under the specified folder are also registered with the server. Always register the parent folder (for example, \\machinename\mydatashare) that contains one or more individual dataset folders as the big data file share item. To learn more, see [register a big data file share](https://enterprise.arcgis.com/en/server/latest/manage-data/windows/registering-your-data-with-arcgis-server-using-manager.htm#ESRI_SECTION1_0D55682C9D6E48E7857852A9E2D5D189)

Note: 
You cannot browse directories in ArcGIS Server Manager. You must provide the full path to the folder you want to register, for example, \\myserver\share\bigdata. Avoid using local paths, such as C:\bigdata, unless the same data folder is available on all nodes of the server site.

In [9]:
data_item = bigdata_datastore_manager.add_bigdata("Air_Auality_17_18_19", r"/mnt/network/data")

Created Big Data file share for Air_Auality_17_18_19


In [10]:
bigdata_fileshares = bigdata_datastore_manager.search()
bigdata_fileshares

[<Datastore title:"/nosqlDatabases/AGSDataStore_nosqldb_tcs_d3fjluut" type:"nosql">,
 <Datastore title:"/nosqlDatabases/AGSDataStore_bigdata_bds_1bpq1uya" type:"nosql">,
 <Datastore title:"/enterpriseDatabases/AGSDataStore_ds_1hn1zqxt" type:"egdb">,
 <Datastore title:"/bigDataFileShares/air_quality_17_18_19Full" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/air_quality_2019" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/air_quality" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/air_quality_17_18_19" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/Air_Auality_17_18_19" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/Chicago_Crime_2001_2020" type:"bigDataFileShare">]

## Get data for analysis

Adding a big data file share to the Geoanalytics server adds a corresponding [big data file share item](https://enterprise.arcgis.com/en/portal/latest/use/what-is-a-big-data-file-share.htm) on the portal. We can search for these types of items using the item_type parameter.

In [11]:
search_result = gis.content.search("bigDataFileShares_Air_Auality_17_18_19", item_type = "big data file share", max_items=40)
search_result

[<Item title:"bigDataFileShares_Air_Auality_17_18_19" type:Big Data File Share owner:admin>]

In [12]:
air_item = search_result[0]

In [13]:
air_lyr = air_item.layers[0]

In [14]:
air_lyr.properties

{
  "dataStoreID": "1aa144a0-a039-4e07-9c81-bc3c4efa4267",
  "fields": [
    {
      "name": "State Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "County Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Site Num",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Parameter Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "POC",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Latitude",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Longitude",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Datum",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Parameter Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date Local",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Time Local",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date GMT",
      "type": "esriFieldTypeString"
    },
    {
  

Querying the layers property of the [item](https://developers.arcgis.com/python/api-reference/arcgis.gis.toc.html#item) returns a featureLayer representing the data. The object is actually an API Layer object.

In [15]:
usa_counties = gis.content.search('usacounties', 'feature layer')[0]

In [16]:
usa_counties

We will use the first item for our analysis. Since the item is a Feature Layer Collection, accessing the layers property will give us a list of Feature layer objects.

In [17]:
usa_counties_lyr = usa_counties.layers[0]

## Describe data

The [`describe_dataset`](https://developers.arcgis.com/python/api-reference/arcgis.geoanalytics.summarize_data.html#describe-dataset) method provides an overview of big data. By default, the tool outputs a [table](https://developers.arcgis.com/python/api-reference/arcgis.features.toc.html#table) layer containing calculated field statistics and a dict outlining geometry and time settings for the input layer.

Optionally, the tool can output a ``feature layer`` representing a sample set of features using the ``sample_size`` parameter, or a single polygon feature layer representing the input feature layers' extent by setting the ``extent_output`` parameter to True. 

In [18]:
from arcgis.geoanalytics.summarize_data import describe_dataset

In [19]:
description = describe_dataset(input_layer=air_lyr,
                               extent_output=True,
                               sample_size=1000,
                               output_name="Description of air quality data" + str(dt.now().microsecond),
                               return_tuple=True)

In [20]:
description.output_json

{'datasetName': 'air_quality',
 'datasetSource': 'Big Data File Share - Air_Auality_17_18_19',
 'recordCount': 147535365,
 'geometry': {'geometryType': 'Point',
  'sref': {'wkid': 4326},
  'countNonEmpty': 147535365,
  'countEmpty': 0,
  'spatialExtent': {'xmin': -161.767,
   'ymin': 17.953006,
   'xmax': -65.915482,
   'ymax': 64.84568999999999}},
 'time': {'timeType': 'Instant',
  'countNonEmpty': 147535365,
  'countEmpty': 0,
  'temporalExtent': {'start': '2017-01-01 00:00:00.000',
   'end': '2019-04-30 00:00:00.000'}}}

In [21]:
description.sample_layer.query().sdf

Unnamed: 0,State_Code,County_Code,Site_Num,Parameter_Code,POC,Latitude,Longitude,Datum,Parameter_Name,Date_Local,...,Method_Type,Method_Code,Method_Name,State_Name,County_Name,Date_of_Last_Change,INSTANT_DATETIME,globalid,OBJECTID,SHAPE
0,56,35,700,62201,1,42.486361,-110.098861,WGS84,Relative Humidity,2017-07-05,...,Non-FRM,60,Instrumental - Vaisala 435C RH/AT Sensor,Wyoming,Sublette,2017-11-20,2017-07-05,{4CB48700-32CB-6126-E342-B783681A14AF},1,"{""x"": -110.098861, ""y"": 42.486360999999995, ""s..."
1,9,9,27,88313,1,41.301400,-72.902871,WGS84,Black Carbon PM2.5 at 880 nm,2017-06-08,...,Non-FRM,894,Magee AE33/ TAPI M633 Aethalometer - Optical a...,Connecticut,New Haven,2018-01-09,2017-06-08,{3D99EDC1-EC50-6B1A-508F-A0034CD9DB02},2,"{""x"": -72.90287099999999, ""y"": 41.3014, ""spati..."
2,45,79,7,42601,2,34.093959,-80.962304,WGS84,Nitric oxide (NO),2018-06-28,...,Non-FRM,674,Instrumental - Chemiluminescence Thermo Electr...,South Carolina,Richland,2019-02-18,2018-06-28,{04ED8FBB-2ADF-AA44-BE30-F61E7A939657},3,"{""x"": -80.962304, ""y"": 34.093959, ""spatialRefe..."
3,48,183,1,62101,1,32.378682,-94.711811,WGS84,Outdoor Temperature,2018-10-18,...,Non-FRM,40,INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.,Texas,Gregg,2019-02-18,2018-10-18,{60E09F2E-891F-EEBD-1253-0FFA407BDEA4},4,"{""x"": -94.711811, ""y"": 32.378682, ""spatialRefe..."
4,19,153,30,44201,1,41.603159,-93.643118,WGS84,Ozone,2017-12-23,...,FEM,47,INSTRUMENTAL - ULTRA VIOLET,Iowa,Polk,2018-01-12,2017-12-23,{73E73030-0883-7A32-ED9C-BD59ACC00A2F},5,"{""x"": -93.643118, ""y"": 41.603159, ""spatialRefe..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,27,53,962,61104,1,44.965242,-93.254759,NAD83,Wind Direction - Resultant,2018-07-15,...,Non-FRM,63,Instrumental - Climatronics,Minnesota,Hennepin,2019-01-17,2018-07-15,{61736B2D-7C20-DF9E-37BA-21CF788F8FDD},1003,"{""x"": -93.25475899999999, ""y"": 44.965241999999..."
996,48,43,101,44201,1,29.302650,-103.177810,WGS84,Ozone,2018-02-21,...,FEM,47,INSTRUMENTAL - ULTRA VIOLET,Texas,Brewster,2018-04-19,2018-02-21,{E739DA0C-7A97-8059-72A3-6505490F990D},1004,"{""x"": -103.17781, ""y"": 29.30265, ""spatialRefer..."
997,19,163,17,62101,1,41.467236,-90.688451,WGS84,Outdoor Temperature,2017-07-16,...,Non-FRM,40,INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.,Iowa,Scott,2017-08-10,2017-07-16,{1B805C6B-7621-B9B2-9C25-E35DC16D466E},1006,"{""x"": -90.688451, ""y"": 41.467236, ""spatialRefe..."
998,39,145,21,62101,1,38.600611,-82.829782,NAD83,Outdoor Temperature,2017-12-20,...,Non-FRM,40,INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.,Ohio,Scioto,2018-01-12,2017-12-20,{965A3811-00DB-88A4-A734-959041D2CF67},1007,"{""x"": -82.829782, ""y"": 38.600611, ""spatialRefe..."


In [22]:
m1 = gis.map()
m1

MapView(layout=Layout(height='400px', width='100%'))

In [24]:
m1.add_layer(description.sample_layer)

In [25]:
m1.zoom_to_layer(description.sample_layer)

In [27]:
m1.legend = False

In [28]:
def measurement_type():
    from datetime import datetime as dt
    # Load the big data file share layer into a DataFrame
    df = layers[0]
    out = df.groupBy('Method Name','Parameter Name').count()
    out.write.format("webgis").save("common_method_type" + str(dt.now().microsecond))

In [29]:
run_python_script(code=measurement_type, layers=[air_lyr])

[{'type': 'esriJobMessageTypeInformative',
  'description': 'Executing (RunPythonScript): RunPythonScript "def measurement_type():\\n    from datetime import datetime as dt\\n    # Load the big data file share layer into a DataFrame\\n    df = layers[0]\\n    out = df.groupBy(\'Method Name\',\'Parameter Name\').count()\\n    out.write.format("webgis").save("common_method_type" + str(dt.now().microsecond))\\n\\nmeasurement_type()" https://deldevld016.esri.com/server/rest/services/DataStoreCatalogs/bigDataFileShares_Air_Auality_17_18_19/BigDataCatalogServer/air_quality "{"defaultAggregationStyles": false}"'},
 {'type': 'esriJobMessageTypeInformative',
  'description': 'Start Time: Thu Jul  2 21:58:58 2020'},
 {'type': 'esriJobMessageTypeInformative',
  'description': 'Using URL based GPRecordSet param: https://deldevld016.esri.com/server/rest/services/DataStoreCatalogs/bigDataFileShares_Air_Auality_17_18_19/BigDataCatalogServer/air_quality'},
 {'type': 'esriJobMessageTypeInformative',
  

In [30]:
method_item = gis.content.search('common_method_type')[0]

In [34]:
method_df = method_item.tables[0].query(as_df=True)

In [35]:
method_df.sort_values(by='count', ascending=False)

Unnamed: 0,Method_Name,Parameter_Name,count,globalid,OBJECTID
89,INSTRUMENTAL - ULTRA VIOLET ABSORPTION,Ozone,10650047,{1C333EB9-AB7D-9ACF-3116-09924757FD90},112
6,INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.,Outdoor Temperature,9808818,{17DCDDD0-079F-5BF2-45C3-0BF485355EB0},8
158,INSTRUMENTAL - ULTRA VIOLET,Ozone,8634142,{EDA2D065-CE9E-2175-F094-8D9C2F6DCEBD},198
290,INSTRUMENTAL - VECTOR SUMMATION,Wind Direction - Resultant,7348868,{7154F73E-775C-4C1A-8775-FB99B0074348},503
277,INSTRUMENTAL - VECTOR SUMMATION,Wind Speed - Resultant,7267098,{4CCC027A-BABE-DF05-FC6B-9B04F013AF14},455
...,...,...,...,...,...
100,Cooper Environmental Services model Xact 620 -...,Vanadium PM10 LC,105,{ABFA136A-BE6E-79A7-FCA4-608CFABCB77F},126
234,Cooper Environmental Services model Xact 620 -...,Tin PM10 LC,105,{A5B11059-6FD5-EC9A-65E4-6D4B06B25FFF},339
245,Cooper Environmental Services model Xact 620 -...,Silver PM10 LC,105,{E56280AE-5F42-AD31-2D95-C3418CB40985},356
12,Cooper Environmental Services model Xact 620 -...,Calcium PM10 LC,105,{6FC4B1BA-A7E4-A69C-BB44-CC26A6FD935B},17


## Average PM 2.5 value by County

In [36]:
def average():
    from datetime import datetime as dt
    df = layers[0]
    df = df.filter(df['Parameter Name'] == 'PM2.5 - Local Conditions')
    res = geoanalytics.join_features(target_layer=layers[1], 
                                     join_layer=df, 
                                     join_operation="JoinOneToOne",
                                     summary_fields=[{'statisticType' : 'mean', 'onStatisticField' : 'Sample Measurement'}],
                                     spatial_relationship='Contains')
    res.write.format("webgis").save("average_pm_by_county" + str(dt.now().microsecond))

In [37]:
run_python_script(average, [air_lyr, usa_counties_lyr])

[{'type': 'esriJobMessageTypeInformative',
  'description': 'Executing (RunPythonScript): RunPythonScript "def average():\\n    from datetime import datetime as dt\\n    df = layers[0]\\n    df = df.filter(df[\'Parameter Name\'] == \'PM2.5 - Local Conditions\')\\n    res = geoanalytics.join_features(target_layer=layers[1], \\n                                     join_layer=df, \\n                                     join_operation="JoinOneToOne",\\n                                     summary_fields=[{\'statisticType\' : \'mean\', \'onStatisticField\' : \'Sample Measurement\'}],\\n                                     spatial_relationship=\'Contains\')\\n    res.write.format("webgis").save("average_pm_by_county" + str(dt.now().microsecond))\\n\\naverage()" https://deldevld016.esri.com/server/rest/services/DataStoreCatalogs/bigDataFileShares_Air_Auality_17_18_19/BigDataCatalogServer/air_quality;https://deldevld016.esri.com/server/rest/services/Hosted/usaCounties/FeatureServer/0 "{"defaul

In [38]:
res = run_python_script(average, [air_lyr, usa_counties_lyr])

In [40]:
average_pm_by_county = gis.content.search('average_pm_by_county')[0]

In [41]:
average_pm_by_county

In [39]:
m2 = gis.map()
m2

MapView(layout=Layout(height='400px', width='100%'))

In [42]:
m2.add_layer(average_pm_by_county.layers[0])

In [43]:
m2.zoom_to_layer(average_pm_by_county.layers[0])

In [17]:
def data_processsing():
    from datetime import datetime as dt
    import pyspark.sql.functions as F
    from pyspark.sql.functions import concat, col, lit
    # Load the big data file share layer into a DataFrame.
    df = layers[0]
    cols = ['Site Num', 'County Code', 'State Code', 'Date Local', 'Time Local', 'Parameter Name', 'Sample Measurement']
    df = df.select(cols)
    df = df.withColumn('Site_Num', F.lpad(df['Site Num'], 4, '0'))
    df = df.withColumn('County_Code', F.lpad(df['County Code'], 3, '0'))
    df = df.withColumn('State_Code', F.lpad(df['State Code'], 2, '0'))
    df = df.withColumn('unique_id', F.concat(F.col('State_Code'), F.col('County_Code'), F.col('Site_Num')))
#     drop_cols = ['Site_Num', 'County_Code', 'State_Code', 'Site Num', 'County Code', 'State Code']
    df = df.drop('Site_Num', 'County_Code', 'Staate_Code', 'Site Num', 'County Code', 'State Code')
    df = df.withColumn('datetime', concat(col("Date Local"), lit(" "), col("Time Local")))
#     drop_cols = ['Time Local', 'Date Local']
    df = df.drop('Time Local', 'Date Local')
    df = df.filter(df.unique_id == df.first().unique_id)
    # group the dataframe by TextType field and count the number of calls for each call type. 
    df = df.groupby(df['datetime'], df['unique_id']).pivot("Parameter Name").avg("Sample Measurement")

    df.write.format("webgis").save("timeseries_data_17_18_19_1station" + str(dt.now().microsecond))

In [13]:
run_python_script(code=data_processsing, layers=[air_lyr], gis=gis)

Submitted.
Executing...
Executing (RunPythonScript): RunPythonScript "def data_processsing():\n    from datetime import datetime as dt\n    import pyspark.sql.functions as F\n    from pyspark.sql.functions import concat, col, lit\n    # Load the big data file share layer into a DataFrame.\n    df = layers[0]\n    cols = ['Site Num', 'County Code', 'State Code', 'Date Local', 'Time Local', 'Parameter Name', 'Sample Measurement']\n    df = df.select(cols)\n    df = df.withColumn('Site_Num', F.lpad(df['Site Num'], 4, '0'))\n    df = df.withColumn('County_Code', F.lpad(df['County Code'], 3, '0'))\n    df = df.withColumn('State_Code', F.lpad(df['State Code'], 2, '0'))\n    df = df.withColumn('unique_id', F.concat(F.col('State_Code'), F.col('County_Code'), F.col('Site_Num')))\n#     drop_cols = ['Site_Num', 'County_Code', 'State_Code', 'Site Num', 'County Code', 'State Code']\n    df = df.drop('Site_Num', 'County_Code', 'Staate_Code', 'Site Num', 'County Code', 'State Code')\n    df = df.wit

{"messageCode":"BD_101029","message":"1007/1348 distributed tasks completed.","params":{"completedTasks":"1007","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1024/1348 distributed tasks completed.","params":{"completedTasks":"1024","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1046/1348 distributed tasks completed.","params":{"completedTasks":"1046","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1065/1348 distributed tasks completed.","params":{"completedTasks":"1065","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1089/1348 distributed tasks completed.","params":{"completedTasks":"1089","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1115/1348 distributed tasks completed.","params":{"completedTasks":"1115","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1138/1348 distributed tasks completed.","params":{"completedTasks":"1138","totalTasks":"1348"}}
{"messageCode":"BD_101029","message":"1348/1348 distributed ta

{"messageCode":"BD_101029","message":"1124/1547 distributed tasks completed.","params":{"completedTasks":"1124","totalTasks":"1547"}}
{"messageCode":"BD_101029","message":"1128/1547 distributed tasks completed.","params":{"completedTasks":"1128","totalTasks":"1547"}}
{"messageCode":"BD_101029","message":"1348/1547 distributed tasks completed.","params":{"completedTasks":"1348","totalTasks":"1547"}}
{"messageCode":"BD_101029","message":"1547/1547 distributed tasks completed.","params":{"completedTasks":"1547","totalTasks":"1547"}}
{"messageCode":"BD_101081","message":"Finished writing results:"}
{"messageCode":"BD_101082","message":"* Count of features = 18231","params":{"resultCount":"18231"}}
{"messageCode":"BD_101083","message":"* Spatial extent = None","params":{"extent":"None"}}
{"messageCode":"BD_101084","message":"* Temporal extent = None","params":{"extent":"None"}}
{"messageCode":"BD_101226","message":"Feature service layer created: https://deldevld016.esri.com/server/rest/serv

[{'type': 'esriJobMessageTypeInformative',
  'description': 'Executing (RunPythonScript): RunPythonScript "def data_processsing():\\n    from datetime import datetime as dt\\n    import pyspark.sql.functions as F\\n    from pyspark.sql.functions import concat, col, lit\\n    # Load the big data file share layer into a DataFrame.\\n    df = layers[0]\\n    cols = [\'Site Num\', \'County Code\', \'State Code\', \'Date Local\', \'Time Local\', \'Parameter Name\', \'Sample Measurement\']\\n    df = df.select(cols)\\n    df = df.withColumn(\'Site_Num\', F.lpad(df[\'Site Num\'], 4, \'0\'))\\n    df = df.withColumn(\'County_Code\', F.lpad(df[\'County Code\'], 3, \'0\'))\\n    df = df.withColumn(\'State_Code\', F.lpad(df[\'State Code\'], 2, \'0\'))\\n    df = df.withColumn(\'unique_id\', F.concat(F.col(\'State_Code\'), F.col(\'County_Code\'), F.col(\'Site_Num\')))\\n#     drop_cols = [\'Site_Num\', \'County_Code\', \'State_Code\', \'Site Num\', \'County Code\', \'State Code\']\\n    df = df.dr

In [45]:
data = gis.content.search('timeseries_data_17_18_19_1station_working')

In [46]:
data[0]

In [47]:
series_data = data[0].tables[0]

In [48]:
series_data.query().sdf.columns

Index(['datetime', 'unique_id', 'Barometric_pressure', 'Carbon_monoxide',
       'Nitric_oxide__NO_', 'Nitrogen_dioxide__NO2_', 'Outdoor_Temperature',
       'Oxides_of_nitrogen__NOx_', 'Ozone', 'PM10_Total_0_10um_STP',
       'PM2_5___Local_Conditions', 'Reactive_oxides_of_nitrogen__NOy_',
       'Relative_Humidity', 'Sulfur_dioxide', 'Wind_Direction___Resultant',
       'Wind_Speed___Resultant', 'globalid', 'OBJECTID'],
      dtype='object')

In [68]:
def predict_pm25():
    from arcgis.gis import GIS
    gis = GIS('https://deldevld016.esri.com/portal', 'admin', 'esri.agp2', profile="your_enterprise_portal", verify_cert=False)
    from datetime import datetime as dt
    from pyspark.sql.functions import concat, col, lit
    import pandas as pd
    import numpy as np
    from fbprophet import Prophet
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, FloatType, TimestampType
    import warnings
    warnings.filterwarnings('ignore')
    
    df1 = layers[0] 
    cols = ['Outdoor_Temperature', 'Ozone', 'PM10_Total_0_10um_STP',
        'PM2_5___Local_Conditions',
        'Wind_Direction___Resultant',
        'Wind_Speed___Resultant', 'datetime']
    df1 = df1.select(cols)
    df1 = df1.withColumn('flag', lit(1))
    schema = StructType([StructField('ds', TimestampType(), True), 
                         StructField('yhat_lower', FloatType(), True),
                         StructField('yhat_upper', FloatType(), True),
                         StructField('yhat', FloatType(), True),
                         StructField('y', FloatType(), True)])
    
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def forecast_pm25(df):
        #prepare data 
        df['Date'] = df['datetime'].astype('datetime64[ns]')
        df['year'] = df['Date'].dt.year
        df.set_index('Date', inplace=True) 
        df.sort_index(inplace=True)
        v = pd.date_range(start='2016-12-31 23:00:00', periods=18265, freq='H', closed='right')
        newdf = pd.DataFrame(index=v)
        # Fill missing dates 
        historical=pd.merge(newdf, df, how='left', left_index=True, right_index=True)
        historical.interpolate(method='time', inplace=True)
        historical.reset_index(inplace=True)
        historical.rename(columns={'index': 'ds', 'PM2_5___Local_Conditions': 'y'}, inplace=True)
        historical.fillna(0, inplace=True)
        # handle zero and negative values for pm
        for i,item in enumerate(historical['y']):
            if item<=0:
                historical['y'].iloc[i]=historical['y'].iloc[i-1]
            else:
                historical['y'].iloc[i]=item
                
        for i,item in enumerate(historical['PM10_Total_0_10um_STP']):
            if item<=0:
                historical['PM10_Total_0_10um_STP'].iloc[i]=historical['PM10_Total_0_10um_STP'].iloc[i-1]
            else:
                historical['PM10_Total_0_10um_STP'].iloc[i]=item        
         
        for i,item in enumerate(historical['Wind_Speed___Resultant']):
            if item<=0:
                historical['Wind_Speed___Resultant'].iloc[i]=historical['Wind_Speed___Resultant'].iloc[i-1]
            else:
                historical['Wind_Speed___Resultant'].iloc[i]=item
        
        for i,item in enumerate(historical['Wind_Direction___Resultant']):
            if item<=0:
                historical['Wind_Direction___Resultant'].iloc[i]=historical['Wind_Direction___Resultant'].iloc[i-1]
            else:
                historical['Wind_Direction___Resultant'].iloc[i]=item
        # split data into train and test        
        train_df = historical[historical.year != 2019]
        test_df = historical[historical.year == 2019]
        test_df.drop(columns='y', inplace=True)        
        # train model    
        m = Prophet(daily_seasonality=True,
                    weekly_seasonality=True)
        m.add_regressor('PM10_Total_0_10um_STP')
        m.add_regressor('Wind_Speed___Resultant')
        m.add_regressor('Wind_Direction___Resultant')
        m.fit(train_df);
        # predict on test data
        forecast = m.predict(test_df)
        # save plots locally
        plot1 = m.plot(forecast);
        plot2 = m.plot_components(forecast);
        plot1.savefig(r'/home/ags/localdatastore/fbdata/forecast.png')
        plot2.savefig(r'/home/ags/localdatastore/fbdata/cmponents.png')
        # Uncomment the following lines if you want to publish and visualize your graphs as as item.
#         gis = GIS('https://machinename/portal', 'username', 'password')
#         gis.content.add(item_properties={"type": "Image", "title": "Forecast Plot"}, data=r"/home/ags/localdatastore/fbdata/forecast.png")
#         gis.content.add(item_properties={"type": "Image", "title": "Forecase Components2"}, data=r"/home/ags/localdatastore/fbdata/cmponents.png")
        # create df with actual and predicted fields
        cmp_df = forecast.set_index('ds')[['yhat', 'yhat_lower', 'yhat_upper']].join(historical.set_index('ds'))
        cmp_df.reset_index(inplace=True)
        cmp_df = cmp_df[['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y']]
        return cmp_df
    res = df1.groupby(['flag']).apply(forecast_pm25)

    res.write.format("webgis").save("predicted_results_on_test_data" + str(dt.now().microsecond))

In [69]:
run_python_script(code=predict_pm25, layers=[series_data])

[{'type': 'esriJobMessageTypeInformative',
 {'type': 'esriJobMessageTypeInformative',
  'description': 'Start Time: Fri Jul  3 08:50:44 2020'},
 {'type': 'esriJobMessageTypeInformative',
  'description': 'Using URL based GPRecordSet param: https://deldevld016.esri.com/server/rest/services/Hosted/timeseries_data_17_18_19_1station897490/FeatureServer/0'},
 {'type': 'esriJobMessageTypeInformative',
  'description': '{"messageCode":"BD_101028","message":"Starting new distributed job with 206 tasks.","params":{"totalTasks":"206"}}'},
 {'type': 'esriJobMessageTypeInformative',
  'description': '{"messageCode":"BD_101029","message":"0/206 distributed tasks completed.","params":{"completedTasks":"0","totalTasks":"206"}}'},
 {'type': 'esriJobMessageTypeInformative',
  'description': '{"messageCode":"BD_101029","message":"7/206 distributed tasks completed.","params":{"completedTasks":"7","totalTasks":"206"}}'},
 {'type': 'esriJobMessageTypeInformative',
  'description': '{"messageCode":"BD_10102

In [28]:
predicted_item = gis.content.search('predicted_results_on_test_data')

In [29]:
predicted_item[0]

In [30]:
predicted_df = predicted_item[0].tables[0].query().sdf

In [31]:
predicted_df.columns

Index(['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y', 'globalid', 'OBJECTID'], dtype='object')

In [32]:
predicted_df.head()

Unnamed: 0,ds,yhat_lower,yhat_upper,yhat,y,globalid,OBJECTID
0,2018-12-31 18:30:00,1.357769,15.94111,8.691076,5.5,{7BCD0AF8-F5B1-4A7E-7118-88589E0A4DAD},44
1,2018-12-31 19:30:00,1.141512,16.265352,8.773848,5.7,{747EF178-8A36-5CCC-74A3-4BAD71DB0553},244
2,2018-12-31 20:30:00,0.389374,15.927957,7.641061,5.4,{7D6C9381-7279-AB10-C5F7-AE138BFD6AE0},444
3,2018-12-31 21:30:00,-0.164141,15.033084,7.111085,4.4,{B9BD44CF-F794-87F3-E389-E654CDCE8844},644
4,2018-12-31 22:30:00,-0.587776,14.489634,7.00242,4.4,{B7C8C5C7-1C9E-E66A-CA41-39E5F0D601FA},844


dashboard link

## Conclusion

## References