# Data Ingestion to MongoDB

In [2]:
import math
import os
import time

import pandas as pd
from pymongo import MongoClient
from pymongo.operations import SearchIndexModel

## Load Observations and Wind Power Data

In [4]:
# Load the meteorological observations into `obs`.
# The file is read as ISO-8859-1 (presumably because of the "°" in the
# temperature column header -- TODO confirm the file's real encoding).
obs = pd.read_csv("./data/meteorological_data.csv", encoding = "ISO-8859-1")

In [5]:
# Load the wind power generation series into `wp`, using the same
# ISO-8859-1 encoding as the observations file.
wp = pd.read_csv("./data/wind_power_generation_data.csv", encoding = "ISO-8859-1")

In [6]:
# Preview the first ten observation rows to check the column layout.
obs.head(10)

Unnamed: 0,day,start time 3h interval (utc),location,height above ground in m,u-component wind in m/s,v-component wind in m/s,temperature in °C,global surface radiation in W/m2
0,1,00:00,1,2,5.0,-0.3,1.65,0.0
1,1,03:00,1,2,8.22,-1.2,1.45,2.0
2,1,06:00,1,2,5.36,-2.91,0.65,170.0
3,1,09:00,1,2,6.94,-3.17,1.55,350.0
4,1,12:00,1,2,6.58,-2.26,2.95,570.0
5,1,15:00,1,2,7.28,-1.95,2.85,368.0
6,1,18:00,1,2,2.7,0.12,-0.25,0.0
7,1,21:00,1,2,3.66,0.45,0.25,0.0
8,2,00:00,1,2,5.14,1.7,1.05,0.0
9,2,03:00,1,2,7.23,1.91,1.45,2.0


In [7]:
# Show the dtype pandas inferred for each observation column.
obs.dtypes

day                                   int64
start time 3h interval (utc)         object
location                              int64
height above ground in m              int64
u-component wind in m/s             float64
v-component wind in m/s             float64
temperature in °C                   float64
global surface radiation in W/m2    float64
dtype: object

In [8]:
# Preview the first rows of the wind power series.
wp.head()

Unnamed: 0,day,start time quarter hour,wind generation in MW
0,1,00:00,146.15
1,1,00:15,126.28
2,1,00:30,121.11
3,1,00:45,121.32
4,1,01:00,123.91


In [9]:
# Show the dtype pandas inferred for each wind power column.
wp.dtypes

day                          int64
start time quarter hour     object
wind generation in MW      float64
dtype: object

## Data Iteration

* For every time point at every measurement point, we also look up the preceding (historical) time points, going back into the previous day where necessary
* We then combine this information into a single data row
* Next, we add the total wind power time series to that row
* Finally, for each time point we calculate the vector, attach it to the data row together with the time point and the wind power time series, and store the row (the code below writes it to a MongoDB collection rather than a DataFrame)

In [11]:
# MongoDB connection URI.
# The URI carries the database password, so it must not be typed into (and
# committed with) the notebook. Export the complete URI as the MONGODB_URI
# environment variable before starting the kernel. When the variable is not
# set, the placeholder template below is used unchanged; "<password>" in it
# still has to be replaced before a connection can authenticate.
CONNECTION_STRING = os.environ.get(
    "MONGODB_URI",
    "mongodb+srv://aiinaction:<password>@starteconcluster1.cg9gxnb.mongodb.net/?retryWrites=true&w=majority&appName=StarteconCluster1",
)

In [12]:
# Create the MongoDB client that every later cell uses.
client = MongoClient(CONNECTION_STRING)

In [13]:
# Fetch the database names visible to this connection; the bare `databases`
# expression on the last line displays the list as the cell output.
databases = client.list_database_names()
databases

['EcoWind', 'sample_mflix', 'sample_restaurants', 'admin', 'local']

In [14]:
# Select the target database by name. Picking `databases[0]` would depend on
# the order in which the server happens to list the databases, so the
# ingestion could silently end up in a different database.
db = client["EcoWind"]

In [15]:
# Display the collections that currently exist in the selected database.
db.list_collection_names()

