In [None]:
from dask.distributed import Client, LocalCluster
import s3fs
import dask.dataframe as dd
import time
import pandas as pd
import matplotlib.pyplot as plt
import geopandas as gpd
from shapely.geometry import Point
import prophet

from dask_cloudprovider.azure import AzureVMCluster

In [None]:
# columns = ["when_captured",
#            "service_uploaded",
#             "loc_lat",
#             "loc_lon",
#             "opc_pm01_0",
#             "opc_pm02_5",
#             "opc_pm10_0",
#             "pms_pm01_0",
#             "pms_pm02_5",
#             "pms_pm10_0",
#             "pms2_pm01_0",
#             "pms2_pm02_5",
#             "pms2_pm10_0",
#             "lnd_712u",
#             "lnd_712c",
#             "lnd_7318u",
#             "lnd_7318c",
#             "lnd_7128ec",
#             "lnd_78017u",
#             "lnd_78017c",
#             "lnd_78017w",
#             "env_temp",
#             "env_humid",
#             "env_press"]

In [None]:
def create_azure_cluster_and_client():
    cluster = AzureVMCluster(resource_group="adzd_group_v3",
                             vnet="adzd_vnet_v3", 
                             location ="northeurope", 
                             security_group="adzd_security_v3",
                             n_workers=1)
    client = Client(cluster)
    return cluster, client

In [None]:
def create_local_cluster_and_client():
    cluster = LocalCluster()
    client = Client(cluster)
    return cluster,client

In [None]:
def scale_and_run_benchmark(cluster,df,benchmark_func,workers):
    cluster.scale(workers)
    start_time = time.time()
    result = benchmark_func(df)
    end_time = time.time()
    return result,end_time-start_time

In [None]:
def get_avg_radiation(payload):
    get_radiation_funcs = [get_lnd_712u, get_lnd_7318u, get_lnd_7318c, get_lnd_7128ec, get_lnd_78017u, get_lnd_78017c, get_lnd_78017w]
    measurements = [func(payload) for func in get_radiation_funcs]
    filtered = [m for m in measurements if m != None]
    msum = float(sum(filtered))
    mcount = len(filtered)
    return msum/mcount if mcount > 0 else None

In [None]:
get_when_captured = lambda payload: payload.get("when_captured",None)
get_lat = lambda payload: payload.get("loc_lat", None)
get_lon = lambda payload: payload.get("loc_lon", None)
# Jednostka dla poniższych: μSv/h
get_lnd_712u = lambda payload: payload.get("lnd_712c", None)
get_lnd_7318u = lambda payload: payload.get("lnd_7318u", None)
get_lnd_7318c = lambda payload: payload.get("lnd_7318c", None)
get_lnd_7128ec = lambda payload: payload.get("lnd_7128ec", None)
get_lnd_78017u = lambda payload: payload.get("lnd_78017u", None)
get_lnd_78017c = lambda payload: payload.get("lnd_78017c", None)
get_lnd_78017w = lambda payload: payload.get("lnd_78017w", None)

In [None]:
def read_data(date_string):
    df = dd.read_json(
            f's3://safecastdata-us-west-2/ingest/prd/s3raw/{date_string}', storage_options={'anon':True})
    return df

In [None]:
def read_data_for_multiple_days(year_and_month_string,first_day=1,last_day=30):
    paths = [
        f's3://safecastdata-us-west-2/ingest/prd/s3raw/{year_and_month_string}-{day:02}/*/*' for day in range(first_day,last_day+1)
    ]
    df = dd.read_json(
            paths, 
            storage_options={'anon': True}
        )
    return df

