In [None]:
# Notebook setup: locate the local Spark installation, import everything the
# notebook needs, and create the Spark entry points shared by all cells.
import findspark
findspark.init()

import pyspark  # only import after findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as func
from pyspark.sql.functions import desc

# Other imports
import matplotlib.pyplot as plt
# IPython.display is the public location of display/HTML; importing them from
# IPython.core.display is deprecated.
from IPython.display import display, HTML

conf = SparkConf().setAppName('datamining').setMaster('local')
# getOrCreate reuses an already running context, so re-running this cell does
# not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [None]:
def _load_csv(path, **extra_options):
    """Read a CSV file that has a header row, letting Spark infer the column types.

    path: location of the CSV file, relative to the notebook.
    extra_options: additional reader options (e.g. sep=";").
    Returns the loaded Spark DataFrame.
    """
    reader = sqlContext.read.format('com.databricks.spark.csv')
    return reader.options(header='true', inferschema='true', **extra_options).load(path)


# Movie data
movie = _load_csv('movies/movie.csv')
# The users file is the only one read with a semicolon separator.
users = _load_csv('users/users.csv', sep=";")
ratings_m = _load_csv('movies/rating.csv')
tags = _load_csv('movies/tag.csv')
# Book data
books = _load_csv('books/books.csv')
ratings = _load_csv('books/ratings.csv')
book_tags = _load_csv('books/book_tags.csv')
tags_b = _load_csv('books/tags.csv')
to_read = _load_csv('books/to_read.csv')

In [None]:
# Heading plus a five-row preview of the movie dataset.
display(HTML('<h1>De film dataset</h1>'))
# Limit in Spark so only five rows are sent to the driver instead of the whole table.
movie.limit(5).toPandas()

In [None]:
# Plain-text preview of the movie ratings, to show that the toPandas() tables
# used in the other preview cells look a bit nicer.
ratings_m.show(n=20, truncate=True)

In [None]:
# Five-row preview of the movie tags; limit in Spark so the full table is not
# collected to the driver just to display its head.
tags.limit(5).toPandas()

In [None]:
# Heading plus a five-row preview of the users dataset.
display(HTML('<h1>De gebruikers dataset</h1>'))
# Limit in Spark so only five rows are sent to the driver instead of the whole table.
users.limit(5).toPandas()

In [None]:
# Section heading for the book dataset previews.
book_heading = HTML('<h1>De boek dataset</h1>')
display(book_heading)

In [None]:
# Five-row preview of the books; limit in Spark so the full table is not
# collected to the driver just to display its head.
books.limit(5).toPandas()

In [None]:
# Five-row preview of the book/tag links; limit in Spark so the full table is
# not collected to the driver just to display its head.
book_tags.limit(5).toPandas()

In [None]:
# Five-row preview of the book tag names; limit in Spark so the full table is
# not collected to the driver just to display its head.
tags_b.limit(5).toPandas()

In [None]:
# Five-row preview of the to-read list; limit in Spark so the full table is
# not collected to the driver just to display its head.
to_read.limit(5).toPandas()

In [None]:
# Section heading for the rating analysis.
rating_heading = HTML('<h1>Rating data</h1>')
display(rating_heading)

In [None]:
# Histogram of the average rating each movie received.
# The heading's <h2> tag was never closed; it is closed here so the notebook
# emits well-formed HTML.
display(HTML('<h2>Frequentie van ratings</h2>'))
movie_with_rating = ratings_m.select('movieId','rating')
# One row per movie with its mean rating, stored in a column named "rating".
avgratingpermovie = movie_with_rating.groupBy('movieId').agg(func.avg('rating').alias('rating'))
pandas_df = avgratingpermovie.toPandas()
pandas_df.hist(column='rating', figsize=(10,10))

In [None]:

# Section heading for the movie analysis.
movie_heading = HTML('<h1>Movie data</h1>')
display(movie_heading)

In [None]:
# Most common values of the genres column (top 20 as a bar chart).
display(HTML('<h2>Meest voorkomende genres</h2>'))
genre_counts = movie.groupBy('genres').count()
movies_p = genre_counts.orderBy(desc("count")).toPandas()
movies_p.head(20).plot(x='genres', y='count', kind='bar', figsize=(20,5))

In [None]:
# Most frequently used movie tags (top 10 as a bar chart).
display(HTML('<h2>Meest gebruikte tags bij films</h2>'))
tag_counts = tags.groupBy('tag').count()
tags_p = tag_counts.orderBy(desc("count")).toPandas()
tags_p.head(10).plot(x='tag', y='count', kind='bar', figsize=(20,5))

