In [1]:
print("hello")

hello


In [3]:
import pytest
from itertools import product
from collections import defaultdict

import numpy as np
from numpy import nan
import pandas as pd
from pandas.core import common as com
from pandas import DataFrame, MultiIndex, merge, concat, Series, compat
from pandas.util import testing as tm
from pandas.util.testing import assert_frame_equal, assert_series_equal
from pandas.core.sorting import (is_int64_overflow_possible,
                                 decons_group_index,
                                 get_group_index,
                                 nargsort,
                                 lexsort_indexer)


class TestSorting(tm.TestCase):

    @pytest.mark.slow
    def test_int64_overflow(self):

        B = np.concatenate((np.arange(1000), np.arange(1000), np.arange(500)))
        A = np.arange(2500)
        df = DataFrame({'A': A,
                        'B': B,
                        'C': A,
                        'D': B,
                        'E': A,
                        'F': B,
                        'G': A,
                        'H': B,
                        'values': np.random.randn(2500)})

        lg = df.groupby(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'])
        rg = df.groupby(['H', 'G', 'F', 'E', 'D', 'C', 'B', 'A'])

        left = lg.sum()['values']
        right = rg.sum()['values']

        exp_index, _ = left.index.sortlevel()
        tm.assert_index_equal(left.index, exp_index)

        exp_index, _ = right.index.sortlevel(0)
        tm.assert_index_equal(right.index, exp_index)

        tups = list(map(tuple, df[['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'
                                   ]].values))
        tups = com._asarray_tuplesafe(tups)

        expected = df.groupby(tups).sum()['values']

        for k, v in compat.iteritems(expected):
            assert left[k] == right[k[::-1]]
            assert left[k] == v
        assert len(left) == len(right)

    def test_int64_overflow_moar(self):

        # GH9096
        values = range(55109)
        data = pd.DataFrame.from_dict({'a': values,
                                       'b': values,
                                       'c': values,
                                       'd': values})
        grouped = data.groupby(['a', 'b', 'c', 'd'])
        assert len(grouped) == len(values)

        arr = np.random.randint(-1 << 12, 1 << 12, (1 << 15, 5))
        i = np.random.choice(len(arr), len(arr) * 4)
        arr = np.vstack((arr, arr[i]))  # add sume duplicate rows

        i = np.random.permutation(len(arr))
        arr = arr[i]  # shuffle rows

        df = DataFrame(arr, columns=list('abcde'))
        df['jim'], df['joe'] = np.random.randn(2, len(df)) * 10
        gr = df.groupby(list('abcde'))

        # verify this is testing what it is supposed to test!
        assert is_int64_overflow_possible(gr.grouper.shape)

        # mannually compute groupings
        jim, joe = defaultdict(list), defaultdict(list)
        for key, a, b in zip(map(tuple, arr), df['jim'], df['joe']):
            jim[key].append(a)
            joe[key].append(b)

        assert len(gr) == len(jim)
        mi = MultiIndex.from_tuples(jim.keys(), names=list('abcde'))

        def aggr(func):
            f = lambda a: np.fromiter(map(func, a), dtype='f8')
            arr = np.vstack((f(jim.values()), f(joe.values()))).T
            res = DataFrame(arr, columns=['jim', 'joe'], index=mi)
            return res.sort_index()

        assert_frame_equal(gr.mean(), aggr(np.mean))
        assert_frame_equal(gr.median(), aggr(np.median))

    def test_lexsort_indexer(self):
        keys = [[nan] * 5 + list(range(100)) + [nan] * 5]
        # orders=True, na_position='last'
        result = lexsort_indexer(keys, orders=True, na_position='last')
        exp = list(range(5, 105)) + list(range(5)) + list(range(105, 110))
        tm.assert_numpy_array_equal(result, np.array(exp, dtype=np.intp))

        # orders=True, na_position='first'
        result = lexsort_indexer(keys, orders=True, na_position='first')
        exp = list(range(5)) + list(range(105, 110)) + list(range(5, 105))
        tm.assert_numpy_array_equal(result, np.array(exp, dtype=np.intp))

        # orders=False, na_position='last'
        result = lexsort_indexer(keys, orders=False, na_position='last')
        exp = list(range(104, 4, -1)) + list(range(5)) + list(range(105, 110))
        tm.assert_numpy_array_equal(result, np.array(exp, dtype=np.intp))

        # orders=False, na_position='first'
        result = lexsort_indexer(keys, orders=False, na_position='first')
        exp = list(range(5)) + list(range(105, 110)) + list(range(104, 4, -1))
        tm.assert_numpy_array_equal(result, np.array(exp, dtype=np.intp))

    def test_nargsort(self):
        # np.argsort(items) places NaNs last
        items = [nan] * 5 + list(range(100)) + [nan] * 5
        # np.argsort(items2) may not place NaNs first
        items2 = np.array(items, dtype='O')

        try:
            # GH 2785; due to a regression in NumPy1.6.2
            np.argsort(np.array([[1, 2], [1, 3], [1, 2]], dtype='i'))
            np.argsort(items2, kind='mergesort')
        except TypeError:
            pytest.skip('requested sort not available for type')

        # mergesort is the most difficult to get right because we want it to be
        # stable.

        # According to numpy/core/tests/test_multiarray, """The number of
        # sorted items must be greater than ~50 to check the actual algorithm
        # because quick and merge sort fall over to insertion sort for small
        # arrays."""

        # mergesort, ascending=True, na_position='last'
        result = nargsort(items, kind='mergesort', ascending=True,
                          na_position='last')
        exp = list(range(5, 105)) + list(range(5)) + list(range(105, 110))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=True, na_position='first'
        result = nargsort(items, kind='mergesort', ascending=True,
                          na_position='first')
        exp = list(range(5)) + list(range(105, 110)) + list(range(5, 105))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=False, na_position='last'
        result = nargsort(items, kind='mergesort', ascending=False,
                          na_position='last')
        exp = list(range(104, 4, -1)) + list(range(5)) + list(range(105, 110))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=False, na_position='first'
        result = nargsort(items, kind='mergesort', ascending=False,
                          na_position='first')
        exp = list(range(5)) + list(range(105, 110)) + list(range(104, 4, -1))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=True, na_position='last'
        result = nargsort(items2, kind='mergesort', ascending=True,
                          na_position='last')
        exp = list(range(5, 105)) + list(range(5)) + list(range(105, 110))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=True, na_position='first'
        result = nargsort(items2, kind='mergesort', ascending=True,
                          na_position='first')
        exp = list(range(5)) + list(range(105, 110)) + list(range(5, 105))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=False, na_position='last'
        result = nargsort(items2, kind='mergesort', ascending=False,
                          na_position='last')
        exp = list(range(104, 4, -1)) + list(range(5)) + list(range(105, 110))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)

        # mergesort, ascending=False, na_position='first'
        result = nargsort(items2, kind='mergesort', ascending=False,
                          na_position='first')
        exp = list(range(5)) + list(range(105, 110)) + list(range(104, 4, -1))
        tm.assert_numpy_array_equal(result, np.array(exp), check_dtype=False)