In [None]:
def prep_data(df):
    df_prep = df.copy()
    
    print("GET DATETIME")
    df_prep["datetime"] = df_prep["payload"].map(get_when_captured,meta=('datetime', str))
    
    print("GET LATITUDE AND LONGITUDE")
    df_prep["latitude"] = df_prep["payload"].map(get_lat,meta=('latitude', float))
    df_prep["longitude"] = df_prep["payload"].map(get_lon,meta=('longitude', float))
    
    print("GET RADIATION")
    df_prep["lnd_712u"] = df_prep["payload"].map(get_lnd_712u,meta=('712u', float))
    df_prep["lnd_7318u"] = df_prep["payload"].map(get_lnd_7318u,meta=('7318u', float))
    df_prep["lnd_7318c"] = df_prep["payload"].map(get_lnd_7318c,meta=('7318c', float))
    df_prep["lnd_7128ec"] = df_prep["payload"].map(get_lnd_7128ec,meta=('7128ec', float))
    df_prep["lnd_78017u"] = df_prep["payload"].map(get_lnd_78017u,meta=('78017u', float))
    df_prep["lnd_78017c"] = df_prep["payload"].map(get_lnd_78017c,meta=('78017c', float))
    df_prep["lnd_78017w"] = df_prep["payload"].map(get_lnd_78017w,meta=('78017w', float))
    df_prep["avg_radiation"] = df_prep["payload"].map(get_avg_radiation,meta=('avg_radiation', float))
    
    print("SELECT COLUMNS")
    df_prep = df_prep[["datetime","latitude","longitude","avg_radiation"]]
    
    print("REMOVE NULL VALUES")
    df_prep = df_prep.dropna(how='any')
    
    print("CONVERT STRING TO DATETIME")
    df_prep["datetime"] = dd.to_datetime(df_prep["datetime"])
    df_prep["date_with_hour"] = dd.to_datetime(df_prep["datetime"].apply(lambda dt: str(dt)[:10] + " " + str(dt)[11:13] + ":00", meta=('datetime',str)))
    
    print("REMOVE WRONG DATA BASED ON DATE")
    today = pd.datetime.today()
    df_prep = df_prep[df_prep['date_with_hour'].between('1980-01-01', today)]
    
    print("REMOVE OUTLIERS")
    # Keep only positive values
    df_prep = df_prep[df_prep["avg_radiation"] > 0]
    
    # Identifying outliers with the 1.5xIQR rule
    Q1 = df_prep['avg_radiation'].quantile(.25)
    Q3 = df_prep['avg_radiation'].quantile(.75)
    q1 = Q1 - 1.5 * (Q3 - Q1)
    Q4 = df_prep['avg_radiation'].quantile(0.995)
    df_prep = df_prep[df_prep['avg_radiation'].between(q1, Q4)]

    return df_prep

In [None]:
def show_sensors_locations_on_map(df):
    geometry = [Point(x,y) for x,y in zip(df['longitude'], df['latitude'])]
    gdf = gpd.GeoDataFrame(geometry=geometry)   

    world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
    ax = gdf.plot(ax=world.plot(figsize=(15, 10)), marker='.', color='red', markersize=8)
    ax.set_title("Sensors' locations")
    plt.grid()
    plt.show()

In [None]:
def get_data_for_given_area(df,lon_min,lat_min,lon_max,lat_max):
    return df[(df.longitude > lon_min) & (df.longitude < lon_max) & (df.latitude > lat_min) & (df.latitude < lat_max)]

In [None]:
def filter_data_by_region(df,region):
    lon_min_jpn, lat_min_jpn, lon_max_jpn, lat_max_jpn = 128.03, 30.22, 148.65, 45.83 # Japan
    lon_min_fk, lat_min_fk, lon_max_fk, lat_max_fk = 140.0166, 37.0047, 141.2251, 38.195 # Fukushima
    lon_min_usa, lat_min_usa, lon_max_usa, lat_max_usa = -161.75583, 19.50139, -68.01197, 64.85694 # USA
    
    if region == "Japan":
        return get_data_for_given_area(df,lon_min_jpn,lat_min_jpn,lon_max_jpn,lat_max_jpn) 
    elif region == "Fukushima":
        return get_data_for_given_area(df,lon_min_fk,lat_min_fk,lon_max_fk,lat_max_fk)
    elif region == "USA":
        return get_data_for_given_area(df,lon_min_usa,lat_min_usa,lon_max_usa,lat_max_usa)
    else:
        return df

