In [18]:
import timeit
start = timeit.default_timer()

import os
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'
import sys
print(sys.version)

3.10.9 (main, Mar  1 2023, 18:23:06) [GCC 11.2.0]


In [19]:
from datetime import datetime, timezone
import folium
from folium.plugins import MarkerCluster
import geohash_tools as gh
import numpy as np
import pyarrow as pa
import pyspark as ps

In [20]:
sp = spark.read.load('hdfs://orion11:14001/nam/2019/01/05/*',
                     format='csv',
                     sep='\t',
                     inferSchema='true',
                     header='true')
schema = sp.schema

sp = spark.read.schema(schema).load('hdfs://orion11:14001/nam/201*/*/*/*',
                     format='csv',
                     sep='\t',
                     header='true')

                                                                                

In [21]:
sp = sp.select(['1_time','2_lat','3_lon','temperature_surface','pressure_maximum_wind',\
                'relative_humidity_zerodegc_isotherm','total_precipitation_surface_3_hour_accumulation',\
                'vegetation_surface','visibility_surface'])

In [22]:
def sum_parts(a,b):
    return tuple(a[i]+b[i] for i in range(len(a)))

def avg_parts(x):
    n = x[-1]
    return tuple(e/n for e in x[:-1])

In [23]:
# ( ( geohash yearmonth ) ( temp wind hmdy prcp veg vis 1 ) )
# -> sum, avg, sort features by ( geohash yearmonth )

brdd = sp.rdd \
    .map(lambda x: ( ( gh.encode(x[1],x[2],precision=2),\
                     datetime.fromtimestamp(x[0]/1000, timezone.utc).year*100 + datetime.fromtimestamp(x[0]/1000, timezone.utc).month ),\
                     ( x[3],x[4],x[5],x[6],x[7],x[8],1 ) )) \
    .reduceByKey(lambda a,b: sum_parts(a,b)) \
    .map(lambda x: ( x[0], avg_parts(x[1] ))) \
    .sortBy(lambda x: x[0]) \
    .cache()

                                                                                

In [24]:
def to_lists(x):
    return np.vstack(x)

def appends(a,b):
    return np.hstack((a,np.vstack(b)))

def extends(a,b):
    return np.hstack((a,b))

def means_stdevs(x):
    return np.hstack((np.vstack(np.mean(x,axis=1)),np.vstack(np.std(x,axis=1))))

def pcc_corrs(features, means_stds, n):
    features -= means_stds[:,:1]
    features = np.sum(features[1:]*features[0], axis=1) / n
    stdvs = means_stds[1:,1]*means_stds[0,1]
    return np.divide(features, stdvs, out=np.zeros_like(features), where=stdvs!=0)

In [25]:
gh2s = brdd.map(lambda x: x[0][0]).distinct().sortBy(lambda x: x[0]).collect()
n = brdd.filter(lambda x: x[0][0] == gh2s[0]).count()
n_fts = len(sp.columns)-3

                                                                                

In [26]:
# for every geohash
# filter for specific geohash
# ( geohash ( temp wind hmdy prcp veg vis ) ) # 60 records per geohash; averages of all features per month
# -> np.array w/ shape 6,60, basically transposing the RDD's into an np array; here npa[0,:] is 60 temperature values
# -> ( features_npa, means_and_stddevs_npa )
#      features_npa.shape = 6,60, means_and_stddevs_npa.shape = 6,2; [:,0] is means [:,1] is stddevs
# -> pcc_correlations_npa

pcc_corr_by_gh2 = np.empty((0,5),dtype=np.float32)
for i in range(len(gh2s)):
    nar = brdd \
        .filter(lambda x: x[0][0] == gh2s[i]) \
        .map(lambda x: ( x[0][0], x[1] )) \
        .combineByKey(to_lists, appends, extends) \
        .map(lambda x: (x[1], means_stdevs(x[1]))) \
        .map(lambda x: pcc_corrs(x[0],x[1],n)) \
        .collect()[0]
    pcc_corr_by_gh2 = np.vstack((pcc_corr_by_gh2, nar))

