In [1]:
import numpy as np
import os
from collections import namedtuple
from pyspark.sql import SparkSession, Row

In [2]:
# config
NUMBER_OF_ROWS = 1000
NUMBER_OF_COLS = 8
NUMBER_BLOCKS = 2
NUMBER_PARTITIONS = NUMBER_BLOCKS

storage_platform = "file:///spark/data"


hive_table_ddl = """
CREATE EXTERNAL TABLE hive_table(
   X_1 double,
   X_2 double,
   X_3 double,
   X_4 double,
   X_5 double,
   X_6 double,
   X_7 double,
   X_8 double,
   part_id1 string,
   part_id2 string
)
STORED AS parquet"""

In [3]:
# Reuse the session a pyspark shell/kernel may already provide, or create one on a
# fresh kernel so the notebook survives Restart & Run All. Hive support is enabled
# because a Hive-format table (CREATE EXTERNAL TABLE ... STORED AS parquet) is created later.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')

In [4]:
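#
# Data generation: build synthetic rows and write them as parquet files
# (the files the Hive external table is later created over)
#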
def combineData(x):
    """Merge a pair of dicts into a single record dict.

    Args:
        x: a sequence whose first two items are ``(left, right)``; ``left`` is a
            dict, ``right`` is anything ``dict.update`` accepts. On key clashes the
            value from ``right`` wins.

    Returns:
        A new dict holding the keys of ``left`` followed by any new keys of
        ``right``. Neither input is modified (the previous version updated
        ``x[0]`` in place and returned it).
    """
    merged = dict(x[0])
    merged.update(x[1])
    return merged

def generate_some_data(seed):
    """Deterministically generate NUMBER_OF_ROWS synthetic rows from ``seed``.

    Each row has NUMBER_OF_COLS numeric columns ``X_1 .. X_n`` drawn with
    ``randn`` and rounded to 3 decimals, plus two string columns:
    ``part_id1`` (one of 'a'..'e') and ``part_id2`` (one of 'x', 'y', 'z').

    Args:
        seed: seed for a private numpy ``RandomState``; the same seed always
            yields the same rows.

    Returns:
        list of ``Row`` objects, one per generated row.
    """
    # A private RandomState produces exactly the same stream as np.random.seed(seed)
    # followed by np.random.* calls, but without reseeding the process-wide global
    # RNG as a side effect in whichever Python worker runs this.
    rng = np.random.RandomState(seed)

    colnames = ['X_' + str(i + 1) for i in range(NUMBER_OF_COLS)]
    numerics = np.round(rng.randn(NUMBER_OF_ROWS, NUMBER_OF_COLS), 3)
    numeric_data = [dict(zip(colnames, a_row)) for a_row in numerics.tolist()]

    # All part_id1 values are drawn before any part_id2 values; keep this order so
    # seeded output stays reproducible.
    chr_list = zip([str(x) for x in rng.choice(list('abcde'), NUMBER_OF_ROWS)],
                   [str(x) for x in rng.choice(list('xyz'), NUMBER_OF_ROWS)])
    chrnames = ['part_id1', 'part_id2']
    chr_data = [dict(zip(chrnames, a_row)) for a_row in chr_list]

    return [Row(**kw) for kw in map(combineData, zip(numeric_data, chr_data))]

# Spread the seeds 0 .. NUMBER_BLOCKS-1 over NUMBER_PARTITIONS partitions and expand
# every seed into its block of generated rows.
my_rdd = (
    sc.parallelize(range(NUMBER_BLOCKS), NUMBER_PARTITIONS)
      .flatMap(generate_some_data)
)
my_df = spark.createDataFrame(my_rdd)

# Persist as parquet, replacing whatever a previous run left at that location.
(my_df.write
      .mode('overwrite')
      .parquet(os.path.join(storage_platform, 'my_data_parquet')))

In [5]:
# Recreate the external table on top of the parquet files written in the previous cell.
spark.sql("DROP TABLE IF EXISTS hive_table PURGE")

hive_sql_cmd = hive_table_ddl + " LOCATION '" + os.path.join(storage_platform, "my_data_parquet") + "'"
spark.sql(hive_sql_cmd)

answer_df = spark.sql("select * from hive_table limit 5")

# A LIMIT without ORDER BY has no guaranteed row order (the recorded output of this
# cell starts with X_1 = 1.764, not 1.624), so asserting on first() is flaky and also
# runs the query again per call. Look the expected row up explicitly instead; the
# values appear to be the first row produced by generate_some_data(1).
hive_df = spark.table("hive_table")
expected_row_count = hive_df.filter((hive_df.X_1 == 1.624) & (hive_df.X_8 == -0.761)).count()
assert expected_row_count >= 1, "expected a row with X_1 == 1.624 and X_8 == -0.761 in hive_table"

answer_df.show()

+------+------+------+------+------+------+------+------+--------+--------+
|   X_1|   X_2|   X_3|   X_4|   X_5|   X_6|   X_7|   X_8|part_id1|part_id2|
+------+------+------+------+------+------+------+------+--------+--------+
| 1.764|   0.4| 0.979| 2.241| 1.868|-0.977|  0.95|-0.151|       b|       x|
|-0.103| 0.411| 0.144| 1.454| 0.761| 0.122| 0.444| 0.334|       b|       y|
| 1.494|-0.205| 0.313|-0.854|-2.553| 0.654| 0.864|-0.742|       d|       z|
|  2.27|-1.454| 0.046|-0.187| 1.533| 1.469| 0.155| 0.378|       b|       y|
|-0.888|-1.981|-0.348| 0.156|  1.23| 1.202|-0.387|-0.302|       b|       z|
+------+------+------+------+------+------+------+------+--------+--------+



In [6]:
# Every generated row (NUMBER_OF_ROWS per seed, NUMBER_BLOCKS seeds) should be visible
# through the external table; assert it rather than only printing the count.
row_count_df = spark.sql("select count(*) as row_count from hive_table")
row_count_df.show()
assert row_count_df.first().row_count == NUMBER_BLOCKS * NUMBER_OF_ROWS

+---------+
|row_count|
+---------+
|     2000|
+---------+

