In [0]:
from pyspark.sql import SparkSession

In [0]:
spark=SparkSession.builder.appName('movieAnalytics').master('local').config('spark.ui.port','12443').config('spark.sql.warehouse.dir','/user/itv003220/warehouse/').enableHiveSupport().getOrCreate()


spark


# Analytical Queries solved using RDD
1. What are the top 10 most viewed movies?
2. What are the distinct list of genres available?
3. How many movies for each genre?
4. How many movies are starting with numbers or letters (Example: Starting with 1/2/3../A/B/C..Z)?
5. List the latest released movies

In [0]:
sc=spark.sparkContext

In [0]:
top_10=sc.textFile('dbfs:/FileStore/tables/ratings.dat').map(lambda x:x.split('::')).map(lambda x:(x[1],1)).map(lambda x:(int(x[0]),1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending=False)

In [0]:
movie_title=sc.textFile('dbfs:/FileStore/tables/movies.dat').map(lambda x:x.split('::')).map(lambda x:(int(x[0]),x[1]))
all_top_movies=movie_title.join(top_10)
all_top_movies.sortBy(lambda x:x[1][1],ascending=False).take(10)


Out[28]: [(2858, ('American Beauty (1999)', 3428)),
 (260, ('Star Wars: Episode IV - A New Hope (1977)', 2991)),
 (1196, ('Star Wars: Episode V - The Empire Strikes Back (1980)', 2990)),
 (1210, ('Star Wars: Episode VI - Return of the Jedi (1983)', 2883)),
 (480, ('Jurassic Park (1993)', 2672)),
 (2028, ('Saving Private Ryan (1998)', 2653)),
 (589, ('Terminator 2: Judgment Day (1991)', 2649)),
 (2571, ('Matrix, The (1999)', 2590)),
 (1270, ('Back to the Future (1985)', 2583)),
 (593, ('Silence of the Lambs, The (1991)', 2578))]

`What are the distinct list of genres available?`

In [0]:
genres=sc.textFile('dbfs:/FileStore/tables/movies.dat').map(lambda x:x.split('::')[2].split('|')).flatMap(lambda x:x)
genre_len=len(set(genres.collect()))
set(genres.collect())

Out[13]: {'Action',
 'Adventure',
 'Animation',
 "Children's",
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western'}

`How many movies for each genre?`

In [0]:
no_of_movies_each_genre=genres.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).toDF(schema='Genre string,no int')

In [0]:
display(no_of_movies_each_genre)

Genre,no
Children's,251
Fantasy,68
Romance,471
Drama,1603
Action,503
Thriller,492
Horror,343
Sci-Fi,276
Documentary,127
Musical,114


In [0]:
import re

pattern_n=re.compile('^\d+')

def movie_name_(str):
    if len(pattern_n.findall(str))>0:
        return ('startsWithDigit',1)
    else: return ('startsWithLetter',1)

* `How many movies are starting with numbers or letters (Example: Starting with 1/2/3../A/B/C..Z)?`

In [0]:
movie_name=sc.textFile('dbfs:/FileStore/tables/movies.dat').map(lambda x:x.split('::')[1]).map(lambda x : movie_name_(x)).reduceByKey(lambda x,y:x+y).toDF()

In [0]:
display(movie_name)

_1,_2
startsWithLetter,3853
startsWithDigit,30


`List the latest released movies`

In [0]:
latest=sc.textFile('dbfs:/FileStore/tables/movies.dat').map(lambda x:x.split("::")).map(lambda x:x[1]).map(lambda x:(x,int(re.findall('\d+',x.split()[-1])[0])))
display(latest.sortBy(lambda x:x[1],ascending=False).toDF())

_1,_2
Supernova (2000),2000
Down to You (2000),2000
Isn't She Great? (2000),2000
Scream 3 (2000),2000
Gun Shy (2000),2000
"Beach, The (2000)",2000
Snow Day (2000),2000
"Tigger Movie, The (2000)",2000
Trois (2000),2000
Boiler Room (2000),2000


# Using spark sql to do the following tasks
1. Create tables for movies.dat, users.dat and ratings.dat: Saving Tables from Spark SQL
2. Find the list of the oldest released movies.
3. How many movies are released each year?
4. How many number of movies are there for each rating?
5. How many users have rated each movie?

In [0]:
spark.sql('drop database if exists movies_ cascade')
spark.sql('create database if not exists movies_ ')

Out[54]: DataFrame[]

In [0]:
spark.sql('use movies_')
spark.sql('''
create  table ratings (user_id string ,movie_id string,ratings string,time_stamp string) 
row format 
  delimited fields terminated by '::'
  stored as textfile
  

''')


spark.sql('''
create table movies (movie_id string,title STRING,genre STRING)
row format
  delimited fields terminated by "::"
  stored as textfile
  



''')

# we have user_id,gender,age,OCcupation ,zip_code

spark.sql('''
create table users (user_id string,gender STRING,age string,ouccupation string,zip_code STRING)
row format
  delimited fields terminated by "::"
stored as textfile




''')



Out[55]: DataFrame[]

In [0]:
spark.read.csv('dbfs:/FileStore/tables/movies.dat',sep='::').write.insertInto('movies')
spark.read.csv('dbfs:/FileStore/tables/users.dat',sep='::').write.insertInto('users')
spark.read.csv('dbfs:/FileStore/tables/ratings.dat',sep='::').write.insertInto('ratings')


`list of oldest released movies`

In [0]:
from pyspark.sql.functions import pandas_udf,PandasUDFType,udf,to_date,col,date_format
@udf('string')
def getting_Year(str1):
    return str1.split()[-1]

moviesM=spark.read.table('movies')
display(moviesM.select(date_format(to_date(getting_Year(col('title')),'(yyyy)'),'yyyy').alias('year'),'title').orderBy(col('year')))

year,title
,Heidi Fleiss
,Die Hard
,Navigator
,Amityville 1992
,History of the World
,Tales from the Darkside
,Kids in the Hall
,Amityville
,Police Academy 2
,Under Siege 2


How many movies are released each year?

In [0]:
display(moviesM.select(date_format(to_date(getting_Year(col('title')),'(yyyy)'),'yyyy').alias('year')).groupBy(col('year')).count())

year,count
1953.0,14
1957.0,20
1987.0,64
1956.0,18
1936.0,8
1958.0,22
1943.0,10
1972.0,22
1931.0,7
1988.0,58


In [0]:
ratings=spark.read.csv('dbfs:/FileStore/tables/ratings.dat',sep='::').toDF('user_id','movie_id','ratings','timestamp')
ratings.createOrReplaceTempView('ratings_')
display(spark.sql('select count(ratings) as no_of_ratings,ratings from ratings_ group by ratings'))

# how many movies for each ratings

no_of_ratings,ratings
261197,3
226310,5
56174,1
348971,4
107557,2


In [0]:
movies=spark.read.csv('dbfs:/FileStore/tables/movies.dat',sep='::').toDF('movie_id','title','genre')
movies.createOrReplaceTempView('movies_')
display(spark.sql('''
select b.title,a.no_of_ratings from (select movie_id,count(movie_id) as no_of_ratings from ratings_ group by movie_id) as a left join movies_ as b on
 a.movie_id=b.movie_id


'''))
# no of ratings for each movie

title,no_of_ratings
Antz (1998),645
Platoon (1986),1143
Pulp Fiction (1994),2171
"Nutty Professor, The (1963)",222
Fast Times at Ridgemont High (1982),886
Live Nude Girls (1995),54
Popeye (1980),471
Mrs. Winterbourne (1996),121
Joe's Apartment (1996),131
"NeverEnding Story II: The Next Chapter, The (1990)",177


# Spark Data Frames
1. Prepare Movies data: Extracting the Year and Genre from the Text
2. Prepare Users data: Loading a double delimited csv file
3. Prepare Ratings data: Programmatically specifying a schema for the data frame
4. Import Data from URL: Scala
5. Save table without defining DDL in Hive
6. Broadcast Variable example
7. Accumulator example

In [0]:
movies1=spark.read.csv('dbfs:/FileStore/tables/movies.dat',sep='::').toDF('movie_id','title','genre')
from pyspark.sql.functions import array
@udf
def get_array(str1):
    return str1.split('|')
display(movies1.select(date_format(to_date(getting_Year(col('title')),'(yyyy)'),'yyyy').alias('year'),col('title').alias('title'),'movie_id',get_array(col('genre')).alias('genre')))

year,title,movie_id,genre
1995.0,Toy Story (1995),1,"[Animation, Children's, Comedy]"
1995.0,Jumanji (1995),2,"[Adventure, Children's, Fantasy]"
1995.0,Grumpier Old Men (1995),3,"[Comedy, Romance]"
1995.0,Waiting to Exhale (1995),4,"[Comedy, Drama]"
1995.0,Father of the Bride Part II (1995),5,[Comedy]
1995.0,Heat (1995),6,"[Action, Crime, Thriller]"
1995.0,Sabrina (1995),7,"[Comedy, Romance]"
1995.0,Tom and Huck (1995),8,"[Adventure, Children's]"
1995.0,Sudden Death (1995),9,[Action]
1995.0,GoldenEye (1995),10,"[Action, Adventure, Thriller]"


# Prepare Users data: Loading a double delimited csv file

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
# user_id,gender,age,occupation,zip
schema=StructType([StructField('user_id',IntegerType(),True),StructField('gender',StringType(),True),StructField('age',IntegerType(),True),StructField('occupation',IntegerType(),True),StructField('zip_code',IntegerType(),True)])
users1=spark.read.csv('dbfs:/FileStore/tables/users.dat',sep='::',schema=schema)
display(users1)

user_id,gender,age,occupation,zip_code
1,F,1,10,48067.0
2,M,56,16,70072.0
3,M,25,15,55117.0
4,M,45,7,2460.0
5,M,25,20,55455.0
6,F,50,9,55117.0
7,M,35,1,6810.0
8,M,25,12,11413.0
9,M,25,17,61614.0
10,F,35,1,95370.0


# Prepare Ratings data: Programmatically specifying a schema for the data frame

In [0]:
ratings_schema=StructType([StructField('user_id',IntegerType(),True),StructField('movie_id',IntegerType(),True),StructField('rating',IntegerType(),True),StructField('timestamp',StringType(),True)])
# user_id,movie_id,rating,timestamp

In [0]:
ratings_=spark.read.csv('dbfs:/FileStore/tables/ratings.dat',sep='::',schema=ratings_schema)
# import data from url 
ratings_.createOrReplaceTempView('ratings_1')

In [0]:
# loading data fom url

In [0]:
url = "https://raw.githubusercontent.com/Thomas-George-T/Movies-Analytics-in-Spark-and-Scala/master/Movielens/users.dat"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)

