In [17]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [18]:
import os

# List the entries of the current working directory (listdir defaults to ".").
os.listdir()

['.config',
 'ml-latest.zip',
 'ml-latest',
 'derby.log',
 'spark-warehouse',
 'metastore_db',
 'sample_data']

In [19]:
from pyspark.sql import SparkSession
# Get (or create) a Spark session with Hive support enabled.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .getOrCreate()
)

In [20]:
# List the databases known to the metastore.
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|   movies|
+---------+



In [21]:
# List the tables visible in the current database.
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   movies|   movies|      false|
|   movies|  ratings|      false|
+---------+---------+-----------+



In [22]:
# Collect the rows returned by `show functions` to the driver and count them.
fncs = spark.sql("show functions").collect()
len(fncs)

388

In [23]:
# Print the first column of the collected rows at positions 100 through 110.
for fn_row in fncs[100:111]:
    print(fn_row[0])

current_date
current_timestamp
current_timezone
current_user
date
date_add
date_format
date_from_unix_date
date_part
date_sub
date_trunc


In [24]:
# Show the full, untruncated description of the `instr` function.
spark.sql("describe function instr").show(truncate=False)

+-----------------------------------------------------------------------------------------------------+
|function_desc                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|Function: instr                                                                                      |
|Class: org.apache.spark.sql.catalyst.expressions.StringInstr                                         |
|Usage: instr(str, substr) - Returns the (1-based) index of the first occurrence of `substr` in `str`.|
+-----------------------------------------------------------------------------------------------------+



In [26]:
# `if not exists` keeps this cell re-runnable: the metastore persists on disk
# between sessions, so a plain `create database` fails on the second run.
spark.sql("create database if not exists movies1")

DataFrame[]

In [27]:
# Confirm that the new database is listed.
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|   movies|
|  movies1|
+---------+



In [28]:
! wget http://files.grouplens.org/datasets/movielens/ml-latest.zip


--2022-12-01 14:58:10--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘ml-latest.zip.1’


2022-12-01 14:58:13 (74.7 MB/s) - ‘ml-latest.zip.1’ saved [277113433/277113433]



In [29]:
!unzip /content/ml-latest.zip

Archive:  /content/ml-latest.zip
replace ml-latest/links.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [32]:
# Make movies1 the current database for the statements that follow.
spark.sql("use movies1")

DataFrame[]

In [33]:
# Create the three Hive tables used in this notebook. `if not exists` makes the
# cell safe to re-run against the persisted metastore, and triple-quoted SQL
# avoids fragile backslash continuations inside string literals.

# movies: comma-delimited text file.
# NOTE(review): with this layout the CSV header line is loaded as a data row,
# and a plain "," delimiter presumably also splits quoted titles that contain
# commas -- confirm against movies.csv.
spark.sql("""
    create table if not exists movies
        (movieId int, title string, genres string)
        row format delimited fields terminated by ","
        stored as textfile
""")

# ratings: ORC format.
spark.sql("""
    create table if not exists ratings
        (userId int, movieId int, rating float, timestamp string)
        stored as ORC
""")

# genres_by_count: AVRO format.
spark.sql("""
    create table if not exists genres_by_count
        (genres string, count int)
        stored as AVRO
""")

DataFrame[]

In [37]:
# List the tables now present in the current database.
spark.sql("show tables").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|  movies1|genres_by_count|      false|
|  movies1|         movies|      false|
|  movies1|        ratings|      false|
+---------+---------------+-----------+



In [38]:
# Show the columns and detailed table information of `ratings`, untruncated.
spark.sql("describe formatted ratings").show(truncate=False)

+----------------------------+------------------------------------------------+-------+
|col_name                    |data_type                                       |comment|
+----------------------------+------------------------------------------------+-------+
|userId                      |int                                             |null   |
|movieId                     |int                                             |null   |
|rating                      |float                                           |null   |
|timestamp                   |string                                          |null   |
|                            |                                                |       |
|# Detailed Table Information|                                                |       |
|Database                    |movies1                                         |       |
|Table                       |ratings                                         |       |
|Owner                       |ro