In [None]:
def get_avg_radiation_by_location(df):
    return df.groupby(by=["latitude","longitude"]).agg({'avg_radiation': ['mean', 'std']}).compute()

In [None]:
def get_avg_radiation_per_hour(df):
    return df.groupby(by=["date_with_hour"]).agg({'avg_radiation':'mean'}).compute()

In [None]:
def count_records(df):
    return df.count().compute()["avg_radiation"]

In [None]:
def count_unique_locations(df):
    return df.groupby(by=["latitude","longitude"]).count().count().compute()

In [None]:
def get_max_and_min_radiation(df):
    return {"min": df.avg_radiation.min().compute(), "max": df.avg_radiation.max().compute()}

In [None]:
def get_loc_with_max_radiation(df):
    max_val = df.avg_radiation.max().compute()
    pandas_df = df[df.avg_radiation == max_val].compute()
    lat = pandas_df.iloc[0]["latitude"]
    long = pandas_df.iloc[0]["longitude"]
    return {"latitude":lat,"longitude":long}

In [None]:
def run_benchmarks(cluster,df,cluster_type="local"):
    number_of_records = count_records(df)
    benchmarks = [count_records,count_unique_locations,get_avg_radiation_by_location,get_max_and_min_radiation,get_loc_with_max_radiation]
    workers_list = [1,2,3,4]
    
    results = []
    for benchmark in benchmarks:
        for workers in workers_list:
            print(f"RUNNING BENCHMARK {benchmark.__name__} USING {workers} WORKERS")
            _,duration = scale_and_run_benchmark(cluster,df,benchmark,workers)
            results.append({"records":number_of_records,"benchmark_func":benchmark.__name__,"workers":workers,"time":duration,"cluster_type":cluster_type})
            
    results_df = pd.DataFrame(results)
    
    time_one_instance = results_df.iloc[0].time
    results_df["speedup"] = time_one_instance/results_df["time"]
    results_df["efficiency"] = results_df["speedup"]/results_df["workers"]
            
    return results_df

In [None]:
def plot_efficiency(results,exclude_benchmarks=["count_records"]):
    benchmarks = [b for b in pd.unique(results["benchmark_func"]) if b not in exclude_benchmarks]
    colors = ["tab:blue","tab:orange","tab:green","tab:red","tab:purple","tab:cyan"]
    assert(len(colors)>=len(benchmarks))
    used_colors = colors[:len(benchmarks)]
    
    for benchmark,color in zip(benchmarks,used_colors):
        filtered = results[results["benchmark_func"]==benchmark]
        plt.plot(filtered["workers"],filtered["efficiency"],c=color,label=benchmark,marker='o',linestyle='--')
    
    plt.xlabel("Number of worker instances")
    plt.ylabel("Efficiency")
    plt.ylim(ymin=0,ymax=1.1)
    plt.grid()
    plt.legend()
    plt.show()

In [None]:
def plot_acceleration(results,exclude_benchmarks=["count_records"]):
    benchmarks = [b for b in pd.unique(results["benchmark_func"]) if b not in exclude_benchmarks]
    colors = ["tab:blue","tab:orange","tab:green","tab:red","tab:purple","tab:cyan"]
    assert(len(colors)>=len(benchmarks))
    used_colors = colors[:len(benchmarks)]
    
    for benchmark,color in zip(benchmarks,used_colors):
        filtered = results[results["benchmark_func"]==benchmark]
        plt.plot(filtered["workers"],filtered["speedup"],c=color,label=benchmark,marker='o',linestyle='--')
    
    plt.xlabel("Number of worker instances")
    plt.ylabel("Acceleration")
    plt.grid()
    plt.legend()
    plt.show()

