SAM001 - Storage Pool - Load sample data
========================================

Description
-----------

### Common functions

Define helper functions used in this notebook.

In [None]:
%%local

# Define `run` function for transient fault handling, hyperlinked suggestions, and scrolling updates on Windows
import sys
import os
import re
import platform
import shlex
import shutil
import datetime

from subprocess import Popen, PIPE
from IPython.display import Markdown

def run(cmd, return_output=False, no_output=False, error_hints=None, retry_hints=None, retry_count=0):
    """
    Run shell command, stream stdout, print stderr and optionally return output

    Args:
        cmd: The command line as a single string; it is tokenised with shlex.split.
        return_output: When True, stdout is accumulated and returned instead of
            being printed line by line.
        no_output: When True, the child process is started without pipes, so
            nothing is captured and stderr is not scanned for hints.
        error_hints: Optional list of [text, notebook link, notebook title]
            entries; a "SUGGEST" link is displayed when `text` occurs in a
            stderr line.
        retry_hints: Optional list of texts; when one occurs in a stderr line
            the command is re-run (at most 5 retries).
        retry_count: Number of retries already performed (used by the
            recursive retry call).

    Returns:
        The captured stdout as a string when return_output is True, else None.

    Raises:
        FileNotFoundError: The executable could not be located on the path.
        SystemExit: The command finished with a non-zero exit code.
    """
    max_retries = 5
    install_hint = None
    output = ""

    # Work on private copies: the tool specific hints are appended below, and
    # appending to the caller's lists (or to a shared default list) would make
    # the hints grow on every call and on every retry.
    #
    caller_error_hints = list(error_hints) if error_hints else []
    caller_retry_hints = list(retry_hints) if retry_hints else []
    error_hints = list(caller_error_hints)
    retry_hints = list(caller_retry_hints)

    # shlex.split is required on bash and for Windows paths with spaces
    #
    cmd_actual = shlex.split(cmd)

    # When running python, use the python in the ADS sandbox ({sys.executable})
    #
    if cmd.startswith("python "):
        cmd_actual[0] = cmd_actual[0].replace("python", sys.executable)

        # On Mac, when ADS is not launched from terminal, LC_ALL may not be set, which causes pip installs to fail
        # with:
        #
        #       UnicodeDecodeError: 'ascii' codec can't decode byte 0xc5 in position 4969: ordinal not in range(128)
        #
        # Setting it to a default value of "en_US.UTF-8" enables pip install to complete
        #
        if platform.system() == "Darwin" and "LC_ALL" not in os.environ:
            os.environ["LC_ALL"] = "en_US.UTF-8"

        # NOTE(review): python_hints is not defined in the visible part of this
        # notebook - confirm it exists before running "python ..." commands.
        python_retry_hints, python_error_hints, install_hint = python_hints()
        retry_hints += python_retry_hints
        error_hints += python_error_hints

    if cmd.startswith("kubectl "):
        # NOTE(review): kubectl_hints is not defined in the visible part of this
        # notebook - confirm it exists before running "kubectl ..." commands.
        kubectl_retry_hints, kubectl_error_hints, install_hint = kubectl_hints()
        retry_hints += kubectl_retry_hints
        error_hints += kubectl_error_hints

    if cmd.startswith("azdata "):
        azdata_retry_hints, azdata_error_hints, install_hint = azdata_hints()
        retry_hints += azdata_retry_hints
        error_hints += azdata_error_hints

    # Find the path based location (shutil.which) of the executable that will be run (and display it to aid supportability), this
    # seems to be required for .msi installs of azdata.cmd/az.cmd.  (otherwise Popen returns FileNotFound)
    #
    # NOTE: Bash needs cmd to be the list of the space separated values hence shlex.split.
    #
    which_binary = shutil.which(cmd_actual[0])

    if which_binary is None:
        if install_hint is not None:
            display(Markdown(f'SUGGEST: Use {install_hint} to resolve this issue.'))

        raise FileNotFoundError(f"Executable '{cmd_actual[0]}' not found in path (where/which)")

    cmd_actual[0] = which_binary

    start_time = datetime.datetime.now().replace(microsecond=0)

    # Naive UTC timestamp (tzinfo stripped so it prints without an offset suffix)
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0, tzinfo=None)

    print(f"START: {cmd} @ {start_time} ({utc_now} UTC)")
    print(f"       using: {which_binary} ({platform.system()} {platform.release()} on {platform.machine()})")
    print(f"       cwd: {os.getcwd()}")

    # Matches the '  "key": "value"' lines emitted by "azdata notebook run"
    notebook_output_regex = re.compile(r'  "(.*)"\: "(.*)"')
    is_notebook_run = cmd.startswith("azdata notebook run")

    # Command-line tools such as CURL and AZDATA HDFS commands output
    # scrolling progress bars, which causes Jupyter to hang forever, to
    # workaround this, use no_output=True
    #
    try:
        if no_output:
            p = Popen(cmd_actual)
        else:
            p = Popen(cmd_actual, stdout=PIPE, stderr=PIPE, bufsize=1)
            with p.stdout:
                for line in iter(p.stdout.readline, b''):
                    line = line.decode()
                    if return_output:
                        output = output + line
                    elif is_notebook_run: # Hyperlink the .ipynb file
                        match = notebook_output_regex.match(line)
                        if match:
                            if match.group(1).find("HTML") != -1:
                                display(Markdown(f' - "{match.group(1)}": "{match.group(2)}"'))
                            else:
                                display(Markdown(f' - "{match.group(1)}": "[{match.group(2)}]({match.group(2)})"'))
                    else:
                        print(line, end='')
        p.wait()
    except FileNotFoundError as e:
        if install_hint is not None:
            display(Markdown(f'SUGGEST: Use {install_hint} to resolve this issue.'))

        raise FileNotFoundError(f"Executable '{cmd_actual[0]}' not found in path (where/which)") from e

    if not no_output:
        # stderr is only drained once stdout has reached end-of-file
        with p.stderr:
            for line in iter(p.stderr.readline, b''):
                line_decoded = line.decode()

                # azdata emits a single empty line to stderr when doing an hdfs cp, don't
                # print this empty "ERR:" as it confuses.  (A line read from the pipe still
                # carries its line terminator, so compare after stripping whitespace.)
                #
                if line_decoded.strip() == "":
                    continue

                print(f"ERR: {line_decoded}", end='')

                for error_hint in error_hints:
                    if error_hint[0] in line_decoded:
                        display(Markdown(f'SUGGEST: Use [{error_hint[2]}]({error_hint[1]}) to resolve this issue.'))

                for retry_hint in retry_hints:
                    if retry_hint in line_decoded and retry_count < max_retries:
                        print(f"RETRY: {retry_count} (due to: {retry_hint})")
                        retry_count = retry_count + 1

                        # Hand the caller's own hints to the retry; the tool specific
                        # hints are added again inside the nested call.
                        output = run(cmd, return_output=return_output, error_hints=caller_error_hints, retry_hints=caller_retry_hints, retry_count=retry_count)

                        return output if return_output else None

    elapsed = datetime.datetime.now().replace(microsecond=0) - start_time

    if p.returncode != 0:
        raise SystemExit(f'Shell command:\n\n\t{cmd} ({elapsed}s elapsed)\n\nreturned non-zero exit code: {str(p.returncode)}.\n')

    print(f'\nSUCCESS: {elapsed}s elapsed\n')

    if return_output:
        return output

