## Transformation

In [None]:
%pyspark 

# Plain Python list used as the source data for the RDD.
data = [1,2,3,4,5]
# Build an RDD from the list; the second argument (4) is the requested number of partitions.
rdd = sc.parallelize(data, 4) 
# Return all elements to the driver; as the last expression this is the cell output.
rdd.collect()

In [None]:
%pyspark

# Read the text file as an RDD of lines; the "file:" scheme addresses the local filesystem.
lines = sc.textFile('file:/tmp/trump.txt') 
# Show the first three lines.
lines.take(3)

In [None]:
%pyspark
def timesTwo(e):
    """Return ``e`` multiplied by 2 (used as the mapper passed to ``rdd.map``)."""
    doubled = e * 2
    return doubled
    
# Source RDD of four integers.
rdd = sc.parallelize([1, 2, 3, 4]) 
# map() applies timesTwo to every element, producing a new RDD.
rdd2 = rdd.map(timesTwo)
# Collect the mapped elements to the driver for display.
rdd2.collect()

In [None]:
%pyspark

# Same doubling example as the named-function version, written with an inline lambda.
rdd = sc.parallelize([1, 2, 3, 4]) 
rdd2 = rdd.map(lambda e: e * 2)
# Collect the mapped elements to the driver for display.
rdd2.collect()

In [None]:
%pyspark
# Keep only the even elements. `rdd` is not defined in this cell; it must
# already exist from a previously executed cell.
rdd2 = rdd.filter(lambda x: x % 2 == 0)
rdd2.collect()

In [None]:
%pyspark
# Input contains a duplicate (2 appears twice).
rdd = sc.parallelize([1,4,2,2,3])
# distinct() removes duplicate elements.
rdd2 = rdd.distinct()
rdd2.collect()

In [None]:
%pyspark
rdd=sc.parallelize([1,2,3])
# map() emits exactly one output per input, so each element becomes a
# two-item list [x, x+5] and the result is a list of lists.
rdd2 = rdd.map(lambda x:[x,x+5])
rdd2.collect()

In [None]:
%pyspark
# flatMap() flattens the per-element lists into a single sequence of values.
# `rdd` is not defined in this cell; it must exist from a previously executed cell.
rdd2 = rdd.flatMap(lambda x:[x,x+5])
rdd2.collect()

## Action

In [None]:
rdd=sc.parallelize([1,2,3])
# reduce() combines the elements pairwise with the given function; here it sums them.
rdd.reduce(lambda x,y: x + y)

In [None]:
# Return the first two elements of `rdd` (defined in an earlier cell).
rdd.take(2)

In [None]:
# Return every element of `rdd` (defined in an earlier cell) to the driver.
rdd.collect()

In [None]:
rdd = sc.parallelize([5, 3, 1, 2])
# Ordering by the negated value yields the three largest elements, largest first.
rdd.takeOrdered(3, key=lambda s: -s)

## Key-Value RDD

In [None]:
# RDD of (key, value) pairs; key 3 appears twice.
rdd  = sc.parallelize([(1,2), (3,4), (3,6)]) 
# reduceByKey() merges the values of each key with the given function (here: sum).
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
rdd2.collect()

In [None]:
# RDD of (key, value) pairs with an unsorted key order.
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) 
# sortByKey() orders the pairs by their key.
rdd3 = rdd2.sortByKey()
rdd3.collect()

In [None]:
# RDD of (key, value) pairs; key 1 appears twice.
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
# groupByKey() gathers all values that share a key into one iterable per key.
rdd3 = rdd2.groupByKey()
# The grouped values are lazy iterable objects whose repr is just an object
# address, so materialise them as lists to make the displayed result readable.
rdd3.mapValues(list).collect()

## Broadcast, Accumulator

In [None]:
%pyspark
# Wrap a list in a broadcast variable.
broadcastVar = sc.broadcast([1, 2, 3])
# .value gives access to the wrapped object.
broadcastVar.value

In [None]:
%pyspark
# Accumulator with initial value 0; it is added to from inside foreach() below.
accum = sc.accumulator(0) 
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
    """Add ``x`` to the module-level accumulator ``accum``.

    Uses ``accum.add`` instead of ``global accum; accum += x``: the
    accumulator is updated in place, so the global name never needs to be
    rebound and no ``global`` declaration is required.
    """
    accum.add(x)
    
# Run f on every element of the RDD; f adds each element to the accumulator.
rdd.foreach(f) 

# Read the accumulated total on the driver.
accum.value

## WordCount

In [None]:
%pyspark
# Read the text with 4 requested partitions. The explicit file:// scheme matches
# the earlier cell that reads this same file from the local filesystem, so the
# path is not resolved against the cluster's default filesystem.
rdd = sc.textFile('file:///tmp/trump.txt', 4)
# Lower-case each line and split on whitespace, flattening into one RDD of words.
rdd2 = rdd.flatMap(lambda e: e.lower().split())
# countByValue() returns the word -> count mapping to the driver. It is bound
# to `word_counts` rather than `dict` so the builtin `dict` is not shadowed
# for the rest of the session.
word_counts = rdd2.countByValue()
word_counts

