In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
import os
import codecs
import string
import datetime
import pickle
from concurrent.futures import ThreadPoolExecutor, wait, as_completed, ProcessPoolExecutor

In [52]:
class WeatherData(object):
    
    def __init__(self, path):
        if not os.path.exists(path):
            raise RuntimeError('File Not Exists: %s' % path)
        
        self._data = None
        self.head = ''
        self.meta = []
        
        self.year, self.month, self.day, self.hour = 0, 0, 0, 0
        self.longitude_interval, self.latitude_interval = .0, .0
        self.latitude_start, self.latitude_end, self.longitude_start, self.longitude_end = .0, .0, .0, .0
        self.shape = 0
        
        self.load_data(path)
    
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, value):
        self._data = value
    
    # 16 01 01 08 000 200   0.250000  -0.250000  80.000000 150.000000  60.000000 -10.000000 281 281
    #
    # 2016 01 01 08 0 400 
    # 0.250000 -0.250000 80.000000 140.000000 50.000000 -10.000000 241 241 2.000000 0.000000 100.000000 1.000000 0.000000
    #
    def load_data(self, path):
        
        try:
            with codecs.open(path, 'r') as f:
                self.head = f.readline().strip()

                for i in range(3):
                    self.meta.extend(f.readline().split())
                    if len(self.meta) == 19:
                        break
                if len(self.meta) != 19:
                    raise RuntimeError('File Meta Error')
                
                self.year, self.month, self.day, self.hour = self.meta[:4]
                self.latitude_interval, self.longitude_interval = map(float, self.meta[6:8])
                self.latitude_start, self.latitude_end, self.longitude_start, self.longitude_end = map(float, self.meta[8:12])
                self.shape = map(int, self.meta[12:14])

                _shape = self.shape
                _shape.reverse()
                self.data = np.transpose(np.array(f.read().split()).astype(np.float).reshape(_shape))

                self.latitudes = np.arange(self.latitude_start, self.latitude_end + self.latitude_interval, self.latitude_interval)
                self.longitudes = np.arange(self.longitude_start, self.longitude_end + self.longitude_interval, self.longitude_interval)

                latv, longv = np.meshgrid(self.latitudes, self.longitudes[::-1])
                self.grid = np.stack([latv, longv], axis=2).reshape(-1, 2)

                self.df = pd.DataFrame(self.data, index=self.latitudes, columns=self.longitudes)
        except Exception as e:
            print 'Error: %s, %s' % (e, path)
            raise

In [4]:
#more /mnt/disk/weather_project/data/160101/2D-999/16010108.000

In [5]:
wd = WeatherData('/mnt/disk/weather_project/data/160101/2D-999/16010108.000')

In [6]:
wd.df.iloc[:5]

Unnamed: 0,40.0,39.875,39.75,39.625,39.5,39.375,39.25,39.125,39.0,38.875,...,11.125,11.0,10.875,10.75,10.625,10.5,10.375,10.25,10.125,10.0
100.0,-18.08,-18.89,-19.71,-19.68,-19.02,-19.64,-22.05,-24.11,-25.08,-24.02,...,23.36,23.32,23.29,23.29,23.39,23.54,23.67,23.7,23.76,23.82
100.125,-17.99,-18.68,-19.36,-19.49,-19.02,-19.21,-20.64,-22.24,-23.33,-24.77,...,23.32,23.32,23.29,23.26,23.32,23.48,23.61,23.64,23.67,23.76
100.25,-17.83,-17.96,-18.49,-18.83,-18.71,-19.11,-20.27,-20.96,-22.61,-23.64,...,23.26,23.29,23.29,23.23,23.26,23.45,23.54,23.57,23.67,23.82
100.375,-17.71,-18.39,-19.55,-20.27,-20.55,-20.58,-20.52,-20.46,-20.68,-20.93,...,23.2,23.26,23.29,23.32,23.29,23.39,23.54,23.57,23.64,23.82
100.5,-17.61,-18.8,-20.24,-21.08,-21.68,-21.74,-21.14,-20.71,-20.49,-19.93,...,23.2,23.26,23.32,23.39,23.39,23.39,23.51,23.54,23.64,23.79


In [7]:
wd.grid

array([[ 100.   ,   10.   ],
       [ 100.125,   10.   ],
       [ 100.25 ,   10.   ],
       ..., 
       [ 129.75 ,   40.   ],
       [ 129.875,   40.   ],
       [ 130.   ,   40.   ]])

In [8]:
# more /mnt/disk/weather_project/data1/161126/GH-850/16112608.006

In [9]:
# more /mnt/disk/weather_project/data1/160515/windshear-speed-700-sfc/16051508.006

