# BDCC project - Spark data processing

**[Big Data and Cloud Computing](https://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), Eduardo R. B. Marques, DCC/FCUP**


## Spark setup

In [1]:

def setupSpark():
  # Prepare this runtime for PySpark: install findspark, a headless Java 8
  # JDK and pyspark, point JAVA_HOME at that JDK, then initialise findspark.
  # Spark needs to run with Java 8 ...
  !pip install -q findspark
  !apt-get install openjdk-8-jdk-headless > /dev/null
  # Answer the interactive update-alternatives prompt with choice 2
  # (presumably the Java 8 entry on this image -- TODO confirm).
  !echo 2 | update-alternatives --config java > /dev/null
  # !java -version
  import os, findspark
  os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
  # !echo JAVA_HOME=$JAVA_HOME
  !pip install -q pyspark
  # NOTE(review): spark_home is hard-coded to the Python 3.6 dist-packages
  # directory; it has to match the interpreter version of the runtime.
  findspark.init(spark_home='/usr/local/lib/python3.6/dist-packages/pyspark')
  !pyspark --version

# Run the setup before anything from pyspark is imported below.
setupSpark()

from pyspark import SparkContext
from pyspark.sql import SparkSession
    
# Create the Spark session with the 'local[*]' master, reusing an existing
# session if there is one, and keep a handle on its SparkContext.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)
sc = spark.sparkContext

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_242
Branch HEAD
Compiled by user centos on 2020-02-02T19:38:06Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


In [2]:
# Mount Google Drive at /content/drive (Colab-only API).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import os
# Point the Google client libraries at the service-account key stored on the
# mounted Drive. Note that this path contains a space.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/drive/My Drive/bdcc-colab.json' 
# Echo the value back to confirm it is visible to shell commands.
!echo $GOOGLE_APPLICATION_CREDENTIALS

/content/drive/My Drive/bdcc-colab.json


In [4]:
from google.cloud import storage

# Create a Cloud Storage client and print the name of every bucket it can
# list, preceded by the project the client is bound to.
storage_client = storage.Client()
buckets = storage_client.list_buckets()
print('-- List of buckets in project \"' + storage_client.project + '\"')

for b in buckets:
  print(b.name)


-- List of buckets in project "bdcc20-p1"
bdcc20-movie_data


In [5]:
# To enable GPU access, go to Edit > Notebook settings and set the
# Hardware accelerator to GPU.

%tensorflow_version 2.x 
import tensorflow as tf

# An empty device name here means TensorFlow found no GPU.
print("GPU device: " + tf.test.gpu_device_name())

from tensorflow.python.client import device_lib

# Dump every local device TensorFlow reports, one per separator line.
tf_devices = device_lib.list_local_devices()

for x in tf_devices:
  print('------')
  print(x)


GPU device: 
------
name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 2734688099200934691

------
name: "/device:XLA_CPU:0"
device_type: "XLA_CPU"
memory_limit: 17179869184
locality {
}
incarnation: 10272481926892409763
physical_device_desc: "device: XLA_CPU device"



## Parameters

In [0]:

#@markdown ---
# Notebook parameters, exposed as Colab form fields through the #@param
# annotations. INPUT_BUCKET and OUTPUT_BUCKET are used as gs://<value>/<DATASET>
# by the transfer and upload cells; MIN_TF_IDF is the minimum TF-IDF value
# that should be written to the output files.
DEBUG = True #@param {type: "boolean"} 
PROJECT_ID = 'bdcc20-p1'  #@param {type: "string"}
INPUT_BUCKET = 'bdcc20-movie_data/bdcc1920_project_datasets' #@param {type: "string"}
DATASET = 'tiny1' #@param ["tiny1", "tiny2", "tiny3", "tiny4", "medium1", "medium2", "medium3", "medium4", "large1", "large2", "large3", "large4", "large5"] {allow-input: true}
OUTPUT_BUCKET = 'bdcc20-movie_data/bdcc1920_project_outputs' #@param {type: "string"}
OUTPUT_ZIP_FILE = 'output.zip' #@param {type: "string"}
COPY_PARQUET_FILES_TO_OUTPUT_BUCKET = True #@param {type: "boolean"} 
MIN_TF_IDF = 0.1 #@param {type:"slider", min:0, max:1, step:0.05}
SEND_PUBSUB_MESSAGE = True #@param {type: "boolean"} 
PUBSUB_TOPIC = 'dispatcher' #@param {type: "string"}
#@markdown ---


## Authenticate to GCP

In [7]:
# The authentication method 
def google_colab_authenticate(projectId, keyFile=None, debug=True):  
    import os
    from google.colab import auth
    if keyFile == None:
      keyFile='/content/bdcc-colab.json'
    if os.access(keyFile,os.R_OK):
      if debug:
        print('Using key file "%s"' % keyFile)
      os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '%s' % keyFile
      os.environ['GCP_PROJECT'] = 'bdcc20-p1' 
      os.environ['GCP_ACCOUNT'] = 'bdcc-colab@' + projectId + '.iam.gserviceaccount.com'
      !gcloud auth activate-service-account --key-file="$GOOGLE_APPLICATION_CREDENTIALS" --project="$GCP_PROJECT"
    else:
      if debug:
        print('No key file given. You may be redirected to the verification code procedure.')
      auth.authenticate_user()
      !gcloud config set project $projectId
    !gcloud info | grep -e Account -e Project

# If the key file exists on the mounted Google Drive, copy it to /content:
# the Drive path contains a space, which is awkward to pass to shell commands.
!test -f "/content/drive/My Drive/bdcc-colab.json" && cp "/content/drive/My Drive/bdcc-colab.json" /content/bdcc-colab.json

# Authenticate for the configured project (uses the default key file path).
google_colab_authenticate(PROJECT_ID)

Using key file "/content/bdcc-colab.json"
Activated service account credentials for: [bdcc-cloud@bdcc20-p1.iam.gserviceaccount.com]
Account: [bdcc-cloud@bdcc20-p1.iam.gserviceaccount.com]
Project: [bdcc20-p1]


## Transfer dataset files (if necessary) from GCS

In [8]:
# Download the dataset directory from the input bucket unless a local
# directory with that name already exists.
!test -d $DATASET || gsutil -m cp -r gs://"$INPUT_BUCKET"/"$DATASET" .
# Report the disk usage of the local copy.
!du --human $DATASET

24K	tiny1/genres.parquet
104K	tiny1/output/tfidf.parquet
176K	tiny1/output/jaccardIndex.parquet
88K	tiny1/output/movies_agg.parquet
372K	tiny1/output
16K	tiny1/tags.parquet
24K	tiny1/actors.parquet
16K	tiny1/movies.parquet
16K	tiny1/ratings.parquet
524K	tiny1


## Parquet file read/write methods


In [0]:
def readCSV(file):
    """Read a CSV file with a header row into a Spark DataFrame.

    The schema is inferred from the data. When the module-level DEBUG flag is
    set, the path is printed before loading and the schema plus the first 10
    rows are printed afterwards.

    Args:
        file: Path of the CSV file to read.

    Returns:
        The loaded DataFrame.
    """
    global spark
    if DEBUG:
        print('==> Reading ' + file)
    # Load first: the debug preview below needs the DataFrame to exist
    # (it was previously referenced before being assigned).
    df = spark.read.csv(file, inferSchema=True, header=True)
    if DEBUG:
        df.printSchema()
        df.show(10)
    return df

def readParquet(file):
    """Load a Parquet dataset into a Spark DataFrame.

    With the module-level DEBUG flag set, the path is announced before
    loading and the schema plus the first 10 rows are printed afterwards.
    """
    global spark
    if not DEBUG:
        return spark.read.parquet(file)

    print('==> Reading ' + file)
    loaded = spark.read.parquet(file)
    loaded.printSchema()
    loaded.show(10)
    return loaded
 
def writeParquet(df, path):
    """Write a DataFrame to `path` in Parquet format, replacing old output.

    Args:
        df: DataFrame to write (its `write.parquet` method is used).
        path: Destination path of the Parquet output.
    """
    import os
    import shutil

    if DEBUG:
        print('==> Writing ' + path)
    # Remove stale local output in Python instead of an unquoted
    # `rm -fr $path` shell line, which word-splits paths containing spaces.
    # Like `rm -f`, failures and missing paths are ignored.
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path, ignore_errors=True)
    elif os.path.lexists(path):
        try:
            os.remove(path)
        except OSError:
            pass
    df.write.parquet(path, mode='overwrite')


