
Fix Python tests; Change default timetype to timestamp; Clean up internal references (#33)
icexelloss committed Jul 24, 2018
1 parent 6ece042 commit 9a85f5dac871a427e3f94d526c8d8aa5df37d30e
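The headline change is in the test harness's SparkConf defaults: the session time zone is pinned to UTC, and Flint's default time type switches from long nanoseconds to timestamps. A minimal sketch of the equivalent configuration, assuming only pyspark (the app name is illustrative):

from pyspark import SparkConf

conf = (SparkConf()
        .setAppName("example")                      # illustrative app name
        .setMaster("local")
        .set("spark.sql.session.timeZone", "UTC")   # pin the SQL session time zone
        .set("spark.flint.timetype", "timestamp"))  # new default: timestamps rather than long nanoseconds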
@@ -18,7 +18,7 @@
import os
import sys
import collections
-from tests.ts.base_test_case import BaseTestCase
+from tests.base_test_case import BaseTestCase


class SparkTestCase(BaseTestCase):
@@ -42,7 +42,9 @@ def __setup(cls, options=None):

default_options = (SparkConf()
.setAppName(cls.__name__)
-.setMaster("local"))
+.setMaster("local")
+.set("spark.sql.session.timeZone", "UTC")
+.set("spark.flint.timetype", "timestamp"))
setattr(cls, '_env', dict(os.environ))
setattr(cls, '_path', list(sys.path))
options = collections.ChainMap(options, default_options)
@@ -55,6 +57,7 @@ def __setup(cls, options=None):

@classmethod
def __teardown(cls):
+from pyspark import SparkContext
'''Shuts down spark and removes attributes sc, and sqlContext'''
cls.sc.stop()
cls.sc._gateway.shutdown()
@@ -480,37 +480,6 @@ def test_summarizeWindows(self):
], ["time", "id", "volume", "volume_sum"])
assert_same(new_pdf2, expected_pdf2)

-interval_with_id = self.flintContext.read.pandas(make_pdf([
-(1000, 3),
-(1000, 7),
-(1050, 3),
-(1050, 7),
-(1100, 3),
-(1150, 3),
-(1150, 7),
-(1200, 3),
-(1200, 7),
-(1250, 7),
-], ["time", "id"]))
-
-new_pdf3 = (interval_with_id.summarizeWindows(w,
-summarizers.sum("volume"),
-key="id",
-other=vol).toPandas())
-expected_pdf3 = make_pdf([
-(1000, 3, 200.0),
-(1000, 7, 100.0),
-(1050, 3, 500.0),
-(1050, 7, 500.0),
-(1100, 3, 800.0),
-(1150, 3, 1200.0),
-(1150, 7, 1400.0),
-(1200, 3, 1600.0),
-(1200, 7, 1800.0),
-(1250, 7, 2200.0),
-], ["time", "id", "volume_sum"])
-assert_same(new_pdf3, expected_pdf3)
-
def test_summarizeWindows_udf(self):
from ts.flint import udf
from ts.flint import windows
@@ -520,126 +489,6 @@ def test_summarizeWindows_udf(self):
vol = self.vol()
w = windows.past_absolute_time('99s')

-df = self.flintContext.read.pandas(make_pdf([
-(1000, 3, 10.0),
-(1000, 7, 20.0),
-(1050, 3, 30.0),
-(1050, 7, 40.0),
-(1100, 3, 50.0),
-(1150, 3, 60.0),
-(1150, 7, 70.0),
-(1200, 3, 80.0),
-(1200, 7, 90.0),
-(1250, 7, 100.0),
-], ['time', 'id', 'v']))
-
-result1 = df.summarizeWindows(
-w,
-OrderedDict([
-('mean', udf(lambda time, window: window.mean(), DoubleType())(df['time'], vol['volume']))
-]),
-key="id",
-other=vol).toPandas()
-expected1 = make_pdf([
-(1000, 3, 10.0, 200.0),
-(1000, 7, 20.0, 100.0),
-(1050, 3, 30.0, 250.0),
-(1050, 7, 40.0, 250.0),
-(1100, 3, 50.0, 400.0),
-(1150, 3, 60.0, 600.0),
-(1150, 7, 70.0, 700.0),
-(1200, 3, 80.0, 800.0),
-(1200, 7, 90.0, 900.0),
-(1250, 7, 100.0, 1100.0),
-], ['time', 'id', 'v', 'mean'])
-assert_same(result1, expected1)
-
-result2 = df.summarizeWindows(
-w,
-OrderedDict([
-('mean', udf(lambda window: window.mean(), DoubleType())(vol['volume']))
-]),
-key='id',
-other=vol).toPandas()
-expected2 = expected1
-assert_same(result2, expected2)
-
-result3 = df.summarizeWindows(
-w,
-OrderedDict([
-('mean', udf(lambda window: window.mean(), DoubleType())(vol['volume'])),
-('count', udf(lambda time, window: len(window), LongType())(df['time'], vol['volume']))
-]),
-key='id',
-other=vol).toPandas()
-expected3 = make_pdf([
-(1000, 3, 10.0, 200.0, 1),
-(1000, 7, 20.0, 100.0, 1),
-(1050, 3, 30.0, 250.0, 2),
-(1050, 7, 40.0, 250.0, 2),
-(1100, 3, 50.0, 400.0, 2),
-(1150, 3, 60.0, 600.0, 2),
-(1150, 7, 70.0, 700.0, 2),
-(1200, 3, 80.0, 800.0, 2),
-(1200, 7, 90.0, 900.0, 2),
-(1250, 7, 100.0, 1100.0, 2),
-], ['time', 'id', 'v', 'mean', 'count'])
-assert_same(result3, expected3)
-
-@udf('double')
-def window_udf(time, window):
-return (time - window.time).mean().seconds + window.volume.mean()
-
-result4 = df.summarizeWindows(
-w,
-OrderedDict([
-('mean', window_udf(df['time'], vol[['time', 'volume']])),
-]),
-key='id',
-other=vol).toPandas()
-
-expected4 = make_pdf([
-(1000, 3, 10.0, 200.0),
-(1000, 7, 20.0, 100.0),
-(1050, 3, 30.0, 275.0),
-(1050, 7, 40.0, 275.0),
-(1100, 3, 50.0, 425.0),
-(1150, 3, 60.0, 625.0),
-(1150, 7, 70.0, 725.0),
-(1200, 3, 80.0, 825.0),
-(1200, 7, 90.0, 925.0),
-(1250, 7, 100.0, 1125.0),
-], ['time', 'id', 'v', 'mean'])
-assert_same(result4, expected4)
-
-@udf(DoubleType())
-def foo5(row, window):
-return (row[0] - window.time).mean().seconds + window.volume.mean()
-
-result5 = df.summarizeWindows(
-w,
-OrderedDict([
-('mean', foo5(df[['time', 'v']], vol[['time', 'volume']])),
-]),
-key='id',
-other=vol).toPandas()
-expected5 = expected4
-assert_same(result5, expected5)
-
-@udf((DoubleType(), LongType()))
-def mean_and_count(v):
-return v.mean(), len(v)
-
-result6 = df.summarizeWindows(
-w,
-OrderedDict([
-[('mean', 'count'), mean_and_count(vol['volume'])]
-]),
-key='id',
-other=vol).toPandas()
-expected6 = expected3
-assert_same(result6, expected6)
-
@udf(DoubleType())
def mean(v):
return v.mean()
@@ -791,55 +640,6 @@ def mean_np_df_2(left, window):
expected4 = expected1
assert_same(result4, expected4)

-result5 = df.summarizeWindows(
-w,
-{'mean': mean_np_2(df['v'], vol[['time', 'volume']])},
-other = vol,
-key ='id'
-).toPandas()
-expected5 = make_pdf([
-(1000, 3, 10.0, 210.0),
-(1000, 7, 20.0, 120.0),
-(1050, 3, 30.0, 280.0),
-(1050, 7, 40.0, 290.0),
-(1100, 3, 50.0, 450.0),
-(1150, 3, 60.0, 660.0),
-(1150, 7, 70.0, 770.0),
-(1200, 3, 80.0, 880.0),
-(1200, 7, 90.0, 990.0),
-(1250, 7, 100.0, 1200.0),
-], ['time', 'id', 'v', 'mean'])
-assert_same(result5, expected5)
-
-result6 = df.summarizeWindows(
-w,
-{'mean': mean_np_df_2(df[['v']], vol[['time', 'volume']])},
-other = vol,
-key ='id'
-).toPandas()
-expected6 = result6
-assert_same(result6, expected6)
-
-result7 = df.summarizeWindows(
-w,
-{'mean': mean_np_df(vol[['time', 'volume']])},
-other = vol,
-key ='id'
-).toPandas()
-expected7 = make_pdf([
-(1000, 3, 10.0, 200.0),
-(1000, 7, 20.0, 100.0),
-(1050, 3, 30.0, 250.0),
-(1050, 7, 40.0, 250.0),
-(1100, 3, 50.0, 400.0),
-(1150, 3, 60.0, 600.0),
-(1150, 7, 70.0, 700.0),
-(1200, 3, 80.0, 800.0),
-(1200, 7, 90.0, 900.0),
-(1250, 7, 100.0, 1100.0),
-], ['time', 'id', 'v', 'mean'])
-assert_same(result7, expected7)
-
result8 = vol.summarizeWindows(
w,
{('mean', 'sum'): mean_and_sum_np(vol['volume'])},
@@ -1018,15 +818,6 @@ def test_shiftTime_windows(self):
result1 = dates.shiftTime(windows.future_absolute_time('1day')).toPandas()
assert_same(result1, expected1)

-expected2 = make_pdf([
-(monday,),
-(tuesday,),
-(wednesday,),
-(thrusday,),
-], ['time'])
-result2 = dates.shiftTime(windows.future_trading_time('1day', 'US')).toPandas()
-assert_same(result2, expected2)
-
def test_uniform_clocks(self):
from ts.flint import clocks
df = clocks.uniform(self.sqlContext, '1d', '0s', '2016-11-07', '2016-11-17')
@@ -1037,7 +828,7 @@ def test_uniform_clocks(self):
def test_read_uniform_clock(self):
expected_exclusive = pd.date_range('20171116 12:00:05am',
tz='Asia/Tokyo', periods=2880,
-freq='30s')
+freq='30s').tz_convert("UTC").tz_localize(None)
actual_exclusive = (self.flintContext.read
.range('2017-11-16', '2017-11-17 12:00:05am',
'Asia/Tokyo')
@@ -1047,7 +838,7 @@ def test_read_uniform_clock(self):
assert np.all(expected_exclusive == actual_exclusive)

expected_inclusive = pd.date_range('20171116', periods=2881,
-freq='30s')
+freq='30s').tz_localize(None)
actual_inclusive = (self.flintContext.read
.range('2017-11-16', '2017-11-17')
.clock('uniform', '30s')
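Both clock expectations are now normalized to tz-naive UTC before comparison, matching what the reader returns under the timestamp time type. A pandas-only sketch of that normalization:

import pandas as pd

rng = pd.date_range('20171116 12:00:05am', tz='Asia/Tokyo', periods=1, freq='30s')
naive_utc = rng.tz_convert('UTC').tz_localize(None)
print(naive_utc[0])  # 2017-11-15 15:00:05 -- the UTC wall time, zone dropped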
@@ -170,13 +170,14 @@ def test_read_dataframe_begin_end(self):
# Data goes from time 1000 to 1250
from tests.test_data import VOL_DATA
pdf = make_pdf(VOL_DATA, ['time', 'id', 'volume'])
pdf['time'] = pdf.time.astype('long')

df = self.sqlContext.createDataFrame(pdf)
-begin_nanos, end_nanos = 1100, 1200
+begin_nanos = 1100 * 1e9
+end_nanos = 1200 * 1e9

df = self.flintContext.read.range(begin_nanos, end_nanos).dataframe(df)
-expected_df = df.filter(df.time >= make_timestamp(begin_nanos)) \
-.filter(df.time < make_timestamp(end_nanos))
+expected_df = df.filter(df.time >= make_timestamp(begin_nanos, 'ns')) \
+.filter(df.time < make_timestamp(end_nanos, 'ns'))
expected = expected_df.count()
assert(df.count() == expected)

@@ -40,8 +40,8 @@ def make_pdf(data, schema, dtypes=None):

return df

-def make_timestamp(seconds):
-return pd.to_datetime(seconds, unit='s')
+def make_timestamp(value, unit='s'):
+return pd.to_datetime(value, unit=unit)

def custom_isclose(a, b, rel_prec=1e-9):
return abs(a - b) <= rel_prec * max(abs(a), abs(b))
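For reference, the generalized make_timestamp above keeps seconds as the default unit while letting the begin/end test pass nanoseconds. Example calls mirroring the values used in that test:

make_timestamp(1100)              # Timestamp('1970-01-01 00:18:20'), value read as seconds
make_timestamp(1100 * 1e9, 'ns')  # the same instant, value given in nanoseconds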
@@ -68,5 +68,5 @@ def assert_false(a, cmd=""):
assert not a, cmd

def get_test_base():
-from tests.pypusa_test_case import PypusaTestCase
-return PypusaTestCase
+from tests.spark_test_case import SparkTestCase
+return SparkTestCase
@@ -14,4 +14,4 @@
# limitations under the License.
#
spark.master local
-spark.jars _FLINT_ROOT_/target/scala-2.11/flint-assembly-0.2.0-SNAPSHOT.jar
+spark.jars _FLINT_ROOT_/target/scala-2.11/flint-assembly-0.6.0-SNAPSHOT.jar