Skip to content

Commit

Permalink
Merge pull request #174 from jbouffard/fastavro
Browse files Browse the repository at this point in the history
FastAvro
  • Loading branch information
Jacob Bouffard committed May 5, 2017
2 parents 5354b95 + d6a43a9 commit d5fff9f
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 47 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
Expand Down
2 changes: 1 addition & 1 deletion docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ WHL := archives/geopyspark-0.1.0-py3-none-any.whl
JAR := archives/geotrellis-backend-assembly-0.1.0.jar
GDAL-BLOB := archives/gdal-and-friends.tar.gz
PYTHON-BLOB := archives/geopyspark-and-friends.tar.gz
VERSION := 1
VERSION := 2
STAGE0 := jamesmcclain/jupyter-geopyspark:stage0
STAGE1 := quay.io/geodocker/jupyter-geopyspark:$(VERSION)

Expand Down
8 changes: 4 additions & 4 deletions docker/config/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
affine==2.0.0.post1
alembic==0.8.9
appdirs==1.4.3
avro-python3==1.8.1
backports-abc==0.5
bleach==1.5.0
click==6.7
Expand All @@ -10,14 +9,18 @@ cligj==0.4.0
cycler==0.10.0
decorator==4.0.10
entrypoints==0.2.2
fastavro==0.13.0
Fiona==1.7.1
Flask==0.12.1
Flask-Cors==3.0.2
GDAL==2.1.3
geopyspark==0.1.0
html5lib==0.9999999
ipykernel==4.7.0.dev0
ipython==5.1.0
ipython-genutils==0.1.0
ipywidgets==6.0.0
itsdangerous==0.24
Jinja2==2.9.4
jsonschema==2.5.1
jupyter-client==4.4.0
Expand Down Expand Up @@ -67,6 +70,3 @@ traitlets==4.3.1
virtualenv==13.1.2
wcwidth==0.1.7
widgetsnbextension==2.0.0
itsdangerous==0.24
Flask==0.12.1
Flask-Cors==3.0.2
2 changes: 1 addition & 1 deletion docker/scripts/gather-libs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ tar axvf /archives/gdal-and-friends.tar.gz
cd $HOME/
chown -R root:root $HOME/.cache/pip
pip3 install --user appdirs==1.4.3
pip3 install --user avro-python3==1.8.1
pip3 install --user fastavro==0.13.0
pip3 install --user numpy==1.12.1
pip3 install --user pyparsing==2.2.0
pip3 install --user six==1.10.0
Expand Down
13 changes: 13 additions & 0 deletions geopyspark/avroregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def tuple_decoder(schema_dict, key_decoder=None, value_decoder=None):
def _get_decoder(cls, name):
    """Look up the decoding function registered for the value type *name*.

    'Tile' maps to the tile decoder; 'Projected' deliberately gets no
    decoding step (None is returned); any other name is an error.
    """
    if name == "Tile":
        return cls.tile_decoder
    if name == "Projected":
        # No decoder registered for projected extents — caller receives None,
        # exactly as the original's empty branch implied.
        return None
    raise Exception("Could not find value type that matches", name)

Expand Down Expand Up @@ -154,6 +156,15 @@ def create_dict(index):

return {'bands': tile_datums}

@classmethod
def projected_extent_encoder(cls, obj):
    """Fill in the unused CRS field of a projected-extent dict.

    Exactly one of 'epsg'/'proj4' is expected to carry a real value; the
    other is set to the sentinel string 'null' before serialization.
    NOTE(review): the sentinel is the *string* 'null', not an Avro null —
    confirm the consumer expects that.
    """
    unused_field = 'proj4' if obj.get('epsg') else 'epsg'
    obj[unused_field] = 'null'
    return obj

@staticmethod
def tuple_encoder(obj, key_encoder=None, value_encoder=None):
"""Encodes a tuple to send to Scala.
Expand Down Expand Up @@ -212,5 +223,7 @@ def create_partial_tuple_encoder(cls, key_type=None, value_type=None):
def _get_encoder(cls, name):
    """Look up the encoding function registered for the value type *name*."""
    encoders = {
        "Tile": cls.tile_encoder,
        "Projected": cls.projected_extent_encoder,
    }
    if name in encoders:
        return encoders[name]
    raise Exception("Could not find value type that matches", name)
35 changes: 4 additions & 31 deletions geopyspark/avroserializer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""The class which serializes/deserializes values in an RDD to/from Python."""
import io
import avro
import avro.io
from fastavro import schemaless_writer, schemaless_reader

from pyspark.serializers import Serializer, FramedSerializer

