In [None]:
#https://stats.stackexchange.com/questions/13169/defining-quantiles-over-a-weighted-sample

In [None]:
def _interpolate_acs_bound(ranges, n, p, label):
    """
    Linearly interpolate the data value found at cumulative proportion ``p``.

    Args:
        ranges (list): Range dictionaries sorted by ``min`` and annotated with ``n_min``/``n_max``.
        n (int): The total number of observations across all ranges.
        p (float): The cumulative proportion (0-1) to locate.
        label (str): "lower" or "upper"; only used in the error message.
    Returns:
        The interpolated data value.
    Raises:
        DataError: If ``n * p`` does not fall inside any range.
    """
    p_n = n * p
    match = next(
        ((i, d) for i, d in enumerate(ranges) if d['n_min'] <= p_n <= d['n_max']),
        None
    )
    if match is None:
        raise DataError(f"The n's {label} p value {p_n} does not fall within a data range.")
    i, range_ = match

    # The interval runs from the start of the matched range to the start of the next one.
    # The last range has no successor, so its own upper limits are used instead.
    a1 = range_['min']
    c1 = range_['n_min'] / n
    if i + 1 < len(ranges):
        a2 = ranges[i + 1]['min']
        c2 = ranges[i + 1]['n_min'] / n
    else:
        a2 = range_['max']
        c2 = range_['n_max'] / n

    return ((p - c1) / (c2 - c1)) * (a2 - a1) + a1


def approximate_median_acs(range_list, sampling_percentage=None):
    """
    Estimate a median and approximate the margin of error.
    Follows the U.S. Census Bureau's `official guidelines`_ for estimation.
    Useful for generating medians for measures like household income and age when aggregating census geographies.
    The input list and its dictionaries are not modified.
    Args:
        range_list (list): A list of dictionaries that divide the full range of data values into continuous categories.
            Each dictionary should have three keys:
                * min (int): The minimum value of the range
                * max (int): The maximum value of the range
                * n (int): The number of people, households or other unit in the range
            The minimum value in the first range and the maximum value in the last range can be tailored to the dataset
            by using the "jam values" provided in the `American Community Survey's technical documentation`_.
        sampling_percentage (float, optional): A statistical input used to correct variance for finite population. This value
            represents the percentage of the population that was sampled to create the data. If you do not provide this input, a
            margin of error will not be returned.
    Returns:
        A two-item tuple with the median followed by the approximated margin of error at the 90% confidence level.
        The margin of error is None (and a SamplingPercentageWarning is issued) when no sampling percentage is given.
    Raises:
        DataError: If the ranges contain no observations, or if a confidence bound falls outside every range.
    Examples:
        Estimating the median for a range of household incomes.
        >>> household_income_2013_acs5 = [
        ...     dict(min=2499, max=9999, n=186),
        ...     dict(min=10000, max=14999, n=78),
        ...     dict(min=15000, max=19999, n=98),
        ...     dict(min=20000, max=24999, n=287),
        ...     dict(min=25000, max=29999, n=142),
        ...     dict(min=30000, max=34999, n=90),
        ...     dict(min=35000, max=39999, n=107),
        ...     dict(min=40000, max=44999, n=104),
        ...     dict(min=45000, max=49999, n=178),
        ...     dict(min=50000, max=59999, n=106),
        ...     dict(min=60000, max=74999, n=177),
        ...     dict(min=75000, max=99999, n=262),
        ...     dict(min=100000, max=124999, n=77),
        ...     dict(min=125000, max=149999, n=100),
        ...     dict(min=150000, max=199999, n=58),
        ...     dict(min=200000, max=250001, n=18)
        ... ]
        >>> approximate_median_acs(household_income_2013_acs5, sampling_percentage=5*2.5)
        (42211.096153846156, 4706.522752733644)
    .. _official guidelines:
        https://www.documentcloud.org/documents/6165603-2013-2017AccuracyPUMS.html#document/p18
    .. _American Community Survey's technical documentation:
        https://www.documentcloud.org/documents/6165752-2017-SummaryFile-Tech-Doc.html#document/p20/a508561
    .. _the bureau's reference material:
        https://www.census.gov/programs-surveys/acs/technical-documentation/pums/documentation.html
    """
    # Work on sorted copies so the caller's list and dictionaries are left untouched
    ranges = sorted((dict(r) for r in range_list), key=lambda r: r['min'])

    # For each range calculate its min and max value along the universe's scale
    cumulative_n = 0
    for range_ in ranges:
        range_['n_min'] = cumulative_n
        cumulative_n += range_['n']
        range_['n_max'] = cumulative_n

    # The running total is the number of observations in the universe
    n = cumulative_n
    if not ranges or n <= 0:
        raise DataError("The ranges do not contain any observations.")

    # What is the estimated midpoint of the n?
    n_midpoint = n / 2.0

    # Now use those to determine which group contains the midpoint.
    n_midpoint_range = next(d for d in ranges if d['n_min'] <= n_midpoint <= d['n_max'])

    # How many households in the midrange are needed to reach the midpoint?
    n_midrange_gap = n_midpoint - n_midpoint_range['n_min']

    # What is the proportion of the group that would be needed to get the midpoint?
    n_midrange_gap_percent = n_midrange_gap / n_midpoint_range['n']

    # Apply this proportion to the width of the midrange
    n_midrange_gap_adjusted = (n_midpoint_range['max'] - n_midpoint_range['min']) * n_midrange_gap_percent

    # Estimate the median
    estimated_median = n_midpoint_range['min'] + n_midrange_gap_adjusted

    # If there's no sampling percentage, we can't calculate a margin of error
    if not sampling_percentage:
        # Let's throw a warning, but still return the median
        warnings.warn("", SamplingPercentageWarning)
        return estimated_median, None

    # Get the standard error for this dataset
    standard_error = (math.sqrt(((100 - sampling_percentage) / (n * sampling_percentage)) * (50**2))) / 100

    # Use the standard error to calculate the p values
    p_lower = (.5 - standard_error)
    p_upper = (.5 + standard_error)

    # Interpolate the data values at both p values to bound the confidence interval
    lower_bound = _interpolate_acs_bound(ranges, n, p_lower, "lower")
    upper_bound = _interpolate_acs_bound(ranges, n, p_upper, "upper")

    # Calculate the standard error of the median
    standard_error_median = 0.5 * (upper_bound - lower_bound)

    # Calculate the margin of error at the 90% confidence level
    margin_of_error = 1.645 * standard_error_median

    # Return the result
    return estimated_median, margin_of_error