def azdata_hints():
    """
    Return the hints used by `run` for commands that start with "azdata ".

    Returns:
        A (retry_hints, error_hints, install_hint) tuple:
        retry_hints  - list of stderr texts that make `run` retry the command.
        error_hints  - list of [stderr text, notebook link, notebook title]
                       entries used to display a "SUGGEST" link.
        install_hint - markdown link displayed when the executable is not found.
    """
    retry_hints = [
        "Endpoint sql-server-master does not exist",
        "Endpoint livy does not exist",
        "Failed to get state for cluster",
        "Endpoint webhdfs does not exist",
        "Adaptive Server is unavailable or does not exist",
        "Error: Address already in use",
        "Timed out getting health status after 5000 milliseconds"
    ]

    error_hints = [
        ["""azdata login""", """../common/sop028-azdata-login.ipynb""", """SOP028 - azdata login"""],
        ["""The token is expired""", """../common/sop028-azdata-login.ipynb""", """SOP028 - azdata login"""],
        ["""Reason: Unauthorized""", """../common/sop028-azdata-login.ipynb""", """SOP028 - azdata login"""],
        ["""Max retries exceeded with url: /api/v1/bdc/endpoints""", """../common/sop028-azdata-login.ipynb""", """SOP028 - azdata login"""],
        ["""Look at the controller logs for more details""", """../diagnose/tsg027-observe-bdc-create.ipynb""", """TSG027 - Observe cluster deployment"""],
        ["""provided port is already allocated""", """../log-files/tsg062-tail-bdc-previous-container-logs.ipynb""", """TSG062 - Get tail of all previous container logs for pods in BDC namespace"""],
        ["""Create cluster failed since the existing namespace""", """../install/sop061-delete-bdc.ipynb""", """SOP061 - Delete a big data cluster"""],
        ["""Failed to complete kube config setup""", """../repair/tsg067-failed-to-complete-kube-config-setup.ipynb""", """TSG067 - Failed to complete kube config setup"""],
        ["""Error processing command: "ApiError""", """../repair/tsg110-azdata-returns-apierror.ipynb""", """TSG110 - Azdata returns ApiError"""],
        ["""Error processing command: "ControllerError""", """../log-analyzers/tsg036-get-controller-logs.ipynb""", """TSG036 - Controller logs"""],
        ["""ERROR: 500""", """../log-analyzers/tsg046-get-knox-logs.ipynb""", """TSG046 - Knox gateway logs"""],
        ["""Data source name not found and no default driver specified""", """../install/sop069-install-odbc-driver-for-sql-server.ipynb""", """SOP069 - Install ODBC for SQL Server"""],
        ["""Can't open lib 'ODBC Driver 17 for SQL Server""", """../install/sop069-install-odbc-driver-for-sql-server.ipynb""", """SOP069 - Install ODBC for SQL Server"""]
    ]

    # The stray trailing apostrophe that used to follow the link has been
    # removed; it was rendered verbatim in the "SUGGEST: Use ..." message.
    install_hint = "[SOP055 - Install azdata command line interface](../install/sop055-install-azdata.ipynb)"

    return retry_hints, error_hints, install_hint

