In [1]:
import findspark
findspark.init()

findspark.find()

import pyspark
findspark.find()

'C:\\spark\\spark-2.4.5-bin-hadoop2.7'

In [2]:
import os
import sys

spark_path = 'C:\spark\spark-2.4.5-bin-hadoop2.7'
os.environ['SPARK_HOME']= spark_path
os.environ['HADOOP_HOME']=spark_path
sys.path.append(spark_path+'\bin')
sys.path.append(spark_path+'\python')
sys.path.append(spark_path+'\python\pyspark')
sys.path.append(spark_path+'\python\lib')
sys.path.append(spark_path+'\python\lib\pyspark.zip')
sys.path.append(spark_path+'\python\lib\py4j-0.10.7-src.zip')

In [3]:
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.conf import SparkConf

warehouse_location = abspath('C:\tmp\hive')

spark = (SparkSession
         .builder
         .appName('example-pyspark-read-and-write-from-hive')
         .config("hive.metastore.uris", "thrift://localhost:9083", conf=SparkConf())
         .enableHiveSupport()
         .getOrCreate()
        )
    

In [12]:

    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    spark.sql("LOAD DATA LOCAL INPATH 'C:/spark/spark-2.4.5-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src")

    # Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show()
   

+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
|409|val_409|
|255|val_255|
|278|val_278|
| 98| val_98|
|484|val_484|
|265|val_265|
|193|val_193|
|401|val_401|
|150|val_150|
|273|val_273|
|224|val_224|
|369|val_369|
| 66| val_66|
|128|val_128|
|213|val_213|
+---+-------+
only showing top 20 rows



In [13]:
    # Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()
   

+--------+
|count(1)|
+--------+
|     500|
+--------+



In [15]:
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
   

Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 2, Value: val_2
Key: 4, Value: val_4
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 8, Value: val_8
Key: 9, Value: val_9


In [16]:
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()


+---+------+---+------+
|key| value|key| value|
+---+------+---+------+
|  2| val_2|  2| val_2|
|  4| val_4|  4| val_4|
|  5| val_5|  5| val_5|
|  5| val_5|  5| val_5|
|  5| val_5|  5| val_5|
|  8| val_8|  8| val_8|
|  9| val_9|  9| val_9|
| 10|val_10| 10|val_10|
| 11|val_11| 11|val_11|
| 12|val_12| 12|val_12|
| 12|val_12| 12|val_12|
| 15|val_15| 15|val_15|
| 15|val_15| 15|val_15|
| 17|val_17| 17|val_17|
| 18|val_18| 18|val_18|
| 18|val_18| 18|val_18|
| 19|val_19| 19|val_19|
| 20|val_20| 20|val_20|
| 24|val_24| 24|val_24|
| 24|val_24| 24|val_24|
+---+------+---+------+
only showing top 20 rows



In [17]:
spark.stop()