['wind_power_generation_data_05042017_competition',
 'wind_power_generation_data_04042017_competition',
 'meteorological_data_04042017_competition',
 'wind_power_generation_data_07042017_competition',
 'wind_power_generation_data_15042017_competition',
 'wind_power_generation_data_10042017_competition',
 'meteorological_data_10042017_competition',
 'meteorological_data_14042017_competition',
 'meteorological_data_12042017_competition',
 'wind_power_generation_data_08042017_competition',
 'wind_power_generation_data_06042017_competition',
 'meteorological_data_09042017_competition',
 'wind_power_generation_data_03042017_competition',
 'measurements',
 'meteorological_data_13042017_competition',
 'meteorological_data',
 'meteorological_data_15042017_competition',
 'wind_power_generation_data_13042017_competition',
 'meteorological_data_16042017_competition',
 'meteorological_data_07042017_competition',
 'wind_power_generation_data_11042017_competition',
 'wind_power_generation_data_09042

In [16]:
# Handle to the collection that the ingestion loop inserts into and on which
# the vector search index is created further below.
coll = db["observations_wind_power_vectors"]

In [17]:
def min_max_scale(x):
    """Linearly rescale the values of ``x`` to the range [-1, 1].

    The smallest value maps to -1 and the largest to 1. ``x`` must be
    iterable (for ``min``/``max``) and support element-wise arithmetic;
    the notebook passes pandas Series.

    When all values are equal the denominator is zero. No special handling
    is done here; the caller in this notebook replaces NaN entries
    afterwards.
    """
    lowest = min(x)
    span = max(x) - lowest
    return (x - lowest) / span * 2 - 1

In [18]:
def df_to_vector(df, d, t, tp):
    """Build the feature vector for one day / time point.

    Selects the rows of ``df`` whose day equals ``d`` and whose 3-hour
    interval start equals ``tp[t]``, min-max scales each of the four
    measurement columns over those rows, and concatenates the scaled
    columns (u wind, v wind, temperature, radiation) into one flat list.

    Parameters:
        df: observations frame with the column names used below.
        d:  day number to select.
        t:  index into ``tp``.
        tp: list of interval start labels such as "00:00".

    Returns:
        list of floats in which every NaN or exact zero has been replaced
        by a tiny positive epsilon.
    """
    feature_columns = (
        "u-component wind in m/s",
        "v-component wind in m/s",
        "temperature in °C",
        "global surface radiation in W/m2",
    )

    is_selected = (df["start time 3h interval (utc)"] == tp[t]) & (df["day"] == d)
    rows = df[is_selected]

    scaled = []
    for column in feature_columns:
        scaled.extend(min_max_scale(rows[column]).to_list())

    # Replace NaN or zero with epsilon
    epsilon = 0.0000000000000001
    return [epsilon if (math.isnan(value) or value == 0) else value for value in scaled]

In [19]:
def get_history_point(h, d, t, steps_per_day=8):
    """Return the (day, time-point index) lying ``h`` steps before ``(d, t)``.

    Parameters:
        h: number of time steps to go back.
        d: day number of the reference point.
        t: index of the reference time point within its day
           (0 .. steps_per_day - 1).
        steps_per_day: number of time points per day; 8 matches the
           3-hour intervals used in this notebook.

    Returns:
        tuple ``(day, index)`` with ``0 <= index < steps_per_day``.

    Two defects of the previous implementation are fixed here: it returned
    the unchanged ``(d, t)`` whenever the history point fell on the same day
    (so the "history" was a copy of the current point), and it could produce
    an index >= 8 when stepping back more than one full day.
    """
    # Floor division carries whole days backwards (negative quotient) while
    # the remainder is always a valid index within the day.
    day_offset, t_hist = divmod(t - h, steps_per_day)
    return d + day_offset, t_hist

In [20]:
# Settings for the ingestion loop
days = 365 # full 1-366, but we'll leave padding; used as the exclusive upper bound of range(2, days)
timepoints = ["00:00","03:00","06:00", "09:00","12:00","15:00","18:00","21:00"] # 3 hours periods along the day
altitudes = [2, 80, 100] # 2 meters,80 meters, 100 meters; not referenced by the code visible in this section

In [21]:
# Build one document per (day, 3-hour time point) and insert it into `coll`.
# NOTE: nothing clears the collection first, so re-running this cell appends
# a second copy of every document.

# Sortable "DDD:HH:MM" key for every wind power row. It does not depend on
# the loop variables, so it is computed once instead of twice per iteration.
wp_keys = wp["day"].astype(str).str.zfill(3) + ":" + wp["start time quarter hour"].astype(str)

steps_ahead = 4  # 4 three-hour steps = 12 hours = 48 quarter-hour values

doc_id = 1  # running document number (named doc_id so the built-in id() is not shadowed)
for d in range(2, days):
    for t in range(len(timepoints)):
        # time 0
        vectors = df_to_vector(obs, d, t, timepoints)
        # time-1 to time-5
        for h in range(1, 6):
            d1, t1 = get_history_point(h, d, t)
            vectors.extend(df_to_vector(obs, d1, t1, timepoints))

        # wind power time series, 48 steps forward; the end of the window
        # rolls over into the next day when it passes the last time point
        day_carry, t_end = divmod(t + steps_ahead, len(timepoints))
        start_key = str(d).zfill(3) + ":" + timepoints[t]
        end_key = str(d + day_carry).zfill(3) + ":" + timepoints[t_end]

        # zero-padded keys compare correctly as plain strings
        window = wp[(wp_keys >= start_key) & (wp_keys < end_key)]
        windpower = window["wind generation in MW"].to_list()

        item = {
          "id" : doc_id,
          "date" : start_key,
          "vectors" : vectors,
          "windpower" : windpower,
        }
        coll.insert_one(item)
        doc_id += 1

In [22]:
# Sanity check: print only the first document returned by the cursor
# (the loop breaks after one iteration).
item_details = coll.find()
for item in item_details:
    print(item)
    break

{'_id': ObjectId('6856b9d15a04f5f1ddebb24d'), 'id': 1, 'date': '002:00:00', 'vectors': [-0.6844106463878328, 0.4296577946768061, 0.581749049429658, -1.0, -0.3498098859315588, -0.17870722433460062, -0.09505703422053235, 0.8669201520912548, 1.0, -0.5494296577946769, 0.33460076045627374, 0.4676806083650189, -0.42395437262357416, 0.4486692015209124, 0.581749049429658, -0.88212927756654, -0.44486692015209117, -0.36882129277566544, -0.9429657794676807, 0.29657794676806093, 0.4676806083650189, -0.5057034220532319, 0.2585551330798479, 0.3916349809885933, -0.7889733840304183, -0.12167300380228152, -0.026615969581749166, -0.6787072243346007, 0.33460076045627374, 0.48669201520912564, -0.11111111111111105, 0.19444444444444442, 0.22222222222222232, 0.38888888888888884, 0.8611111111111112, 0.9444444444444444, -0.7222222222222222, -0.9444444444444444, -1.0, 0.4722222222222221, 0.8333333333333333, 0.8611111111111112, -0.05555555555555558, 0.13888888888888884, 0.16666666666666674, 0.5, 0.77777777777777

In [23]:
# Definition of the vector search index over the "vectors" field of the
# ingested documents.
search_index_model = SearchIndexModel(
  definition={
    "fields": [
      {
        "type": "vector",
        # Must equal the length of each document's "vectors" list.
        # The list concatenates 6 time points (current + 5 history) x 4
        # measurement columns x the rows per time point; 720 implies 30
        # rows per time point -- TODO confirm against the data.
        "numDimensions": 720,
        "path": "vectors",
        "similarity": "cosine",
        "quantization": "scalar"
      }
    ]
  },
  name="wind_power_vector_index",
  type="vectorSearch"
)

In [24]:
# Submit the index definition; `index` holds the returned index name, which
# the polling cell below uses to look the index up.
index = coll.create_search_index(model=search_index_model)
print("New search index named " + index + " is building.")

New search index named wind_power_vector_index is building.


In [25]:
# Wait until the search index created above reports itself as queryable.

def pred(index_info):
    """Return True when an index description has "queryable" set to True."""
    return index_info.get("queryable") is True

POLL_INTERVAL_S = 5    # pause between two status checks
POLL_TIMEOUT_S = 600   # give up after this long instead of looping forever

deadline = time.monotonic() + POLL_TIMEOUT_S
while True:
    indices = list(coll.list_search_indexes(index))
    print("Polling to check if the index is ready. This may take up to a minute.")

    if len(indices) and pred(indices[0]):
        break
    if time.monotonic() >= deadline:
        # Without this guard the cell would hang indefinitely if the index
        # never becomes queryable.
        raise TimeoutError(
            "Search index " + index + " was not queryable after "
            + str(POLL_TIMEOUT_S) + " seconds."
        )
    time.sleep(POLL_INTERVAL_S)
print(index + " is ready for querying.")

Polling to check if the index is ready. This may take up to a minute.
Polling to check if the index is ready. This may take up to a minute.
Polling to check if the index is ready. This may take up to a minute.
Polling to check if the index is ready. This may take up to a minute.
Polling to check if the index is ready. This may take up to a minute.
Polling to check if the index is ready. This may take up to a minute.
wind_power_vector_index is ready for querying.