In [10]:
# WeatherData('/mnt/disk/weather_project/data1/160515/windshear-speed-700-sfc/16051508.006')

In [11]:
# xx = '/mnt/disk/weather_project/data'
# for root, dirs, files in os.walk(xx):
#     for f in files:
#         filename = os.path.join(root, f)
#         print filename
#         wd = WeatherData(filename)
        
#         print wd.latitude_start, wd.latitude_end, wd.latitude_interval, wd.longitude_start, wd.longitude_end, wd.longitude_interval

# hehehe

In [12]:
def read_meta(meta_path='feature_meta2'):

    metas = []
    _first_path = ''
    for line in open(meta_path):
        _meta = map(string.strip, line.split('\t'))
        if len(_meta) > 1 and _meta[0]:
            _first_path = _meta[0]
        
        _meta[0] = _meta[0] if _meta[0] else _first_path
        if len(_meta) > 1 and not _meta[1]:
            continue
        metas.append(_meta)
    
    return metas

In [13]:
meta = read_meta()

In [14]:
# def closest_node(node, nodes):
#     if np.all(nodes[0] < node) and np.all(node < nodes[-1]):
#         nodes = np.asarray(nodes)
#         dist_2 = np.sum((nodes - node)**2, axis=1)

#         return nodes[np.argsort(dist_2)[:4]]
#     return None

# def closest_point(node, nodes):
#     # todo check node in nodes or pass

#     if np.all(nodes[0] < node) and np.all(node < nodes[-1]):
#         nodes = np.asarray(nodes)
#         dist = np.sum((nodes - node)**2, axis=1)

#         return nodes[np.argsort(dist)[:4]]
#     return None

def closest_point(node, nodes, start):
    # todo check node in nodes or pass

    if np.all(nodes[0] < node) and np.all(node < nodes[-1]):
        nodes = np.asarray(nodes)
        dist = np.sum((nodes - node)**2, axis=1)

        around_points = nodes[np.argsort(dist)[:4]]
        
        dist = np.sum((start - around_points)**2, axis=1)
        return around_points[np.argsort(dist)[0]]
    
    return None

In [15]:
def closest_points(nodes, points, start):
    ic = []
    for node in nodes:
        index, column = closest_point(node, points, start)
        ic.append([index, column])
        
    return np.asarray(ic)

In [16]:
# more /mnt/disk/weather2016/rain3/r3/16010120.000

In [17]:
class ActualData(object):
    
    def __init__(self, path):
        
        self.actual_date = os.path.split(path)[-1].split('.', 1)[0]
        
        str_data = open(path, 'r').read().replace('TESTB', '999999')
        self.actual_data = np.asarray([line.split()[0:5] for line in str_data.splitlines()[4:]], dtype=np.float32)
        


In [18]:
ad = ActualData('/mnt/disk/weather2016/rain3/r3/16010108.000')

ad.actual_data[0, 1:3]

array([ 121.58499908,   29.80699921], dtype=float32)

In [19]:
ad.actual_data

array([[  9.99999000e+05,   1.21584999e+02,   2.98069992e+01,
          0.00000000e+00,   0.00000000e+00],
       [  4.66860000e+04,   1.21222000e+02,   2.50750008e+01,
          2.57000008e+01,   0.00000000e+00],
       [  4.66880000e+04,   1.21433998e+02,   2.49930000e+01,
          9.69999981e+00,   0.00000000e+00],
       ..., 
       [  7.99160000e+04,   1.18300003e+02,   2.74389992e+01,
          1.67199997e+02,   0.00000000e+00],
       [  7.99170000e+04,   1.17754997e+02,   2.73759995e+01,
          2.37000000e+02,   0.00000000e+00],
       [  7.99180000e+04,   1.18124001e+02,   2.75240002e+01,
          1.87000000e+02,   1.00000001e-01]], dtype=float32)

In [20]:
closest_point(ad.actual_data[0, 1:3], wd.grid, [wd.latitude_start, wd.longitude_start])

array([ 121.5  ,   29.875])

In [21]:
index, column = closest_point(ad.actual_data[0, 1:3], wd.grid, [wd.latitude_end, wd.longitude_start])

In [22]:
wd.df[column][index]

1.1699999999999999

In [23]:
ad.actual_data[0]

array([  9.99999000e+05,   1.21584999e+02,   2.98069992e+01,
         0.00000000e+00,   0.00000000e+00], dtype=float32)

In [43]:
actual_root = '/mnt/disk/weather2016/rain3/r3' # 16010108.000
feature_root = '/mnt/disk/weather_project/data' # 16010108.003
clear_root = '/mnt/disk/weather_project/data_clear'

In [25]:
feature_names = ['-'.join(m) for m in meta]