class TestMerge(tm.TestCase):

    @pytest.mark.slow
    def test_int64_overflow_issues(self):

        # #2690, combinatorial explosion
        df1 = DataFrame(np.random.randn(1000, 7),
                        columns=list('ABCDEF') + ['G1'])
        df2 = DataFrame(np.random.randn(1000, 7),
                        columns=list('ABCDEF') + ['G2'])

        # it works!
        result = merge(df1, df2, how='outer')
        assert len(result) == 2000

        low, high, n = -1 << 10, 1 << 10, 1 << 20
        left = DataFrame(np.random.randint(low, high, (n, 7)),
                         columns=list('ABCDEFG'))
        left['left'] = left.sum(axis=1)

        # one-2-one match
        i = np.random.permutation(len(left))
        right = left.iloc[i].copy()
        right.columns = right.columns[:-1].tolist() + ['right']
        right.index = np.arange(len(right))
        right['right'] *= -1

        out = merge(left, right, how='outer')
        assert len(out) == len(left)
        assert_series_equal(out['left'], - out['right'], check_names=False)
        result = out.iloc[:, :-2].sum(axis=1)
        assert_series_equal(out['left'], result, check_names=False)
        assert result.name is None

        out.sort_values(out.columns.tolist(), inplace=True)
        out.index = np.arange(len(out))
        for how in ['left', 'right', 'outer', 'inner']:
            assert_frame_equal(out, merge(left, right, how=how, sort=True))

        # check that left merge w/ sort=False maintains left frame order
        out = merge(left, right, how='left', sort=False)
        assert_frame_equal(left, out[left.columns.tolist()])

        out = merge(right, left, how='left', sort=False)
        assert_frame_equal(right, out[right.columns.tolist()])

        # one-2-many/none match
        n = 1 << 11
        left = DataFrame(np.random.randint(low, high, (n, 7)).astype('int64'),
                         columns=list('ABCDEFG'))

        # confirm that this is checking what it is supposed to check
        shape = left.apply(Series.nunique).values
        assert is_int64_overflow_possible(shape)

        # add duplicates to left frame
        left = concat([left, left], ignore_index=True)

        right = DataFrame(np.random.randint(low, high, (n // 2, 7))
                          .astype('int64'),
                          columns=list('ABCDEFG'))

        # add duplicates & overlap with left to the right frame
        i = np.random.choice(len(left), n)
        right = concat([right, right, left.iloc[i]], ignore_index=True)

        left['left'] = np.random.randn(len(left))
        right['right'] = np.random.randn(len(right))

        # shuffle left & right frames
        i = np.random.permutation(len(left))
        left = left.iloc[i].copy()
        left.index = np.arange(len(left))

        i = np.random.permutation(len(right))
        right = right.iloc[i].copy()
        right.index = np.arange(len(right))

        # manually compute outer merge
        ldict, rdict = defaultdict(list), defaultdict(list)

        for idx, row in left.set_index(list('ABCDEFG')).iterrows():
            ldict[idx].append(row['left'])

        for idx, row in right.set_index(list('ABCDEFG')).iterrows():
            rdict[idx].append(row['right'])

        vals = []
        for k, lval in ldict.items():
            rval = rdict.get(k, [np.nan])
            for lv, rv in product(lval, rval):
                vals.append(k + tuple([lv, rv]))

        for k, rval in rdict.items():
            if k not in ldict:
                for rv in rval:
                    vals.append(k + tuple([np.nan, rv]))

        def align(df):
            df = df.sort_values(df.columns.tolist())
            df.index = np.arange(len(df))
            return df

        def verify_order(df):
            kcols = list('ABCDEFG')
            assert_frame_equal(df[kcols].copy(),
                               df[kcols].sort_values(kcols, kind='mergesort'))

        out = DataFrame(vals, columns=list('ABCDEFG') + ['left', 'right'])
        out = align(out)

        jmask = {'left': out['left'].notnull(),
                 'right': out['right'].notnull(),
                 'inner': out['left'].notnull() & out['right'].notnull(),
                 'outer': np.ones(len(out), dtype='bool')}

        for how in 'left', 'right', 'outer', 'inner':
            mask = jmask[how]
            frame = align(out[mask].copy())
            assert mask.all() ^ mask.any() or how == 'outer'

            for sort in [False, True]:
                res = merge(left, right, how=how, sort=sort)
                if sort:
                    verify_order(res)

                # as in GH9092 dtypes break with outer/right join
                assert_frame_equal(frame, align(res),
                                   check_dtype=how not in ('right', 'outer'))


def test_decons():

    def testit(label_list, shape):
        group_index = get_group_index(label_list, shape, sort=True, xnull=True)
        label_list2 = decons_group_index(group_index, shape)

        for a, b in zip(label_list, label_list2):
            assert (np.array_equal(a, b))

    shape = (4, 5, 6)
    label_list = [np.tile([0, 1, 2, 3, 0, 1, 2, 3], 100), np.tile(
        [0, 2, 4, 3, 0, 1, 2, 3], 100), np.tile(
            [5, 1, 0, 2, 3, 0, 5, 4], 100)]
    testit(label_list, shape)

    shape = (10000, 10000)
    label_list = [np.tile(np.arange(10000), 5), np.tile(np.arange(10000), 5)]
    testit(label_list, shape)

AttributeError: module 'pandas.util.testing' has no attribute 'TestCase'

In [6]:
import os
import locale
import codecs
import sys
from uuid import uuid4
from collections import OrderedDict

import pytest
from pandas.util._move import move_into_mutable_buffer, BadMove, stolenbuf
from pandas.util.decorators import deprecate_kwarg
from pandas.util.validators import (validate_args, validate_kwargs,
                                    validate_args_and_kwargs,
                                    validate_bool_kwarg)

import pandas.util.testing as tm

CURRENT_LOCALE = locale.getlocale()
LOCALE_OVERRIDE = os.environ.get('LOCALE_OVERRIDE', None)


class TestDecorators(tm.TestCase):

    def setUp(self):
        @deprecate_kwarg('old', 'new')
        def _f1(new=False):
            return new

        @deprecate_kwarg('old', 'new', {'yes': True, 'no': False})
        def _f2(new=False):
            return new

        @deprecate_kwarg('old', 'new', lambda x: x + 1)
        def _f3(new=0):
            return new

        self.f1 = _f1
        self.f2 = _f2
        self.f3 = _f3

    def test_deprecate_kwarg(self):
        x = 78
        with tm.assert_produces_warning(FutureWarning):
            result = self.f1(old=x)
        self.assertIs(result, x)
        with tm.assert_produces_warning(None):
            self.f1(new=x)

    def test_dict_deprecate_kwarg(self):
        x = 'yes'
        with tm.assert_produces_warning(FutureWarning):
            result = self.f2(old=x)
        self.assertEqual(result, True)

    def test_missing_deprecate_kwarg(self):
        x = 'bogus'
        with tm.assert_produces_warning(FutureWarning):
            result = self.f2(old=x)
        self.assertEqual(result, 'bogus')

    def test_callable_deprecate_kwarg(self):
        x = 5
        with tm.assert_produces_warning(FutureWarning):
            result = self.f3(old=x)
        self.assertEqual(result, x + 1)
        with pytest.raises(TypeError):
            self.f3(old='hello')

    def test_bad_deprecate_kwarg(self):
        with pytest.raises(TypeError):
            @deprecate_kwarg('old', 'new', 0)
            def f4(new=None):
                pass


def test_rands():
    r = tm.rands(10)
    assert(len(r) == 10)


def test_rands_array():
    arr = tm.rands_array(5, size=10)
    assert(arr.shape == (10,))
    assert(len(arr[0]) == 5)

    arr = tm.rands_array(7, size=(10, 10))
    assert(arr.shape == (10, 10))
    assert(len(arr[1, 1]) == 7)


class TestValidateArgs(tm.TestCase):
    fname = 'func'

    def test_bad_min_fname_arg_count(self):
        msg = "'max_fname_arg_count' must be non-negative"
        with tm.assertRaisesRegexp(ValueError, msg):
            validate_args(self.fname, (None,), -1, 'foo')

    def test_bad_arg_length_max_value_single(self):
        args = (None, None)
        compat_args = ('foo',)

        min_fname_arg_count = 0
        max_length = len(compat_args) + min_fname_arg_count
        actual_length = len(args) + min_fname_arg_count
        msg = (r"{fname}\(\) takes at most {max_length} "
               r"argument \({actual_length} given\)"
               .format(fname=self.fname, max_length=max_length,
                       actual_length=actual_length))

        with tm.assertRaisesRegexp(TypeError, msg):
            validate_args(self.fname, args,
                          min_fname_arg_count,
                          compat_args)

    def test_bad_arg_length_max_value_multiple(self):
        args = (None, None)
        compat_args = dict(foo=None)

        min_fname_arg_count = 2
        max_length = len(compat_args) + min_fname_arg_count
        actual_length = len(args) + min_fname_arg_count
        msg = (r"{fname}\(\) takes at most {max_length} "
               r"arguments \({actual_length} given\)"
               .format(fname=self.fname, max_length=max_length,
                       actual_length=actual_length))

        with tm.assertRaisesRegexp(TypeError, msg):
            validate_args(self.fname, args,
                          min_fname_arg_count,
                          compat_args)

    def test_not_all_defaults(self):
        bad_arg = 'foo'
        msg = ("the '{arg}' parameter is not supported "
               r"in the pandas implementation of {func}\(\)".
               format(arg=bad_arg, func=self.fname))

        compat_args = OrderedDict()
        compat_args['foo'] = 2
        compat_args['bar'] = -1
        compat_args['baz'] = 3

        arg_vals = (1, -1, 3)

        for i in range(1, 3):
            with tm.assertRaisesRegexp(ValueError, msg):
                validate_args(self.fname, arg_vals[:i], 2, compat_args)

    def test_validation(self):
        # No exceptions should be thrown
        validate_args(self.fname, (None,), 2, dict(out=None))

        compat_args = OrderedDict()
        compat_args['axis'] = 1
        compat_args['out'] = None

        validate_args(self.fname, (1, None), 2, compat_args)


class TestValidateKwargs(tm.TestCase):
    fname = 'func'

    def test_bad_kwarg(self):
        goodarg = 'f'
        badarg = goodarg + 'o'

        compat_args = OrderedDict()
        compat_args[goodarg] = 'foo'
        compat_args[badarg + 'o'] = 'bar'
        kwargs = {goodarg: 'foo', badarg: 'bar'}
        msg = (r"{fname}\(\) got an unexpected "
               r"keyword argument '{arg}'".format(
                   fname=self.fname, arg=badarg))

        with tm.assertRaisesRegexp(TypeError, msg):
            validate_kwargs(self.fname, kwargs, compat_args)

    def test_not_all_none(self):
        bad_arg = 'foo'
        msg = (r"the '{arg}' parameter is not supported "
               r"in the pandas implementation of {func}\(\)".
               format(arg=bad_arg, func=self.fname))

        compat_args = OrderedDict()
        compat_args['foo'] = 1
        compat_args['bar'] = 's'
        compat_args['baz'] = None

        kwarg_keys = ('foo', 'bar', 'baz')
        kwarg_vals = (2, 's', None)

        for i in range(1, 3):
            kwargs = dict(zip(kwarg_keys[:i],
                              kwarg_vals[:i]))

            with tm.assertRaisesRegexp(ValueError, msg):
                validate_kwargs(self.fname, kwargs, compat_args)

    def test_validation(self):
        # No exceptions should be thrown
        compat_args = OrderedDict()
        compat_args['f'] = None
        compat_args['b'] = 1
        compat_args['ba'] = 's'
        kwargs = dict(f=None, b=1)
        validate_kwargs(self.fname, kwargs, compat_args)

    def test_validate_bool_kwarg(self):
        arg_names = ['inplace', 'copy']
        invalid_values = [1, "True", [1, 2, 3], 5.0]
        valid_values = [True, False, None]

        for name in arg_names:
            for value in invalid_values:
                with tm.assertRaisesRegexp(ValueError,
                                           ("For argument \"%s\" expected "
                                            "type bool, received type %s") %
                                           (name, type(value).__name__)):
                    validate_bool_kwarg(value, name)

            for value in valid_values:
                assert validate_bool_kwarg(value, name) == value


class TestValidateKwargsAndArgs(tm.TestCase):
    fname = 'func'

    def test_invalid_total_length_max_length_one(self):
        compat_args = ('foo',)
        kwargs = {'foo': 'FOO'}
        args = ('FoO', 'BaZ')

        min_fname_arg_count = 0
        max_length = len(compat_args) + min_fname_arg_count
        actual_length = len(kwargs) + len(args) + min_fname_arg_count
        msg = (r"{fname}\(\) takes at most {max_length} "
               r"argument \({actual_length} given\)"
               .format(fname=self.fname, max_length=max_length,
                       actual_length=actual_length))

        with tm.assertRaisesRegexp(TypeError, msg):
            validate_args_and_kwargs(self.fname, args, kwargs,
                                     min_fname_arg_count,
                                     compat_args)

    def test_invalid_total_length_max_length_multiple(self):
        compat_args = ('foo', 'bar', 'baz')
        kwargs = {'foo': 'FOO', 'bar': 'BAR'}
        args = ('FoO', 'BaZ')

        min_fname_arg_count = 2
        max_length = len(compat_args) + min_fname_arg_count
        actual_length = len(kwargs) + len(args) + min_fname_arg_count
        msg = (r"{fname}\(\) takes at most {max_length} "
               r"arguments \({actual_length} given\)"
               .format(fname=self.fname, max_length=max_length,
                       actual_length=actual_length))

        with tm.assertRaisesRegexp(TypeError, msg):
            validate_args_and_kwargs(self.fname, args, kwargs,
                                     min_fname_arg_count,
                                     compat_args)

    def test_no_args_with_kwargs(self):
        bad_arg = 'bar'
        min_fname_arg_count = 2

        compat_args = OrderedDict()
        compat_args['foo'] = -5
        compat_args[bad_arg] = 1

        msg = (r"the '{arg}' parameter is not supported "
               r"in the pandas implementation of {func}\(\)".
               format(arg=bad_arg, func=self.fname))

        args = ()
        kwargs = {'foo': -5, bad_arg: 2}
        tm.assertRaisesRegexp(ValueError, msg,
                              validate_args_and_kwargs,
                              self.fname, args, kwargs,
                              min_fname_arg_count, compat_args)

        args = (-5, 2)
        kwargs = {}
        tm.assertRaisesRegexp(ValueError, msg,
                              validate_args_and_kwargs,
                              self.fname, args, kwargs,
                              min_fname_arg_count, compat_args)

    def test_duplicate_argument(self):
        min_fname_arg_count = 2
        compat_args = OrderedDict()
        compat_args['foo'] = None
        compat_args['bar'] = None
        compat_args['baz'] = None
        kwargs = {'foo': None, 'bar': None}
        args = (None,)  # duplicate value for 'foo'

        msg = (r"{fname}\(\) got multiple values for keyword "
               r"argument '{arg}'".format(fname=self.fname, arg='foo'))

        with tm.assertRaisesRegexp(TypeError, msg):
            validate_args_and_kwargs(self.fname, args, kwargs,
                                     min_fname_arg_count,
                                     compat_args)

    def test_validation(self):
        # No exceptions should be thrown
        compat_args = OrderedDict()
        compat_args['foo'] = 1
        compat_args['bar'] = None
        compat_args['baz'] = -2
        kwargs = {'baz': -2}
        args = (1, None)

        min_fname_arg_count = 2
        validate_args_and_kwargs(self.fname, args, kwargs,
                                 min_fname_arg_count,
                                 compat_args)


class TestMove(tm.TestCase):

    def test_cannot_create_instance_of_stolenbuffer(self):
        """Stolen buffers need to be created through the smart constructor
        ``move_into_mutable_buffer`` which has a bunch of checks in it.
        """
        msg = "cannot create 'pandas.util._move.stolenbuf' instances"
        with tm.assertRaisesRegexp(TypeError, msg):
            stolenbuf()

    def test_more_than_one_ref(self):
        """Test case for when we try to use ``move_into_mutable_buffer`` when
        the object being moved has other references.
        """
        b = b'testing'

        with pytest.raises(BadMove) as e:
            def handle_success(type_, value, tb):
                self.assertIs(value.args[0], b)
                return type(e).handle_success(e, type_, value, tb)  # super

            e.handle_success = handle_success
            move_into_mutable_buffer(b)

    def test_exactly_one_ref(self):
        """Test case for when the object being moved has exactly one reference.
        """
        b = b'testing'

        # We need to pass an expression on the stack to ensure that there are
        # not extra references hanging around. We cannot rewrite this test as
        #   buf = b[:-3]
        #   as_stolen_buf = move_into_mutable_buffer(buf)
        # because then we would have more than one reference to buf.
        as_stolen_buf = move_into_mutable_buffer(b[:-3])

        # materialize as bytearray to show that it is mutable
        self.assertEqual(bytearray(as_stolen_buf), b'test')

    @pytest.mark.skipif(
        sys.version_info[0] > 2,
        reason='bytes objects cannot be interned in py3',
    )
    def test_interned(self):
        salt = uuid4().hex

        def make_string():
            # We need to actually create a new string so that it has refcount
            # one. We use a uuid so that we know the string could not already
            # be in the intern table.
            return ''.join(('testing: ', salt))

        # This should work, the string has one reference on the stack.
        move_into_mutable_buffer(make_string())

        refcount = [None]  # nonlocal

        def ref_capture(ob):
            # Subtract two because those are the references owned by this
            # frame:
            #   1. The local variables of this stack frame.
            #   2. The python data stack of this stack frame.
            refcount[0] = sys.getrefcount(ob) - 2
            return ob

        with pytest.raises(BadMove):
            # If we intern the string it will still have one reference but now
            # it is in the intern table so if other people intern the same
            # string while the mutable buffer holds the first string they will
            # be the same instance.
            move_into_mutable_buffer(ref_capture(intern(make_string())))  # noqa

        self.assertEqual(
            refcount[0],
            1,
            msg='The BadMove was probably raised for refcount reasons instead'
            ' of interning reasons',
        )


def test_numpy_errstate_is_default():
    # The defaults since numpy 1.6.0
    expected = {'over': 'warn', 'divide': 'warn', 'invalid': 'warn',
                'under': 'ignore'}
    import numpy as np
    from pandas.compat import numpy  # noqa
    # The errstate should be unchanged after that import.
    assert np.geterr() == expected


class TestLocaleUtils(tm.TestCase):

    @classmethod
    def setUpClass(cls):
        super(TestLocaleUtils, cls).setUpClass()
        cls.locales = tm.get_locales()

        if not cls.locales:
            pytest.skip("No locales found")

        tm._skip_if_windows()

    @classmethod
    def tearDownClass(cls):
        super(TestLocaleUtils, cls).tearDownClass()
        del cls.locales

    def test_get_locales(self):
        # all systems should have at least a single locale
        assert len(tm.get_locales()) > 0

    def test_get_locales_prefix(self):
        if len(self.locales) == 1:
            pytest.skip("Only a single locale found, no point in "
                        "trying to test filtering locale prefixes")
        first_locale = self.locales[0]
        assert len(tm.get_locales(prefix=first_locale[:2])) > 0

    def test_set_locale(self):
        if len(self.locales) == 1:
            pytest.skip("Only a single locale found, no point in "
                        "trying to test setting another locale")

        if all(x is None for x in CURRENT_LOCALE):
            # Not sure why, but on some travis runs with pytest,
            # getlocale() returned (None, None).
            pytest.skip("CURRENT_LOCALE is not set.")

        if LOCALE_OVERRIDE is None:
            lang, enc = 'it_CH', 'UTF-8'
        elif LOCALE_OVERRIDE == 'C':
            lang, enc = 'en_US', 'ascii'
        else:
            lang, enc = LOCALE_OVERRIDE.split('.')

        enc = codecs.lookup(enc).name
        new_locale = lang, enc

        if not tm._can_set_locale(new_locale):
            with pytest.raises(locale.Error):
                with tm.set_locale(new_locale):
                    pass
        else:
            with tm.set_locale(new_locale) as normalized_locale:
                new_lang, new_enc = normalized_locale.split('.')
                new_enc = codecs.lookup(enc).name
                normalized_locale = new_lang, new_enc
                self.assertEqual(normalized_locale, new_locale)

        current_locale = locale.getlocale()
        self.assertEqual(current_locale, CURRENT_LOCALE)

ModuleNotFoundError: No module named 'pandas.util.validators'

In [9]:
# -*- coding: utf-8 -*-

import pytest

import numpy as np

from pandas import Series, Timestamp
from pandas.compat import range, lmap
import pandas.core.common as com
import pandas.util.testing as tm


def test_mut_exclusive():
    msg = "mutually exclusive arguments: '[ab]' and '[ab]'"
    with tm.assertRaisesRegexp(TypeError, msg):
        com._mut_exclusive(a=1, b=2)
    assert com._mut_exclusive(a=1, b=None) == 1
    assert com._mut_exclusive(major=None, major_axis=None) is None


def test_get_callable_name():
    from functools import partial
    getname = com._get_callable_name

    def fn(x):
        return x

    lambda_ = lambda x: x
    part1 = partial(fn)
    part2 = partial(part1)

    class somecall(object):

        def __call__(self):
            return x  # noqa

    assert getname(fn) == 'fn'
    assert getname(lambda_)
    assert getname(part1) == 'fn'
    assert getname(part2) == 'fn'
    assert getname(somecall()) == 'somecall'
    assert getname(1) is None


def test_any_none():
    assert (com._any_none(1, 2, 3, None))
    assert (not com._any_none(1, 2, 3, 4))


def test_all_not_none():
    assert (com._all_not_none(1, 2, 3, 4))
    assert (not com._all_not_none(1, 2, 3, None))
    assert (not com._all_not_none(None, None, None, None))


def test_iterpairs():
    data = [1, 2, 3, 4]
    expected = [(1, 2), (2, 3), (3, 4)]

    result = list(com.iterpairs(data))

    assert (result == expected)


def test_split_ranges():
    def _bin(x, width):
        "return int(x) as a base2 string of given width"
        return ''.join(str((x >> i) & 1) for i in range(width - 1, -1, -1))

    def test_locs(mask):
        nfalse = sum(np.array(mask) == 0)

        remaining = 0
        for s, e in com.split_ranges(mask):
            remaining += e - s

            assert 0 not in mask[s:e]

        # make sure the total items covered by the ranges are a complete cover
        assert remaining + nfalse == len(mask)

    # exhaustively test all possible mask sequences of length 8
    ncols = 8
    for i in range(2 ** ncols):
        cols = lmap(int, list(_bin(i, ncols)))  # count up in base2
        mask = [cols[i] == 1 for i in range(len(cols))]
        test_locs(mask)

    # base cases
    test_locs([])
    test_locs([0])
    test_locs([1])


def test_map_indices_py():
    data = [4, 3, 2, 1]
    expected = {4: 0, 3: 1, 2: 2, 1: 3}

    result = com.map_indices_py(data)

    assert (result == expected)


def test_union():
    a = [1, 2, 3]
    b = [4, 5, 6]

    union = sorted(com.union(a, b))

    assert ((a + b) == union)


def test_difference():
    a = [1, 2, 3]
    b = [1, 2, 3, 4, 5, 6]

    inter = sorted(com.difference(b, a))

    assert ([4, 5, 6] == inter)


def test_intersection():
    a = [1, 2, 3]
    b = [1, 2, 3, 4, 5, 6]

    inter = sorted(com.intersection(a, b))

    assert (a == inter)


def test_groupby():
    values = ['foo', 'bar', 'baz', 'baz2', 'qux', 'foo3']
    expected = {'f': ['foo', 'foo3'],
                'b': ['bar', 'baz', 'baz2'],
                'q': ['qux']}

    grouped = com.groupby(values, lambda x: x[0])

    for k, v in grouped:
        assert v == expected[k]


def test_random_state():
    import numpy.random as npr
    # Check with seed
    state = com._random_state(5)
    assert state.uniform() == npr.RandomState(5).uniform()

    # Check with random state object
    state2 = npr.RandomState(10)
    assert (com._random_state(state2).uniform() ==
            npr.RandomState(10).uniform())

    # check with no arg random state
    assert com._random_state() is np.random

    # Error for floats or strings
    with pytest.raises(ValueError):
        com._random_state('test')

    with pytest.raises(ValueError):
        com._random_state(5.5)


def test_maybe_match_name():

    matched = com._maybe_match_name(
        Series([1], name='x'), Series(
            [2], name='x'))
    assert (matched == 'x')

    matched = com._maybe_match_name(
        Series([1], name='x'), Series(
            [2], name='y'))
    assert (matched is None)

    matched = com._maybe_match_name(Series([1]), Series([2], name='x'))
    assert (matched is None)

    matched = com._maybe_match_name(Series([1], name='x'), Series([2]))
    assert (matched is None)

    matched = com._maybe_match_name(Series([1], name='x'), [2])
    assert (matched == 'x')

    matched = com._maybe_match_name([1], Series([2], name='y'))
    assert (matched == 'y')


def test_dict_compat():
    data_datetime64 = {np.datetime64('1990-03-15'): 1,
                       np.datetime64('2015-03-15'): 2}
    data_unchanged = {1: 2, 3: 4, 5: 6}
    expected = {Timestamp('1990-3-15'): 1, Timestamp('2015-03-15'): 2}
    assert (com._dict_compat(data_datetime64) == expected)
    assert (com._dict_compat(expected) == expected)
    assert (com._dict_compat(data_unchanged) == data_unchanged)