In [None]:
# Average rating per genre: mean rating per movie first, then the mean of those
# per-movie means within each genres value.
display(HTML('<h2>Gemiddelde rating per genre</h2>'))
movie_with_genre = movie.select('movieId', 'genres')
movie_with_rating = ratings_m.select('movieId','rating')
avg_rating_per_movie = movie_with_rating.groupBy('movieId').agg(func.avg('rating').alias('rating'))
# Joining on the column name keeps a single movieId column.
rating_per_genre = movie_with_genre.join(avg_rating_per_movie, on='movieId', how='inner').select('genres', 'rating')
genre_rating = rating_per_genre.groupBy('genres').agg(func.avg('rating').alias('rating'))
genre_rating = genre_rating.withColumn("rating", func.round(genre_rating["rating"],1))
# The unused, misleadingly named "sortedyears" ordering that used to sit here
# was dead code and has been removed; sorting happens once, after the filter.
genre_rating.createOrReplaceTempView("genre_rating")
# Keep only single-genre rows (no "|" separator), otherwise the chart gets too large.
genre_rating = sqlContext.sql("select * from genre_rating where genres not like '%|%'")
genre_rating = genre_rating.orderBy(genre_rating.rating.desc())
pdf = genre_rating.toPandas()
pdf[:20].plot(x='genres', y='rating', kind='bar', figsize=(20,5))
plt.show()

In [None]:
# Most frequent words in movie titles, rendered as a word cloud.
display(HTML('<h2>Meest gebruikte woorden in titels</h2>'))
import os
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
movie.createOrReplaceTempView("movie")  # register the DataFrame so it can be queried with SQL
movie_title_words = sqlContext.sql("select title from movie")
stopwords = set(STOPWORDS)
# One entry per title; rows without a title are skipped.
woorden = [str(row.title) for row in movie_title_words.collect() if row.title is not None]

# The Arial path only exists on Windows; elsewhere fall back to the font
# bundled with wordcloud (font_path=None).
font_path = 'C:/windows/Fonts/Arial.ttf'
if not os.path.exists(font_path):
    font_path = None

# Feed the titles as plain space-separated text rather than the repr of a
# Python list, so brackets, quotes and commas never reach the tokenizer.
wordcloud = WordCloud(width=3000, height=3000, font_path=font_path,
                      background_color='white',
                      stopwords=stopwords,
                      max_words=500,
                      max_font_size=1000,
                      random_state=42
                      ).generate(' '.join(woorden))
# A single figure; the extra empty plt.figure(2) created before was never used.
plt.figure(figsize=(230,70))
plt.imshow(wordcloud)
plt.axis('off')
plt.show()

In [None]:
# Section heading for the user analysis.
user_heading = HTML('<h1>User data</h1>')
display(user_heading)

In [None]:
# Users who submitted the most ratings, highest count first.
display(HTML('<h2>Users met de meeste stemmen</h2>'))
votes_per_user = ratings_m.groupby("userId").count()
ratings_per_user = votes_per_user.orderBy(desc("count"))
ratings_per_user.show()

In [None]:
# Number of user accounts per gender.
display(HTML('<h2>Accounts per geslacht</h2>'))
gender_groups = users.groupby('gender')
users_by_gender = gender_groups.count()
users_by_gender.show()

In [None]:
# Number of ratings cast per gender: attach the user attributes to every
# rating, then count the joined rows per gender.
display(HTML('<h2>Stemmen per geslacht</h2>'))
votes = ratings_m.join(users, on="userId")
votes_by_gender = votes.groupby('gender').count()
votes_by_gender.show()

In [None]:
# Number of users per age value, most common first, as a bar chart.
display(HTML('<h2>Leeftijds indeling</h2>'))
age_counts = users.groupby("age").count()
age_per_vote = age_counts.orderBy(desc("count"))
age_pd = age_per_vote.toPandas()
age_pd.plot(x='age', y='count', kind='bar', figsize=(20,5))


In [None]:
# Mean rating given per gender.
display(HTML('<h2>Stemverschil bij geslacht</h2>'))
votes_dif = ratings_m.join(users, ["userId"])
votes_dif = votes_dif.drop('timestamp')
# Aggregate the frame built in this cell. The previous version aggregated
# `votes`, a variable left over from another cell, so the result depended on
# which cells had been run before this one.
test = votes_dif.groupBy('Gender').agg({"rating": "mean"})
test.show()

In [None]:
#aantal stemmen per geslecht
display(HTML('<h2>Stem per geslacht per genre</h2>'))
import seaborn as sns
%matplotlib inline
from pyspark.sql.functions import col, size
votes = ratings_m.join(users, ["userId"])
votes = movie.join(votes, ["movieId"], "inner")
votes.createOrReplaceTempView("votes");
votes = sqlContext.sql("select * from votes where genres not like '%|%'") #om het wat korter te maken
sns.set(rc={'figure.figsize':(30,30)})
sns.countplot(y='genres', hue='Gender', data=votes.toPandas(), order = votes.toPandas()['genres'].value_counts().index)