In [26]:
id_ic = pickle.load(open('id_ic.pkl'))id_ic = pickle.load(open('id_ic.pkl'))

In [27]:
# index column

# for f in os.listdir(actual_root):
#     print f
#     fp = os.path.join(actual_root, f)
#     ad = ActualData(fp)
    
#     # latitude, longitude
    
#     for _id, latitude, longitude in ad.actual_data[:, 0:3]:
#         for fn in feature_names:
#             _key = '%s_%s' % (_id, fn)
#             if _key not in id_ic:
#                 fp = os.path.join('/mnt/disk/weather_project/data/160101', fn)
#                 pred_data_files = os.listdir(fp)
#                 pred_data_files.sort()
#                 pred_data_file = os.path.join(fp, pred_data_files[0])
#                 wd = WeatherData(pred_data_file)

#                 r = closest_point([latitude, longitude], wd.grid, [wd.latitude_end, wd.longitude_start])
#                 if r is not None:
#                     index, column = r
#                     id_ic[_key] = [index, column]
#                 else:
#                     id_ic[_key] = [None, None]

In [28]:
# pickle.dump(id_ic, open('id_ic.pkl', 'wb')) # id + feature_name

In [29]:
def doo(actual_file):
    try:
        print 'Start: %s' % actual_file

        actual_path = os.path.join(actual_root, actual_file)
        ad = ActualData(actual_path)

        columns = ['id', 'datetime', 'actual_data']
        columns.extend(['-'.join(m) for m in meta])
        feature_data = pd.DataFrame(np.zeros([ad.actual_data.shape[0], 129]), index=ad.actual_data[:, 0].astype(np.int), columns=columns)
    #     feature_data.index = feature_data.index.map(np.float32)
        feature_data['id'] = ad.actual_data[:, 0]
        feature_data['datetime'] = ad.actual_date
        feature_data['actual_data'] = ad.actual_data[:, 4]


        actual_date = datetime.datetime.strptime(actual_file.split('.', 1)[0], '%y%m%d%H')
        if actual_date.hour >= 8 and actual_date.hour < 20:
            _pred_date = actual_date - datetime.timedelta(hours=8)
            pred_date = datetime.datetime(year=_pred_date.year, month=_pred_date.month, day=_pred_date.day, hour=8)

        else:
            _pred_date = actual_date - datetime.timedelta(hours=20)
            pred_date = datetime.datetime(year=_pred_date.year, month=_pred_date.month, day=_pred_date.day, hour=20)

        pred_file = '%s.00%s' % (pred_date.strftime('%y%m%d%H'), _pred_date.hour)
        pred_date = pred_file[:6]
        
        
#         pred_date = actual_file[:6]
        date_path = os.path.join(feature_root, pred_date)

        feature_paths = [os.path.join(date_path, '-'.join(m)) for m in meta]

        del_index = []
        for feature_path in feature_paths:

            feature_name = os.path.split(feature_path)[-1]
            
            file_path = os.path.join(feature_path, pred_file)

            wd = WeatherData(file_path)

            fea_column = []
            fea_index = []
            fea_data = []
            for node in ad.actual_data[:, 0:3]:
#                     index, column = closest_point(node[1:3], wd.grid, [wd.latitude_end, wd.longitude_start])

                _key = '%s_%s' % (node[0], feature_name)
                if _key in id_ic:
                    index, column = id_ic[_key]
                else:
                    print _key

                    ii = node[0].astype(np.int)
                    if ii not in del_index:
                        del_index.append(ii)
                    continue

                point_data = wd.df[column][index]

                feature_data.iloc[feature_data.index.get_loc(node[0].astype(np.int)), feature_data.columns.get_loc(feature_name)] = point_data

        for inx in del_index:
            if inx in feature_data.index:
                feature_data.drop(inx, inplace=True)

        f = os.path.join(clear_root, '%s.csv' % actual_file)
        feature_data.to_csv(open(f, 'wb'), index=False)
        print 'Over: %s' % f
    except Exception as e:
        print e
        raise

In [54]:
# pool = ThreadPoolExecutor(20)
pool = ProcessPoolExecutor(16)

In [None]:
actual_files = os.listdir(actual_root)
actual_files.sort()

i = 0
futures = []
for actual_file in actual_files:
    f = os.path.join(clear_root, '%s.csv' % actual_file)
    if os.path.exists(f):
        print 'Pass: %s' % actual_file
        continue
    
    futures.append(pool.submit(doo, actual_file))
    
#     doo(actual_file)
#     i += 1
#     if i > 10: break

# as_completed(futures)
# results = [r.done() for r in as_completed(futures)]
wait(futures)
pool.shutdown()
del pool

print 'Done.'

