From ad38516469c3c2bba443a17b9ac930ee9abc7c0e Mon Sep 17 00:00:00 2001 From: Will Boxx Date: Thu, 19 May 2022 15:12:57 -0500 Subject: [PATCH] Updated Hazard Plot function --- src/antenna_intensity_modeler/parabolic.py | 305 +++++++-------------- 1 file changed, 95 insertions(+), 210 deletions(-) diff --git a/src/antenna_intensity_modeler/parabolic.py b/src/antenna_intensity_modeler/parabolic.py index 9124d7d..20755d9 100644 --- a/src/antenna_intensity_modeler/parabolic.py +++ b/src/antenna_intensity_modeler/parabolic.py @@ -14,6 +14,7 @@ from typing import Union, Callable from concurrent.futures import ProcessPoolExecutor, as_completed from tqdm import tqdm +import itertools # from .helpers import Either, Left, Right from pymonad.tools import curry @@ -30,7 +31,7 @@ SIN = np.sin RAD = 1.0 DEG = PI / 180 -TO_DEG = 180. / PI +TO_DEG = 180.0 / PI # Other units GHZ = 1e9 * HERTZ @@ -91,10 +92,8 @@ def parameters( print( ( "WARNING - Diameter should be at least ten times " - + - "the wavelength for this method to work. Currently " - + - "the diameter is {} times the wavelength." + + "the wavelength for this method to work. Currently " + + "the diameter is {} times the wavelength." ).format(ratio) ) @@ -110,11 +109,11 @@ def parameters( ) h_value = scipy.optimize.newton(hansons_fun, 0.1) - ffmin = 2 * diam**2 / _lamda - ffpwrden = eirp / (4 * PI * ffmin**2) + ffmin = 2 * diam ** 2 / _lamda + ffpwrden = eirp / (4 * PI * ffmin ** 2) k = 2 * PI / _lamda - min_range = 0.5 * diam * (diam / _lamda)**(1/3) + min_range = 0.5 * diam * (diam / _lamda) ** (1 / 3) return_dict = { "radius_meters": radius_meters, @@ -129,7 +128,7 @@ def parameters( "k": k, "a_value": a_value, "n_value": n_value, - "min_range": min_range + "min_range": min_range, } return return_dict @@ -138,11 +137,11 @@ def _bessel_func( x: float, f: Union[np.cos, np.sin], H: float, u: float, d: float ) -> Callable: return ( - 1. - * scipy.special.i0(PI * H * (1. - x ** 2)) # **(1 / 2)) + 1.0 + * scipy.special.i0(PI * H * (1.0 - x ** 2)) # **(1 / 2)) # * (1 - x**2) * scipy.special.j0(u * x) - * f(PI * x ** 2 / 8. / d) + * f(PI * x ** 2 / 8.0 / d) * x ) @@ -157,20 +156,16 @@ def _romberg_integration( def _check_theta(theta, a, z, lamda): - if (PI * a**2 / lamda / z) * SIN(theta)**2 > 0.08: + if (PI * a ** 2 / lamda / z) * SIN(theta) ** 2 > 0.08: print( ( - "WARNING - Failed theta check for theta = {}, " - + - "a = {}, and z = {}" + "WARNING - Failed theta check for theta = {}, " + "a = {}, and z = {}" ).format(theta * TO_DEG, a, z) ) def _run_near_field_corrections( - d: float, - parameters: dict, - xbar: float + d: float, parameters: dict, xbar: float, verbose=True ) -> float: # Get parameters radius = parameters.get("radius_meters") @@ -181,7 +176,8 @@ def _run_near_field_corrections( xbarR = xbar * radius theta = np.arctan(xbarR / (d * ffmin)) - _check_theta(theta, xbarR, d*ffmin, lamda) + if verbose: + _check_theta(theta, xbarR, d * ffmin, lamda) u = k * radius * np.sin(theta) # Get Bessel Functions @@ -196,7 +192,7 @@ def _run_near_field_corrections( @curry(2) def final_reduction(x, y): - return (1. + np.cos(theta)) / d * abs(x - 1.j * y) + return (1.0 + np.cos(theta)) / d * abs(x - 1.0j * y) # return (1 + np.cos(theta)) / d * abs(Ep1_1 - 1j * Ep2_1) return (final_reduction << Ep1 & Ep2).then(_square) @@ -276,7 +272,7 @@ def near_field_corrections( ffmin = parameters.get("ffmin") if min_range < 0.1 * ffmin: - my_func = (lambda x: min_range - (10**x * ffmin)) + my_func = lambda x: min_range - (10 ** x * ffmin) pow = scipy.optimize.newton(my_func, -2) else: pow = -2 @@ -287,7 +283,7 @@ def run(f, my_iter): iter_length = len(my_iter) with tqdm(total=iter_length) as pbar: # let's give it some more threads: - with ProcessPoolExecutor(max_workers=5) as executor: + with ProcessPoolExecutor(max_workers=10) as executor: futures = {executor.submit(f, arg): arg for arg in my_iter} results = {} for future in as_completed(futures): @@ -313,7 +309,7 @@ def run(f, my_iter): def delta_xbar_split(delta_xbar: tuple, parameters: dict): (d, xbar) = delta_xbar[0], delta_xbar[1] - return _run_near_field_corrections(d, parameters, xbar) + return _run_near_field_corrections(d, parameters, xbar, verbose=False) def get_normalized_power_tensor( @@ -326,29 +322,61 @@ def get_normalized_power_tensor( step = xbar_max / density * 10 xbars = np.arange(0, xbar_max, step) - delta_xbars = np.reshape(np.array(np.meshgrid(delta, xbars)).T, (-1, 2)) + delta_xbars = np.dstack(np.meshgrid(delta, xbars)).reshape(-1, 2) + results = list(itertools.product(delta, xbars)) + # results = ["{}, {}".format(xx, yy) for xx in delta for yy in xbars] + # reshaped = np.reshape(results, (len(xbars), len(delta)), order="C") + + # test = list(itertools.product(delta, xbars)) chunksize = 100 run_corrections_with_params = partial(delta_xbar_split, parameters=parameters) - with ProcessPoolExecutor() as p: - # map each delta, xbars tuple to the run_with_params partial function - mtrx = np.array( - list( - p.map( - run_corrections_with_params, - delta_xbars, - chunksize=chunksize - ) - ) - ) - # Reshape the resulting flattened array into a 2-d tensor representing - # 2-d space from txr center to farfield and down to txr edge - mtrx_reshaped = np.reshape(mtrx, (len(xbars), len(delta)), order="F") + + def run(f, my_iter): + iter_length = len(my_iter) + with tqdm(total=iter_length) as pbar: + # let's give it some more threads: + with ProcessPoolExecutor(max_workers=10) as executor: + # futures = {executor.submit(f, arg): arg for arg in my_iter} + futures = { + executor.submit(f, my_iter[i]): i for i in range(iter_length) + } + results = {} + for future in as_completed(futures): + arg = futures[future] + results[arg] = future.result() + pbar.update(1) + return results + + Ep = [] + + Ep_dict = run(run_corrections_with_params, delta_xbars) + for key in sorted(Ep_dict.keys()): + Ep.append(Ep_dict[key]) + + ff_val = Ep[-1].value + Ep_monad = ListMonad(*Ep) + + power_norm = Ep_monad * _unpack * _normalize(ff_val) + + reshaped = np.reshape(power_norm, (len(xbars), len(delta)), order="C") + + # with ProcessPoolExecutor() as p: + # # map each delta, xbars tuple to the run_with_params partial function + # mtrx = np.array( + # list(p.map(run_corrections_with_params, delta_xbars, chunksize=chunksize)) + # ) + # # Reshape the resulting flattened array into a 2-d tensor representing + # # 2-d space from txr center to farfield and down to txr edge + # # mtrx_unpacked = [x.value for x in mtrx] + # mtrx_reshaped = np.reshape(mtrx, (len(xbars), len(delta)), order="F") + # return_val = mtrx_reshaped ** 2 / mtrx_reshaped[:, -1][:, np.newaxis] ** 2 + # Normalize each row of tensor to maximum level at far field - return mtrx_reshaped ** 2 / mtrx_reshaped[:, -1][:, np.newaxis] ** 2 + return reshaped -def test_hazard_plot( +def hazard_plot( parameters: dict, limit: float, density: int = 1000, @@ -407,7 +435,7 @@ def test_hazard_plot( ) # Subtract normalized limit from normalized power tensor - mtrx_limited = mtrx_normalized - limit / ffpwrden_boosted + mtrx_limited = abs(mtrx_normalized - limit / ffpwrden) pause = True # find the index of the largest values less than zero @@ -416,160 +444,16 @@ def test_hazard_plot( np.where(mtrx_limited < 0, mtrx_limited, -np.inf), axis=0 ) * (xbar_max / density * 10) + mtrx_arg_min = np.argmin(mtrx_limited, axis=0) * (xbar_max / density * 10) + # mtrx_reduced = np.maximum.reduce( # mtrx_limited, initial=-np.inf, where=[mtrx_limited < 0] # ) return pd.DataFrame( dict( range=delta * ffmin, - positives=mtrx_arg_max * radius_meters, - negatives=mtrx_arg_max * -radius_meters, - ) - ) - - -def hazard_plot( - parameters: dict, - limit: float, - density: int = 1000, - xbar_max: float = 1.0, - gain_boost_db: int = 0, -): - """Hazard plot for parabolic dish. - - Receives user input parameters and hazard limit - for parabolic dish. Computes and returns hazard distance - plot. - - Args: - parameters (dict): parameters dict created with parameters function - limit (float): power density limit - density (int): (optional) number of points for plot, if none density=1000 - xbar_max (float): (optional) maximum value for xbar, if none is given xbar_max=1 - gain_boost (int): (optional) additional gain in dB to add to output power - - Returns: - pandas.DataFrame: dataframe containing a range column, a positive values column, and a negative values column - - Example: - >>> from antenna_intensity_modeler import parabolic - >>> import matplotlib.pyplot as plt - >>> - >>> params = parabolic.parameters(2.4, 8.4e9, 400.0, 0.62, 20.0) - >>> limit = 10.0 - >>> df = parabolic.hazard_plot(params, limit) - >>> rng = df.range.values - >>> positives = df.positives.values - >>> negatives = df.negatives.values - >>> - >>> fig, ax = plt.subplots() - >>> ax.plot(range, positives, range, negatives) - >>> ax.grid(True, which='both') - >>> ax.minorticks_on() - >>> ax.set_title('Hazard Plot with limit: %s w/m^2' % limit) - >>> ax.set_xlabel('Distance From Antenna(m)') - >>> ax.set_ylabel('Off Axis Distance (m)') - - .. image:: _static/hazard_plot.png - """ - - radius_meters = parameters.get("radius_meters") - freq_mhz = parameters.get("freq_mhz") - power_watts = parameters.get("power_watts") - efficiency = parameters.get("efficiency") - side_lobe_ratio = parameters.get("side_lobe_ratio") - H = parameters.get("H") - ffmin = parameters.get("ffmin") - ffpwrden = parameters.get("ffpwrden") - k = parameters.get("k") - - n = density - delta = np.linspace(1.0, 0.001, n) # Normalized farfield distances - # delta = np.logspace(-2, 0, n) - # delta = delta[::-1] - xbarArray = np.zeros(n) # np.ones(n) - Ep = np.zeros(n) - - # xbars = np.linspace(0, 1, 10) - step = 0.01 - xbars = np.arange(0, xbar_max + step, step) - - gain_boost = 10 ** (gain_boost_db / 10.0) - ffpwrden = gain_boost * ffpwrden - - last = 1e-9 - count = 0 - ff_reference = 0 - - for d in delta: - for xbar in xbars: - xbarR = xbar * radius_meters - theta = np.arctan(xbarR / (d * ffmin)) - u = k * radius_meters * np.sin(theta) - - def fun1(x): - return ( - 1 - # * sp.special.iv(0, PI * H * (1 - x**2)**(1 / 2)) - * sp.special.jv(0, u * x) - * np.cos(PI * x ** 2 / 8 / d) - * x - ) - - # Ep1 = sp.integrate.romberg(fun1, 0, 1) - Ep1 = sum(fun1(np.linspace(0, 1, 1000))) - - def fun2(x): - return ( - 1 - # * sp.special.iv(0, PI * H * (1 - x**2)**(1 / 2)) - * sp.special.jv(0, u * x) - * np.sin(PI * x ** 2 / 8 / d) - * x - ) - - # Ep2 = sp.integrate.romberg(fun2, 0, 1) - Ep2 = sum(fun2(np.linspace(0, 1, 1000))) - - if d == 1.0 and xbar == 0.0: - ff_reference = (1 + np.cos(theta)) / d * abs(Ep1 - 1j * Ep2) - Ep = ff_reference - else: - Ep = (1 + np.cos(theta)) / d * abs(Ep1 - 1j * Ep2) - - power = ffpwrden * (Ep ** 2 / ff_reference ** 2) - if (power - limit) > 0: - # if power - limit > last: - xbarArray[count] = xbar - # last = power - limit - last = 1e-9 - count += 1 - - # fig, ax = plt.subplots() - # # ax.plot(delta[::-1] * ffmin, xbarArray[::-1] * radius_meters, - # # delta[::-1] * ffmin, xbarArray[::-1] * -radius_meters) - # _x = delta[::-1] * ffmin - # _y = xbarArray[::-1] * radius_meters - # _y_neg = -1 * _y - # degree = 0 - # rotate = [ - # [np.cos(degree * np.PI / 180), np.sin(degree * np.PI / 180)], - # [-np.sin(degree * np.PI / 180), np.cos(degree * np.PI / 180)] - # ] - # top = np.dot(np.column_stack((_x, _y)), rotate) - # bottom = np.dot(np.column_stack((_x, _y_neg)), rotate) - # ax.plot(top[:, 0], top[:, 1], bottom[:, 0], bottom[:, 1]) - # ax.grid(True, which='both') - # ax.minorticks_on() - # ax.set_title('Hazard Plot with limit: %s w/m^2' % limit) - # ax.set_xlabel('Distance From Antenna(m)') - # ax.set_ylabel('Off Axis Distance (m)') - # return fig, ax - return pd.DataFrame( - dict( - range=delta[::-1] * ffmin, - positives=xbarArray[::-1] * radius_meters, - negatives=xbarArray[::-1] * -radius_meters, + positives=mtrx_arg_min * radius_meters, + negatives=mtrx_arg_min * -radius_meters, ) ) @@ -768,9 +652,7 @@ def far_field_radiation_pattern(parameters, theta, N): .. image:: _static/ffPattern.png """ - run_with_params = partial( - _run_far_field_radiation_pattern, parameters=parameters - ) + run_with_params = partial(_run_far_field_radiation_pattern, parameters=parameters) def run(f, my_iter): iter_length = len(my_iter) @@ -827,20 +709,20 @@ def _next_pow_2(x): if __name__ == "__main__": - + params = parameters(2.4, 8400, 400.0, 0.62, 20.0) limit = 10.0 - df = test_hazard_plot(params, limit) + df = hazard_plot(params, limit) rng = df.range.values positives = df.positives.values negatives = df.negatives.values fig, ax = plt.subplots() ax.plot(rng, positives, rng, negatives) - ax.grid(True, which='both') + ax.grid(True, which="both") ax.minorticks_on() - ax.set_title('Hazard Plot with limit: %s w/m^2' % limit) - ax.set_xlabel('Distance From Antenna(m)') - ax.set_ylabel('Off Axis Distance (m)') + ax.set_title("Hazard Plot with limit: %s w/m^2" % limit) + ax.set_xlabel("Distance From Antenna(m)") + ax.set_ylabel("Off Axis Distance (m)") freq = 44.5 * GHZ lamda = C / freq @@ -862,18 +744,21 @@ def _next_pow_2(x): size = int(_next_pow_2(10 * D)) resolution = size / npix - theta = 1 * DEG - power_norm = far_field_radiation_pattern(params, theta, npix) fig, ax = plt.subplots() - delta = params.get("k") * R * SIN(np.linspace(0, 1 * DEG, npix)) - # ax.set_xlim([0, 12]) - # ax.set_ylim([-40, 0]) - ax.plot(delta, 10*np.log10(power_norm)) - - # power_norm = near_field_corrections(params, xbar, length) - # delta = np.logspace(-2, 0, npix) - # ax.semilogx(delta, 10*np.log10(power_norm)) - # ax.set_xlim([0.01, 1.0]) + + # Radiation pattern example + # theta = 1 * DEG + # power_norm = far_field_radiation_pattern(params, theta, npix) + # delta = params.get("k") * R * SIN(np.linspace(0, 1 * DEG, npix)) + # # ax.set_xlim([0, 12]) + # # ax.set_ylim([-40, 0]) + # ax.plot(delta, 10 * np.log10(power_norm)) + + # Near field corrections example + power_norm = near_field_corrections(params, xbar, npix) + delta = np.logspace(-2, 0, npix) + ax.semilogx(delta, 10 * np.log10(power_norm)) + ax.set_xlim([0.01, 1.0]) ax.grid(True, which="both") ax.minorticks_on()