使用PySpark写Hive
本质上还是写HBase：PySpark直接访问HBase较为麻烦，因此改为通过映射到HBase的Hive外部表（external table）来读写。
需要准备以下两个jar包，放到Spark的jars目录中：
spark-hive_2.12-2.4.5.jar：
https://search.maven.org/artifact/org.apache.spark/spark-hive_2.12/2.4.5/jar
hive-exec-1.2.1.spark2.jar：
https://search.maven.org/artifact/org.spark-project.hive/hive-exec/1.2.1.spark2/jar
然后将Hive和HBase安装目录下lib文件夹中的所有jar包都复制到Spark的jars目录中（注意：这些jar可能与Spark自带的同名jar存在版本冲突，如遇类加载问题需删除重复的版本）。


In [3]:
# Bootstrap a YARN-backed SparkSession with Hive metastore support.
# findspark.init() makes the pyspark package under SPARK_HOME importable,
# so it has to run before the pyspark imports below.
import findspark
findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("yarn").setAppName("hive_test")
# Setting spark.yarn.jars would stop YARN from re-uploading everything under
# SPARK_HOME on each launch (see the yarn.Client warning in the cell output).
# conf.set("spark.yarn.jars", "/libs/spark/*.jar")
ss = (
    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
ss

21/12/03 22:42:27 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
# Create the Hive database used throughout this notebook.
# "if not exists" keeps the cell re-runnable (same guard as the external-table
# DDL). The statement returns an empty DataFrame, so there is nothing to show().
ss.sql("create database if not exists hive_test")

21/12/01 18:41:29 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
21/12/01 18:41:29 WARN metastore.ObjectStore: Failed to get database hive_test, returning NoSuchObjectException


++
||
++
++



In [21]:
# Create a managed, comma-delimited text table in hive_test, then describe it.
# "if not exists" keeps the cell re-runnable (same guard as the external-table
# DDL). Spark's desc output reports the varchar(20) columns as string.
# NOTE(review): desc formatted shows the table under a local
# file:/.../spark-warehouse location, which suggests Spark is not picking up the
# cluster's Hive warehouse directory -- confirm hive-site.xml is on Spark's conf path.
ss.sql("create table if not exists hive_test.test(id varchar(20),value varchar(20)) row format delimited fields terminated by ',' ")
ss.sql("desc formatted hive_test.test").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|              string|   null|
|               value|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|           hive_test|       |
|               Table|                test|       |
|               Owner|                zzti|       |
|        Created Time|Wed Dec 01 19:05:...|       |
|         Last Access|Thu Jan 01 08:00:...|       |
|          Created By|         Spark 2.4.5|       |
|                Type|             MANAGED|       |
|            Provider|                hive|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|file:/data/zzti/p...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        Out

21/12/01 19:05:13 WARN metastore.HiveMetaStore: Location: file:/data/zzti/proj/spark-warehouse/hive_test.db/test specified for non-external table:test


In [24]:
# Append 70 rows to hive_test.test: (0, 20), (1, 21), ..., (69, 89).
# Both ranges have the same length, so zip() pairs every element.
# insertInto resolves columns by position, not by name, so the tuple order
# must follow the table's column order (id, value).
(
    ss.createDataFrame(zip(range(0, 70), range(20, 90)), ['id', 'value'])
    .write.format('hive')
    .insertInto("hive_test.test", overwrite=False)
)


In [26]:
# Inspect the table: first the leading rows (show() prints 20 by default),
# then the total row count.
for query in ("select * from hive_test.test",
              "select count(id) from hive_test.test"):
    ss.sql(query).show()

+---+-----+
| id|value|
+---+-----+
| 50|   20|
| 51|   21|
| 52|   22|
| 53|   23|
| 54|   24|
| 55|   25|
| 56|   26|
| 57|   27|
| 58|   28|
| 59|   29|
| 60|   30|
| 61|   31|
| 62|   32|
| 63|   33|
| 64|   34|
| 65|   35|
| 66|   36|
| 67|   37|
| 68|   38|
| 69|   39|
+---+-----+
only showing top 20 rows

+---------+
|count(id)|
+---------+
|       90|
+---------+



In [8]:
# Connect to HBase through happybase. The HBase Thrift server must already be
# running on this host.
import happybase
HBASE_THRIFT_HOST = '192.168.17.150'
conn = happybase.Connection(host=HBASE_THRIFT_HOST, autoconnect=True)
# autoconnect=True already opens the transport; this explicit open() is
# redundant but harmless.
conn.open()
conn.client

<thriftpy2.thrift.TClient at 0x7f607f059890>

In [12]:
# Create the native HBase table that the Hive external table
# hive_test.hive_test_hbase maps onto. It has one column family 'd', keeps a
# single version per cell and has the block cache disabled.
# create_table raises if the table already exists, so the call is guarded to
# keep the cell re-runnable. happybase returns table names as bytes.
if b"hive_test_hbase" not in conn.tables():
    conn.create_table("hive_test_hbase", {'d': dict(max_versions=1, block_cache_enabled=False)})

In [1]:
# Create a Hive external table backed by the HBase table hive_test_hbase.
# The original note says this DDL must go through HiveServer2 (pyhive)
# instead of ss.sql(). Presumably that is because Spark SQL rejects the
# STORED BY storage-handler clause -- TODO confirm.
# hbase.columns.mapping pairs the Hive columns positionally:
#   id -> HBase row key (:key); a, b, c -> family d, qualifiers a, b, c.
from pyhive import hive
hive_conn = hive.connect(host="localhost")
cursor = hive_conn.cursor()
hive_test_hbase = """
 create external table if not exists hive_test.hive_test_hbase
    (id string, a string, b string, c string)
 stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
 with serdeproperties ("hbase.columns.mapping"=":key,d:a,d:b,d:c")
 tblproperties("hbase.table.name" = "hive_test_hbase")
"""
print(hive_test_hbase)
cursor.execute(hive_test_hbase)


 create external table if not exists hive_test.hive_test_hbase
    (id string, a string, b string, c string)
 stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
 with serdeproperties ("hbase.columns.mapping"="d:a,d:b,d:c")
 tblproperties("hbase.table.name" = "hive_test_hbase")



In [15]:
# Write 20 rows of random test data directly into HBase.
# Row keys are the strings "0".."19". The scan output shows them coming back
# in lexicographic order ("0", "1", "10", ...), not numeric order.
import random
table = conn.table("hive_test_hbase")
print(table.families())
# A single batch sends the puts together instead of one Thrift call per row.
# timestamp=1 pins every cell to the same version timestamp.
# wal=False skips HBase's write-ahead log: faster, but writes that are not yet
# flushed are not durable.
with table.batch(timestamp=1, wal=False) as batch:
    for row_id in range(20):  # renamed from `id` to avoid shadowing the builtin
        batch.put(
            str(row_id),
            {
                "d:a": str(random.randint(1, 100)),
                "d:b": str(random.randint(100, 200)),
                "d:c": str(random.randint(200, 300)),
            },
        )

{b'd': {'name': b'd:', 'max_versions': 1, 'compression': b'NONE', 'in_memory': False, 'bloom_filter_type': b'NONE', 'bloom_filter_vector_size': 0, 'bloom_filter_nb_hashes': 0, 'block_cache_enabled': False, 'time_to_live': 2147483647}}


In [16]:
# Read the table back straight from HBase.
# Prints each row key followed by "column: value" pairs joined by commas.
for row_key, cells in table.scan():
    cells_text = ",".join(
        "{}: {}".format(column.decode(), cell.decode())
        for column, cell in cells.items()
    )
    print(row_key.decode("utf8"), cells_text)

0 d:a: 100,d:b: 188,d:c: 298
1 d:a: 13,d:b: 166,d:c: 250
10 d:a: 37,d:b: 190,d:c: 255
11 d:a: 66,d:b: 162,d:c: 219
12 d:a: 6,d:b: 191,d:c: 290
13 d:a: 40,d:b: 126,d:c: 239
14 d:a: 33,d:b: 194,d:c: 257
15 d:a: 74,d:b: 145,d:c: 226
16 d:a: 84,d:b: 133,d:c: 275
17 d:a: 41,d:b: 166,d:c: 214
18 d:a: 94,d:b: 172,d:c: 249
19 d:a: 29,d:b: 150,d:c: 223
2 d:a: 90,d:b: 119,d:c: 228
3 d:a: 62,d:b: 181,d:c: 224
4 d:a: 45,d:b: 153,d:c: 253
5 d:a: 43,d:b: 182,d:c: 277
6 d:a: 89,d:b: 146,d:c: 243
7 d:a: 3,d:b: 190,d:c: 268
8 d:a: 77,d:b: 121,d:c: 297
9 d:a: 90,d:b: 115,d:c: 286


In [17]:
# Read the same data through HiveServer2. The external table exposes the HBase
# row key as `id` and d:a/d:b/d:c as columns a/b/c.
cursor.execute("select * from hive_test.hive_test_hbase")
fetched = cursor.fetchall()
for record in fetched:
    print(record)

('0', '100', '188', '298')
('1', '13', '166', '250')
('10', '37', '190', '255')
('11', '66', '162', '219')
('12', '6', '191', '290')
('13', '40', '126', '239')
('14', '33', '194', '257')
('15', '74', '145', '226')
('16', '84', '133', '275')
('17', '41', '166', '214')
('18', '94', '172', '249')
('19', '29', '150', '223')
('2', '90', '119', '228')
('3', '62', '181', '224')
('4', '45', '153', '253')
('5', '43', '182', '277')
('6', '89', '146', '243')
('7', '3', '190', '268')
('8', '77', '121', '297')
('9', '90', '115', '286')


In [4]:
# Read the HBase-backed Hive external table through Spark SQL.
hbase_df = ss.sql("select * from hive_test.hive_test_hbase")
hbase_df.show()

21/12/03 22:43:06 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
21/12/03 22:43:08 WARN mapreduce.TableInputFormatBase: You are using an HTable instance that relies on an HBase-managed Connection. This is usually due to directly creating an HTable, which is deprecated. Instead, you should create a Connection object and then request a Table instance from it. If you don't need the Table instance for your own use, you should instead use the TableInputFormatBase.initalizeTable method directly.
[Stage 0:>                                                          (0 + 1) / 1]

+---+---+---+---+
| id|  a|  b|  c|
+---+---+---+---+
|  0|100|188|298|
|  1| 13|166|250|
| 10| 37|190|255|
| 11| 66|162|219|
| 12|  6|191|290|
| 13| 40|126|239|
| 14| 33|194|257|
| 15| 74|145|226|
| 16| 84|133|275|
| 17| 41|166|214|
| 18| 94|172|249|
| 19| 29|150|223|
|  2| 90|119|228|
|  3| 62|181|224|
|  4| 45|153|253|
|  5| 43|182|277|
|  6| 89|146|243|
|  7|  3|190|268|
|  8| 77|121|297|
|  9| 90|115|286|
+---+---+---+---+



                                                                                

In [None]:
# Close the pyhive cursor used for the external-table DDL and the Hive query.
cursor.close()

In [22]:
# Close the happybase connection to the HBase Thrift server.
conn.close()

In [6]:
# Stop the SparkSession `ss` created in the first cell.
ss.stop()