Merge pull request #1 from IntelPython/master
[BUG] Fixed problems with generation of parquet files (IntelPython#93)
kozlov-alexey committed Jul 18, 2019
2 parents c6397f4 + 37a5ba8 commit 793d66e
Showing 4 changed files with 175 additions and 119 deletions.
209 changes: 118 additions & 91 deletions hpat/tests/gen_test_data.py
@@ -5,102 +5,129 @@
import pandas as pd


class ParquetGenerator:
GEN_KDE_PQ_CALLED = False
GEN_PQ_TEST_CALLED = False

@classmethod
def gen_kde_pq(cls, file_name='kde.parquet', N=101):
if not cls.GEN_KDE_PQ_CALLED:
df = pd.DataFrame({'points': np.random.random(N)})
table = pa.Table.from_pandas(df)
row_group_size = 128
pq.write_table(table, file_name, row_group_size)
cls.GEN_KDE_PQ_CALLED = True

@classmethod
def gen_pq_test(cls):
if not cls.GEN_PQ_TEST_CALLED:
df = pd.DataFrame(
{
'one': [-1, np.nan, 2.5, 3., 4., 6., 10.0],
'two': ['foo', 'bar', 'baz', 'foo', 'bar', 'baz', 'foo'],
'three': [True, False, True, True, True, False, False],
# float without NA
'four': [-1, 5.1, 2.5, 3., 4., 6., 11.0],
# str with NA
'five': ['foo', 'bar', 'baz', None, 'bar', 'baz', 'foo'],
}
)
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example.parquet')
pq.write_table(table, 'example2.parquet', row_group_size=2)
cls.GEN_PQ_TEST_CALLED = True
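
For context: the class-level flags make fixture generation idempotent within a test process, so several tests can request the same file without rewriting it. A minimal usage sketch, assuming the module is importable under its path hpat/tests/gen_test_data.py:

    import pandas as pd
    from hpat.tests.gen_test_data import ParquetGenerator

    # Safe to call from multiple tests; the file is written once per process.
    ParquetGenerator.gen_kde_pq()
    df = pd.read_parquet('kde.parquet')  # 101 random points by default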


def generate_spark_data():
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StructType, StructField, DateType, TimestampType)

# test datetime64, spark dates
dt1 = pd.DatetimeIndex(['2017-03-03 03:23',
'1990-10-23', '1993-07-02 10:33:01'])
df = pd.DataFrame({'DT64': dt1, 'DATE': dt1.copy()})
df.to_parquet('pandas_dt.pq')

spark = SparkSession.builder.appName("GenSparkData").getOrCreate()
schema = StructType([StructField('DT64', DateType(), True),
StructField('DATE', TimestampType(), True)])
sdf = spark.createDataFrame(df, schema)
sdf.write.parquet('sdf_dt.pq', 'overwrite')

spark.stop()
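
Note that Spark writes 'sdf_dt.pq' as a directory of part files rather than a single parquet file. A sketch of reading it back, assuming pyarrow is available (as it is elsewhere in this module):

    import pyarrow.parquet as pq

    table = pq.read_table('sdf_dt.pq')  # reads the whole directory-style dataset
    df = table.to_pandas()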


def gen_lr(file_name, N, D):
points = np.random.random((N,D))
points = np.random.random((N, D))
responses = np.random.random(N)
f = h5py.File(file_name, "w")
dset1 = f.create_dataset("points", (N,D), dtype='f8')
dset1 = f.create_dataset("points", (N, D), dtype='f8')
dset1[:] = points
dset2 = f.create_dataset("responses", (N,), dtype='f8')
dset2[:] = responses
f.close()
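
For reference, a minimal sketch of reading the HDF5 fixture back with h5py, mirroring the datasets created above:

    import h5py

    with h5py.File('lr.hdf5', 'r') as f:
        points = f['points'][:]        # shape (N, D), float64
        responses = f['responses'][:]  # shape (N,), float64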

def gen_kde_pq(file_name, N):
df = pd.DataFrame({'points': np.random.random(N)})
table = pa.Table.from_pandas(df)
row_group_size = 128
pq.write_table(table, file_name, row_group_size)

def gen_pq_test(file_name):
df = pd.DataFrame({'one': [-1, np.nan, 2.5, 3., 4., 6., 10.0],
'two': ['foo', 'bar', 'baz', 'foo', 'bar', 'baz', 'foo'],
'three': [True, False, True, True, True, False, False],
'four': [-1, 5.1, 2.5, 3., 4., 6., 11.0], # float without NA
'five': ['foo', 'bar', 'baz', None, 'bar', 'baz', 'foo'], # str with NA
})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example.parquet')
pq.write_table(table, 'example2.parquet', row_group_size=2)

N = 101
D = 10
gen_lr("lr.hdf5", N, D)

arr = np.arange(N)
f = h5py.File("test_group_read.hdf5", "w")
g1 = f.create_group("G")
dset1 = g1.create_dataset("data", (N,), dtype='i8')
dset1[:] = arr
f.close()

gen_kde_pq('kde.parquet', N)
gen_pq_test('example.parquet')

df = pd.DataFrame({'A': ['bc']+["a"]*3+ ["bc"]*3+['a'], 'B': [-8,1,2,3,1,5,6,7]})
df.to_parquet("groupby3.pq")

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
"bar", "bar", "bar", "bar"],
"B": ["one", "one", "one", "two", "two",
"one", "one", "two", "two"],
"C": ["small", "large", "large", "small",
"small", "large", "small", "small",
"large"],
"D": [1, 2, 2, 6, 3, 4, 5, 6, 9]})
df.to_parquet("pivot2.pq")

# test datetime64, spark dates
dt1 = pd.DatetimeIndex(['2017-03-03 03:23', '1990-10-23', '1993-07-02 10:33:01'])
df = pd.DataFrame({'DT64': dt1, 'DATE': dt1.copy()})
df.to_parquet('pandas_dt.pq')

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, TimestampType

spark = SparkSession.builder.appName("GenSparkData").getOrCreate()
schema = StructType([StructField('DT64', DateType(), True), StructField('DATE', TimestampType(), True)])
sdf = spark.createDataFrame(df, schema)
sdf.write.parquet('sdf_dt.pq', 'overwrite')

spark.stop()

# CSV reader test
data = ("0,2.3,4.6,47736\n"
"1,2.3,4.6,47736\n"
"2,2.3,4.6,47736\n"
"4,2.3,4.6,47736\n")

with open("csv_data1.csv", "w") as f:
f.write(data)


with open("csv_data_infer1.csv", "w") as f:
f.write('A,B,C,D\n'+data)

data = ("0,2.3,2015-01-03,47736\n"
"1,2.3,1966-11-13,47736\n"
"2,2.3,1998-05-21,47736\n"
"4,2.3,2018-07-11,47736\n")

with open("csv_data_date1.csv", "w") as f:
f.write(data)

# generated data for parallel merge_asof testing
df1 = pd.DataFrame({'time': pd.DatetimeIndex(
['2017-01-03', '2017-01-06', '2017-02-15', '2017-02-21']),
'B': [4, 5, 9, 6]})
df2 = pd.DataFrame({'time': pd.DatetimeIndex(
['2017-01-01', '2017-01-14', '2017-01-16', '2017-02-23', '2017-02-23',
'2017-02-25']), 'A': [2,3,7,8,9,10]})
df1.to_parquet("asof1.pq")
df2.to_parquet("asof2.pq")

def generate_other_data():
N = 101
D = 10
gen_lr("lr.hdf5", N, D)

arr = np.arange(N)
f = h5py.File("test_group_read.hdf5", "w")
g1 = f.create_group("G")
dset1 = g1.create_dataset("data", (N,), dtype='i8')
dset1[:] = arr
f.close()

    df = pd.DataFrame({'A': ['bc'] + ['a'] * 3 + ['bc'] * 3 + ['a'], 'B': [-8, 1, 2, 3, 1, 5, 6, 7]})
df.to_parquet("groupby3.pq")

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
"bar", "bar", "bar", "bar"],
"B": ["one", "one", "one", "two", "two",
"one", "one", "two", "two"],
"C": ["small", "large", "large", "small",
"small", "large", "small", "small",
"large"],
"D": [1, 2, 2, 6, 3, 4, 5, 6, 9]})
df.to_parquet("pivot2.pq")

# CSV reader test
data = ("0,2.3,4.6,47736\n"
"1,2.3,4.6,47736\n"
"2,2.3,4.6,47736\n"
"4,2.3,4.6,47736\n")

with open("csv_data1.csv", "w") as f:
f.write(data)

with open("csv_data_infer1.csv", "w") as f:
        f.write('A,B,C,D\n' + data)

data = ("0,2.3,2015-01-03,47736\n"
"1,2.3,1966-11-13,47736\n"
"2,2.3,1998-05-21,47736\n"
"4,2.3,2018-07-11,47736\n")

with open("csv_data_date1.csv", "w") as f:
f.write(data)

# generated data for parallel merge_asof testing
df1 = pd.DataFrame({'time': pd.DatetimeIndex(
['2017-01-03', '2017-01-06', '2017-02-15', '2017-02-21']),
'B': [4, 5, 9, 6]})
df2 = pd.DataFrame({'time': pd.DatetimeIndex(
['2017-01-01', '2017-01-14', '2017-01-16', '2017-02-23', '2017-02-23',
'2017-02-25']), 'A': [2,3,7,8,9,10]})
df1.to_parquet("asof1.pq")
df2.to_parquet("asof2.pq")


if __name__ == "__main__":
print('generation phase')
ParquetGenerator.gen_kde_pq()
ParquetGenerator.gen_pq_test()
generate_spark_data()
generate_other_data()
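
Running the module directly regenerates every fixture in one pass; an invocation like "python -m hpat.tests.gen_test_data" from the repository root should work, assuming the package is importable and PySpark is installed for the generate_spark_data step.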
24 changes: 14 additions & 10 deletions hpat/tests/test_dataframe.py
@@ -9,6 +9,8 @@
from hpat.tests.test_utils import (count_array_REPs, count_parfor_REPs,
count_parfor_OneDs, count_array_OneDs, dist_IR_contains, get_start_end)

from .gen_test_data import ParquetGenerator


@hpat.jit
def inner_get_column(df):
@@ -579,10 +581,10 @@ def test_impl(df):
hpat_func = hpat.jit(test_impl)
self.assertTrue((hpat_func(df) == sorted_df.B.values).all())

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
def test_sort_parallel_single_col(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

# TODO: better parallel sort test
def test_impl():
df = pd.read_parquet('kde.parquet')
@@ -596,14 +598,15 @@ def test_impl():
try:
hpat.hiframes.sort.MIN_SAMPLES = 10
res = hpat_func()
self.assertTrue((np.diff(res)>=0).all())
self.assertTrue((np.diff(res) >= 0).all())
finally:
hpat.hiframes.sort.MIN_SAMPLES = save_min_samples # restore global val
# restore global val
hpat.hiframes.sort.MIN_SAMPLES = save_min_samples

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
def test_sort_parallel(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

# TODO: better parallel sort test
def test_impl():
df = pd.read_parquet('kde.parquet')
@@ -618,9 +621,10 @@ def test_impl():
try:
hpat.hiframes.sort.MIN_SAMPLES = 10
res = hpat_func()
self.assertTrue((np.diff(res)>=0).all())
self.assertTrue((np.diff(res) >= 0).all())
finally:
hpat.hiframes.sort.MIN_SAMPLES = save_min_samples # restore global val
# restore global val
hpat.hiframes.sort.MIN_SAMPLES = save_min_samples

def test_itertuples(self):
def test_impl(df):
51 changes: 36 additions & 15 deletions hpat/tests/test_series.py
@@ -10,6 +10,8 @@
from hpat.tests.test_utils import (count_array_REPs, count_parfor_REPs,
count_parfor_OneDs, count_array_OneDs, dist_IR_contains, get_start_end)

from .gen_test_data import ParquetGenerator

_cov_corr_series = [(pd.Series(x), pd.Series(y)) for x, y in [
(
[np.nan, -2., 3., 9.1],
@@ -1156,10 +1158,18 @@ def test_impl(S):
S = pd.Series([1.0, np.nan, 3.0, 2.0, np.nan, 4.0])
np.testing.assert_array_equal(hpat_func(S).values, test_impl(S).values)

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
@unittest.skip('AssertionError - fix needed\n'
'Arrays are not equal\n'
'Mismatch: 100%\n'
'Max absolute difference: 0.04361003\n'
'Max relative difference: 9.04840049\n'
'x: array([0.04843 , 0.05106 , 0.057625, 0.0671 ])\n'
'y: array([0.00482 , 0.04843 , 0.05106 , 0.057625])\n'
'NUMA_PES=3 build')
def test_series_nlargest_parallel1(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

def test_impl():
df = pq.read_table('kde.parquet').to_pandas()
S = df.points
@@ -1196,10 +1206,18 @@ def test_impl(S):
S = pd.Series([1.0, np.nan, 3.0, 2.0, np.nan, 4.0])
np.testing.assert_array_equal(hpat_func(S).values, test_impl(S).values)

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
@unittest.skip('AssertionError - fix needed\n'
'Arrays are not equal\n'
'Mismatch: 50%\n'
'Max absolute difference: 0.01813261\n'
'Max relative difference: 0.50757593\n'
'x: array([0.007431, 0.024095, 0.035724, 0.053857])\n'
'y: array([0.007431, 0.024095, 0.031374, 0.035724])\n'
'NUMA_PES=3 build')
def test_series_nsmallest_parallel1(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

def test_impl():
df = pq.read_table('kde.parquet').to_pandas()
S = df.points
@@ -1282,10 +1300,13 @@ def test_impl(S):
S = pd.Series(np.random.ranf(m))
self.assertEqual(hpat_func(S), test_impl(S))

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
@unittest.skip('AssertionError - fix needed\n'
'nan != 0.45894510159707225\n'
'NUMA_PES=3 build')
def test_series_median_parallel1(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

def test_impl():
df = pq.read_table('kde.parquet').to_pandas()
S = df.points
@@ -1294,10 +1315,10 @@ def test_impl():
hpat_func = hpat.jit(test_impl)
self.assertEqual(hpat_func(), test_impl())

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
def test_series_argsort_parallel(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

def test_impl():
df = pq.read_table('kde.parquet').to_pandas()
S = df.points
@@ -1350,10 +1371,10 @@ def test_impl(A, B):
hpat_func = hpat.jit(test_impl)
pd.testing.assert_series_equal(hpat_func(A, B), test_impl(A, B))

@unittest.skip('OSError - fix needed\n'
'Failed in hpat mode pipeline (step: convert DataFrames)\n'
'Passed non-file path: kde.parquet\n')
def test_series_sort_values_parallel1(self):
# create `kde.parquet` file
ParquetGenerator.gen_kde_pq()

def test_impl():
df = pq.read_table('kde.parquet').to_pandas()
S = df.points
