# BDCC project 1 

_[Big Data and Cloud Computing](http://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), DCC/FCUP_


## Code necessary to run from the command line 

In [3]:
if __name__ == "__main__":
    # Only needed when this notebook is run as a plain script from the
    # command line: create (or reuse) a single local Spark instance.
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    # local[*]: local mode with one worker thread per logical core.
    builder = SparkSession.builder.appName("BDCCp1").master("local[*]")
    spark = builder.getOrCreate()
    sc = spark.sparkContext
    # Hide INFO-level Spark logging so results stay readable.
    sc.setLogLevel("WARN")

## Provided code - auxiliary functions

__You should not need to edit these.__

#### loadMovieLensData

In [4]:
from pyspark.sql import functions as F

def readCSV(file, debug=False):
    """Read a CSV file that has a header row into a Spark DataFrame.

    Column types are inferred by Spark (``inferSchema=True``).

    :param file: path or URI of the CSV file.
    :param debug: when true, print the path before reading.
    :return: the loaded DataFrame.
    """
    if debug:
        print('Reading ' + file)
    reader = spark.read
    return reader.csv(file, header=True, inferSchema=True)

def readParquet(file, debug=False):
    """Read Parquet data into a Spark DataFrame.

    :param file: path or URI of the Parquet data.
    :param debug: when true, print the path before reading.
    :return: the loaded DataFrame.
    """
    if debug:
        print('Reading ' + file)
    reader = spark.read
    return reader.parquet(file)

def loadMovieLensData(path, format='parquet', debug=False):
    """Load the MovieLens ``movies``, ``ratings`` and ``tags`` tables.

    Each table is read from ``<path>/<name>.parquet`` when ``format`` is
    ``'parquet'``; any other value reads ``<path>/<name>.csv`` instead.

    The ``tag`` column of ``tags`` is normalised: it is lower-cased, split on
    whitespace/punctuation and exploded, so each output row holds one word.
    NOTE(review): Spark's split keeps empty tokens, so a tag that starts or
    ends with a delimiter may produce an empty-string word — confirm whether
    those rows should be filtered out.

    :param path: directory (local path or URI) holding the data files.
    :param format: ``'parquet'`` (default); any other value selects CSV.
    :param debug: when true, print each file name and every table's schema
        and first rows.
    :return: tuple ``(movies, ratings, tags)`` of DataFrames.
    """
    if format == 'parquet':
        movies = readParquet(path + '/movies.parquet', debug)
        ratings = readParquet(path + '/ratings.parquet', debug)
        tags = readParquet(path + '/tags.parquet', debug)
    else:
        movies = readCSV(path + '/movies.csv', debug)
        ratings = readCSV(path + '/ratings.csv', debug)
        tags = readCSV(path + '/tags.csv', debug)

    # Word delimiters for free-text tags, interpreted by the Java regex engine.
    # Raw string: every backslash reaches Java untouched. The previous non-raw
    # literal relied on invalid Python escapes such as '\*', which Python 3.12+
    # reports as SyntaxWarning. The character class is unchanged:
    # space * + & / % - $ # ' ) ( [ ] , . ! ? ; : tab newline "
    tag_delimiters = r'[ \*\+\&\/\%\-\$\#\'\)\(\[\],.!?;:\t\n"]+'
    tags = (tags
            .withColumn('tagl', F.explode(F.split(F.lower(F.col('tag')), tag_delimiters)))
            .drop('tag')
            .withColumnRenamed('tagl', 'tag'))

    if debug:
        print('> movies')
        movies.printSchema()
        movies.show()
        print('> ratings')
        ratings.printSchema()
        ratings.show()
        print('> tags')
        tags.printSchema()
        tags.show()
    return (movies, ratings, tags)

#### writeCSV / writeParquet (use them to write a data frame to CSV or Parquet format)

In [5]:
def writeCSV(df, path):
    """Write ``df`` as CSV, with a header row, to ``path``, replacing existing output."""
    writer = df.write
    writer.csv(path, mode='overwrite', header=True)

def writeParquet(df, path):
    """Write ``df`` in Parquet format to ``path``, replacing existing output."""
    writer = df.write
    writer.parquet(path, mode='overwrite')


#### createTagListDF

In [6]:
def createTagListDF(csvTagList):
    """Build a one-column (``tag``) DataFrame from a space-separated tag string.

    The string is split on single spaces, so consecutive spaces produce
    empty-string tags.

    :param csvTagList: tags separated by spaces, e.g. ``'bird horror'``.
    :return: DataFrame with one row per token, in input order.
    """
    rows = [(token,) for token in csvTagList.split(' ')]
    return spark.createDataFrame(rows, ['tag'])

#### Fallback definitions for functions only available from Spark 2.4 (GCP Spark instances run Spark 2.3)

In [7]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,IntegerType

# Spark < 2.4 has no F.array_intersect / F.array_union. Register Python UDF
# stand-ins under the same names so later code can call them unconditionally.
# NOTE(review): the UDFs declare an array<int> result, so they presumably only
# suit integer arrays (e.g. user-id sets) — confirm before reusing elsewhere.
def _py_array_intersect(x, y):
    return list(set(x) & set(y))

def _py_array_union(x, y):
    return list(set(x) | set(y))

for _name, _impl in (('array_intersect', _py_array_intersect),
                     ('array_union', _py_array_union)):
    if not hasattr(F, _name):
        setattr(F, _name, spark.udf.register(_name, _impl, ArrayType(IntegerType())))

## Functions to define 

__This is the section that will be evaluated.__

__Include your code for the various functions required in the assignment below.__

__You may include other auxiliary functions required for computation here,
but NOT test code (see below).__



#### tfidfTags

In [14]:
from pyspark.sql import functions as F
    
def getTF(tags, debug=False):
    """Compute the term frequency (TF) of every tag within every movie.

    For a (tag, movie) pair, ``f`` is the number of ``tags`` rows with that
    tag for that movie. ``f_max`` is the largest ``f`` among the tags of the
    same movie, and ``TF = f / f_max``.

    :param tags: DataFrame with at least ``tag`` and ``movieId`` columns.
    :param debug: when true, show the intermediate ``f`` and ``f_max`` tables.
    :return: DataFrame with columns ``movieId, tag, f, f_max, TF``.
    """
    freq = tags.groupBy('tag', 'movieId').agg(F.count('movieId').alias('f'))
    if debug:
        freq.orderBy('tag', 'movieId', ascending=False).show()

    freq_max = freq.groupBy('movieId').agg(F.max('f').alias('f_max'))
    if debug:
        freq_max.orderBy('movieId', ascending=False).show()

    return freq.join(freq_max, 'movieId').withColumn('TF', F.col('f') / F.col('f_max'))

def getIDF(tags, debug=False):
    """Compute the inverse document frequency (IDF) of every tag.

    ``n`` is the number of distinct movies carrying the tag, and ``|N|`` is
    the number of distinct movies in ``tags``. ``IDF = log2(|N| / n)``.

    :param tags: DataFrame with at least ``tag`` and ``movieId`` columns.
    :param debug: when true, show the ``n`` table and print ``|N|``.
    :return: DataFrame with columns ``tag, n, IDF``.
    """
    movies_per_tag = tags.groupBy('tag').agg(F.countDistinct('movieId').alias('n'))
    if debug:
        movies_per_tag.orderBy('tag', ascending=False).show()

    # count() is an action: |N| is materialised as a plain Python int here.
    total_movies = tags.select('movieId').distinct().count()
    if debug:
        print("|N| = %d" % total_movies)

    idf_ratio = total_movies / F.col('n')
    return movies_per_tag.withColumn('IDF', F.log2(idf_ratio))
    
def tfidfTags(tags, debug=False):
    """Compute the TF-IDF score of every (tag, movie) pair.

    Joins the results of ``getTF`` and ``getIDF`` on ``tag`` and adds
    ``TF_IDF = TF * IDF``.

    :param tags: DataFrame with at least ``tag`` and ``movieId`` columns.
    :param debug: when true, show every intermediate table in full.
    :return: the joined TF/IDF columns plus ``TF_IDF``.
    """
    tf = getTF(tags, debug)
    if debug:
        tf.orderBy(['tag', 'TF'], ascending=[1, 0]).show(tf.count())

    idf = getIDF(tags, debug)
    if debug:
        idf.orderBy(['IDF', 'n'], ascending=[0, 1]).show(idf.count())

    scored = tf.join(idf, 'tag').withColumn('TF_IDF', F.col('TF') * F.col('IDF'))
    if debug:
        scored.orderBy(['tag', 'TF_IDF', 'movieId'], ascending=[1, 0, 1]).show(scored.count())
    return scored


#### recommendByTag

In [67]:
from pyspark.sql import functions as F

def recommendByTag(singleTag, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    """Recommend the movies with the highest TF-IDF score for a single tag.

    Movies whose most frequent tag occurs fewer than ``min_fmax`` times
    (``f_max < min_fmax``) are excluded. Ties on the score are broken by
    title, ascending.

    :param singleTag: tag to search for (exact match on ``tag``).
    :param TFIDF_tags: TF-IDF table as returned by ``tfidfTags``.
    :param movies: DataFrame with ``movieId`` and ``title`` columns.
    :param min_fmax: minimum ``f_max`` a movie must have.
    :param numberOfResults: maximum number of rows returned.
    :param debug: when true, show the TF-IDF/movies join.
    :return: DataFrame ``movieId, title, TF_IDF`` sorted by descending score.
    """
    with_titles = TFIDF_tags.join(movies, 'movieId')
    if debug:
        with_titles.show()

    matching = with_titles.filter(
        (with_titles['tag'] == singleTag) & (with_titles['f_max'] >= min_fmax))
    return (matching
            .select('movieId', 'title', 'TF_IDF')
            .orderBy('TF_IDF', 'title', ascending=[0, 1])
            .limit(numberOfResults))


#### recommendByTags

In [45]:
from pyspark.sql import functions as F
from functools import reduce
from pyspark.sql import DataFrame

# Used to concatenate the per-tag results into a single DataFrame.
def unionAll(*dfs):
    """Return the positional union (duplicates kept) of all DataFrames in ``dfs``.

    Raises TypeError when called with no DataFrames (``reduce`` of an empty
    sequence without an initial value).
    """
    return reduce(DataFrame.unionAll, dfs)

def recommendByTags(searchTags, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    """Recommend movies by the sum of their TF-IDF scores over several tags.

    ``searchTags`` is split on whitespace. For each search tag, the TF-IDF
    rows of movies with ``f_max >= min_fmax`` are collected. A movie's score
    ``SUM_TF_IDF`` is the sum of the collected ``TF_IDF`` values. A tag that
    is repeated in ``searchTags`` contributes once per occurrence. Ties are
    broken by title, ascending.

    An empty or whitespace-only ``searchTags`` returns an empty DataFrame.
    Previously it failed with UnboundLocalError because no result was ever
    assigned.

    :param searchTags: whitespace-separated tags, e.g. ``'bird hitchcock'``.
    :param TFIDF_tags: TF-IDF table as returned by ``tfidfTags``.
    :param movies: DataFrame with ``movieId`` and ``title`` columns.
    :param min_fmax: minimum ``f_max`` a movie must have.
    :param numberOfResults: maximum number of rows returned.
    :param debug: when true, show the search-tag DataFrame and the result.
    :return: DataFrame ``movieId, title, SUM_TF_IDF`` sorted by descending score.
    """
    searchTagsDF = createTagListDF(searchTags)
    tags = searchTags.split()
    if debug:
        print('> Search tags DF: ' + searchTags)
        searchTagsDF.show()

    candidates = (TFIDF_tags.join(movies, 'movieId')
                  .filter(F.col('f_max') >= min_fmax)
                  .select('movieId', 'title', 'tag', 'TF_IDF'))

    # One slice per search tag (rather than an isin filter) so that a repeated
    # tag keeps contributing once per occurrence, as before. No per-slice sort:
    # ordering is only meaningful after aggregation.
    per_tag = [candidates.filter(F.col('tag') == t).select('movieId', 'title', 'TF_IDF')
               for t in tags]
    if per_tag:
        matched = unionAll(*per_tag)
    else:
        # No search tags: an always-false filter keeps the schema with zero rows.
        matched = candidates.filter(F.lit(False)).select('movieId', 'title', 'TF_IDF')

    result = (matched
              .groupBy('movieId', 'title')
              .agg(F.sum('TF_IDF').alias('SUM_TF_IDF'))
              .orderBy(['SUM_TF_IDF', 'title'], ascending=[0, 1])
              .limit(numberOfResults))

    if debug:
        result.show()

    return result
        


+-------+--------------------+------------------+
|movieId|               title|        SUM_TF_IDF|
+-------+--------------------+------------------+
|   1333|   Birds, The (1963)|11.910970878541328|
|   6299|Winged Migration ...| 9.407798850221976|
|    965|39 Steps, The (1935)| 7.729726945109338|
|   1086|Dial M for Murder...| 7.729726945109338|
|   2183|Man Who Knew Too ...| 7.729726945109338|
|    908|North by Northwes...| 7.729726945109338|
|    930|    Notorious (1946)| 7.729726945109338|
|   1219|       Psycho (1960)| 7.729726945109338|
|    904|  Rear Window (1954)| 7.729726945109338|
|    928|      Rebecca (1940)| 7.729726945109338|
+-------+--------------------+------------------+

+-------+--------------------+------------------+
|movieId|               title|        SUM_TF_IDF|
+-------+--------------------+------------------+
|  99114|Django Unchained ...|18.963598863331505|
| 128360|The Hateful Eight...|18.963598863331505|
|  68157|Inglourious Baste...|18.423496364059403|

#### jiMovieSimilarity

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,IntegerType


def jiMovieSimilarity(ratings, minRatings=10, debug=False):
    """Compute the Jaccard index (JI) between pairs of movies based on their fans.

    A movie's fans are the distinct users who rated it ``>= 4.0``. For every
    pair of movies ``m1 < m2``:
    ``i`` is the number of shared fans, ``u`` is the size of the union of the
    two fan sets, and ``JI = i / u``.

    Movies with fewer than ``minRatings`` fans are excluded before pairing,
    which also shrinks the cross join.
    NOTE(review): the original accepted ``minRatings`` but never used it. It is
    now applied to the number of fans (favourable ratings). Confirm against
    the assignment spec whether all ratings should count instead.

    :param ratings: DataFrame with at least ``userId``, ``movieId``, ``rating``.
    :param minRatings: minimum number of fans a movie needs to be paired.
    :param debug: when true, show intermediate tables.
    :return: DataFrame with columns ``m1, m2, i, u, JI``.
    """
    # Fan set per movie. Columns are referenced by name (not via another
    # DataFrame's attributes) so resolution is unambiguous in the self-join.
    fans_m1 = (ratings
               .filter(F.col('rating') >= 4.0)
               .withColumnRenamed('movieId', 'm1')
               .groupBy('m1')
               .agg(F.collect_set(F.col('userId')).alias('fu1'))
               .filter(F.size(F.col('fu1')) >= minRatings))

    # Renamed copy so both sides of the self-join have distinct column names.
    fans_m2 = (fans_m1
               .withColumnRenamed('m1', 'm2')
               .withColumnRenamed('fu1', 'fu2'))

    # Unordered pairs only: m1 < m2 drops self-pairs and mirrored duplicates.
    pairs = fans_m1.crossJoin(fans_m2).filter(F.col('m1') < F.col('m2'))

    if debug:
        pairs.show()
        fans_m1.show()

    counts = (pairs
              .withColumn('i', F.size(F.array_intersect(F.col('fu1'), F.col('fu2'))))
              .withColumn('u', F.size(F.array_union(F.col('fu1'), F.col('fu2'))))
              .drop('fu1', 'fu2'))

    if debug:
        counts.show()

    # Jaccard index; u >= 1 whenever the two fan sets are non-empty.
    return counts.withColumn('JI', F.col('i') / F.col('u'))



#### recommendBySimilarity

In [11]:
def recommendBySimilarity(movieId, movies, jiForMovies, numberOfResults=10, debug=False):
    """Recommend the movies most similar to ``movieId`` by Jaccard index.

    ``jiMovieSimilarity`` stores each pair once (``m1 < m2``), so the target
    movie is looked up in both columns and its partner is taken from the
    other one. Results are sorted by descending ``JI``. Ties are broken by
    title, ascending, so the output is deterministic; the original sorted on
    ``JI`` alone.

    :param movieId: id of the movie to find similar movies for.
    :param movies: DataFrame with ``movieId`` and ``title`` columns.
    :param jiForMovies: pair table with columns ``m1``, ``m2`` and ``JI``.
    :param numberOfResults: maximum number of rows returned.
    :param debug: when true, show the pair table and the two partial matches.
    :return: DataFrame ``movieId, title, JI`` sorted by descending similarity.
    """
    if debug:
        jiForMovies.show()

    # Target in m1 -> neighbour is m2, and vice versa. Both are renamed to
    # movieId so they can be unioned and joined with ``movies``.
    as_first = (jiForMovies
                .filter(F.col('m1') == movieId)
                .select(F.col('m2').alias('movieId'), 'JI'))
    as_second = (jiForMovies
                 .filter(F.col('m2') == movieId)
                 .select(F.col('m1').alias('movieId'), 'JI'))

    if debug:
        as_first.show()
        as_second.show()

    return (as_first.union(as_second)
            .join(movies, 'movieId')
            .select('movieId', 'title', 'JI')
            .orderBy(['JI', 'title'], ascending=[0, 1])
            .limit(numberOfResults))


# Specify input data set and load it

In [16]:
# Load data
# Dataset location is <bucket><path><dataset>.
bucket = 'gs://bddcc_up201405426'
path = '/p1/'
dataset = 'medium1'
fullPath = ''.join([bucket, path, dataset])

movies, ratings, tags = loadMovieLensData(fullPath, format='csv', debug=True)

Reading gs://bddcc_up201405426/p1/medium1/movies.csv
Reading gs://bddcc_up201405426/p1/medium1/ratings.csv
Reading gs://bddcc_up201405426/p1/medium1/tags.csv
> movies
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
|      6|         Heat (1995)|
|      7|      Sabrina (1995)|
|      8| Tom and Huck (1995)|
|      9| Sudden Death (1995)|
|     10|    GoldenEye (1995)|
|     11|American Presiden...|
|     12|Dracula: Dead and...|
|     13|        Balto (1995)|
|     14|        Nixon (1995)|
|     15|Cutthroat Island ...|
|     16|       Casino (1995)|
|     17|Sense and Sensibi...|
|     18|   Four Rooms (1995)|
|     19|Ace Ventura: When...|
|     20|  Money Train (1995)|
+-------+--------------------+
only s

##  Test code 

__Include any test code you may need below.__

__The initial contents are only meant as an example.__

__This section will NOT be evaluated.__

In [17]:
#jiM = jiMovieSimilarity(ratings)
#sm = recommendBySimilarity(296, movies, jiM)
#sm.show()



+-------+--------------------+-------------------+
|movieId|               title|                 JI|
+-------+--------------------+-------------------+
|    593|Silence of the La...| 0.4794952681388013|
|    318|Shawshank Redempt...| 0.4388888888888889|
|     50|Usual Suspects, T...| 0.4381625441696113|
|     47|Seven (a.k.a. Se7...| 0.4117647058823529|
|   2959|   Fight Club (1999)| 0.4053156146179402|
|    356| Forrest Gump (1994)|0.38095238095238093|
|   2571|  Matrix, The (1999)| 0.3746312684365782|
|    858|Godfather, The (1...| 0.3720136518771331|
|   1089|Reservoir Dogs (1...|0.35135135135135137|
|    608|        Fargo (1996)| 0.3462897526501767|
+-------+--------------------+-------------------+



In [51]:
# Recommend by tag 



In [None]:

#jiM.orderBy(['JI','m1','m2'], ascending=[0,1,1]).show()




In [7]:
#jiM.cache()

# Pulp Fiction
#sm = recommendBySimilarity(296, movies, jiM)
#sm.show()

# Fight club
#sm = recommendBySimilarity(2959, movies, jiM)
#sm.show()
    
# Shrek
#sm = recommendBySimilarity(4306, movies, jiM)
#sm.show()
