Skip to content
This repository was archived by the owner on Nov 21, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions moztelemetry/heka_message_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@
import ujson as json
import ssl

from telemetry.util.heka_message import unpack, BacktrackableFile
from telemetry.util.heka_message import unpack


def parse_heka_message(message, boundary_bytes=None):
def parse_heka_message(message):
try:
message = BacktrackableFile(message)

for record, total_bytes in unpack(message, backtrack=True):
for record, total_bytes in unpack(message):
yield _parse_heka_record(record)

if boundary_bytes and (total_bytes >= boundary_bytes):
message.close()
break

except ssl.SSLError:
pass # https://github.com/boto/boto/issues/2830

Expand Down
38 changes: 2 additions & 36 deletions moztelemetry/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,8 @@ def get_records(sc, source_name, **kwargs):
else:
sample = files

# TODO: Make sure that "bucket_name" matches the v4 bucket name, otherwise
# introduce a "bucket" parameter to _read_v4_ranges
parallelism = max(len(sample), sc.defaultParallelism)
ranges = sc.parallelize(sample, parallelism).flatMap(_read_v4_ranges).collect()

if len(ranges) == 0:
return sc.parallelize([])
else:
return sc.parallelize(ranges, len(ranges)).flatMap(_read_v4_range)
return sc.parallelize(sample, parallelism).flatMap(_read_v4)


def _get_data_sources():
Expand Down Expand Up @@ -362,12 +355,7 @@ def _get_pings_v4(sc, **kwargs):
else:
parallelism = len(sample)

ranges = sc.parallelize(sample, parallelism).flatMap(_read_v4_ranges).collect()

if len(ranges) == 0:
return sc.parallelize([])
else:
return sc.parallelize(ranges, parallelism).flatMap(_read_v4_range)
return sc.parallelize(sample, parallelism).flatMap(_read_v4)


def _get_filenames_v2(**kwargs):
Expand Down Expand Up @@ -431,28 +419,6 @@ def _read_v4(filename):
return []


def _read_v4_ranges(filename):
try:
key = _bucket_v4.get_key(filename)
if key is None:
return []
n_chunks = (key.size / _chunk_size) + 1
return zip([filename]*n_chunks, range(n_chunks))
except ssl.SSLError:
return []


def _read_v4_range(filename_chunk):
try:
filename, chunk = filename_chunk
start = _chunk_size*chunk
key = _bucket_v4.get_key(filename)
key.open_read(headers={'Range': "bytes={}-".format(start)})
return parse_heka_message(key, boundary_bytes=_chunk_size)
except ssl.SSLError:
return []


def _get_ping_properties(ping, paths, only_median, with_processes,
histograms_url, additional_histograms):
result = {}
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def run(self):

setup(cmdclass={'install': FetchExternal},
name='python_moztelemetry',
version='0.3.9.7',
version='0.3.9.8',
author='Roberto Agostino Vitillo',
author_email='rvitillo@mozilla.com',
description='Spark bindings for Mozilla Telemetry',
Expand Down