SAM002 - Storage Pool - Query HDFS
==================================

In this tutorial, you learn how to:

-   **Create an external table pointing to HDFS data in a big data
    cluster**
-   **Join this data with high-value data in the master instance**

Prerequisites
-------------

-   **SQL Server big data tools**

    -   Azure Data Studio
    -   SQL Server 2019 extension

-   **Load sample data into your big data cluster**

### Common functions

Define helper functions used in this notebook.

In [None]:
# Define `run` function for transient fault handling, suggestions on error, and scrolling updates on Windows
import sys
import os
import re
import platform
import shlex
import shutil
import datetime

from subprocess import Popen, PIPE
from IPython.display import Markdown

def run(cmd, return_output=False, no_output=False, error_hints=None, retry_hints=None, retry_count=0):
    """
    Run shell command, stream stdout, print stderr and optionally return output

    Parameters:
        cmd: the command line as a single string (tokenized with shlex.split).
        return_output: when True, stdout is accumulated and returned instead
            of being printed.
        no_output: when True, the command is started without capturing
            stdout/stderr, so nothing is streamed, returned or scanned for hints.
        error_hints: optional list of [stderr substring, notebook link, link title]
            entries; a stderr line containing the substring displays a SUGGEST link.
        retry_hints: optional list of stderr substrings that cause the command
            to be run again (at most 5 retries).
        retry_count: number of retries already performed (used by the recursion).

    Returns:
        The captured stdout (str) when return_output is True, otherwise None.

    Raises:
        FileNotFoundError: the executable could not be found on the path.
        SystemExit: the command completed with a non-zero exit code.
    """
    max_retries = 5
    install_hint = None
    output = ""

    # Work on private copies: the tool specific hints appended below must not
    # leak into the caller's lists (or into a shared default) and pile up from
    # one call to the next.  The caller's originals are kept for the retry.
    #
    caller_error_hints = error_hints
    caller_retry_hints = retry_hints
    error_hints = list(error_hints) if error_hints else []
    retry_hints = list(retry_hints) if retry_hints else []

    # shlex.split is required on bash and for Windows paths with spaces
    #
    cmd_actual = shlex.split(cmd)

    # When running python, use the python in the ADS sandbox ({sys.executable})
    #
    if cmd.startswith("python "):
        cmd_actual[0] = cmd_actual[0].replace("python", sys.executable)

        # On Mac, when ADS is not launched from terminal, LC_ALL may not be set, which causes pip installs to fail
        # with:
        #
        #       UnicodeDecodeError: 'ascii' codec can't decode byte 0xc5 in position 4969: ordinal not in range(128)
        #
        # Setting it to a default value of "en_US.UTF-8" enables pip install to complete
        #
        if platform.system() == "Darwin" and "LC_ALL" not in os.environ:
            os.environ["LC_ALL"] = "en_US.UTF-8"

        # NOTE(review): python_hints() is not defined in the visible part of this
        # notebook -- confirm it exists before running "python ..." commands.
        #
        python_retry_hints, python_error_hints, install_hint = python_hints()
        retry_hints += python_retry_hints
        error_hints += python_error_hints

    if cmd.startswith("kubectl "):
        kubectl_retry_hints, kubectl_error_hints, install_hint = kubectl_hints()
        retry_hints += kubectl_retry_hints
        error_hints += kubectl_error_hints

    if cmd.startswith("azdata "):
        azdata_retry_hints, azdata_error_hints, install_hint = azdata_hints()
        retry_hints += azdata_retry_hints
        error_hints += azdata_error_hints

    # Find the path based location (shutil.which) of the executable that will be run (and display it to aid supportability), this
    # seems to be required for .msi installs of azdata.cmd/az.cmd.  (otherwise Popen returns FileNotFound)
    #
    # NOTE: Bash needs cmd to be the list of the space separated values hence shlex.split.
    #
    which_binary = shutil.which(cmd_actual[0])

    if which_binary is None:
        if install_hint is not None:
            display(Markdown(f'SUGGEST: Use {install_hint} to resolve this issue.'))

        raise FileNotFoundError(f"Executable '{cmd_actual[0]}' not found in path (where/which)")

    cmd_actual[0] = which_binary

    start_time = datetime.datetime.now().replace(microsecond=0)

    # Same naive "YYYY-MM-DD HH:MM:SS" rendering as utcnow(), without the deprecated call
    #
    utc_start_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0, tzinfo=None)

    print(f"START: {cmd} @ {start_time} ({utc_start_time} UTC)")
    print(f"       using: {which_binary} ({platform.system()} {platform.release()} on {platform.machine()})")
    print(f"       cwd: {os.getcwd()}")

    # 'azdata notebook run' prints '  "name": "value"' lines, these are shown as Markdown (hyperlinked)
    #
    is_notebook_run = cmd.startswith("azdata notebook run")
    notebook_line_regex = re.compile(r'  "(.*)"\: "(.*)"')

    # Command-line tools such as CURL and AZDATA HDFS commands output
    # scrolling progress bars, which causes Jupyter to hang forever, to
    # workaround this, use no_output=True
    #
    try:
        if no_output:
            p = Popen(cmd_actual)
        else:
            # NOTE: line buffering (bufsize=1) is not supported for binary pipes, the default
            # buffer size is used either way, so it is not requested here.
            #
            # NOTE(review): stdout is drained before stderr is read, a command that writes a
            # very large amount to stderr could fill the pipe and block -- confirm acceptable.
            #
            p = Popen(cmd_actual, stdout=PIPE, stderr=PIPE)
            with p.stdout:
                for line in iter(p.stdout.readline, b''):
                    line = line.decode()
                    if return_output:
                        output = output + line
                    elif is_notebook_run: # Hyperlink the .ipynb file
                        match = notebook_line_regex.match(line)
                        if match:
                            if match.group(1).find("HTML") != -1:
                                display(Markdown(f' - "{match.group(1)}": "{match.group(2)}"'))
                            else:
                                display(Markdown(f' - "{match.group(1)}": "[{match.group(2)}]({match.group(2)})"'))
                    else:
                        print(line, end='')
        p.wait()
    except FileNotFoundError as e:
        if install_hint is not None:
            display(Markdown(f'SUGGEST: Use {install_hint} to resolve this issue.'))

        raise FileNotFoundError(f"Executable '{cmd_actual[0]}' not found in path (where/which)") from e

    if not no_output:
        # Close the stderr pipe when done (including when returning early for a retry)
        #
        with p.stderr:
            for line in iter(p.stderr.readline, b''):
                line_decoded = line.decode()

                # azdata emits a single empty line to stderr when doing an hdfs cp, don't
                # print this empty "ERR:" as it confuses.  (The line still carries its
                # newline, so compare after stripping whitespace.)
                #
                if not line_decoded.strip():
                    continue

                print(f"ERR: {line_decoded}", end='')

                for error_hint in error_hints:
                    if line_decoded.find(error_hint[0]) != -1:
                        display(Markdown(f'SUGGEST: Use [{error_hint[2]}]({error_hint[1]}) to resolve this issue.'))

                for retry_hint in retry_hints:
                    if line_decoded.find(retry_hint) != -1:
                        if retry_count < max_retries:
                            print(f"RETRY: {retry_count} (due to: {retry_hint})")
                            retry_count = retry_count + 1

                            # Pass the caller's hints: the retry adds the tool specific hints
                            # itself, passing the augmented lists would duplicate them.
                            #
                            output = run(cmd, return_output=return_output, error_hints=caller_error_hints, retry_hints=caller_retry_hints, retry_count=retry_count)

                            if return_output:
                                return output
                            else:
                                return

    elapsed = datetime.datetime.now().replace(microsecond=0) - start_time

    if p.returncode != 0:
        raise SystemExit(f'Shell command:\n\n\t{cmd} ({elapsed}s elapsed)\n\nreturned non-zero exit code: {str(p.returncode)}.\n')

    print(f'\nSUCCESS: {elapsed}s elapsed\n')

    if return_output:
        return output

