In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import monotonically_increasing_id, udf
from pyspark.sql.types import *
# import warning
# warnings.filterwarnings("ignore")

import schemas
import tools

# 1.Methods
## load

In [2]:
def load_parquet(path, schema:StructType = None):
    """
    load apache parquet file
    """
    return sqlContext.read.schema(schema).parquet(path) if schema is not None else sqlContext.read.parquet(path)

def load_parquet_from_weather_toPandas(path, schema:StructType = None):
    """
    load apache parquet file
    """
    weather = load_parquet(path) \
                    .withColumnRenamed("__index_level_0__", "TIME") \
                    .dropna() \
                    .withColumn("id", monotonically_increasing_id())
    weather.createOrReplaceTempView("weather_temp")
    weather_dic = spark.sql("select * from weather_temp where id in (select max(id) as id from weather_temp group by TIME)").toPandas()
    return weather_dic

def load_csv(path, schema:StructType = None):
    """
    load csv file
    """
    # return sqlContext.read.schema(schema).csv(path, sep=";", header=True, schema=schema) if schema is not None else sqlContext.read.schema(schema).csv(path, sep=";", header=True)
    return sqlContext.read.csv(path, sep=";", header=True, schema=schema)


## UDF

In [3]:
def udf_by_grid(df:pd.DataFrame, type = FloatType()):
    return udf(lambda g, t: list(df[df['TIME'] == t[:14]+'00:00'][g])[0], type)

def udf_by_ws():
    schema = StructType([
        StructField("u_interp", FloatType(), True),
        StructField("v_interp", FloatType(), True)
    ])
    return udf(lambda s1, d1, s2, d2, z: tools.wind_interp(s1, d1, s2, d2, z), schema)

def udf_by_tmp():
    schema = StructType([
        StructField("tmp_interp", FloatType(), True),
    ])
    return udf(lambda t1, t2, z: tools.tmp_interp(t1, t2, z), schema)

In [4]:
def udf_regist():
    udf_type = udf(lambda x: {"H": 1.0, "W": 2.0, "P": 3.0, "M": 4.0}.get(x, 0.0), FloatType())
    udf_placement = udf(lambda x: {"LAND": 1.0, "HAV": 2.0}.get(x, 0.0), FloatType())

    udf_hour = udf(lambda x: int(x[11:13]), IntegerType())
    udf_ws10  = udf_by_grid(ws10_dic, FloatType())
    udf_ws100  = udf_by_grid(ws100_dic, FloatType())
    udf_wd10  = udf_by_grid(wd10_dic, IntegerType())
    udf_wd100  = udf_by_grid(wd100_dic, IntegerType())
    # udf_tmp2  = udf_by_grid(tmp2_dic, IntegerType())
    # udf_tmp100  = udf_by_grid(tmp100_dic, IntegerType())
    udf_ws_interp  = udf_by_ws()
    # udf_tmp_interp  = udf_by_tmp()
    return udf_type, udf_placement,udf_hour, udf_ws10, udf_ws100, udf_wd10, udf_wd100, udf_ws_interp

## Aggregate

In [5]:
def aggregate(df, join_df):
    df = df.join(join_df, on="GSRN") \
                    .select(df.GSRN, df.TIME_CET, join_df.cluster, df.VAERDI, join_df.Navhub_height, join_df.grid)

    df = df.withColumn("hour", udf_hour(df.TIME_CET)) \
                .withColumn("ws10", udf_ws10(df.grid, df.TIME_CET)) \
                .withColumn("ws100", udf_ws100(df.grid, df.TIME_CET)) \
                .withColumn("wd10", udf_wd10(df.grid, df.TIME_CET)) \
                .withColumn("wd100", udf_wd100(df.grid, df.TIME_CET))

                # .withColumn("tmp2", udf_tmp2(df.grid, df.TIME_CET)) \
                # .withColumn("tmp100", udf_tmp100(df.grid, df.TIME_CET))
    return df

# def aggregate_with_interp(df, join_df):
#     df = aggregate(df, join_df)
#     df = df.withColumn("wsCol", \
#                 udf_ws_interp(df.ws10, df.wd10, df.ws100, df.wd100, df.Navhub_height)) \
#         .withColumn("tmpCol", \
#         udf_tmp_interp(df.tmp2, df.tmp100, df.Navhub_height)) \
#                 .select("GSRN", "TIME_CET", "Placement", "Capacity_kw", "Rotor_diameter", "Navhub_height", "VAERDI", "wsCol.u_interp", "wsCol.v_interp", "tmpCol.tmp_interp")
#     return df

def aggregate_with_interp(df, join_df):
    df = aggregate(df, join_df)
    df = df.withColumn("wsCol", \
                udf_ws_interp(df.ws10, df.wd10, df.ws100, df.wd100, df.Navhub_height)) \
                .select("GSRN", "TIME_CET", "hour", "cluster", "VAERDI", "wsCol.u_interp", "wsCol.v_interp")
    return df