In [None]:
def plot_time(results,exclude_benchmarks=["count_records"]):
    benchmarks = [b for b in pd.unique(results["benchmark_func"]) if b not in exclude_benchmarks]
    colors = ["tab:blue","tab:orange","tab:green","tab:red","tab:purple","tab:cyan"]
    assert(len(colors)>=len(benchmarks))
    used_colors = colors[:len(benchmarks)]
    
    for benchmark,color in zip(benchmarks,used_colors):
        filtered = results[results["benchmark_func"]==benchmark]
        plt.plot(filtered["workers"],filtered["time"],c=color,label=benchmark,marker='o',linestyle='--')
    
    plt.xlabel("Number of worker instances")
    plt.ylabel("Time")
    plt.grid()
    plt.legend()
    plt.show()

In [None]:
# cluster,client = create_azure_cluster_and_client()

cluster,client = create_local_cluster_and_client()

In [None]:
client

In [None]:
pd.set_option('display.max_colwidth', None)

In [None]:
single_date_string = '2021-01-20/*/*'
df_single_date = read_data(single_date_string)
df_single_date.head()

In [None]:
df_single_date

In [None]:
df_sd_prep = prep_data(df_single_date)

In [None]:
df_sd_prep

In [None]:
df_sd_prep.head()

In [None]:
show_sensors_locations_on_map(df_sd_prep)

In [None]:
df_sd_jpn = filter_data_by_region(df_sd_prep,"Japan")
df_sd_fk = filter_data_by_region(df_sd_prep,"Fukushima")
df_sd_usa = filter_data_by_region(df_sd_prep,"USA")

In [None]:
show_sensors_locations_on_map(df_sd_jpn)

In [None]:
show_sensors_locations_on_map(df_sd_fk)

In [None]:
show_sensors_locations_on_map(df_sd_usa)

In [None]:
def is_2021(date):
    return date.year == 2021

In [None]:
df_sd_fk_grouped = get_avg_radiation_per_hour(df_sd_fk)

In [None]:
df_sd_fk_grouped.head()

In [None]:
fk_grouped = df_sd_fk_grouped.reset_index()
fk_grouped["year"] = fk_grouped["date_with_hour"].apply(is_2021)
fk_filtered = fk_grouped[fk_grouped["year"]]
fk_filtered

In [None]:
df_sd_jpn_grouped = get_avg_radiation_per_hour(df_sd_jpn)

In [None]:
df_sd_jpn_grouped.head()

In [None]:
jpn_grouped = df_sd_jpn_grouped.reset_index()
jpn_grouped["year"] = jpn_grouped["date_with_hour"].apply(is_2021)
jpn_filtered = jpn_grouped[jpn_grouped["year"]]
jpn_filtered

In [None]:
df_sd_usa_grouped = get_avg_radiation_per_hour(df_sd_usa)

In [None]:
df_sd_usa_grouped.head()

In [None]:
usa_grouped = df_sd_usa_grouped.reset_index()
usa_grouped["year"] = usa_grouped["date_with_hour"].apply(is_2021)
usa_filtered = usa_grouped[usa_grouped["year"]]
usa_filtered

In [None]:
hours_fk = [str(d) for d in list(fk_filtered.index)]
hours_jpn = [str(d) for d in list(jpn_filtered.index)]
hours_usa = [str(d) for d in list(usa_filtered.index)]

values_fk = fk_filtered.avg_radiation
values_jpn = jpn_filtered.avg_radiation
values_usa = usa_filtered.avg_radiation

In [None]:
hours = [hours_fk,hours_jpn,hours_usa]
values = [values_fk,values_jpn,values_usa]
colors = ["tab:blue","tab:green","tab:red"]
regions = ["Fukushima","Japan","USA"]

In [None]:
plt.figure(figsize=(15,10))

for color,(region,(h,v)) in zip(colors,zip(regions,zip(hours,values))):
    plt.plot(h,v,c=color,label=region,marker='o',linestyle='--')
    
plt.xlabel('Time')
plt.ylabel('Avg radiation')
plt.legend()
plt.xticks(rotation=90)
plt.show()

In [None]:
df_four_days = read_data_for_multiple_days("2021-01",first_day=20,last_day=23)

In [None]:
df_four_days.head()

In [None]:
df_fd_prep = prep_data(df_four_days)

In [None]:
df_fd_prep.head()