def kubectl_hints():
    """
    Return the hints `run` applies to commands starting with "kubectl ".

    Returns:
        A (retry_hints, error_hints, install_hint) tuple:
          - retry_hints: stderr substrings that cause the command to be retried
          - error_hints: [stderr substring, notebook link, link title] entries
          - install_hint: Markdown link suggested when the executable is not found
    """
    retry_hints = [
        "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond",
    ]

    error_hints = [
        ["no such host", "../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb", "TSG010 - Get configuration contexts"],
        ["no such host", "../repair/tsg011-restart-sparkhistory-server.ipynb", "TSG011 - Restart sparkhistory server"],
        ["No connection could be made because the target machine actively refused it", "../repair/tsg056-kubectl-no-connection-could-be-made.ipynb", "TSG056 - Kubectl fails with No connection could be made because the target machine actively refused it"],
    ]

    # No trailing quote character after the link: the hint is embedded verbatim
    # in the "SUGGEST: Use ... to resolve this issue." Markdown message.
    install_hint = "[SOP036 - Install kubectl command line interface](../install/sop036-install-kubectl.ipynb)"

    return retry_hints, error_hints, install_hint


def azdata_hints():
    """
    Return the hints `run` applies to commands starting with "azdata ".

    Returns:
        A (retry_hints, error_hints, install_hint) tuple:
          - retry_hints: stderr substrings that cause the command to be retried
          - error_hints: [stderr substring, notebook link, link title] entries
          - install_hint: Markdown link suggested when the executable is not found
    """
    retry_hints = [
        "Endpoint sql-server-master does not exist",
        "Endpoint livy does not exist",
        "Failed to get state for cluster",
        "Endpoint webhdfs does not exist",
        "Adaptive Server is unavailable or does not exist",
        "Error: Address already in use",
        "Timed out getting health status after 5000 milliseconds",
    ]

    error_hints = [
        ["azdata login", "../common/sop028-azdata-login.ipynb", "SOP028 - azdata login"],
        ["The token is expired", "../common/sop028-azdata-login.ipynb", "SOP028 - azdata login"],
        ["Reason: Unauthorized", "../common/sop028-azdata-login.ipynb", "SOP028 - azdata login"],
        ["Max retries exceeded with url: /api/v1/bdc/endpoints", "../common/sop028-azdata-login.ipynb", "SOP028 - azdata login"],
        ["Look at the controller logs for more details", "../diagnose/tsg027-observe-bdc-create.ipynb", "TSG027 - Observe cluster deployment"],
        ["provided port is already allocated", "../log-files/tsg062-tail-bdc-previous-container-logs.ipynb", "TSG062 - Get tail of all previous container logs for pods in BDC namespace"],
        ["Create cluster failed since the existing namespace", "../install/sop061-delete-bdc.ipynb", "SOP061 - Delete a big data cluster"],
        ["Failed to complete kube config setup", "../repair/tsg067-failed-to-complete-kube-config-setup.ipynb", "TSG067 - Failed to complete kube config setup"],
        ['Error processing command: "ApiError', "../repair/tsg110-azdata-returns-apierror.ipynb", "TSG110 - Azdata returns ApiError"],
        ['Error processing command: "ControllerError', "../log-analyzers/tsg036-get-controller-logs.ipynb", "TSG036 - Controller logs"],
        ["ERROR: 500", "../log-analyzers/tsg046-get-knox-logs.ipynb", "TSG046 - Knox gateway logs"],
        ["Data source name not found and no default driver specified", "../install/sop069-install-odbc-driver-for-sql-server.ipynb", "SOP069 - Install ODBC for SQL Server"],
        ["Can't open lib 'ODBC Driver 17 for SQL Server", "../install/sop069-install-odbc-driver-for-sql-server.ipynb", "SOP069 - Install ODBC for SQL Server"],
    ]

    # No trailing quote character after the link: the hint is embedded verbatim
    # in the "SUGGEST: Use ... to resolve this issue." Markdown message.
    install_hint = "[SOP055 - Install azdata command line interface](../install/sop055-install-azdata.ipynb)"

    return retry_hints, error_hints, install_hint