# Confirmation message, printed once this cell has run to the end
print('Common functions defined successfully.')

### Instantiate Kubernetes client

In [None]:
%%local

# Instantiate the Python Kubernetes client into 'api' variable

import os

try:
    from kubernetes import client, config
    from kubernetes.stream import stream

    # Use the in-cluster configuration only when both Kubernetes service
    # variables are present in the environment; otherwise fall back to the
    # kube config.
    in_cluster_vars = ("KUBERNETES_SERVICE_PORT", "KUBERNETES_SERVICE_HOST")

    if not all(var_name in os.environ for var_name in in_cluster_vars):
        config.load_kube_config()
    else:
        config.load_incluster_config()

    api = client.CoreV1Api()

    print('Kubernetes client instantiated')
except ImportError:
    # Point the reader at the notebook that installs the module, then
    # re-raise so the cell still fails.
    from IPython.display import Markdown

    install_suggestion = 'SUGGEST: Use [SOP059 - Install Kubernetes Python module](../install/sop059-install-kubernetes-module.ipynb) to resolve this issue.'
    display(Markdown(install_suggestion))
    raise

### Get the namespace for the big data cluster

Get the namespace of the big data cluster from the Kubernetes API.

NOTE: If there is more than one big data cluster in the target
Kubernetes cluster, then set \[0\] to the correct value for the big data
cluster.

In [None]:
%%local

# Place Kubernetes namespace name for BDC into 'namespace' variable

