In [6]:
import gcp
import gcp._context
import gcp._util
import json
import os
import uuid
import time
import socket

class spark_sql_client:
  _V1_BETA1_ENDPOINT = 'https://dataproc.googleapis.com/v1beta1'
  _CLUSTERS_PATH = '/projects/{project_id}/clusters/{cluster_name}'
  _JOB_PATH = '/projects/{project_id}/jobs'
  _JOBS_PATH = _JOB_PATH + '/{job_id}'
  
  _V1_ENDPOINT = 'https://www.googleapis.com/compute/v1'
  _INSTANCES_PATH = '/projects/{project_id}/zones/{zone}/instances/{instance}'
  _PORT = 12345
  
  def __init__(self, context):
    #TODO: move cluster_name to constructor
    self.is_connected = False
    self._credentials = context.credentials
    self._project_id = context.project_id
    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self._master_node_ip = None
    self._job_id = None

  def get_cluster(self, cluster_name):
    url = spark_sql_client._V1_BETA1_ENDPOINT + spark_sql_client._CLUSTERS_PATH.format(project_id=self._project_id, cluster_name=cluster_name)
    return gcp._util.Http.request(url, '', credentials=self._credentials)  

  def get_compute_instance(self, instance_name, zone):
    url = spark_sql_client._V1_ENDPOINT + spark_sql_client._INSTANCES_PATH.format(project_id=self._project_id, zone=zone, instance = instance_name)
    return gcp._util.Http.request(url, '', credentials=self._credentials)  
    
  def get_cluster_master_node_ip(self, cluster_name):
    cluster = self.get_cluster(cluster_name)
    
    if cluster == None:
      return None
    
    zone_uri = cluster['configuration']['gceClusterConfiguration']['zoneUri']
    zone = zone_uri[zone_uri.find('/zones/') + len('/zones/'):]
    
    compute_instance = self.get_compute_instance(cluster_name + '-m', zone)
    if 'networkInterfaces' in compute_instance and len(compute_instance['networkInterfaces']) > 0:
      network_interface = compute_instance['networkInterfaces'][0]
      if 'accessConfigs' in network_interface and len(network_interface['accessConfigs']) > 0:
        accessConfig = network_interface['accessConfigs'][0]
        return accessConfig['natIP'] # TODO(alekseyv): figure out how to find correct IP if there is more than one accessConfig
    return None
  
  def get_user_name(self):
    # TODO: not sure if it'll work when deployed to cloud, figure out proper way to find user.
    user_name = os.environ['DATALAB_USER']
    if user_name is None:
      return user_name
    else:
      index = user_name.find('@')
      if index > 0:
        return user_name[:index]
      
  def jobs_list(self):
    url = spark_sql_client._V1_BETA1_ENDPOINT + '/projects/' + self._project_id + '/jobs/'
    return gcp._util.Http.request(url, '', credentials=self._credentials)
    
  def cancel_job(self, job_id):
    url = spark_sql_client._V1_BETA1_ENDPOINT + spark_sql_client._JOBS_PATH.format(project_id = self._project_id, job_id=job_id) + ":cancel"
    print(url)
    return gcp._util._http.Http.request(url, method='POST', credentials=self._credentials, raw_response=True)
    
  def get_spark_sql_proxy_job_name_prefix(self):
    return self.get_user_name() + "-spark-sql-proxy-" + str(spark_sql_client._PORT) + "-"
  
  def submit_spark_job(self, cluster_name):
    job_id = self.get_spark_sql_proxy_job_name_prefix() + str(uuid.uuid4())
    data = {
      "projectId": self._project_id,
      "job": {
        "placement": {
          "clusterName": cluster_name
        },
        "reference": {
          "jobId": job_id,
          "projectId": self._project_id
        },
        "sparkJob": {
          "args" : [str(spark_sql_client._PORT)],
          "jarFileUris": ["gs://alekseyv-test/SparkSqlProxyServer.jar"],
          "mainClass": "SparkSqlProxyServer"
        }
      }
    }
    url = spark_sql_client._V1_BETA1_ENDPOINT + spark_sql_client._JOB_PATH.format(project_id = self._project_id) + ":submit"
    gcp._util._http.Http.request(url, data=data, credentials=self._credentials, raw_response=True)
    return job_id
  
  def get_job(self, job_id):
    url = spark_sql_client._V1_BETA1_ENDPOINT + spark_sql_client._JOBS_PATH.format(project_id=self._project_id, job_id=job_id)
    return gcp._util.Http.request(url, '', credentials=self._credentials)
  
  def try_get_running_spark_sql_proxy_job(self):
    jobs = self.jobs_list()
    if not('jobs' in jobs):
      return None
    running_spark_sql_proxy_jobs = filter(lambda job:\
                                          job['reference']['jobId'].startswith(self.get_spark_sql_proxy_job_name_prefix()) and \
                                          job['status']['state'] == 'RUNNING', jobs['jobs'])
    if len(running_spark_sql_proxy_jobs) == 0:
      return None
    if len(running_spark_sql_proxy_jobs) > 1:
      print('WARNING: there are more than one spark_sql_proxy_jobs, returning the first one')
    return running_spark_sql_proxy_jobs[0]
    
  def wait_for_job_to_start(self, job_id):
    _POLLING_INTERVAL_SECONDS = 10
    _MAX_RETRY_COUNT = 6
    retry_count = 0
    while retry_count < _MAX_RETRY_COUNT:
      job = self.get_job(job_id)
      if job is None:
        return False
      job_state = job['status']['state']
      if job_state == 'RUNNING':# or job_state == 'SETUP_DONE': # it looks like SETUP_DONE state is not correct, check state/status
        return True
      if job_state == 'ERROR' or job_state == 'CANCELLED' or job_state == 'DONE':
        return False
      time.sleep(_POLLING_INTERVAL_SECONDS)
      print("waiting for job {job_id} to start, current state {job_state}, retry {retry}".format(job_id = job_id, job_state = job_state, retry = retry_count))
      retry_count += 1
    return False
    
  def connect(self, cluster_name):
    if self.is_connected:
      return True
    
    if self._master_node_ip is None:
      self._master_node_ip = self.get_cluster_master_node_ip(cluster_name)
      if self._master_node_ip is None:
        return False
      if self._job_id is None:
        running_job = self.try_get_running_spark_sql_proxy_job()
        if running_job is not None:
          self._job_id = running_job['reference']['jobId']
          print('found running spark_sql_proxy_job, jobId:' + self._job_id)
        else:
          print('starting new spark_sql_proxy_job')
          self._job_id = self.submit_spark_job(cluster_name)
          print('started new spark_sql_proxy_job, jobId:' + self._job_id)
          if self._job_id is None:
            return False
          if not self.wait_for_job_to_start(self._job_id):
            print('failed to start new spark_sql_proxy_job, jobId:' + self._job_id)
            return False
          print('setup for spark_sql_proxy_job is complete, jobId:' + self._job_id)
    print('connecting to spark_sql_proxy_job socket')
    time.sleep(20) # TODO: it is not clear how much time it takes for job to initialize, try to connect to socket by polling
    self._socket.connect((self._master_node_ip, spark_sql_client._PORT))
    self.is_connected = True
    print('connected to spark_sql_proxy_job socket')
    return True
  
  def disconnect_and_stop_spark_sql_proxy_job(self):
    if not self.is_connected or self._socket is None:
      return
    self._socket.close()
    if self._job_id is None:
      return
    self.cancel_job(self._job_id)
    
  def send(self, msg):
      totalsent = 0
      while totalsent < len(msg):
          sent = self._socket.send(msg[totalsent:])
          if sent == 0:
              raise RuntimeError("socket connection broken")
          totalsent = totalsent + sent

  def readlines(self, recv_buffer=4096, delim='\n'):
    buffer = ''
    data = True
    while data:
      data = self._socket.recv(recv_buffer)
      buffer += data

      while buffer.find(delim) != -1:
          line, buffer = buffer.split('\n', 1)
          yield line
      return

  def execute_statement(self, statement):
    if not self.is_connected:
      return
    if not statement.endswith('\n'):
      statement += '\n'
    self.send(statement)
    for line in client.readlines():
      print(line)
  
    
client = spark_sql_client(gcp.Context.default())
client.connect('cluster-1')


starting new spark_sql_proxy_job
started new spark_sql_proxy_job, jobId:alekseyv-spark-sql-proxy-12345-40acdddf-ff3e-4890-b587-6d809a9d1660
waiting for job alekseyv-spark-sql-proxy-12345-40acdddf-ff3e-4890-b587-6d809a9d1660 to start, current state SETUP_DONE, retry 0
setup for spark_sql_proxy_job is complete, jobId:alekseyv-spark-sql-proxy-12345-40acdddf-ff3e-4890-b587-6d809a9d1660
connecting to spark_sql_proxy_job socket
connected to spark_sql_proxy_job socket


True

In [7]:

client.execute_statement("create temporary table people_json3 using org.apache.spark.sql.json options (path 'gs://alekseyv-test/people.json')")
client.execute_statement("select * from people_json3")


*end*
[null,Michael]
[30,Andy]
[19,Justin]
*end*


In [4]:
client.disconnect_and_stop_spark_sql_proxy_job()

https://dataproc.googleapis.com/v1beta1/projects/datalab-spark/jobs/alekseyv-spark-sql-proxy-12345-abddf2f0-c40c-4fb3-b848-436808712c72:cancel
