Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pendulum/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
long = long
unicode = unicode
basestring = basestring
FileNotFoundError = IOError

else:

FileNotFoundError = FileNotFoundError
long = int
unicode = str
basestring = str
Expand Down
128 changes: 116 additions & 12 deletions pendulum/tz/loader.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
# -*- coding: utf-8 -*-

import inspect
import os
import pytz
import inspect

from .parser import Parser
from .._compat import decode
from datetime import datetime
from struct import unpack, calcsize

from .. import _compat
from .breakdown import local_time
from .transition import Transition
from .transition_type import TransitionType


def _byte_string(s):
"""Cast a string or byte string to an ASCII byte string."""
return s.encode('US-ASCII')

_NULL = _byte_string('\0')


def _std_string(s):
"""Cast a string or byte string to an ASCII string."""
return str(s.decode('US-ASCII'))


class Loader(object):
Expand All @@ -14,17 +31,104 @@ class Loader(object):

@classmethod
def load(cls, name):
name = decode(name)
name = _compat.decode(name)
try:
with pytz.open_resource(name) as f:
return cls._load(f)
except _compat.FileNotFoundError:
raise ValueError('Unknown timezone [{}]'.format(name))

name_parts = name.lstrip('/').split('/')
@classmethod
def _load(cls, fp):
head_fmt = '>4s c 15x 6l'
head_size = calcsize(head_fmt)
(magic, fmt, ttisgmtcnt, ttisstdcnt, leapcnt, timecnt,
typecnt, charcnt) = unpack(head_fmt, fp.read(head_size))

for part in name_parts:
if part == os.path.pardir or os.path.sep in part:
raise ValueError('Bad path segment: %r' % part)
# Make sure it is a tzfile(5) file
assert magic == _byte_string('TZif'), 'Got magic %s' % repr(magic)

filepath = os.path.join(cls.path, *name_parts)
# Read out the transition times,
# localtime indices and ttinfo structures.
data_fmt = '>%(timecnt)dl %(timecnt)dB %(ttinfo)s %(charcnt)ds' % dict(
timecnt=timecnt, ttinfo='lBB' * typecnt, charcnt=charcnt)
data_size = calcsize(data_fmt)
data = unpack(data_fmt, fp.read(data_size))

if not os.path.exists(filepath):
raise ValueError('Unknown timezone [{}]'.format(name))
# make sure we unpacked the right number of values
assert len(data) == 2 * timecnt + 3 * typecnt + 1
transition_times = tuple(trans for trans in data[:timecnt])
lindexes = tuple(data[timecnt:2 * timecnt])
ttinfo_raw = data[2 * timecnt:-1]
tznames_raw = data[-1]
del data

# Process ttinfo into separate structs
transition_types = tuple()
tznames = {}
i = 0
while i < len(ttinfo_raw):
# have we looked up this timezone name yet?
tzname_offset = ttinfo_raw[i + 2]
if tzname_offset not in tznames:
nul = tznames_raw.find(_NULL, tzname_offset)
if nul < 0:
nul = len(tznames_raw)
tznames[tzname_offset] = _std_string(
tznames_raw[tzname_offset:nul])
transition_types += (
TransitionType(
ttinfo_raw[i], bool(ttinfo_raw[i + 1]),
tznames[tzname_offset]
),
)
i += 3

# Now build the timezone object
if len(transition_times) == 0:
transitions = tuple()
else:
# calculate transition info
transitions = tuple()
for i in range(len(transition_times)):
transition_type = transition_types[lindexes[i]]

if i == 0:
pre_transition_type = transition_types[lindexes[i]]
else:
pre_transition_type = transition_types[lindexes[i - 1]]

pre_time = datetime(*local_time(transition_times[i],
pre_transition_type)[:7])
time = datetime(*local_time(transition_times[i],
transition_type)[:7])
tr = Transition(
transition_times[i],
transition_type,
pre_time,
time,
pre_transition_type
)

transitions += (tr,)

# Determine the before-first-transition type
default_transition_type_index = 0
if transitions:
index = 0
if transition_types[0].is_dst:
index = transition_types.index(transitions[0].transition_type)
while index != 0 and transition_types[index].is_dst:
index -= 1

while index != len(transitions) and transition_types[index].is_dst:
index += 1

if index != len(transitions):
default_transition_type_index = index

return Parser.parse(filepath)
return (
transitions,
transition_types,
transition_types[default_transition_type_index]
)
6 changes: 3 additions & 3 deletions pendulum/tz/local_timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from contextlib import contextmanager

from .timezone import Timezone
from .parser import Parser
from .loader import Loader


class LocalTimezone(object):
Expand Down Expand Up @@ -165,7 +165,7 @@ def get_tz_name_for_unix(cls):

if not os.path.exists(tzpath):
continue
return Timezone('', *Parser.parse(tzpath))
return Timezone('', *Loader.load(tzpath))

raise RuntimeError('Can not find any timezone configuration')

Expand All @@ -176,7 +176,7 @@ def _tz_from_env(tzenv):

# TZ specifies a file
if os.path.exists(tzenv):
return Timezone('', *Parser.parse(tzenv))
return Timezone('', *Loader.load(tzenv))

# TZ specifies a zoneinfo zone.
try:
Expand Down
123 changes: 0 additions & 123 deletions pendulum/tz/parser.py

This file was deleted.

6 changes: 6 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
from distutils.command.build_ext import build_ext


try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError # py2k


def get_version():
basedir = os.path.dirname(__file__)
with open(os.path.join(basedir, 'pendulum/version.py')) as f:
Expand Down
10 changes: 10 additions & 0 deletions tests/tz_tests/test_timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pendulum
from datetime import datetime
from pendulum import timezone
from pendulum.tz.loader import Loader

from .. import AbstractTestCase

Expand Down Expand Up @@ -235,3 +236,12 @@ def test_convert_accept_pendulum_instance(self):

self.assertIsInstanceOfPendulum(new)
self.assertPendulum(new, 2016, 8, 7, 14, 53, 54)


class TimezoneLoaderTest(AbstractTestCase):

def test_load_bad_timezone(self):
self.assertRaises(ValueError, Loader.load, '---NOT A TIMEZONE---')

def test_load_valid(self):
self.assertTrue(Loader.load('America/Toronto'))