## Movie Likes

In [None]:
%pyspark
# Load the ratings file from the local filesystem as an RDD of lines.
lines = sc.textFile("file:///tmp/u.data")
# Debug aid: lines.take(3)
# Emit (second whitespace-separated field, 1) per line; the second field is
# presumably the movie id -- verify against the u.data layout.
movies = lines.map(lambda e: (e.split()[1],1) )
# Debug aid: movies.take(3)
# Sum the 1s per key to get the number of rating records per movie.
movieCounts = movies.reduceByKey(lambda x,y: x+ y)
# Sort by negated count, i.e. most-rated first.
res = movieCounts.sortBy(lambda a : -a[1])
res.take(3)

In [None]:
%pyspark

def loadMovieNames(path='/tmp/u.item'):
    """Build a lookup from the first to the second ``|``-separated field of each line.

    Args:
        path: Local path of the pipe-delimited item file. Defaults to
            ``/tmp/u.item``, the location this function previously hard-coded,
            so existing zero-argument calls behave as before.

    Returns:
        dict mapping field 0 (str) to field 1 (str) for every well-formed line.

    Lines with fewer than two fields (for example blank lines) are skipped
    instead of raising ``IndexError``.
    """
    movieNames = {}
    # The file is decoded as latin1, as in the original implementation.
    with open(path, 'r', encoding='latin1') as f:
        for line in f:
            fields = line.strip().split('|')
            if len(fields) < 2:
                continue
            movieNames[fields[0]] = fields[1]
    return movieNames
    
# Load the id -> title dict on the driver and wrap it in a broadcast variable
# so it can be read via nameDict.value inside RDD functions.
nameDict = sc.broadcast(loadMovieNames())

In [None]:
%pyspark
# Debug aid: loadMovieNames()
# Most-rated movies first (movieCounts comes from an earlier cell).
res = movieCounts.sortBy(lambda a: -a[1])
# Replace each key with its title from the broadcast dict; dict.get yields
# None for a key that is missing from the lookup.
res2 = res.map(lambda e: (nameDict.value.get(e[0]), e[1]))
res2.take(10)

## Spark DataFrame

In [None]:
%pyspark
from pyspark.sql import SQLContext
# Create the SQL entry point used by the DataFrame cells below.
# NOTE(review): a later cell uses a `spark` session object directly; consider
# standardising on one entry point -- confirm against the target Spark version.
sqlContext = SQLContext(sc)

In [None]:
%pyspark
# NOTE(review): no URI scheme here, so the path is resolved by Spark's default
# filesystem -- confirm that is where ratings.txt lives.
data_file = '/tmp/ratings.txt'
# Read the file as lines with 4 requested partitions.
raw_data  = sc.textFile(data_file, 4)
raw_data.take(3)

# Treat the first line as a header.
# NOTE(review): assumes the file really starts with a header row -- confirm.
header = raw_data.first()
header

# Drop every line equal to the header line.
skip_data = raw_data.filter(lambda line: line != header)
skip_data.take(3)


# Split each remaining line on the "::" delimiter into a list of fields.
csv_data = skip_data.map(lambda l: l.split("::"))
csv_data.take(3)

In [None]:
%pyspark

from pyspark.sql import Row
# Convert each field list into a Row: the first two fields stay strings,
# the third is parsed as an integer rating.
row_data = csv_data.map(
        lambda p: 
            Row(
                userid=p[0], 
                itemid=p[1], 
                rating=int(p[2]) 
            )
)
row_data.take(3)

In [None]:
%pyspark
import pandas

# Two sample rating records used to illustrate the same aggregation in pandas.
a = [
    {'userid': 0, 'itemid': 0, 'rating': 4},
    {'userid': 0, 'itemid': 1, 'rating': 5},
]

# pandas DataFrame built from the list of dicts.
pandas_df = pandas.DataFrame(a)
pandas_df
# Mean rating per item.
pandas_df.groupby('itemid')['rating'].mean()

In [None]:
%pyspark
# Build a DataFrame from the RDD of Row objects.
df = sqlContext.createDataFrame(row_data)
# take(3) returns the first three rows as Row objects; show(3) prints them as a table.
df.take(3)
df.show(3)

In [None]:
%pyspark
# SQL equivalent: SELECT itemid, AVG(rating) FROM df GROUP BY itemid
# avg() is called without column names here.
df.select("itemid", "rating").groupBy("itemid").avg().show(3)

In [None]:
%pyspark

