Skip to content

Commit

Permalink
#299 mock pythesint in all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
akorosov committed Mar 20, 2018
1 parent 1297c1d commit 0427c91
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 313 deletions.
54 changes: 54 additions & 0 deletions nansat/tests/nansat_test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# ------------------------------------------------------------------------------
# Name: nansat_test_base.py
# Purpose: Basic class for Nansat tests
#
# Author: Anton Korosov
#
# Created: 20.03.2018
# Copyright: (c) NERSC
# Licence: This file is part of NANSAT. You can redistribute it or modify
# under the terms of GNU General Public License, v.3
# http://www.gnu.org/licenses/gpl-3.0.html
# ------------------------------------------------------------------------------
from __future__ import print_function, absolute_import, division
import os
import sys
import unittest
import tempfile
import pythesint

if sys.version_info.major == 2:
from mock import patch, PropertyMock, Mock, MagicMock, DEFAULT
else:
from unittest.mock import patch, PropertyMock, Mock, MagicMock, DEFAULT

from nansat.tests import nansat_test_data as ntd

class NansatTestBase(unittest.TestCase):
def setUp(self):
self.test_file_gcps = os.path.join(ntd.test_data_path, 'gcps.tif')
self.test_file_stere = os.path.join(ntd.test_data_path, 'stere.tif')
self.test_file_complex = os.path.join(ntd.test_data_path, 'complex.nc')
self.test_file_arctic = os.path.join(ntd.test_data_path, 'arctic.nc')
self.tmp_data_path = ntd.tmp_data_path
self.default_mapper = 'generic'
fd, self.tmpfilename = tempfile.mkstemp(suffix='.nc', dir=ntd.tmp_data_path)
os.close(fd)

if not os.path.exists(self.test_file_gcps):
raise ValueError('No test data available')
# Mock several Pythesint functions to avoid network connection
self.patcher = patch.multiple(pythesint, get_wkv_variable=DEFAULT,
get_gcmd_instrument=DEFAULT,
get_gcmd_platform=DEFAULT)
self.mock_pti = self.patcher.start()
self.mock_pti['get_gcmd_instrument'].return_value=dict(short_name='MODIS')
self.mock_pti['get_gcmd_platform'].return_value=dict(short_name='AQUA')
self.mock_pti['get_wkv_variable'].return_value=dict(short_name='swathmask')

def tearDown(self):
self.patcher.stop()
try:
os.unlink(self.tmpfilename)
except OSError:
pass
118 changes: 51 additions & 67 deletions nansat/tests/test_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
import os
import sys
import json
import time
import logging
import unittest
import warnings
import datetime
import tempfile
from xml.sax.saxutils import unescape
if sys.version_info.major == 2:
from mock import patch, PropertyMock, Mock, MagicMock, DEFAULT
Expand All @@ -43,54 +40,41 @@
from netCDF4 import Dataset

from nansat import Nansat, Domain, NSR

from nansat.tests import nansat_test_data as ntd

from nansat.warnings import NansatFutureWarning
from nansat.tests.nansat_test_base import NansatTestBase

warnings.simplefilter("always", NansatFutureWarning)
warnings.simplefilter("always", UserWarning)


class ExporterTest(unittest.TestCase):
def setUp(self):
self.test_file_gcps = os.path.join(ntd.test_data_path, 'gcps.tif')
self.test_file_stere = os.path.join(ntd.test_data_path, 'stere.tif')
self.test_file_complex = os.path.join(ntd.test_data_path, 'complex.nc')
self.test_file_arctic = os.path.join(ntd.test_data_path, 'arctic.nc')
fd, self.tmpfilename = tempfile.mkstemp(suffix='.nc', dir=ntd.tmp_data_path)
os.close(fd)

if not os.path.exists(self.test_file_gcps):
raise ValueError('No test data available')

class ExporterTest(NansatTestBase):
def test_geolocation_of_exportedNC_vs_original(self):
''' Lon/lat in original and exported file should coincide '''
orig = Nansat(self.test_file_gcps)
orig = Nansat(self.test_file_gcps, mapper=self.default_mapper)
orig.export(self.tmpfilename)

copy = Nansat(self.tmpfilename)
copy = Nansat(self.tmpfilename, mapper=self.default_mapper)
lon0, lat0 = orig.get_geolocation_grids()
lon1, lat1 = copy.get_geolocation_grids()
np.testing.assert_allclose(lon0, lon1)
np.testing.assert_allclose(lat0, lat1)

