In [1]:
import findspark
findspark.init()



In [2]:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
#the following 4 lines are used when running the program in cluster mode instead of locally
#con = pyspark.SparkConf().setAll([('spark.executor.memory','512m'), ('spark.executor.cores', '1'), ('spark.driver.cores','1'), ('spark.driver.memory','1g')])
#con.setMaster('spark://192.168.1.3:7077')
#spark.stop()
#spark=SparkSession.builder.config(conf=con).appName("databases2").getOrCreate()
spark=SparkSession.builder.appName("databases2").getOrCreate() #This line is commented when cluster mode is running
sc=spark.sparkContext

Exception: Java gateway process exited before sending its port number

In [None]:
spark

In [4]:
ratings=spark.read.csv('rating.csv',inferSchema=True,header=True)
ratings=ratings.withColumn('timestamp',F.to_timestamp(ratings.timestamp,'yyyy-MM-dd HH:mm:ss'))#typecasts timestamp column to a timestamp datatype
ratings.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,2005-04-02 23:53:47
1,1,29,3.5,2005-04-02 23:31:16
2,1,32,3.5,2005-04-02 23:33:39
3,1,47,3.5,2005-04-02 23:32:07
4,1,50,3.5,2005-04-02 23:29:40


In [5]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [6]:
movies=spark.read.csv('movie.csv',inferSchema=True,header=True)
movies=movies.withColumn('year',F.regexp_extract(('title'),"(?s).*\(([^()]+)\)",1))#extracts the year from the title column
movies=movies.withColumn('year',movies['year'].cast(IntegerType())) #typecasts newly made year column to an integer type
movies=movies.withColumn('title',F.regexp_replace('title','\([^)]*\)',''))#removes parentheses
movies=movies.withColumn('title',F.regexp_replace('title','^\s+',''))#removes whitespace begining of line
movies=movies.withColumn('title',F.regexp_replace('title','\s+$',''))#removes whitespace end of line
movies=movies.withColumn('title',F.initcap('title'))#makes the entire of title column to have 1st letter of every word capital case while the rest become lower case
movies=movies.filter(movies.year.isNotNull())#drop 22 movies for the dataset as they dont follow the norm of having (year) after the title in the title collumn
#the next eleven lines are used to find out the different categories in genres column they are displayed alphabetically in col2[0]
moviesplit=movies.withColumn('genres',F.explode(F.split(('genres'),'\|')))   
movies2=movies.select('movieId','title','year',F.split('genres','\|').alias('col2')) 
movies2_sizes=movies2.select(F.size('col2').alias('col2'))
movies2_max=movies2_sizes.agg(F.max('col2'))
nb_columns=movies2_max.collect()[0][0]
moviesres=movies2.select('title','movieId','year',*[movies2['col2'][i] for i in range(nb_columns)])
moviesres=moviesres.sort('col2[0]').dropDuplicates(subset=['col2[0]'])
genres=moviesres.drop('year','title','movieId','col2[1]','col2[2]','col2[3]','col2[4]','col2[5]','col2[6]','col2[7]','col2[8]','col2[9]')
genres = genres.withColumnRenamed("col2[0]", "Allgenres")
genres.sort('Allgenres').show()

+------------------+
|         Allgenres|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+



In [7]:
movies.show(10)

+-------+--------------------+--------------------+----+
|movieId|               title|              genres|year|
+-------+--------------------+--------------------+----+
|      1|           Toy Story|Adventure|Animati...|1995|
|      2|             Jumanji|Adventure|Childre...|1995|
|      3|    Grumpier Old Men|      Comedy|Romance|1995|
|      4|   Waiting To Exhale|Comedy|Drama|Romance|1995|
|      5|Father Of The Bri...|              Comedy|1995|
|      6|                Heat|Action|Crime|Thri...|1995|
|      7|             Sabrina|      Comedy|Romance|1995|
|      8|        Tom And Huck|  Adventure|Children|1995|
|      9|        Sudden Death|              Action|1995|
|     10|           Goldeneye|Action|Adventure|...|1995|
+-------+--------------------+--------------------+----+
only showing top 10 rows



In [8]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- year: integer (nullable = true)



In [9]:
tags=spark.read.csv('tag.csv',inferSchema=True,header=True)
tags=tags.withColumn('timestamp',F.to_timestamp(tags.timestamp,'yyyy-MM-dd HH:mm:ss'))#typecasts timestamp column to a timestamp datatype
tags=tags.withColumn('tag',F.regexp_replace('tag','\([^)]*\)',''))#removes parentheses
tags=tags.withColumn('tag',F.regexp_replace('tag','^\s+',''))#removes whitespace begining of line
tags=tags.withColumn('tag',F.regexp_replace('tag','\s+$',''))#removes whitespace end of line
tags=tags.withColumn('tag',F.initcap('tag'))#makes the entire of tag column to have 1st letter of every word capital case while the rest become lower case
tags=tags.filter(tags.timestamp.isNotNull()) #drop rows where timestamp collumn does not contain a timestamp
tags.limit(5).toPandas()

