# Leveraging Hive with Spark using Python

We will see how to use Spark with Hive, particularly:

- how to create and use Hive databases
- how to create Hive tables
- how to load data to Hive tables
- how to insert data into Hive tables
- how to read data from Hive tables
- we will also see how to save data frames to any Hadoop supported file system

First we need to get a SparkSession with Hive (and AVRO) support

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json,col
from pyspark.sql.types import *
from os.path import abspath

spark = SparkSession\
        .builder\
        .appName("movielens")\
        .master("spark://spark-master:7077")\
        .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
        .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse")\
        .config("spark.executor.memory", "1g")\
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.0")\
        .enableHiveSupport()\
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Now, we can use Hive commands to see databases and tables. 

Let's show all the existing databases. At this point, we can also compare this to the output of `show databases` in Hue.

In [None]:
spark.sql('show databases').show()

We can see the functions in Spark.SQL using the command below. At the time of this writing, we have about 360 functions.

In [None]:
fncs =  spark.sql('show functions').collect()
len(fncs)

Let's see some of them

In [None]:
for i in fncs[150:161]:
    print(i[0])

By the way, we can see what a function is used for and what the arguments are as below.

In [None]:
spark.sql("describe function instr").show(truncate = False)

Now, let’s download the data. The data we will use is MovieLens 20M Dataset. We will use movies, ratings and tags data sets. 

> Note: In Jupyter Notebook `!` enables us to use shell commands.

In [None]:
# To download the data you would use the following commands:
# !wget -P /tmp/ http://files.grouplens.org/datasets/movielens/ml-latest.zip
# !unzip /tmp/ml-latest.zip -d /tmp
# !mv /tmp/ml-latest .

# In our case the data has been downloaded in advance, so let's see it
!ls -l ml-latest


Let's drop the `movies` database (in case it was previously created)

In [None]:
spark.sql('drop database if exists movies cascade')

Now we can create the `movies` database

In [None]:
spark.sql('create database if not exists movies')

Now if we show all the existing databases we should see the `movies` database. At this point, we can also compare this to the output of `show databases` in Hue.

In [None]:
spark.sql('show databases').show()

Now, let’s create tables: in text file format, in ORC and in AVRO format. But first, we have to make sure we are using the movies database by switching to it using the command below.

In [None]:
spark.sql('use movies')

Let's show the tables existing inside the `movies` database. Since we just created the database, it shouldn't contain any tables

In [None]:
spark.sql('show tables').show()

The movies dataset has movieId, title and genres fields. The rating dataset, on the other hand, as userId, movieID, rating and timestamp fields. Now, let’s create the tables.

In [None]:
spark.sql("drop table if exists movies")
# TEXTFILE format
spark.sql('''
create table movies
(
    movieId int,
    title string,
    genres string
)
row format delimited fields terminated by ","
stored as TEXTFILE
''')    

In [None]:
spark.sql("drop table if exists ratings")
# ORC format
spark.sql("""
create table ratings
(
    userId int,
    movieId int,
    rating float,
    timestamp string
)
stored as ORC
""")                                               

In [None]:
spark.sql("drop table if exists genres_by_count")
# AVRO format
spark.sql("""
create table genres_by_count
( 
    genres string,
    count int
)
stored as AVRO
""")                                              

Now, let’s see if the tables have been created.

In [None]:
spark.sql("show tables").show()

We see all the tables we created above.
We can get information about a table as below. If we do not include formatted or extended in the command, we see only information about the columns. But now, we see even its location, the database and other attributes.

In [None]:
spark.sql("describe formatted ratings").show(truncate = False)

Now let’s load data into the movies table. We can load data from a local file system or from any hadoop supported file system. If we are using a hadoop directory, we have to remove local from the command below. Please refer the Hive manual for details. If we are loading it just one time, we do not need to include overwrite. However, if there is possibility that we could run the code more than one time, including overwrite is important not to append the same dataset to the table again and again. Hive does not do any transformation while loading data into tables. Load operations are currently pure copy/move operations that move datafiles into locations corresponding to Hive tables. Hive does some minimal checks to make sure that the files being loaded match the target table. So, pay careful attention to your code.

