# ETL_EDA
This file records the process of acquiring raw data, traforming them, and loading them into a MongoDB. The data are store (almost) in their raw form.

## 1. Raw Data from BPA 
The [dataset](https://transmission.bpa.gov/business/operations/Wind/baltwg.txt) is a continuously updated txt file in csv format. It contains the energy producation and load in the last 5 days. There will be some blank lines at the end for today. It can be retrieved simply by making `requests` without parameters. For more complicated API call, you may need to add query parameters.

In [1]:
import requests

url = "https://transmission.bpa.gov/business/operations/Wind/baltwg.txt"
req = requests.get(url, timeout=0.5)
req.raise_for_status()
text = req.text
print(text[:1000])
print('...')
print(text[-500:])

BPA Balancing Authority Load & Total Wind Generation
at 5-minute intervals, last 7 days
Dates: 27Nov2019 - 04Dec2019 (last updated 3Dec2019 06:15:59) Pacific Time
Based on 5-min MW readings from the BPA SCADA system for points 45583, 79687, 79682, 164377, 70681

This represents loads and resources in BPA's Balancing Authority (BA) including some that are not BPA's.
It does not include BPA loads served by transfer, scheduled out of region,
or scheduled to customers with their own BAs such as Seattle and Tacoma

BPA/Technical Operations (TOT-OpInfo@bpa.gov)

Date/Time       	Load	Wind	Hydro	Fossil/Biomass	Nuclear
11/27/2019 00:00	6586	2239	5249	496	1167
11/27/2019 00:05	6612	2259	5222	497	1168
11/27/2019 00:10	6627	2257	5156	495	1167
11/27/2019 00:15	6640	2248	5144	494	1168
11/27/2019 00:20	6571	2231	5155	493	1167
11/27/2019 00:25	6597	2217	5149	497	1166
11/27/2019 00:30	6641	2180	5141	491	1170
11/27/2019 00:35	6615	2168	5183	492	1166
11/27/2019 00:40	6591	2165	5174	4

## 2. Raw Data to Documents/Dicts
Using pandas, it is simple to parse a in-memory string. The first few lines of description need to be skipped. Datetime conversion is made and blank lines are dropped. Now the data can be easily converted to a list of dicts which is what we want for the MongoDB.

In [2]:
import pandas
from io import StringIO

df = pandas.read_csv(StringIO(text), skiprows=11, delimiter='\t')
df.columns = df.columns.str.strip()             # remove space in columns name
df['Datetime'] = pandas.to_datetime(df['Date/Time'])
df.drop(columns=['Date/Time'], axis=1, inplace=True)
df.dropna(inplace=True)  
df.head()

Unnamed: 0,Load,Wind,Hydro,Fossil/Biomass,Nuclear,Datetime
0,6586.0,2239.0,5249.0,496.0,1167.0,2019-11-27 00:00:00
1,6612.0,2259.0,5222.0,497.0,1168.0,2019-11-27 00:05:00
2,6627.0,2257.0,5156.0,495.0,1167.0,2019-11-27 00:10:00
3,6640.0,2248.0,5144.0,494.0,1168.0,2019-11-27 00:15:00
4,6571.0,2231.0,5155.0,493.0,1167.0,2019-11-27 00:20:00


## 3. Upsert MongoDB
If we fetch the data frequently, there are lots of duplicate data entry between each run. The de-duplication happens at insertion. The MongoDB API to use is `collection.replace_one(filter=..., replacement=..., upsert=True)`. The statement matches a document in MongoDB with `filter`, replaces it with `replacement` if the document exists or inserts `replacement` into the database if `filter` matches nothing. 

In [3]:
import pymongo

client = pymongo.MongoClient()

In [21]:
db = client.get_database("energy")
collection = db.get_collection("energy")
update_count = 0
for record in df.to_dict('records'):
    result = collection.replace_one(
        filter={'Datetime': record['Datetime']},    # locate the document if exists
        replacement=record,                         # latest document
        upsert=True)                                # update if exists, insert if not
    if result.matched_count > 0:
        update_count += 1
print(f"rows={df.shape[0]}, update={update_count}, "
      f"insert={df.shape[0]-update_count}")

rows=1936, update=1928, insert=8


In [1]:
import pandas as pd
import json
import requests
import pymongo

In [2]:
client = pymongo.MongoClient()

In [6]:
API_KEY = "eb0b9ddfad9c74ffb30cffe72e9ad2c8"
BASE_URL = "http://api.openweathermap.org/data/2.5/weather?units=imperial&APPID={}&q=".format(API_KEY)

In [7]:
CITY = "Providence"
URL = BASE_URL + CITY

In [8]:
req = requests.get(URL, timeout=0.5)
req.raise_for_status()
text = req.text
weather_dict = json.loads(text)

In [9]:
pd.to_datetime(weather_dict['dt'], unit='s')

Timestamp('2019-12-03 14:07:28')

In [10]:
weather_dict

{'coord': {'lon': -71.41, 'lat': 41.82},
 'weather': [{'id': 500,
   'main': 'Rain',
   'description': 'light rain',
   'icon': '10d'},
  {'id': 600, 'main': 'Snow', 'description': 'light snow', 'icon': '13d'},
  {'id': 701, 'main': 'Mist', 'description': 'mist', 'icon': '50d'}],
 'base': 'stations',
 'main': {'temp': 29.28,
  'pressure': 995,
  'humidity': 92,
  'temp_min': 26.6,
  'temp_max': 30.99},
 'visibility': 1609,
 'wind': {'speed': 17.22, 'deg': 340},
 'snow': {'1h': 0.59},
 'clouds': {'all': 90},
 'dt': 1575382048,
 'sys': {'type': 1,
  'id': 5217,
  'country': 'US',
  'sunrise': 1575374134,
  'sunset': 1575407733},
 'timezone': -18000,
 'id': 5224151,
 'name': 'Providence',
 'cod': 200}

In [60]:
# time = pd.to_datetime(weather_dict['dt'], unit='s')
# temp = weather_dict['main']['temp']
# pressure = weather_dict['main']['pressure']
# humidity = weather_dict['main']['humidity']
# wind = weather_dict['wind']['speed']
# city_id = weather_dict['id']

In [3]:
db = client.get_database("weather")

In [33]:
data = {}
for city in db.collection_names():
    raw_city_data = list(db.get_collection(city).find())
    clean_city_data = []
    for measurements in raw_city_data:
        del measurements['_id'] 
        measurements['dt'] = str(measurements['dt'])
        clean_city_data.append(measurements)
    data[city] = clean_city_data

  


In [37]:
data['Providence'][0]

{'coord': {'lon': -71.41, 'lat': 41.82},
 'weather': [{'id': 500,
   'main': 'Rain',
   'description': 'light rain',
   'icon': '10d'},
  {'id': 600, 'main': 'Snow', 'description': 'light snow', 'icon': '13d'},
  {'id': 701, 'main': 'Mist', 'description': 'mist', 'icon': '50d'}],
 'base': 'stations',
 'main': {'temp': 31.82,
  'pressure': 992,
  'humidity': 92,
  'temp_min': 30,
  'temp_max': 34},
 'visibility': 6437,
 'wind': {'speed': 13.87, 'deg': 310, 'gust': 20.8},
 'snow': {'1h': 0.25},
 'clouds': {'all': 90},
 'dt': '2019-12-03 13:35:55',
 'sys': {'type': 1,
  'id': 5823,
  'country': 'US',
  'sunrise': 1575374134,
  'sunset': 1575407733},
 'timezone': -18000,
 'id': 5224151,
 'name': 'Providence',
 'cod': 200}

In [34]:
with open('weather_data.json', 'w') as f:
    json.dump(data, f)

In [61]:
db = client.get_database("test_weather")
collection = db.get_collection("test_pvd")
update_count = 0
for record in [weather_dict]:
    record['dt'] = pd.to_datetime(record['dt'], unit='s')
    result = collection.replace_one(
        filter={'id': record['id'], 'dt': record['dt']},    # locate the document if exists
        replacement=record,                         # latest document
        upsert=True)                                # update if exists, insert if not
    if result.matched_count > 0:
        update_count += 1

In [3]:
pvd = client.get_database("weather").get_collection("Providence")

In [4]:
list(pvd.find())[0]

{'_id': ObjectId('5de680740c607955d13ffb0c'),
 'coord': {'lon': -71.41, 'lat': 41.82},
 'weather': [{'id': 500,
   'main': 'Rain',
   'description': 'light rain',
   'icon': '10d'},
  {'id': 600, 'main': 'Snow', 'description': 'light snow', 'icon': '13d'},
  {'id': 701, 'main': 'Mist', 'description': 'mist', 'icon': '50d'}],
 'base': 'stations',
 'main': {'temp': 30.2,
  'pressure': 994,
  'humidity': 100,
  'temp_min': 28,
  'temp_max': 32},
 'visibility': 2012,
 'wind': {'speed': 9.17, 'deg': 300},
 'snow': {'1h': 0.34},
 'clouds': {'all': 90},
 'dt': datetime.datetime(2019, 12, 3, 10, 28, 25),
 'sys': {'type': 1,
  'id': 5823,
  'country': 'US',
  'sunrise': 1575374134,
  'sunset': 1575407733},
 'timezone': -18000,
 'id': 5224151,
 'name': 'Providence',
 'cod': 200}

In [30]:
client.get_database("test_weather").get_collection("test_pvd").find_one()

{'_id': ObjectId('5de4423548471c4efb2759df'),
 'coord': {'lon': -71.41, 'lat': 41.82},
 'weather': [{'id': 500,
   'main': 'Rain',
   'description': 'light rain',
   'icon': '10n'},
  {'id': 600, 'main': 'Snow', 'description': 'light snow', 'icon': '13n'},
  {'id': 701, 'main': 'Mist', 'description': 'mist', 'icon': '50n'}],
 'base': 'stations',
 'main': {'temp': 30.16,
  'pressure': 1011,
  'humidity': 100,
  'temp_min': 25,
  'temp_max': 35.6},
 'visibility': 1609,
 'wind': {'speed': 10.29, 'deg': 50},
 'rain': {'1h': 0.45},
 'snow': {'1h': 0.38},
 'clouds': {'all': 90},
 'dt': datetime.datetime(2019, 12, 1, 22, 40, 12),
 'sys': {'type': 1,
  'id': 5441,
  'country': 'US',
  'sunrise': 1575201210,
  'sunset': 1575234966},
 'timezone': -18000,
 'id': 5224151,
 'name': 'Providence',
 'cod': 200}