Unnamed: 0,userId,movieId,tag,timestamp
0,18,4141,Mark Waters,2009-04-24 18:19:40
1,65,208,Dark Hero,2013-05-10 01:41:18
2,65,353,Dark Hero,2013-05-10 01:41:19
3,65,521,Noir Thriller,2013-05-10 01:39:43
4,65,592,Dark Hero,2013-05-10 01:41:18


In [10]:
tags.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [11]:
genometags=spark.read.csv('genome_tags.csv',inferSchema=True,header=True)
genometags=genometags.withColumn('tag',F.regexp_replace('tag','\([^)]*\)',''))#removes parentheses
genometags=genometags.withColumn('tag',F.regexp_replace('tag','^\s+',''))#removes whitespace begining of line
genometags=genometags.withColumn('tag',F.regexp_replace('tag','\s+$',''))#removes whitespace end of line
genometags=genometags.withColumn('tag',F.initcap('tag'))#makes the column tag to have 1st letter of every word capital case while the rest become lower case
genometags.limit(5).toPandas()

Unnamed: 0,tagId,tag
0,1,007
1,2,007
2,3,18th Century
3,4,1920s
4,5,1930s


In [12]:
genometags.printSchema()

root
 |-- tagId: integer (nullable = true)
 |-- tag: string (nullable = true)



In [13]:
q1=movies.join(ratings,['movieId'],'inner').select('title','UserId').where(movies.title=='Jumanji').groupBy('title').count()
q1.show(1)

+-------+-----+
|  title|count|
+-------+-----+
|Jumanji|22243|
+-------+-----+



In [14]:
q2=movies.join(tags,['movieId'],'inner').select('title','tag').where(tags.tag=='Boring').dropDuplicates(subset=['title']).orderBy('title')
q2.show(5)

+--------------------+------+
|               title|   tag|
+--------------------+------+
|       101 Reykjavik|Boring|
|    12 Years A Slave|Boring|
|                1408|Boring|
|1492: Conquest Of...|Boring|
|2001: A Space Ody...|Boring|
+--------------------+------+
only showing top 5 rows



In [15]:
q3=ratings.join(tags,['userId','movieID'],'inner').select('rating','userId').where(tags.tag=='Bollywood')
q3=q3.filter(q3.rating>3).dropDuplicates(subset=['userId']).sort('userId')
q3.show(5)

+------+------+
|rating|userId|
+------+------+
|   4.0| 10573|
|   5.0| 19837|
|   4.5| 23333|
|   5.0| 25004|
|   4.5| 31338|
+------+------+
only showing top 5 rows



In [15]:
#πριν την διορθωση της εκφωνησης ειχα υποθεση πως αναφεροσασταν για τις ταινιες που βγηκαν το 2005 και οχι για τα ratings καθως δουλευει το query απλα το γραφω σε comment αντι να το σβησω
#q4=movies.join(ratings,['movieId'],'inner').select('title','rating','year').where(movies.year==2005).groupBy('title').avg('rating').sort(['avg(rating)'],ascending=False).limit(10).sort('title')
#q4.show()
q4=movies.join(ratings,['movieId'],'inner').select('title','rating','timestamp').groupBy('title').avg('rating').sort(['avg(rating)'],ascending=False).limit(10).sort('title')
q4.show()

+--------------------+-----------+
|               title|avg(rating)|
+--------------------+-----------+
|                1971|        5.0|
|           Abendland|        5.0|
|Codes Of Gender, The|        5.0|
|            Giorgino|        5.0|
|Into The Middle O...|        5.0|
|     Latin Music Usa|        5.0|
|        Rocaterrania|        5.0|
| The Floating Castle|        5.0|
|Victor And The Se...|        5.0|
|Year Zero: The Si...|        5.0|
+--------------------+-----------+



In [17]:
q5=movies.join(tags,['movieId'],'inner').select('title','tag','year').where(movies.year==2015).sort('title').dropDuplicates(subset=['title'])
q5.show(5)
#σε περιπτωση που στην εκφωνηση ζηταγατε για τα timestamps που εγιναν το 2015 και οχι τις ταινιες που βγηκαν το 2015 το κατω query ειναι το σωστο
#q5=movies.join(tags,['movieId'],'inner').select('title','tag','year','timestamp').where(F.year('timestamp')==2015).sort('title').dropDuplicates(subset=['title'])
#q5.limit(5).toPandas()

+-------------------+----------------+----+
|              title|             tag|year|
+-------------------+----------------+----+
|   A Grain Of Truth|   Borys Lankosz|2015|
|A Walk In The Woods|      Ken Kwapis|2015|
|       Advantageous|  Jennifer Phang|2015|
|As We Were Dreaming| Based On A Book|2015|
|    Average Italian|Marcello Macchia|2015|
+-------------------+----------------+----+
only showing top 5 rows



In [18]:
q6=movies.join(ratings,['movieId'],'inner').select('title','rating').groupBy('title').count().sort('count',ascending=False)
q6.show(5)

+--------------------+-----+
|               title|count|
+--------------------+-----+
|        Pulp Fiction|67310|
|        Forrest Gump|66172|
|Shawshank Redempt...|63366|
|Silence Of The La...|63299|
|       Jurassic Park|59715|
+--------------------+-----+
only showing top 5 rows