In [None]:
#werk per geslacht
import seaborn as sns
%matplotlib inline
from ipywidgets import widgets
from IPython.display import display
dta = users.toPandas()
#sns.set_context(rc={"figure.figsize": (30, 30)})
plt.figure(figsize=(14,9))
sns.countplot(y='Gender', hue='Occupation', data=dta)

In [None]:
# Average rating per genre, split by gender, drawn as two overlaid bar series.
display(HTML('<h2>Gemiddelde per genre, per geslacht</h2>'))
import pandas as pd
ax = plt.gca()
votes = ratings_m.join(users, ["userId"])
# Per-movie means of the numeric rating columns; the averaged id columns are dropped.
avg_moviesRatings = ratings_m.groupby('movieId').mean()
avg_moviesRatings = avg_moviesRatings.drop('avg(movieId)')
avg_movies = votes.join(avg_moviesRatings, ["movieId"])
avg_movies = avg_movies.drop('avg(userId)')
avg_movies = avg_movies.join(movie, ["movieId"])
avg_movies.createOrReplaceTempView("avg_movies")
# Keep only single-genre rows, otherwise the chart gets too large.
avg_movies = sqlContext.sql("select * from avg_movies where genres not like '%|%'")
# NOTE(review): the per-gender value below averages each movie's overall mean
# rating ("avg(rating)") over that gender's votes, not the ratings that gender
# actually gave. Confirm this is the intended metric.
# Male ratings
avg_movies_male = sqlContext.sql("select * from avg_movies where gender == 'M' ")
avg_movies_male = avg_movies_male.select('genres','avg(rating)')
avg_movies_male = avg_movies_male.withColumnRenamed("avg(rating)", "rating_m")
avg_movies_male = avg_movies_male.groupby('genres').mean()
# Female ratings
avg_movies_f = sqlContext.sql("select * from avg_movies where gender == 'F' ")
avg_movies_f = avg_movies_f.select('genres','avg(rating)')
avg_movies_f = avg_movies_f.withColumnRenamed("avg(rating)", "rating_f")
avg_movies_f = avg_movies_f.groupby('genres').mean()
avg_movies_both = avg_movies_f.join(avg_movies_male, ["genres"])  # combine on genres
# Round to one decimal. withColumn returns a new DataFrame, so the result must
# be assigned; previously it was discarded and the rounding never took effect.
avg_movies_both = avg_movies_both.withColumn("avg(rating_f)", func.round(avg_movies_both["avg(rating_f)"],1))
avg_movies_both = avg_movies_both.withColumn("avg(rating_m)", func.round(avg_movies_both["avg(rating_m)"],1))
avg_movies_both.createOrReplaceTempView("both")
# Keep only single-genre rows, otherwise the chart gets too large.
data = sqlContext.sql("select * from both where genres not like '%|%'")
# Chart
avg_movies_pd = data.toPandas()
avg_movies_pd.plot(x="genres", y="avg(rating_f)", kind="bar", ax=ax, color="C1", alpha=0.8,figsize=(20,10))
avg_movies_pd.plot(x="genres", y="avg(rating_m)", kind="bar", ax=ax, color="C5", alpha=0.8,figsize=(20,10))

In [None]:
# Book dataset exploration: mean book rating per original publication year
# (years after 1900 only), drawn as a line chart.
display(HTML('<h1>Verkennen van de boek dataset</h1>'))
display(HTML('<h2>Gemiddelde rating per jaar</h2>'))
bookwithyear = books.select('book_id', 'original_publication_year')
bookwithrating = ratings.select('book_id','rating')
# Mean rating per book, stored in a column named "rating".
avgratingperbook = bookwithrating.groupBy('book_id').agg(func.avg('rating').alias('rating'))
ratingperyear = (
    bookwithyear
    .join(avgratingperbook, on='book_id', how='inner')
    .select('original_publication_year', 'rating')
)
filtered_df = ratingperyear.where(func.col('original_publication_year') > 1900)
# Mean of the per-book means within each year, rounded to one decimal.
year_rating = filtered_df.groupBy('original_publication_year').agg(func.avg('rating').alias('rating'))
year_rating = year_rating.withColumn("rating", func.round(func.col("rating"), 1))
sortedyears = year_rating.orderBy(func.col('original_publication_year').asc())
pdf = sortedyears.toPandas()
pdf.plot(x="original_publication_year", y="rating", figsize=(20,5))
plt.show()