def readFile(file, format):
  """Read `<file>.<format>`, dispatching on the format.

  'parquet' is read with readParquet; any other format value is read with
  readCSV.
  """
  full_name = file + '.' + format
  if format == 'parquet':
    return readParquet(full_name)
  return readCSV(full_name)

def loadMovieLensData(path, format='parquet'):
    """Load the five MovieLens tables found under the `path` prefix.

    Each table is read through readFile as `path + <table name>` using the
    given format.

    Returns:
        Tuple (actors, genres, movies, ratings, tags).
    """
    table_names = ('actors', 'genres', 'movies', 'ratings', 'tags')
    actors, genres, movies, ratings, tags = [
        readFile(path + table, format) for table in table_names
    ]
    return actors, genres, movies, ratings, tags

## Load input dataset from Parquet files

In [10]:
# NOTE(review): bucket, path, dataset and fullPath are not used by the load
# below, which reads the local copy under DATASET -- candidates for removal.
bucket = 'gs://bdcc20-movie_data/' 
path = 'bdcc1920_project_datasets/'
dataset = 'tiny1/'
fullPath = bucket + path + dataset

# Load the five input tables from the local dataset directory (Parquet).
actors, genres, movies, ratings, tags = \
  loadMovieLensData(DATASET+'/')
# NOTE(review): the string literal below is disabled code kept as a bare
# expression; being the last expression of the cell, it is echoed as output.
"""
movies =  readParquet(DATASET + '/movies.parquet')
actors =  readParquet(DATASET + '/actors.parquet')
genres =  readParquet(DATASET + '/genres.parquet')
ratings = readParquet(DATASET + '/ratings.parquet')
tags =    readParquet(DATASET + '/tags.parquet')
"""

