Skip to content

Commit

Permalink
refactor: Python functions
Browse files Browse the repository at this point in the history
  • Loading branch information
definite committed Apr 30, 2018
1 parent 3118129 commit f45e635
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
*~
*.bak
*.pyc
*.swp

182 changes: 182 additions & 0 deletions ZanataFunctions.py
@@ -0,0 +1,182 @@
#!/usr/bin/env python
"""Generic Helper Function"""

import argparse
import errno
import logging
import os
import subprocess # nosec
import sys
import urllib2 # noqa: F401 # pylint: disable=import-error,unused-import
import urlparse # noqa: F401 # pylint: disable=import-error,unused-import

try:
from typing import List, Any # noqa: F401 # pylint: disable=unused-import
except ImportError:
sys.stderr.write("python typing module is not installed" + os.linesep)


SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
ZANATA_ENV_FILE = os.path.join(SCRIPT_DIR, 'zanata-env.sh')
BASH_CMD = '/bin/bash'


def logging_init(level=logging.INFO):
# type (int) -> logging.Logger
"""Initialize logging"""
logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s')
logger = logging.getLogger()
logger.setLevel(level)
return logger


def read_env(filename):
# type (str) -> dict
"""Read environment variables by sourcing a bash file"""
proc = subprocess.Popen( # nosec
[BASH_CMD, '-c',
"source %s && set -o posix && set" % (filename)],
stdout=subprocess.PIPE)
return {kv[0]: kv[1] for kv in [
s.strip().split('=', 1)
for s in proc.stdout.readlines() if '=' in s]}


ZANATA_ENV = read_env(ZANATA_ENV_FILE)


class HTTPBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
"""Handle Basic Authentication"""
def http_error_401(self, req, fp, code, msg, headers): # noqa: E501 # pylint: disable=invalid-name,unused-argument,too-many-arguments
"""retry with basic auth when facing a 401"""
host = req.get_host()
realm = None
return self.retry_http_basic_auth(host, req, realm)

def http_error_403(self, req, fp, code, msg, hdrs): # noqa: E501 # pylint: disable=invalid-name,unused-argument,too-many-arguments
"""retry with basic auth when facing a 403"""
host = req.get_host()
realm = None
return self.retry_http_basic_auth(host, req, realm)


class SshHost(object):
"""SSH/SCP helper functions"""

SCP_CMD = '/usr/bin/scp'
SSH_CMD = '/usr/bin/ssh'

@staticmethod
def create_parent_parser():
# type () -> argparse.ArgumentParser
"""Create parent parser for SSH related program"""
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument(
'-i', '--identity-file', type=str,
help='SSH/SCP indent-files')
parent_parser.add_argument(
'host', type=str,
help='host with/without username,'
+ ' e.g. user@host.example or host.example')
parent_parser.add_argument(
'-t', '--dest-path', type=str, help='Destination path')
return parent_parser

def __init__(self, host, identity_file=None):
# type (str, str) -> None
self.host = host
self.identity_file = identity_file
if self.identity_file:
self.opt_list = ['-i', identity_file]
else:
self.opt_list = []

def run_check_call(self, command, sudo=False):
# type (str, bool) -> None
"""Run command though ssh"""
cmd_list = [SshHost.SSH_CMD]
cmd_list += self.opt_list
cmd_list += [
self.host,
('sudo ' if sudo else '') + command]
logging.info(' '.join(cmd_list))

subprocess.check_call(cmd_list) # nosec

def scp_to_remote(
self, source_path, dest_path,
sudo=False, rm_old=False):
# type (str, str, bool, bool) -> None
"""scp to remote host"""
if rm_old:
self.run_check_call(
"rm -fr %s" % dest_path, sudo)

cmd_list = [
'scp', '-p'] + self.opt_list + [
source_path,
"%s:%s" % (self.host, dest_path)]

logging.info(' '.join(cmd_list))

subprocess.check_call(cmd_list) # nosec


class UrlHelper(object):
"""URL helper functions"""
def __init__(self, base_url, user, token):
"""install the authentication handler."""
self.base_url = base_url
auth_handler = HTTPBasicAuthHandler()
auth_handler.add_password(
realm=None,
uri=self.base_url,
user=user,
passwd=token)
opener = urllib2.build_opener(auth_handler)
# install it for all urllib2.urlopen calls
urllib2.install_opener(opener)

