In [None]:
#  do this https://datascience-enthusiast.com/Python/hivesparkpython.html

In [1]:
import os
os.listdir(os.getcwd())

['Untitled.ipynb', '.ipynb_checkpoints']

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [3]:
os.listdir(os.getcwd())

['Untitled.ipynb', '.ipynb_checkpoints']

In [4]:
os.listdir(os.getcwd())

['Untitled.ipynb', '.ipynb_checkpoints']

In [7]:
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [8]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [9]:
fncs =  spark.sql('show functions').collect()
len(fncs)

296

In [10]:
for i in fncs[100:111]:
    print(i[0])

explode
explode_outer
expm1
factorial
filter
find_in_set
first
first_value
flatten
float
floor


In [11]:
spark.sql("describe function instr").show(truncate = False)

+-----------------------------------------------------------------------------------------------------+
|function_desc                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|Function: instr                                                                                      |
|Class: org.apache.spark.sql.catalyst.expressions.StringInstr                                         |
|Usage: instr(str, substr) - Returns the (1-based) index of the first occurrence of `substr` in `str`.|
+-----------------------------------------------------------------------------------------------------+



In [12]:
spark.sql('create database movies')

DataFrame[]

In [13]:
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
|     default|
|      movies|
+------------+



In [14]:
spark.sql('use movies')

DataFrame[]

In [16]:
spark.sql('create table movies \
         (movieId int,title string,genres string) \
         row format delimited fields terminated by ","\
         stored as textfile')                                              # in textfile format

DataFrame[]

In [17]:
spark.sql("create table ratings\
           (userId int,movieId int,rating float,timestamp string)\
           stored as ORC" )                                                # in ORC format

DataFrame[]

In [18]:
spark.sql("create table genres_by_count\
           ( genres string,count int)\
           stored as AVRO" )                                               # in AVRO format

DataFrame[]

In [19]:
spark.sql("show tables").show()

+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
|  movies|genres_by_count|      false|
|  movies|         movies|      false|
|  movies|        ratings|      false|
+--------+---------------+-----------+



In [20]:
spark.sql("describe formatted ratings").show(truncate = False)

+----------------------------+--------------------------------------------------------+-------+
|col_name                    |data_type                                               |comment|
+----------------------------+--------------------------------------------------------+-------+
|userId                      |int                                                     |null   |
|movieId                     |int                                                     |null   |
|rating                      |float                                                   |null   |
|timestamp                   |string                                                  |null   |
|                            |                                                        |       |
|# Detailed Table Information|                                                        |       |
|Database                    |movies                                                  |       |
|Table                       |ratings   

In [21]:
spark.sql("load data local inpath 'data/movies.csv'\
                 overwrite into table movies")

DataFrame[]

In [22]:
from pyspark.sql.types import *

schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('rating', DoubleType()),
             StructField('timestamp', StringType())
            ])

In [24]:
ratings_df = spark.read.csv("data/ratings.csv", schema = schema, header = True)

In [25]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [26]:
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [27]:
from pyspark.sql import Row
from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

rdd = sc.textFile("data/ratings.csv")
header = rdd.first()
ratings_df2 = rdd.filter(lambda line: line != header).map(lambda line: Row(userId = int(line.split(",")[0]),
                                                                     movieId = int(line.split(",")[1]),
                                                                     rating = float(line.split(",")[2]),
                                                                     timestamp = line.split(",")[3]
                                                                    )).toDF()
            

In [28]:
rdd2 = rdd.filter(lambda line: line != header).map(lambda line:line.split(","))
ratings_df2_b =spark.createDataFrame(rdd2, schema = schema) 

In [29]:
ratings_df2.printSchema()

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- userId: long (nullable = true)



In [30]:
ratings_df2.show(5)

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|    307|   3.5|1256677221|     1|
|    481|   3.5|1256677456|     1|
|   1091|   1.5|1256677471|     1|
|   1257|   4.5|1256677460|     1|
|   1449|   4.5|1256677264|     1|
+-------+------+----------+------+
only showing top 5 rows



In [31]:
ratings_df.createOrReplaceTempView("ratings_df_table") # we can also use registerTempTable


In [32]:
spark.sql("insert into table ratings select * from ratings_df_table")

DataFrame[]

In [33]:
spark.sql("select * from movies limit 10").show(truncate = False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|null   |title                             |genres                                     |
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck

In [34]:
spark.sql("select * from ratings limit 10").show(truncate = False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |307    |3.5   |1256677221|
|1     |481    |3.5   |1256677456|
|1     |1091   |1.5   |1256677471|
|1     |1257   |4.5   |1256677460|
|1     |1449   |4.5   |1256677264|
|1     |1590   |2.5   |1256677236|
|1     |1591   |1.5   |1256677475|
|1     |2134   |4.5   |1256677464|
|1     |2478   |4.0   |1256677239|
|1     |2840   |3.0   |1256677500|
+------+-------+------+----------+



In [36]:
spark.sql("select genres, count(*) as count from movies\
          group by genres\
          having count(*) > 2 \
          order by count desc").show()

+--------------------+-----+
|              genres|count|
+--------------------+-----+
|               Drama|   18|
|         The (1995)"|   18|
|       Drama|Romance|   13|
|              Comedy|   13|
|        Comedy|Drama|   11|
|      Comedy|Romance|   10|
|         Documentary|   10|
|Comedy|Drama|Romance|    4|
|Action|Crime|Thri...|    3|
|         Crime|Drama|    3|
|Action|Sci-Fi|Thr...|    3|
+--------------------+-----+



In [37]:
spark.sql("insert into table genres_by_count \
          select genres, count(*) as count from movies\
          group by genres\
          having count(*) >= 2 \
          order by count desc")

DataFrame[]

In [38]:
spark.sql("select * from genres_by_count order by count desc limit 3").show()


+-------------+-----+
|       genres|count|
+-------------+-----+
|        Drama|   18|
|  The (1995)"|   18|
|Drama|Romance|   13|
+-------------+-----+



In [39]:
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('tag', StringType()),
             StructField('timestamp', StringType())
            ])

tags_df = spark.read.csv("data/tags.csv", schema = schema, header = True)
tags_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [40]:
tags_df.registerTempTable('tags_df_table')

In [41]:
spark.sql('show tables').show()

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
|  movies| genres_by_count|      false|
|  movies|          movies|      false|
|  movies|         ratings|      false|
|        |ratings_df_table|       true|
|        |   tags_df_table|       true|
+--------+----------------+-----------+



In [42]:
joined = spark.sql("select m.title, m.genres, r.movieId, r.userId,  r.rating, r.timestamp as ratingTimestamp, \
               t.tag, t.timestamp as tagTimestamp from ratings as r\
               inner join tags_df_table as t\
               on r.movieId = t.movieId and r.userId = t.userId \
               inner join movies as m on r.movieId = m.movieId")

In [43]:
type(joined)

pyspark.sql.dataframe.DataFrame

In [44]:
joined.select(['title','genres','rating']).show(5, truncate = False)

+-----+------+------+
|title|genres|rating|
+-----+------+------+
+-----+------+------+



In [45]:
!pwd

/home/jovyan/hive


In [46]:
!mkdir output