/
util_list.py
1255 lines (1131 loc) · 42 KB
/
util_list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
util_list module. Contains the mflist class.
This classes encapsulates modflow-style list inputs away
from the individual packages. The end-user should not need to
instantiate this class directly.
some more info
"""
import os
import warnings
import numpy as np
import pandas as pd
from ..datbase import DataInterface, DataListInterface, DataType
from ..utils.recarray_utils import create_empty_recarray
class MfList(DataInterface, DataListInterface):
"""
a generic object for handling transient boundary condition lists
Parameters
----------
package : package object
The package object (of type :class:`flopy.pakbase.Package`) to which
this MfList will be added.
data : varies
the data of the transient list (optional). (the default is None)
Attributes
----------
mxact : int
the max number of active bc for any stress period
Methods
-------
add_record(kper,index,value) : None
add a record to stress period kper at index location
write_transient(f) : None
write the transient sequence to the model input file f
check_kij() : None
checks for boundaries outside of model domain - issues warnings only
See Also
--------
Notes
-----
Examples
--------
"""
def __init__(
self,
package,
data=None,
dtype=None,
model=None,
list_free_format=None,
binary=False,
):
if isinstance(data, MfList):
for attr in data.__dict__.items():
setattr(self, attr[0], attr[1])
if model is None:
self._model = package.parent
else:
self._model = model
self._package = package
return
self._package = package
if model is None:
self._model = package.parent
else:
self._model = model
if dtype is None:
assert isinstance(self.package.dtype, np.dtype)
self.__dtype = self.package.dtype
else:
self.__dtype = dtype
self.__binary = binary
self.__vtype = {}
self.__data = {}
if data is not None:
self.__cast_data(data)
self.__df = None
if list_free_format is None:
if package.parent.version == "mf2k":
list_free_format = False
self.list_free_format = list_free_format
@property
def name(self):
return self.package.name
@property
def mg(self):
return self._model.modelgrid
@property
def model(self):
return self._model
@property
def package(self):
return self._package
@property
def data_type(self):
return DataType.transientlist
@property
def plottable(self):
return True
def get_empty(self, ncell=0):
d = create_empty_recarray(ncell, self.dtype, default_value=-1.0e10)
return d
def export(self, f, **kwargs):
from .. import export
return export.utils.mflist_export(f, self, **kwargs)
def append(self, other):
"""append the recarrays from one MfList to another
Parameters
----------
other: variable: an item that can be cast in to an MfList
that corresponds with self
Returns
-------
dict of {kper:recarray}
"""
if not isinstance(other, MfList):
other = MfList(
self.package,
data=other,
dtype=self.dtype,
model=self._model,
list_free_format=self.list_free_format,
)
msg = f"MfList.append(): other arg must be MfList or dict, not {type(other)}"
assert isinstance(other, MfList), msg
other_kpers = list(other.data.keys())
other_kpers.sort()
self_kpers = list(self.data.keys())
self_kpers.sort()
new_dict = {}
for kper in range(self._model.nper):
other_data = other[kper].copy()
self_data = self[kper].copy()
other_len = other_data.shape[0]
self_len = self_data.shape[0]
if (other_len == 0 and self_len == 0) or (
kper not in self_kpers and kper not in other_kpers
):
continue
elif self_len == 0:
new_dict[kper] = other_data
elif other_len == 0:
new_dict[kper] = self_data
else:
new_len = other_data.shape[0] + self_data.shape[0]
new_data = np.recarray(new_len, dtype=self.dtype)
new_data[:self_len] = self_data
new_data[self_len : self_len + other_len] = other_data
new_dict[kper] = new_data
return new_dict
def drop(self, fields):
"""drop fields from an MfList
Parameters
----------
fields : list or set of field names to drop
Returns
-------
dropped : MfList without the dropped fields
"""
if not isinstance(fields, list):
fields = [fields]
names = [n for n in self.dtype.names if n not in fields]
dtype = np.dtype(
[(k, d) for k, d in self.dtype.descr if k not in fields]
)
spd = {}
for k, v in self.data.items():
# because np 1.9 doesn't support indexing by list of columns
newarr = np.array([self.data[k][n] for n in names]).transpose()
newarr = np.array(list(map(tuple, newarr)), dtype=dtype).view(
np.recarray
)
for n in dtype.names:
newarr[n] = self.data[k][n]
spd[k] = newarr
return MfList(self.package, spd, dtype=dtype)
@property
def data(self):
return self.__data
@property
def df(self):
if self.__df is None:
self.__df = self.get_dataframe()
return self.__df
@property
def vtype(self):
return self.__vtype
@property
def dtype(self):
return self.__dtype
# Get the itmp for a given kper
def get_itmp(self, kper):
if kper not in list(self.__data.keys()):
return None
if self.__vtype[kper] is None:
return -1
# If an external file, have to load it
if self.__vtype[kper] == str:
return self.__fromfile(self.__data[kper]).shape[0]
if self.__vtype[kper] == np.recarray:
return self.__data[kper].shape[0]
# If not any of the above, it must be an int
return self.__data[kper]
@property
def mxact(self):
mxact = 0
for kper in list(self.__data.keys()):
mxact = max(mxact, self.get_itmp(kper))
return mxact
@property
def fmt_string(self):
"""Returns a C-style fmt string for numpy savetxt that corresponds to
the dtype"""
if self.list_free_format is not None:
use_free = self.list_free_format
else:
use_free = True
if self.package.parent.has_package("bas6"):
use_free = self.package.parent.bas6.ifrefm
# mt3d list data is fixed format
if "mt3d" in self.package.parent.version.lower():
use_free = False
fmts = []
for field in self.dtype.descr:
vtype = field[1][1].lower()
if vtype in ("i", "b"):
if use_free:
fmts.append("%9d")
else:
fmts.append("%10d")
elif vtype == "f":
if use_free:
fmts.append("%15s")
else:
fmts.append("%10G")
elif vtype == "o":
if use_free:
fmts.append("%9s")
else:
fmts.append("%10s")
elif vtype == "s":
msg = (
"MfList.fmt_string error: 'str' type found in dtype. "
"This gives unpredictable results when "
"recarray to file - change to 'object' type"
)
raise TypeError(msg)
else:
raise TypeError(
f"MfList.fmt_string error: unknown vtype in field: {field}"
)
if use_free:
fmt_string = " " + " ".join(fmts)
else:
fmt_string = "".join(fmts)
return fmt_string
# Private method to cast the data argument
# Should only be called by the constructor
def __cast_data(self, data):
# If data is a list, then all we can do is try to cast it to
# an ndarray, then cast again to a recarray
if isinstance(data, list):
# warnings.warn("MfList casting list to array")
try:
data = np.array(data)
except Exception as e:
raise ValueError(
f"MfList error: casting list to ndarray: {e!s}"
)
# If data is a dict, the we have to assume it is keyed on kper
if isinstance(data, dict):
if not list(data.keys()):
raise ValueError("MfList error: data dict is empty")
for kper, d in data.items():
try:
kper = int(kper)
except Exception as e:
raise ValueError(
f"MfList error: data dict key {kper} not integer: "
f"{type(kper)}\n{e!s}"
)
# Same as before, just try...
if isinstance(d, list):
try:
d = np.array(d)
except Exception as e:
raise ValueError(
f"MfList error: casting list to ndarray: {e}"
)
if isinstance(d, np.recarray):
self.__cast_recarray(kper, d)
elif isinstance(d, np.ndarray):
self.__cast_ndarray(kper, d)
elif isinstance(d, pd.DataFrame):
self.__cast_dataframe(kper, d)
elif isinstance(d, int):
self.__cast_int(kper, d)
elif isinstance(d, str):
self.__cast_str(kper, d)
elif d is None:
self.__data[kper] = -1
self.__vtype[kper] = None
else:
raise ValueError(
"MfList error: unsupported data type: "
f"{type(d)} at kper {kper}"
)
# A single recarray - same MfList for all stress periods
elif isinstance(data, np.recarray):
self.__cast_recarray(0, data)
# A single ndarray
elif isinstance(data, np.ndarray):
self.__cast_ndarray(0, data)
# A single dataframe
elif isinstance(data, pd.DataFrame):
self.__cast_dataframe(0, data)
# A single filename
elif isinstance(data, str):
self.__cast_str(0, data)
else:
raise ValueError(
f"MfList error: unsupported data type: {type(data)}"
)
def __cast_str(self, kper, d):
# If d is a string, assume it is a filename and check that it exists
assert os.path.exists(d), (
f"MfList error: dict filename (string) '{d}' value for "
f"kper {kper} not found"
)
self.__data[kper] = d
self.__vtype[kper] = str
def __cast_int(self, kper, d):
# If d is an integer, then it must be 0 or -1
if d > 0:
raise ValueError(
"MfList error: dict integer value for "
"kper {:10d} must be 0 or -1, "
"not {:10d}".format(kper, d)
)
if d == 0:
self.__data[kper] = 0
self.__vtype[kper] = None
else:
self.__data[kper] = -1
self.__vtype[kper] = None
def __cast_recarray(self, kper, d):
assert d.dtype == self.__dtype, (
f"MfList error: recarray dtype: {d.dtype} doesn't match "
f"self dtype: {self.dtype}"
)
self.__data[kper] = d
self.__vtype[kper] = np.recarray
def __cast_ndarray(self, kper, d):
d = np.atleast_2d(d)
if d.dtype != self.__dtype:
assert d.shape[1] == len(self.dtype), (
f"MfList error: ndarray shape {d.shape} doesn't match "
f"dtype len: {len(self.dtype)}"
)
try:
self.__data[kper] = np.core.records.fromarrays(
d.transpose(), dtype=self.dtype
)
except Exception as e:
raise ValueError(
f"MfList error: casting ndarray to recarray: {e!s}"
)
self.__vtype[kper] = np.recarray
def __cast_dataframe(self, kper, d):
self.__cast_recarray(
kper, d.to_records(index=False).astype(self.dtype)
)
def get_dataframe(self, squeeze=False):
"""
Cast recarrays for stress periods into single
dataframe containing all stress periods.
Parameters
----------
squeeze : bool
Reduce number of rows in dataframe to only include
stress periods where a variable changes.
Returns
-------
df : dataframe
Dataframe of shape nrow = nper x ncells, ncol = nvar. If
the squeeze option is chosen, nper is the number of
stress periods where at least one cells is different,
otherwise it is equal to the number of keys in MfList.data.
"""
# make a dataframe of all data for all stress periods
names = ["per", "k", "i", "j"]
if "MNW2" in self.package.name:
names += ["wellid"]
# find relevant variable names
# may have to iterate over the first stress period
for per in range(self._model.nper):
if hasattr(self.data[per], "dtype"):
varnames = list(
[n for n in self.data[per].dtype.names if n not in names]
)
break
# create list of dataframes for each stress period
# each with index of per, k, i, j
dfs = []
for per in self.data.keys():
recs = self.data[per]
if recs is None or len(recs) == 0:
# add an empty dataframe if a stress period is
# empty (e.g. no pumping during a predevelopment
# period)
columns = names + varnames
dfi = pd.DataFrame(data=None, columns=columns)
dfi = dfi.set_index(names)
else:
dfi = pd.DataFrame.from_records(recs)
dfi.loc[:, "per"] = per
dfi = dfi.set_index(names)
dfs.append(dfi)
df = pd.concat(dfs, axis=0)
# add unique integer index
df.loc[:, "no"] = 1
df.loc[:, "no"] = df.groupby(["per", "k", "i", "j"])["no"].cumsum() - 1
df = df.set_index("no", append=True)
# squeeze: remove duplicate periods
if squeeze:
changed = (
df.groupby(["k", "i", "j", "no"]).diff().ne(0.0).any(axis=1)
)
changed = changed.groupby("per").transform(lambda s: s.any())
df = df.loc[changed, :]
df = df.reset_index()
df.loc[:, "node"] = df.loc[:, "i"] * self._model.ncol + df.loc[:, "j"]
df = df.loc[
:,
names
+ [
"node",
]
+ [v for v in varnames if not v == "node"],
]
return df
def add_record(self, kper, index, values):
# Add a record to possible already set list for a given kper
# index is a list of k,i,j or nodes.
# values is a list of floats.
# The length of index + values must be equal to the number of names
# in dtype
assert len(index) + len(values) == len(self.dtype), (
"MfList.add_record() error: length of index arg + "
"length of value arg != length of self dtype"
)
# If we already have something for this kper, then add to it
if kper in list(self.__data.keys()):
if self.vtype[kper] == int:
# If a 0 or -1, reset
self.__data[kper] = self.get_empty(1)
self.__vtype[kper] = np.recarray
elif self.vtype[kper] == str:
# If filename, load into recarray
d = self.__fromfile(self.data[kper])
d.resize(d.shape[0], d.shape[1])
self.__data[kper] = d
self.__vtype[kper] = np.recarray
elif self.vtype[kper] == np.recarray:
# Extend the recarray
self.__data[kper] = np.append(
self.__data[kper], self.get_empty(1)
)
else:
self.__data[kper] = self.get_empty(1)
self.__vtype[kper] = np.recarray
rec = list(index)
rec.extend(list(values))
try:
self.__data[kper][-1] = tuple(rec)
except Exception as e:
raise ValueError(
f"MfList.add_record() error: adding record to recarray: {e}"
)
def __getitem__(self, kper):
# Get the recarray for a given kper
# If the data entry for kper is a string,
# return the corresponding recarray,
# but don't reset the value in the data dict
# assert kper in list(self.data.keys()), "MfList.__getitem__() kper " + \
# str(kper) + " not in data.keys()"
try:
kper = int(kper)
except Exception as e:
raise ValueError(
f"MfList error: _getitem__() passed invalid kper index: {kper}"
)
if kper not in list(self.data.keys()):
if kper == 0:
return self.get_empty()
else:
return self.data[self.__find_last_kper(kper)]
if self.vtype[kper] == int:
if self.data[kper] == 0:
return self.get_empty()
else:
return self.data[self.__find_last_kper(kper)]
if self.vtype[kper] == str:
return self.__fromfile(self.data[kper])
if self.vtype[kper] == np.recarray:
return self.data[kper]
def __setitem__(self, kper, data):
if kper in list(self.__data.keys()):
if self._model.verbose:
print(f"removing existing data for kper={kper}")
self.data.pop(kper)
# If data is a list, then all we can do is try to cast it to
# an ndarray, then cast again to a recarray
if isinstance(data, list):
# warnings.warn("MfList casting list to array")
try:
data = np.array(data)
except Exception as e:
raise ValueError(
f"MfList error: casting list to ndarray: {e!s}"
)
# cast data
if isinstance(data, int):
self.__cast_int(kper, data)
elif isinstance(data, np.recarray):
self.__cast_recarray(kper, data)
# A single ndarray
elif isinstance(data, np.ndarray):
self.__cast_ndarray(kper, data)
# A single filename
elif isinstance(data, str):
self.__cast_str(kper, data)
else:
raise ValueError(
f"MfList error: unsupported data type: {type(data)}"
)
# raise NotImplementedError("MfList.__setitem__() not implemented")
def __fromfile(self, f):
# d = np.fromfile(f,dtype=self.dtype,count=count)
try:
d = np.genfromtxt(f, dtype=self.dtype)
except Exception as e:
raise ValueError(
f"MfList.__fromfile() error reading recarray from file {e!s}"
)
return d
def get_filenames(self):
kpers = list(self.data.keys())
kpers.sort()
filenames = []
first = kpers[0]
for kper in list(range(0, max(self._model.nper, max(kpers) + 1))):
# Fill missing early kpers with 0
if kper < first:
itmp = 0
kper_vtype = int
elif kper in kpers:
kper_vtype = self.__vtype[kper]
if (
self._model.array_free_format
and self._model.external_path is not None
):
# py_filepath = ''
# py_filepath = os.path.join(py_filepath,
# self._model.external_path)
filename = f"{self.package.name[0]}_{kper:04d}.dat"
filenames.append(filename)
return filenames
def get_filename(self, kper):
ext = "dat"
if self.binary:
ext = "bin"
return f"{self.package.name[0]}_{kper:04d}.{ext}"
@property
def binary(self):
return bool(self.__binary)
def write_transient(
self,
f,
single_per=None,
forceInternal=False,
write_header=True,
cln_data=None,
):
# forceInternal overrides isExternal (set below) for cases where
# external arrays are not supported (oh hello MNW1!)
# write the transient sequence described by the data dict
nr, nc, nl, nper = self._model.get_nrow_ncol_nlay_nper()
assert hasattr(
f, "read"
), "MfList.write() error: f argument must be a file handle"
kpers = list(self.data.keys())
pak_name_str = self.package.__class__.__name__.lower()
if (len(kpers) == 0) and (
pak_name_str == "mfusgwel"
): # must be cln wels
kpers += [
kper
for kper in list(cln_data.data.keys())
if kper not in kpers
]
kpers.sort()
first = kpers[0]
if single_per is None:
loop_over_kpers = list(range(0, max(nper, max(kpers) + 1)))
else:
if not isinstance(single_per, list):
single_per = [single_per]
loop_over_kpers = single_per
for kper in loop_over_kpers:
# Fill missing early kpers with 0
if (kper < first) or (
(kper not in self.data.keys()) and (pak_name_str == "mfusgwel")
): # clause for mfusg cases with only CLN wels, not GWF wels
itmp = 0
kper_vtype = int
elif kper in kpers:
kper_data = self.__data[kper]
kper_vtype = self.__vtype[kper]
if kper_vtype == str:
if not self._model.array_free_format:
kper_data = self.__fromfile(kper_data)
kper_vtype = np.recarray
itmp = self.get_itmp(kper)
if kper_vtype == np.recarray:
itmp = kper_data.shape[0]
elif (kper_vtype == int) or (kper_vtype is None):
itmp = kper_data
# Fill late missing kpers with -1
else:
itmp = -1
kper_vtype = int
if write_header:
if cln_data is None:
f.write(f" {itmp:9d} {0:9d} # stress period {kper + 1}\n")
elif cln_data.get_itmp(kper) is None:
f.write(f" {itmp:9d} {0:9d} # stress period {kper + 1}\n")
else:
itmpcln = cln_data.get_itmp(kper)
f.write(
f" {itmp:9d} {0:9d} {itmpcln:9d} # stress period {kper + 1}\n"
)
isExternal = False
if (
self._model.array_free_format
and self._model.external_path is not None
and forceInternal is False
):
isExternal = True
if self.__binary:
isExternal = True
if isExternal:
if kper_vtype == np.recarray:
py_filepath = ""
if self._model.model_ws is not None:
py_filepath = self._model.model_ws
if self._model.external_path is not None:
py_filepath = os.path.join(
py_filepath, self._model.external_path
)
filename = self.get_filename(kper)
py_filepath = os.path.join(py_filepath, filename)
model_filepath = filename
if self._model.external_path is not None:
model_filepath = os.path.join(
self._model.external_path, filename
)
self.__tofile(py_filepath, kper_data)
kper_vtype = str
kper_data = model_filepath
if kper_vtype == np.recarray:
name = f.name
if self.__binary:
f.close()
# switch file append mode to binary
with open(name, "ab+") as f:
self.__tofile(f, kper_data)
# continue back to non-binary
f = open(name, "a")
else:
self.__tofile(f, kper_data)
elif kper_vtype == str:
f.write(f" open/close {kper_data}")
if self.__binary:
f.write(" (BINARY)")
f.write("\n")
if cln_data is not None:
if cln_data.get_itmp(kper) is not None:
cln_data.write_transient(
f, single_per=kper, write_header=False
)
def __tofile(self, f, data):
# Write the recarray (data) to the file (or file handle) f
assert isinstance(
data, np.recarray
), "MfList.__tofile() data arg not a recarray"
# Add one to the kij indices
lnames = [name.lower() for name in self.dtype.names]
# --make copy of data for multiple calls
d = data.copy()
for idx in ["k", "i", "j", "node"]:
if idx in lnames:
d[idx] += 1
if self.__binary:
dtype2 = []
for name in self.dtype.names:
dtype2.append((name, np.float32))
dtype2 = np.dtype(dtype2)
d = np.array(d, dtype=dtype2)
d.tofile(f)
else:
np.savetxt(f, d, fmt=self.fmt_string, delimiter="")
def check_kij(self):
names = self.dtype.names
if ("k" not in names) or ("i" not in names) or ("j" not in names):
warnings.warn(
"MfList.check_kij(): index fieldnames 'k,i,j' "
f"not found in self.dtype names: {names}"
)
return
nr, nc, nl, nper = self._model.get_nrow_ncol_nlay_nper()
if nl == 0:
warnings.warn(
"MfList.check_kij(): unable to get dis info from model"
)
return
for kper in list(self.data.keys()):
out_idx = []
data = self[kper]
if data is not None:
k = data["k"]
k_idx = np.where(np.logical_or(k < 0, k >= nl))
if k_idx[0].shape[0] > 0:
out_idx.extend(list(k_idx[0]))
i = data["i"]
i_idx = np.where(np.logical_or(i < 0, i >= nr))
if i_idx[0].shape[0] > 0:
out_idx.extend(list(i_idx[0]))
j = data["j"]
j_idx = np.where(np.logical_or(j < 0, j >= nc))
if j_idx[0].shape[0]:
out_idx.extend(list(j_idx[0]))
if len(out_idx) > 0:
warn_str = (
"MfList.check_kij(): warning the following "
f"indices are out of bounds in kper {kper}:\n"
)
for idx in out_idx:
d = data[idx]
warn_str += " {:9d} {:9d} {:9d}\n".format(
d["k"] + 1, d["i"] + 1, d["j"] + 1
)
warnings.warn(warn_str)
def __find_last_kper(self, kper):
kpers = list(self.data.keys())
kpers.sort()
last = 0
for kkper in kpers[::-1]:
# if this entry is valid
if self.vtype[kkper] != int or self.data[kkper] != -1:
last = kkper
if kkper <= kper:
break
return kkper
def get_indices(self):
"""
a helper function for plotting - get all unique indices
"""
names = self.dtype.names
lnames = []
[lnames.append(name.lower()) for name in names]
if "k" not in lnames or "j" not in lnames:
raise NotImplementedError("MfList.get_indices requires kij")
kpers = list(self.data.keys())
kpers.sort()
indices = []
for i, kper in enumerate(kpers):
kper_vtype = self.__vtype[kper]
if (kper_vtype != int) or (kper_vtype is not None):
d = self.data[kper]
if not indices:
indices = list(zip(d["k"], d["i"], d["j"]))
else:
new_indices = list(zip(d["k"], d["i"], d["j"]))
for ni in new_indices:
if ni not in indices:
indices.append(ni)
return indices
def attribute_by_kper(self, attr, function=np.mean, idx_val=None):
assert attr in self.dtype.names
if idx_val is not None:
assert idx_val[0] in self.dtype.names
kpers = list(self.data.keys())
kpers.sort()
values = []
for kper in range(0, max(self._model.nper, max(kpers))):
if kper < min(kpers):
values.append(0)
elif kper > max(kpers) or kper not in kpers:
values.append(values[-1])
else:
kper_data = self.__data[kper]
if idx_val is not None:
kper_data = kper_data[
np.where(kper_data[idx_val[0]] == idx_val[1])
]
# kper_vtype = self.__vtype[kper]
v = function(kper_data[attr])
values.append(v)
return values
def plot(
self,
key=None,
names=None,
kper=0,
filename_base=None,
file_extension=None,
mflay=None,
**kwargs,
):
"""
Plot stress period boundary condition (MfList) data for a specified
stress period
Parameters
----------
key : str
MfList dictionary key. (default is None)
names : list
List of names for figure titles. (default is None)
kper : int
MODFLOW zero-based stress period number to return. (default is zero)
filename_base : str
Base file name that will be used to automatically generate file
names for output image files. Plots will be exported as image
files if file_name_base is not None. (default is None)
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only used
if filename_base is not None. (default is 'png')
mflay : int
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (default is None)
**kwargs : dict
axes : list of matplotlib.pyplot.axis
List of matplotlib.pyplot.axis that will be used to plot
data for each layer. If axes=None axes will be generated.
(default is None)
pcolor : bool
Boolean used to determine if matplotlib.pyplot.pcolormesh
plot will be plotted. (default is True)
colorbar : bool
Boolean used to determine if a color bar will be added to
the matplotlib.pyplot.pcolormesh. Only used if pcolor=True.
(default is False)
inactive : bool
Boolean used to determine if a black overlay in inactive
cells in a layer will be displayed. (default is True)
contour : bool
Boolean used to determine if matplotlib.pyplot.contour
plot will be plotted. (default is False)
clabel : bool
Boolean used to determine if matplotlib.pyplot.clabel
will be plotted. Only used if contour=True. (default is False)
grid : bool
Boolean used to determine if the model grid will be plotted
on the figure. (default is False)
masked_values : list
List of unique values to be excluded from the plot.
Returns
----------
out : list
Empty list is returned if filename_base is not None. Otherwise
a list of matplotlib.pyplot.axis is returned.
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> ml = flopy.modflow.Modflow.load('test.nam')
>>> ml.wel.stress_period_data.plot(ml.wel, kper=1)
"""
from ..plot import PlotUtilities
axes = PlotUtilities._plot_mflist_helper(
self,
key=key,
names=names,
kper=kper,
filename_base=filename_base,
file_extension=file_extension,
mflay=mflay,
**kwargs,
)
return axes
def to_shapefile(self, filename, kper=None):
"""
Export stress period boundary condition (MfList) data for a specified
stress period
Parameters
----------
filename : str
Shapefile name to write
kper : int
MODFLOW zero-based stress period number to return. (default is None)
Returns