In [1]:
from netCDF4 import Dataset, netcdftime, num2date, date2num, date2index
from datetime import datetime, timedelta, date
import pytz
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap, addcyclic, shiftgrid
import pymongo
from pymongo import IndexModel, ASCENDING, DESCENDING
from pprint import pprint
from os import listdir
import os
import pandas as pd
import fnmatch
import logging
from joblib import Parallel, delayed
import multiprocessing
from functools import partial
num_cores = 3  # multiprocessing.cpu_count()

In [3]:
def insertToMongo(vars):
    """Insert one time slice of gridded fields into MongoDB, one document per grid cell.

    Parameters
    ----------
    vars : dict
        Must contain the keys ``'this_day'`` (an object exposing ``year``,
        ``month`` and ``day``), ``'lons'``, ``'lats'`` and one array for each
        name listed in ``field_names`` below.
        NOTE(review): each field array is assumed to be 2-D (lat, lon) so that
        ``DAT_shift[k, i, j]`` is a scalar -- confirm against the caller.

    Side effects
    ------------
    Writes to the ``ERAINT_monthly`` collection of the ``ECMWF`` database on
    ``mongodb://localhost:27017/``.  Each document holds ``id_grid`` (running
    1-based counter over the grid), ``date``, ``year`` and every field value
    rounded to two decimals.  The client connection is closed before
    returning, including when an insert fails.
    """
    # Single source of truth for the field order: the position in this tuple
    # is the index along axis 0 of the stacked array.
    field_names = ('ci', 'sst', 'istl1', 'sp', 'stl1', 'msl', 'u10', 'v10',
                   't2m', 'd2m', 'al', 'lcc', 'mcc', 'hcc', 'si10', 'skt')

    this_day = vars['this_day']
    lons = vars['lons']
    lats = vars['lats']

    # Stack all 2d arrays in one multi-d array.
    DAT = np.array([vars[name] for name in field_names])

    # Shift the grid so lons go from -180 to 180 instead of 0 to 360.
    DAT_shift, lons_shift = shiftgrid(
        lon0=180., datain=DAT, lonsin=lons, start=False)
    # Only the shape of the mesh is needed: it defines the (i, j) index space.
    lon_grid = np.meshgrid(lons_shift, lats)[0]

    # Midnight of the requested day, built directly instead of via strptime.
    this_dayhh = datetime(this_day.year, this_day.month, this_day.day)
    this_year = this_dayhh.year

    # Insert into MongoDB
    mongo_host_local = 'mongodb://localhost:27017/'
    con = pymongo.MongoClient(mongo_host_local)
    try:
        collection = con.ECMWF.ERAINT_monthly
        # id_grid starts at 1 and follows row-major order of the grid.
        for this_id, (i, j) in enumerate(np.ndindex(*lon_grid.shape), 1):
            doc = {
                "id_grid": this_id,
                "date": this_dayhh,
                "year": this_year,
            }
            for k, name in enumerate(field_names):
                doc[name] = round(DAT_shift[k, i, j], ndigits=2)
            collection.insert_one(doc)
    finally:
        # Release the connection even if an insert raised.
        con.close()


def insertOneDay(this_day, ncfile, DF):
    """Read the fields for ``this_day`` from a NetCDF file and insert them into MongoDB.

    Parameters
    ----------
    this_day : date-like
        Day to ingest; compared against ``DF.date`` to find the matching
        time stamps.
    ncfile : str
        File name, appended as-is to the module-level ``downloadDir``.
    DF : pandas.DataFrame
        Must expose ``date`` and ``time`` columns (as produced by
        ``getDatesDF``).

    Side effects
    ------------
    Calls ``insertToMongo`` with the extracted fields and, when ``this_day``
    is 1980-01-02, calls ``doIndexing()`` once to set up the indexes.  The
    NetCDF file is always closed, including when reading or inserting fails.
    """
    # Variables extracted from the file (units as noted by the original author).
    field_names = (
        'ci',     # Sea-ice cover [0-1]
        'sst',    # Sea surface temperature [K]
        'istl1',  # Ice temp layer1 [K]
        'sp',     # Surface pressure [Pa]
        'stl1',   # Soil temp lev1 [K]
        'msl',    # Mean SLP [Pa]
        'u10',    # wind-u [m/s]
        'v10',
        't2m',    # 2m temp [K]
        'd2m',    # 2 metre dewpoint temperature [K]
        'al',     # Surface albedo [0-1]
        'lcc',    # Low cloud cover [0-1]
        'mcc',    # Medium cloud cover [0-1]
        'hcc',    # High cloud cover [0-1]
        'si10',   # 10m wind speed [m/s]
        'skt',    # Skin temperature [K]
    )

    logging.info(this_day)
    ncfile00 = '%s%s' % (downloadDir, ncfile)
    fh = Dataset(ncfile00, mode='r')
    try:
        lons = fh.variables['longitude'][:]
        lats = fh.variables['latitude'][:]
        # Extract the data for this day out of the nc file
        times = DF[DF.date == this_day].time
        ind = date2index(dates=times.tolist(), nctime=fh.variables['time'])

        vars = dict((name, fh.variables[name][ind]) for name in field_names)
        vars['lons'] = lons
        vars['lats'] = lats
        vars['this_day'] = this_day

        insertToMongo(vars)
        if this_day == date(1980, 1, 2):
            # Setup the indexes just once
            doIndexing()
    finally:
        # Previously the handle leaked whenever anything above raised.
        fh.close()