In [None]:
# If we upload to HDFS (and give permissions) we can use this:
# df = spark.sql("load data inpath 'hdfs://namenode:8020/savas/movies.csv' overwrite into table movies")

# If file is local then we use this:
df = spark.sql("load data local inpath 'ml-latest/movies.csv' overwrite into table movies")


In [None]:
spark.sql("select count(*) from movies").show()

Rather than loading the data as a bulk, we can pre-process it and create a data frame and insert our data frame into the table. Let’s insert the rating data by first creating a data frame.

We can create dataframes in two ways:

1. By using the Spark SQL read function such as spark.read.csv, spark.read.json, spark.read.orc, spark.read.avro, spark.rea.parquet, etc.
2. By reading it in as an RDD and converting it to a dataframe after pre-processing it

Let’s specify schema for the ratings dataset.

In [None]:
from pyspark.sql.types import *
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('rating', DoubleType()),
             StructField('timestamp', StringType())
            ])

Now, we can read it in as dataframe using dataframe reader as below.

In [None]:
ratings_df = spark.read.csv("ml-latest/ratings.csv", schema = schema, header = True)

We can see the schema of the dataframe as:

In [None]:
ratings_df.printSchema()

We can also display the first five records from the dataframe.

In [None]:
ratings_df.show(5)

The second option to create a data frame is to read it in as RDD and change it to data frame by using the `toDF` data frame function or `createDataFrame` from `SparkSession`. Remember, we have to use the `Row` function from `pyspark.sql` to use `toDF`.

In [None]:
from pyspark.sql import Row

rdd = spark._sc.textFile("/opt/workspace/ml-latest/ratings.csv")
header = rdd.first()
ratings_df2 = rdd\
    .filter(lambda line: line != header)\
    .map(lambda line: 
            Row(
                userId = int(line.split(",")[0]),
                movieId = int(line.split(",")[1]),
                rating = float(line.split(",")[2]),
                timestamp = line.split(",")[3]
            )
).toDF()

ratings_df2.printSchema()
ratings_df2.show(5)

We can also do as below:

In [None]:
rdd2 = rdd.filter(lambda line: line != header).map(lambda line:line.split(","))
ratings_df2_b = spark.createDataFrame(rdd2, schema = schema)

We see the schema and the the first five records from ratings_df and ratings_df2 are the same.

In [None]:
ratings_df2.printSchema()
ratings_df2.show(5)

To insert a dataframe into a Hive table, we have to first create a temporary table as below.

In [None]:
ratings_df.createOrReplaceTempView("ratings_df_table")
# ratings_df2.createOrReplaceTempView("ratings_df_table")

 Now, let’s insert the data to the ratings Hive table.

In [None]:
%%time
spark.sql("insert overwrite ratings select * from ratings_df_table")

Next, let’s check if the movies and ratings Hive tables have the data.

In [None]:
spark.sql(
"""
    select 'movies' as tbl, count(*) as cnt from movies
    UNION ALL
    select 'ratings' as tbl, count(*) as cnt from ratings
"""
).show()

spark.sql("select * from movies limit 10").show(truncate = False)
spark.sql("select * from ratings limit 10").show(truncate = False)

We see that we can put our data in Hive tables by either directly loading data in a local or hadoop file system or by creating a data frame and registering the data frame as a temporary table.

We can also query data in Hive table and save it another Hive table. Let’s calculate a number of movies by genres and insert those genres which occur more than 500 times to genres_by_count AVRO Hive table we created above.

In [None]:
spark.sql("""
select 
    genres, 
    count(*) as count 
from movies
group by genres
having count(*) > 500 
order by count desc
"""
).show()

In [None]:
spark.sql(
"""
insert into table genres_by_count
select genres, count(*) as count from movies
group by genres
having count(*) >= 500
order by count desc
""")

Now, we can check if the data has been inserted to the Hive table appropriately:

In [None]:
spark.sql("select * from genres_by_count order by count desc limit 5").show()

We can also use data in Hive tables with other data frames by first registering the data frames as temporary tables.
Now, let’s create a temporary table from the tags dataset and then we will join it with movies and rating tables which are in Hive.

In [None]:
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('tag', StringType()),
             StructField('timestamp', StringType())
            ])

