# Data Cleaning
### This notebook cleans and reorganizes the logs in data_collection/records.txt using Spark.
### This produces 
#### a. Two folders - userRating and userWatchLength, each containing a set of partition files with, respectively, the users' ratings and the users' watch history, i.e. the time users spend on each movie <br>
#### b. Two files - users.txt and movies.txt, containing the user IDs and movie IDs that occur in the logs.

### Create spark context and read in the records.txt

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext
# Reuse the running SparkContext when this cell is executed again: a second
# SparkContext(...) call in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
# On first execution this still starts a new context named "hw".
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("hw"))
sqlContext = SQLContext(sc)

print("Spark context started")

In [None]:
# Load the raw log as an RDD of strings, one element per line of the file.
# NOTE(review): the notebook header refers to data_collection/records.txt,
# while this path is the bare 'records.txt' — confirm the notebook is run
# from the directory that contains the file.
recordsRDD = sc.textFile('records.txt')

In [None]:
# Total number of log lines read.
recordsRDD.count()

### Separate different types of messages

In [None]:
def _mentions(marker):
    """Return a predicate that is true for log lines containing `marker`."""
    return lambda line: marker in line


# Split the raw log into three views by the substring each kind of line
# contains. The filters are independent: a line is kept by every view whose
# marker it contains.
watchRDD = recordsRDD.filter(_mentions('/data/m'))
ratingRDD = recordsRDD.filter(_mentions('/rate/'))
recommendRDD = recordsRDD.filter(_mentions('recommendation'))

In [None]:
# Number of log lines selected into watchRDD.
watchRDD.count()

In [None]:
# Number of log lines selected into ratingRDD.
ratingRDD.count()

In [None]:
# Number of log lines selected into recommendRDD.
recommendRDD.count()

### Parse users' watch history

In [None]:
watchListRDD = watchRDD.map(lambda rec: rec.split(','))

In [None]:
watchPairRDD = watchListRDD.map(lambda lst: (lst[1], lst[2].split('/')))

In [None]:
userMoviePairRDD = watchPairRDD.map(lambda tup: ((tup[0], tup[1][3]), tup[1][4])).groupByKey().mapValues(list)

In [None]:
userMoviePairRDD = userMoviePairRDD.map(lambda tup: (tup[0], tup[1][-1]))

In [None]:
import re
userMoviePairRDD = userMoviePairRDD.mapValues(lambda v: int(re.sub('[^0-9]', '', v)))

In [None]:
# Peek at a few ((user, movie), minutes) records before writing them out.
userMoviePairRDD.take(5)

In [None]:
# Write the records as text partition files under KafkaData/userWatchLength.
# NOTE(review): the ratings are saved under 'Kafka/userRating', i.e. a
# different parent directory ('Kafka' vs 'KafkaData') — confirm which of the
# two is intended.
userMoviePairRDD.saveAsTextFile('KafkaData/userWatchLength')

### Parse users' ratings

In [None]:
ratingListRDD = ratingRDD.map(lambda rec: rec.split(','))
ratingPairRDD = ratingListRDD.map(lambda lst: (lst[1], lst[2].split('/')))

In [None]:
userRatingRDD = ratingPairRDD.map(lambda tup: (tup[0], tup[1][2].split('=')))

In [None]:
userRatingPairRDD = userRatingRDD.map(lambda tup: ((tup[0], tup[1][0]), tup[1][1]))

In [None]:
# Peek at a few ((user, movie), rating) records; the rating is still a string.
userRatingPairRDD.take(5)

In [None]:
# Write the records as text partition files under Kafka/userRating.
# NOTE(review): the watch lengths are saved under 'KafkaData/userWatchLength',
# i.e. a different parent directory ('KafkaData' vs 'Kafka') — confirm which
# of the two is intended.
userRatingPairRDD.saveAsTextFile('Kafka/userRating')

### Get all user IDs and movie IDs

In [None]:
# Field 1 of each comma-separated log line is taken as the user ID.
# Lines with fewer than two fields (blank or truncated records) are dropped
# instead of raising IndexError, and non-numeric values are discarded before
# distinct() so that less data has to be shuffled. For well-formed input the
# resulting set of IDs is unchanged.
usersRDD = (
    recordsRDD
    .map(lambda rec: rec.split(','))
    .filter(lambda fields: len(fields) > 1 and fields[1].isdigit())
    .map(lambda fields: fields[1])
    .distinct()
)

In [None]:
# Number of distinct numeric user IDs found in the log.
usersRDD.count()

In [None]:
# Peek at a few user IDs (they are strings).
usersRDD.take(5)

In [None]:
# Collect the user IDs to the driver and write them to users.txt, one per line.
# NOTE: the IDs are strings, so sorted() orders them lexicographically
# ('10' comes before '9'), not numerically.
allUsers = sorted(usersRDD.collect())
with open('users.txt', 'w') as users_file:
    # writelines() is given an iterable of complete lines. Passing it a single
    # string (as before) only worked because a str is iterated character by
    # character; the bytes written are the same.
    users_file.writelines(user + '\n' for user in allUsers)

In [None]:
watchedMovie = userMoviePairRDD.map(lambda tup: tup[0][1]).distinct()
ratedMovie = userRatingPairRDD.map(lambda tup: tup[0][1]).distinct()

In [None]:
allMoviesRDD = watchedMovie.union(ratedMovie).distinct()

In [None]:
# Peek at a few movie IDs.
allMoviesRDD.take(5)

In [None]:
# Collect the movie IDs to the driver and write them to movies.txt, one per
# line. The IDs are sorted so the file is identical across runs (distinct()
# returns elements in no particular order), matching how users.txt is
# written from a sorted list.
allMovies = sorted(allMoviesRDD.collect())
with open('movies.txt', 'w') as movies_file:
    movies_file.writelines(movie + '\n' for movie in allMovies)

In [None]:
# Number of distinct movie IDs that were watched or rated.
allMoviesRDD.count()