Expand Down Expand Up @@ -39,43 +38,21 @@ def __init__(self,
else:
self.encoding_method = None

@property
def schema(self):
    """The parsed Avro schema object.

    Re-parses ``schema_string`` on every access, so repeated use pays
    the parse cost each time.
    """
    return avro.schema.Parse(self.schema_string)

@property
def schema_name(self):
    """The name of the parsed schema.

    Bug fix: the previous code was ``self.schema().name`` — but ``schema``
    is a property, so that expression *called the parsed schema object*,
    raising TypeError on every access. Property access takes no call.
    """
    return self.schema.name

@property
def schema_dict(self):
    """The schema decoded from its JSON string form into a plain dict."""
    # Imported lazily, mirroring the original's deferred import.
    import json

    raw_schema = self.schema_string
    return json.loads(raw_schema)

@property
def reader(self):
    """An avro ``DatumReader`` built from the parsed schema.

    A fresh reader is constructed on each access (which also re-parses
    the schema via the ``schema`` property).
    """
    return avro.io.DatumReader(self.schema)

@property
def datum_writer(self):
    """An avro ``DatumWriter`` built from the parsed schema.

    A fresh writer is constructed on each access (which also re-parses
    the schema via the ``schema`` property).
    """
    return avro.io.DatumWriter(self.schema)

def _dumps(self, obj):
    """Serialize *obj* into Avro bytes with fastavro's schemaless writer.

    If an ``encoding_method`` was configured, it is applied first to
    massage *obj* into the schema's shape.

    Bug fix: the else branch previously passed the undefined name
    ``datum`` to ``schemaless_writer``, raising NameError whenever no
    encoding_method was set; it must pass ``obj``. The leftover
    avro-python3 ``BinaryEncoder`` / ``datum_writer`` calls (dead since
    the fastavro switch removed those properties) are dropped.
    """
    bytes_writer = io.BytesIO()

    if self.encoding_method:
        datum = self.encoding_method(obj)
    else:
        datum = obj
    schemaless_writer(bytes_writer, self.schema_dict, datum)

    return bytes_writer.getvalue()

Expand Down Expand Up @@ -109,13 +86,9 @@ def loads(self, obj):
"""

buf = io.BytesIO(obj)

decoder = avro.io.BinaryDecoder(buf)
schema_dict = self.reader.read(decoder)
schema_dict = schemaless_reader(buf, self.schema_dict)

if self.decoding_method:
return [self.decoding_method(schema_dict)]
else:
return [schema_dict]


2 changes: 0 additions & 2 deletions geopyspark/geopyspark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def add_pyspark_path():
raise ValueError("Could not find the py4j zip in", path.join(pyspark_home, 'python', 'lib'))



def setup_environment():
"""Sets up various environment variables that are needed to run GeoPySpark.
Expand All @@ -52,7 +51,6 @@ def setup_environment():
path.abspath(path.join(os.getcwd(), 'jars/')),
path.abspath(path.join(os.getcwd(), '../geopyspark/jars/'))
]

possible_jars = [path.join(prefix, '*.jar') for prefix in local_prefixes]
jar = path.abspath(resource_filename('geopyspark.jars', JAR_FILE))
jar_dir = os.path.dirname(jar)
Expand Down
8 changes: 4 additions & 4 deletions geopyspark/geotrellis/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def from_numpy_rdd(cls, geopysc, rdd_type, numpy_rdd):
key = geopysc.map_key_input(rdd_type, False)

schema = geopysc.create_schema(key)
ser = geopysc.create_tuple_serializer(schema, key_type=None, value_type=TILE)
ser = geopysc.create_tuple_serializer(schema, key_type="Projected", value_type=TILE)
reserialized_rdd = numpy_rdd._reserialize(ser)

if rdd_type == SPATIAL:
Expand All @@ -122,7 +122,7 @@ def to_numpy_rdd(self):
"""

result = self.srdd.toAvroRDD()
ser = self.geopysc.create_tuple_serializer(result._2(), value_type=TILE)
ser = self.geopysc.create_tuple_serializer(result._2(), key_type="Projected", value_type=TILE)
return self.geopysc.create_python_rdd(result._1(), ser)

def to_tiled_layer(self, extent=None, layout=None, crs=None, tile_size=256,
Expand Down Expand Up @@ -345,7 +345,7 @@ def from_numpy_rdd(cls, geopysc, rdd_type, numpy_rdd, metadata):
key = geopysc.map_key_input(rdd_type, True)

schema = geopysc.create_schema(key)
ser = geopysc.create_tuple_serializer(schema, key_type=None, value_type=TILE)
ser = geopysc.create_tuple_serializer(schema, key_type="Projected", value_type=TILE)
reserialized_rdd = numpy_rdd._reserialize(ser)

if rdd_type == SPATIAL:
Expand Down Expand Up @@ -424,7 +424,7 @@ def to_numpy_rdd(self):
RDD
"""
result = self.srdd.toAvroRDD()
ser = self.geopysc.create_tuple_serializer(result._2(), value_type=TILE)
ser = self.geopysc.create_tuple_serializer(result._2(), key_type="Projected", value_type=TILE)
return self.geopysc.create_python_rdd(result._1(), ser)

def reproject(self, target_crs, extent=None, layout=None, scheme=FLOAT, tile_size=256,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pytest>=3.0.6
numpy>=1.8
avro-python3>=1.8
fastavro>=0.13.0
shapely>=1.6b3
rasterio>=1.0a7
setuptools
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
long_description=open('README.rst').read(),
license='LICENSE',
install_requires=[
'avro-python3>=1.8',
'fastavro>=0.13.0',
'numpy>=1.8',
'shapely>=1.6b3'
],
Expand All @@ -23,7 +23,7 @@
'geopyspark.geotrellis',
'geopyspark.tests',
'geopyspark.tests.schema_tests',
'geopyspark.jars'
'geopyspark.jars',
],
include_package_data=True,
scripts=[],
Expand Down

0 comments on commit d5fff9f

Please sign in to comment.