In [217]:
def approximate_median_pums(range_list, sampling_percentage=None, design_factor=None):
    """
    Estimate a weighted median and approximate the margin of error.

    The median comes from ``weighted_quantile``. Its standard error is approximated from a density
    estimate around the median: the share of values within ``h`` of the median, where ``h`` is a
    bandwidth derived from the weighted interquartile range.
    See https://www.itl.nist.gov/div898/software/dataplot/refman2/auxillar/quantse.htm
    Args:
        range_list (list): A list of dictionaries, one per individual record.
            Each dictionary should have two keys:
                * val (float): value for individual
                * w (int): weight for the individual
            The list is passed to ``weighted_quantile``, which sorts it in place and adds keys to each dictionary.
        sampling_percentage (float, optional): This value represents the percentage of the population that was
            sampled to create the data. If you do not provide this input, a margin of error will not be returned.
        design_factor (float, optional): The Census Bureau publishes design factors as part of its PUMS Accuracy
            statement. Accepted for interface compatibility; the estimate below does not use it.
            NOTE(review): confirm whether the design factor should scale the margin of error.
    Returns:
        A two-item tuple with the weighted median followed by the approximated margin of error at the
        90% confidence level. The margin of error is None (and a SamplingPercentageWarning is issued)
        when no sampling percentage is given.
    Raises:
        ZeroDivisionError: If the weighted interquartile range is zero, or if no value lies within
            ``h`` of the median, because the density estimate is then zero or undefined.
    Examples:
        Estimating the median for a range of household incomes.
        >>> dummy_PUMS_data = [
        ...     dict(val=5000,  w=186/2068),
        ...     dict(val=12000,  w=78/2068),
        ...     dict(val=17000, w=98/2068),
        ...     dict(val=23000,  w=287/2068),
        ...     dict(val=27000,  w=142/2068),
        ...     dict(val=31000,  w=90/2068),
        ...     dict(val=36000,  w=107/2068),
        ...     dict(val=42000,  w=104/2068),
        ...     dict(val=48000,  w=178/2068),
        ...     dict(val=59000,  w=106/2068),
        ...     dict(val=63000,  w=177/2068),
        ...     dict(val=90000,  w=262/2068),
        ...     dict(val=110000,  w=77/2068),
        ...     dict(val=135000,  w=100/2068),
        ...     dict(val=180000,  w=58/2068),
        ...     dict(val=210000,  w=18/2068)
        ... ]
        >>> approximate_median_pums(dummy_PUMS_data, design_factor=1, sampling_percentage=1)
        (41865.16853932583, 20257.481963744707)
    """
    est = weighted_quantile(range_list, 0.5)

    # If there's no sampling percentage, we can't calculate a margin of error
    if not sampling_percentage:
        # Let's throw a warning, but still return the median
        warnings.warn("", SamplingPercentageWarning)
        return est, None

    # How many records are in the sample?
    n = len(range_list)

    # Bandwidth for the density estimate, based on the weighted interquartile range
    est75 = weighted_quantile(range_list, 0.75)
    est25 = weighted_quantile(range_list, 0.25)
    h = (1.2 * (est75 - est25)) / (n ** (1 / 5))

    val = numpy.array([range_['val'] for range_ in range_list])

    # nint = how many data points fall in est +/- h
    nint = len(numpy.where((val <= est + h) & (val >= est - h))[0])

    # Estimated density of the data at the median
    fhat = nint / (2 * n * h)

    standard_error_median = 1 / (2 * math.sqrt(n) * fhat)

    # Calculate the margin of error at the 90% confidence level
    margin_of_error = 1.645 * standard_error_median

    # Return the result
    return est, margin_of_error