try:
    # Namespaces carrying the MSSQL_CLUSTER label; the first match is used
    bdc_namespaces = api.list_namespace(label_selector='MSSQL_CLUSTER').items
    namespace = bdc_namespaces[0].metadata.name
except IndexError:
    # No matching namespace: display the troubleshooting notebooks, then
    # re-raise so the cell still fails.
    from IPython.display import Markdown

    for suggestion in (
        'SUGGEST: Use [TSG081 - Get namespaces (Kubernetes)](../monitor-k8s/tsg081-get-kubernetes-namespaces.ipynb) to resolve this issue.',
        'SUGGEST: Use [TSG010 - Get configuration contexts](../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb) to resolve this issue.',
        'SUGGEST: Use [SOP011 - Set kubernetes configuration context](../common/sop011-set-kubernetes-context.ipynb) to resolve this issue.',
    ):
        display(Markdown(suggestion))

    raise

print('The kubernetes namespace for your big data cluster is: ' + namespace)

### Get required user credentials

Get the credentials from the Kubernetes secret store required to
perform the tasks below.

In [None]:
%%local

import base64

# Controller login: the secret values are base64 encoded text
controller_secret = api.read_namespaced_secret('controller-login-secret', namespace)

bdc_controller_username, bdc_controller_password = (
    base64.b64decode(controller_secret.data[secret_key]).decode()
    for secret_key in ('username', 'password')
)

# Knox gateway admin password, decoded the same way
gateway_secret = api.read_namespaced_secret('gateway-secret', namespace)

encoded_knox_password = gateway_secret.data['knox-admin-password']
bdc_knox_password = base64.b64decode(encoded_knox_password).decode()

print ('Credentials retrieved')

### Tutorial

1.  To be able to get the cluster endpoints, log in.

In [None]:
%%local
import os

# Expose the controller password through AZDATA_PASSWORD for the duration of
# the login command only.
os.environ["AZDATA_PASSWORD"] = bdc_controller_password

try:
    run(f'azdata login -n {namespace} --username {bdc_controller_username} --accept-eula yes')
finally:
    # Blank the password even when the login fails (run raises on a non-zero
    # exit code), so the secret does not linger in the kernel's environment.
    os.environ["AZDATA_PASSWORD"] = ""

1.  Now we will get the cluster endpoints and, from them, the HDFS
    address. This will be used in the next step when creating the .csv
    file and sending it to HDFS.

In [None]:
%%local
import json

# Ask azdata for the webhdfs endpoint; the command's stdout is JSON
cluster_res = run('azdata bdc endpoint list --endpoint="webhdfs"', return_output=True)

# Parse into a dedicated variable so the `json` module name is not rebound
# to the parsed result.
hdfs_endpoint = json.loads(cluster_res)
hdfs_addr = hdfs_endpoint['endpoint']

print(f'The HDFS address is: {hdfs_addr}')

1.  The following code builds the sample .csv data and uploads it to HDFS.

In [None]:
%%local
import os
import csv
import tempfile

# Sample rows written to the .csv file.
#
# NOTE(review): the rows do not all have the same number of fields (some have
# 9, others 10), and the adjacent string literals in row 2 (""Cube"") are
# concatenated by Python, so no quote characters end up in that product name.
# The data is kept exactly as it was - confirm whether this is intended.
items = [
    [1,"Eldon Base for stackable storage shelf, platinum","Muhammed MacIntyre",3,-213.25,38.94,35,"Nunavut,Storage & Organization",0.8 ],
    [2,"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators","Barry French",293,457.81,208.16,68.02,"Nunavut,Appliances",0.58],
    [3,"Cardinal Slant-D Ring Binder, Heavy Gauge Vinyl","Barry French",293,46.71,8.69,2.99,"Nunavut","Binders and Binder Accessories",0.39],
    [4,"R380","Clay Rozendal",483,1198.97,195.99,3.99,"Nunavut","Telephones and Communication",0.58],
    [5,"Holmes HEPA Air Purifier","Carlos Soltero",515,30.94,21.78,5.94,"Nunavut","Appliances",0.5],
    [6,"G.E. Longer-Life Indoor Recessed Floodlight Bulbs","Carlos Soltero",515,4.43,6.64,4.95,"Nunavut","Office Furnishings",0.37],
    [7,"Angle-D Binders with Locking Rings, Label Holders","Carl Jackson",613,-54.04,7.3,7.72,"Nunavut","Binders and Binder Accessories",0.38],
    [8,"SAFCO Mobile Desk Side File, Wire Frame","Carl Jackson",613,127.7,42.76,6.22,"Nunavut","Storage & Organization",],
    [9,"SAFCO Commercial Wire Shelving, Black","Monica Federle",643,-695.26,138.14,35,"Nunavut","Storage & Organization",],
    [10,"Xerox 198","Dorothy Badders",678,-226.36,4.98,8.33,"Nunavut","Paper",0.38 ],
]