tags_df = spark.read.csv("/opt/workspace/ml-latest/tags.csv", schema = schema, header = True)
tags_df.printSchema()

Next, register the dataframe as temporary table.tags_df.registerTempTable('tags_df_table')

In [None]:
tags_df.registerTempTable('tags_df_table')

From the show tables Hive command below, we see that three of them are permanent but two of them are temporary tables.

In [None]:
spark.sql('show tables').show()

Now, lets’ join the three tables by using inner join. The result is a dataframe.

In [None]:
joined = spark.sql(
"""
    select 
        m.title, 
        m.genres, 
        r.movieId, 
        r.userId, 
        r.rating, 
        r.timestamp as ratingTimestamp,
        t.tag, 
        t.timestamp as tagTimestamp 
    from ratings as r 
    inner join tags_df_table as t
        on r.movieId = t.movieId and r.userId = t.userId 
    inner join movies as m 
        on r.movieId = m.movieId
""")

type(joined)


We can see the first five records as below.

In [None]:
%%time
joined.select(['title','genres','rating']).show(5, truncate = False)

We can also save our dataframe in another file system.
Let’s create a new directory and save the dataframe in csv, json, orc and parquet formats.
Let’s see two ways to do that:

In [None]:
%%time
!pwd
!rm -rf output
!mkdir output
joined.write.csv("/opt/workspace/output/joined.csv", header = True)
joined.write.json("/opt/workspace/output/joined.json")
joined.write.orc("/opt/workspace/output/joined_orc")
joined.write.parquet("/opt/workspace/output/joined_parquet" )

Now, let’s check if the data is there in the formats we specified.

In [None]:
! ls -l output

The second option to save data:

In [None]:
joined.write.format('csv').save("/opt/workspace/output/joined2.csv" , header = True)
joined.write.format('json').save("/opt/workspace/output/joined2.json" )
joined.write.format('orc').save("/opt/workspace/output/joined2_orc" )
joined.write.format('parquet').save("/opt/workspace/output/joined2_parquet" )

Now, let’s see if we have data from both options.

In [None]:
! ls output

Similarly, let’s see two ways to read the data.
First option:

In [None]:
read_csv = spark.read.csv('/opt/workspace/output/joined.csv', header = True)
read_orc = spark.read.orc('/opt/workspace/output/joined_orc')
read_parquet = spark.read.parquet('/opt/workspace/output/joined_parquet')
read_orc.printSchema()

second option:

In [None]:
read2_csv = spark.read.format('csv').load('/opt/workspace/output/joined.csv', header = True)
read2_orc = spark.read.format('orc').load('/opt/workspace/output/joined_orc')
read2_parquet = spark.read.format('parquet').load('/opt/workspace/output/joined_parquet')
read2_parquet.printSchema()

We can also write a data frame into a Hive table by using insertInto. This requires that the schema of the DataFrame is the same as the schema of the table.
Let’s see the schema of the joined dataframe and create two Hive tables: one in ORC and one in PARQUET formats to insert the dataframe into.

In [None]:
joined.printSchema()

Create ORC Hive Table:

In [None]:
spark.sql(
"""
create table joined_orc
(
    title string,
    genres string, 
    movieId int, 
    userId int,  
    rating float,
    ratingTimestamp string,
    tag string, 
    tagTimestamp string 
)
stored as ORC
"""
)

Create PARQUET Hive Table:

In [None]:
spark.sql(
"""
create table joined_parquet
(
    title string,
    genres string, 
    movieId int, 
    userId int,  
    rating float,
    ratingTimestamp string,
    tag string, 
    tagTimestamp string 
)
stored as PARQUET
"""
)

Let’s see if the tables have been created.

In [None]:
spark.sql('show tables').show()

They are there. Now, let’s insert dataframe into the tables.

In [None]:
joined.write.insertInto('joined_orc')
joined.write.insertInto('joined_parquet')

Finally, let’s check if the data has been inserted into the Hive tables.

In [None]:
spark.sql(
    'select title, genres, rating from joined_orc order by rating desc limit 5'
).show(truncate = False)

In [None]:
spark.sql(
    'select title, genres, rating  from joined_parquet order by rating desc limit 5'
).show(truncate = False)

Everything looks great!