# Data Lake Showcase

## Environment Preparation

In [1]:
import os

import pyspark
import pyspark.pandas
import pandas as pd

from pyspark.sql import SparkSession
from io import StringIO

from hive_metastore_client.builders import DatabaseBuilder
from hive_metastore_client import HiveMetastoreClient
from thrift_files.libraries.thrift_hive_metastore_client.ttypes import Table, FieldSchema
from hive_metastore_client.builders import (
    ColumnBuilder,
    SerDeInfoBuilder,
    StorageDescriptorBuilder,
    TableBuilder,
)

ModuleNotFoundError: No module named 'pyspark.pandas'

We use a data source in order to utilize Lynx's permissions

## Uploading a test file to the bucket

In [3]:
UPLOADED_FILE_PATH = os.path.join("showcase_data_lake", "data", "alltypes_dictionary.parquet")
SHOW_CASE_BUCKET_DIR_PATH = f"{data_source.dir}/showcase/"

s3_client = create_s3_client(org_name=data_source.organization_name)
s3_target_key = f"{SHOW_CASE_BUCKET_DIR_PATH}{os.path.basename(UPLOADED_FILE_PATH)}"

print(f"Uploading file {UPLOADED_FILE_PATH} to {s3_target_key}")
s3_client.upload_file(
    Bucket=data_source.bucket,
    Key=s3_target_key,
    Filename=UPLOADED_FILE_PATH,
)
print("File uploaded successfully.")

Uploading file showcase_data_lake/data/alltypes_dictionary.parquet to visits.csv-08109379/showcase/alltypes_dictionary.parquet
File uploaded successfully.


## Registering the file in Hive metastore

### Create Hive metastore database

Configure Hive

In [4]:
HIVE_HOST = "localhost"
HIVE_PORT = 9083
HIVE_METASTORE_DB_NAME = data_source.dataset.programmatic_name.replace("-", "_")
HIVE_METASTORE_DB_NAME

'Test_Silent_3ef9f5e2'

In [5]:
# Creates a Hive metastore database
database = DatabaseBuilder(name=HIVE_METASTORE_DB_NAME).build()
with HiveMetastoreClient(HIVE_HOST, HIVE_PORT) as hive_metastore_client:
    hive_metastore_client.create_database(database)

AlreadyExistsException: AlreadyExistsException(message='Database Test_Silent_3ef9f5e2 already exists')

### Create Hive metastore table

#### Infer schema columns

In [None]:
df = pd.read_parquet(UPLOADED_FILE_PATH)

# [ColumnMetadata(is_primary=True, name='color', primitive_data_type=<PrimitiveDataType.STRING: 'STRING'>),
#  ColumnMetadata(is_primary=False, name='value', primitive_data_type=<PrimitiveDataType.STRING: 'STRING'>),
#  ColumnMetadata(is_primary=False, name='number', primitive_data_type=<PrimitiveDataType.NUMBER: 'NUMBER'>),
#  ColumnMetadata(is_primary=False, name='bool', primitive_data_type=<PrimitiveDataType.BOOLEAN: 'BOOLEAN'>)]
inferred_metadata = infer_columns_metadata(df)
inferred_metadata

#### Create hive columns

In [None]:
hive_metastore_columns = [
#     ColumnBuilder("id", "string", "col comment").build(),
#     ColumnBuilder("client_name", "string").build(),
]

def map_primitive_data_type_to_hive_data_type(primitive_data_type: PrimitiveDataType):
    """
    Hive data types: https://cwiki.apache.org/confluence/display/hive/languagemanual+types
    
    :return: The Hive datatype
    """
    return {
        PrimitiveDataType.STRING.value: "string",
        PrimitiveDataType.NUMBER.value: "double",
        PrimitiveDataType.DATETIME.value: "timestamp",
        PrimitiveDataType.BOOLEAN.value: "boolean"
    }[primitive_data_type.value]

for column in inferred_metadata:
    hive_metastore_columns.append(
        ColumnBuilder(
            column.name,
            map_primitive_data_type_to_hive_data_type(column.primitive_data_type),
        ).build()
    )
    
hive_metastore_columns

#### Create the Hive metastore table

Example: https://github.com/quintoandar/hive-metastore-client/blob/main/examples/create_external_table.py

In [None]:
# Underscores are not allowed: https://stackoverflow.com/questions/59631666/create-hive-table-with-hyphen-in-table-name
HIVE_TABLE_NAME = data_source.programmatic_name.replace("-", "_")
HIVE_TABLE_NAME

In [None]:
# If you table has partitions create a list with the partition columns
# This list is similar to the columns list, and the year, month and day
# columns are the same.
partition_keys = [
#     ColumnBuilder("year", "string").build(),
#     ColumnBuilder("month", "string").build(),
#     ColumnBuilder("day", "string").build(),
]

serde_info = SerDeInfoBuilder(
    serialization_lib="org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
).build()

storage_descriptor = StorageDescriptorBuilder(
    columns=hive_metastore_columns,
    location=f"s3a://{data_source.bucket}/{SHOW_CASE_BUCKET_DIR_PATH}",
    input_format="org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
    output_format="org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
    serde_info=serde_info,
).build()

table = TableBuilder(
    table_name=HIVE_TABLE_NAME,
    db_name=HIVE_METASTORE_DB_NAME,
    owner="owner name",
    storage_descriptor=storage_descriptor,
    partition_keys=partition_keys,
).build()

with HiveMetastoreClient(HIVE_HOST, HIVE_PORT) as hive_metastore_client:
    print(f"Creating table {HIVE_TABLE_NAME} in database {HIVE_METASTORE_DB_NAME}")
    # Creating new table from thrift table object
    hive_metastore_client.create_external_table(table)

## Query the data

### Spark

Docs:
- https://spark.apache.org/docs/latest/configuration.html
- https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

In [None]:
import os

spark = (
    SparkSession 
        .builder
        .master("spark://localhost:7077")
        .appName("SparkHiveMetastoreTest")
        .config("spark.sql.uris", "thrift://localhost:9083")
        .config("hive.metastore.warehouse.dir", "thrift://localhost:9083")
        .enableHiveSupport()
        .getOrCreate()
)

In [None]:
spark.sql("SHOW TABLES").show()