def test_special_characters_in_exported_metadata(self):
orig = Nansat(self.test_file_gcps)
orig = Nansat(self.test_file_gcps, mapper=self.default_mapper)
orig.vrt.dataset.SetMetadataItem('jsonstring', json.dumps({'meta1':
'hei', 'meta2': 'derr'}))
orig.export(self.tmpfilename)
copy = Nansat(self.tmpfilename)
copy = Nansat(self.tmpfilename, mapper=self.default_mapper)
dd = json.loads(unescape(copy.get_metadata('jsonstring'), {'"':
'"'}))
self.assertIsInstance(dd, dict)

def test_time_coverage_metadata_of_exported_equals_original(self):
orig = Nansat(self.test_file_gcps)
orig = Nansat(self.test_file_gcps, mapper=self.default_mapper)
orig.set_metadata('time_coverage_start', '2010-01-02T08:49:02.347809')
orig.set_metadata('time_coverage_end', '2010-01-02T08:50:03.599373')
orig.export(self.tmpfilename)
copy = Nansat(self.tmpfilename)
copy = Nansat(self.tmpfilename, mapper=self.default_mapper)

self.assertEqual(orig.get_metadata('time_coverage_start'),
copy.get_metadata('time_coverage_start'))
Expand All @@ -101,15 +85,15 @@ def test_export_netcdf(self):
''' Test export and following import of data with bands containing
np.nan values
'''
n = Nansat(self.test_file_gcps)
n = Nansat(self.test_file_gcps, mapper=self.default_mapper)
arrNoNaN = np.random.randn(n.shape()[0], n.shape()[1])
n.add_band(arrNoNaN, {'name': 'testBandNoNaN'})
arrWithNaN = arrNoNaN.copy()
arrWithNaN[int(n.shape()[0] / 2.) - 10:int(n.shape()[0] / 2 + 10),
int(n.shape()[1] / 2.) - 10:int(n.shape()[1] / 2 + 10)] = np.nan
n.add_band(arrWithNaN, {'name': 'testBandWithNaN'})
n.export(self.tmpfilename)
exported = Nansat(self.tmpfilename)
exported = Nansat(self.tmpfilename, mapper=self.default_mapper)
earrNoNaN = exported['testBandNoNaN']
# Use allclose to allow some roundoff errors
self.assertTrue(np.allclose(arrNoNaN, earrNoNaN))
Expand All @@ -118,17 +102,17 @@ def test_export_netcdf(self):

def test_export_gcps_filename_warning(self):
''' Should export file with GCPs and write correct bands'''
n0 = Nansat(self.test_file_gcps, log_level=40, mapper='generic')
tmpfilename = os.path.join(ntd.tmp_data_path, 'temp.nc')
n0 = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
tmpfilename = os.path.join(self.tmp_data_path, 'temp.nc')
with warnings.catch_warnings(record=True) as w:
n0.export(fileName=tmpfilename)
self.assertEqual(len(w), 1)
self.assertIn('Nansat.export(fileName', str(w[0].message))

def test_export_gcps_to_netcdf(self):
''' Should export file with GCPs and write correct bands'''
n0 = Nansat(self.test_file_gcps, log_level=40, mapper='generic')
tmpfilename = os.path.join(ntd.tmp_data_path, 'nansat_export_gcps.nc')
n0 = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
tmpfilename = os.path.join(self.tmp_data_path, 'nansat_export_gcps.nc')
n0.export(tmpfilename)

ncf = Dataset(tmpfilename)
Expand All @@ -138,7 +122,7 @@ def test_export_gcps_to_netcdf(self):
self.assertTrue('GCPPixel' in ncf.variables)
self.assertTrue('GCPLine' in ncf.variables)

n1 = Nansat(tmpfilename, mapper='generic')
n1 = Nansat(tmpfilename, mapper=self.default_mapper)
b0 = n0['L_469']
b1 = n1['L_469']
np.testing.assert_allclose(b0, b1)
Expand All @@ -150,13 +134,13 @@ def test_export_gcps_to_netcdf(self):

def test_export_gcps_complex_to_netcdf(self):
''' Should export file with GCPs and write correct complex bands'''
n0 = Nansat(self.test_file_gcps, log_level=40)
n0 = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
b0 = n0['L_469']

n1 = Nansat.from_domain(n0)
n1.add_band(b0.astype('complex64'), parameters={'name': 'L_469'})

tmpfilename = os.path.join(ntd.tmp_data_path, 'nansat_export_gcps_complex.nc')
tmpfilename = os.path.join(self.tmp_data_path, 'nansat_export_gcps_complex.nc')
n1.export(tmpfilename)

ncf = Dataset(tmpfilename)
Expand All @@ -166,7 +150,7 @@ def test_export_gcps_complex_to_netcdf(self):
self.assertTrue('GCPPixel' in ncf.variables)
self.assertTrue('GCPLine' in ncf.variables)

