Skip to content

Commit

Permalink
version 3.0.0 gnssir with multiple processors
Browse files Browse the repository at this point in the history
  • Loading branch information
kristinemlarson committed Mar 15, 2024
1 parent 579690b commit 3d59adc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 63 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).


## 3.0.0

parallel processing for gnssir, makes it much much faster

## 2.6.0

beta version of multi-processing for gnssir. It is called gnssir2 for now. I will
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# gnssrefl v2.6.0
# gnssrefl v3.0.0

[![PyPI Version](https://img.shields.io/pypi/v/gnssrefl.svg)](https://pypi.python.org/pypi/gnssrefl)

Expand Down
95 changes: 34 additions & 61 deletions gnssrefl/gnssir_cl.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def gnssir(station: str, year: int, doy: int, snr: int = 66, plt: bool = False,
be twice as fast, three years of data, three times as fast. I have added an option par=-99 which
only works for now on a single year. but it will create up to 10 simultaneous processes, so very slick.
Ultimately of course I will put these together and make this the official version. Huge thank you to
Aaryan Rampal for getting this up and running.
AaryanRampal for getting this up and running.
Examples
--------
Expand Down Expand Up @@ -230,7 +230,7 @@ def gnssir(station: str, year: int, doy: int, snr: int = 66, plt: bool = False,
# in case you want to analyze only data within one year
year_st = year # renaming for confusing variable naming when dealing with year calculations below
if year_end is None:
year_end = year_st
year_end = year

# default will be to overwrite
#if nooverwrite is None:
Expand Down Expand Up @@ -301,7 +301,7 @@ def gnssir(station: str, year: int, doy: int, snr: int = 66, plt: bool = False,
print(lsp['e1'], lsp['e2'], ' min and max elevation angles')
# added this because ellist is a new option and was not necessarily created in old json files
if 'ellist' not in lsp:
print('did not find ellist')
#print('did not find ellist')
if float(lsp['e1']) < float(lsp['pele'][0]):
print('emin is smaller than the minimum eangle (pele) used for direct signal removed.')
print('This is Forbidden. Fix the records set in the json created by gnssir_analysis')
Expand Down Expand Up @@ -330,22 +330,27 @@ def gnssir(station: str, year: int, doy: int, snr: int = 66, plt: bool = False,
# queue which handles any exceptions any of the processes encounter
manager = multiprocessing.Manager()
error_queue = manager.Queue()
additional_args = { "year_end": year_end, "year_st": year_st, "doy": doy, "doy_end": doy_end, "args": args, "error_queue": error_queue }
additional_args = { "args": args, "error_queue": error_queue }

t1 = time.time()
if (year == year_end) & (doy == doy_end):
if par:
print('analyze only one day - no reason to submit multiple processes')
par = None

if not par:
# analyze one year at a time in the current code
# FWIW, this should be changed to MJD too.
for year in year_list:
process_year(year, **additional_args)
print('No parallel processing')
process_year(year,year_end,doy,doy_end, **additional_args)

else:
print('You have chosen parallel processing')
if par > 10:
print('For now we will only allow ten simultaneous processes. Submit again. Exiting.')
sys.exit()
else:
numproc = par
print(year,doy,year_end,doy_end)
d,numproc=guts2.make_parallel_proc_lists_mjd(year, doy, year_end, doy_end, numproc)
print(d)

# make a list of process IDs
index_list = list(range(numproc))
Expand All @@ -359,17 +364,6 @@ def gnssir(station: str, year: int, doy: int, snr: int = 66, plt: bool = False,
pool.close()
pool.join()

if False:
# I think we can get rid of this now and use MJD for both parts ...
print('Using process year with pools')
pool = multiprocessing.Pool(processes=par)
partial_process_year = partial(process_year, **additional_args)
print(year_list)
type(partial_process_year)

pool.map(partial_process_year, year_list)
pool.close()
pool.join()

if not error_queue.empty():
print("One (or more) of the processes encountered errors. Will not proceed until errors are fixed.")
Expand All @@ -381,55 +375,42 @@ def gnssir(station: str, year: int, doy: int, snr: int = 66, plt: bool = False,
sys.exit(1)

t2 = time.time()
print('Time to compute ', round(t2-t1,2))
print('Time to compute: ', round(t2-t1,2), ' seconds')


def process_year(year, year_end, year_st, doy, doy_end, args, error_queue):
def process_year(year, year_end, doy, doy_end, args, error_queue):
"""
Code that does the processing for a specific year. Refactored to separate
function to allow for parallel processes
Parameters
----------
year : int
the year you are currently analyzing
the start year
year_end : int
end year. This was the last year you plan to analyze
year_st: int
This is starting year you were planning to analyze
doy : integer
Day of year
POOR VARIABLE NAME. SHOULD BE CHANGED. i believe it is the start doy on the start year.
Day of year to start processing
doy_end : int
end day of year on the last year you plan to analyze
Default is None.
args : dict
arguments passed into gnssir through commandline (or python)
"""
try:
if year != year_end:
doy_en = 366
else:
doy_en = doy_end
# an infinitely better way
MJD1 = int(g.ydoy2mjd(year,doy))
MJD2 = int(g.ydoy2mjd(year_end,doy_end))
for modjul in range(MJD1, MJD2+1):
y, d = g.modjul_to_ydoy(modjul)
args['year'] = y
args['doy'] = d
try:
guts2.gnssir_guts_v2(**args)
except:
warnings.warn(f'error processing {y} {d}');

return

if year == year_st:
doy_list = list(range(doy, doy_en+1))
else:
doy_list = list(range(1, doy_en+1))

# so really this is looking at only a single year
# looping through day of year. I think?

args['year'] = year
for doy in doy_list:
args['doy'] = doy
try:
guts2.gnssir_guts_v2(**args)
except:
warnings.warn(f'error processing {year} {doy}');
except Exception as e:
error_queue.put(e)

def process_year_dictionary(index,args,datelist,error_queue):
"""
Expand All @@ -444,20 +425,14 @@ def process_year_dictionary(index,args,datelist,error_queue):
arguments passed into gnssir through commandline (or python)
should have the new arguments for sublists
datelist : dict
list of dates you want to analyze in simple year, doy1, doy2 format
should have up to 10 sets of dates, from 0 to 9, e.g. for five processes
dd = { 0: [2020, 251, 260], 1:[2020, 261, 270], 2: [2020, 271, 280], 3:[2020, 281, 290], 4:[2020,291,300] }
list of dates you want to analyze in pairs of MJD
could have up to 10 sets of dates, from 0 to 9, e.g. for two processes
dd = { 0: [MJD1, MJD2], 1:[MJD1,MJD2] }
"""


try:
d1 = datelist[index][0]; d2 = datelist[index][1]

mjd_list = list(range(d1, d2+1))


# now store year and doy in args dictionary, which is somewhat silly

for MJD in mjd_list:
year, doy = g.modjul_to_ydoy(MJD)
Expand All @@ -466,8 +441,6 @@ def process_year_dictionary(index,args,datelist,error_queue):
print(f'Processing MJD {MJD} Year {year} DOY {doy}');

guts2.gnssir_guts_v2(**args)

# warnings.warn(f'error processing {year} {doy}');
except Exception as e:
error_queue.put(e)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
]
setup(
name="gnssrefl",
version="2.6.0",
version="3.0.0",
author="Kristine Larson",
author_email="kristinem.larson@gmail.com",
description="A GNSS reflectometry software package ",
Expand Down

0 comments on commit 3d59adc

Please sign in to comment.