# Confirm that the cell ran to the end, i.e. the helper functions above are now defined
print('Common functions defined successfully.')

### Instantiate Kubernetes client

In [None]:
# Create the Kubernetes Python client and expose it through the 'api' variable

import os

try:
    from kubernetes import client, config
    from kubernetes.stream import stream

    # Use the in-cluster configuration when both Kubernetes service variables
    # are present in the environment, otherwise load the local kube config
    if all(name in os.environ for name in ("KUBERNETES_SERVICE_PORT", "KUBERNETES_SERVICE_HOST")):
        config.load_incluster_config()
    else:
        config.load_kube_config()

    api = client.CoreV1Api()

    print('Kubernetes client instantiated')
except ImportError:
    # The kubernetes module is missing: point at the install notebook, then fail the cell
    from IPython.display import Markdown
    display(Markdown(f'SUGGEST: Use [SOP059 - Install Kubernetes Python module](../install/sop059-install-kubernetes-module.ipynb) to resolve this issue.'))
    raise

### Get the namespace for the big data cluster

Get the namespace of the big data cluster from the Kubernetes API.

NOTE: If there is more than one big data cluster in the target
Kubernetes cluster, then set \[0\] to the correct value for the big data
cluster.

In [None]:
# Store the name of the first namespace labelled MSSQL_CLUSTER in the 'namespace' variable