In [None]:
# exceptions.py
class DesignFactorWarning(Warning):
    """
    Warning category for estimates requested without a design factor.

    The message is fixed: it is produced by ``__str__`` and ignores any arguments
    given to the constructor.
    """
    def __str__(self):
        return (
            "A margin of error cannot be calculated unless you provide a design factor.\n"
            'Design factors for different census surveys and tables can be found in the "PUMS Accuracy" CSV files. '
            "https://www.census.gov/programs-surveys/acs/technical-documentation/pums/documentation.html\n"
        )

## init
from .exceptions import DesignFactorWarning

## test.py

def test_exception(self):
    """Smoke test: rendering a DesignFactorWarning as text must not raise."""
    str(DesignFactorWarning())
        
# Test fragment: calling approximate_median on `income` should issue a DesignFactorWarning and
# return None as the margin of error.
# NOTE(review): `self`, `income` and `census_data_aggregator` are not defined in this cell;
# presumably this belongs inside a unittest.TestCase method in test.py. Confirm before running.
with self.assertWarns(DesignFactorWarning):
            m, moe = census_data_aggregator.approximate_median(income)
            self.assertTrue(moe == None)      
            
import


In [None]:
def helper_weighted_quantile(range_list, k):
    """
    Compute the partial sum S_k used to interpolate quantiles of a weighted sample.

    S_k = (k - 1) * w_k + (n - 1) * (w_1 + ... + w_{k-1}), so S_1 is 0 and S_n is (n - 1) times the
    total weight.
    See https://stats.stackexchange.com/questions/13169/defining-quantiles-over-a-weighted-sample
    Args:
        range_list (list): Dictionaries with a 'w' key (the weight). For k > 1 the dictionary before
            position k must also carry 'w_max', the cumulative weight up to and including that record.
        k (int): The 1-based position of the record, from 1 to len(range_list).
    Returns:
        The partial sum S_k.
    Raises:
        IndexError: If k is outside 1..len(range_list).
    """
    n = len(range_list)
    if not 1 <= k <= n:
        raise IndexError(f"k must be between 1 and {n}, got {k}.")

    # Cumulative weight of the records before position k. Nothing precedes the first record,
    # and indexing range_list[k - 2] with k == 1 would wrap around to the last element.
    preceding_w = range_list[k - 2]['w_max'] if k > 1 else 0

    S_k = (k - 1) * range_list[k - 1]['w'] + (n - 1) * preceding_w

    return S_k

