# BDCC project 1 

_[Big Data and Cloud Computing](http://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), DCC/FCUP_


## Code necessary to run from the command line 

In [1]:
if __name__ == "__main__" :
    # This block is required to run the program from the command line
    # in interface with a single Spark instance
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    
    spark = SparkSession\
        .builder\
        .appName("BDCCp1")\
        .master("local[*]")\
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

## Provided code - auxilliary functions

__You should not need to edit these.__

#### loadMovieLensData

In [3]:
from pyspark.sql import functions as F

def readCSV(file, debug=False):
    if debug:
      print('Reading ' + file)
    return spark.read.csv(file, inferSchema=True, header=True)

def readParquet(file, debug=False): 
    if debug:
       print('Reading ' + file)
    return spark.read.parquet(file)

def loadMovieLensData(path, format='parquet', debug=False):
    if format == 'parquet':
       movies = readParquet(path +'/movies.parquet', debug)
       ratings = readParquet(path +'/ratings.parquet', debug)
       tags = readParquet(path +'/tags.parquet', debug)
    else:
       movies = readCSV(path +'/movies.csv', debug)
       ratings = readCSV(path +'/ratings.csv', debug)
       tags = readCSV(path +'/tags.csv', debug)
    
    tags = tags.withColumn('tagl', F.explode(F.split(F.lower(F.col('tag')),'[ \*\+\&\/\%\-\$\#\'\)\(\[\[\],.!?;:\t\n"]+')))\
            .drop('tag')\
            .withColumnRenamed('tagl','tag')
    if (debug):
        print('> movies')
        movies.printSchema()
        movies.show()
        print('> ratings')
        ratings.printSchema()
        ratings.show()
        print('> tags')
        tags.printSchema()
        tags.show()
    return (movies, ratings, tags)

#### writeCSV / writeParquet (use them to write a data frame to CSV or Parquet format)

In [4]:
def writeCSV(df, path): 
    df.write.csv(path, header=True, mode='overwrite')

def writeParquet(df,path):
    df.write.parquet(path, mode='overwrite')


#### createTagListDF

In [5]:
def createTagListDF(csvTagList):
    return spark.createDataFrame([ (t,) for t in csvTagList.split(' ')], ['tag'])

#### Definition of functions available only in Spark 2.4 (GCP Spark instances run Spark 2.3) 

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,IntegerType

# Define F.array_intersect if not defined (Spark version < 2.4)
if not hasattr(F,'array_intersect'):
  F.array_intersect = spark.udf\
    .register('array_intersect', 
       lambda x,y: list(set(x) & set(y)), ArrayType(IntegerType()))

# Define F.array_union if not defined (Spark version < 2.4)
if not hasattr(F,'array_union'):
  F.array_union = spark.udf\
    .register('array_union', 
       lambda x,y: list(set(x) | set(y)), ArrayType(IntegerType()))

## Functions to define 

__This is the section that will be evaluated.__

__Include your code for the various functions required in the assigment below.__

__You may include other auxilliary functions required for computation here
but NOT test code (see below).__



#### tfidfTags

In [25]:
from pyspark.sql import functions as F
    
def tfidfTags(tags, debug=False):
    
    df_f = tags.groupBy('tag', 'movieId')\
            .agg(F.count('userId').alias('f'))
    
    df_f_max = df_f.groupBy('movieId')\
                .agg(F.max('f').alias('f_max'))
    
    #df_n = tags.groupBy('tag')\
    #        .agg(F.countDistinct('movieId').alias('n'))
    
    idf = getIDF2(tags, False, False, 'tag', 'movieId')
    #return idf
    #return df_n

    #df = df_f.join(df_n, 'tag')
    
    df = df_f.join(df_f_max, 'movieId')\
            .withColumn('TF', F.col('f') / F.col('f_max'))
    
    df = df.join(idf, 'tag')
    
    return df

#### recommendByTag

In [None]:
from pyspark.sql import functions as F

def recommendByTag(singleTag, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    # TODO
    
    return None 

#### recommendByTags

In [None]:
from pyspark.sql import functions as F

def recommendByTags(searchTags, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    searchTagsDF = createTagListDF(searchTags)
    if debug:
        print('> Search tags DF: ' + searchTags)
        searchTagsDF.show()
    # TODO
        
    return None 

#### jiMovieSimilarity

In [None]:
from pyspark.sql import functions as F

def jiMovieSimilarity(ratings, minRatings=10, debug=False):
  # TODO
  return None

#### recommendBySimilarity

In [None]:
def recommendBySimilarity(movieId, movies, jiForMovies, numberOfResults=10, debug=False):
    # TODO
        
    return None

# Specify input data set and load it

In [8]:
# Load data
bucket = 'gs://bdcc1819'
path = '/p1/data/'
dataset = 'tiny3'
fullPath = bucket + path + dataset

(movies, ratings, tags) = \
  loadMovieLensData(fullPath, format='csv', debug=True)

Reading gs://bdcc1819/p1/data/tiny3/movies.csv
Reading gs://bdcc1819/p1/data/tiny3/ratings.csv
Reading gs://bdcc1819/p1/data/tiny3/tags.csv
> movies
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
|      6|         Heat (1995)|
|      7|      Sabrina (1995)|
|      8| Tom and Huck (1995)|
|      9| Sudden Death (1995)|
|     10|    GoldenEye (1995)|
|     11|American Presiden...|
|     12|Dracula: Dead and...|
|     13|        Balto (1995)|
|     14|        Nixon (1995)|
|     15|Cutthroat Island ...|
|     16|       Casino (1995)|
|     17|Sense and Sensibi...|
|     18|   Four Rooms (1995)|
|     19|Ace Ventura: When...|
|     20|  Money Train (1995)|
+-------+--------------------+
only showing top 20 rows

##  Test code 

__Include test code below that you may need here.__

__The initial contents are only meant as an example.__

__This section will NOT be evaluated.__

In [26]:
# Get TF-IDF for tags
tfidf = tfidfTags(tags, debug=False)
tfidf.show()
#tfidf.cache()
#tfidf.orderBy('TF',ascending=True).show()


+-----------+-------+---+-----+------------------+-----+-----------------+
|        tag|movieId|  f|f_max|                TF|n_w_D|              IDF|
+-----------+-------+---+-----+------------------+-----+-----------------+
|         in|     28|  1|    1|               1.0|    2|4.426264754702098|
|       epic|    110|  1|    1|               1.0|    1|5.426264754702098|
|       pitt|     32|  1|    3|0.3333333333333333|    1|5.426264754702098|
|       hyde|     92|  1|    1|               1.0|    1|5.426264754702098|
|       game|      2|  2|    2|               1.0|    1|5.426264754702098|
|     remake|     32|  1|    3|0.3333333333333333|    3|3.841302253980942|
|apocalyptic|     32|  2|    3|0.6666666666666666|    1|5.426264754702098|
|       time|     32|  3|    3|               1.0|    1|5.426264754702098|
|       adam|    104|  1|    1|               1.0|    1|5.426264754702098|
|  beautiful|    110|  1|    1|               1.0|    1|5.426264754702098|
|         in|     40|  1|

In [None]:
# Recommend by tag 

rm = recommendByTag('cartoon', tfidf, movies)
rm.show()

rm = recommendByTag('cartoon', tfidf, movies, min_fmax=1)
rm.show()


rm = recommendByTag('cruise', tfidf, movies)
rm.show()




In [None]:




rm = recommendByTags('tom hanks cruise', tfidf, movies, numberOfResults=20)
rm.show()

rm = recommendByTags('tom hanks airport', tfidf, movies, numberOfResults=20)
rm.show()

rm = recommendByTags('tom hanks', tfidf, movies, numberOfResults=20)
rm.show()

rm = recommendByTags('hitchcock birds', tfidf, movies, numberOfResults=10)
rm.show()




In [None]:
jiM = jiMovieSimilarity(ratings)

#jiM.orderBy(['JI','m1','m2'], ascending=[0,1,1]).show()




In [None]:
#jiM.cache()

# Pulp Fiction
#sm = recommendBySimilarity(296, movies, jiM)
#sm.show()

# Fight club
#sm = recommendBySimilarity(2959, movies, jiM)
#sm.show()
    
# Shrek
#sm = recommendBySimilarity(4306, movies, jiM)
#sm.show()


In [24]:
def getTF(data, debug=False):
    f_wd = data\
       .groupBy('w','d')\
             .agg(F.count('w').alias('f_wd'))
    if debug:
        f_wd.orderBy('d','w').show()

    f_wd_max = f_wd\
             .groupBy('d')\
             .agg(F.max('f_wd').alias('f_wd_max'))
    if debug:
        f_wd_max.orderBy('d').show()
        
    TF = f_wd.join(f_wd_max, 'd')\
             .withColumn('TF', F.col('f_wd') / F.col('f_wd_max'))\
             .drop('f_wd','f_wd_max')
    return TF

def getIDF(data, debug=False):
    n_w_D = data\
           .groupBy('w')\
           .agg(F.countDistinct('d').alias('n_w_D'))
    if debug:
        n_w_D.orderBy('n_w_D',ascending=False).show()
        
    size_of_D = data.select('d').distinct().count()
    if debug:
        print("|D| = %d" % size_of_D)
    
    IDF = n_w_D\
            .withColumn('IDF', F.log2(size_of_D / F.col('n_w_D')))\
            .drop('n_w_D')
            
    return IDF

# By default, uses the standard usage of IDF -> 'w' is a word in a document 'd'
# Set w or d to use different column names
def getIDF2(data, debug=False, drop_n=True, w='w', d='d', n='n_w_D'):
    n_w_D = data\
           .groupBy(w)\
           .agg(F.countDistinct(d).alias(n))
    if debug:
        n_w_D.orderBy('n_w_D',ascending=False).show()
        
    size_of_D = data.select(d).distinct().count()
    if debug:
        print("|D| = %d" % size_of_D)
    
    IDF = n_w_D\
            .withColumn('IDF', F.log2(size_of_D / F.col(n)))
    
    if drop_n:
        IDF = IDF.drop('n_w_D')
            
    return IDF
    
def getTF_IDF(data, debug=False):
    TF = getTF(data, debug)
    if debug:
        TF.orderBy(['d','TF'],ascending=[1,0]).show(TF.count())
    
    IDF = getIDF(data, debug)
    if debug:
        IDF.orderBy(['IDF','w'], ascending=[0,1]).show(IDF.count())

    TF_IDF = TF\
      .join(IDF,'w')\
      .withColumn('TF_IDF',F.col('TF') * F.col('IDF'))
        
    if debug:
        TF_IDF.orderBy(['d','TF_IDF','w'],ascending=[1,0,1]).show(TF_IDF.count())
    return TF_IDF
