Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement of the Presto Example : #271

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 72 additions & 51 deletions example/qubole_presto_api_example.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,89 @@

"""
This is sample code for submitting a Presto query (PrestoCommand) and fetching the result back into a local file.
A similar approach can be followed for HiveCommand, etc.
"""

import sys
import string
from ConfigParser import SafeConfigParser
import logging, sys, time
#from tempfile import NamedTemporaryFile
#from configparser import SafeConfigParser
from qds_sdk.qubole import Qubole
from qds_sdk.commands import *
import boto
import time

from qds_sdk.commands import PrestoCommand
#import boto
import pandas as pd

# Used for generating file name to download the result
def get_random_filename(size=10):
    """Build the path of a local ``.tsv`` file used to store downloaded results.

    The name embeds the current Unix time truncated to whole seconds.
    The ``size`` argument is accepted but not used.
    """
    seconds = int(time.time())
    return "".join(["/tmp/result_", str(seconds), ".tsv"])
# Logger setup: records go to stdout, the root logger is configured at
# WARN, and this module's own logger is lowered to DEBUG.
logging.basicConfig(
    level=logging.WARN,
    format='[%(asctime)s] [%(filename)s] [%(levelname)s] %(message)s',
    stream=sys.stdout,
)
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# Returning content from the file
def get_content(filename):
    """Return the entire text content of the file at *filename*."""
    source = open(filename, 'r')
    try:
        return source.read()
    finally:
        # Close the handle whether or not the read succeeded.
        source.close()

# Executing given query
def execute_query(query):
if query is None or query == "":
return None
cmd = PrestoCommand.create(query=query)
query_id = str(cmd.id)
print "Starting Command with id: " + query_id + "\nProgress: =>",
#######################
# PRIVATE FUNCTIONS
#######################
def _wait_for_cmd(cmd, update=True, timeout = 3600):
start_time = time.time()
while not PrestoCommand.is_done(cmd.status):
This conversation was marked as resolved.
Show resolved Hide resolved
print "\b=>",
cmd = PrestoCommand.find(cmd.id)
time.sleep(5)
print cmd.get_log()
if (time.time() - start_time) > timeout:
log.error(f"Timeout ({timeout} sec) has been reached! Aborting...")
break
if update: cmd = PrestoCommand.find(cmd.id) # Updating the Python object through HTTP
time.sleep(3)
if PrestoCommand.is_success(cmd.status):
print "\nCommand Executed: Completed successfully"
log.info("Command completed successfully")
else:
print "\nCommand Executed: Failed!!!. The status returned is: " + str(cmd.status)
log.error(f"Command failed! The status returned is: {cmd.status}")
return cmd

# Downloading the result
# Writes the results of `command` into a file named by get_random_filename(),
# prints progress messages, then reads the file back through get_content().
# Returns None straight away when `command` is None.
def get_results(command):
if command is None:
return None
filename = get_random_filename(10)
print filename
fp = open(filename, 'w')
command.get_results(fp, delim="\n")
print "Starting Result fetch with Command id: " + str(command.id) + "\nProgress: =>",
# NOTE(review): command.status is not re-fetched inside this loop, so the loop
# can only end if the status already counted as done on entry.
while not PrestoCommand.is_done(command.status):
print "\b=>",
time.sleep(5)
if PrestoCommand.is_success(command.status):
print "\nCommand Executed: Results fetch completed successfully"
else:
print "\nCommand Executed: Result fetch for original command " + str(command.id) + "Failed!!!. The status returned is: " + str(command.status)
# NOTE(review): fp is closed manually; an exception raised above would leave it open.
fp.close()
content = get_content(filename)
def _get_results_locally(cmd):
    """Write the results of *cmd* to a timestamped ``.tsv`` file under /tmp.

    The results are requested tab-delimited and inline. Afterwards
    ``_wait_for_cmd`` is called with ``update=False``.

    Returns the path of the file that was written.
    """
    filename = f"/tmp/result_{int(time.time())}.tsv"
    log.debug(f"Fetching results locally into {filename} ...")
    sink = open(filename, 'w')
    try:
        cmd.get_results(sink, delim="\t", inline=True, arguments=['true'])
    finally:
        sink.close()
    _wait_for_cmd(cmd, update=False)
    return filename


#######################
# PUBLIC FUNCTIONS
#######################
def execute_presto_query(query, cluster_label='presto'):
    """Run *query* as a Presto command and wait for it to finish.

    Args:
        query: Query text handed to ``PrestoCommand.create``; must not be None.
        cluster_label: Label passed to ``PrestoCommand.create``
            (defaults to ``'presto'``).

    Returns:
        The command object returned by ``_wait_for_cmd``.

    Raises:
        AssertionError: If ``query`` is None.
    """
    # Kept as an assert so callers keep seeing AssertionError on a None query.
    assert query is not None
    cmd = PrestoCommand.create(query=query, label=cluster_label)
    log.debug(f"Starting Presto Command with id: {cmd.id}")
    cmd = _wait_for_cmd(cmd)
    log.debug(f"Command {cmd.id} done. Logs are :")
    # The command log goes to stdout rather than through the logger.
    print(cmd.get_log())
    return cmd