df = spark.read.csv("file://"+SparkFiles.get("users.dat"), header=True, inferSchema= True)

we have our ratings parquet
1. lets list all the tables and current database
2. spark.catalog.createTable(tableName='ratings_parquet',path='/user/itv003220/ratings_parquet',schema=schema,source='parquet')

In [0]:
spark.catalog.currentDatabase()

Out[86]: 'movies_'

In [0]:
spark.catalog.listTables()

Out[87]: [Table(name='movies', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ratings', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='users', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='movies_', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings_', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings_1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
spark.sql('drop table ddl_table')

spark.catalog.createTable(tableName='ddl_table',schema=ratings_schema,source='parquet')

Out[106]: DataFrame[user_id: int, movie_id: int, rating: int, timestamp: string]

In [0]:
spark.catalog.listTables()



Out[107]: [Table(name='ddl_table', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='movies', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ratings', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='users', database='movies_', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='movies_', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings_', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings_1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
spark.sql('select * from ddl_table')



Out[108]: DataFrame[user_id: int, movie_id: int, rating: int, timestamp: string]

In [0]:
ratings_.write.insertInto('ddl_table')
spark.sql('select * from ddl_table').show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|    1193|     5|978300760|
|      1|     661|     3|978302109|
|      1|     914|     3|978301968|
|      1|    3408|     4|978300275|
|      1|    2355|     5|978824291|
|      1|    1197|     3|978302268|
|      1|    1287|     5|978302039|
|      1|    2804|     5|978300719|
|      1|     594|     4|978302268|
|      1|     919|     4|978301368|
|      1|     595|     5|978824268|
|      1|     938|     4|978301752|
|      1|    2398|     4|978302281|
|      1|    2918|     4|978302124|
|      1|    1035|     5|978301753|
|      1|    2791|     4|978302188|
|      1|    2687|     3|978824268|
|      1|    2018|     4|978301777|
|      1|    3105|     5|978301713|
|      1|    2797|     4|978302039|
+-------+--------+------+---------+
only showing top 20 rows



# example of braodcast variable
# lets see the occupation mappings in read.me in ml-1m
# we will use the occuoation mapping to explain braodcast varaibles

In [0]:
dic={
     0:  "other", 
    1:  "academic/educator",
    2:  "artist",
    3:  "clerical/admin",
    4:  "college/grad student",
    5:  "customer service",
    6:  "doctor/health care",
    7:  "executive/managerial",
    8:  "farmer",
    9:  "homemaker",
    10:  "K-12 student",
    11:  "lawyer",
    12:  "programmer",
    13:  "retired",
    14:  "sales/marketing",
    15:  "scientist",
    16:  "self-employed",
    17:  "technician/engineer",
    18:  "tradesman/craftsman",
    19:  "unemployed",
    20:  "writer"
    
}

In [0]:
broad_cast=sc.broadcast(dic)

In [0]:
@udf
def get_occu(int1):
    return broad_cast.value[int1]

In [0]:
users_=spark.read.csv('dbfs:/FileStore/tables/users.dat',sep='::',schema=schema)
users_accu=users_.select(get_occu(col('occupation')).alias('actual_occupation'),col('user_id'))
users_accu.show()

+--------------------+-------+
|   actual_occupation|user_id|
+--------------------+-------+
|        K-12 student|      1|
|       self-employed|      2|
|           scientist|      3|
|executive/managerial|      4|
|              writer|      5|
|           homemaker|      6|
|   academic/educator|      7|
|          programmer|      8|
| technician/engineer|      9|
|   academic/educator|     10|
|   academic/educator|     11|
|          programmer|     12|
|   academic/educator|     13|
|               other|     14|
|executive/managerial|     15|
|               other|     16|
|   academic/educator|     17|
|      clerical/admin|     18|
|        K-12 student|     19|
|     sales/marketing|     20|
+--------------------+-------+
only showing top 20 rows



In [0]:
acc=sc.accumulator(0)
# Accumulators are shared varaibles across the cluster
# lets how mny programmers are there in ocuupation with the help of accumulator

  # gettings no of partitions in accumulator      

In [0]:
users_accu.printSchema()

root
 |-- actual_occupation: string (nullable = true)
 |-- user_id: integer (nullable = true)



In [0]:
users_accu.rdd.glom().foreach(lambda x:acc.add(1))

In [0]:
acc.value
accu_pro=sc.accumulator(0)

In [0]:
@udf
def getting_progr(str1):
    if str1=="programmer":
        accu_pro.add(1)
        return True
    else:
        return False
        
        

In [0]:
users_accu.filter(getting_progr(col('actual_occupation'))==True).show()

+-----------------+-------+
|actual_occupation|user_id|
+-----------------+-------+
|       programmer|      8|
|       programmer|     12|
|       programmer|     43|
|       programmer|     49|
|       programmer|     55|
|       programmer|     65|
|       programmer|    104|
|       programmer|    105|
|       programmer|    108|
|       programmer|    113|
|       programmer|    155|
|       programmer|    180|
|       programmer|    195|
|       programmer|    198|
|       programmer|    205|
|       programmer|    207|
|       programmer|    220|
|       programmer|    252|
|       programmer|    267|
|       programmer|    268|
+-----------------+-------+
only showing top 20 rows



In [0]:
accu_pro.value

Out[123]: 388

In [0]:
users_accu.filter(col('actual_occupation')=='programmer').count()

Out[124]: 388