In [None]:
# Authors whose books appear most often in the to-read list (top 10).
display(HTML('<h2>Meest aangerade auteurs</h2>'))
authorandbook = books.select('book_id', 'authors')
# One row per to-read entry, carrying the authors value of the listed book.
authors_toread = authorandbook.join(to_read, on='book_id', how='inner').select('book_id', 'authors')
bestauthors = (
    authors_toread
    .groupby("authors")
    .count()
    .withColumnRenamed("count", "times_recommended")
    .orderBy(desc("times_recommended"))
)
pandasDF = bestauthors.toPandas()
pandasDF.head(10).plot(x='authors', y='times_recommended', kind='bar', figsize=(20,5))

In [None]:
# Books that appear most often in the to-read list (top 10).
display(HTML('<h2>Meest aangerade boek</h2>'))
mostrecommended = to_read.groupby('book_id').count()
mostrecommended = mostrecommended.withColumnRenamed("count", "times_recommended")
mostrecommended = mostrecommended.orderBy(mostrecommended.times_recommended.desc())
bookswithtitle = books.select('book_id', 'title')
bestbook_toread = mostrecommended.join(bookswithtitle, on='book_id', how='inner').select('times_recommended', 'title')
# A join does not guarantee that the row order of its inputs survives, so sort
# again after the join; otherwise the first ten rows are not necessarily the
# ten most recommended books.
bestbook_toread = bestbook_toread.orderBy(desc('times_recommended'))
pandasBook = bestbook_toread.toPandas()
pandasBook[:10].plot(x='title', y='times_recommended', kind='bar', figsize=(20,5))

In [None]:
# Data pollution: list the book tag names in descending order to inspect the
# entries that sort to the top.
tags_by_name_desc = tags_b.sort(desc('tag_name'))
tags_by_name_desc.show()

In [None]:
# Most used book tags (top 10).
display(HTML('<h2>Meest gebruikte boektag</h2>'))
tagwithcount = book_tags.select('tag_id', 'count')
tagwithcount = tagwithcount.withColumnRenamed("count", "times_used")
mostused_tag = tagwithcount.join(tags_b, on='tag_id', how='inner').select('times_used', 'tag_name')
# The same tag occurs on many rows of book_tags, so sum its counts and sort.
# Previously the first ten rows of the unaggregated, unsorted join were
# plotted, which does not show the most used tags.
mostused_tag = mostused_tag.groupBy('tag_name').agg(func.sum('times_used').alias('times_used'))
mostused_tag = mostused_tag.select('times_used', 'tag_name').orderBy(desc('times_used'))
pandasTags = mostused_tag.toPandas()
pandasTags[:10].plot(x='tag_name', y='times_used', kind='bar', figsize=(20,5))

In [None]:
# Compare the average movie rating with the average book rating for titles
# that occur in both datasets.
moviesTitles = movie.drop('genres')
moviesRatings = ratings_m.drop('userId')
moviesRatings = moviesRatings.drop('timestamp')
# One mean rating per movie, in a column named "rating". The previous version
# computed the grouped mean and then overwrote it with the raw ratings frame
# (drop() was applied to the wrong variable), so every individual rating was
# joined below. For titles shared by several movies the final figure is now
# the mean of the per-movie means.
avg_moviesRatings = moviesRatings.groupby('movieId').agg(func.avg('rating').alias('rating'))
avg_movies = moviesTitles.join(avg_moviesRatings, on='movieId', how='inner')

avg_movies = avg_movies.drop('movieId')
booksTitles = books.drop('book_id')
# Take the text before the first "(" of the movie title and trim it, so a
# trailing space left by the split does not stop the title from matching.
# NOTE(review): assumes movie titles carry a parenthesised suffix after the
# name and book titles do not; verify against the data.
avg_movies_split = func.split(avg_movies['title'], '\\(')
testSP = avg_movies
testSP = testSP.withColumn('title', func.trim(avg_movies_split.getItem(0)))
# Fixed typo: this referenced the undefined name "boksTitles" (NameError).
testBook = booksTitles.selectExpr("original_title as title", "average_rating as rating_book")
compare = testSP.join(testBook, ["title"])
compare = compare.groupBy("title").agg({"rating": "avg", "rating_book": "avg"})
compare = compare.withColumn("avg(rating)", func.round(compare["avg(rating)"],1))
compare = compare.withColumn("avg(rating_book)", func.round(compare["avg(rating_book)"],1))
# Chart: collect once so both series come from the same 100 rows and the
# Spark job is not executed twice.
compare_pd = compare.toPandas()[:100]
ax = plt.gca()
compare_pd.plot(x="title", y="avg(rating)", kind="bar", ax=ax, color="Green", alpha=0.8,figsize=(20,10))
compare_pd.plot(x="title", y="avg(rating_book)", kind="bar", ax=ax, color="Red", alpha=0.8,figsize=(20,10))
plt.show()