==> Reading tiny1/actors.parquet
root
 |-- movieId: integer (nullable = true)
 |-- name: string (nullable = true)

+-------+--------------+
|movieId|          name|
+-------+--------------+
|      1|     Tim Allen|
|      1|   Don Rickles|
|      1|    Jim Varney|
|      1|     Tom Hanks|
|      2| Kirsten Dunst|
|      2|Robin Williams|
|      2| Jonathan Hyde|
|      2|   Bonnie Hunt|
|      3|   Jack Lemmon|
|      3|Walter Matthau|
+-------+--------------+
only showing top 10 rows

==> Reading tiny1/genres.parquet
root
 |-- movieId: integer (nullable = true)
 |-- genre: string (nullable = true)

+-------+---------+
|movieId|    genre|
+-------+---------+
|      1|Adventure|
|      1|Animation|
|      1| Children|
|      1|   Comedy|
|      1|  Fantasy|
|      2|Adventure|
|      2| Children|
|      2|  Fantasy|
|      3|   Comedy|
|      3|  Romance|
+-------+---------+
only showing top 10 rows

==> Reading tiny1/movies.parquet
root
 |-- movieId: integer (nullable = true)
 |-- titl

"\nmovies =  readParquet(DATASET + '/movies.parquet')\nactors =  readParquet(DATASET + '/actors.parquet')\ngenres =  readParquet(DATASET + '/genres.parquet')\nratings = readParquet(DATASET + '/ratings.parquet')\ntags =    readParquet(DATASET + '/tags.parquet')\n"

## Calculate aggregate movie data

The aim is to derive a data frame with the same data as __movies__, augmented
with __numRatings__ and __avgRating__ columns, respectively representing the 
number of ratings and average rating per movie.

In [11]:
# Register the inputs as temporary views so they can be queried with SQL.
movies.createOrReplaceTempView('movies')
ratings.createOrReplaceTempView('ratings')

if DEBUG:
  print(movies.count())
  movies.printSchema()
  movies.orderBy('movieId').show()

# Number of ratings and average rating for every movie that has ratings.
ratings_agg = spark.sql(
    '''
    SELECT movieId,
          COUNT(*) AS numRatings,
          AVG(rating) AS avgRating
    FROM ratings
    GROUP BY movieId
    '''
)
ratings_agg.createOrReplaceTempView('ratings_agg')

if DEBUG:
  print(ratings_agg.count())
  ratings_agg.printSchema()
  ratings_agg.orderBy('movieId').show()