In [None]:
df_fd_fk_grouped = get_avg_radiation_per_hour(filter_data_by_region(df_fd_prep,"Fukushima")).reset_index()

In [None]:
df_fd_fk_grouped["is_2021"] = df_fd_fk_grouped["date_with_hour"].apply(is_2021)

In [None]:
forecast_df = df_fd_fk_grouped[df_fd_fk_grouped["is_2021"]][["date_with_hour","avg_radiation"]]

In [None]:
forecast_df.head()

In [None]:
forecast_df.rename(columns={"date_with_hour":"ds","avg_radiation":"y"},inplace=True)

In [None]:
forecast_df.head()

In [None]:
len(forecast_df)

In [None]:
plt.figure(figsize=(15,10))

plt.plot(forecast_df["ds"],forecast_df["y"],marker='o',linestyle='--')
    
plt.xlabel('Time')
plt.ylabel('Avg radiation')
plt.xticks(rotation=90)
plt.show()

In [None]:
# based on: https://examples.dask.org/applications/forecasting-with-prophet.html

In [None]:
# ! pip3 install pystan==2.19.1.1

In [None]:
# ! pip3 install prophet

In [None]:
from prophet import Prophet

In [None]:
m = Prophet()
m.fit(forecast_df)

In [None]:
future = m.make_future_dataframe(periods=100, freq='H')

In [None]:
fcst = m.predict(future)

In [None]:
fig = m.plot(fcst)

In [None]:
# Z dokumentacji:
# Prophet includes a prophet.diagnostics.cross_validation function method, which uses simulated historical forecasts to provide some idea of a model’s quality.
# This is done by selecting cutoff points in the history, and for each of them fitting the model using data only up to that cutoff point. 
# We can then compare the forecasted values to the actual values.
# Internally, cross_validation generates a list of cutoff values to try. 
# Prophet fits a model and computes some metrics for each of these. 
# By default each model is fit sequentially, 
# but the models can be trained in parallel using the parallel= keyword. 
# On a single machine parallel="processes" is a good choice. For large problems where you’d like to distribute the work on a cluster, 
# use parallel="dask" after you’ve connected to the cluster by creating a Client.
# For large problems where you’d like to distribute the work on a cluster, use parallel="dask" after you’ve connected to the cluster by creating a Client.

In [None]:
df_cv = prophet.diagnostics.cross_validation(
    m, horizon="10 hours",
    parallel="dask"
)

In [None]:
df_cv

In [None]:
from sktime.datasets import load_airline
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.naive import NaiveForecaster
from sktime.utils.plotting import plot_series
import numpy as np

In [None]:
forecast_df

In [None]:
forecast_df_ts = forecast_df.rename(columns={'ds': 'Period'})
forecast_df_ts.set_index("Period", inplace=True)

In [None]:
forecast_series = forecast_df_ts['y']

In [None]:
forecast_series = forecast_series.asfreq(freq='H')

In [None]:
forecast_series

In [None]:
y = forecast_series

# step 2: specifying forecasting horizon
fh = np.arange(1, 37)

# step 3: specifying the forecasting algorithm
forecaster = NaiveForecaster(strategy = "last", sp = 12)

forecaster.fit(y, fh = fh)
y_pred = forecaster.predict()

In [None]:
# optional: plotting predictions and past data
plot_series(y, y_pred, labels=["y", "y_pred"])

In [None]:
sd_results = run_benchmarks(cluster,df_sd_prep)

In [None]:
fd_results = run_benchmarks(cluster,df_fd_prep)

In [None]:
sd_results

In [None]:
plt.figure(figsize=(15,10))
plot_time(sd_results)

In [None]:
plt.figure(figsize=(15,10))
plot_acceleration(sd_results)

In [None]:
plt.figure(figsize=(15,10))
plot_efficiency(sd_results)

In [None]:
fd_results

In [None]:
plt.figure(figsize=(15,10))
plot_time(fd_results)

In [None]:
plt.figure(figsize=(15,10))
plot_acceleration(sd_results)

In [None]:
plt.figure(figsize=(15,10))
plot_efficiency(sd_results)