def get_raw_results(cmd, column_names=None):
    """Return the results of *cmd* as one string.

    The results are first written to a local file by
    ``_get_results_locally`` and then read back in full.
    ``column_names`` is accepted but not used.
    """
    assert cmd is not None
    assert type(cmd) is PrestoCommand
    results_path = _get_results_locally(cmd)
    with open(results_path, 'r') as results_file:
        return results_file.read()

# Load the tab-separated results of a Presto command into a pandas DataFrame.
#   command      -- must be exactly a PrestoCommand (asserted below).
#   column_names -- expected column names; when None they are taken from the
#                   keys of kwargs['dtype'], if that keyword is given.
#   **kwargs     -- forwarded to pd.read_csv.
# Returns whatever pd.read_csv returns for the downloaded file.
def get_dataframe(command, column_names = None, **kwargs):
assert command is not None
assert type(command) is PrestoCommand

# Fall back to the keys of the dtype mapping as the expected column names.
if column_names is None and 'dtype' in kwargs.keys():
assert type(kwargs['dtype']) is dict
column_names = kwargs['dtype'].keys()

# NOTE(review): pd.io.common._NA_VALUES is a private pandas attribute — confirm it exists in the pandas version in use.
_NA_VALUES = list(pd.io.common._NA_VALUES) + ['\\N'] # The NA Values that should be considered for Presto
filename = _get_results_locally(command)

# NOTE(review): the next four lines look like the `__main__` block of the previous
# version, shown inline by the diff view — confirm they are not part of this function.
if __name__ == '__main__':
# Setting API token
Qubole.configure(api_token='YOUR-QUBOLE-API-TOKEN')
get_results(execute_query("select * from default.cities limit 100;"))
# Compare the first line of the file with the tab-joined expected names to
# decide whether the file already carries a header row.
with open(filename) as f:
firstline = f.readline()
# NOTE(review): column_names is still None here when neither column_names nor
# 'dtype' was passed; str.join(None) raises TypeError.
joint_column_names = '\t'.join(column_names)
if firstline.strip() == joint_column_names :
log.debug('It seems that the file already got the right column names...')
return pd.read_csv(filename, delimiter='\t', na_values=_NA_VALUES, **kwargs)
else :
log.debug('It seems that the column names are not present in the file. Adding them :')
log.debug(f"firstline='{firstline.strip()}', joint_column_names={joint_column_names}")
# No header row in the file: say so and supply the names explicitly.
kwargs['header'] = None
kwargs['names'] = column_names
return pd.read_csv(filename, delimiter='\t', na_values=_NA_VALUES, **kwargs)
16 changes: 10 additions & 6 deletions qds_sdk/cloud/oracle_bmc_cloud.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from qds_sdk.cloud.cloud import Cloud
import json
import ast

class OracleBmcCloud(Cloud):
'''
Expand Down Expand Up @@ -112,11 +113,8 @@ def set_network_config(self,
self.network_config['subnet_id'] = subnet_id
self.network_config['compartment_id'] = compartment_id
self.network_config['image_id'] = image_id
if availability_domain_info_map and availability_domain_info_map.strip():
try:
self.network_config['availability_domain_info_map'] = json.loads(availability_domain_info_map.strip())
except Exception as e:
raise Exception("Invalid JSON string for availability domain info map: %s" % e.message)
if availability_domain_info_map:
self.network_config['availability_domain_info_map'] = availability_domain_info_map

def set_storage_config(self,
storage_tenant_id=None,
Expand All @@ -133,6 +131,12 @@ def set_storage_config(self,
self.storage_config['block_volume_size'] = block_volume_size

def set_cloud_config_from_arguments(self, arguments):
if arguments.availability_domain_info_map:
try:
arguments.availability_domain_info_map = ast.literal_eval(arguments.availability_domain_info_map)
assert isinstance(arguments.availability_domain_info_map, list)
except Exception as e:
raise Exception("Invalid List format for availability_domain_info_map: %s" % e.message)
self.set_cloud_config(compute_tenant_id=arguments.compute_tenant_id,
compute_user_id=arguments.compute_user_id,
compute_key_finger_print=arguments.compute_key_finger_print,
Expand Down Expand Up @@ -239,4 +243,4 @@ def create_parser(self, argparser):
dest="block_volume_size",
default=None,
help="size (in GB) of each block volume to be mounted to an instance",
type=int)
type=int)
6 changes: 3 additions & 3 deletions tests/test_clusterv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_oracle_bmc_network_config(self):
def test_oracle_bmc_network_config_az_info_map(self):
sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'ORACLE_BMC', 'cluster', 'create', '--label', 'test_label',
'--compartment-id', 'abc-compartment', '--image-id', 'abc-image', '--vcn-id', 'vcn-1',
'--availability-domain-info-map', '{"availability_domain": "AD-1", "subnet_id": "subnet-1"}']
'--availability-domain-info-map', str([{"availability_domain": "AD-1", "subnet_id": "subnet-1"}])]
Qubole.cloud = None
print_command()
Connection._api_call = Mock(return_value={})
Expand All @@ -159,8 +159,8 @@ def test_oracle_bmc_network_config_az_info_map(self):
'compartment_id': 'abc-compartment',
'image_id': 'abc-image',
'availability_domain_info_map':
{'availability_domain': 'AD-1',
'subnet_id': 'subnet-1'}}},
[{'availability_domain': 'AD-1',
'subnet_id': 'subnet-1'}]}},
'cluster_info': {'label': ['test_label']}})

def test_oracle_bmc_location_config(self):
Expand Down