In [None]:
def weighted_quantile(data_list, p):
    """
    Estimate the p-th quantile of a weighted sample by linear interpolation.

    With the records sorted by value, the partial sums
    S_k = (k - 1) * w_k + (n - 1) * (w_1 + ... + w_{k-1}) are computed for k = 1..n. The quantile is
    interpolated between the k-th and (k+1)-th values for the k where S_k / S_n <= p <= S_{k+1} / S_n.
    See https://stats.stackexchange.com/questions/13169/defining-quantiles-over-a-weighted-sample
    Args:
        data_list (list): Dictionaries with two keys:
            * val (float): value for individual
            * w (float): weight for the individual
        p (float): The quantile to estimate, between 0 and 1 inclusive.
    Returns:
        The estimated quantile.
    Raises:
        ValueError: If data_list is empty, p is outside [0, 1], or the weights do not sum to a
            positive number.
    Side effects:
        data_list is sorted in place by 'val', and every dictionary gains 'w_min' and 'w_max' keys
        holding the cumulative weight before and through that record.
    """
    if not data_list:
        raise ValueError("data_list must contain at least one observation.")
    if not 0 <= p <= 1:
        raise ValueError(f"p must be between 0 and 1, got {p}.")

    data_list.sort(key=lambda x: x['val'])

    cumulative_w = 0
    for range_ in data_list:
        range_['w_min'] = cumulative_w
        cumulative_w += range_['w']
        range_['w_max'] = cumulative_w

    n = len(data_list)
    if n == 1:
        # A single record is every quantile of the sample
        return data_list[0]['val']
    if cumulative_w <= 0:
        raise ValueError("The weights must sum to a positive number.")

    S_n = (n - 1) * cumulative_w

    # S[i] holds S_k for k = i + 1; 'w_min' is the cumulative weight of the records before it
    S = [i * d['w'] + (n - 1) * d['w_min'] for i, d in enumerate(data_list)]

    # First interval [S_k, S_{k+1}] that contains p. Every interval from the first to the last
    # is searched; the default only applies if rounding pushes p == 1 past the final sum.
    k = next((i for i in range(n - 1) if S[i] / S_n <= p <= S[i + 1] / S_n), n - 2)

    lower = data_list[k]['val']
    upper = data_list[k + 1]['val']
    width = S[k + 1] - S[k]
    if width == 0:
        # Both records carry zero weight, so there is nothing to interpolate across
        return lower

    est = lower + (upper - lower) * (p * S_n - S[k]) / width
    return est
    

In [None]:
# Sample records in the val/w shape that weighted_quantile reads. Each weight is a count
# divided by 2068, the sum of all sixteen counts.
household_income_2013_acs5 = [
    {'val': 5000, 'w': 186 / 2068},
    {'val': 12000, 'w': 78 / 2068},
    {'val': 17000, 'w': 98 / 2068},
    {'val': 23000, 'w': 287 / 2068},
    {'val': 27000, 'w': 142 / 2068},
    {'val': 31000, 'w': 90 / 2068},
    {'val': 36000, 'w': 107 / 2068},
    {'val': 42000, 'w': 104 / 2068},
    {'val': 48000, 'w': 178 / 2068},
    {'val': 59000, 'w': 106 / 2068},
    {'val': 63000, 'w': 177 / 2068},
    {'val': 90000, 'w': 262 / 2068},
    {'val': 110000, 'w': 77 / 2068},
    {'val': 135000, 'w': 100 / 2068},
    {'val': 180000, 'w': 58 / 2068},
    {'val': 210000, 'w': 18 / 2068},
]

# Weighted median of the sample records
weighted_quantile(household_income_2013_acs5, 0.5)

In [None]:
# Partial sum S_2 for the sample records. This reads the 'w_max' keys, which are only present
# after weighted_quantile has been run on the same list.
helper_weighted_quantile(household_income_2013_acs5, 2)

In [218]:
# Weighted median of the sample records and its approximated margin of error
# (90% confidence level).
approximate_median_pums(household_income_2013_acs5, sampling_percentage=1, design_factor=1)

(41865.16853932583, 20257.481963744707)

seems reasonable