# 2.Data Preprocessing

In [6]:
# initialise sparkContext\
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("WindTurbine") \
    .config("spark.executor.memory", "8gb") \
    .config("spark.cores.max", "4") \
    .getOrCreate()

sc = spark.sparkContext

# using SQLContext to read parquet file
sqlContext = SQLContext(sc)

## Settlement

In [7]:
 # to read parquet file
settlement = load_parquet("data/ITU_DATA/settlement/2019.parquet", schemas.settlement_schema)
settlement = settlement.dropna(subset =["VAERDI"]) \
            .withColumn("VAERDI", settlement["VAERDI"].cast("float"))
            # .where("TIME_CET like '%:00:%'")
settlement.persist()

DataFrame[GSRN: string, VAERDI: float, TIME_CET: string]

In [8]:
settlement.printSchema()

root
 |-- GSRN: string (nullable = true)
 |-- VAERDI: float (nullable = true)
 |-- TIME_CET: string (nullable = true)



## Weather
from ENetNEA

In [9]:
ws10_dic = load_parquet_from_weather_toPandas("data/ITU_DATA/prognosis/ENetNEA/wind_speed_10m.parquet")
ws100_dic = load_parquet_from_weather_toPandas("data/ITU_DATA/prognosis/ENetNEA/wind_speed_100m.parquet")
wd10_dic = load_parquet_from_weather_toPandas("data/ITU_DATA/prognosis/ENetNEA/wind_direction_10m.parquet")
wd100_dic = load_parquet_from_weather_toPandas("data/ITU_DATA/prognosis/ENetNEA/wind_direction_100m.parquet")
# tmp2_dic = load_parquet_from_weather_toPandas("data/ITU_DATA/prognosis/ENetNEA/temperatur_2m.parquet")
# tmp100_dic = load_parquet_from_weather_toPandas("data/ITU_DATA/prognosis/ENetNEA/temperatur_100m.parquet")

In [10]:
udf_type, udf_placement, udf_hour, udf_ws10, udf_ws100, udf_wd10, udf_wd100, udf_ws_interp = udf_regist()

## Windmills

In [11]:
windmill = load_csv("data/windmill_cleaned.csv", schemas.windmills_schema)
# BBR_municipal == 101  copengagen only
windmill = windmill.where("grid != 0") \
            .where("BBR_municipal == 101") \
            .fillna(0.1) \
            .withColumn("Turbine_type", udf_type(windmill.Turbine_type)) \
            .withColumn("Placement", udf_placement(windmill.Placement))
windmill.persist()

DataFrame[GSRN: string, Turbine_type: float, Parent_GSRN: string, BBR_municipal: string, Placement: float, UTM_x: string, UTM_y: string, Capacity_kw: float, Rotor_diameter: float, Navhub_height: float, grid: string, grid_in_range: string]

In [None]:
windmill.count()

In [None]:
windmill.printSchema()

# Test

In [None]:
power = settlement.where("TIME_CET like '2019-12-30%'")

df = power.join(windmill, on="GSRN") \
                    .select(power.GSRN, power.TIME_CET, power.VAERDI)


In [None]:
import pyspark.sql.functions as sf
result = df.groupBy("TIME_CET")\
        .agg(sf.sum("VAERDI").alias("sum")) \
        .orderBy("TIME_CET")

In [None]:
pds = result.toPandas()

In [None]:
import seaborn as sns
sns.set(style="whitegrid")
sns.lineplot(x="TIME_CET", y="value", data=pd.melt(pds, ['TIME_CET']), hue='variable',linewidth=2.5)

# 3.ML Analysis

In [12]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

## Windmill self clustering

In [13]:
feat_cols = ["Turbine_type", "Placement", "Capacity_kw", "Rotor_diameter", "Navhub_height"]
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
windmill = vec_assembler.transform(windmill).select("GSRN", "features", "Navhub_height","grid")

In [14]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(windmill)
windmill = scalerModel.transform(windmill)

optimal k = 35

In [None]:
cost = []
clusters = []

for k in range(2, 20):
    kmeans = KMeans(featuresCol='scaledFeatures',k=k, seed=1)
    model = kmeans.fit(windmill)
    cost.append(model.computeCost(windmill))
    clusters.append(k)

# Plot the cost
df_cost = pd.DataFrame(cost)
df_cost.columns = ["cost"]
df_cost.insert(0, 'cluster', clusters)

import pylab as pl
pl.plot(df_cost.cluster, df_cost.cost)
pl.xlabel('Number of Clusters')
pl.ylabel('Score')
pl.title('Elbow Curve')
pl.show()