In [42]:
# Load the extracted CSV rather than the zip archive: `movies` is a
# comma-delimited text table, so the archive's compressed bytes cannot be
# parsed into rows. `overwrite` replaces any previous contents.
spark.sql(
    "load data local inpath '/content/ml-latest/movies.csv' overwrite into table movies"
)

DataFrame[]

In [40]:
from pyspark.sql.types import *
# Explicit schema for ratings.csv, built from (column name, type) pairs.
schema = StructType([
    StructField(col_name, col_type)
    for col_name, col_type in (
        ("userId", IntegerType()),
        ("movieId", IntegerType()),
        ("rating", DoubleType()),
        ("timestamp", StringType()),
    )
])

In [43]:
# Read ratings.csv with the explicit schema; the first line is a header.
ratings_df = spark.read.csv(
    "/content/ml-latest/ratings.csv",
    schema=schema,
    header=True,
)

In [44]:
# Preview the first five ratings rows.
ratings_df.show(n=5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [45]:
# Print the column names and types of ratings_df.
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [46]:
# Load the extracted movies.csv into the `movies` table, replacing its contents.
spark.sql(
    "load data local inpath '/content/ml-latest/movies.csv' overwrite into table movies"
)

DataFrame[]

In [47]:
# Preview ten rows of `movies` without truncating long values.
spark.sql("select * from movies limit 10").show(truncate=False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|null   |title                             |genres                                     |
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck

In [48]:
# Count movies per genre combination, keep those with more than 500 movies,
# and list them from most to least frequent.
spark.sql(
    "select genres, count(*) as count from movies"
    "          group by genres"
    "          having count(*) > 500 "
    "          order by count desc"
).show()

+--------------------+-----+
|              genres|count|
+--------------------+-----+
|               Drama| 7069|
|              Comedy| 4735|
|  (no genres listed)| 4135|
|         Documentary| 3777|
|        Comedy|Drama| 1879|
|       Drama|Romance| 1754|
|      Comedy|Romance| 1323|
|              Horror| 1308|
|Comedy|Drama|Romance|  856|
|      Drama|Thriller|  736|
|         Crime|Drama|  734|
|            Thriller|  732|
|     Horror|Thriller|  692|
|           Animation|  595|
|           Drama|War|  519|
+--------------------+-----+



In [49]:
# Populate genres_by_count from the per-genre movie counts.
# `insert overwrite` replaces the table contents, so re-running this cell does
# not append a second copy of the same rows (plain `insert into` would).
# NOTE(review): the threshold here is `>= 500`, while the preview query earlier
# in the notebook uses `> 500` -- confirm which one is intended.
spark.sql("""
    insert overwrite table genres_by_count
    select genres, count(*) as count from movies
    group by genres
    having count(*) >= 500
    order by count desc
""")

DataFrame[]

In [50]:
# Show the three most frequent genre combinations stored in genres_by_count.
spark.sql(
    "select * from genres_by_count order by count desc limit 3"
).show()

+------------------+-----+
|            genres|count|
+------------------+-----+
|             Drama| 7069|
|            Comedy| 4735|
|(no genres listed)| 4135|
+------------------+-----+



In [51]:
# Explicit schema for tags.csv.
schema = StructType(
    [
        StructField("userId", IntegerType()),
        StructField("movieId", IntegerType()),
        StructField("tag", StringType()),
        StructField("timestamp", StringType()),
    ]
)

# Read tags.csv with that schema (the first line is a header) and print the
# resulting column types.
tags_df = spark.read.csv(
    "/content/ml-latest/tags.csv",
    schema=schema,
    header=True,
)
tags_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [52]:
# Expose tags_df to Spark SQL as a temporary view. createOrReplaceTempView is
# the supported replacement for the deprecated registerTempTable, and it can be
# re-run because it replaces an existing view of the same name.
tags_df.createOrReplaceTempView("tags_df_table")



In [53]:
# List tables again; the temporary view appears alongside the Hive tables.
spark.sql("show tables").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|  movies1|genres_by_count|      false|
|  movies1|         movies|      false|
|  movies1|        ratings|      false|
|         |  tags_df_table|       true|
+---------+---------------+-----------+