try:
    namespace = api.list_namespace(label_selector='MSSQL_CLUSTER').items[0].metadata.name
except IndexError:
    # No namespace carries the label: show the troubleshooting notebooks, then fail the cell
    from IPython.display import Markdown
    for suggestion in (
        '[TSG081 - Get namespaces (Kubernetes)](../monitor-k8s/tsg081-get-kubernetes-namespaces.ipynb)',
        '[TSG010 - Get configuration contexts](../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb)',
        '[SOP011 - Set kubernetes configuration context](../common/sop011-set-kubernetes-context.ipynb)',
    ):
        display(Markdown(f'SUGGEST: Use {suggestion} to resolve this issue.'))
    raise

print('The kubernetes namespace for your big data cluster is: ' + namespace)

### Create an external table to HDFS

The storage pool contains web clickstream data in a .csv file stored in
HDFS. Use the following steps to define an external table that can access
the data in that file.

1.  In Azure Data Studio, connect to the SQL Server master instance of
    the big data cluster.

2.  Double-click on the connection in the Servers window to show the
    server dashboard for the SQL Server master instance. Select **New
    Query**.

3.  Run the following T-SQL command to create your own database in the
    master instance.

In [None]:
from IPython.display import Markdown

try:
    %load_ext sql
except ModuleNotFoundError:
    display(Markdown(f'SUGGEST: Use [SOP062 - Install ipython-sql and pyodbc modules](../install/sop062-install-ipython-sql-module.ipynb) to resolve this issue.'))
    raise

import json
import base64

controller_username = run(f'kubectl get secret/controller-login-secret -n {namespace} -o jsonpath={{.data.username}}', return_output=True)
controller_username = base64.b64decode(controller_username).decode('utf-8')

controller_password = run(f'kubectl get secret/controller-login-secret -n {namespace} -o jsonpath={{.data.password}}', return_output=True)
controller_password = base64.b64decode(controller_password).decode('utf-8')

master_endpoint_details = run('azdata bdc endpoint list --endpoint="sql-server-master"', return_output=True)
json = json.loads(master_endpoint_details)
sql_master_tcp_and_port = json['endpoint']

%sql mssql+pyodbc://{controller_username}:{controller_password}@{sql_master_tcp_and_port}/master?driver=SQL+Server+Native+Client+11.0&autocommit=True

In [None]:
%%sql
-- Create the 'Testing' database, unless a database with that name already exists
IF NOT EXISTS (SELECT [name] FROM sys.databases WHERE [name] = N'Testing')
    CREATE DATABASE Testing