def getDatesDF(nc_file):  # insertFile(nc_file):
    """Build a table of the time stamps in ``nc_file`` that are not yet in MongoDB.

    Parameters
    ----------
    nc_file : str
        File name, appended as-is to the module-level ``downloadDir``.

    Returns
    -------
    pandas.DataFrame
        One row per time stamp with columns ``time``, ``date`` (calendar date
        of ``time``) and ``ndoc`` (number of time stamps sharing that date).
        Rows whose date is found in the module-level ``datesInMongo`` are
        dropped.
    """
    logging.info("Inserting %s" % (nc_file))
    nc_file00 = '%s%s' % (downloadDir, nc_file)
    fh = Dataset(nc_file00, mode='r')
    try:
        nctime = fh.variables['time'][:]
        t_unit = fh.variables['time'].units
    finally:
        # Close the file even if the time variable cannot be read.
        fh.close()
    time = num2date(nctime, units=t_unit)
    # Create a data frame
    df = pd.DataFrame({'time': time})
    df = df.assign(date=df.time.dt.date)
    # Count how many time stamps fall on each date.
    gdf = df.groupby('date').size().rename('ndoc').reset_index()
    df_counted = pd.merge(left=df, right=gdf, on="date")
    # exclude datesInMongo (data already ingested)
    DF = df_counted[~pd.to_datetime(df_counted.date).isin(datesInMongo)]
    # The original fell off the end here and implicitly returned None.
    return DF

In [8]:
# Directory that holds the downloaded NetCDF files (note the trailing slash:
# file names are appended to it by plain string formatting elsewhere).
downloadDir = '/home/dmasson/data/era-interim/'
# Raw directory listing, then the entries matching '*multivarm1*.nc' sorted by name.
files00 = listdir(downloadDir)
files = sorted(fnmatch.filter(files00, '*multivarm1*.nc'))
# Work on the first match; the bare name echoes it as the cell output.
this_file = files[0]
this_file

'era-int_multivarm1_1979-01-01_to_2017-08-31.nc'

In [None]:

# NOTE(review): this cell looks like pasted interactive-session history rather
# than a reproducible analysis step (it ends by printing the readline history).
# It is unlikely to survive Restart Kernel -> Run All; see the notes below.

# getDatesDF is called twice in a row with the same argument; the second call
# simply overwrites the first result.
DF = getDatesDF(this_file)
DF = getDatesDF(this_file)
DF
days = DF.date.drop_duplicates()
days
# NOTE(review): days[1] is a label lookup on the Series index, not "second
# element" -- presumably days.iloc[...] was intended; confirm.
this_day = days[1]
this_day
ncfile = this_file
nc_file = this_file
# The statements below replay the body of getDatesDF at top level.
logging.info("Inserting %s" % (nc_file))
nc_file00 = '%s%s' % (downloadDir, nc_file)
fh = Dataset(nc_file00, mode='r')
nctime = fh.variables['time'][:]
t_unit = fh.variables['time'].units
fh.close()
# The statements below replay the first part of insertOneDay at top level.
# fh is reopened here and is not closed again anywhere in this cell.
logging.info(this_day)
ncfile00 = '%s%s' % (downloadDir, ncfile)
fh = Dataset(ncfile00, mode='r')
lons = fh.variables['longitude'][:]
lats = fh.variables['latitude'][:]
times = DF[DF.date == this_day].time
ind = date2index(dates=times.tolist(), nctime=fh.variables['time'])
# Reads a temp file from /tmp, deletes it, then executes its contents under
# the file name era_interim_insert.py.  The temp file is removed by this very
# line, so re-running it fails; the executed code is not visible here.
import codecs, os;__pyfile = codecs.open('''/tmp/py3002rjD''', encoding='''utf-8''');__code = __pyfile.read().encode('''utf-8''');__pyfile.close();os.remove('''/tmp/py3002rjD''');exec(compile(__code, '''/home/dmasson/CloudStation/code/winter_predictor/era_interim_insert.py''', 'exec'));
# NOTE(review): no top-level assignment to `vars` is visible in this notebook;
# presumably the exec'd script defines it as a dict, otherwise this is the
# builtin vars() function and the subscripts below would fail -- confirm.
vars
this_day
this_day = vars['this_day']
lons = vars['lons']
lats = vars['lats']
lons
# NOTE(review): `sizeof` and `size` are not provided by any import visible in
# this notebook; unless defined elsewhere these two lines raise NameError.
sizeof(vars['ci'])
size(vars['ci'])
vars['ci'].size
vars['ci'].shape
vars['ci'][1,3]
vars['ci'][1:4,3]
# Dump the interactive readline history (Python 2 `print` statement syntax).
import readline
for i in range(readline.get_current_history_length()):
    print readline.get_history_item(i + 1)
# Three more read-delete-exec lines of the same shape as above, each pointing
# at a different one-shot temp file.
import codecs, os;__pyfile = codecs.open('''/tmp/py30025nc''', encoding='''utf-8''');__code = __pyfile.read().encode('''utf-8''');__pyfile.close();os.remove('''/tmp/py30025nc''');exec(compile(__code, '''/home/dmasson/CloudStation/code/winter_predictor/era_interim_insert.py''', 'exec'));
import codecs, os;__pyfile = codecs.open('''/tmp/py3002GAL''', encoding='''utf-8''');__code = __pyfile.read().encode('''utf-8''');__pyfile.close();os.remove('''/tmp/py3002GAL''');exec(compile(__code, '''/home/dmasson/CloudStation/code/winter_predictor/era_interim_insert.py''', 'exec'));
import codecs, os;__pyfile = codecs.open('''/tmp/py3002UEk''', encoding='''utf-8''');__code = __pyfile.read().encode('''utf-8''');__pyfile.close();os.remove('''/tmp/py3002UEk''');exec(compile(__code, '''/home/dmasson/CloudStation/code/winter_predictor/era_interim_insert.py''', 'exec'));