-
Notifications
You must be signed in to change notification settings - Fork 0
/
Python_Code
284 lines (225 loc) · 12.6 KB
/
Python_Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import findspark
findspark.init('/data/spark-1.6.0-bin-hadoop2.6')
from pyspark import SparkContext, HiveContext
from pyspark.sql import functions as F
from pyspark.sql import Window as w
%matplotlib inline
import seaborn as sns
import pandas as pd
import numpy as np
import math
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType, DoubleType, IntegerType
from shapely.wkb import loads
from shapely import wkt
## package installed
# load Spark and HiveContext
sc = SparkContext()
hc = HiveContext(sc)
# At first, dealing with weather data to fill each seconds into the whole dataset.
start_time = '2015-07-01 00:00:00'
end_time = '2015-07-02 00:00:00'
all_times = pd.date_range(start_time,end_time, freq='s', closed='left')
all_times = hc.createDataFrame(pd.DataFrame((all_times).astype(str), columns=['ts']))
# generate one days timedata by seconds and stored a dataframe into all_times.
#all_times.head(10)
#all_times.count()
weather_table = "birds.weather"
weather = hc.read.table(weather_table).persist() # read weather data
#weather.head(10)
weather_sec = weather.withColumn('ts', F.date_format('dt', 'yyyy-MM-dd HH:mm:ss'))['dt','ts']
# add a new column 'ts' with new time format in order to drop the half seconds in the original data and extract this two column
ts = None
for delta_sec in range(31):
if ts is None:
ts = hc.createDataFrame(pd.DataFrame((all_times+(24*60*60)*delta_sec).astype(str), columns=['ts']))
else:
ts = ts.unionAll(hc.createDataFrame(pd.DataFrame((all_times+(24*60*60)*delta_sec).astype(str), columns=['ts'])))
# ts.head(100)
# generate one month time in a for loop
#weather_full = ts.join(weather_sec, on=weather_sec.ts==ts.ts, how='left').drop(weather_sec.ts)
weather_full = all_times.join(weather_sec, on=weather_sec.ts==all_times.ts, how='left').drop(weather_sec.ts)
weather_full = weather_full.orderBy('ts').dropDuplicates(['ts'])
#weather_full.count()
# join two tables, one is with all seconds, one is weather data in order to get the weather information by each seconds
# since there are misssing data, we need to fill in it by its previous weather information, but the chanllenge is the
# misssing interval is flexible
window = w.partitionBy().orderBy('ts')
lag_windows = [F.lag(F.col('dt'),count=lag).over(window) for lag in range(1,10)]
# create the function to fill in the data for interval from 1 to 10,interval depends on the data,in some day the interval time is rather bi
# function # notice the interval time vary from 1 to .... on different days
weather_filld = (weather_full
.withColumn('dt_new',F.coalesce(*lag_windows))
.withColumn('dt', F.when(F.col('dt').isNull(), F.col('dt_new')).otherwise(F.col('dt'))))['ts','dt']
#weather_filld.head(100)
# fill the data in terms of the time
weather_fulldata= (weather_filld
.join(weather, on=weather.dt==weather_filld.dt, how='left')
.drop(weather.dt)
.drop(weather_filld.dt)
.drop(weather.id)
)
#full_data.count()
# fill the data with all information
df = weather_filld.filter(weather_filld.dt.isNull()).toPandas()
df.groupby(pd.to_datetime(df.ts).dt.date).count()
ts.filter(ts.ts<'2015-07-01 01:20:55').filter(ts.ts>'2015-07-01 01:20:49').head(100)
# check the data, some days have large interval of missing data , dropping it may be a good idea.
trackestimate_table = "birds.trackestimate"
trackestimate = hc.read.table(trackestimate_table)
# read trackestimate table which contanis each location
track_subset = (trackestimate.where("dt > '2015-07-01 00:00:00'")
.where("dt < '2015-07-01 23:59:59'")).persist()
#.where("classification_id != 10")
#.where("classification_id != 5")
#.where("classification_id != 1"))
#.where("distance_travelled < 10000"))
# select one day data
trackestimate_subset = track_subset.withColumn('dt', F.date_format('dt', 'yyyy-MM-dd HH:mm:ss'))
# transform time format in order to drop the half seconds in the original data and extract this two column( the same as the weather table)
udf_x = UserDefinedFunction(lambda x: str(loads(x,hex=True).__geo_interface__['coordinates'][0]), StringType())
# develop a function to get the coordinates from specific value(longitude)
udf_y = UserDefinedFunction(lambda x: str(loads(x,hex=True).__geo_interface__['coordinates'][1]), StringType())
# develop a function to get the coordinates from specific value(latitude)
trackestimate_subset_coord=trackestimate_subset.withColumn('position_x', udf_x(F.col('position')))
trackestimate_subset_coord=trackestimate_subset_coord.withColumn('position_y', udf_y(F.col('position')))
# transform it into position_x(longitude) and position_y(latitude) in terms of the function above
trackestimate_subset_coord=trackestimate_subset_coord.withColumn('position_x',F.col('position_x').astype('float'))
trackestimate_subset_coord=trackestimate_subset_coord.withColumn('position_y',F.col('position_y').astype('float'))
# transform the type from string to float
#trackestimate_subset.count()
data_join = (trackestimate_subset_coord
.join(weather_fulldata,on=trackestimate_subset_coord.dt==weather_fulldata.ts,how='left')
.drop(weather_fulldata.ts)
.drop(weather_fulldata.dewpoint)
.drop(weather_fulldata.relativehumidity)
.drop(trackestimate_subset_coord.position)
.persist()
)
#data.filter(data.relativehumidity.isNull()).count() - data.count() # check if all NULL
#data.filter(data.dewpoint.isNull()).count() - data.count() # check if all NULL
#join weather full data and trackeastimate data in terms of the timestamp and drop two features which didn't contain any value
# so now we get most of the information at each seconds
track_table = "birds.track"
track = hc.read.table(track_table)
# read track table which contanis trajectory to get some additional information
track = track.select('id','classification_id','species_id','distance_travelled','score')
data_join = (data_join
.join(track,on=track.id==data_join.track_id)
.drop(track.id))
# join data and track in terms of the id to get more inforamtion
# determine the fixed area
l_lon = 4.705
r_lat = 52.368
r_lon = 4.815
l_lat = 52.283
dlon = r_lon-l_lon
dlat = r_lat-l_lat
length0 = r_lon*111.699 * np.cos(r_lat * np.pi/180) - l_lon*111.699 * np.cos(r_lat * np.pi/180)
length = (int(np.ceil(length0*10)))/10.
width0 = r_lat*110.574 - l_lat*110.574
width = (int(np.ceil(width0*10)))/10.
# the area is from the images of bird control team and get the coordinates from google maps approximately and calculate the
# length and width
# the potential extended distance
#track_table = "birds.track"
#track = hc.read.table(track_table) # input track table which contanis trajectory
#track_trajectory = track.select('trajectory')
#def distance(a):
# return ((a[0][0]*111.321*cos(a[0][1]*np.pi/180) - (a[1][0]*111.321*cos(a[1][1]*np.pi/180)))**2
# + ((a[0][1] - a[1][1])*111)**2)**0.5
#udf = UserDefinedFunction(lambda x: str(loads(x,hex=True).__geo_interface__['coordinates']), StringType())
#udf = UserDefinedFunction(lambda x: str(loads(x,hex=True)), StringType())
#a = track_trajectory.withColumn('text', udf(F.col('trajectory')))
# the potential extended distance
track_table = "birds.track"
track = hc.read.table(track_table).where("classification_id != 1")
# read thetrack table which contanis trajectory and drop the flight trajectory
def max_dist(b):
return max(((np.array(b)[1:,0]*111.321*np.cos(np.array(b)[1:,1]*np.pi/180)
- b[0][0]*111.321*np.cos(b[0][1]*np.pi/180))**2 +
((np.array(b)[1:,1] - b[0][1])*111)**2)**.5)
# define the function of calculating the maximum direct distance between two coordinates
udf = UserDefinedFunction(lambda x: str(max_dist(loads(x,hex=True).__geo_interface__['coordinates'])), StringType())
#udf = UserDefinedFunction(lambda x: max_dist(loads(x,hex=True).__geo_interface__['coordinates']), DoubleType())
# develop the function to apply on each row
track_trajectory = (track
.select('trajectory')
.dropna()
.withColumn('max_dist', udf(F.col('trajectory')))
.withColumn('max_dist', F.col('max_dist').astype('float'))
.select('max_dist')
)
# get the maximum distance from each trajectory and transform it into proper type.
nth_percentile = math.ceil(track_trajectory
.sort(track_trajectory.max_dist.desc())
.limit(int((1-0.99) * track_trajectory.count()))
.sort(track_trajectory.max_dist.asc())
.first()[0]
)
# calculate the 99% largest distance and to extend the exist area in terms of this value
# since some of insane geese could produce crazy long distance, we set 99% as threshold
print nth_percentile
lat_det = nth_percentile/111
lon_det = nth_percentile/(np.cos(l_lat*np.pi/180)*111.321)
bound_x = [(l_lon-lon_det,l_lat-lat_det)]
bound_y = [(r_lon+lon_det,r_lat+lat_det)]
# calculate the whole area to determine the grid
n= 10
# determine number of cells in each dimension( this could be shifted in terms of some conditions)
index_cells = pd.RangeIndex(1,n+1,1)
#index_cells = hc.createDataFrame(pd.DataFrame((index_cells), columns=['index']))
x_cells=pd.qcut(([l_lon-lon_det,r_lon+lon_det]),n,retbins=True)[1:]
y_cells=pd.qcut(([l_lat-lat_det,r_lat+lat_det]),n,retbins=True)[1:]
# determine the bound of each cells and the index notice it calculate through pandas
bins_x=np.array(x_cells).tolist()[0]
bins_y=np.array(y_cells).tolist()[0]
# get the bound value
interval_lon = (r_lon+lon_det-(l_lon-lon_det))/n
interval_lat = (r_lat+lat_det-(l_lat-lat_det))/n
# get the interval value of each cells
min_lon = l_lon-lon_det
min_lat = l_lat-lat_det
data= (data_join.filter(data_join.position_x > bound_x[0][0])
.filter(data_join.position_x < bound_y[0][0])
.filter(data_join.position_y > bound_x[0][1])
.filter(data_join.position_y < bound_y[0][1])).persist()
# drop those location out of the area
data_x = data.withColumn('x_categories', F.ceil((F.col('position_x') - min_lon)/interval_lon))
data = data_x.withColumn('y_categories', F.ceil((F.col('position_y') - min_lat)/interval_lat))
data = data.fillna(0)
# assign those location( longitude and latitude) into a index in terms of the grid number set before
time_interval = 30
# set the time interval( this could be changed by other conditions)
start_timestep = 1435708800 - 7200
# 2015-07-01 00:00:00 2 hours difference from the utc time zone
data = (data
.withColumn('timestep', F.ceil((F.unix_timestamp('dt')-start_timestep)/time_interval))
)
# assign the each seconds into a time index in terms of the time interval
#udf_lon = UserDefinedFunction(lambda x: round((x-(l_lon-lon_det))/interval_lon + 0.5), DoubleType())
#udf_lat = UserDefinedFunction(lambda x: round((x-(l_lat-lat_det))/interval_lat + 0.5), DoubleType())
#data_x= data.withColumn('x_categories',udf_lon(F.col('position_x')))
#data = data_x.withColumn('y_categories',udf_lat(F.col('position_y')))
# assign assign each location into corresponding index value #remark not fancy function,simple calculation could do it directly
# assign na value into 0
data_count=data.groupBy('x_categories', 'y_categories', 'timestep').count()
data_mean=data.groupBy('x_categories', 'y_categories', 'timestep').mean('position_x','position_y',
'velocity','airspeed',
'heading','heading_vertical',
'peak_mass','mass','mass_correction',
'barometricpressure','airtemperature',
'winddirectiontrue','winddirectionmagnetic',
'windspeed','distance_travelled','score')
# group by the condition and calculate the mean and count the number of location in each cells
cond = [data_count.x_categories == data_mean.x_categories,
data_count.y_categories == data_mean.y_categories,
data_count.timestep == data_mean.timestep]
# set the join condition
data_grid = (data_mean.join(data_count, cond, 'inner')
.drop(data_mean.timestep)
.drop(data_mean.y_categories)
.drop(data_mean.x_categories)
)
data_grid = data_grid.orderBy('timestep','x_categories','y_categories')
data_grid.head()