In [27]:
# temp wind hmdy prcp veg vis
n_cfts = n_fts-1
features = sp.columns[-n_cfts:]

sorted_idxs = pcc_corr_by_gh2.argsort(axis=None)
max_idxs = sorted_idxs[-5:][::-1]
min_idxs = sorted_idxs[:5]

global_maxes = np.hstack((np.vstack(max_idxs//n_cfts), np.vstack(max_idxs%n_cfts)))
global_mins = np.hstack((np.vstack(min_idxs//n_cfts), np.vstack(min_idxs%n_cfts)))

feature_maxes = np.hstack((np.vstack(pcc_corr_by_gh2.argmax(axis=0)), np.vstack(np.arange(n_cfts))))
feature_mins = np.hstack((np.vstack(pcc_corr_by_gh2.argmin(axis=0)), np.vstack(np.arange(n_cfts))))

feature_avgs = np.array([pcc_corr_by_gh2[:,i].mean(where=pcc_corr_by_gh2[:,i]!=0)
                         for i in range(n_cfts)])

print('\n>>>> Global Max Positive Correlations <<<<')
for idx in global_maxes:
    print("\nGeohash region '{}', feature '{}'".format(gh2s[idx[0]], features[idx[1]]))
    print(' ', pcc_corr_by_gh2[idx[0],idx[1]])

print('\n\n>>>> Global Max Negative Correlations <<<<')
for idx in global_mins:
    print("\nGeohash region '{}', feature '{}'".format(gh2s[idx[0]], features[idx[1]]))
    print(' ', pcc_corr_by_gh2[idx[0],idx[1]])

print('\n\n>>>> Feature-Wise Max Positive Correlations <<<<')
for idx in feature_maxes:
    print("\nGeohash region '{}', feature '{}'".format(gh2s[idx[0]], features[idx[1]]))
    print(' ', pcc_corr_by_gh2[idx[0],idx[1]])

print('\n\n>>>> Feature-Wise Max Negative Correlations <<<<')
for idx in feature_mins:
    print("\nGeohash region '{}', feature '{}'".format(gh2s[idx[0]], features[idx[1]]))
    print(' ', pcc_corr_by_gh2[idx[0],idx[1]])

print('\n\n>>>> Feature-Wise Correlation Stats <<<<')
for idx in range(len(feature_avgs)):
    print("\nFeature '{}'".format(features[idx]))
    print('  Mean:', feature_avgs[idx])
print()


>>>> Global Max Positive Correlations <<<<

Geohash region 'dh', feature 'vegetation_surface'
  0.964572490339564

Geohash region 'dp', feature 'vegetation_surface'
  0.9592737922153067

Geohash region 'f2', feature 'vegetation_surface'
  0.9583824254826753

Geohash region 'c2', feature 'vegetation_surface'
  0.9550114720408455

Geohash region 'cc', feature 'vegetation_surface'
  0.9515366447433111


>>>> Global Max Negative Correlations <<<<

Geohash region 'f8', feature 'relative_humidity_zerodegc_isotherm'
  -0.8498466690525379

Geohash region '9m', feature 'vegetation_surface'
  -0.8295065501993546

Geohash region 'fb', feature 'relative_humidity_zerodegc_isotherm'
  -0.8118559372483265

Geohash region 'dr', feature 'relative_humidity_zerodegc_isotherm'
  -0.809929596789645

Geohash region 'cb', feature 'relative_humidity_zerodegc_isotherm'
  -0.8024115730606475


>>>> Feature-Wise Max Positive Correlations <<<<

Geohash region 'dm', feature 'pressure_maximum_wind'
  0.54304274842

In [28]:
print(timeit.default_timer()-start)

205.48678222799208