# LEFT OUTER JOIN keeps movies without any rating; their missing aggregates
# are replaced by 0 / 0.0.
movies_agg = spark.sql(
    '''
    SELECT movieId, title, year, imdbId, 
           ifnull(numRatings, 0) as numRatings,
           ifnull(avgRating, 0.0) as avgRating
    FROM movies LEFT OUTER JOIN ratings_agg USING(movieId)
    ORDER BY movieId
    '''
)
if DEBUG:
  print(movies_agg.count())
  movies_agg.printSchema()
  movies_agg.show()

# no need for these, clean up
# The views were registered with createOrReplaceTempView, i.e. as session
# temporary views, so they must be dropped with dropTempView and under the
# names they were registered with ('ratings_agg', not 'agg_ratings').
# Only the catalog names are removed; the `ratings` DataFrame itself is
# still used by later cells.
spark.catalog.dropTempView('ratings_agg')
spark.catalog.dropTempView('ratings')
ratings.unpersist()

10
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- imdbId: integer (nullable = true)

+-------+--------------------+----+------+
|movieId|               title|year|imdbId|
+-------+--------------------+----+------+
|      1|           Toy Story|1995|114709|
|      2|             Jumanji|1995|113497|
|      3|    Grumpier Old Men|1995|113228|
|      4|   Waiting to Exhale|1995|114885|
|      5|Father of the Bri...|1995|113041|
|      6|                Heat|1995|113277|
|      7|             Sabrina|1995|114319|
|      8|        Tom and Huck|1995|112302|
|      9|        Sudden Death|1995|114576|
|     10|           GoldenEye|1995|113189|
+-------+--------------------+----+------+

10
root
 |-- movieId: integer (nullable = true)
 |-- numRatings: long (nullable = false)
 |-- avgRating: double (nullable = true)

+-------+----------+------------------+
|movieId|numRatings|         avgRating|
+-------+----------+----

DataFrame[movieId: int, userId: int, rating: double]

## Derive words for TF-IDF processing

In [12]:
from pyspark.sql import functions as F

# Utility method to explode strings contained in the given field into a 'word' column
def wordDf(df, field):
  """Explode the strings of column `field` into one lower-cased 'word' per row.

  The value is lower-cased, split on runs of the separator characters
  listed in the pattern below, and each resulting piece becomes its own row
  in a new 'word' column. The original `field` column is dropped.
  """
  # Separator pattern handed to Spark's split(); kept exactly as-is.
  separators = '[ \s\*\+\&\/\%\-\$\#\'\)\(\[\[\],.!?;:\t\n"]+'
  lowered = F.lower(F.col(field))
  pieces = F.split(lowered, separators)
  exploded = df.withColumn('word', F.explode(pieces))
  return exploded.drop(field)


# One (movieId, word) frame per source of text: actor names, genres,
# tags (without the tagging user) and movie titles.
actor_words = wordDf(actors, 'name')
genre_words = wordDf(genres, 'genre')
tag_words = wordDf(tags.drop('userId'), 'tag')
title_words = wordDf(movies.select('movieId', 'title'), 'title')

# Stack the four frames; union() matches columns by position.
all_words = (
    actor_words
    .union(genre_words)
    .union(tag_words)
    .union(title_words)
)

if DEBUG:
  print(all_words.count())
  all_words.orderBy('movieId', 'word').show()


149
+-------+---------+
|movieId|     word|
+-------+---------+
|      1|adventure|
|      1|    allen|
|      1|animation|
|      1| children|
|      1|   comedy|
|      1|      don|
|      1|  fantasy|
|      1|      fun|
|      1|    hanks|
|      1|      jim|
|      1|    pixar|
|      1|    pixar|
|      1|  rickles|
|      1|    story|
|      1|      tim|
|      1|      tom|
|      1|      toy|
|      1|   varney|
|      2|adventure|
|      2|    board|
+-------+---------+
only showing top 20 rows



## Calculate TF-IDF metric (TODO)