In [15]:
kmean = KMeans(featuresCol='scaledFeatures', k=5, seed=1)
model = kmean.fit(windmill)
windmill = model.transform(windmill).withColumnRenamed("prediction", "cluster").select("GSRN", "cluster","Navhub_height", "grid")
windmill.persist()

DataFrame[GSRN: string, cluster: int, Navhub_height: float, grid: string]

## Predicte with Weather

In [16]:
train = settlement.where("TIME_CET not like '2019-03-31 02%'").where("TIME_CET not like '2019-12-30%'").sample(fraction=0.003, seed=5)
test = settlement.where("TIME_CET like '2019-12-30%'")

In [17]:
train = aggregate_with_interp(train, windmill)
test = aggregate_with_interp(test, windmill)

In [None]:
train.count(), test.count()

# Demo1

In [None]:
from sparktorch import serialize_torch_obj, SparkTorch
import torch
import torch.nn as nn
from pyspark.ml.pipeline import Pipeline

In [None]:
network = nn.Sequential(
    nn.Linear(4, 50),
    nn.LeakyReLU(),
    nn.Linear(50, 50),
    nn.LeakyReLU(),
    nn.Linear(50, 1),
    nn.Softmax(dim=1)
)

In [None]:
# Build the pytorch object
torch_obj = serialize_torch_obj(
    model=network,
    criterion=nn.CrossEntropyLoss(),
    optimizer=torch.optim.Adam,
    lr=0.0001
)

In [None]:
# Setup features
feat_cols = ["hour", "cluster", "u_interp", "v_interp"]
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
# Create a SparkTorch Model with torch distributed. Barrier execution is on by default for this mode.
spark_model = SparkTorch(
    inputCol='scaledFeatures',
    labelCol='VAERDI',
    predictionCol='predictions',
    torchObj=torch_obj,
    iters=50,
    verbose=1
)

# Can be used in a pipeline and saved.
p = Pipeline(stages=[vec_assembler, scaler, spark_model]).fit(train)

# Demo2

In [None]:
# Setup features
feat_cols = ["hour", "cluster","u_interp", "v_interp"]
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
train = vec_assembler.transform(train).select("GSRN","TIME_CET", "features", "VAERDI")
test = vec_assembler.transform(test).select("GSRN","TIME_CET", "features", "VAERDI")

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(train)
train = scalerModel.transform(train)
test = scalerModel.transform(test)

In [None]:
train = train.toPandas()
test = test.toPandas()

In [None]:
x_train = train[["features"]]
y_train = train[["VAERDI"]]
x_test = test[["features"]]
y_test = test[["VAERDI"]]
timeline = test[["TIME_CET"]]

In [None]:
x_train = pd.DataFrame([list(t) for t in x_train["features"].values.tolist()], columns=["f1","f2","f3","f4"])
x_test = pd.DataFrame([list(t) for t in x_test["features"].values.tolist()], columns=["f1","f2","f3","f4"])

In [None]:
import torch
dtype = torch.float

In [None]:
x_train_tensor = torch.tensor(x_train.values, dtype = dtype)
x_test_tensor = torch.tensor(x_test.values, dtype = dtype)

y_train_tensor = torch.tensor(y_train.values, dtype = dtype)
y_test_tensor = torch.tensor(y_test.values, dtype = dtype)

In [None]:
x_train_tensor.shape, y_train_tensor.shape

In [None]:
inp = 4
out = 1

hid = 50

# nn.MSELoss() is implemented by default as: ((input-target)**2).mean()
loss_fn = torch.nn.MSELoss()

learning_rate = 0.0001

In [None]:
model = torch.nn.Sequential(torch.nn.Linear(inp, hid),
                            torch.nn.Tanh(),
                            torch.nn.Linear(hid, out))

In [None]:
for iter in range(10000):
  y_pred = model(x_train_tensor)
  loss = loss_fn(y_pred, y_train_tensor)

  if iter % 1000 == 0:
    print(iter, loss.item())

  model.zero_grad()
  loss.backward()

  with torch.no_grad():
    for param in model.parameters():
      param -= learning_rate * param.grad

In [None]:
y_pred_tensor = model(x_test_tensor)
y_pred = y_pred_tensor.detach().numpy()

In [None]:
y_test["pred"] = pd.Series(y_pred.tolist()).apply(lambda x: x[0])
y_test["time"] = pd.Series(timeline["TIME_CET"])

In [None]:
y_test = y_test.groupby(y_test["time"]).agg({"VAERDI":lambda x:sum(x), "pred":lambda x:sum(x)})

In [None]:
y_test

In [None]:
import seaborn as sns
sns.set(style="whitegrid")
sns.lineplot(x="time", y="value", data=pd.melt(y_test, ['time']), hue='variable',linewidth=2.5)

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(15,6))

plt.plot(y_test["pred"].values, label='Predicted')
plt.plot(y_test["VAERDI"].values, label='Actual')

plt.legend()
plt.show()