### Run the following Transact-SQL command to change context to the database you created in the master instance

In [None]:
%%sql
-- Switch the database context to the Testing database
USE Testing

### Define the format of the .csv or Parquet file to read from HDFS

For CSV:

In [None]:
%%sql
-- Describe how the delimited text (.csv) files are parsed: comma separated
-- fields, double-quoted strings, type defaults for missing values.
-- Guarded so that re-running the cell does not fail once the format exists.
IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'csv_file')
BEGIN
    CREATE EXTERNAL FILE FORMAT csv_file
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS(
            FIELD_TERMINATOR = ',',
            STRING_DELIMITER = '"',
            USE_TYPE_DEFAULT = TRUE)
    )
END

For Parquet:

In [None]:
%%sql
-- Define an external file format (named PARQUET) for Parquet files.
-- Guarded so that re-running the cell does not fail once the format exists.
IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'PARQUET')
BEGIN
    CREATE EXTERNAL FILE FORMAT PARQUET
        WITH (
                FORMAT_TYPE = PARQUET
             )
END

### Create an external data source to the storage pool if it does not already exist

In [None]:
%%sql
-- Register the storage pool as an external data source, unless it is already registered.
-- To recreate it, first run: DROP EXTERNAL DATA SOURCE SqlStoragePool
IF NOT EXISTS (
    SELECT *
        FROM sys.external_data_sources
        WHERE name = 'SqlStoragePool'
)
BEGIN
    CREATE EXTERNAL DATA SOURCE SqlStoragePool
    WITH (
        LOCATION = 'sqlhdfs://controller-svc/default'
    )
END

### Create an external table that can read the `/clickstream_data` from the storage pool

The SQLStoragePool is accessible from the master instance of a big data
cluster.

For CSV:

In [None]:
%%sql
-- External table over the files under /clickstream_data, read with the csv_file format.
-- Guarded so that re-running the cell does not fail once the table exists.
IF NOT EXISTS (SELECT * FROM sys.external_tables WHERE name = 'clickstream_data_table_csv')
BEGIN
    CREATE EXTERNAL TABLE [clickstream_data_table_csv]
    ("NumberID" BIGINT ,
    "Name" Varchar(120) ,
    "Name2" Varchar(120),
    "Price" Decimal ,
    "Discount" Decimal ,
    "Money" Decimal,
    "Money2" Decimal,
    "Type" Varchar(120),
    "Space" Varchar(120))
    WITH
    (
        DATA_SOURCE = SqlStoragePool,
        LOCATION = '/clickstream_data',
        FILE_FORMAT = csv_file
    )
END

For Parquet:

In [None]:
%%sql
-- External table over the files under /clickstream_data_parquet, read with the PARQUET format.
-- Guarded so that re-running the cell does not fail once the table exists.
IF NOT EXISTS (SELECT * FROM sys.external_tables WHERE name = 'clickstream_data_table_parquet')
BEGIN
    CREATE EXTERNAL TABLE [clickstream_data_table_parquet]
    ("NumberID" BIGINT ,
    "Name" Varchar(120) ,
    "Name2" Varchar(120),
    "Price" BIGINT ,
    "Discount" FLOAT,
    "Money" FLOAT,
    "Money2" FLOAT,
    "Type" Varchar(120),
    "Space" Varchar(120))
    WITH
    (
        DATA_SOURCE = SqlStoragePool,
        LOCATION = '/clickstream_data_parquet',
        FILE_FORMAT = PARQUET
    )
END

### Query the data

1.  Run the following query to join the HDFS data in the
    `clickstream_hdfs` external table with the relational data in the
    local database you loaded the data in.

For CSV:

In [None]:
%%sql
-- Read every row and column through the CSV-backed external table
SELECT *
    FROM [clickstream_data_table_csv]

For Parquet:

In [None]:
%%sql
-- Read every row and column through the Parquet-backed external table
SELECT *
    FROM [clickstream_data_table_parquet]