The aim is to derive a data frame with __(movieId, word, tf_idf)__ values
where the TF-IDF calculation is similar to the [one we discussed before during classes](https://colab.research.google.com/drive/15yMxBf1ltxrIaHCZUqqUWZBm6cqdcErF), i.e., the ${\rm TFIDF}$ term we consider for a movie $m$ in the set of movies $M$ and word $w$ is given by
   
   $$
   {\rm TFIDF}(m,w) = {\rm TF}(m,w) \:\times \:{\rm IDF}(w,M)
   $$

Only TF-IDF values equal to or higher than the __MIN\_TF\_IDF__ threshold should be written to the output files. The standard value to use for __MIN\_TF\_IDF__ is __0.1__, but you may adjust this value for testing purposes.


__Important note__: you should __ONLY__ use the [Spark Data Frame API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [Spark SQL](https://docs.databricks.com/spark/latest/spark-sql/language-manual/select.html) in your code (__NOT__ Pandas, any other Spark or generic Python libraries).




In [0]:
class Constants:
  """Column names shared by the TF-IDF helpers."""
  # Frequency columns
  f = 'f'            # occurrences of a word within a document
  f_max = 'f_max'    # highest `f` of any word within a document
  nr = 'nr'          # number of distinct documents containing a word
  n = 'n'
  # Derived metric columns
  tf = 'TF'          # f / f_max
  idf = 'IDF'        # inverse document frequency
  tf_idf = 'tfidf'   # TF * IDF

def show(debug, dataframe, message=None, rows=10):
    """Preview a DataFrame, but only when `debug` is truthy.

    Args:
        debug: Nothing happens unless this is truthy.
        dataframe: Object providing `count()` and `show(n)`.
        message: Optional label; when given, it is printed together with the
            row count (this triggers a `count()` on the frame).
        rows: Number of rows passed to `dataframe.show`.
    """
    if not debug:
        return
    if message:
        print("> %s (%d rows)" % (message, dataframe.count()))
    dataframe.show(rows)

def get_idf(data, word='word', doc='doc', nr=None, debug=False):
    """Compute the Inverse Document Frequency (IDF) of every word.

    IDF(w) = log2(|D| / n_w_D), where |D| is the number of distinct values
    of the `doc` column and n_w_D is the number of distinct documents in
    which the word appears.

    Args:
        data: A DataFrame holding at least the `word` and `doc` columns.
        word: Name of the column holding the words.
        doc: Name of the column identifying the documents.
        nr: When truthy, the per-word document count is kept in the result
            under the Constants.nr column; otherwise it is dropped.
        debug: When truthy, print the intermediate counts.

    Returns:
        DataFrame with columns (word, Constants.idf), or
        (word, Constants.nr, Constants.idf) when `nr` is truthy.
    """
    doc_count = 'n_w_D'

    n_w_D = data\
           .groupBy(word)\
           .agg(F.countDistinct(doc).alias(doc_count))
    show(debug, n_w_D.orderBy(doc_count, ascending=False))

    # Triggers a Spark job: |D| is needed as a plain number below.
    size_of_D = data.select(doc).distinct().count()
    if debug:
        print("|D| = %d" % size_of_D)

    # Reference the count column with the exact name it was created with,
    # so the lookup does not rely on case-insensitive column resolution.
    IDF = n_w_D.withColumn(Constants.idf, F.log2(size_of_D / F.col(doc_count)))
    if nr:
        return IDF.withColumnRenamed(doc_count, Constants.nr)
    return IDF.drop(doc_count)

def tf_idf(data, word, doc, debug=False):
    """Compute the TF-IDF value of every (word, document) pair in `data`.

    TF(d, w) = f(d, w) / f_max(d), where f is the number of rows of `data`
    with that (word, doc) pair and f_max is the largest f within the
    document. The IDF comes from get_idf.

    Args:
        data: DataFrame holding at least the `word` and `doc` columns.
        word: Name of the column holding the words.
        doc: Name of the column identifying the documents.
        debug: When truthy, preview every intermediate DataFrame.

    Returns:
        DataFrame with columns (word, doc, Constants.f, Constants.f_max,
        Constants.tf, Constants.nr, Constants.idf, Constants.tf_idf).
    """
    # Occurrences of each word within each document -> (word, doc, f)
    freq = data.groupBy(word, doc)\
               .agg(F.count(doc).alias(Constants.f))
    show(debug, freq, "group by (%s, %s) count frequency done" % (word, doc))

    # Highest frequency of any word within each document -> (doc, f_max)
    freq_max = freq.groupBy(doc)\
                   .agg(F.max(Constants.f).alias(Constants.f_max))
    show(debug, freq_max, "Max frequency per movie done")

    # Inverse document frequency per word -> (word, nr, IDF)
    idf = get_idf(data, word, doc, Constants.nr, debug)
    show(debug, idf, "IDF done")

    # Attach f_max per document, derive TF, then attach the IDF per word.
    df = freq.join(freq_max, doc)\
             .withColumn(Constants.tf, F.col(Constants.f) / F.col(Constants.f_max))\
             .join(idf, word)
    show(debug, df, "TF done")

    # Use the shared column-name constants rather than hard-coded attributes.
    return df.withColumn(Constants.tf_idf,
                         F.col(Constants.tf) * F.col(Constants.idf))

In [14]:
# Get TF-IDF for tags
# NOTE(review): TF-IDF is computed over whole tags from `tags`, not over the
# per-word `all_words` frame derived earlier -- confirm this is intended.
word = 'tag'
wordFinal = 'word'
doc = 'movieId'

tfidf = tf_idf(tags, word, doc, False)
tfidf.cache()

# Keep only the output columns (word, movieId, tfidf) and apply the
# MIN_TF_IDF threshold required for the values written to the output files.
tfidf = tfidf.select(F.col(word).alias(wordFinal), doc, Constants.tf_idf)\
             .filter(F.col(Constants.tf_idf) >= MIN_TF_IDF)

assert tfidf.columns == [wordFinal, doc, Constants.tf_idf],\
    "Columns do not match expected values for tfidfTags"
# Preview the dataframe, ordering only by columns that exist in the result
# (the frequency column and the original 'tag' name are gone at this point).
tfidf.orderBy([Constants.tf_idf, doc, wordFinal], ascending=[0,1,1]).show()

+----------------+-------+------------------+
|            word|movieId|             tfidf|
+----------------+-------+------------------+
|           pixar|      1| 2.321928094887362|
|  Robin Williams|      2| 2.321928094887362|
|         fantasy|      2| 2.321928094887362|
|            game|      2| 2.321928094887362|
|magic board game|      2| 2.321928094887362|
|           moldy|      3| 2.321928094887362|
|             old|      3| 2.321928094887362|
|       pregnancy|      5| 2.321928094887362|
|          remake|      5|1.3219280948873624|
|          remake|      7|1.3219280948873624|
|             fun|      1| 1.160964047443681|
+----------------+-------+------------------+



## Movie similarity based on the Jaccard index (TODO for bonus grading)

For every pair of movies $m_1$ and $m_2$, compute a similarity ratio based on the __Jaccard index__: 

  $$
  {\rm JI}(m_1, m_2) =  \frac{| {\rm urt}(m_1) \cap {\rm urt}(m_2)|}{ |{\rm urt}(m_1) \cup {\rm urt}(m_2)|}   
  $$

where ${\rm urt}(m)$ is defined as the set of users who have tagged or rated a movie $m$.

For further reference on the Jaccard index metric see:

  - [Mining of Massive Data Sets, sec. 3.3.1](http://infolab.stanford.edu/%7Eullman/mmds/book.pdf)
  - [Wikipedia page for the Jaccard Index](https://en.wikipedia.org/wiki/Jaccard_index)





Calculates the Jaccard index to measure similarity between movies based on user ratings.

Liking a movie means rating >= 4.0

In [15]:
def jaccard_index(df1, df2, column="user", sort="movie"):
    """Compute the Jaccard index for every unordered pair of rows.

    Args:
        df1: DataFrame with columns `<sort>1` and `<column>1` (an array).
        df2: DataFrame with columns `<sort>2` and `<column>2` (an array).
        column: Prefix of the array columns treated as sets.
        sort: Prefix of the key columns; only pairs with
            `<sort>1 < <sort>2` are kept.

    Returns:
        The cross join of both frames without the two array columns, plus
        "user" (union size), "index" (intersection size) and
        "jaccard_index" (index / user).
    """
    left_set, right_set = column + "1", column + "2"
    left_key, right_key = "%s1" % sort, "%s2" % sort

    # All combinations, keeping each unordered pair exactly once.
    pairs = df1.crossJoin(df2).filter("%s < %s" % (left_key, right_key))

    union_size = F.size(F.array_union(left_set, right_set))
    common_size = F.size(F.array_intersect(left_set, right_set))

    scored = pairs.withColumn("user", union_size)
    scored = scored.withColumn("index", common_size)
    scored = scored.withColumn("jaccard_index", F.col("index") / F.col("user"))
    return scored.drop(left_set, right_set)
              
def movieSimilarity(ratings, minRatings=10, threshold=4.0, debug=False):
    """Compute the Jaccard similarity between movies from user "likes".

    A user likes a movie when their rating is >= `threshold`. Each movie is
    represented by the set of users who liked it, and every pair of movies
    is scored with jaccard_index.

    Args:
        ratings: DataFrame with columns movieId, userId and rating.
        minRatings: Only movies liked by more than this many users are kept.
            NOTE(review): the comparison is strict, so a movie with exactly
            `minRatings` likes is excluded -- confirm that is intended.
        threshold: Minimum rating that counts as a like.
        debug: When truthy, preview the intermediate DataFrames.

    Returns:
        DataFrame (movie1, movie2, user, index, jaccard_index).
    """
    # Compare against the numeric threshold directly: formatting it with
    # "%i" truncated fractional thresholds (e.g. 3.5 was applied as 3).
    ratings = ratings.filter(F.col("rating") >= threshold)

    ratings = ratings.drop("rating")
    show(debug, ratings, "like's dataframe")

    # Set of users that liked each movie, restricted to movies with more
    # than minRatings likes.
    df_m1 = ratings.groupBy("movieId")\
                    .agg(F.collect_set(ratings.userId).alias("user1"))\
                    .withColumnRenamed("movieId", "movie1")\
                    .filter(minRatings < F.size("user1"))
    show(debug, df_m1, "movie with users that liked it")

    # Renamed copy of the same frame, used as the right side of the cross join.
    df_m2 = df_m1.withColumnRenamed("user1", "user2")\
                 .withColumnRenamed("movie1", "movie2")
    show(debug, df_m2, "movie with users that liked it - copy renamed")

    return jaccard_index(df_m1, df_m2)

# Rank the pairs: largest intersection first, then highest Jaccard index,
# with the movie ids as ascending tie-breakers.
ji = movieSimilarity(ratings).orderBy(
    ['index', 'jaccard_index', 'movie1', 'movie2'],
    ascending=[0, 0, 1, 1],
)
assert ji.columns == ["movie1", "movie2", "user", "index", "jaccard_index"], \
    "unexpected column value"
ji.show()

+------+------+----+-----+--------------------+
|movie1|movie2|user|index|       jaccard_index|
+------+------+----+-----+--------------------+
|     1|     6| 189|   27| 0.14285714285714285|
|     1|     2| 176|   21| 0.11931818181818182|
|     1|    10| 187|   19| 0.10160427807486631|
|     6|    10| 117|   11| 0.09401709401709402|
|     1|     3| 154|   11| 0.07142857142857142|
|     2|    10| 100|    9|                0.09|
|     2|     6| 111|    8| 0.07207207207207207|
|     1|     5| 152|    7|0.046052631578947366|
|     1|     7| 160|    7|             0.04375|
|     2|     7|  64|    6|             0.09375|
|     3|     7|  33|    5| 0.15151515151515152|
|     2|     3|  63|    5| 0.07936507936507936|
|     7|    10|  74|    5| 0.06756756756756757|
|     3|     5|  26|    4| 0.15384615384615385|
|     5|     7|  28|    4| 0.14285714285714285|
|     2|     5|  58|    4| 0.06896551724137931|
|     3|     6|  83|    4| 0.04819277108433735|
|     3|    10|  74|    3| 0.04054054054

## Write output data to Parquet files and generate ZIP file

In [16]:
# Clean up first
# Remove the output directory and the ZIP file left by a previous run.
!rm -fr "$DATASET"/output 
!rm -f "$DATASET"/"$OUTPUT_ZIP_FILE"

if DEBUG:
  !ls -l $DATASET

# Write the three result frames as Parquet under <DATASET>/output.
writeParquet(movies_agg, DATASET + '/output/' + 'movies_agg.parquet')
writeParquet(tfidf, DATASET + '/output/' + 'tfidf.parquet')
# bonus
writeParquet(ji, DATASET + '/output/' + 'jaccardIndex.parquet')

if DEBUG:
  print('Creating ZIP file ...')

# Zip from inside the output directory so archive paths are relative to it;
# the archive itself is created one level up, in <DATASET>.
!cd "$DATASET"/output  && zip -9qr ../"$OUTPUT_ZIP_FILE" .

if DEBUG:
  !ls -l $DATASET "$DATASET"/output
 

total 20
drwxr-xr-x 2 root root 4096 Apr  3 14:08 actors.parquet
drwxr-xr-x 2 root root 4096 Apr  3 14:08 genres.parquet
drwxr-xr-x 2 root root 4096 Apr  3 14:08 movies.parquet
drwxr-xr-x 2 root root 4096 Apr  3 14:08 ratings.parquet
drwxr-xr-x 2 root root 4096 Apr  3 14:08 tags.parquet
==> Writing tiny1/output/movies_agg.parquet
==> Writing tiny1/output/tfidf.parquet
==> Writing tiny1/output/jaccardIndex.parquet
Creating ZIP file ...
tiny1:
total 76
drwxr-xr-x 2 root root  4096 Apr  3 14:08 actors.parquet
drwxr-xr-x 2 root root  4096 Apr  3 14:08 genres.parquet
drwxr-xr-x 2 root root  4096 Apr  3 14:08 movies.parquet
drwxr-xr-x 5 root root  4096 Apr  3 17:48 output
-rw-r--r-- 1 root root 51340 Apr  3 17:49 output.zip
drwxr-xr-x 2 root root  4096 Apr  3 14:08 ratings.parquet
drwxr-xr-x 2 root root  4096 Apr  3 14:08 tags.parquet

tiny1/output:
total 12
drwxr-xr-x 2 root root 4096 Apr  3 17:48 jaccardIndex.parquet
drwxr-xr-x 2 root root 4096 Apr  3 17:48 movies_agg.parquet
drwxr-xr-x 2 

## Copy output ZIP file to output bucket

In [17]:
! gsutil cp $DATASET/output.zip gs://"$OUTPUT_BUCKET"/"$DATASET"/output.zip 

Copying file://tiny1/output.zip [Content-Type=application/zip]...
/ [1 files][ 50.1 KiB/ 50.1 KiB]                                                
Operation completed over 1 objects/50.1 KiB.                                     


## Copy Parquet files to output bucket (optional)

In [18]:
# Optionally upload the three Parquet result directories to
# gs://<OUTPUT_BUCKET>/<DATASET>/ (recursive, parallel copy).
if COPY_PARQUET_FILES_TO_OUTPUT_BUCKET:
  ! gsutil -m cp -r $DATASET/output/movies_agg.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/
  ! gsutil -m cp -r $DATASET/output/tfidf.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/
  ! gsutil -m cp -r $DATASET/output/jaccardIndex.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/

Copying file://tiny1/output/movies_agg.parquet/part-00000-1fba380d-3923-43ee-a216-c68e41b41394-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file://tiny1/output/movies_agg.parquet/_SUCCESS [Content-Type=application/octet-stream]...
/ [0/22 files][    0.0 B/ 15.9 KiB]   0% Done                                   Copying file://tiny1/output/movies_agg.parquet/.part-00001-1fba380d-3923-43ee-a216-c68e41b41394-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
/ [0/22 files][    0.0 B/ 15.9 KiB]   0% Done                                   / [0/22 files][    0.0 B/ 15.9 KiB]   0% Done                                   Copying file://tiny1/output/movies_agg.parquet/.part-00006-1fba380d-3923-43ee-a216-c68e41b41394-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
/ [0/22 files][    0.0 B/ 15.9 KiB]   0% Done                                   Copying file://tiny1/output/movies_agg.parquet/.part-00008-1fba380d-3923-43ee-a216-c68e41b413

## Send PubSub cloud message 

This will trigger the LCF cloud function. 

In [19]:
# Optionally publish the dataset name as a message on the configured
# Pub/Sub topic.
if SEND_PUBSUB_MESSAGE:
 !gcloud pubsub topics publish $PUBSUB_TOPIC --message $DATASET

messageIds:
- '1112573012219345'