import requests
import io

# WebHDFS CREATE request; overwrite=true replaces an existing file
url = hdfs_addr + '/clickstream_data/datasampleCS.csv?op=CREATE&overwrite=true'
output = io.StringIO()

# Render the rows as CSV text in memory, quoting every non-numeric field
csv.writer(output, quoting=csv.QUOTE_NONNUMERIC).writerows(items)

# NOTE(review): verify=False switches off TLS certificate verification for
# this request - confirm this is acceptable for the target cluster.
r = requests.put(url, allow_redirects=True, auth=('root', bdc_knox_password), data=output.getvalue().encode('utf-8'), verify=False, headers={'content-type':'application/octet-stream'})

# Fail here when the gateway rejects the upload, instead of reporting success
# and failing later when Spark cannot find the file.
r.raise_for_status()

print (f"CSV uploaded to: {url}")
print (f"CSV:\r\n{output.getvalue()}")


### Convert CSV to Parquet PYSPARK3

The following steps will allow you to convert your .csv file to parquet.

In [None]:
%%local
import json

# Ask azdata for the livy endpoint; the command's stdout is JSON
cluster_res = run('azdata bdc endpoint list --endpoint="livy"', return_output=True)

# Parse into a dedicated variable so the `json` module name is not rebound
# to the parsed result.
livy_endpoint = json.loads(cluster_res)
livy_adrss = livy_endpoint['endpoint']

print(f'The Livy address is: {livy_adrss}')

%_do_not_call_change_endpoint --username=root --password={bdc_knox_password} --server={livy_adrss} --auth=Basic_Access

1.  First open the .csv file and convert it to a data frame object.

In [None]:
# Read the uploaded .csv (column types inferred) and name its columns
csv_reader = spark.read.option("inferSchema", "true")
results = csv_reader.csv('/clickstream_data/datasampleCS.csv').toDF(
    "NumberID",
    "Name",
    "Name2",
    "Price",
    "Discount",
    "Money",
    "Money2",
    "Type",
    "Space",
)

1.  Verify the schema using the following command.

In [None]:
# Print the column names and the types that were inferred when reading the .csv
results.printSchema()

1.  You can now see the first 20 lines of this data using the following
    command.

In [None]:
# Display the first rows of the data frame
results.show()

1.  Now let’s turn your .csv file into a parquet file using the
    following commands.

In [None]:
# Set the Hadoop option "mapreduce.fileoutputcommitter.marksuccessfuljobs" to
# "false" for this Spark context.
# NOTE(review): presumably this stops a _SUCCESS marker file being written
# next to the parquet output - confirm against the Hadoop documentation.
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [None]:
# Write the data frame as parquet to /clickstream_data_parquet using the
# "overwrite" save mode
results.write.mode("overwrite").parquet('/clickstream_data_parquet')

1.  You can verify the parquet file using the following commands.

In [None]:
# Read the parquet output back into a new data frame to verify the conversion
result_parquet = spark.read.parquet('/clickstream_data_parquet')

In [None]:
# Display the first rows read back from the parquet output
result_parquet.show()

In [None]:
# Final cell: reaching this line means every cell before it ran without raising
print('Notebook execution complete.')