# Expose the DataFrame to SQL queries under the name "ratings".
# createOrReplaceTempView replaces the deprecated registerTempTable and also
# makes re-running this cell safe, because an existing view is replaced.
df.createOrReplaceTempView("ratings")
# Print the column names and types of the DataFrame.
df.printSchema()

In [None]:
%pyspark
# Same per-item average as the DataFrame API example, expressed in SQL against
# the "ratings" temporary table.
ratings_data = sqlContext.sql("""
    SELECT itemid,AVG(rating) FROM ratings GROUP BY itemid
""") 
ratings_data.show(3)

## Using SparkSQL to Analyze Movie Ratings

In [None]:
%pyspark
# Read the item file from the local filesystem with 4 requested partitions.
# NOTE(review): loadMovieNames opens this same file as latin1, while textFile
# is given no encoding here -- check non-ASCII titles come through correctly.
movies = sc.textFile('file:///tmp/u.item', 4)
# Split each line on the "|" delimiter.
movies_data = movies.map(lambda l:l.split('|')) 
# Debug aid: movies_data.take(3)
# Keep the first two fields as (movieid, moviename); Row must already be
# imported by an earlier cell.
movies_row_data = movies_data.map(lambda p:
    Row(movieid=p[0], moviename=p[1] ) )
movies_row_data.take(4)

In [None]:
%pyspark
# Build a DataFrame from the movie rows and expose it to SQL as "movies".
movies_df = sqlContext.createDataFrame(movies_row_data)
# createOrReplaceTempView replaces the deprecated registerTempTable.
movies_df.createOrReplaceTempView("movies")

In [None]:
%pyspark
# Read the ratings file from the local filesystem with 4 requested partitions.
ratings = sc.textFile('file:///tmp/u.data', 4)
# Split each line on whitespace.
ratings_data = ratings.map(lambda l: l.split())
# Keep the first three fields; the rating is parsed as an integer, ids stay strings.
ratings_row_data = ratings_data.map(lambda p:
    Row(userid=p[0], movieid=p[1], rating=int(p[2])))

ratings_row_data.take(4)
df = sqlContext.createDataFrame(ratings_row_data)
# Expose the DataFrame to SQL as "ratings". createOrReplaceTempView replaces
# the deprecated registerTempTable and overwrites the "ratings" view that an
# earlier cell registered under the same name.
df.createOrReplaceTempView("ratings")

In [None]:
%pyspark

# Join movies with their ratings and compute, per movie name, the average
# rating and the number of ratings.
# NOTE(review): rows are ordered by rating count (cnt), not by avg_rating,
# despite the variable name; the query keeps 10 rows but only 5 are shown.
# Grouping is by moviename, so distinct movie ids sharing a name are merged.
best_movies = sqlContext.sql("""
SELECT moviename,avg(rating) as avg_rating, count(1) as cnt 
FROM movies INNER JOIN ratings ON ratings.movieid = movies.movieid 
GROUP BY moviename 
ORDER by cnt DESC LIMIT 10
""") 
best_movies.show(5)

## Movie Recommendation

In [None]:
%pyspark
# Read the ratings file. The explicit file:// scheme matches the other cells
# that read this same file from the local filesystem, so the path is not
# resolved against the cluster's default filesystem.
rawData = sc.textFile("file:///tmp/u.data")
rawData.first()

# Split each line on whitespace into a list of string fields.
rawRatings = rawData.map(lambda e: e.split())
rawRatings.take(3)

In [None]:
%pyspark
from pyspark.sql import Row

# Turn each split record into a typed Row: integer user id, movie id and
# timestamp, floating-point rating.
ratingsRDD = rawRatings.map(
    lambda p: Row(
        userId=int(p[0]),
        movieId=int(p[1]),
        rating=float(p[2]),
        timestamp=int(p[3]),
    )
)

ratings = spark.createDataFrame(ratingsRDD)
# Random split with weights 0.8 / 0.2; no seed is given, so the split differs between runs.
training, test = ratings.randomSplit([0.8, 0.2])

In [None]:
%pyspark
from pyspark.ml.recommendation import ALS

# Configure the ALS estimator: rank 50, 10 iterations, regularisation 0.01,
# reading users, items and ratings from the named columns.
als = ALS(
    rank=50,
    maxIter=10,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
)
# Fit the model on the training split.
model = als.fit(training)

In [None]:
%pyspark
# Number of rows in the fitted model's user-factor DataFrame.
model.userFactors.count()

In [None]:
%pyspark
# Number of rows in the fitted model's item-factor DataFrame.
model.itemFactors.count()

In [None]:
%pyspark
# Top 10 recommended items for every user.
userRecs = model.recommendForAllUsers(10)
# Print the first three rows.
userRecs.show(3)

In [None]:
%pyspark
# Top 10 recommended users for every item.
movieRecs = model.recommendForAllItems(10)
# Print the first three rows.
movieRecs.show(3)