# ETL_EDA
This file records the process of acquiring raw data, traforming them, and loading them into a MongoDB. 

In [1]:
import requests
import json
import numpy as np
import pandas as pd
import logging
import utils
import sched
import time
import pymongo

## 1. Raw Data from NASA 
The [dataset](https://api.nasa.gov/assets/insight/InSight%20Weather%20API%20Documentation.pdf) is a continuously updated json file. It contains data of Mars weather in the past seven days. It can be retrieved simply by making `requests` without parameters. Data is updated every Martian day, which is 37 minutes longer than earth day. 

In [5]:
key = "j8jSUUXusA5glsJa3q5bQaDad6sE4H2u9K5rFbST"
requested = requests.get(f"https://api.nasa.gov/insight_weather/?api_key={key}&feedtype=json&ver=1.0").text    # Download data
requested

## 2. Raw Data to Documents/Dicts
Using json and pandas package, we can easily transform downloaded json files into workable dataframe. We take sol days as keys, extract min, max temperature, atmosphere pressure and wind directions as values for each day. But not all data is valid. To deal with this, we check the validity value of each subject for each day, then replace missing values with last day's corresponding value. Now the data can be easily converted to a list of dicts which is what we want for the MongoDB.

In [6]:
requested = json.loads(requested)    # Transform data into dictionary
main_keys = requested['sol_keys']    # Take sol days as unique keys
def valid_check(subject, index):
    """Check if value is valid"""
    return requested['validity_checks'][index][subject]['valid']

# Create a dict to store all data, set default value to -1
df_requested = {"sol_day": [-1], "date": [-1], "min_temp": [-1], "max_temp": [-1], "pressure": [-1], "wind": [-1]}

for i in main_keys:
    # Write data into dict
    df_requested["sol_day"].append(i)
    df_requested["date"].append(requested[i]['Last_UTC'])
    # If data is not valid/empty, use the last value
    df_requested["min_temp"].append(requested[i]['AT']["mn"] if valid_check('AT', i) else df_requested["min_temp"][-1])
    df_requested["max_temp"].append(requested[i]['AT']["mx"] if valid_check('AT', i) else df_requested["max_temp"][-1])
    df_requested["pressure"].append(requested[i]['PRE']["av"] if valid_check('PRE', i) else df_requested["pressure"][-1])
    df_requested["wind"].append(requested[i]["WD"] if valid_check('WD', i) else df_requested["wind"][-1])

# Convert to dataframe
df_requested = pd.DataFrame(df_requested)
df_requested.drop(0, inplace=True)
data = df_requested.to_dict(orient='records')

In [7]:
df_requested.head()

Unnamed: 0,sol_day,date,min_temp,max_temp,pressure,wind
1,370,2019-12-12T09:57:46Z,-98.97,-19.47,658.193,"{'0': {'compass_degrees': 0.0, 'compass_point'..."
2,371,2019-12-13T10:37:21Z,-96.625,-19.785,657.663,"{'0': {'compass_degrees': 0.0, 'compass_point'..."
3,372,2019-12-14T11:16:56Z,-97.195,-21.232,656.774,"{'1': {'compass_degrees': 22.5, 'compass_point..."
4,373,2019-12-15T11:56:31Z,-98.115,-20.061,656.752,"{'0': {'compass_degrees': 0.0, 'compass_point'..."
5,374,2019-12-16T12:36:07Z,-97.305,-21.154,654.061,"{'0': {'compass_degrees': 0.0, 'compass_point'..."


In [9]:
print(data[0])

{'sol_day': '370', 'date': '2019-12-12T09:57:46Z', 'min_temp': -98.97, 'max_temp': -19.47, 'pressure': 658.193, 'wind': {'0': {'compass_degrees': 0.0, 'compass_point': 'N', 'compass_right': 0.0, 'compass_up': 1.0, 'ct': 4}, '1': {'compass_degrees': 22.5, 'compass_point': 'NNE', 'compass_right': 0.382683432365, 'compass_up': 0.923879532511, 'ct': 2}, '10': {'compass_degrees': 225.0, 'compass_point': 'SW', 'compass_right': -0.707106781187, 'compass_up': -0.707106781187, 'ct': 28893}, '11': {'compass_degrees': 247.5, 'compass_point': 'WSW', 'compass_right': -0.923879532511, 'compass_up': -0.382683432365, 'ct': 9754}, '12': {'compass_degrees': 270.0, 'compass_point': 'W', 'compass_right': -1.0, 'compass_up': -0.0, 'ct': 6898}, '13': {'compass_degrees': 292.5, 'compass_point': 'WNW', 'compass_right': -0.923879532511, 'compass_up': 0.382683432365, 'ct': 754}, '14': {'compass_degrees': 315.0, 'compass_point': 'NW', 'compass_right': -0.707106781187, 'compass_up': 0.707106781187, 'ct': 432}, '1

## 3. Upsert MongoDB
If we fetch the data frequently, there are lots of duplicate data entry between each run. The de-duplication happens at insertion. The MongoDB API to use is `collection.replace_one(filter=..., replacement=..., upsert=True)`. The statement matches a document in MongoDB with `filter`, replaces it with `replacement` if the document exists or inserts `replacement` into the database if `filter` matches nothing. Credit to [Martin Ma](https://github.com/blownhither). 

In [10]:
client = pymongo.MongoClient('localhost', 27017)
db = client.get_database("MarsWeather")
collection = db.get_collection("DailyWeather")
update_count = 0
for record in data:
    result = collection.replace_one(
        filter={'sol_day': record['sol_day']},    # locate the document if exists
        replacement=record,                         # latest document
        upsert=True)                                # update if exists, insert if not
    if result.matched_count > 0:
        update_count += 1
print(f"rows={df_requested.shape[0]}, update={update_count}, "
      f"insert={df_requested.shape[0]-update_count}")

rows=7, update=3, insert=4