n2 = Nansat(tmpfilename)
n2 = Nansat(tmpfilename, mapper=self.default_mapper)
b2 = n2['L_469']

lon0, lat0 = n0.get_geolocation_grids()
Expand All @@ -175,53 +159,53 @@ def test_export_gcps_complex_to_netcdf(self):
np.testing.assert_allclose(lat0, lat2)

def test_export_gtiff(self):
n = Nansat(self.test_file_gcps, log_level=40)
tmpfilename = os.path.join(ntd.tmp_data_path, 'nansat_export.tif')
n = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
tmpfilename = os.path.join(self.tmp_data_path, 'nansat_export.tif')
n.export(tmpfilename, driver='GTiff')

self.assertTrue(os.path.exists(tmpfilename))

def test_export_band(self):
n = Nansat(self.test_file_gcps, log_level=40)
tmpfilename = os.path.join(ntd.tmp_data_path,
n = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_export_band.tif')
n.export(tmpfilename, bands=[1], driver='GTiff')
n = Nansat(tmpfilename, mapper='generic')
n = Nansat(tmpfilename, mapper=self.default_mapper)

self.assertTrue(os.path.exists(tmpfilename))
self.assertEqual(n.vrt.dataset.RasterCount, 1)

def test_export_band_by_name(self):
n = Nansat(self.test_file_gcps, log_level=40)
tmpfilename = os.path.join(ntd.tmp_data_path,
n = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_export_band.tif')
n.export(tmpfilename, bands=['L_645'], driver='GTiff')
n = Nansat(tmpfilename, mapper='generic')
n = Nansat(tmpfilename, mapper=self.default_mapper)

self.assertTrue(os.path.exists(tmpfilename))
self.assertEqual(n.vrt.dataset.RasterCount, 1)

def test_reproject_and_export_band(self):
n1 = Nansat(self.test_file_gcps, log_level=40)
n2 = Nansat(self.test_file_stere, log_level=40)
n1 = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
n2 = Nansat(self.test_file_stere, log_level=40, mapper=self.default_mapper)
n1.reproject(n2)
tmpfilename = os.path.join(ntd.tmp_data_path,
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_reproject_export_band.nc')
n1.export(tmpfilename, bands=[1])

n = Nansat(tmpfilename, mapper='generic')
n = Nansat(tmpfilename, mapper=self.default_mapper)
self.assertTrue(os.path.exists(tmpfilename))
self.assertEqual(n.vrt.dataset.RasterCount, 1)

def test_export_selected_bands(self):
n = Nansat(self.test_file_gcps)
n = Nansat(self.test_file_gcps, mapper=self.default_mapper)
resfile = 'tmp.nc'
new_band = np.random.randn(n.shape()[0], n.shape()[1])
n.add_band(new_band, {'name': 'newBand'})
# Test with band numbers
n.export(resfile, bands=[4, 2])
self.assertTrue(os.path.exists(resfile))
nn = Nansat(resfile)
nn = Nansat(resfile, mapper=self.default_mapper)
self.assertTrue(nn.has_band('newBand'))
self.assertTrue(nn.has_band('L_555'))
os.unlink(resfile)
Expand All @@ -233,14 +217,14 @@ def test_export_selected_bands(self):
# os.unlink(resfile)

def test_export_option(self):
n = Nansat(self.test_file_arctic)
tmpfilename = os.path.join(ntd.tmp_data_path,
n = Nansat(self.test_file_arctic, mapper=self.default_mapper)
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_export_option.nc')
# Test with band numbers
n.export(tmpfilename, options='WRITE_LONLAT=YES')
n.export(tmpfilename + '2', options=['WRITE_LONLAT=NO'])
nn = Nansat(tmpfilename, mapper='generic')
nn2 = Nansat(tmpfilename + '2', mapper='generic')
nn = Nansat(tmpfilename, mapper=self.default_mapper)
nn2 = Nansat(tmpfilename + '2', mapper=self.default_mapper)
self.assertTrue(nn.has_band('lon'))
self.assertTrue(nn.has_band('lat'))
self.assertTrue(nn.has_band('Bristol'))
Expand All @@ -249,8 +233,8 @@ def test_export_option(self):
self.assertTrue(nn2.has_band('Bristol'))

def test_export2thredds_arctic_long_lat(self):
n = Nansat(self.test_file_arctic, mapper='generic', log_level=40)
tmpfilename = os.path.join(ntd.tmp_data_path,
n = Nansat(self.test_file_arctic, mapper=self.default_mapper, log_level=40)
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_export2thredds_arctic.nc')
bands = {
'Bristol': {'type': '>i2'},
Expand Down Expand Up @@ -286,16 +270,16 @@ def test_export2thredds_arctic_long_lat(self):
'%s is wrong: %f'%(test_metadata_key, medata_value))

def test_dont_export2thredds_gcps(self):
n = Nansat(self.test_file_gcps, log_level=40)
n = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
n2 = Nansat.from_domain(n)
n.add_band(np.ones(n2.shape(), np.float32))
tmpfilename = os.path.join(ntd.tmp_data_path,
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_export2thredds.nc')
self.assertRaises(ValueError, n2.export2thredds, tmpfilename,
['L_645'])

def test_export2thredds_longlat_list(self):
n = Nansat(self.test_file_gcps, log_level=40)
n = Nansat(self.test_file_gcps, log_level=40, mapper=self.default_mapper)
with self.assertRaises(ValueError):
n.export2thredds('aa', ['L_469'])

Expand All @@ -307,7 +291,7 @@ def test_export2thredds_longlat_dict(self):
parameters={'name': 'L_469'})
n.set_metadata('time_coverage_start', '2016-01-19')

tmpfilename = os.path.join(ntd.tmp_data_path,
tmpfilename = os.path.join(self.tmp_data_path,
'nansat_export2thredds_longlat.nc')
n.export2thredds(tmpfilename, {'L_469': {'type': '>i1'}})
ncI = Dataset(tmpfilename, 'r')
Expand All @@ -317,49 +301,49 @@ def test_export2thredds_longlat_dict(self):


def test_export_netcdf_complex_remove_meta(self):
n = Nansat(self.test_file_complex)
n = Nansat(self.test_file_complex, mapper=self.default_mapper)
self.assertEqual(n.get_metadata('PRODUCT_TYPE'), 'SLC')
with warnings.catch_warnings(record=True) as recorded_warnings:
n.export(self.tmpfilename, rmMetadata=['PRODUCT_TYPE'])
self.assertEqual(recorded_warnings[0].category, NansatFutureWarning)
exported = Nansat(self.tmpfilename)
exported = Nansat(self.tmpfilename, mapper=self.default_mapper)
with self.assertRaises(ValueError):
exported.get_metadata('PRODUCT_TYPE')
self.assertTrue((n[1] == exported[1]).any())

def test_export_netcdf_arctic(self):
n = Nansat(self.test_file_arctic)
n = Nansat(self.test_file_arctic, mapper=self.default_mapper)
n.export(self.tmpfilename)
exported = Nansat(self.tmpfilename)
exported = Nansat(self.tmpfilename, mapper=self.default_mapper)
self.assertTrue((n[1] == exported[1]).any())
self.assertTrue((n[2] == exported[2]).any())
self.assertTrue((n[3] == exported[3]).any())

def test_export_netcdf_arctic_hardcopy(self):
n = Nansat(self.test_file_arctic)
n = Nansat(self.test_file_arctic, mapper=self.default_mapper)
n.export(self.tmpfilename, hardcopy=True)
exported = Nansat(self.tmpfilename)
exported = Nansat(self.tmpfilename, mapper=self.default_mapper)
self.assertTrue((n[1] == exported[1]).any())
self.assertTrue((n[2] == exported[2]).any())
self.assertTrue((n[3] == exported[3]).any())

@patch('nansat.exporter.VRT._add_geolocation')
def test_export_add_geoloc(self, mock_add_geolocation):
n = Nansat(self.test_file_arctic)
n = Nansat(self.test_file_arctic, mapper=self.default_mapper)
with warnings.catch_warnings(record=True) as recorded_warnings:
n.export(self.tmpfilename, addGeoloc=True)
self.assertEqual(recorded_warnings[0].category, NansatFutureWarning)
self.assertTrue(mock_add_geolocation.called)

def test_export_add_gcps(self):
n = Nansat(self.test_file_arctic)
n = Nansat(self.test_file_arctic, mapper=self.default_mapper)
with warnings.catch_warnings(record=True) as recorded_warnings:
n.export(self.tmpfilename, addGCPs=True, bottomup=True)
self.assertEqual(recorded_warnings[0].category, NansatFutureWarning)
self.assertEqual(recorded_warnings[1].category, NansatFutureWarning)

def test_export2thredds_rmmetadata(self):
n = Nansat(self.test_file_arctic, mapper='generic', log_level=40)
n = Nansat(self.test_file_arctic, mapper=self.default_mapper, log_level=40)
with warnings.catch_warnings(record=True) as recorded_warnings:
n.export2thredds(self.tmpfilename, {'Bristol': {'type': '>i2'}},
time=datetime.datetime(2016, 1, 20),
Expand Down

0 comments on commit 0427c91

Please sign in to comment.