@staticmethod
def read(url):
# type (str) -> str
"""Read URL"""
logging.info("Reading from %s", url)
return urllib2.urlopen(url).read() # nosec

@staticmethod
def download_file(url, dest_file='', dest_dir='.'):
# type (str, str, str) -> None
"""Download file"""
target_file = dest_file
if not target_file:
url_parsed = urlparse.urlparse(url)
target_file = os.path.basename(url_parsed.path)
chunk = 128 * 1024 # 128 KiB
target_dir = os.path.abspath(dest_dir)
target_path = os.path.join(target_dir, target_file)
try:
os.makedirs(target_dir)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(target_dir):
pass
else:
raise

logging.info("Downloading to %s from %s", target_path, url)
response = urllib2.urlopen(url) # nosec
chunk_count = 0
with open(target_path, 'wb') as out_file:
while True:
buf = response.read(chunk)
if not buf:
break
out_file.write(buf)
chunk_count += 1
if chunk_count % 100 == 0:
sys.stderr.write('#')
sys.stderr.flush()
elif chunk_count % 10 == 0:
sys.stderr.write('.')
sys.stderr.flush()
return response
Empty file added __init__.py
Empty file.
20 changes: 20 additions & 0 deletions py-test-all
@@ -0,0 +1,20 @@
#!/bin/bash
### NAME
### py-test-all - test all python files
###

set -eu
PY_SOURCES=*.py

## Pylint
echo "====== pylint ======" > /dev/stderr
python2 -m pylint $PY_SOURCES

## flake8
echo "====== flake8 =====" > /dev/stderr
flake8 --benchmark $PY_SOURCES

## pytest
echo "====== pytest-2 ======" > /dev/stderr
pytest-2 $PY_SOURCES

24 changes: 24 additions & 0 deletions pylintrc
@@ -0,0 +1,24 @@
[MASTER]

# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once).You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=print-statement,parameter-unpacking,unpacking-in-except,old-raise-syntax,backtick,long-suffix,old-ne-operator,old-octal-literal,import-star-module-level,raw-checker-failed,bad-inline-option,locally-disabled,locally-enabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,apply-builtin,basestring-builtin,buffer-builtin,cmp-builtin,coerce-builtin,execfile-builtin,file-builtin,long-builtin,raw_input-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,no-absolute-import,old-division,dict-iter-method,dict-view-method,next-method-called,metaclass-assignment,indexing-exception,raising-string,reload-builtin,oct-method,hex-method,nonzero-method,cmp-method,input-builtin,round-builtin,intern-builtin,unichr-builtin,map-builtin-not-iterating,zip-builtin-not-iterating,range-builtin-not-iterating,filter-builtin-not-iterating,using-cmp-argument,eq-without-hash,div-method,idiv-method,rdiv-method,exception-message-attribute,invalid-str-codec,sys-max-int,bad-python3-import,deprecated-string-function,deprecated-str-translate-call,superfluous-parens,relative-import,invalid-name

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=


[FORMAT]
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=8

33 changes: 33 additions & 0 deletions testZanataFunctions.py
@@ -0,0 +1,33 @@
#!/usr/bin/env python
"""Test the ZanataFunctions"""
import subprocess # nosec
import unittest
import ZanataFunctions


class ZanataFunctionsTestCase(unittest.TestCase):
"""Test Case for ZanataFunctions"""
def test_read_env(self):
"""Test read_env()"""
zanata_env = ZanataFunctions.read_env(
ZanataFunctions.ZANATA_ENV_FILE)
self.assertEqual(zanata_env['EXIT_OK'], '0')


class SshHostTestCase(unittest.TestCase):
"""Test SSH with localhost
thus set up password less SSH is required"""
def setUp(self):
self.ssh_host = ZanataFunctions.SshHost('localhost')

def test_run_check_call(self):
"""Test SssHost.run_check_call"""
self.ssh_host.run_check_call('true')
self.assertRaises(
subprocess.CalledProcessError,
self.ssh_host.run_check_call,
'false')


if __name__ == '__main__':
unittest.main()

0 comments on commit f45e635

Please sign in to comment.