In [19]:
q795=ratings.select('userId','rating').where(F.year('timestamp')==1995).groupBy('userId').count().sort('count',ascending=False).show(10)
#q796=ratings.select('userId','rating').where(F.year('timestamp')==1996).groupBy('userId').count().sort('count',ascending=False).show(10)
#q797=ratings.select('userId','rating').where(F.year('timestamp')==1997).groupBy('userId').count().sort('count',ascending=False).show(10)
#q798=ratings.select('userId','rating').where(F.year('timestamp')==1998).groupBy('userId').count().sort('count',ascending=False).show(10)
#q799=ratings.select('userId','rating').where(F.year('timestamp')==1999).groupBy('userId').count().sort('count',ascending=False).show(10)
#q700=ratings.select('userId','rating').where(F.year('timestamp')==2000).groupBy('userId').count().sort('count',ascending=False).show(10)
#q701=ratings.select('userId','rating').where(F.year('timestamp')==2001).groupBy('userId').count().sort('count',ascending=False).show(10)
#q702=ratings.select('userId','rating').where(F.year('timestamp')==2002).groupBy('userId').count().sort('count',ascending=False).show(10)
#q703=ratings.select('userId','rating').where(F.year('timestamp')==2003).groupBy('userId').count().sort('count',ascending=False).show(10)
#q704=ratings.select('userId','rating').where(F.year('timestamp')==2004).groupBy('userId').count().sort('count',ascending=False).show(10)
#q705=ratings.select('userId','rating').where(F.year('timestamp')==2005).groupBy('userId').count().sort('count',ascending=False).show(10)
#q706=ratings.select('userId','rating').where(F.year('timestamp')==2006).groupBy('userId').count().sort('count',ascending=False).show(10)
#q707=ratings.select('userId','rating').where(F.year('timestamp')==2007).groupBy('userId').count().sort('count',ascending=False).show(10)
#q708=ratings.select('userId','rating').where(F.year('timestamp')==2008).groupBy('userId').count().sort('count',ascending=False).show(10)
#q709=ratings.select('userId','rating').where(F.year('timestamp')==2009).groupBy('userId').count().sort('count',ascending=False).show(10)
#q710=ratings.select('userId','rating').where(F.year('timestamp')==2010).groupBy('userId').count().sort('count',ascending=False).show(10)
#q711=ratings.select('userId','rating').where(F.year('timestamp')==2011).groupBy('userId').count().sort('count',ascending=False).show(10)
#q712=ratings.select('userId','rating').where(F.year('timestamp')==2012).groupBy('userId').count().sort('count',ascending=False).show(10)
#q713=ratings.select('userId','rating').where(F.year('timestamp')==2013).groupBy('userId').count().sort('count',ascending=False).show(10)
#q714=ratings.select('userId','rating').where(F.year('timestamp')==2014).groupBy('userId').count().sort('count',ascending=False).show(10)
#q715=ratings.select('userId','rating').where(F.year('timestamp')==2015).groupBy('userId').count().sort('count',ascending=False).show(10)

+------+-----+
|userId|count|
+------+-----+
|131160|    3|
| 28507|    1|
+------+-----+



In [20]:
q8=movies.join(ratings,['movieId'],'inner').select('title','rating','genres').withColumn('genres',F.explode(F.split(('genres'),'\|')))
q8=q8.filter(q8.genres!='(no genres listed)')
q8=q8.groupBy('genres','title').count().orderBy(F.asc('genres'),F.desc('count')).dropDuplicates(subset=['genres']).sort('genres')
q8.show(5)

+---------+-------------+-----+
|   genres|        title|count|
+---------+-------------+-----+
|   Action|Jurassic Park|59715|
|Adventure|Jurassic Park|59715|
|Animation|    Toy Story|49695|
| Children|    Toy Story|49695|
|   Comedy| Pulp Fiction|67310|
+---------+-------------+-----+
only showing top 5 rows



In [21]:
q9=ratings.select(F.year('timestamp').alias('year'),F.month('timestamp').alias('month'),F.dayofmonth('timestamp').alias('day'),F.hour('timestamp').alias('hour'),'movieID').groupBy('year','month','day','hour','movieID').count()
q9=q9.filter('count>1')
q9=q9.agg(F.sum('count').alias('Users'))
q9.show()

+-------+
|  Users|
+-------+
|4281178|
+-------+



In [22]:
temp=movies.join(tags,['movieId'],'inner').select('userId','title','genres','movieID').where(tags.tag=='Funny')
q10=temp.join(ratings,['userId','movieID'],'inner').select('title','genres').where(ratings.rating>3.5).withColumn('genres',F.explode(F.split(('genres'),'\|')))
q10.groupBy('genres').count().sort('genres').show(5)

+---------+-----+
|   genres|count|
+---------+-----+
|   Action|  363|
|Adventure|  401|
|Animation|  253|
| Children|  255|
|   Comedy| 1329|
+---------+-----+
only showing top 5 rows

